Copilot commented on code in PR #63939:
URL: https://github.com/apache/airflow/pull/63939#discussion_r3025355426


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1489,21 +1489,25 @@ def _create_dag_runs_dataset_triggered(
                     events=dataset_events,
                 )
 
-                dag_run = dag.create_dagrun(
-                    run_id=run_id,
-                    run_type=DagRunType.DATASET_TRIGGERED,
-                    execution_date=exec_date,
-                    data_interval=data_interval,
-                    state=DagRunState.QUEUED,
-                    external_trigger=False,
-                    session=session,
-                    dag_hash=dag_hash,
-                    creating_job_id=self.job.id,
-                )
-                Stats.incr("dataset.triggered_dagruns")
-                dag_run.consumed_dataset_events.extend(dataset_events)
+                if dataset_events:
+                    dag_run = dag.create_dagrun(
+                        run_id=run_id,
+                        run_type=DagRunType.DATASET_TRIGGERED,
+                        execution_date=exec_date,
+                        data_interval=data_interval,
+                        state=DagRunState.QUEUED,
+                        external_trigger=False,
+                        session=session,
+                        dag_hash=dag_hash,
+                        creating_job_id=self.job.id,
+                    )
+                    Stats.incr("dataset.triggered_dagruns")
+                    dag_run.consumed_dataset_events.extend(dataset_events)
                 session.execute(
-                    delete(DatasetDagRunQueue).where(DatasetDagRunQueue.target_dag_id == dag_run.dag_id)
+                    delete(DatasetDagRunQueue).where(
+                        DatasetDagRunQueue.target_dag_id == dag.dag_id,
+                        DatasetDagRunQueue.created_at <= exec_date,
+                    )
                 )

Review Comment:
   `dag_run` is now created only when `dataset_events` is non-empty, but the 
subsequent `DELETE` no longer depends on `dag_run`. This fixes the previous 
unbound reference, but it introduces a more serious logic issue: when 
`dataset_events` is empty, the code still deletes from `DatasetDagRunQueue` for 
the DAG where `created_at <= exec_date`. That can clear readiness queue entries 
without creating a DagRun, potentially dropping triggers and preventing future 
scheduling. Consider moving the `DELETE` (or at least the `created_at <= 
exec_date` cleanup) inside the `if dataset_events:` block, or otherwise ensure 
the queue is only pruned when a run is actually created/consumed.



##########
tests/datasets/test_manager.py:
##########
@@ -228,3 +228,97 @@ def test_create_datasets_notifies_dataset_listener(self, session):
         # Ensure the listener was notified
         assert len(dataset_listener.created) == 1
         assert dataset_listener.created[0].uri == dsm.uri
+
+    @pytest.mark.skip_if_database_isolation_mode
+    def test_slow_path_queue_dagruns_updates_timestamp(self, session):
+        """
+        On non-Postgres backends, _slow_path_queue_dagruns should update created_at
+        when a DatasetDagRunQueue record already exists for the same (dataset_id, target_dag_id).
+        """
+        from datetime import timedelta
+
+        from airflow.utils import timezone
+
+        dsem = DatasetManager()
+        ds = DatasetModel(uri="test_slow_path_update_ts")
+        dag = DagModel(dag_id="test_slow_path_dag", is_active=True)
+        session.add_all([ds, dag])
+        session.flush()
+
+        old_ts = timezone.utcnow() - timedelta(days=1)
+        session.add(DatasetDagRunQueue(dataset_id=ds.id, target_dag_id=dag.dag_id, created_at=old_ts))
+        session.commit()
+
+        before_update = timezone.utcnow()
+        dsem._slow_path_queue_dagruns(dataset_id=ds.id, dags_to_queue={dag}, session=session)
+        session.commit()
+
+        updated = (
+            session.query(DatasetDagRunQueue)
+            .filter_by(dataset_id=ds.id, target_dag_id=dag.dag_id)
+            .one()
+        )
+        assert updated.created_at >= before_update
+
+    @pytest.mark.skip_if_database_isolation_mode
+    @pytest.mark.backend("postgres")
+    def test_postgres_queue_dagruns_updates_timestamp(self, session):
+        """
+        On PostgreSQL, _postgres_queue_dagruns should update created_at via ON CONFLICT DO UPDATE
+        when a DatasetDagRunQueue record already exists for the same (dataset_id, target_dag_id).
+        """
+        from datetime import timedelta
+
+        from airflow.utils import timezone
+
+        dsem = DatasetManager()
+        ds = DatasetModel(uri="test_postgres_update_ts")
+        dag = DagModel(dag_id="test_postgres_dag", is_active=True)
+        session.add_all([ds, dag])
+        session.flush()
+
+        old_ts = timezone.utcnow() - timedelta(days=1)
+        session.add(DatasetDagRunQueue(dataset_id=ds.id, target_dag_id=dag.dag_id, created_at=old_ts))
+        session.commit()
+
+        before_update = timezone.utcnow()
+        dsem._postgres_queue_dagruns(dataset_id=ds.id, dags_to_queue={dag}, session=session)
+        session.commit()
+
+        updated = (
+            session.query(DatasetDagRunQueue)
+            .filter_by(dataset_id=ds.id, target_dag_id=dag.dag_id)
+            .one()
+        )
+        assert updated.created_at >= before_update
+
+    @pytest.mark.skip_if_database_isolation_mode
+    @pytest.mark.backend("postgres")
+    def test_postgres_queue_dagruns_where_guard_prevents_backwards_drift(self, session):
+        """
+        The WHERE guard on the Postgres upsert should prevent a slower transaction
+        from overwriting a newer created_at with an older one.
+        """
+        from datetime import timedelta
+
+        from airflow.utils import timezone
+
+        dsem = DatasetManager()
+        ds = DatasetModel(uri="test_postgres_where_guard")
+        dag = DagModel(dag_id="test_pg_guard_dag", is_active=True)
+        session.add_all([ds, dag])
+        session.flush()
+
+        newer_ts = timezone.utcnow()
+        session.add(DatasetDagRunQueue(dataset_id=ds.id, target_dag_id=dag.dag_id, created_at=newer_ts))
+        session.commit()
+
+        dsem._postgres_queue_dagruns(dataset_id=ds.id, dags_to_queue={dag}, session=session)
+        session.commit()

Review Comment:
   `test_postgres_queue_dagruns_where_guard_prevents_backwards_drift` doesn’t 
currently validate the WHERE-guard behavior it describes. The test sets an 
existing `created_at` to `newer_ts`, then calls `_postgres_queue_dagruns()`, 
but that method always uses `timezone.utcnow()` for the upsert value—i.e. a 
timestamp *newer* than `newer_ts`—so the update is expected to succeed 
regardless of the `WHERE (existing < excluded)` guard. To actually 
regression-test “backwards drift prevention”, freeze/patch `timezone.utcnow()` 
(e.g. via the repo’s `time_machine` fixture) to return an *older* timestamp for 
the upsert, then assert `created_at` remains `newer_ts` (or unchanged).



##########
airflow/datasets/manager.py:
##########
@@ -202,8 +203,13 @@ def _queue_dagrun_if_needed(dag: DagModel) -> str | None:
     def _postgres_queue_dagruns(cls, dataset_id: int, dags_to_queue: set[DagModel], session: Session) -> None:
         from sqlalchemy.dialects.postgresql import insert
 
-        values = [{"target_dag_id": dag.dag_id} for dag in dags_to_queue]
-        stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset_id).on_conflict_do_nothing()
+        values = [{"target_dag_id": dag.dag_id, "created_at": timezone.utcnow()} for dag in dags_to_queue]
+        stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset_id)
+        stmt = stmt.on_conflict_do_update(
+            index_elements=["dataset_id", "target_dag_id"],
+            set_={"created_at": stmt.excluded.created_at},
+            where=(DatasetDagRunQueue.created_at < stmt.excluded.created_at),
+        )
         session.execute(stmt, values)

Review Comment:
   In `_postgres_queue_dagruns`, `values = [{..., "created_at": 
timezone.utcnow()} for dag in dags_to_queue]` evaluates `utcnow()` once per 
DAG. That means `created_at` can differ slightly across rows from the same 
dataset event, and it adds unnecessary per-row Python work. Consider computing 
a single `now = timezone.utcnow()` once and reusing it for all rows in this 
batch to keep timestamps consistent and reduce overhead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to