Arunodoy18 opened a new pull request, #59736:
URL: https://github.com/apache/airflow/pull/59736
This sensor checks whether a BigQuery table's streaming buffer is empty before
allowing DML operations (UPDATE, DELETE, MERGE) to proceed.
It solves the issue where DML statements fail with a 'would affect rows in the
streaming buffer' error when tables receive continuous streaming inserts.
Adds tests and an example DAG demonstrating usage.
## Description
This PR adds `BigQueryTableStreamingBufferEmptySensor` to fix a recurring
failure for users who run DML operations on BigQuery tables populated via
streaming inserts.
### User Impact
When Apache Airflow DAGs execute DML statements (UPDATE, DELETE, MERGE) on
BigQuery tables that receive continuous streaming inserts (via Dataflow, CDC
pipelines, BigQuery Storage Write API, or `tabledata.insertAll`), tasks fail
with an error stating that the statement 'would affect rows in the streaming
buffer'.
**This causes:**
- ❌ Pipeline failures requiring manual intervention
- ❌ Wasted compute resources on repeated retries
- ❌ Delayed data processing (hours of retry attempts)
- ❌ SLA violations in production environments
- ❌ Fragile pipelines requiring workarounds (sleep commands, external
scripts)
### Root Cause
**BigQuery's documented limitation:** DML operations cannot modify rows
still residing in the streaming buffer. The buffer typically flushes within
minutes but can take up to 90 minutes under certain conditions.
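Whether a table currently has rows in its streaming buffer can be read from its metadata. The sketch below assumes the `google-cloud-bigquery` client library, whose `Client.get_table()` returns a `Table` with a `streaming_buffer` property that is `None` when nothing is buffered. The function name `streaming_buffer_is_empty` is hypothetical and is not the code added by this PR.

```python
import logging
from typing import Any

log = logging.getLogger(__name__)


def streaming_buffer_is_empty(client: Any, table_id: str) -> bool:
    """Return True if the table currently has no streaming buffer.

    Assumes `client` behaves like google.cloud.bigquery.Client: get_table()
    returns a Table whose `streaming_buffer` is None when nothing is buffered,
    and otherwise a StreamingBuffer exposing `estimated_rows`.
    """
    table = client.get_table(table_id)
    buffer = table.streaming_buffer
    if buffer is None:
        return True
    # Rows are still buffered; DML that would affect them fails with the
    # streaming buffer error at this point.
    log.info("Table %s: ~%s rows in streaming buffer", table_id, buffer.estimated_rows)
    return False
```

With the real library this would be called as `streaming_buffer_is_empty(bigquery.Client(), "my-project.my_dataset.my_table")`, where the table ID is a placeholder. The client is duck-typed so the check can be exercised without credentials.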
**Airflow's current behavior:** Operators like `BigQueryInsertJobOperator`
execute DML statements immediately, without checking the buffer state. The
result is:
1. The job fails with the streaming buffer error
2. Airflow retries the task and hits the same error
3. Failures repeat until the buffer happens to be empty
4. There is no visibility into why the failures occur or when it is safe to retry
### Why This Matters
This issue is **increasingly common** because:
- Modern data pipelines heavily use streaming ingestion (real-time CDC,
event streams, IoT data)
- BigQuery recommends the streaming API for high-throughput ingestion
- Users need to run incremental DML operations (deduplication, status
updates, soft deletes) on the same tables
- Google Cloud Composer users report this as a frequent pain point
## Solution
### What This PR Adds
A new sensor `BigQueryTableStreamingBufferEmptySensor` that:
1. **Checks streaming buffer state** via the BigQuery Tables API
2. **Waits until the buffer is empty** before allowing downstream DML operations
3. **Provides visibility** by logging the estimated number of rows in the buffer
4. **Follows Airflow patterns** (`poke_interval`, `timeout`, `mode='reschedule'`)
5. **Non-blocking** when using `mode='reschedule'` (frees worker slots)
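Points 1 to 4 amount to a poll-until-true loop with a deadline. The framework-free sketch below illustrates that control flow under stated assumptions; it is not the sensor's implementation. `wait_until_buffer_empty` and `is_empty` are hypothetical names, and the 90-minute default timeout is chosen only to match the upper bound quoted above (Airflow's own sensor default differs).

```python
import time
from typing import Callable


def wait_until_buffer_empty(
    is_empty: Callable[[], bool],
    poke_interval: float = 60.0,
    timeout: float = 90 * 60.0,  # hypothetical default: the 90-minute upper bound
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Call `is_empty` every `poke_interval` seconds until it returns True.

    Returns the number of checks ("pokes") made. Raises TimeoutError once
    `timeout` seconds have elapsed with the buffer still non-empty.
    `sleep` and `clock` are injectable so the loop can be tested without waiting.
    """
    deadline = clock() + timeout
    pokes = 0
    while True:
        pokes += 1
        if is_empty():
            return pokes  # safe to run downstream DML
        if clock() >= deadline:
            raise TimeoutError(f"streaming buffer still not empty after {timeout} seconds")
        sleep(poke_interval)
```

In Airflow, `BaseSensorOperator` owns this loop: the sensor only implements `poke()` returning a bool, and with `mode='reschedule'` the task is rescheduled between pokes instead of sleeping, which is what frees the worker slot.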
### Implementation Details
```python
from airflow.sensors.base import BaseSensorOperator


class BigQueryTableStreamingBufferEmptySensor(BaseSensorOperator):
    """
    Checks if a BigQuery table's streaming buffer is empty.

    The sensor queries table metadata and checks the streamingBuffer field:
    - If streamingBuffer is None → buffer empty → sensor succeeds
    - If streamingBuffer exists → checks estimated_rows → waits
    """
```
closes: #59408