Copilot commented on code in PR #62931:
URL: https://github.com/apache/airflow/pull/62931#discussion_r3066477710
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4868,6 +4868,167 @@ def dict_from_obj(obj):
assert created_run.creating_job_id == scheduler_job.id
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self,
session, dag_maker):
+ asset = Asset(uri="test://asset-for-stale-trigger-date",
name="asset-for-stale-trigger-date")
+ with dag_maker(dag_id="asset-consumer-stale-trigger-date",
schedule=[asset], session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset.uri))
+
+ queued_at = timezone.utcnow()
+ session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id,
asset_id=asset_id, created_at=queued_at))
+ session.flush()
+
+ # Simulate another scheduler consuming ADRQ rows after we computed
triggered_date_by_dag.
+
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id
== dag_model.dag_id))
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+ # We do not create a new DagRun seems the ADRQ has already been
consumed
+ assert session.scalars(select(DagRun).where(DagRun.dag_id ==
dag_model.dag_id)).one_or_none() is None
+
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+ self, session: Session, dag_maker
+ ):
+ asset_1 = Asset("ready-to-trigger-a-Dag-run")
+ asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+ with dag_maker(dag_id="asset-consumer-delete-selected",
schedule=asset_1 | asset_2, session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_1.name))
+ asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_2.name))
+
+ session.add_all(
+ [
+ # The ADRQ that should triggers the Dag run creation
+ AssetDagRunQueue(
+ asset_id=asset_1_id, target_dag_id=dag_model.dag_id,
created_at=timezone.utcnow()
+ ),
+ # The ADRQ that arrives after the Dag run creation but before
ADRQ clean up
+ # This situation is simluarted by _lock_only_selected_asset
below
+ AssetDagRunQueue(
+ asset_id=asset_2_id, target_dag_id=dag_model.dag_id,
created_at=timezone.utcnow()
+ ),
+ ]
+ )
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+
+ def _lock_only_selected_asset(query, **_):
+ # Simulate SKIP LOCKED behavior where this scheduler can only
consume one ADRQ row.
+ return query.where(AssetDagRunQueue.asset_id == asset_1_id)
+
+ with patch("airflow.jobs.scheduler_job_runner.with_row_locks",
side_effect=_lock_only_selected_asset):
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+ dr = session.scalars(select(DagRun).where(DagRun.dag_id ==
dag_model.dag_id)).one_or_none()
+
+ assert dr is not None
+
+ adrq_1 = session.scalars(
+ select(AssetDagRunQueue).where(
+ AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+ AssetDagRunQueue.asset_id == asset_1_id,
+ )
+ ).one_or_none()
+ assert adrq_1 is None
+ adrq_2 = session.scalars(
+ select(AssetDagRunQueue).where(
+ AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+ AssetDagRunQueue.asset_id == asset_2_id,
+ )
+ ).one_or_none()
+ assert adrq_2 is not None
+
+ assert dr.creating_job_id == scheduler_job.id
+
+ @pytest.mark.need_serialized_dag
+ @conf_vars({("scheduler", "use_job_schedule"): "False"})
+ def
test_asset_triggered_dag_runs_created_when_use_job_schedule_false(self,
session, dag_maker):
+ """
+ Test that asset-triggered dag runs are still created when
use_job_schedule=False.
+
+ use_job_schedule=False should only disable time-based (cron/interval)
scheduling,
+ not asset-triggered scheduling.
+ """
+ asset1 = Asset(uri="test://asset-use-job-schedule",
name="asset_use_job_schedule", group="test")
+
+ with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(),
session=session):
+ BashOperator(task_id="task", bash_command="echo 1",
outlets=[asset1])
+ dr = dag_maker.create_dagrun(
+ run_id="run1",
+ logical_date=(DEFAULT_DATE + timedelta(days=100)),
+ data_interval=(DEFAULT_DATE + timedelta(days=10), DEFAULT_DATE +
timedelta(days=11)),
+ )
+
+ asset1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset1.uri))
+
+ event1 = AssetEvent(
+ asset_id=asset1_id,
+ source_task_id="task",
+ source_dag_id=dr.dag_id,
+ source_run_id=dr.run_id,
+ source_map_index=-1,
+ )
+ session.add(event1)
+
+ with dag_maker(dag_id="asset-consumer", schedule=[asset1]):
+ pass
+ consumer_dag = dag_maker.dag
+
+ session.add(AssetDagRunQueue(asset_id=asset1_id,
target_dag_id=consumer_dag.dag_id))
Review Comment:
`_create_dag_runs_asset_triggered` orders by `AssetDagRunQueue.created_at`
and uses the newest row to compute `triggered_date`. This test inserts an ADRQ
row without setting `created_at`, which can be NULL depending on model defaults
and can make ordering/triggered_date nondeterministic (or error when coercing).
Set `created_at=timezone.utcnow()` (or a fixed timestamp) for deterministic
behavior.
```suggestion
session.add(
AssetDagRunQueue(
asset_id=asset1_id,
target_dag_id=consumer_dag.dag_id,
created_at=timezone.utcnow(),
)
)
```
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4868,6 +4868,167 @@ def dict_from_obj(obj):
assert created_run.creating_job_id == scheduler_job.id
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self,
session, dag_maker):
+ asset = Asset(uri="test://asset-for-stale-trigger-date",
name="asset-for-stale-trigger-date")
+ with dag_maker(dag_id="asset-consumer-stale-trigger-date",
schedule=[asset], session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset.uri))
+
+ queued_at = timezone.utcnow()
+ session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id,
asset_id=asset_id, created_at=queued_at))
+ session.flush()
+
+ # Simulate another scheduler consuming ADRQ rows after we computed
triggered_date_by_dag.
+
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id
== dag_model.dag_id))
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+ # We do not create a new DagRun seems the ADRQ has already been
consumed
+ assert session.scalars(select(DagRun).where(DagRun.dag_id ==
dag_model.dag_id)).one_or_none() is None
+
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+ self, session: Session, dag_maker
+ ):
+ asset_1 = Asset("ready-to-trigger-a-Dag-run")
+ asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+ with dag_maker(dag_id="asset-consumer-delete-selected",
schedule=asset_1 | asset_2, session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_1.name))
+ asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_2.name))
+
+ session.add_all(
+ [
+ # The ADRQ that should triggers the Dag run creation
+ AssetDagRunQueue(
+ asset_id=asset_1_id, target_dag_id=dag_model.dag_id,
created_at=timezone.utcnow()
+ ),
+ # The ADRQ that arrives after the Dag run creation but before
ADRQ clean up
+ # This situation is simluarted by _lock_only_selected_asset
below
Review Comment:
Correct typo in comment: 'simluarted' → 'simulated'.
```suggestion
# This situation is simulated by _lock_only_selected_asset
below
```
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4868,6 +4868,167 @@ def dict_from_obj(obj):
assert created_run.creating_job_id == scheduler_job.id
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self,
session, dag_maker):
+ asset = Asset(uri="test://asset-for-stale-trigger-date",
name="asset-for-stale-trigger-date")
+ with dag_maker(dag_id="asset-consumer-stale-trigger-date",
schedule=[asset], session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset.uri))
+
+ queued_at = timezone.utcnow()
+ session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id,
asset_id=asset_id, created_at=queued_at))
+ session.flush()
+
+ # Simulate another scheduler consuming ADRQ rows after we computed
triggered_date_by_dag.
+
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id
== dag_model.dag_id))
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+ # We do not create a new DagRun seems the ADRQ has already been
consumed
+ assert session.scalars(select(DagRun).where(DagRun.dag_id ==
dag_model.dag_id)).one_or_none() is None
+
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+ self, session: Session, dag_maker
+ ):
+ asset_1 = Asset("ready-to-trigger-a-Dag-run")
+ asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+ with dag_maker(dag_id="asset-consumer-delete-selected",
schedule=asset_1 | asset_2, session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_1.name))
+ asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_2.name))
+
+ session.add_all(
+ [
+ # The ADRQ that should triggers the Dag run creation
+ AssetDagRunQueue(
+ asset_id=asset_1_id, target_dag_id=dag_model.dag_id,
created_at=timezone.utcnow()
+ ),
+ # The ADRQ that arrives after the Dag run creation but before
ADRQ clean up
+ # This situation is simluarted by _lock_only_selected_asset
below
+ AssetDagRunQueue(
+ asset_id=asset_2_id, target_dag_id=dag_model.dag_id,
created_at=timezone.utcnow()
+ ),
+ ]
+ )
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+
+ def _lock_only_selected_asset(query, **_):
+ # Simulate SKIP LOCKED behavior where this scheduler can only
consume one ADRQ row.
+ return query.where(AssetDagRunQueue.asset_id == asset_1_id)
+
+ with patch("airflow.jobs.scheduler_job_runner.with_row_locks",
side_effect=_lock_only_selected_asset):
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+ dr = session.scalars(select(DagRun).where(DagRun.dag_id ==
dag_model.dag_id)).one_or_none()
+
+ assert dr is not None
+
+ adrq_1 = session.scalars(
+ select(AssetDagRunQueue).where(
+ AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+ AssetDagRunQueue.asset_id == asset_1_id,
+ )
+ ).one_or_none()
+ assert adrq_1 is None
+ adrq_2 = session.scalars(
+ select(AssetDagRunQueue).where(
+ AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+ AssetDagRunQueue.asset_id == asset_2_id,
+ )
+ ).one_or_none()
+ assert adrq_2 is not None
+
+ assert dr.creating_job_id == scheduler_job.id
+
+ @pytest.mark.need_serialized_dag
+ @conf_vars({("scheduler", "use_job_schedule"): "False"})
+ def
test_asset_triggered_dag_runs_created_when_use_job_schedule_false(self,
session, dag_maker):
+ """
+ Test that asset-triggered dag runs are still created when
use_job_schedule=False.
+
+ use_job_schedule=False should only disable time-based (cron/interval)
scheduling,
+ not asset-triggered scheduling.
+ """
+ asset1 = Asset(uri="test://asset-use-job-schedule",
name="asset_use_job_schedule", group="test")
+
+ with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(),
session=session):
+ BashOperator(task_id="task", bash_command="echo 1",
outlets=[asset1])
+ dr = dag_maker.create_dagrun(
+ run_id="run1",
+ logical_date=(DEFAULT_DATE + timedelta(days=100)),
+ data_interval=(DEFAULT_DATE + timedelta(days=10), DEFAULT_DATE +
timedelta(days=11)),
+ )
+
+ asset1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset1.uri))
+
+ event1 = AssetEvent(
+ asset_id=asset1_id,
+ source_task_id="task",
+ source_dag_id=dr.dag_id,
+ source_run_id=dr.run_id,
+ source_map_index=-1,
+ )
+ session.add(event1)
+
+ with dag_maker(dag_id="asset-consumer", schedule=[asset1]):
+ pass
+ consumer_dag = dag_maker.dag
+
+ session.add(AssetDagRunQueue(asset_id=asset1_id,
target_dag_id=consumer_dag.dag_id))
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+
+ with create_session() as session:
+ self.job_runner._create_dagruns_for_dags(session, session)
+
+ # Asset-triggered dag run should be created even when
use_job_schedule=False
+ created_run = session.scalars(
+ select(DagRun).where(DagRun.dag_id == consumer_dag.dag_id)
+ ).one_or_none()
Review Comment:
This test rebinds the `session` fixture name to a new `create_session()`
context, then queries using that (now context-managed/closed) session, and also
passes a `Session` where `_create_dagruns_for_dags` expects a
`CommitProhibitorGuard`. Additionally, the ADRQ/event rows were only
`flush()`ed on the fixture session; a separate DB session may not see them
without a commit. Use the provided fixture session for both setup and scheduler
call, invoke `_create_dagruns_for_dags` under `with prohibit_commit(session) as
guard: ...`, and avoid shadowing the `session` variable (e.g.,
`sched_session`). If using two sessions intentionally, commit the setup
transaction and `expire_all()`/refresh before asserting.
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4868,6 +4868,167 @@ def dict_from_obj(obj):
assert created_run.creating_job_id == scheduler_job.id
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self,
session, dag_maker):
+ asset = Asset(uri="test://asset-for-stale-trigger-date",
name="asset-for-stale-trigger-date")
+ with dag_maker(dag_id="asset-consumer-stale-trigger-date",
schedule=[asset], session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset.uri))
+
+ queued_at = timezone.utcnow()
+ session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id,
asset_id=asset_id, created_at=queued_at))
+ session.flush()
+
+ # Simulate another scheduler consuming ADRQ rows after we computed
triggered_date_by_dag.
+
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id
== dag_model.dag_id))
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+ # We do not create a new DagRun seems the ADRQ has already been
consumed
+ assert session.scalars(select(DagRun).where(DagRun.dag_id ==
dag_model.dag_id)).one_or_none() is None
+
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+ self, session: Session, dag_maker
+ ):
+ asset_1 = Asset("ready-to-trigger-a-Dag-run")
+ asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+ with dag_maker(dag_id="asset-consumer-delete-selected",
schedule=asset_1 | asset_2, session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_1.name))
+ asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_2.name))
Review Comment:
The `Asset` instances are created as `Asset("...")` (positional arg is the
asset URI), but the lookup filters by `AssetModel.uri == asset_1.name` /
`asset_2.name`. If `name` is unset or differs from `uri`, these queries can
return `None` and subsequent ADRQ inserts will break. Filter by `asset_1.uri` /
`asset_2.uri` (or set `name=` explicitly and query the appropriate column
consistently).
```suggestion
asset_1_id =
session.scalar(select(AssetModel.id).where(AssetModel.uri == asset_1.uri))
asset_2_id =
session.scalar(select(AssetModel.id).where(AssetModel.uri == asset_2.uri))
```
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4868,6 +4868,167 @@ def dict_from_obj(obj):
assert created_run.creating_job_id == scheduler_job.id
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self,
session, dag_maker):
+ asset = Asset(uri="test://asset-for-stale-trigger-date",
name="asset-for-stale-trigger-date")
+ with dag_maker(dag_id="asset-consumer-stale-trigger-date",
schedule=[asset], session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset.uri))
+
+ queued_at = timezone.utcnow()
+ session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id,
asset_id=asset_id, created_at=queued_at))
+ session.flush()
+
+ # Simulate another scheduler consuming ADRQ rows after we computed
triggered_date_by_dag.
+
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id
== dag_model.dag_id))
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+ # We do not create a new DagRun seems the ADRQ has already been
consumed
+ assert session.scalars(select(DagRun).where(DagRun.dag_id ==
dag_model.dag_id)).one_or_none() is None
+
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+ self, session: Session, dag_maker
+ ):
+ asset_1 = Asset("ready-to-trigger-a-Dag-run")
+ asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+ with dag_maker(dag_id="asset-consumer-delete-selected",
schedule=asset_1 | asset_2, session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_1.name))
+ asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_2.name))
+
+ session.add_all(
+ [
+ # The ADRQ that should triggers the Dag run creation
+ AssetDagRunQueue(
+ asset_id=asset_1_id, target_dag_id=dag_model.dag_id,
created_at=timezone.utcnow()
+ ),
+ # The ADRQ that arrives after the Dag run creation but before
ADRQ clean up
+ # This situation is simluarted by _lock_only_selected_asset
below
+ AssetDagRunQueue(
+ asset_id=asset_2_id, target_dag_id=dag_model.dag_id,
created_at=timezone.utcnow()
+ ),
+ ]
+ )
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+
+ def _lock_only_selected_asset(query, **_):
+ # Simulate SKIP LOCKED behavior where this scheduler can only
consume one ADRQ row.
+ return query.where(AssetDagRunQueue.asset_id == asset_1_id)
+
+ with patch("airflow.jobs.scheduler_job_runner.with_row_locks",
side_effect=_lock_only_selected_asset):
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+ dr = session.scalars(select(DagRun).where(DagRun.dag_id ==
dag_model.dag_id)).one_or_none()
+
+ assert dr is not None
+
+ adrq_1 = session.scalars(
+ select(AssetDagRunQueue).where(
+ AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+ AssetDagRunQueue.asset_id == asset_1_id,
+ )
+ ).one_or_none()
+ assert adrq_1 is None
+ adrq_2 = session.scalars(
+ select(AssetDagRunQueue).where(
+ AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+ AssetDagRunQueue.asset_id == asset_2_id,
+ )
+ ).one_or_none()
+ assert adrq_2 is not None
+
+ assert dr.creating_job_id == scheduler_job.id
+
+ @pytest.mark.need_serialized_dag
+ @conf_vars({("scheduler", "use_job_schedule"): "False"})
+ def
test_asset_triggered_dag_runs_created_when_use_job_schedule_false(self,
session, dag_maker):
+ """
+ Test that asset-triggered dag runs are still created when
use_job_schedule=False.
+
+ use_job_schedule=False should only disable time-based (cron/interval)
scheduling,
+ not asset-triggered scheduling.
+ """
+ asset1 = Asset(uri="test://asset-use-job-schedule",
name="asset_use_job_schedule", group="test")
+
+ with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(),
session=session):
+ BashOperator(task_id="task", bash_command="echo 1",
outlets=[asset1])
+ dr = dag_maker.create_dagrun(
+ run_id="run1",
+ logical_date=(DEFAULT_DATE + timedelta(days=100)),
+ data_interval=(DEFAULT_DATE + timedelta(days=10), DEFAULT_DATE +
timedelta(days=11)),
+ )
+
+ asset1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset1.uri))
+
+ event1 = AssetEvent(
+ asset_id=asset1_id,
+ source_task_id="task",
+ source_dag_id=dr.dag_id,
+ source_run_id=dr.run_id,
+ source_map_index=-1,
+ )
+ session.add(event1)
+
+ with dag_maker(dag_id="asset-consumer", schedule=[asset1]):
+ pass
+ consumer_dag = dag_maker.dag
+
+ session.add(AssetDagRunQueue(asset_id=asset1_id,
target_dag_id=consumer_dag.dag_id))
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+
+ with create_session() as session:
+ self.job_runner._create_dagruns_for_dags(session, session)
+
+ # Asset-triggered dag run should be created even when
use_job_schedule=False
+ created_run = session.scalars(
+ select(DagRun).where(DagRun.dag_id == consumer_dag.dag_id)
+ ).one_or_none()
+ assert created_run is not None
+ assert created_run.state == State.QUEUED
+
+ @pytest.mark.need_serialized_dag
+ @conf_vars({("scheduler", "use_job_schedule"): "False"})
+ def test_time_based_dag_runs_not_created_when_use_job_schedule_false(self,
session, dag_maker):
+ """
+ Test that time-based dag runs are NOT created when
use_job_schedule=False.
+ """
+ with dag_maker(
+ dag_id="time-based-dag",
+ schedule="@daily",
+ start_date=DEFAULT_DATE,
+ session=session,
+ ):
+ BashOperator(task_id="task", bash_command="echo 1")
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+
+ with create_session() as session:
+ self.job_runner._create_dagruns_for_dags(session, session)
+
+ # Time-based dag run should NOT be created when use_job_schedule=False
+ created_run = session.scalars(select(DagRun).where(DagRun.dag_id ==
"time-based-dag")).one_or_none()
+ assert created_run is None
Review Comment:
Same issues as the asset-triggered test: `session` is shadowed by a new
`create_session()` context, `_create_dagruns_for_dags` is called with the wrong
first argument type (guard vs session), and uncommitted fixture data may not be
visible across sessions. Run the scheduler method using the fixture `session`
under a `prohibit_commit(...)` guard, or explicitly commit/refresh when
crossing session boundaries.
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1853,7 +1852,9 @@ def _create_dagruns_for_dags(self, guard:
CommitProhibitorGuard, session: Sessio
# filter asset partition triggered Dags
if d.dag_id not in partition_dag_ids
}
- self._create_dag_runs(non_asset_dags, session)
+ # Only create time-based dag runs if use_job_schedule is enabled
+ if self._scheduler_use_job_schedule:
+ self._create_dag_runs(non_asset_dags, session)
Review Comment:
When `use_job_schedule` is False, `_create_dagruns_for_dags` still builds
`all_dags_needing_dag_runs`, `asset_triggered_dags`, and `non_asset_dags`
(including the `difference(...)` set operations) even though time-based runs
will be skipped. A small optimization is to compute `non_asset_dags` only
inside the `if self._scheduler_use_job_schedule:` block (and keep the
asset-triggered list creation), reducing unnecessary set work for users who
intentionally disable time scheduling.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]