Copilot commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r3066503475
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4868,6 +4870,176 @@ def dict_from_obj(obj):
         assert created_run.creating_job_id == scheduler_job.id

+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self, session, dag_maker):
+        asset = Asset(uri="test://asset-for-stale-trigger-date", name="asset-for-stale-trigger-date")
+        with dag_maker(dag_id="asset-consumer-stale-trigger-date", schedule=[asset], session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset.uri))
+
+        queued_at = timezone.utcnow()
+        session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id, asset_id=asset_id, created_at=queued_at))
+        session.flush()
+
+        # Simulate another scheduler consuming ADRQ rows after we computed triggered_date_by_dag.
+        session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_model.dag_id))
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])
+        self.job_runner._create_dag_runs_asset_triggered(
+            dag_models=[dag_model],
+            session=session,
+        )
+
+        # We do not create a new DagRun seems the ADRQ has already been consumed
+        assert session.scalars(select(DagRun).where(DagRun.dag_id == dag_model.dag_id)).one_or_none() is None
+
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+        self, session: Session, dag_maker
+    ):
+        asset_1 = Asset("ready-to-trigger-a-Dag-run")
+        asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+        with dag_maker(dag_id="asset-consumer-delete-selected", schedule=asset_1 | asset_2, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset_1.name))
+        asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset_2.name))
+        session.add_all(
+            [
+                AssetEvent(
+                    asset_id=asset_1_id,
+                    timestamp=timezone.utcnow(),
+                ),
+                # The ADRQ that should triggers the Dag run creation
+                AssetDagRunQueue(
+                    asset_id=asset_1_id, target_dag_id=dag_model.dag_id, created_at=timezone.utcnow()
+                ),
+                AssetEvent(asset_id=asset_2_id, timestamp=timezone.utcnow()),
+                # The ADRQ that arrives after the Dag run creation but before ADRQ clean up
+                # This situation is simluarted by _lock_only_selected_asset below
Review Comment:
Typo in comment: "simluarted" should be "simulated".
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4868,6 +4870,176 @@ def dict_from_obj(obj):
         assert created_run.creating_job_id == scheduler_job.id

+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self, session, dag_maker):
+        asset = Asset(uri="test://asset-for-stale-trigger-date", name="asset-for-stale-trigger-date")
+        with dag_maker(dag_id="asset-consumer-stale-trigger-date", schedule=[asset], session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset.uri))
+
+        queued_at = timezone.utcnow()
+        session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id, asset_id=asset_id, created_at=queued_at))
+        session.flush()
+
+        # Simulate another scheduler consuming ADRQ rows after we computed triggered_date_by_dag.
+        session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_model.dag_id))
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])
+        self.job_runner._create_dag_runs_asset_triggered(
+            dag_models=[dag_model],
+            session=session,
+        )
+
+        # We do not create a new DagRun seems the ADRQ has already been consumed
Review Comment:
Grammar in comment is off: "We do not create a new DagRun seems ..." reads
incorrectly; consider replacing "seems" with "since" (or rephrasing) to clarify
the intent.
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4868,6 +4870,176 @@ def dict_from_obj(obj):
         assert created_run.creating_job_id == scheduler_job.id

+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self, session, dag_maker):
+        asset = Asset(uri="test://asset-for-stale-trigger-date", name="asset-for-stale-trigger-date")
+        with dag_maker(dag_id="asset-consumer-stale-trigger-date", schedule=[asset], session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset.uri))
+
+        queued_at = timezone.utcnow()
+        session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id, asset_id=asset_id, created_at=queued_at))
+        session.flush()
+
+        # Simulate another scheduler consuming ADRQ rows after we computed triggered_date_by_dag.
+        session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_model.dag_id))
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])
+        self.job_runner._create_dag_runs_asset_triggered(
+            dag_models=[dag_model],
+            session=session,
+        )
+
+        # We do not create a new DagRun seems the ADRQ has already been consumed
+        assert session.scalars(select(DagRun).where(DagRun.dag_id == dag_model.dag_id)).one_or_none() is None
+
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+        self, session: Session, dag_maker
+    ):
+        asset_1 = Asset("ready-to-trigger-a-Dag-run")
+        asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+        with dag_maker(dag_id="asset-consumer-delete-selected", schedule=asset_1 | asset_2, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset_1.name))
+        asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset_2.name))
+        session.add_all(
+            [
+                AssetEvent(
+                    asset_id=asset_1_id,
+                    timestamp=timezone.utcnow(),
+                ),
+                # The ADRQ that should triggers the Dag run creation
+                AssetDagRunQueue(
+                    asset_id=asset_1_id, target_dag_id=dag_model.dag_id, created_at=timezone.utcnow()
+                ),
+                AssetEvent(asset_id=asset_2_id, timestamp=timezone.utcnow()),
+                # The ADRQ that arrives after the Dag run creation but before ADRQ clean up
+                # This situation is simluarted by _lock_only_selected_asset below
+                AssetDagRunQueue(
+                    asset_id=asset_2_id, target_dag_id=dag_model.dag_id, created_at=timezone.utcnow()
+                ),
+            ]
+        )
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])
+
+        def _lock_only_selected_asset(query, **_):
+            # Simulate SKIP LOCKED behavior where this scheduler can only consume one ADRQ row.
+            return query.where(AssetDagRunQueue.asset_id == asset_1_id)
+
+        with patch("airflow.jobs.scheduler_job_runner.with_row_locks", side_effect=_lock_only_selected_asset):
+            self.job_runner._create_dag_runs_asset_triggered(
+                dag_models=[dag_model],
+                session=session,
+            )
+
+        dr = session.scalars(select(DagRun).where(DagRun.dag_id == dag_model.dag_id)).one_or_none()
+        assert dr is not None
+
+        adrq_1 = session.scalars(
+            select(AssetDagRunQueue).where(
+                AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+                AssetDagRunQueue.asset_id == asset_1_id,
+            )
+        ).one_or_none()
+        assert adrq_1 is None
+        adrq_2 = session.scalars(
+            select(AssetDagRunQueue).where(
+                AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+                AssetDagRunQueue.asset_id == asset_2_id,
+            )
+        ).one_or_none()
+        assert adrq_2 is not None
+
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_when_concurrent_asset_events_created(self, session: Session, dag_maker, caplog):
+        import random
+        from concurrent.futures import ThreadPoolExecutor, as_completed
+
+        ASSET_EVENT_COUNT = 100
+        asset = Asset(name="test_asset")
+        with dag_maker(dag_id="consumer", schedule=asset, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(), session=session):
+            BashOperator(task_id="simulate-asset-outlet", bash_command="echo 1")
+        dr = dag_maker.create_dagrun(run_id="asset-producer-run")
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset.uri))
+        futures = []
+        consumed_asset_events = []
+
+        def create_asset_events(sleep):
+            import time
+
+            from sqlalchemy import inspect
+
+            with create_session() as session:
+                now = timezone.utcnow()
+                asset_event = AssetEvent(asset_id=asset_id, timestamp=now)
+                session.add(asset_event)
+                session.commit()
+                time.sleep(sleep)  # sleep to simulate slow performance
+                asset_manager = AssetManager()
+                dialect_name = inspect(session.get_bind()).dialect.name
+                if dialect_name in ("postgresql", "sqlite"):
+                    asset_manager._queue_dagruns_nonpartitioned_conflict_update(
+                        asset_id=asset_id,
+                        dags_to_queue=[dag_model],
+                        event=asset_event,
+                        session=session,
+                        dialect_name=dialect_name,
+                    )
+                elif dialect_name == "mysql":
+                    asset_manager._queue_dagruns_nonpartitioned_mysql(
+                        asset_id=asset_id, dags_to_queue=[dag_model], event=asset_event, session=session
+                    )
+
+            return asset_event
+
+        with (
+            ThreadPoolExecutor() as exec,
+            caplog.at_level(
+                "WARNING",
+                logger="airflow.jobs.scheduler_job_runner",
+            ),
+        ):
+            for _ in range(ASSET_EVENT_COUNT):
+                future = exec.submit(create_asset_events, random.randint(0, 1))
+                futures.append(future)
Review Comment:
This test uses randomness (`random.randint(0, 1)`) to vary timing, which can
make failures non-reproducible and introduce flakes. Consider making the
interleaving deterministic (e.g., a fixed pattern of delays or explicit
synchronization) so the test outcome doesn’t depend on random scheduling.
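
A minimal sketch of one deterministic alternative, reusing the test's own names (the fixed 0/1 pattern mirrors the original `randint(0, 1)` range and is illustrative, not from the PR):

```python
# Derive each worker's delay from its submission index instead of
# random.randint, so every run interleaves the same way.
for i in range(ASSET_EVENT_COUNT):
    delay = i % 2  # fixed alternating pattern: 0, 1, 0, 1, ...
    futures.append(exec.submit(create_asset_events, delay))
```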
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4868,6 +4870,176 @@ def dict_from_obj(obj):
         assert created_run.creating_job_id == scheduler_job.id

+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self, session, dag_maker):
+        asset = Asset(uri="test://asset-for-stale-trigger-date", name="asset-for-stale-trigger-date")
+        with dag_maker(dag_id="asset-consumer-stale-trigger-date", schedule=[asset], session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset.uri))
+
+        queued_at = timezone.utcnow()
+        session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id, asset_id=asset_id, created_at=queued_at))
+        session.flush()
+
+        # Simulate another scheduler consuming ADRQ rows after we computed triggered_date_by_dag.
+        session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_model.dag_id))
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])
+        self.job_runner._create_dag_runs_asset_triggered(
+            dag_models=[dag_model],
+            session=session,
+        )
+
+        # We do not create a new DagRun seems the ADRQ has already been consumed
+        assert session.scalars(select(DagRun).where(DagRun.dag_id == dag_model.dag_id)).one_or_none() is None
+
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+        self, session: Session, dag_maker
+    ):
+        asset_1 = Asset("ready-to-trigger-a-Dag-run")
+        asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+        with dag_maker(dag_id="asset-consumer-delete-selected", schedule=asset_1 | asset_2, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset_1.name))
+        asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset_2.name))
+        session.add_all(
+            [
+                AssetEvent(
+                    asset_id=asset_1_id,
+                    timestamp=timezone.utcnow(),
+                ),
+                # The ADRQ that should triggers the Dag run creation
+                AssetDagRunQueue(
+                    asset_id=asset_1_id, target_dag_id=dag_model.dag_id, created_at=timezone.utcnow()
+                ),
+                AssetEvent(asset_id=asset_2_id, timestamp=timezone.utcnow()),
+                # The ADRQ that arrives after the Dag run creation but before ADRQ clean up
+                # This situation is simluarted by _lock_only_selected_asset below
+                AssetDagRunQueue(
+                    asset_id=asset_2_id, target_dag_id=dag_model.dag_id, created_at=timezone.utcnow()
+                ),
+            ]
+        )
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])
+
+        def _lock_only_selected_asset(query, **_):
+            # Simulate SKIP LOCKED behavior where this scheduler can only consume one ADRQ row.
+            return query.where(AssetDagRunQueue.asset_id == asset_1_id)
+
+        with patch("airflow.jobs.scheduler_job_runner.with_row_locks", side_effect=_lock_only_selected_asset):
+            self.job_runner._create_dag_runs_asset_triggered(
+                dag_models=[dag_model],
+                session=session,
+            )
+
+        dr = session.scalars(select(DagRun).where(DagRun.dag_id == dag_model.dag_id)).one_or_none()
+        assert dr is not None
+
+        adrq_1 = session.scalars(
+            select(AssetDagRunQueue).where(
+                AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+                AssetDagRunQueue.asset_id == asset_1_id,
+            )
+        ).one_or_none()
+        assert adrq_1 is None
+        adrq_2 = session.scalars(
+            select(AssetDagRunQueue).where(
+                AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+                AssetDagRunQueue.asset_id == asset_2_id,
+            )
+        ).one_or_none()
+        assert adrq_2 is not None
+
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_when_concurrent_asset_events_created(self, session: Session, dag_maker, caplog):
+        import random
+        from concurrent.futures import ThreadPoolExecutor, as_completed
+
+        ASSET_EVENT_COUNT = 100
+        asset = Asset(name="test_asset")
+        with dag_maker(dag_id="consumer", schedule=asset, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(), session=session):
+            BashOperator(task_id="simulate-asset-outlet", bash_command="echo 1")
+        dr = dag_maker.create_dagrun(run_id="asset-producer-run")
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset.uri))
+        futures = []
+        consumed_asset_events = []
+
+        def create_asset_events(sleep):
+            import time
+
+            from sqlalchemy import inspect
+
+            with create_session() as session:
Review Comment:
There are several imports inside the test function / nested helper (`import
random`, `from concurrent.futures ...`, `import time`, `from sqlalchemy import
inspect`). Airflow generally keeps imports at module scope; please move these
to the top of the file unless there is a specific reason they must be local
(e.g., expensive import or circular dependency).
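
A minimal sketch of the hoisted form, assuming none of these imports is expensive or circular (placement alongside the file's existing module-scope imports):

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

from sqlalchemy import inspect
```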
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -224,34 +225,40 @@ def register_asset_change(
             source_run_id=task_instance.run_id,
             source_map_index=task_instance.map_index,
         )
+        # Create a short live session to populate asset event in db.
Review Comment:
Comment wording: "Create a short live session" should be "short-lived
session".
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -553,14 +571,46 @@ def _queue_dagrun_if_needed(dag: DagModel) -> str | None:
         cls.logger().debug("consuming dag ids %s", queued_dag_ids)

     @classmethod
-    def _queue_dagruns_nonpartitioned_postgres(
-        cls, asset_id: int, dags_to_queue: set[DagModel], session: Session
+    def _queue_dagruns_nonpartitioned_mysql(
+        cls, asset_id: int, dags_to_queue: set[DagModel], event: AssetEvent, session: Session
+    ) -> None:
+        from sqlalchemy import case
+        from sqlalchemy.dialects.mysql import insert
+
+        values = [{"target_dag_id": dag.dag_id} for dag in dags_to_queue]
+        stmt = insert(AssetDagRunQueue).values(asset_id=asset_id, created_at=event.timestamp)
+
+        update_stmt = stmt.on_duplicate_key_update(
+            created_at=case(
+                (stmt.inserted.created_at >= AssetDagRunQueue.created_at, stmt.inserted.created_at),
+                else_=AssetDagRunQueue.created_at,
+            )
+        )
+        session.execute(update_stmt, values)
+
+    @classmethod
+    def _queue_dagruns_nonpartitioned_conflict_update(
+        cls,
+        asset_id: int,
+        dags_to_queue: set[DagModel],
+        event: AssetEvent,
+        session: Session,
+        dialect_name: str,
     ) -> None:
-        from sqlalchemy.dialects.postgresql import insert
+        """Handle ON CONFLICT DO UPDATE upsert for dialects that support it (postgresql, sqlite)."""
+        if dialect_name == "postgresql":
+            from sqlalchemy.dialects.postgresql import insert
+        else:
+            from sqlalchemy.dialects.sqlite import insert
Review Comment:
New dialect-specific imports are inside method bodies (`from sqlalchemy
import case`, `from sqlalchemy.dialects... import insert`). Unless there’s a
circular-import or lazy-loading justification, please move these imports to
module scope for consistency and to make dependency/typing analysis easier.
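
A minimal sketch of the module-scope alternative, assuming no circular-import constraint: alias the dialect-specific constructs at import time and select between them at call time.

```python
from sqlalchemy import case
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert

# Inside the method body, pick the construct for the active dialect:
insert = pg_insert if dialect_name == "postgresql" else sqlite_insert
```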
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2118,45 +2118,52 @@ def _create_dag_runs_asset_triggered(
                     .order_by(AssetEvent.timestamp.asc(), AssetEvent.id.asc())
                 )
             )
-
-            dag_run = dag.create_dagrun(
-                run_id=DagRun.generate_run_id(
-                    run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=triggered_date
-                ),
-                logical_date=None,
-                data_interval=None,
-                run_after=triggered_date,
-                run_type=DagRunType.ASSET_TRIGGERED,
-                triggered_by=DagRunTriggeredByType.ASSET,
-                state=DagRunState.QUEUED,
-                creating_job_id=self.job.id,
-                session=session,
-            )
-            Stats.incr("asset.triggered_dagruns")
-            dag_run.consumed_asset_events.extend(asset_events)
-            self.log.info(
-                "Created asset-triggered DagRun for '%s': run_id=%s, consumed %d asset events",
-                dag.dag_id,
-                dag_run.run_id,
-                len(asset_events),
-            )
-
-            # Delete only consumed ADRQ rows to avoid dropping newly queued events
-            # (e.g. DagRun triggered by asset A while a new event for asset B arrives).
-            adrq_pks = [(record.asset_id, record.target_dag_id) for record in queued_adrqs]
-            result = cast(
-                "CursorResult",
-                session.execute(
-                    delete(AssetDagRunQueue).where(
-                        tuple_(AssetDagRunQueue.asset_id, AssetDagRunQueue.target_dag_id).in_(adrq_pks)
-                    )
-                ),
-            )
-            self.log.info(
-                "Deleted %d ADRQ rows for '%s'",
-                result.rowcount,
-                dag.dag_id,
-            )
+            if asset_events:
+                dag_run = dag.create_dagrun(
+                    run_id=DagRun.generate_run_id(
+                        run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=triggered_date
+                    ),
+                    logical_date=None,
+                    data_interval=None,
+                    run_after=triggered_date,
+                    run_type=DagRunType.ASSET_TRIGGERED,
+                    triggered_by=DagRunTriggeredByType.ASSET,
+                    state=DagRunState.QUEUED,
+                    creating_job_id=self.job.id,
+                    session=session,
+                )
+                Stats.incr("asset.triggered_dagruns")
+                dag_run.consumed_asset_events.extend(asset_events)
+                self.log.info(
+                    "Created asset-triggered DagRun for '%s': run_id=%s, consumed %d asset events",
+                    dag.dag_id,
+                    dag_run.run_id,
+                    len(asset_events),
+                )
+                # Delete only consumed ADRQ rows to avoid dropping newly queued events
+                # (e.g. 1. DagRun triggered by asset A while a new event for asset B arrives.
+                # 2. DagRun triggered by asset A while new event for asset A upsert to ADRQ)
+                adrq_pks = [(record.asset_id, record.target_dag_id) for record in queued_adrqs]
+                result = cast(
+                    "CursorResult",
+                    session.execute(
+                        delete(AssetDagRunQueue).where(
+                            tuple_(AssetDagRunQueue.asset_id, AssetDagRunQueue.target_dag_id).in_(adrq_pks),
+                            AssetDagRunQueue.created_at <= triggered_date,
+                        )
+                    ),
+                )
+                self.log.info(
+                    "Deleted %d ADRQ rows for '%s'",
+                    result.rowcount,
+                    dag.dag_id,
+                )
+            else:
+                self.log.warning(
+                    "No DagRun created for '%s' at '%s' - asset events already consumed",
+                    dag.dag_id,
+                    triggered_date,
+                )
Review Comment:
When no eligible `AssetEvent`s are found, this logs a WARNING but leaves the
ADRQ rows in place. Since `_create_dag_runs_asset_triggered` runs repeatedly,
this can produce repeated WARNINGs for the same ADRQ rows and create log noise
during normal races. Consider lowering this to DEBUG/INFO or
rate-limiting/aggregating the message if this condition is expected.
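
A minimal sketch of the demotion, reusing the PR's own message (a rate-limited variant could additionally track when the condition was last logged per dag_id):

```python
# Expected during normal scheduler races, so DEBUG rather than WARNING.
self.log.debug(
    "No DagRun created for '%s' at '%s' - asset events already consumed",
    dag.dag_id,
    triggered_date,
)
```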
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]