Arunodoy18 opened a new pull request, #59736:
URL: https://github.com/apache/airflow/pull/59736

   This sensor checks if a BigQuery table's streaming buffer is empty before 
allowing DML operations (UPDATE, DELETE, MERGE) to proceed.
   
   Solves the issue where DML statements fail with 'would affect rows in the 
streaming buffer' error when tables receive continuous streaming inserts.
   
   Added comprehensive tests and an example DAG demonstrating usage.
   ## Description
   
   This PR adds `BigQueryTableStreamingBufferEmptySensor` to solve a critical 
issue affecting users who run DML operations on BigQuery tables populated via 
streaming inserts.
   
   
   ### User Impact
   
   When Apache Airflow DAGs execute DML statements (UPDATE, DELETE, MERGE) on 
BigQuery tables that receive continuous streaming inserts (via Dataflow, CDC 
pipelines, BigQuery Storage Write API, or `tabledata.insertAll`), tasks fail 
with a 'would affect rows in the streaming buffer' error.
   
   **This causes:**
   - ❌ Pipeline failures requiring manual intervention
   - ❌ Wasted compute resources on repeated retries
   - ❌ Delayed data processing (hours of retry attempts)
   - ❌ SLA violations in production environments
   - ❌ Fragile pipelines requiring workarounds (sleep commands, external 
scripts)
   
   ### Root Cause
   
   **BigQuery's documented limitation:** DML operations cannot modify rows 
still residing in the streaming buffer. The buffer typically flushes within 
minutes but can take up to 90 minutes under certain conditions.
   
   **Airflow's current behavior:** Operators like `BigQueryInsertJobOperator` 
immediately execute DML statements without checking buffer state, causing:
   1. Job fails with streaming buffer error
   2. Airflow retries the task (same error)
   3. Repeated failures until buffer happens to be empty
   4. No visibility into why failures occur or when it's safe to retry
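
   The missing check can be sketched without Airflow at all. The snippet below is a minimal illustration, not the code in this PR: it assumes the table metadata is the JSON body of a BigQuery `tables.get` response parsed into a dict, and the helper names `estimated_buffer_rows` and `is_safe_for_dml` are hypothetical. Treating any reported `streamingBuffer` section as "not safe" is a conservative assumption.

```python
from typing import Any, Mapping, Optional


def estimated_buffer_rows(table_resource: Mapping[str, Any]) -> Optional[int]:
    """Return the estimated rows in the streaming buffer, or None if no buffer is reported.

    Assumption: `table_resource` is a parsed `tables.get` response body.
    """
    buffer = table_resource.get("streamingBuffer")
    if buffer is None:
        return None
    # The REST API typically encodes 64-bit integers as strings, so convert explicitly.
    return int(buffer.get("estimatedRows", 0))


def is_safe_for_dml(table_resource: Mapping[str, Any]) -> bool:
    """True only when the metadata reports no streamingBuffer section (conservative)."""
    return estimated_buffer_rows(table_resource) is None
```

   For example, `is_safe_for_dml({"streamingBuffer": {"estimatedRows": "42"}})` evaluates to `False`, while a resource without a `streamingBuffer` key evaluates to `True`.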
   
   ### Why This Matters
   
   This issue is **increasingly common** because:
   - Modern data pipelines heavily use streaming ingestion (real-time CDC, 
event streams, IoT data)
   - BigQuery recommends its streaming API for high-throughput ingestion
   - Users need to run incremental DML operations (deduplication, status 
updates, soft deletes) on the same tables
   - Google Cloud Composer users report this as a frequent pain point
   
   ## Solution
   
   ### What This PR Adds
   
   A new sensor `BigQueryTableStreamingBufferEmptySensor` that:
   
   1. **Checks streaming buffer state** via BigQuery Tables API
   2. **Waits until buffer is empty** before allowing downstream DML operations
   3. **Provides visibility** through logging (estimated rows in buffer)
   4. **Follows Airflow patterns** (`poke_interval`, `timeout`, `mode='reschedule'`)
   5. **Avoids blocking workers** when using `mode='reschedule'` (frees worker slots)
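
   The wait-and-log behaviour listed above can be approximated in plain Python. This is a rough sketch under stated assumptions, not the sensor's implementation: `wait_for_empty_buffer` and `get_estimated_rows` are hypothetical names, the defaults are arbitrary, and the loop returns `False` on timeout rather than raising as an Airflow sensor would. It also sleeps in-process, whereas `mode='reschedule'` releases the worker slot between checks.

```python
import time
from typing import Callable, Optional


def wait_for_empty_buffer(
    get_estimated_rows: Callable[[], Optional[int]],
    poke_interval: float = 60.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll until no streaming buffer is reported; return False if the timeout is hit.

    Assumption: `get_estimated_rows` returns None when the table has no
    streaming buffer, otherwise the estimated number of buffered rows.
    """
    deadline = clock() + timeout
    while True:
        rows = get_estimated_rows()
        if rows is None:
            return True
        # Surface buffer state so operators can see why the task is waiting.
        print(f"Streaming buffer holds an estimated {rows} rows; next check in {poke_interval}s")
        if clock() + poke_interval > deadline:
            return False
        sleep(poke_interval)
```

   Injecting `sleep` and `clock` keeps the loop testable: passing a fake `sleep` lets a test step through several checks without real delays.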
   
   ### Implementation Details
   
   ```python
   class BigQueryTableStreamingBufferEmptySensor(BaseSensorOperator):
       """
       Checks if a BigQuery table's streaming buffer is empty.
       
       The sensor queries table metadata and checks the streamingBuffer field:
       - If streamingBuffer is None → buffer empty → sensor succeeds
       - If streamingBuffer exists → checks estimated_rows → waits
       """
   ```
   
   Closes #59408
   

