Copilot commented on code in PR #62931:
URL: https://github.com/apache/airflow/pull/62931#discussion_r3066477710


##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4868,6 +4868,167 @@ def dict_from_obj(obj):
 
         assert created_run.creating_job_id == scheduler_job.id
 
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self, 
session, dag_maker):
+        asset = Asset(uri="test://asset-for-stale-trigger-date", 
name="asset-for-stale-trigger-date")
+        with dag_maker(dag_id="asset-consumer-stale-trigger-date", 
schedule=[asset], session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset.uri))
+
+        queued_at = timezone.utcnow()
+        session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id, 
asset_id=asset_id, created_at=queued_at))
+        session.flush()
+
+        # Simulate another scheduler consuming ADRQ rows after we computed 
triggered_date_by_dag.
+        
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id 
== dag_model.dag_id))
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+        self.job_runner._create_dag_runs_asset_triggered(
+            dag_models=[dag_model],
+            session=session,
+        )
+
+        # We do not create a new DagRun since the ADRQ has already been 
consumed
+        assert session.scalars(select(DagRun).where(DagRun.dag_id == 
dag_model.dag_id)).one_or_none() is None
+
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+        self, session: Session, dag_maker
+    ):
+        asset_1 = Asset("ready-to-trigger-a-Dag-run")
+        asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+        with dag_maker(dag_id="asset-consumer-delete-selected", 
schedule=asset_1 | asset_2, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset_1.name))
+        asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset_2.name))
+
+        session.add_all(
+            [
+                # The ADRQ that should trigger the Dag run creation
+                AssetDagRunQueue(
+                    asset_id=asset_1_id, target_dag_id=dag_model.dag_id, 
created_at=timezone.utcnow()
+                ),
+                # The ADRQ that arrives after the Dag run creation but before 
ADRQ clean up
+                # This situation is simluarted by _lock_only_selected_asset 
below
+                AssetDagRunQueue(
+                    asset_id=asset_2_id, target_dag_id=dag_model.dag_id, 
created_at=timezone.utcnow()
+                ),
+            ]
+        )
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+
+        def _lock_only_selected_asset(query, **_):
+            # Simulate SKIP LOCKED behavior where this scheduler can only 
consume one ADRQ row.
+            return query.where(AssetDagRunQueue.asset_id == asset_1_id)
+
+        with patch("airflow.jobs.scheduler_job_runner.with_row_locks", 
side_effect=_lock_only_selected_asset):
+            self.job_runner._create_dag_runs_asset_triggered(
+                dag_models=[dag_model],
+                session=session,
+            )
+
+        dr = session.scalars(select(DagRun).where(DagRun.dag_id == 
dag_model.dag_id)).one_or_none()
+
+        assert dr is not None
+
+        adrq_1 = session.scalars(
+            select(AssetDagRunQueue).where(
+                AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+                AssetDagRunQueue.asset_id == asset_1_id,
+            )
+        ).one_or_none()
+        assert adrq_1 is None
+        adrq_2 = session.scalars(
+            select(AssetDagRunQueue).where(
+                AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+                AssetDagRunQueue.asset_id == asset_2_id,
+            )
+        ).one_or_none()
+        assert adrq_2 is not None
+
+        assert dr.creating_job_id == scheduler_job.id
+
+    @pytest.mark.need_serialized_dag
+    @conf_vars({("scheduler", "use_job_schedule"): "False"})
+    def 
test_asset_triggered_dag_runs_created_when_use_job_schedule_false(self, 
session, dag_maker):
+        """
+        Test that asset-triggered dag runs are still created when 
use_job_schedule=False.
+
+        use_job_schedule=False should only disable time-based (cron/interval) 
scheduling,
+        not asset-triggered scheduling.
+        """
+        asset1 = Asset(uri="test://asset-use-job-schedule", 
name="asset_use_job_schedule", group="test")
+
+        with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(), 
session=session):
+            BashOperator(task_id="task", bash_command="echo 1", 
outlets=[asset1])
+        dr = dag_maker.create_dagrun(
+            run_id="run1",
+            logical_date=(DEFAULT_DATE + timedelta(days=100)),
+            data_interval=(DEFAULT_DATE + timedelta(days=10), DEFAULT_DATE + 
timedelta(days=11)),
+        )
+
+        asset1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset1.uri))
+
+        event1 = AssetEvent(
+            asset_id=asset1_id,
+            source_task_id="task",
+            source_dag_id=dr.dag_id,
+            source_run_id=dr.run_id,
+            source_map_index=-1,
+        )
+        session.add(event1)
+
+        with dag_maker(dag_id="asset-consumer", schedule=[asset1]):
+            pass
+        consumer_dag = dag_maker.dag
+
+        session.add(AssetDagRunQueue(asset_id=asset1_id, 
target_dag_id=consumer_dag.dag_id))

Review Comment:
   `_create_dag_runs_asset_triggered` orders by `AssetDagRunQueue.created_at` 
and uses the newest row to compute `triggered_date`. This test inserts an ADRQ 
row without setting `created_at`, which can be NULL depending on model defaults 
and can make ordering/triggered_date nondeterministic (or error when coercing). 
Set `created_at=timezone.utcnow()` (or a fixed timestamp) for deterministic 
behavior.
   ```suggestion
           session.add(
               AssetDagRunQueue(
                   asset_id=asset1_id,
                   target_dag_id=consumer_dag.dag_id,
                   created_at=timezone.utcnow(),
               )
           )
   ```



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4868,6 +4868,167 @@ def dict_from_obj(obj):
 
         assert created_run.creating_job_id == scheduler_job.id
 
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self, 
session, dag_maker):
+        asset = Asset(uri="test://asset-for-stale-trigger-date", 
name="asset-for-stale-trigger-date")
+        with dag_maker(dag_id="asset-consumer-stale-trigger-date", 
schedule=[asset], session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset.uri))
+
+        queued_at = timezone.utcnow()
+        session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id, 
asset_id=asset_id, created_at=queued_at))
+        session.flush()
+
+        # Simulate another scheduler consuming ADRQ rows after we computed 
triggered_date_by_dag.
+        
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id 
== dag_model.dag_id))
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+        self.job_runner._create_dag_runs_asset_triggered(
+            dag_models=[dag_model],
+            session=session,
+        )
+
+        # We do not create a new DagRun since the ADRQ has already been 
consumed
+        assert session.scalars(select(DagRun).where(DagRun.dag_id == 
dag_model.dag_id)).one_or_none() is None
+
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+        self, session: Session, dag_maker
+    ):
+        asset_1 = Asset("ready-to-trigger-a-Dag-run")
+        asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+        with dag_maker(dag_id="asset-consumer-delete-selected", 
schedule=asset_1 | asset_2, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset_1.name))
+        asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset_2.name))
+
+        session.add_all(
+            [
+                # The ADRQ that should trigger the Dag run creation
+                AssetDagRunQueue(
+                    asset_id=asset_1_id, target_dag_id=dag_model.dag_id, 
created_at=timezone.utcnow()
+                ),
+                # The ADRQ that arrives after the Dag run creation but before 
ADRQ clean up
+                # This situation is simluarted by _lock_only_selected_asset 
below

Review Comment:
   Correct typo in comment: 'simluarted' → 'simulated'.
   ```suggestion
                   # This situation is simulated by _lock_only_selected_asset 
below
   ```



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4868,6 +4868,167 @@ def dict_from_obj(obj):
 
         assert created_run.creating_job_id == scheduler_job.id
 
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self, 
session, dag_maker):
+        asset = Asset(uri="test://asset-for-stale-trigger-date", 
name="asset-for-stale-trigger-date")
+        with dag_maker(dag_id="asset-consumer-stale-trigger-date", 
schedule=[asset], session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset.uri))
+
+        queued_at = timezone.utcnow()
+        session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id, 
asset_id=asset_id, created_at=queued_at))
+        session.flush()
+
+        # Simulate another scheduler consuming ADRQ rows after we computed 
triggered_date_by_dag.
+        
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id 
== dag_model.dag_id))
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+        self.job_runner._create_dag_runs_asset_triggered(
+            dag_models=[dag_model],
+            session=session,
+        )
+
+        # We do not create a new DagRun since the ADRQ has already been 
consumed
+        assert session.scalars(select(DagRun).where(DagRun.dag_id == 
dag_model.dag_id)).one_or_none() is None
+
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+        self, session: Session, dag_maker
+    ):
+        asset_1 = Asset("ready-to-trigger-a-Dag-run")
+        asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+        with dag_maker(dag_id="asset-consumer-delete-selected", 
schedule=asset_1 | asset_2, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset_1.name))
+        asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset_2.name))
+
+        session.add_all(
+            [
+                # The ADRQ that should trigger the Dag run creation
+                AssetDagRunQueue(
+                    asset_id=asset_1_id, target_dag_id=dag_model.dag_id, 
created_at=timezone.utcnow()
+                ),
+                # The ADRQ that arrives after the Dag run creation but before 
ADRQ clean up
+                # This situation is simluarted by _lock_only_selected_asset 
below
+                AssetDagRunQueue(
+                    asset_id=asset_2_id, target_dag_id=dag_model.dag_id, 
created_at=timezone.utcnow()
+                ),
+            ]
+        )
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+
+        def _lock_only_selected_asset(query, **_):
+            # Simulate SKIP LOCKED behavior where this scheduler can only 
consume one ADRQ row.
+            return query.where(AssetDagRunQueue.asset_id == asset_1_id)
+
+        with patch("airflow.jobs.scheduler_job_runner.with_row_locks", 
side_effect=_lock_only_selected_asset):
+            self.job_runner._create_dag_runs_asset_triggered(
+                dag_models=[dag_model],
+                session=session,
+            )
+
+        dr = session.scalars(select(DagRun).where(DagRun.dag_id == 
dag_model.dag_id)).one_or_none()
+
+        assert dr is not None
+
+        adrq_1 = session.scalars(
+            select(AssetDagRunQueue).where(
+                AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+                AssetDagRunQueue.asset_id == asset_1_id,
+            )
+        ).one_or_none()
+        assert adrq_1 is None
+        adrq_2 = session.scalars(
+            select(AssetDagRunQueue).where(
+                AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+                AssetDagRunQueue.asset_id == asset_2_id,
+            )
+        ).one_or_none()
+        assert adrq_2 is not None
+
+        assert dr.creating_job_id == scheduler_job.id
+
+    @pytest.mark.need_serialized_dag
+    @conf_vars({("scheduler", "use_job_schedule"): "False"})
+    def 
test_asset_triggered_dag_runs_created_when_use_job_schedule_false(self, 
session, dag_maker):
+        """
+        Test that asset-triggered dag runs are still created when 
use_job_schedule=False.
+
+        use_job_schedule=False should only disable time-based (cron/interval) 
scheduling,
+        not asset-triggered scheduling.
+        """
+        asset1 = Asset(uri="test://asset-use-job-schedule", 
name="asset_use_job_schedule", group="test")
+
+        with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(), 
session=session):
+            BashOperator(task_id="task", bash_command="echo 1", 
outlets=[asset1])
+        dr = dag_maker.create_dagrun(
+            run_id="run1",
+            logical_date=(DEFAULT_DATE + timedelta(days=100)),
+            data_interval=(DEFAULT_DATE + timedelta(days=10), DEFAULT_DATE + 
timedelta(days=11)),
+        )
+
+        asset1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset1.uri))
+
+        event1 = AssetEvent(
+            asset_id=asset1_id,
+            source_task_id="task",
+            source_dag_id=dr.dag_id,
+            source_run_id=dr.run_id,
+            source_map_index=-1,
+        )
+        session.add(event1)
+
+        with dag_maker(dag_id="asset-consumer", schedule=[asset1]):
+            pass
+        consumer_dag = dag_maker.dag
+
+        session.add(AssetDagRunQueue(asset_id=asset1_id, 
target_dag_id=consumer_dag.dag_id))
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+
+        with create_session() as session:
+            self.job_runner._create_dagruns_for_dags(session, session)
+
+        # Asset-triggered dag run should be created even when 
use_job_schedule=False
+        created_run = session.scalars(
+            select(DagRun).where(DagRun.dag_id == consumer_dag.dag_id)
+        ).one_or_none()

Review Comment:
   This test rebinds the `session` fixture name to a new `create_session()` 
context, then queries using that (now context-managed/closed) session, and also 
passes a `Session` where `_create_dagruns_for_dags` expects a 
`CommitProhibitorGuard`. Additionally, the ADRQ/event rows were only 
`flush()`ed on the fixture session; a separate DB session may not see them 
without a commit. Use the provided fixture session for both setup and scheduler 
call, invoke `_create_dagruns_for_dags` under `with prohibit_commit(session) as 
guard: ...`, and avoid shadowing the `session` variable (e.g., 
`sched_session`). If using two sessions intentionally, commit the setup 
transaction and `expire_all()`/refresh before asserting.



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4868,6 +4868,167 @@ def dict_from_obj(obj):
 
         assert created_run.creating_job_id == scheduler_job.id
 
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self, 
session, dag_maker):
+        asset = Asset(uri="test://asset-for-stale-trigger-date", 
name="asset-for-stale-trigger-date")
+        with dag_maker(dag_id="asset-consumer-stale-trigger-date", 
schedule=[asset], session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset.uri))
+
+        queued_at = timezone.utcnow()
+        session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id, 
asset_id=asset_id, created_at=queued_at))
+        session.flush()
+
+        # Simulate another scheduler consuming ADRQ rows after we computed 
triggered_date_by_dag.
+        
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id 
== dag_model.dag_id))
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+        self.job_runner._create_dag_runs_asset_triggered(
+            dag_models=[dag_model],
+            session=session,
+        )
+
+        # We do not create a new DagRun since the ADRQ has already been 
consumed
+        assert session.scalars(select(DagRun).where(DagRun.dag_id == 
dag_model.dag_id)).one_or_none() is None
+
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+        self, session: Session, dag_maker
+    ):
+        asset_1 = Asset("ready-to-trigger-a-Dag-run")
+        asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+        with dag_maker(dag_id="asset-consumer-delete-selected", 
schedule=asset_1 | asset_2, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset_1.name))
+        asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset_2.name))

Review Comment:
   The `Asset` instances are created as `Asset(\"...\")` (positional arg is the 
asset URI), but the lookup filters by `AssetModel.uri == asset_1.name` / 
`asset_2.name`. If `name` is unset or differs from `uri`, these queries can 
return `None` and subsequent ADRQ inserts will break. Filter by `asset_1.uri` / 
`asset_2.uri` (or set `name=` explicitly and query the appropriate column 
consistently).
   ```suggestion
           asset_1_id = 
session.scalar(select(AssetModel.id).where(AssetModel.uri == asset_1.uri))
           asset_2_id = 
session.scalar(select(AssetModel.id).where(AssetModel.uri == asset_2.uri))
   ```



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4868,6 +4868,167 @@ def dict_from_obj(obj):
 
         assert created_run.creating_job_id == scheduler_job.id
 
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self, 
session, dag_maker):
+        asset = Asset(uri="test://asset-for-stale-trigger-date", 
name="asset-for-stale-trigger-date")
+        with dag_maker(dag_id="asset-consumer-stale-trigger-date", 
schedule=[asset], session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset.uri))
+
+        queued_at = timezone.utcnow()
+        session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id, 
asset_id=asset_id, created_at=queued_at))
+        session.flush()
+
+        # Simulate another scheduler consuming ADRQ rows after we computed 
triggered_date_by_dag.
+        
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id 
== dag_model.dag_id))
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+        self.job_runner._create_dag_runs_asset_triggered(
+            dag_models=[dag_model],
+            session=session,
+        )
+
+        # We do not create a new DagRun since the ADRQ has already been 
consumed
+        assert session.scalars(select(DagRun).where(DagRun.dag_id == 
dag_model.dag_id)).one_or_none() is None
+
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+        self, session: Session, dag_maker
+    ):
+        asset_1 = Asset("ready-to-trigger-a-Dag-run")
+        asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+        with dag_maker(dag_id="asset-consumer-delete-selected", 
schedule=asset_1 | asset_2, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset_1.name))
+        asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset_2.name))
+
+        session.add_all(
+            [
+                # The ADRQ that should trigger the Dag run creation
+                AssetDagRunQueue(
+                    asset_id=asset_1_id, target_dag_id=dag_model.dag_id, 
created_at=timezone.utcnow()
+                ),
+                # The ADRQ that arrives after the Dag run creation but before 
ADRQ clean up
+                # This situation is simluarted by _lock_only_selected_asset 
below
+                AssetDagRunQueue(
+                    asset_id=asset_2_id, target_dag_id=dag_model.dag_id, 
created_at=timezone.utcnow()
+                ),
+            ]
+        )
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+
+        def _lock_only_selected_asset(query, **_):
+            # Simulate SKIP LOCKED behavior where this scheduler can only 
consume one ADRQ row.
+            return query.where(AssetDagRunQueue.asset_id == asset_1_id)
+
+        with patch("airflow.jobs.scheduler_job_runner.with_row_locks", 
side_effect=_lock_only_selected_asset):
+            self.job_runner._create_dag_runs_asset_triggered(
+                dag_models=[dag_model],
+                session=session,
+            )
+
+        dr = session.scalars(select(DagRun).where(DagRun.dag_id == 
dag_model.dag_id)).one_or_none()
+
+        assert dr is not None
+
+        adrq_1 = session.scalars(
+            select(AssetDagRunQueue).where(
+                AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+                AssetDagRunQueue.asset_id == asset_1_id,
+            )
+        ).one_or_none()
+        assert adrq_1 is None
+        adrq_2 = session.scalars(
+            select(AssetDagRunQueue).where(
+                AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+                AssetDagRunQueue.asset_id == asset_2_id,
+            )
+        ).one_or_none()
+        assert adrq_2 is not None
+
+        assert dr.creating_job_id == scheduler_job.id
+
+    @pytest.mark.need_serialized_dag
+    @conf_vars({("scheduler", "use_job_schedule"): "False"})
+    def 
test_asset_triggered_dag_runs_created_when_use_job_schedule_false(self, 
session, dag_maker):
+        """
+        Test that asset-triggered dag runs are still created when 
use_job_schedule=False.
+
+        use_job_schedule=False should only disable time-based (cron/interval) 
scheduling,
+        not asset-triggered scheduling.
+        """
+        asset1 = Asset(uri="test://asset-use-job-schedule", 
name="asset_use_job_schedule", group="test")
+
+        with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(), 
session=session):
+            BashOperator(task_id="task", bash_command="echo 1", 
outlets=[asset1])
+        dr = dag_maker.create_dagrun(
+            run_id="run1",
+            logical_date=(DEFAULT_DATE + timedelta(days=100)),
+            data_interval=(DEFAULT_DATE + timedelta(days=10), DEFAULT_DATE + 
timedelta(days=11)),
+        )
+
+        asset1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset1.uri))
+
+        event1 = AssetEvent(
+            asset_id=asset1_id,
+            source_task_id="task",
+            source_dag_id=dr.dag_id,
+            source_run_id=dr.run_id,
+            source_map_index=-1,
+        )
+        session.add(event1)
+
+        with dag_maker(dag_id="asset-consumer", schedule=[asset1]):
+            pass
+        consumer_dag = dag_maker.dag
+
+        session.add(AssetDagRunQueue(asset_id=asset1_id, 
target_dag_id=consumer_dag.dag_id))
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+
+        with create_session() as session:
+            self.job_runner._create_dagruns_for_dags(session, session)
+
+        # Asset-triggered dag run should be created even when 
use_job_schedule=False
+        created_run = session.scalars(
+            select(DagRun).where(DagRun.dag_id == consumer_dag.dag_id)
+        ).one_or_none()
+        assert created_run is not None
+        assert created_run.state == State.QUEUED
+
+    @pytest.mark.need_serialized_dag
+    @conf_vars({("scheduler", "use_job_schedule"): "False"})
+    def test_time_based_dag_runs_not_created_when_use_job_schedule_false(self, 
session, dag_maker):
+        """
+        Test that time-based dag runs are NOT created when 
use_job_schedule=False.
+        """
+        with dag_maker(
+            dag_id="time-based-dag",
+            schedule="@daily",
+            start_date=DEFAULT_DATE,
+            session=session,
+        ):
+            BashOperator(task_id="task", bash_command="echo 1")
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+
+        with create_session() as session:
+            self.job_runner._create_dagruns_for_dags(session, session)
+
+        # Time-based dag run should NOT be created when use_job_schedule=False
+        created_run = session.scalars(select(DagRun).where(DagRun.dag_id == 
"time-based-dag")).one_or_none()
+        assert created_run is None

Review Comment:
   Same issues as the asset-triggered test: `session` is shadowed by a new 
`create_session()` context, `_create_dagruns_for_dags` is called with the wrong 
first argument type (guard vs session), and uncommitted fixture data may not be 
visible across sessions. Run the scheduler method using the fixture `session` 
under a `prohibit_commit(...)` guard, or explicitly commit/refresh when 
crossing session boundaries.



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1853,7 +1852,9 @@ def _create_dagruns_for_dags(self, guard: 
CommitProhibitorGuard, session: Sessio
             # filter asset partition triggered Dags
             if d.dag_id not in partition_dag_ids
         }
-        self._create_dag_runs(non_asset_dags, session)
+        # Only create time-based dag runs if use_job_schedule is enabled
+        if self._scheduler_use_job_schedule:
+            self._create_dag_runs(non_asset_dags, session)

Review Comment:
   When `use_job_schedule` is False, `_create_dagruns_for_dags` still builds 
`all_dags_needing_dag_runs`, `asset_triggered_dags`, and `non_asset_dags` 
(including the `difference(...)` set operations) even though time-based runs 
will be skipped. A small optimization is to compute `non_asset_dags` only 
inside the `if self._scheduler_use_job_schedule:` block (and keep the 
asset-triggered list creation), reducing unnecessary set work for users who 
intentionally disable time scheduling.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to