kaxil commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r2963242749


##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -493,10 +493,10 @@ def _get_or_create_apdr(
 
     @classmethod
     def _queue_dagruns_nonpartitioned_slow_path(
-        cls, asset_id: int, dags_to_queue: set[DagModel], session: Session
+        cls, asset_id: int, dags_to_queue: set[DagModel], event: AssetEvent, 
session: Session
     ) -> None:
         def _queue_dagrun_if_needed(dag: DagModel) -> str | None:
-            item = AssetDagRunQueue(target_dag_id=dag.dag_id, 
asset_id=asset_id)
+            item = AssetDagRunQueue(target_dag_id=dag.dag_id, 
asset_id=asset_id, created_at=event.timestamp)

Review Comment:
   The slow path uses `session.merge()` which unconditionally overwrites the 
existing ADRQ row, including `created_at`. If two events for the same asset 
arrive out of order (as in the diagram — J1 slower than J2), the late `merge()` 
resets `created_at` *backward*.
   
   The Postgres path (line 524) guards against this with `WHERE created_at < 
excluded.created_at`, but the slow path has no equivalent guard. This means the 
race condition the PR fixes on Postgres is still present on MySQL/SQLite.
   
   To fix, the nested-transaction block should check the existing row's 
`created_at` before overwriting:
   ```python
   with session.begin_nested():
       existing = session.get(AssetDagRunQueue, {"target_dag_id": dag.dag_id, 
"asset_id": asset_id})
       if existing and existing.created_at >= event.timestamp:
           return dag.dag_id  # already queued with a newer timestamp
       session.merge(item)
   ```



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2074,29 +2074,31 @@ def _create_dag_runs_asset_triggered(
                     .order_by(AssetEvent.timestamp.asc(), AssetEvent.id.asc())
                 )
             )
-
-            dag_run = dag.create_dagrun(
-                run_id=DagRun.generate_run_id(
-                    run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, 
run_after=triggered_date
-                ),
-                logical_date=None,
-                data_interval=None,
-                run_after=triggered_date,
-                run_type=DagRunType.ASSET_TRIGGERED,
-                triggered_by=DagRunTriggeredByType.ASSET,
-                state=DagRunState.QUEUED,
-                creating_job_id=self.job.id,
-                session=session,
-            )
-            Stats.incr("asset.triggered_dagruns")
-            dag_run.consumed_asset_events.extend(asset_events)
+            if asset_events:
+                dag_run = dag.create_dagrun(
+                    run_id=DagRun.generate_run_id(
+                        run_type=DagRunType.ASSET_TRIGGERED, 
logical_date=None, run_after=triggered_date
+                    ),
+                    logical_date=None,
+                    data_interval=None,
+                    run_after=triggered_date,
+                    run_type=DagRunType.ASSET_TRIGGERED,
+                    triggered_by=DagRunTriggeredByType.ASSET,
+                    state=DagRunState.QUEUED,
+                    creating_job_id=self.job.id,
+                    session=session,
+                )
+                Stats.incr("asset.triggered_dagruns")
+                dag_run.consumed_asset_events.extend(asset_events)
 
             # Delete only consumed ADRQ rows to avoid dropping newly queued 
events
-            # (e.g. DagRun triggered by asset A while a new event for asset B 
arrives).
+            # (e.g. 1. DagRun triggered by asset A while a new event for asset 
B arrives.
+            # 2. DagRun triggered by asset A while new event for asset A 
upsert to ADRQ)
             adrq_pks = [(record.asset_id, record.target_dag_id) for record in 
queued_adrqs]

Review Comment:
   When `asset_events` is empty (the `if` on line 2077 is false), this code 
still deletes ADRQ rows matching `created_at <= triggered_date`. The intent 
(per the PR description) is correct — stale ADRQ rows from already-consumed 
events should be cleaned up without creating a DagRun.
   
   However, this is a silent data-loss path: operators have no way to tell that 
events were "missed" by one scheduler loop and cleaned up in the next. A 
`log.warning("ADRQ rows deleted without creating DagRun — events already 
consumed", dag_id=..., triggered_date=...)` would make this observable. Without 
it, debugging "my asset-triggered DAG didn't run" becomes very difficult.



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4943,6 +4946,54 @@ def _lock_only_selected_asset(query, **_):
         ).one_or_none()
         assert adrq_2 is not None
 
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_when_concurrent_asset_events_created(self, 
session: Session, dag_maker):
+        asset = Asset(name="test_asset")
+        with dag_maker(dag_id="consumer", schedule=asset, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(), 
session=session):
+            BashOperator(task_id="simulate-asset-outlet", bash_command="echo 
1")
+        dr = dag_maker.create_dagrun(run_id="asset-producer-run")
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset.uri))
+        asset_event_ts = timezone.utcnow()
+        session.add_all(
+            [
+                # Asset Event 1
+                AssetEvent(asset_id=asset_id, timestamp=asset_event_ts),
+                # Asset Event 2
+                AssetEvent(asset_id=asset_id, timestamp=asset_event_ts),
+                # Bound with Asset Event 2
+                AssetDagRunQueue(
+                    asset_id=asset_id, target_dag_id=dag_model.dag_id, 
created_at=timezone.utcnow()
+                ),
+            ]
+        )
+        session.flush()
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])

Review Comment:
   `create_session()` opens a separate session that auto-commits on `__exit__`. 
This doesn't match production, where the scheduler controls the session 
lifecycle. In particular:
   
   - The auto-commit masks bugs where the code relies on uncommitted state 
being visible within the same session.
   - `session` (the fixture parameter) and the `create_session()` session are 
independent — objects from one may be stale in the other.
   
   The same pattern appears in the second new test (line ~5147). Use the 
`session` fixture directly instead.



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -5075,6 +5126,39 @@ def test_no_create_dag_runs_when_dag_disabled(self, 
session, dag_maker, disable,
         assert len(session.scalars(adrq_q).all()) == 1
         assert session.scalars(adrq_q).one().target_dag_id == "consumer"
 
+    @pytest.mark.need_serialized_dag
+    def test_no_create_dag_runs_when_no_asset_event(self, session: Session, 
dag_maker):
+        asset = Asset(name="test_asset")
+        with dag_maker(dag_id="consumer", schedule=asset, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset.uri))
+        # Simulate an ADRQ row being updated while the scheduler is creating 
asset-triggered DagRuns.
+        # Each asset event can update or insert ADRQ rows.
+        # Assume the matching asset events were already consumed by an earlier 
ADRQ row.
+        adrq = AssetDagRunQueue(
+            asset_id=asset_id, target_dag_id=dag_model.dag_id, 
created_at=timezone.utcnow()
+        )
+        session.add(adrq)
+        session.flush()
+        adrq.created_at = timezone.utcnow() + timedelta(seconds=1)
+        session.merge(adrq)
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+        with create_session() as session:
+            self.job_runner._create_dag_runs_asset_triggered(
+                dag_models=[dag_model],
+                session=session,
+            )
+        dr = session.scalars(select(DagRun).where(DagRun.dag_id == 
dag_model.dag_id)).one_or_none()
+        assert dr is None
+        _adrq = session.scalars(
+            select(AssetDagRunQueue).where(
+                AssetDagRunQueue.asset_id == asset_id, 
AssetDagRunQueue.target_dag_id == dag_model.dag_id

Review Comment:
   This test asserts the ADRQ is deleted (`_adrq is None`) even though no 
DagRun was created (`dr is None`). This is the "silent cleanup" path from the 
scheduler changes.
   
   The test proves the code *works* but doesn't assert that the operator is 
*informed*. If you add a `log.warning()` in the scheduler (as suggested in the 
other comment), this test should also assert the warning was emitted — that's 
the contract operators will depend on.



##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -230,7 +230,7 @@ def register_asset_change(
 
         asset_event = AssetEvent(**event_kwargs)
         session.add(asset_event)
-        session.flush()  # Ensure the event is written earlier than ADRQ 
entries below.
+        session.commit()  # Ensure the event is written earlier than ADRQ 
entries below.

Review Comment:
   The race condition in the diagram is real — I traced through the scenario 
and confirmed that a slow-committing task can orphan its `AssetEvent`. So 
fixing this is worthwhile.
   
   That said, `session.commit()` mid-function is risky:
   
   1. **Transaction atomicity**: `register_asset_change` receives `session` 
from its caller. If the caller wraps this in a broader transaction (e.g., 
changing TI state + registering asset changes), this commits everything 
prematurely. Any failure *after* this line leaves the event committed but the 
rest (ADRQ, alias associations) rolled back.
   
   2. **Detached instances**: After `commit()`, SQLAlchemy expires all loaded 
objects (unless `expire_on_commit=False`). The code below continues to access 
`asset_model.scheduled_dags`, `asset_alias_models`, etc. — these may trigger 
lazy loads on an expired/detached object, risking `DetachedInstanceError`.
   
   3. **Partial failure**: The ADRQ insertion (below) happens after the commit. 
If it fails, the event is already committed without a corresponding ADRQ row — 
the scheduler will never pick it up.
   
   Consider: a separate short-lived session just for the event commit, so the 
caller's transaction stays intact. Or explore whether aligning ADRQ 
`created_at` with `event.timestamp` (the other part of this PR) is sufficient 
on its own without the early commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to