radhwene opened a new pull request, #61148:
URL: https://github.com/apache/airflow/pull/61148
# Implement BigQueryStreamingBufferEmptySensor to handle DML operations on streaming tables
**Fixes #59408**
## Problem
When BigQuery DML operators (UPDATE, DELETE, MERGE) run against tables with an active streaming buffer, tasks fail because BigQuery rejects DML statements that would modify rows still in the buffer.
This is a documented BigQuery limitation. Airflow currently has no built-in way to wait for the buffer to flush before running DML, so tasks fail repeatedly until the buffer clears, which according to Google Cloud documentation can take up to 90 minutes.
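For context, the buffer state is visible in table metadata: the BigQuery REST `tables.get` resource carries a `streamingBuffer` object (with `estimatedRows`, `estimatedBytes`, and `oldestEntryTime`, serialized as strings) only while rows are still buffered. The sketch below, assuming that resource shape and using hypothetical helper names (`streaming_buffer_is_empty`, `describe_buffer`), shows the check such a sensor has to make. It works on an already-fetched dict so it runs without credentials; it is not the PR's implementation.

```python
from typing import Any, Mapping, Optional


def streaming_buffer_is_empty(table_resource: Mapping[str, Any]) -> bool:
    """Return True if a tables.get resource reports no streaming buffer.

    Hypothetical helper. The row and byte counts inside ``streamingBuffer``
    are only estimates, so any present buffer is treated as non-empty,
    even when the estimates read zero.
    """
    return table_resource.get("streamingBuffer") is None


def describe_buffer(table_resource: Mapping[str, Any]) -> Optional[dict]:
    """Summarize the buffer estimates, converting the string-encoded integers."""
    buffer = table_resource.get("streamingBuffer")
    if buffer is None:
        return None
    return {
        "estimated_rows": int(buffer.get("estimatedRows", 0)),
        "estimated_bytes": int(buffer.get("estimatedBytes", 0)),
        "oldest_entry_ms": int(buffer.get("oldestEntryTime", 0)),
    }


# Example resources, trimmed to the relevant fields.
flushed = {"tableReference": {"tableId": "my_table"}}
buffered = {
    "streamingBuffer": {
        "estimatedRows": "120",
        "estimatedBytes": "4096",
        "oldestEntryTime": "1700000000000",
    }
}
print(streaming_buffer_is_empty(flushed))   # True
print(streaming_buffer_is_empty(buffered))  # False
```

With the `google-cloud-bigquery` client, the same information is exposed as `Table.streaming_buffer`, which is `None` when the table has no buffer.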
## Solution
This PR implements `BigQueryStreamingBufferEmptySensor`, a composable sensor that lets users explicitly wait for a BigQuery table's streaming buffer to empty before running DML operations.
This aligns with Airflow's design philosophy by providing:
- **Explicit control** over pipeline dependencies
- **Composable operations** that can be combined with other tasks
- **Non-blocking execution** via deferrable mode
## Changes
### 1. New Sensor: `BigQueryStreamingBufferEmptySensor`
- Polls BigQuery table metadata to check whether the streaming buffer is empty
- Supports both synchronous polling and deferrable (async) execution
- Uses a 90-minute timeout, matching Google Cloud's documented upper bound for flushing the streaming buffer
- Fully supports GCP service account impersonation
- Consistent with the existing `BigQueryTableExistenceSensor` implementation
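The synchronous mode amounts to a poke loop. As a rough, framework-free sketch (not the PR's code; `wait_for_empty_buffer`, `fetch_table`, and `BufferWaitTimeout` are hypothetical names), the loop below polls until the `streamingBuffer` field disappears and gives up once the elapsed time exceeds a 90-minute budget. The clock and sleep functions are injectable so the logic can be exercised without actually waiting.

```python
import time
from typing import Any, Callable, Mapping

# 90 minutes in seconds, the same value as timeout=5400 in the example DAG.
DEFAULT_TIMEOUT_S = 90 * 60


class BufferWaitTimeout(Exception):
    """Hypothetical error raised when the buffer outlives the timeout."""


def wait_for_empty_buffer(
    fetch_table: Callable[[], Mapping[str, Any]],
    timeout_s: float = DEFAULT_TIMEOUT_S,
    poke_interval_s: float = 60.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll table metadata until no streaming buffer is reported.

    ``fetch_table`` stands in for a hook call returning the tables.get
    resource. Returns the number of pokes that were needed.
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        # A missing streamingBuffer field means the buffer has been flushed.
        if fetch_table().get("streamingBuffer") is None:
            return pokes
        # Check the budget only after an unsuccessful poke.
        if clock() - started > timeout_s:
            raise BufferWaitTimeout(f"streaming buffer still present after {pokes} pokes")
        sleep(poke_interval_s)
```

Blocking in `sleep` is exactly what the deferrable mode avoids: the worker slot stays occupied for the whole wait, which can be long given the 90-minute bound.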
### 2. New Trigger: `BigQueryStreamingBufferEmptyTrigger`
- Async trigger for deferrable sensor execution
- Polls table metadata via `BigQueryTableAsyncHook`
- Emits a trigger event that resumes the deferred sensor through its callback
- Comprehensive error handling
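The trigger side follows the same idea, but awaits between polls instead of blocking. The sketch below is a plain `asyncio` async generator, not the PR's trigger class: `poll_until_buffer_empty` and `fetch_table` are hypothetical, and the `{"status": ..., "message": ...}` payload shape is an assumption. It yields a single event, either success once the buffer is gone or an error if fetching the metadata fails.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, Mapping


async def poll_until_buffer_empty(
    fetch_table: Callable[[], Awaitable[Mapping[str, Any]]],
    poll_interval_s: float = 30.0,
) -> AsyncIterator[Dict[str, str]]:
    """Yield one event dict once the streaming buffer is gone.

    Errors are reported as an event instead of being raised, so the
    resumed task can decide how to fail.
    """
    while True:
        try:
            table = await fetch_table()
        except Exception as exc:  # surface the failure to the deferred task
            yield {"status": "error", "message": str(exc)}
            return
        if table.get("streamingBuffer") is None:
            yield {"status": "success", "message": "streaming buffer is empty"}
            return
        # Non-blocking wait: other coroutines keep running in the meantime.
        await asyncio.sleep(poll_interval_s)
```

In an actual Airflow trigger, `fetch_table` would presumably be a call through `BigQueryTableAsyncHook`, and each dict would be wrapped in a `TriggerEvent` yielded from the trigger's `run()` method.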
### 3. Documentation & Examples
- Added a complete documentation section in `bigquery.rst`
- Three usage examples (sync, deferrable, and async modes)
- Real-world workflow examples with streaming INSERT/UPDATE/DELETE operations
- 90-minute timeout configuration with explanations
## Usage Example
```python
import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.sensors.bigquery import BigQueryStreamingBufferEmptySensor

with DAG(
    dag_id='bigquery_dml_pipeline',
    start_date=datetime.datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    # Wait until the table's streaming buffer has been flushed
    wait_buffer = BigQueryStreamingBufferEmptySensor(
        task_id='wait_buffer_empty',
        project_id='my-project',
        dataset_id='my_dataset',
        table_id='my_table',
        deferrable=True,  # Non-blocking: frees the worker slot while waiting
        timeout=5400,  # 90 minutes
    )

    # Then run the DML statement (BigQuery requires a WHERE clause on UPDATE)
    update_table = BigQueryInsertJobOperator(
        task_id='update_table',
        configuration={
            'query': {
                'query': 'UPDATE my_dataset.my_table SET status = "processed" WHERE TRUE',
                'useLegacySql': False,
            }
        },
    )

    wait_buffer >> update_table
```