fahadabdul opened a new issue, #63507:
URL: https://github.com/apache/airflow/issues/63507

   ### Apache Airflow version
   
   Other Airflow 3 version (please specify below)
   
   ### If "Other Airflow 3 version" selected, which one?
   
   3.1.2
   
   ### What happened?
   
   Asset-triggered DAGs are intermittently triggered twice for the same asset 
event when running with 2 schedulers (HA mode). This appears to be the same 
issue as #54491, which was reported against Airflow 3.0.4, but the problem is 
still present in Airflow 3.1.2.
   
   A single asset event (whether from a task outlet or from an `AssetWatcher` 
with `MessageQueueTrigger`) results in two DAG runs being created for the same 
consumer DAG. The Airflow UI confirms this — the Asset Events tab for the 
affected runs shows **"2 Triggered Dag Runs"** pointing to the same consumer 
DAG from a single asset event.
   
   This was reproduced with two patterns:
   
   1. **Simple outlet-based**: A producer DAG emits an asset event via 
`outlets`, and a consumer DAG is scheduled on that asset. With 2 schedulers, 
duplicate consumer runs are intermittently created.
   
   2. **AssetWatcher + MessageQueueTrigger (SQS)**: A consumer DAG uses 
`AssetWatcher` with `MessageQueueTrigger` to listen for SQS messages. When a 
message arrives, the triggerer emits an asset event. With 2 schedulers, 
duplicate consumer runs are intermittently created.
   
   In both cases, the duplicate runs have **identical timestamps to the 
microsecond** in their run IDs but **different run ID hashes**, confirming that 
both schedulers created a run from the same `asset_dag_run_queue` record.
   
   Case 1:
   <img width="1738" height="522" alt="Image" src="https://github.com/user-attachments/assets/1406d662-033d-48fe-9577-5cf9c7feb30c" />
   
   Case 2:
   
   <img width="1734" height="231" alt="Image" src="https://github.com/user-attachments/assets/09ca6cd3-f339-4b5c-96e6-1e6cf2a3676b" />
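The inference that identical timestamps with different suffixes mean two independent run creations can be illustrated with a toy generator. This is a hypothetical sketch, not Airflow's run ID code: `make_asset_run_id` is invented here, and it assumes (based only on the run IDs in this report) that an asset-triggered run ID has the shape `asset_triggered__<timestamp>_<8 alphanumeric characters>`, where the timestamp comes from the queued record and the suffix is drawn at random on each call.

```python
import secrets
import string
from datetime import datetime, timezone

_ALPHABET = string.ascii_letters + string.digits


def make_asset_run_id(queued_at: datetime) -> str:
    """Hypothetical: build a run ID shaped like the ones in this report."""
    # Assumption: the timestamp is derived from the queued record, so any
    # scheduler that reads the same record produces the same timestamp.
    suffix = "".join(secrets.choice(_ALPHABET) for _ in range(8))
    # Each call draws its own random suffix, so two creations differ here.
    return f"asset_triggered__{queued_at.isoformat()}_{suffix}"


# One queue record, read by two schedulers that each create a run.
event_time = datetime(2026, 3, 12, 11, 35, 9, 550275, tzinfo=timezone.utc)
run_a = make_asset_run_id(event_time)
run_b = make_asset_run_id(event_time)
print(run_a)
print(run_b)
```

Under these assumptions, the only way to get two IDs that agree on the timestamp but not the suffix is for the generator to run twice for the same record.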
   
   ### What you think should happen instead?
   
   A single asset event should trigger exactly one DAG run for the consumer 
DAG, regardless of how many schedulers are running.
   
   As identified in #54491, the root cause appears to be a missing row-level 
lock when fetching records from `asset_dag_run_queue` in 
`airflow-core/src/airflow/models/dag.py`. The current flow is:
   
   1. Fetch all records from `asset_dag_run_queue` (**no row-level lock**)
   2. Determine which DAGs need runs (with row-level lock)
   3. Create DAG runs
   4. Delete records from `asset_dag_run_queue`
   
   The gap between step 1 and step 4 allows a second scheduler to read the same 
record before the first scheduler deletes it, causing both to create a DAG run 
for the same event.
   
   Applying a row-level lock at step 1 should prevent this race condition.
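The race can be reproduced in miniature without Airflow. This is a toy model, not Airflow code: an in-memory set stands in for `asset_dag_run_queue`, threads stand in for schedulers, a `threading.Barrier` forces every scheduler to read the record before any of them deletes it, and a `threading.Lock` stands in for a row-level lock that turns read, create, and delete into one critical section. `simulate` and `RECORD` are hypothetical names.

```python
import threading
from collections import Counter

RECORD = ("consumer_asset_listener", "asset-event-1")  # hypothetical queue row


def simulate(use_lock: bool, n_schedulers: int = 2) -> Counter:
    """Return how many DAG runs each queued record produced (toy model)."""
    queue = {RECORD}                 # stand-in for asset_dag_run_queue
    row_lock = threading.Lock()      # stand-in for a row-level lock
    barrier = threading.Barrier(n_schedulers)
    runs: list = []
    runs_lock = threading.Lock()     # protects the shared result list

    def create_runs(records: set) -> None:
        with runs_lock:
            runs.extend(records)

    def scheduler() -> None:
        if use_lock:
            barrier.wait()           # start every scheduler at the same moment
            with row_lock:           # read, create and delete as one unit
                claimed = set(queue)
                create_runs(claimed)
                queue.difference_update(claimed)
        else:
            claimed = set(queue)     # step 1: read without a lock
            barrier.wait()           # every scheduler has now read the record
            create_runs(claimed)     # step 3: create DAG runs
            queue.difference_update(claimed)  # step 4: delete, too late

    threads = [threading.Thread(target=scheduler) for _ in range(n_schedulers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return Counter(runs)


print(simulate(use_lock=False)[RECORD])  # 2: duplicate run
print(simulate(use_lock=True)[RECORD])   # 1: exactly one run
```

In PostgreSQL the counterpart of `row_lock` would be locking the queue rows with `SELECT ... FOR UPDATE` (optionally `SKIP LOCKED`) in the same transaction that creates the runs and deletes the rows; whether that is the right shape for the Airflow fix is an assumption here.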
   
   ### How to reproduce
   
   #### Prerequisites
   - Airflow 3.1.2 with **2 schedulers** (HA mode)
   - PostgreSQL metadata database
   
   #### Reproduction DAGs
   
   **Producer DAG** — emits an asset event every ~30 seconds using 
`@continuous` schedule:
   
   ```python
   from airflow.sdk import Asset, dag, task
   from datetime import datetime
   import pendulum
   import time
   
   test_asset = Asset("test://reproduction-asset-duplicate-runs")
   
   @dag(
       dag_id="producer_asset_trigger",
       schedule="@continuous",
       start_date=pendulum.now().subtract(minutes=10),
       catchup=False,
       max_active_runs=1,
   )
   def producer_asset_trigger():
       @task(outlets=[test_asset])
       def emit_asset_event():
           ts = datetime.now().isoformat()
           print(f"Emitting asset event at {ts}")
           time.sleep(30)
           return {"emitted_at": ts}
   
       emit_asset_event()
   
   producer_asset_trigger()
   ```
   
   **Consumer DAG** — scheduled on the asset, logs run details:
   
   ```python
   from airflow.sdk import Asset, dag, task
   import pendulum
   
   test_asset = Asset("test://reproduction-asset-duplicate-runs")
   
   @dag(
       dag_id="consumer_asset_listener",
       schedule=[test_asset],
       start_date=pendulum.now().subtract(days=1),
       catchup=False,
   )
   def consumer_asset_listener():
       @task
       def process_event(**context):
           run_id = context["dag_run"].run_id
           triggering_events = context.get("triggering_asset_events", {})
           print(f"Run ID: {run_id}")
           for asset, events in triggering_events.items():
               print(f"Asset: {asset}, Events: {len(events)}")
   
       process_event()
   
   consumer_asset_listener()
   ```
   
   #### Steps
   
   1. Deploy both DAGs to an Airflow instance with **2 schedulers** (HA)
   2. Unpause both DAGs
   3. Let them run for 15-30 minutes (~30+ asset events)
   4. Check the consumer DAG's run history
   
   #### Expected result
   Each producer run creates exactly one consumer run (1:1 mapping).
   
   #### Actual result
   Some asset events create **two consumer runs** with identical timestamps but 
different run ID hashes.
   
   Examples of duplicate run IDs from our reproduction:
   ```
   asset_triggered__2026-03-12T11:35:09.550275+00:00_vBRUPZHu
   asset_triggered__2026-03-12T11:35:09.550275+00:00_IOcGWY58
   ```
   ```
   asset_triggered__2026-03-12T11:44:17.521734+00:00_R3JlEANp
   asset_triggered__2026-03-12T11:44:17.521734+00:00_CSaWJHzX
   ```
   
   The Airflow UI Asset Events tab confirms this — showing "2 Triggered Dag 
Runs" for a single asset event.
   
   <img width="1184" height="789" alt="Image" src="https://github.com/user-attachments/assets/3098ac8c-3485-4dac-97c2-792ffaf1f108" />
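Duplicates like the example run IDs above can be spotted mechanically by grouping run IDs on everything before the final underscore. A minimal sketch, assuming (from those examples) that each asset-triggered run ID has the form `asset_triggered__<timestamp>_<suffix>` and that the suffix never contains an underscore; `find_duplicate_runs` is a hypothetical helper, and its input would be run IDs copied from the consumer DAG's run history.

```python
from collections import defaultdict

PREFIX = "asset_triggered__"


def find_duplicate_runs(run_ids):
    """Map each timestamp to its run IDs when more than one run shares it."""
    by_timestamp = defaultdict(list)
    for run_id in run_ids:
        if not run_id.startswith(PREFIX):
            continue  # ignore manual, scheduled, or other run types
        # Split off the random suffix; what remains is prefix + timestamp.
        head, _, _suffix = run_id.rpartition("_")
        by_timestamp[head[len(PREFIX):]].append(run_id)
    return {ts: ids for ts, ids in by_timestamp.items() if len(ids) > 1}


runs = [
    "asset_triggered__2026-03-12T11:35:09.550275+00:00_vBRUPZHu",
    "asset_triggered__2026-03-12T11:35:09.550275+00:00_IOcGWY58",
    "asset_triggered__2026-03-12T11:44:17.521734+00:00_R3JlEANp",
    "asset_triggered__2026-03-12T11:44:17.521734+00:00_CSaWJHzX",
]
for ts, ids in find_duplicate_runs(runs).items():
    print(ts, len(ids))
```

With a 1:1 producer-to-consumer mapping the result should be an empty dict; any entry marks an asset event that produced more than one consumer run.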
   
   This was also reproduced using the `AssetWatcher` + `MessageQueueTrigger` (SQS) 
pattern, confirming that the issue is in the scheduler's asset event processing, 
not in how the event is created.
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Versions of Apache Airflow Providers
   
   | Package name | Version |
   | --- | --- |
   | apache-airflow-providers-amazon | 9.22.0 |
   | apache-airflow-providers-celery | 3.16.0 |
   | apache-airflow-providers-common-compat | 1.13.1 |
   | apache-airflow-providers-common-io | 1.7.1 |
   | apache-airflow-providers-common-messaging | 0.2 |
   | apache-airflow-providers-common-sql | 1.32.0 |
   | apache-airflow-providers-elasticsearch | 6.4.4 |
   | apache-airflow-providers-http | 6.0.0 |
   | apache-airflow-providers-openlineage | 2.10.2 |
   | apache-airflow-providers-smtp | 2.4.2 |
   | apache-airflow-providers-standard | 1.11.1 |
   | astronomer-providers-logging | 1.6.4 |
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   2 schedulers (HA mode)
   PostgreSQL metadata database
   
   ### Anything else?
   
   This appears to be the same root cause as #54491 (reported against 3.0.4), 
which identified the missing row-level lock on `asset_dag_run_queue` reads in 
`airflow-core/src/airflow/models/dag.py` (lines 2146-2148). That issue was 
closed but no fix appears to have been merged — the problem persists in 3.1.2.
   
   The issue is intermittent by nature (race condition), but in our testing it 
reproduced consistently within 15-30 minutes of running with 2 schedulers. 
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

