shahar1 commented on PR #61148:
URL: https://github.com/apache/airflow/pull/61148#issuecomment-3875598803
> # Implement BigQueryStreamingBufferEmptySensor to handle DML operations on streaming tables
> **Fixes #59408**
>
> ## Problem
> When BigQuery operators run DML statements (UPDATE, DELETE, MERGE) against tables with an active streaming buffer, tasks fail with an error because BigQuery does not allow DML to modify rows that are still in the streaming buffer. This is a documented BigQuery limitation. Currently, Airflow has no built-in mechanism to wait for the buffer to flush before executing DML operations, so tasks fail repeatedly until the buffer eventually clears (within 90 minutes, per the Google Cloud documentation).
>
> ## Solution
> This PR implements `BigQueryStreamingBufferEmptySensor`, a composable sensor that allows users to explicitly wait for a BigQuery table's streaming buffer to empty before proceeding with DML operations.
>
> This aligns with Airflow's design philosophy by providing:
>
> * **Explicit control** over pipeline dependencies
> * **Composable operations** that can be combined with other tasks
> * **Non-blocking execution** via deferrable mode
>
> ## Changes
> ### 1. New Sensor: `BigQueryStreamingBufferEmptySensor`
> * Polls BigQuery table metadata to check whether the streaming buffer is empty
> * Supports both synchronous polling and deferrable (async) execution
> * 90-minute timeout aligned with Google Cloud's streaming buffer flush guarantee
> * Full support for GCP service account impersonation
> * Consistent with the existing `BigQueryTableExistenceSensor` implementation
>
> ### 2. New Trigger: `BigQueryStreamingBufferEmptyTrigger`
> * Async trigger for deferrable sensor execution
> * Periodic polling of table metadata via `BigQueryTableAsyncHook`
> * Event-driven callback integration
> * Comprehensive error handling
>
> ### 3. Documentation & Examples
> * Added complete documentation section in `bigquery.rst`
> * 3 usage examples (sync, deferrable, async modes)
> * Real-world workflow examples with streaming INSERT/UPDATE/DELETE operations
> * 90-minute timeout configuration with explanations
>
> ## Usage Example
> ```python
> from airflow import DAG
> from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
> from airflow.providers.google.cloud.sensors.bigquery import BigQueryStreamingBufferEmptySensor
>
> with DAG("bigquery_dml_pipeline", schedule=None):
>     # Wait for the table's streaming buffer to be empty
>     wait_buffer = BigQueryStreamingBufferEmptySensor(
>         task_id="wait_buffer_empty",
>         project_id="my-project",
>         dataset_id="my_dataset",
>         table_id="my_table",
>         deferrable=True,  # Non-blocking: frees the worker slot while waiting
>         timeout=5400,  # 90 minutes, in seconds
>     )
>
>     # Then safely execute the DML operation
>     update_table = BigQueryInsertJobOperator(
>         task_id="update_table",
>         project_id="my-project",
>         configuration={
>             "query": {
>                 # BigQuery rejects UPDATE without a WHERE clause; WHERE TRUE updates every row
>                 "query": "UPDATE my_dataset.my_table SET status = 'processed' WHERE TRUE",
>                 "useLegacySql": False,
>             }
>         },
>     )
>
>     wait_buffer >> update_table
> ```
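The Problem section ties the wait to Google Cloud's 90-minute flush window. As a minimal sketch, assuming the check inspects the REST `tables.get` table resource and that BigQuery omits the `streamingBuffer` field once the buffer has flushed, the emptiness test and a worst-case flush estimate based on `streamingBuffer.oldestEntryTime` (milliseconds since the epoch, serialized as a string) could look like this. Both helper names are hypothetical and not part of the PR.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Optional

# Flush window cited in the Problem section (up to 90 minutes).
MAX_FLUSH_DELAY = timedelta(minutes=90)


def streaming_buffer_is_empty(table_resource: dict[str, Any]) -> bool:
    """Return True if a `tables.get` resource has no streaming buffer.

    Assumption: BigQuery omits `streamingBuffer` once the buffer has flushed.
    """
    return "streamingBuffer" not in table_resource


def latest_expected_flush(table_resource: dict[str, Any]) -> Optional[datetime]:
    """Estimate the latest time by which the oldest buffered row should flush.

    Returns None if there is no buffer or no `oldestEntryTime` to base it on.
    """
    buffer = table_resource.get("streamingBuffer")
    if not buffer or "oldestEntryTime" not in buffer:
        return None
    # oldestEntryTime is milliseconds since the Unix epoch, encoded as a string.
    oldest = datetime.fromtimestamp(int(buffer["oldestEntryTime"]) / 1000, tz=timezone.utc)
    return oldest + MAX_FLUSH_DELAY


if __name__ == "__main__":
    resource = {"streamingBuffer": {"estimatedRows": "42", "oldestEntryTime": "1700000000000"}}
    print(streaming_buffer_is_empty(resource))  # False
    print(latest_expected_flush(resource))  # 2023-11-14 23:43:20+00:00
```

For a buffer whose oldest row arrived at 2023-11-14 22:13:20 UTC, the estimate is 23:43:20 UTC, which could help judge whether a sensor `timeout` is large enough.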
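The synchronous sensor described under Changes polls table metadata until the buffer is empty. The loop below is a hedged sketch of those poke semantics, not the PR's implementation: `fetch_table` is a hypothetical callable returning the table resource as a dict (for example, a wrapper around google-cloud-bigquery's `Client.get_table(...).to_api_repr()`), `BufferWaitTimeout` is a hypothetical error type, and `sleep`/`clock` are injectable so the loop can be exercised without real waiting.

```python
import time
from typing import Any, Callable


class BufferWaitTimeout(Exception):
    """Hypothetical error raised when the buffer outlives the timeout."""


def wait_for_empty_buffer(
    fetch_table: Callable[[], dict[str, Any]],
    timeout: float = 5400.0,  # 90 minutes, matching the usage example
    poke_interval: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Poke until the table resource has no `streamingBuffer`; return the poke count."""
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if "streamingBuffer" not in fetch_table():
            return pokes
        # Like a sensor, give up only after a failed poke once the timeout has elapsed.
        if clock() - started > timeout:
            raise BufferWaitTimeout(f"streaming buffer still present after {timeout:.0f} s")
        sleep(poke_interval)
```

Injecting the clock keeps the timeout logic testable. In Airflow itself, `BaseSensorOperator` already supplies the poke loop, `poke_interval`, and `timeout`, so a real sensor would only implement the emptiness check in `poke()`.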
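The deferrable path hands polling to a trigger running on the triggerer's asyncio event loop, which is what makes it non-blocking. A minimal sketch of that loop follows, assuming a hypothetical async `fetch_table` callable (in the PR this role is played by `BigQueryTableAsyncHook`) and that the overall 90-minute limit is enforced outside the loop, for example through the timeout applied when the sensor defers. It returns plain dicts; an actual Airflow trigger would instead yield such payloads wrapped in `TriggerEvent` from its async `run()` method.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def poll_until_buffer_empty(
    fetch_table: Callable[[], Awaitable[dict[str, Any]]],
    poll_interval: float = 60.0,
) -> dict[str, str]:
    """Await table metadata until `streamingBuffer` disappears; return an event payload."""
    while True:
        try:
            table = await fetch_table()
        except Exception as exc:
            # Report API errors to the deferred task instead of polling forever.
            return {"status": "error", "message": str(exc)}
        if "streamingBuffer" not in table:
            return {"status": "success", "message": "streaming buffer is empty"}
        # Yield control to the event loop so other triggers keep running.
        await asyncio.sleep(poll_interval)
```

Because `asyncio.CancelledError` derives from `BaseException` in Python 3.8+, the broad `except Exception` above does not swallow cancellation of the trigger.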
Thanks for your contribution!
Could you please attach a screenshot of the system tests after running them?
CC: @VladaZakharova @MaksYermak