Copilot commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r3301687788
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2202,42 +2193,52 @@ def _create_dag_runs_asset_triggered(
),
),
AssetEvent.timestamp <= triggered_date,
- AssetEvent.timestamp >
func.coalesce(cte.c.previous_dag_run_run_after, date.min),
+ AssetEvent.id.not_in(
+ select(association_table.c.event_id)
+ .join(DagRun, DagRun.id ==
association_table.c.dag_run_id)
+ .where(DagRun.dag_id == dag.dag_id)
Review Comment:
Using `NOT IN (SELECT ...)` here can be significantly slower than a
correlated `NOT EXISTS` on larger tables. `NOT IN` also has tricky NULL
semantics: if the subquery yields even one NULL, the predicate can never be
true and no rows match (this column is expected to be non-null, but the query
should not have to rely on that). Consider switching to a `NOT EXISTS`
anti-join pattern (or an ORM relationship-based `~exists(...)` clause), which
can be planned more efficiently and is more robust.
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -5316,6 +5318,171 @@ def dict_from_obj(obj):
assert created_run.creating_job_id == scheduler_job.id
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self,
session, dag_maker):
+ asset = Asset(uri="test://asset-for-stale-trigger-date",
name="asset-for-stale-trigger-date")
+ with dag_maker(dag_id="asset-consumer-stale-trigger-date",
schedule=[asset], session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset.uri))
+
+ queued_at = timezone.utcnow()
+ session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id,
asset_id=asset_id, created_at=queued_at))
+ session.flush()
+
+ # Simulate another scheduler consuming ADRQ rows after we computed
triggered_date_by_dag.
+
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id
== dag_model.dag_id))
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+ # We do not create a new DagRun since the ADRQ has already been
consumed
+ assert session.scalars(select(DagRun).where(DagRun.dag_id ==
dag_model.dag_id)).one_or_none() is None
+
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+ self, session: Session, dag_maker
+ ):
+ asset_1 = Asset("ready-to-trigger-a-Dag-run")
+ asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+ with dag_maker(dag_id="asset-consumer-delete-selected",
schedule=asset_1 | asset_2, session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_1.name))
+ asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_2.name))
+ session.add_all(
+ [
+ AssetEvent(
+ asset_id=asset_1_id,
+ timestamp=timezone.utcnow(),
+ ),
+ # The ADRQ that should trigger the Dag run creation
+ AssetDagRunQueue(
+ asset_id=asset_1_id, target_dag_id=dag_model.dag_id,
created_at=timezone.utcnow()
+ ),
+ AssetEvent(asset_id=asset_2_id, timestamp=timezone.utcnow()),
+ # The ADRQ that arrives after the Dag run creation but before
ADRQ clean up
+ # This situation is simulated by _lock_only_selected_asset
below
+ AssetDagRunQueue(
+ asset_id=asset_2_id, target_dag_id=dag_model.dag_id,
created_at=timezone.utcnow()
+ ),
+ ]
+ )
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[MockExecutor(do_update=False)])
+
+ def _lock_only_selected_asset(query, **_):
+ # Simulate SKIP LOCKED behavior where this scheduler can only
consume one ADRQ row.
+ return query.where(AssetDagRunQueue.asset_id == asset_1_id)
+
+ with patch("airflow.jobs.scheduler_job_runner.with_row_locks",
side_effect=_lock_only_selected_asset):
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+ dr = session.scalars(select(DagRun).where(DagRun.dag_id ==
dag_model.dag_id)).one_or_none()
+ assert dr is not None
+
+ adrq_1 = session.scalars(
+ select(AssetDagRunQueue).where(
+ AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+ AssetDagRunQueue.asset_id == asset_1_id,
+ )
+ ).one_or_none()
+ assert adrq_1 is None
+ adrq_2 = session.scalars(
+ select(AssetDagRunQueue).where(
+ AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+ AssetDagRunQueue.asset_id == asset_2_id,
+ )
+ ).one_or_none()
+ assert adrq_2 is not None
+
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_when_concurrent_asset_events_created(self,
session: Session, dag_maker, caplog):
+ import random
+ from concurrent.futures import ThreadPoolExecutor, as_completed
+
+ ASSET_EVENT_COUNT = 100
Review Comment:
This test uses randomness (`random.randint(0, 1)`, i.e. a sleep of either 0
or 1 second per event) and a relatively large event count, which can introduce
non-deterministic timing and intermittent timeouts/flakiness in CI, since all
futures must complete within the 60-second `as_completed` timeout. To make it
more stable, consider removing randomness (e.g., deterministic sleeps),
reducing `ASSET_EVENT_COUNT`, and/or structuring the concurrency so completion
ordering doesn’t depend on random delays.
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -5316,6 +5318,171 @@ def dict_from_obj(obj):
assert created_run.creating_job_id == scheduler_job.id
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self,
session, dag_maker):
+ asset = Asset(uri="test://asset-for-stale-trigger-date",
name="asset-for-stale-trigger-date")
+ with dag_maker(dag_id="asset-consumer-stale-trigger-date",
schedule=[asset], session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset.uri))
+
+ queued_at = timezone.utcnow()
+ session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id,
asset_id=asset_id, created_at=queued_at))
+ session.flush()
+
+ # Simulate another scheduler consuming ADRQ rows after we computed
triggered_date_by_dag.
+
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id
== dag_model.dag_id))
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+ # We do not create a new DagRun since the ADRQ has already been
consumed
+ assert session.scalars(select(DagRun).where(DagRun.dag_id ==
dag_model.dag_id)).one_or_none() is None
+
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+ self, session: Session, dag_maker
+ ):
+ asset_1 = Asset("ready-to-trigger-a-Dag-run")
+ asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+ with dag_maker(dag_id="asset-consumer-delete-selected",
schedule=asset_1 | asset_2, session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_1.name))
+ asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_2.name))
+ session.add_all(
+ [
+ AssetEvent(
+ asset_id=asset_1_id,
+ timestamp=timezone.utcnow(),
+ ),
+ # The ADRQ that should trigger the Dag run creation
+ AssetDagRunQueue(
+ asset_id=asset_1_id, target_dag_id=dag_model.dag_id,
created_at=timezone.utcnow()
+ ),
+ AssetEvent(asset_id=asset_2_id, timestamp=timezone.utcnow()),
+ # The ADRQ that arrives after the Dag run creation but before
ADRQ clean up
+ # This situation is simulated by _lock_only_selected_asset
below
+ AssetDagRunQueue(
+ asset_id=asset_2_id, target_dag_id=dag_model.dag_id,
created_at=timezone.utcnow()
+ ),
+ ]
+ )
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[MockExecutor(do_update=False)])
+
+ def _lock_only_selected_asset(query, **_):
+ # Simulate SKIP LOCKED behavior where this scheduler can only
consume one ADRQ row.
+ return query.where(AssetDagRunQueue.asset_id == asset_1_id)
+
+ with patch("airflow.jobs.scheduler_job_runner.with_row_locks",
side_effect=_lock_only_selected_asset):
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+ dr = session.scalars(select(DagRun).where(DagRun.dag_id ==
dag_model.dag_id)).one_or_none()
+ assert dr is not None
+
+ adrq_1 = session.scalars(
+ select(AssetDagRunQueue).where(
+ AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+ AssetDagRunQueue.asset_id == asset_1_id,
+ )
+ ).one_or_none()
+ assert adrq_1 is None
+ adrq_2 = session.scalars(
+ select(AssetDagRunQueue).where(
+ AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+ AssetDagRunQueue.asset_id == asset_2_id,
+ )
+ ).one_or_none()
+ assert adrq_2 is not None
+
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_when_concurrent_asset_events_created(self,
session: Session, dag_maker, caplog):
+ import random
+ from concurrent.futures import ThreadPoolExecutor, as_completed
+
+ ASSET_EVENT_COUNT = 100
+ asset = Asset(name="test_asset")
+ with dag_maker(dag_id="consumer", schedule=asset, session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(),
session=session):
+ BashOperator(task_id="simulate-asset-outlet", bash_command="echo
1")
+ dr = dag_maker.create_dagrun(run_id="asset-producer-run")
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset.uri))
+ futures = []
+ consumed_asset_events = []
+
+ def create_asset_events(sleep):
+ import time
+
+ from sqlalchemy import inspect
+
+ with create_session() as session:
+ now = timezone.utcnow()
+ asset_event = AssetEvent(asset_id=asset_id, timestamp=now)
+ session.add(asset_event)
+ session.commit()
+ time.sleep(sleep) # sleep to simulate slow performance
+ asset_manager = AssetManager()
+ dialect_name = inspect(session.get_bind()).dialect.name
+ if dialect_name in ("postgresql", "sqlite"):
+
asset_manager._queue_dagruns_nonpartitioned_conflict_update(
+ asset_id=asset_id,
+ dags_to_queue=[dag_model],
+ event=asset_event,
+ session=session,
+ dialect_name=dialect_name,
+ )
+ elif dialect_name == "mysql":
+ asset_manager._queue_dagruns_nonpartitioned_mysql(
+ asset_id=asset_id, dags_to_queue=[dag_model],
event=asset_event, session=session
+ )
+
+ return asset_event
+
+ with (
+ ThreadPoolExecutor() as exec,
+ caplog.at_level(
+ "WARNING",
+ logger="airflow.jobs.scheduler_job_runner",
+ ),
+ ):
+ for _ in range(ASSET_EVENT_COUNT):
+ future = exec.submit(create_asset_events, random.randint(0, 1))
+ futures.append(future)
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[MockExecutor(do_update=False)])
+ seen_dr_ids: set[int] = set()
+ for future in as_completed(futures, timeout=60):
+ asset = future.result()
+
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+ session.commit()
+ all_drs = session.scalars(select(DagRun).where(DagRun.dag_id
== dag_model.dag_id)).all()
+ for dr in all_drs:
+ if dr.id not in seen_dr_ids:
+ seen_dr_ids.add(dr.id)
+ consumed_asset_events += dr.consumed_asset_events
+ total_consumed_asset_events = len(consumed_asset_events)
+ assert total_consumed_asset_events == ASSET_EVENT_COUNT
+ assert len(set(consumed_asset_events)) == total_consumed_asset_events,
(
Review Comment:
This uniqueness assertion relies on Python object identity/hashability of
ORM instances, which can give false results if the same DB row is represented
by different instances (e.g., across session boundaries or if the identity map
is cleared). It’s more reliable to assert uniqueness by primary key (e.g.,
`event.id`) rather than the ORM object itself.
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -136,6 +137,40 @@ def _add_one(asset: SerializedAsset) -> AssetModel:
return [_add_one(a) for a in assets]
+ @classmethod
+ def create_asset_event(cls, *, event_kwargs: dict, session: Session) ->
AssetEvent:
+ """
+ Persist an :class:`AssetEvent` row and return it, bound to *session*.
+
+ For non-SQLite backends a short-lived independent session is used so
+ that the row is committed (and therefore visible to the scheduler's
+ session) before the caller continues. SQLite does not support
+ concurrent connections, so the event is added directly to the caller's
+ *session* and flushed instead.
+ """
+ if get_dialect_name(session) == "sqlite":
+ # SQLite cannot have two concurrent connections to the same file,
so
+ # opening a second session would deadlock. Add directly and flush
so
+ # the object gets an id without committing the outer transaction.
+ asset_event = AssetEvent(**event_kwargs)
+ session.add(asset_event)
+ session.flush()
+ return asset_event
+
+ # Create a short-lived session to populate asset event in db.
+ # This is to ensure the asset event is committed and visible to other
sessions.
+ # e.g. Scheduler's session when it looks for new asset events to
trigger dags via ADRQ.
+ # Use ``scoped=False`` to get a truly independent session with its own
connection/transaction.
+ with create_session(scoped=False) as ae_session:
+ _asset_event = AssetEvent(**event_kwargs)
+ ae_session.add(_asset_event)
+ ae_session.flush()
+ asset_event_id = _asset_event.id
+
+ # Re-load the now-committed AssetEvent into the caller's session so
that
+ # subsequent relationship operations work correctly.
+ return session.get_one(AssetEvent, asset_event_id)
Review Comment:
Creating/committing `AssetEvent` in an independent session while ADRQ rows
and other related state changes are still in the caller’s transaction breaks
atomicity: if the caller later rolls back, the committed `AssetEvent` can
become a false-positive signal that may be picked up by a future ADRQ-triggered
run. Since the scheduler is gated by ADRQ anyway, committing only the event
early provides little benefit but introduces inconsistency risk. Prefer keeping
`AssetEvent` insertion in the caller’s session/transaction (flush only), or
move *all* trigger inputs (event + ADRQ mutations) into the same
independently-committed transaction so they remain consistent.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org