radhwene opened a new pull request, #61148:
URL: https://github.com/apache/airflow/pull/61148

   # Implement BigQueryStreamingBufferEmptySensor to handle DML operations on streaming tables
   
   **Fixes #59408**
   
   ## Problem
   When BigQuery DML operations (UPDATE, DELETE, MERGE) run against a table with an active streaming buffer, tasks fail because BigQuery rejects DML statements that affect rows still in the streaming buffer.
   This is a documented BigQuery limitation. Airflow currently has no built-in mechanism to wait for the buffer to flush before executing DML operations, so tasks fail repeatedly until the buffer clears (within 90 minutes, per the Google Cloud documentation).
   
   ## Solution
   This PR implements `BigQueryStreamingBufferEmptySensor` - a composable 
sensor that allows users to explicitly wait for a BigQuery table's streaming 
buffer to empty before proceeding with DML operations.
   
   This aligns with Airflow's design philosophy by providing:
   - **Explicit control** over pipeline dependencies
   - **Composable operations** that can be combined with other tasks
   - **Non-blocking execution** via deferrable mode
   
   ## Changes
   
   ### 1. New Sensor: `BigQueryStreamingBufferEmptySensor`
   -  Polls BigQuery table metadata to check if streaming buffer is empty
   -  Supports both synchronous polling and deferrable (async) execution
   -  90-minute timeout aligned with Google Cloud's streaming buffer flush 
guarantee
   -  Full support for GCP service account impersonation
   -  Consistent with existing `BigQueryTableExistenceSensor` implementation
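
The metadata check described above can be sketched as a small pure function. This is an illustration rather than the code in this PR: it assumes the table metadata is available as the dict returned by the BigQuery `tables.get` REST call, where the `streamingBuffer` field is present only while the table has a streaming buffer. The function name `streaming_buffer_is_empty` is hypothetical.

```python
from typing import Any, Mapping


def streaming_buffer_is_empty(table_resource: Mapping[str, Any]) -> bool:
    """Return True when the table metadata carries no streaming buffer section.

    Hypothetical helper: `table_resource` is assumed to be the dict form of a
    BigQuery `tables.get` response.
    """
    # BigQuery reports "streamingBuffer" only while buffered rows exist,
    # so a missing (or null) field means the buffer has been flushed.
    return table_resource.get("streamingBuffer") is None


# Example metadata while rows are still buffered, and after the flush.
buffered = {"id": "my-project:my_dataset.my_table", "streamingBuffer": {"estimatedRows": "12"}}
flushed = {"id": "my-project:my_dataset.my_table"}

print(streaming_buffer_is_empty(buffered))
print(streaming_buffer_is_empty(flushed))
```

A sensor's `poke` method would presumably return the result of a check like this on each polling interval.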
   
   ### 2. New Trigger: `BigQueryStreamingBufferEmptyTrigger`
   -  Async trigger for deferrable sensor execution
   -  Continuous polling of table metadata via `BigQueryTableAsyncHook`
   -  Event-driven callback integration
   -  Comprehensive error handling
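
The polling behaviour listed above might look roughly like the following async generator. It is a minimal sketch under stated assumptions, not the trigger from this PR: the real trigger reads metadata through `BigQueryTableAsyncHook`, whereas here a hypothetical `fetch_table` coroutine stands in for that call, and the event payload keys (`status`, `message`) are illustrative.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, Mapping

# Hypothetical stand-in for the async metadata lookup done by the real hook.
FetchTable = Callable[[], Awaitable[Mapping[str, Any]]]


async def wait_for_empty_buffer(
    fetch_table: FetchTable, poke_interval: float = 5.0
) -> AsyncIterator[Dict[str, str]]:
    """Poll table metadata until no streaming buffer is reported, then emit one event."""
    try:
        while True:
            table = await fetch_table()
            # "streamingBuffer" is absent once BigQuery has flushed the buffer.
            if table.get("streamingBuffer") is None:
                yield {"status": "success", "message": "Streaming buffer is empty"}
                return
            # Sleep without blocking the event loop before the next check.
            await asyncio.sleep(poke_interval)
    except Exception as exc:
        # Report lookup failures as an event instead of crashing the loop.
        yield {"status": "error", "message": str(exc)}


async def _demo() -> None:
    # Simulated responses: buffer present on the first poll, gone on the second.
    responses = iter([{"streamingBuffer": {"estimatedRows": "3"}}, {"id": "my_table"}])

    async def fake_fetch() -> Mapping[str, Any]:
        return next(responses)

    async for event in wait_for_empty_buffer(fake_fetch, poke_interval=0):
        print(event)


asyncio.run(_demo())
```

Emitting a single terminal event, success or error, lets the deferred sensor resume exactly once and decide whether to succeed or fail the task.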
   
   ### 3. Documentation & Examples
   -  Added complete documentation section in `bigquery.rst`
   -  3 usage examples (sync, deferrable, async modes)
   -  Real-world workflow examples with streaming INSERT/UPDATE/DELETE 
operations
   -  90-minute timeout configuration with explanations
   
   ## Usage Example
   
   ```python
   from airflow import DAG
   from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
   from airflow.providers.google.cloud.sensors.bigquery import BigQueryStreamingBufferEmptySensor

   with DAG('bigquery_dml_pipeline'):
       # Wait for the streaming buffer to be empty
       wait_buffer = BigQueryStreamingBufferEmptySensor(
           task_id='wait_buffer_empty',
           project_id='my-project',
           dataset_id='my_dataset',
           table_id='my_table',
           deferrable=True,  # Non-blocking execution
           timeout=5400,  # 90 minutes, in seconds
       )

       # Then safely execute the DML operation
       update_table = BigQueryInsertJobOperator(
           task_id='update_table',
           configuration={
               'query': {
                   # BigQuery requires a WHERE clause on every UPDATE statement
                   'query': 'UPDATE my_dataset.my_table SET status = "processed" WHERE TRUE',
                   'useLegacySql': False,
               }
           },
       )

       wait_buffer >> update_table
   ```

