shahar1 commented on PR #61148:
URL: https://github.com/apache/airflow/pull/61148#issuecomment-3875598803

   > # Implement BigQueryStreamingBufferEmptySensor to handle DML operations on streaming tables
   > **Fixes #59408**
   > 
   > ## Problem
   > When using BigQuery DML operators (UPDATE, DELETE, MERGE) on tables with active streaming buffers, tasks fail. This is a documented BigQuery limitation. Currently, Airflow has no built-in mechanism to wait for the buffer to flush before executing DML operations, so tasks fail repeatedly until the buffer eventually clears (within 90 minutes per Google Cloud documentation).
   > 
   > ## Solution
   > This PR implements `BigQueryStreamingBufferEmptySensor`, a composable sensor that allows users to explicitly wait for a BigQuery table's streaming buffer to empty before proceeding with DML operations.
   > 
   > This aligns with Airflow's design philosophy by providing:
   > 
   > * **Explicit control** over pipeline dependencies
   > * **Composable operations** that can be combined with other tasks
   > * **Non-blocking execution** via deferrable mode
   > 
   > ## Changes
   > ### 1. New Sensor: `BigQueryStreamingBufferEmptySensor`
   > * Polls BigQuery table metadata to check if streaming buffer is empty
   > * Supports both synchronous polling and deferrable (async) execution
   > * 90-minute timeout aligned with Google Cloud's streaming buffer flush guarantee
   > * Full support for GCP service account impersonation
   > * Consistent with existing `BigQueryTableExistenceSensor` implementation
   > 
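The polling behaviour listed above can be sketched roughly as follows. This is a minimal, standard-library-only illustration and not the PR's implementation: `fetch_table` is a hypothetical callable standing in for whatever returns the table's metadata, and `poke_interval`, `sleep`, and `clock` are assumed parameter names. The only BigQuery fact relied on is that the table resource carries a `streamingBuffer` field only while buffered rows exist.

```python
import time
from typing import Any, Callable, Mapping


def streaming_buffer_is_empty(table_resource: Mapping[str, Any]) -> bool:
    # BigQuery's table resource includes "streamingBuffer" only while
    # the table still has rows in its streaming buffer.
    return "streamingBuffer" not in table_resource


def wait_for_empty_buffer(
    fetch_table: Callable[[], Mapping[str, Any]],  # hypothetical metadata fetcher
    poke_interval: float = 60.0,  # assumed default, seconds between checks
    timeout: float = 5400.0,  # 90 minutes, as in the PR description
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Return True once the buffer is empty, False if the timeout elapses."""
    deadline = clock() + timeout
    while True:
        if streaming_buffer_is_empty(fetch_table()):
            return True
        if clock() >= deadline:
            return False
        sleep(poke_interval)
```

Passing `sleep` and `clock` as arguments is only a convenience for testing the loop without waiting; a real sensor would rely on Airflow's own poke scheduling instead.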
   > ### 2. New Trigger: `BigQueryStreamingBufferEmptyTrigger`
   > * Async trigger for deferrable sensor execution
   > * Continuous polling of table metadata via BigQueryTableAsyncHook
   > * Event-driven callback integration
   > * Comprehensive error handling
   > 
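A rough sketch of the asynchronous polling loop such a trigger might run is shown below. It is an assumption-laden illustration using only `asyncio`, not the code from the PR: `fetch_table` is a hypothetical coroutine function returning table metadata, and the event payload shape (`status`, `message`) is assumed for the example.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Mapping


async def poll_streaming_buffer(
    fetch_table: Callable[[], Awaitable[Mapping[str, Any]]],  # hypothetical async fetcher
    poll_interval: float = 30.0,  # assumed default, seconds between checks
) -> AsyncIterator[dict]:
    """Yield exactly one event: success when the buffer is empty, or error."""
    while True:
        try:
            table = await fetch_table()
        except Exception as exc:
            # Surface the failure as an event rather than raising.
            yield {"status": "error", "message": str(exc)}
            return
        if "streamingBuffer" not in table:
            yield {"status": "success", "message": "Streaming buffer is empty"}
            return
        # Give control back to the event loop between checks.
        await asyncio.sleep(poll_interval)
```

Because the loop awaits between checks, many such waits can share one event loop, which is the point of running the sensor in deferrable mode.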
   > ### 3. Documentation & Examples
   > * Added complete documentation section in `bigquery.rst`
   > * 3 usage examples (sync, deferrable, async modes)
   > * Real-world workflow examples with streaming INSERT/UPDATE/DELETE operations
   > * 90-minute timeout configuration with explanations
   > 
   > ## Usage Example
   > ```python
   > from airflow import DAG
   > from airflow.providers.google.cloud.sensors.bigquery import BigQueryStreamingBufferEmptySensor
   > from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
   > 
   > with DAG('bigquery_dml_pipeline'):
   >     # Wait for streaming buffer to be empty
   >     wait_buffer = BigQueryStreamingBufferEmptySensor(
   >         task_id='wait_buffer_empty',
   >         project_id='my-project',
   >         dataset_id='my_dataset',
   >         table_id='my_table',
   >         deferrable=True,  # Non-blocking execution
   >         timeout=5400,  # 90 minutes
   >     )
   >     
   >     # Then safely execute DML operation
   >     update_table = BigQueryInsertJobOperator(
   >         task_id='update_table',
   >         configuration={
   >             'query': {
   >                 # BigQuery requires a WHERE clause on UPDATE; WHERE TRUE updates every row
   >                 'query': 'UPDATE my_dataset.my_table SET status="processed" WHERE TRUE',
   >                 'useLegacySql': False,
   >             }
   >         },
   >     )
   >     
   >     wait_buffer >> update_table
   > ```
   
   Thanks for your contribution! 
   Could you please attach a screenshot of the system tests after running them?
   
   CC: @VladaZakharova @MaksYermak 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

