stankiewicz opened a new issue, #38982:
URL: https://github.com/apache/beam/issues/38982

   ### What would you like to happen?
   
   ## Goal
   Optimize the performance and atomicity of the BigQuery `FILE_LOADS` copy 
pipeline branch in the Python SDK by aligning it with the Java SDK's 
architectural design.
   
   Specifically:
   1. **Atomicity**: For `WRITE_TRUNCATE` and `WRITE_EMPTY` pipelines, group 
all temporary tables for a single destination and copy them using a **single 
multi-source copy job**.
   2. **Performance**: For `WRITE_APPEND` pipelines, keep running all partition copy jobs asynchronously (non-blocking), as proposed in [PR 38981](https://github.com/apache/beam/pull/38981).
   3. **Compatibility**: Implement these changes without modifying the pipeline graph; isolate them to the `TriggerCopyJobs` DoFn.
   
   ---
   
   ## Background & Current State
   
   When writing to multiple/dynamic destinations, or when handling data volumes large enough to require partition splitting, Apache Beam loads data into unique temporary tables in BigQuery and then triggers copy jobs to merge them into the final destination.
   
   ### 1. Python Copy Configuration Limitation
   Currently, the Python SDK's copy helper, `BigQueryWrapper._insert_copy_job` in [bigquery_tools.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L497-L522), only supports a single source table reference (`from_table_reference`, mapped to the singular `sourceTable` field):
   ```python
   configuration=bigquery.JobConfiguration(
       copy=bigquery.JobConfigurationTableCopy(
           destinationTable=to_table_reference,
           sourceTable=from_table_reference, # Singular
           createDisposition=create_disposition,
           writeDisposition=write_disposition,
       )
   )
   ```
   
   In contrast, the Java SDK's counterpart in 
[WriteRename.java](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java#L310-L330)
 uses the plural `setSourceTables` to pass a list of temporary tables directly 
to a single copy job configuration:
   ```java
   JobConfigurationTableCopy copyConfig =
       new JobConfigurationTableCopy()
           .setSourceTables(tempTables) // Plural list of source tables
           .setDestinationTable(ref);
   ```
   
   ### 2. Blocking Execution Model
   Currently, `TriggerCopyJobs` processes copy requests sequentially in [process_one](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L535-L578). If `wait_for_job` resolves to `True`, the worker thread blocks synchronously until the BigQuery copy job completes:
   ```python
   if wait_for_job:
     self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)
   ```
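A rough model of why the blocking path is costly (illustrative only, not Beam code): when a bundle holds several copy jobs and every `wait_for_bq_job` call blocks, the copy phase takes about the sum of the job durations, whereas submitting every job first and waiting afterwards takes about the longest single job, assuming BigQuery runs the submitted jobs concurrently. The durations below are invented for illustration, and the model ignores the 10-second polling interval and any BigQuery-side queuing.

```python
from typing import Sequence


def blocking_wall_time(job_durations_sec: Sequence[float]) -> float:
  """Submit one job, block until it finishes, then submit the next."""
  return sum(job_durations_sec)


def overlapped_wall_time(job_durations_sec: Sequence[float]) -> float:
  """Submit every job up front, then wait for all of them.

  Assumes BigQuery executes the submitted jobs concurrently.
  """
  return max(job_durations_sec, default=0.0)


# Hypothetical durations (seconds) for four partition copy jobs.
durations = [30.0, 45.0, 20.0, 40.0]
print(blocking_wall_time(durations))    # 135.0
print(overlapped_wall_time(durations))  # 45.0
```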
   
   ### 3. Pipeline Grouping Modes
   `TriggerCopyJobs` runs in two grouping modes depending on 
`self.write_disposition`:
   * **For `WRITE_TRUNCATE` / `WRITE_EMPTY`**: The pipeline groups all 
partition load job outputs for a destination into a single list before calling 
`TriggerCopyJobs`. Therefore, the `element_list` parameter contains all 
temporary tables for that destination.
   * **For `WRITE_APPEND`**: The pipeline performs no grouping. `element_list` 
contains exactly one temporary table reference at a time.
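The two modes can be modelled with a toy function. `element_lists_for` is hypothetical and not part of Beam; it only shows which `element_list` values `TriggerCopyJobs` would see for a given set of load outputs. Plain `(destination, temp_table)` strings stand in for whatever element types Beam actually passes.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical stand-in: (destination, temp_table) as plain strings.
LoadOutput = Tuple[str, str]

WRITE_TRUNCATE = 'WRITE_TRUNCATE'
WRITE_EMPTY = 'WRITE_EMPTY'
WRITE_APPEND = 'WRITE_APPEND'


def element_lists_for(
    write_disposition: str,
    outputs: List[LoadOutput]) -> List[List[LoadOutput]]:
  """Model the element_list values TriggerCopyJobs would receive."""
  if write_disposition in (WRITE_TRUNCATE, WRITE_EMPTY):
    # Grouped mode: one list holding every temp table of a destination.
    by_destination: Dict[str, List[LoadOutput]] = defaultdict(list)
    for destination, temp_table in outputs:
      by_destination[destination].append((destination, temp_table))
    return list(by_destination.values())
  # Ungrouped mode (WRITE_APPEND): each temp table arrives on its own.
  return [[output] for output in outputs]


outputs = [('proj:ds.a', 'tmp_a_0'), ('proj:ds.b', 'tmp_b_0'),
           ('proj:ds.a', 'tmp_a_1')]
print(element_lists_for(WRITE_TRUNCATE, outputs))
# [[('proj:ds.a', 'tmp_a_0'), ('proj:ds.a', 'tmp_a_1')], [('proj:ds.b', 'tmp_b_0')]]
```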
   
   ---
   
   ## Proposed Requirements
   
   ### 1. Enable Multi-Source Copy in `BigQueryWrapper`
   Modify `BigQueryWrapper._insert_copy_job` (or introduce an internal helper 
method) in 
[bigquery_tools.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery_tools.py)
 to handle either a single table reference or a list of table references. 
   * If a list of source tables is provided, map it to the `sourceTables` field 
of `JobConfigurationTableCopy` instead of `sourceTable`.
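A minimal sketch of the singular/plural mapping, written against the BigQuery REST `JobConfigurationTableCopy` JSON field names (`sourceTable`, `sourceTables`, `destinationTable`, `createDisposition`, `writeDisposition`) instead of Beam's generated `bigquery.JobConfigurationTableCopy` message class, so that it runs without Beam installed. The function name `build_copy_config` and the dict-based table references are illustrative assumptions, not the existing `_insert_copy_job` signature.

```python
from typing import Any, Dict, List, Union

# Hypothetical stand-in for a TableReference:
# {'projectId': ..., 'datasetId': ..., 'tableId': ...}
TableRef = Dict[str, str]


def build_copy_config(
    source: Union[TableRef, List[TableRef]],
    destination: TableRef,
    create_disposition: str,
    write_disposition: str) -> Dict[str, Any]:
  """Build a REST-style copy job configuration for one or many sources."""
  copy: Dict[str, Any] = {
      'destinationTable': destination,
      'createDisposition': create_disposition,
      'writeDisposition': write_disposition,
  }
  if isinstance(source, list):
    if not source:
      raise ValueError('At least one source table is required.')
    # Plural field: all temp tables are copied by a single job.
    copy['sourceTables'] = list(source)
  else:
    # Singular field: the current behaviour for one source table.
    copy['sourceTable'] = source
  return {'copy': copy}


def table(table_id: str) -> TableRef:
  return {'projectId': 'my-project', 'datasetId': 'my_dataset',
          'tableId': table_id}


config = build_copy_config(
    [table('tmp_0'), table('tmp_1')], table('final'),
    'CREATE_IF_NEEDED', 'WRITE_TRUNCATE')
# config['copy'] has 'sourceTables' with both temp tables and no 'sourceTable'.
```

Accepting either shape keeps existing single-source callers, such as the `WRITE_APPEND` path, unchanged.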
   
   ### 2. Copy All Temp Tables at Once for Truncate / Empty
   Update 
[TriggerCopyJobs.process](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L528-L535):
   * When `self.write_disposition` is `WRITE_TRUNCATE` or `WRITE_EMPTY`, 
`element_list` is a list of all temporary tables for a destination table.
   * The DoFn should combine all temporary tables into a single source list and launch a **single copy job** to the destination.
   * This ensures that the truncation and the copying of all partitions happen atomically within a single BigQuery copy job, matching the atomicity guarantees of the Java SDK.
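The per-disposition decision inside `process` can be sketched as a pure planning function. `plan_copy_jobs` and `PlannedCopyJob` are hypothetical names, plain strings stand in for destinations and temp tables, and the check that a grouped `element_list` targets exactly one destination is an added defensive assumption rather than something the issue requires.

```python
from typing import List, NamedTuple, Sequence, Tuple

# Hypothetical stand-in: (destination, temp_table) as plain strings.
Element = Tuple[str, str]

GROUPED_DISPOSITIONS = ('WRITE_TRUNCATE', 'WRITE_EMPTY')


class PlannedCopyJob(NamedTuple):
  destination: str
  sources: Tuple[str, ...]
  write_disposition: str


def plan_copy_jobs(
    write_disposition: str,
    element_list: Sequence[Element]) -> List[PlannedCopyJob]:
  """Decide which copy jobs one process() call would launch."""
  if write_disposition in GROUPED_DISPOSITIONS:
    destinations = {destination for destination, _ in element_list}
    if len(destinations) != 1:
      raise ValueError(
          'Expected temp tables for exactly one destination, got %r' %
          sorted(destinations))
    destination = destinations.pop()
    # One multi-source job, so the truncation (WRITE_TRUNCATE) or the
    # empty-table check (WRITE_EMPTY) covers every partition at once.
    return [
        PlannedCopyJob(
            destination,
            tuple(temp_table for _, temp_table in element_list),
            write_disposition)
    ]
  # WRITE_APPEND: one single-source job per temp table, as today.
  return [
      PlannedCopyJob(destination, (temp_table,), write_disposition)
      for destination, temp_table in element_list
  ]


elements = [('proj:ds.t', 'tmp_0'), ('proj:ds.t', 'tmp_1'),
            ('proj:ds.t', 'tmp_2')]
print(plan_copy_jobs('WRITE_TRUNCATE', elements))
# [PlannedCopyJob(destination='proj:ds.t', sources=('tmp_0', 'tmp_1', 'tmp_2'), write_disposition='WRITE_TRUNCATE')]
```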
   
   ### 3. Asynchronous Execution for Append Mode
   * When `self.write_disposition` is `WRITE_APPEND`, the copy jobs must be started asynchronously (non-blocking) during `process` (with `wait_for_job=False`) so that they can run in parallel; see [PR 38981](https://github.com/apache/beam/pull/38981).
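A sketch of the submit-now, wait-later pattern with a fake client that records calls instead of contacting BigQuery. `FakeBigQueryWrapper`, `start_copy_job`, `wait_for_job` and `AppendCopySketch` are hypothetical names (the real wrapper's wait call is `wait_for_bq_job`), and waiting once at the end of the bundle, for example in `finish_bundle`, is an assumption: this issue does not describe where PR 38981 actually waits.

```python
from typing import List, Tuple


class FakeBigQueryWrapper:
  """Hypothetical stand-in that records calls instead of calling BigQuery."""

  def __init__(self) -> None:
    self.events: List[Tuple[str, str]] = []

  def start_copy_job(self, source: str, destination: str) -> str:
    job_id = 'copy_%s' % source
    self.events.append(('start', job_id))
    return job_id

  def wait_for_job(self, job_id: str) -> None:
    self.events.append(('wait', job_id))


class AppendCopySketch:
  """Start every append copy job immediately; wait only at the end."""

  def __init__(self, bq: FakeBigQueryWrapper) -> None:
    self.bq = bq
    self.pending_jobs: List[str] = []

  def process(self, destination: str, temp_table: str) -> None:
    # wait_for_job=False: submit and move on, so jobs can overlap.
    self.pending_jobs.append(self.bq.start_copy_job(temp_table, destination))

  def finish(self) -> List[str]:
    # Block once for everything submitted in this bundle.
    for job_id in self.pending_jobs:
      self.bq.wait_for_job(job_id)
    done, self.pending_jobs = self.pending_jobs, []
    return done


bq = FakeBigQueryWrapper()
dofn = AppendCopySketch(bq)
for tmp in ('tmp_0', 'tmp_1', 'tmp_2'):
  dofn.process('proj:ds.t', tmp)
dofn.finish()
print([kind for kind, _ in bq.events])
# ['start', 'start', 'start', 'wait', 'wait', 'wait']
```

All three jobs are submitted before the first wait, which is what lets BigQuery run them in parallel.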
   
   ### 4. Backward Compatibility
   To avoid breaking update compatibility for streaming pipelines, do not modify the pipeline graph, and limit changes to the `TriggerCopyJobs.process` implementation.
   
   
   ### Issue Priority
   
   Priority: 2 (default / most feature requests should be filed as P2)
   
   ### Issue Components
   
   - [x] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Prism Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner