Mercury2699 opened a new issue, #68045:
URL: https://github.com/apache/airflow/issues/68045

   # Duplicate DAG runs for PartitionedAssetTimetable with multiple schedulers (HA): race not covered by #60773
   
   ## Apache Airflow version
   
   3.2.1
   
   ## What happened?
   
   Asset-triggered DAGs scheduled with `PartitionedAssetTimetable` (here consuming a partitioned asset produced on a `CronPartitionTimetable` schedule) are intermittently triggered twice for the same asset event when running with 2 schedulers (HA mode).
   
   PR #60773 ("Add row lock to ADRQ before Dag run creation") fixed this race for **non-partitioned** asset scheduling by adding `with_row_locks` to the `AssetDagRunQueue` (ADRQ) reads. However, the **partitioned asset** code path in `scheduler_job_runner.py` appears to use a separate DagRun-creation flow that is not protected by the same row lock. As a result, two schedulers can both read the same unprocessed partitioned asset event and each independently create a DagRun.
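The check-then-act race described above can be illustrated with a small simulation. This is a sketch, not Airflow code: `simulate_unlocked_schedulers` and the event name are hypothetical, an in-memory set stands in for the unprocessed partitioned asset event, and a list stands in for created DagRuns. A `threading.Barrier` forces the unlucky interleaving (both schedulers read before either writes) that the real race only hits intermittently.

```python
import threading


def simulate_unlocked_schedulers() -> list[tuple[str, str]]:
    """Two simulated schedulers run read-then-create on one queued event, without a lock."""
    pending = {"event-1"}                  # stands in for one unprocessed queued event
    created: list[tuple[str, str]] = []    # (scheduler, event) pairs stand in for DagRuns
    both_have_read = threading.Barrier(2)  # makes both reads happen before any write

    def scheduler(name: str) -> None:
        seen = sorted(pending)             # 1. read the unprocessed events
        both_have_read.wait()              # wait until the other scheduler has read too
        for event in seen:
            created.append((name, event))  # 2. create a run for each event seen
            pending.discard(event)         # 3. mark the event as processed

    threads = [
        threading.Thread(target=scheduler, args=(name,))
        for name in ("scheduler-1", "scheduler-2")
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return created


print(len(simulate_unlocked_schedulers()))  # 2: one event, two runs
```

Because nothing serializes step 1 against step 3, both schedulers act on the same event; that is the gap a row lock taken before the read is meant to close.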
   
   This is confirmed reproducible on MWAA 3.2.1 (image `2026_05_12_23_43_Airflow_Image_3_2_1`), which includes #60773 (shipped in 3.2.0).
   
   ## What you think should happen instead?
   
   A single partitioned asset event should trigger exactly one DAG run per consumer DAG, regardless of how many schedulers are running.
   
   The partitioned-asset scheduling path should receive the same row-lock treatment that #60773 applied to the non-partitioned path.
   
   ## How to reproduce
   
   ### Prerequisites
   - Airflow 3.2.1 with **2 schedulers** (HA mode)
   - PostgreSQL metadata database
   
   ### DAGs
   
   **Producer**, which emits a partitioned asset event every 5 minutes via `CronPartitionTimetable`:
   
   ```python
   import time
   import pendulum
   from airflow.sdk import DAG, Asset, CronPartitionTimetable
   from airflow.providers.standard.operators.python import PythonOperator
   
   ASSET_SHARED = Asset("test_v7b_shared_events")
   
   def simulate_processing(**context):
       dag_run = context["dag_run"]
       partition_key = (
           getattr(dag_run, "partition_key", None)
           or getattr(dag_run, "logical_date", None)
       )
       print(f"Processing partition: {partition_key}")
       time.sleep(10)
   
   with DAG(
       dag_id="test_partitioned_asset_producer",
       start_date=pendulum.now("UTC").subtract(hours=2),
       schedule=CronPartitionTimetable("*/5 * * * *", timezone="UTC"),
       catchup=False,
       max_active_runs=1,
       tags=["repro", "partitioned_asset_race"],
   ) as dag:
       PythonOperator(
           task_id="process_data",
           python_callable=simulate_processing,
           outlets=[ASSET_SHARED],
       )
   ```
   
   **Consumer Alpha** — uses `PartitionedAssetTimetable`:
   
   ```python
   import time
   from datetime import datetime
   from airflow.sdk import DAG, Asset, PartitionedAssetTimetable
   from airflow.providers.standard.operators.python import PythonOperator
   
   ASSET_SHARED = Asset("test_v7b_shared_events")
   
   def process_event(**context):
       print(f"Consumer ALPHA - Run ID: {context['dag_run'].run_id}")
       time.sleep(5)
   
   with DAG(
       dag_id="test_partitioned_asset_consumer_alpha",
       start_date=datetime(2024, 1, 1),
       schedule=PartitionedAssetTimetable(assets=ASSET_SHARED),
       catchup=False,
       tags=["repro", "partitioned_asset_race", "consumer"],
   ) as dag:
       PythonOperator(task_id="process_event", python_callable=process_event)
   ```
   
   **Consumer Beta** — identical schedule, different DAG ID:
   
   ```python
   import time
   from datetime import datetime
   from airflow.sdk import DAG, Asset, PartitionedAssetTimetable
   from airflow.providers.standard.operators.python import PythonOperator
   
   ASSET_SHARED = Asset("test_v7b_shared_events")
   
   def process_event(**context):
       print(f"Consumer BETA - Run ID: {context['dag_run'].run_id}")
       time.sleep(5)
   
   with DAG(
       dag_id="test_partitioned_asset_consumer_beta",
       start_date=datetime(2024, 1, 1),
       schedule=PartitionedAssetTimetable(assets=ASSET_SHARED),
       catchup=False,
       tags=["repro", "partitioned_asset_race", "consumer"],
   ) as dag:
       PythonOperator(task_id="process_event", python_callable=process_event)
   ```
   
   ### Steps
   1. Deploy all 3 DAGs with **2 schedulers** (HA).
   2. Unpause all DAGs.
   3. Let them run for at least one hour.
   4. Check consumer DAG run history for duplicates.
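A quick sanity check for step 4, assuming each producer run emits exactly one asset event and each event should yield exactly one run per consumer: count the `*/5` cron firings in the observation window and compare that number with each consumer's run count. The helper `expected_cron_ticks` is hypothetical and only approximates the producer's schedule (it ignores pauses, failed producer runs, and the `max_active_runs=1` limit).

```python
from datetime import datetime, timedelta, timezone


def expected_cron_ticks(start: datetime, end: datetime, every_minutes: int = 5) -> int:
    """Count firings of the cron schedule '*/N * * * *' in the window [start, end)."""
    if 60 % every_minutes:
        # '*/N' restarts at minute 0 every hour, so a fixed step only works if N divides 60
        raise ValueError("every_minutes must divide 60")
    # Round start up to the next whole minute.
    first = start.replace(second=0, microsecond=0)
    if first < start:
        first += timedelta(minutes=1)
    # Advance to the first minute on the */N grid (0, N, 2N, ... past the hour).
    while first.minute % every_minutes:
        first += timedelta(minutes=1)
    if first >= end:
        return 0
    step = timedelta(minutes=every_minutes)
    # ceil((end - first) / step), written as negated floor division of timedeltas
    return -((first - end) // step)


utc = timezone.utc
window = (datetime(2026, 6, 2, 10, 0, tzinfo=utc), datetime(2026, 6, 2, 11, 0, tzinfo=utc))
print(expected_cron_ticks(*window))  # 12
```

A consumer showing noticeably more runs than this count over the same window is a candidate for closer inspection of its run IDs.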
   
   ### Key conditions
   - Must have **2+ schedulers** (HA mode)
   - Must have **2+ consumer DAGs** on the same asset (a single consumer does NOT trigger the bug)
   - Must use **`PartitionedAssetTimetable`**; a plain (non-partitioned) `Asset` schedule does NOT reproduce the issue (covered by #60773)
   - The issue is intermittent (~40ms race window between schedulers)
   
   ### Evidence
   
   Duplicate run IDs from the production reproduction (timestamps identical to the second, suffixes differ):
   ```
   asset_triggered__2026-06-02T10:30:14.149756+00:00_EMV7bEJl
   asset_triggered__2026-06-02T10:30:14.265600+00:00_ePLMw1Xd
   ```
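Run IDs like the two above can be checked mechanically. The sketch below uses hypothetical helper names; it parses the timestamp embedded in each `asset_triggered__<timestamp>_<suffix>` run ID (a format inferred from the examples above, not from Airflow's source) and flags adjacent runs of one consumer DAG that are closer together than a tolerance. The 1-second default is an assumption: with a producer firing every 5 minutes, legitimate consumer runs should be minutes apart.

```python
from datetime import datetime, timedelta

PREFIX = "asset_triggered__"


def run_id_timestamp(run_id: str) -> datetime:
    """Parse the timestamp embedded in an 'asset_triggered__<timestamp>_<suffix>' run ID."""
    if not run_id.startswith(PREFIX):
        raise ValueError(f"not an asset-triggered run ID: {run_id!r}")
    # The ISO-8601 timestamp contains no underscore, so split at the first one.
    timestamp, _, _suffix = run_id[len(PREFIX):].partition("_")
    return datetime.fromisoformat(timestamp)


def suspected_duplicates(run_ids: list[str], tolerance: timedelta = timedelta(seconds=1)):
    """Return adjacent run IDs (sorted by timestamp) that are closer than `tolerance`."""
    ordered = sorted(run_ids, key=run_id_timestamp)
    return [
        (a, b)
        for a, b in zip(ordered, ordered[1:])
        if run_id_timestamp(b) - run_id_timestamp(a) < tolerance
    ]


evidence = [
    "asset_triggered__2026-06-02T10:30:14.149756+00:00_EMV7bEJl",
    "asset_triggered__2026-06-02T10:30:14.265600+00:00_ePLMw1Xd",
]
for a, b in suspected_duplicates(evidence):
    print((run_id_timestamp(b) - run_id_timestamp(a)).total_seconds())  # 0.115844
```

For the two evidence IDs, the embedded timestamps are about 116 ms apart.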
   
   Scheduler logs show both `job_id=3` and `job_id=6` independently creating consumer runs, within 40ms of each other, from the same producer completion event.
   
   ## Root cause analysis
   
   PR #60773 added `with_row_locks` to the ADRQ fetch in `_create_dag_runs_asset_triggered`. However, when `PartitionedAssetTimetable` is used, DagRun creation appears to go through a different code path (likely involving `AssetPartitionDagRun` and other partition-specific logic) that does not acquire the same exclusive lock before creating the run.
   
   The fix should extend the row-lock strategy from #60773 to cover the partitioned-asset scheduling path in `scheduler_job_runner.py`.
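A minimal sketch of the lock-before-read pattern that #60773 applies, assuming the partitioned path could be structured the same way: each scheduler takes a lock first, then reads the pending events, creates the runs, and removes the events in one transaction, so the second scheduler finds an already-drained queue. SQLite has no `SELECT ... FOR UPDATE`, so `BEGIN IMMEDIATE` (a database-wide write lock) stands in for the PostgreSQL row lock; the schema and function names are hypothetical and unrelated to Airflow's tables or `with_row_locks`.

```python
import sqlite3
import tempfile
import threading
from pathlib import Path


def create_demo_db(path: str) -> None:
    """One queued event and an empty run table (hypothetical schema, not Airflow's)."""
    con = sqlite3.connect(path)
    con.executescript(
        """
        CREATE TABLE event_queue (event_id TEXT PRIMARY KEY);
        CREATE TABLE dag_run (scheduler TEXT, event_id TEXT);
        INSERT INTO event_queue VALUES ('event-1');
        """
    )
    con.close()


def schedule_once(path: str, scheduler: str, start: threading.Barrier) -> None:
    # isolation_level=None: we issue BEGIN/COMMIT ourselves; timeout: wait for the lock
    con = sqlite3.connect(path, timeout=10, isolation_level=None)
    try:
        start.wait()                    # both schedulers start at the same moment
        con.execute("BEGIN IMMEDIATE")  # take the write lock BEFORE reading the queue
        rows = con.execute("SELECT event_id FROM event_queue").fetchall()
        for (event_id,) in rows:
            con.execute("INSERT INTO dag_run VALUES (?, ?)", (scheduler, event_id))
            con.execute("DELETE FROM event_queue WHERE event_id = ?", (event_id,))
        con.execute("COMMIT")           # release the lock; the other scheduler then sees an empty queue
    finally:
        con.close()


def run_demo() -> int:
    with tempfile.TemporaryDirectory() as tmp:
        path = str(Path(tmp) / "demo.db")
        create_demo_db(path)
        start = threading.Barrier(2)
        threads = [
            threading.Thread(target=schedule_once, args=(path, name, start))
            for name in ("scheduler-1", "scheduler-2")
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        con = sqlite3.connect(path)
        (runs,) = con.execute("SELECT COUNT(*) FROM dag_run").fetchone()
        con.close()
        return runs


print(run_demo())  # 1: the second scheduler finds the queue already drained
```

A PostgreSQL row lock is finer-grained than this database-wide lock: schedulers working on different queue rows would not block each other, only those contending for the same event.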
   
   ## Related issues
   - #63507: Duplicate DAG runs for asset-triggered scheduling (closed by #60773, non-partitioned)
   - #54491: Original report (Airflow 3.0.4, non-partitioned)
   - #60773: The fix PR (row lock on ADRQ, merged Mar 17, 2026)
   - #59183, #61831, #61433, #62441: Related partition concurrency fixes in 3.2.0/3.2.1
   
   ## Are you willing to submit PR?
   - Yes I am willing to submit a PR!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
