stankiewicz opened a new issue, #38982: URL: https://github.com/apache/beam/issues/38982
### What would you like to happen?

## Goal

Optimize the performance and atomicity of the BigQuery `FILE_LOADS` copy pipeline branch in the Python SDK by aligning it with the Java SDK's architectural design. Specifically:

1. **Atomicity**: For `WRITE_TRUNCATE` and `WRITE_EMPTY` pipelines, group all temporary tables for a single destination and copy them using a **single multi-source copy job**.
2. **Performance**: For `WRITE_APPEND` pipelines, keep running all partition copy jobs asynchronously (non-blocking), as proposed in [PR 38981](https://github.com/apache/beam/pull/38981).
3. **Compatibility**: Implement these changes without modifying the pipeline graph; isolate the changes to the `TriggerCopyJobs` DoFn.

---

## Background & Current State

When writing to multiple or dynamic destinations, or when handling volumes large enough to require partition splitting, Apache Beam loads data into unique temporary tables in BigQuery, then triggers copy jobs to merge them into the final destination.

### 1. Python Copy Configuration Limitation

Currently, the Python SDK's copy tool in [bigquery_tools.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L497-L522) only supports a single source table reference (`from_table_reference`, mapped to the singular `sourceTable` field):

```python
configuration=bigquery.JobConfiguration(
    copy=bigquery.JobConfigurationTableCopy(
        destinationTable=to_table_reference,
        sourceTable=from_table_reference,  # Singular
        createDisposition=create_disposition,
        writeDisposition=write_disposition,
    )
)
```

In contrast, the Java SDK's counterpart in [WriteRename.java](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java#L310-L330) uses the plural `setSourceTables` to pass a list of temporary tables directly to a single copy job configuration:

```java
JobConfigurationTableCopy copyConfig =
    new JobConfigurationTableCopy()
        .setSourceTables(tempTables) // Plural list of source tables
        .setDestinationTable(ref);
```

### 2. Blocking Execution Model

Currently, `TriggerCopyJobs` processes copy requests sequentially in [process_one](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L535-L578). If `wait_for_job` resolves to `True`, the worker thread blocks synchronously until the BigQuery copy job completes:

```python
if wait_for_job:
  self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)
```

### 3. Pipeline Grouping Modes

`TriggerCopyJobs` runs in two grouping modes depending on `self.write_disposition`:

* **For `WRITE_TRUNCATE` / `WRITE_EMPTY`**: The pipeline groups all partition load job outputs for a destination into a single list before calling `TriggerCopyJobs`. Therefore, the `element_list` parameter contains all temporary tables for that destination.
* **For `WRITE_APPEND`**: The pipeline performs no grouping. `element_list` contains exactly one temporary table reference at a time.

---

## Proposed Requirements

### 1. Enable Multi-Source Copy in `BigQueryWrapper`

Modify `BigQueryWrapper._insert_copy_job` (or introduce an internal helper method) in [bigquery_tools.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery_tools.py) to handle either a single table reference or a list of table references.

* If a list of source tables is provided, map it to the `sourceTables` field of `JobConfigurationTableCopy` instead of `sourceTable`.

### 2. Copy All Temp Tables at Once for Truncate / Empty

Update [TriggerCopyJobs.process](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L528-L535):

* When `self.write_disposition` is `WRITE_TRUNCATE` or `WRITE_EMPTY`, `element_list` is a list of all temporary tables for a destination table.
* The transform should combine all temporary tables into a single source list and launch a **single copy job** to the destination.
* This ensures that the truncation and copying of all partitions occur in a single atomic BigQuery transaction, matching the atomicity guarantees of the Java SDK.

### 3. Asynchronous Execution for Append Mode

* When `self.write_disposition` is `WRITE_APPEND`, the copy jobs must be invoked asynchronously (non-blocking) during `process` (with `wait_for_job = False`), allowing parallel execution; see [PR 38981](https://github.com/apache/beam/pull/38981).

### 4. Backward Compatibility

To avoid breaking streaming update pipelines, do not modify the pipeline graph; limit the changes to the `TriggerCopyJobs.process` implementation.

### Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

### Issue Components

- [x] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Prism Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
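
As a rough illustration of proposed requirements 1 to 3, the sketch below shows one way the source-table handling and the per-disposition grouping could look. It is not the Beam implementation: `build_copy_configuration` and `plan_copy_jobs` are hypothetical helper names, table references are modeled as plain dicts rather than the SDK's `bigquery.TableReference` messages, and only the field names (`sourceTable`, `sourceTables`, `destinationTable`, `createDisposition`, `writeDisposition`) are taken from the copy configuration quoted in this issue.

```python
from typing import Dict, List, Sequence, Union

# Assumption: a table reference is modeled as a plain dict such as
# {"projectId": "p", "datasetId": "d", "tableId": "t"}.
TableRef = Dict[str, str]


def build_copy_configuration(
    sources: Union[TableRef, Sequence[TableRef]],
    destination: TableRef,
    write_disposition: str,
    create_disposition: str = "CREATE_IF_NEEDED",
) -> dict:
    """Hypothetical helper: builds a copy configuration as a nested dict.

    A single reference is mapped to `sourceTable`; a list of references
    is mapped to `sourceTables`, mirroring the Java `setSourceTables` call.
    """
    copy = {
        "destinationTable": destination,
        "createDisposition": create_disposition,
        "writeDisposition": write_disposition,
    }
    if isinstance(sources, dict):
        copy["sourceTable"] = sources
    else:
        copy["sourceTables"] = list(sources)
    return {"copy": copy}


def plan_copy_jobs(
    write_disposition: str, temp_tables: Sequence[TableRef]
) -> List[List[TableRef]]:
    """Hypothetical helper: returns one list of sources per copy job.

    WRITE_TRUNCATE / WRITE_EMPTY: all temp tables go into one job.
    WRITE_APPEND: one job per temp table, so jobs can run in parallel.
    """
    if not temp_tables:
        return []
    if write_disposition in ("WRITE_TRUNCATE", "WRITE_EMPTY"):
        return [list(temp_tables)]
    return [[table] for table in temp_tables]
```

In this sketch, a `process` implementation would call `plan_copy_jobs` on `element_list` and submit one job per returned source list, without waiting for completion in the `WRITE_APPEND` case. Whether the single truncate/empty job should be awaited inside `process` is left open here, since the issue does not specify it.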
