Copilot commented on code in PR #63939:
URL: https://github.com/apache/airflow/pull/63939#discussion_r3025355426
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1489,21 +1489,25 @@ def _create_dag_runs_dataset_triggered(
                events=dataset_events,
            )
-            dag_run = dag.create_dagrun(
-                run_id=run_id,
-                run_type=DagRunType.DATASET_TRIGGERED,
-                execution_date=exec_date,
-                data_interval=data_interval,
-                state=DagRunState.QUEUED,
-                external_trigger=False,
-                session=session,
-                dag_hash=dag_hash,
-                creating_job_id=self.job.id,
-            )
-            Stats.incr("dataset.triggered_dagruns")
-            dag_run.consumed_dataset_events.extend(dataset_events)
+            if dataset_events:
+                dag_run = dag.create_dagrun(
+                    run_id=run_id,
+                    run_type=DagRunType.DATASET_TRIGGERED,
+                    execution_date=exec_date,
+                    data_interval=data_interval,
+                    state=DagRunState.QUEUED,
+                    external_trigger=False,
+                    session=session,
+                    dag_hash=dag_hash,
+                    creating_job_id=self.job.id,
+                )
+                Stats.incr("dataset.triggered_dagruns")
+                dag_run.consumed_dataset_events.extend(dataset_events)
            session.execute(
-                delete(DatasetDagRunQueue).where(DatasetDagRunQueue.target_dag_id == dag_run.dag_id)
+                delete(DatasetDagRunQueue).where(
+                    DatasetDagRunQueue.target_dag_id == dag.dag_id,
+                    DatasetDagRunQueue.created_at <= exec_date,
+                )
            )
Review Comment:
`dag_run` is now created only when `dataset_events` is non-empty, but the
subsequent `DELETE` no longer depends on `dag_run`. This fixes the previous
unbound reference, but it introduces a more serious logic issue: when
`dataset_events` is empty, the code still deletes from `DatasetDagRunQueue` for
the DAG where `created_at <= exec_date`. That can clear readiness queue entries
without creating a DagRun, potentially dropping triggers and preventing future
scheduling. Consider moving the `DELETE` (or at least the `created_at <=
exec_date` cleanup) inside the `if dataset_events:` block, or otherwise ensure
the queue is only pruned when a run is actually created/consumed.
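For concreteness, a minimal sketch of that restructuring, reusing the names from the hunk above (the surrounding loop and any other cleanup in this method are assumed unchanged):

```python
# Sketch only: prune DatasetDagRunQueue entries only when a DagRun was created.
if dataset_events:
    dag_run = dag.create_dagrun(
        run_id=run_id,
        run_type=DagRunType.DATASET_TRIGGERED,
        execution_date=exec_date,
        data_interval=data_interval,
        state=DagRunState.QUEUED,
        external_trigger=False,
        session=session,
        dag_hash=dag_hash,
        creating_job_id=self.job.id,
    )
    Stats.incr("dataset.triggered_dagruns")
    dag_run.consumed_dataset_events.extend(dataset_events)
    # DELETE inside the guard, so queue entries survive when no run is created.
    session.execute(
        delete(DatasetDagRunQueue).where(
            DatasetDagRunQueue.target_dag_id == dag.dag_id,
            DatasetDagRunQueue.created_at <= exec_date,
        )
    )
```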
##########
tests/datasets/test_manager.py:
##########
@@ -228,3 +228,97 @@ def test_create_datasets_notifies_dataset_listener(self, session):
        # Ensure the listener was notified
        assert len(dataset_listener.created) == 1
        assert dataset_listener.created[0].uri == dsm.uri
+
+    @pytest.mark.skip_if_database_isolation_mode
+    def test_slow_path_queue_dagruns_updates_timestamp(self, session):
+        """
+        On non-Postgres backends, _slow_path_queue_dagruns should update created_at
+        when a DatasetDagRunQueue record already exists for the same (dataset_id, target_dag_id).
+        """
+        from datetime import timedelta
+
+        from airflow.utils import timezone
+
+        dsem = DatasetManager()
+        ds = DatasetModel(uri="test_slow_path_update_ts")
+        dag = DagModel(dag_id="test_slow_path_dag", is_active=True)
+        session.add_all([ds, dag])
+        session.flush()
+
+        old_ts = timezone.utcnow() - timedelta(days=1)
+        session.add(DatasetDagRunQueue(dataset_id=ds.id, target_dag_id=dag.dag_id, created_at=old_ts))
+        session.commit()
+
+        before_update = timezone.utcnow()
+        dsem._slow_path_queue_dagruns(dataset_id=ds.id, dags_to_queue={dag}, session=session)
+        session.commit()
+
+        updated = (
+            session.query(DatasetDagRunQueue)
+            .filter_by(dataset_id=ds.id, target_dag_id=dag.dag_id)
+            .one()
+        )
+        assert updated.created_at >= before_update
+
+    @pytest.mark.skip_if_database_isolation_mode
+    @pytest.mark.backend("postgres")
+    def test_postgres_queue_dagruns_updates_timestamp(self, session):
+        """
+        On PostgreSQL, _postgres_queue_dagruns should update created_at via ON CONFLICT DO UPDATE
+        when a DatasetDagRunQueue record already exists for the same (dataset_id, target_dag_id).
+        """
+        from datetime import timedelta
+
+        from airflow.utils import timezone
+
+        dsem = DatasetManager()
+        ds = DatasetModel(uri="test_postgres_update_ts")
+        dag = DagModel(dag_id="test_postgres_dag", is_active=True)
+        session.add_all([ds, dag])
+        session.flush()
+
+        old_ts = timezone.utcnow() - timedelta(days=1)
+        session.add(DatasetDagRunQueue(dataset_id=ds.id, target_dag_id=dag.dag_id, created_at=old_ts))
+        session.commit()
+
+        before_update = timezone.utcnow()
+        dsem._postgres_queue_dagruns(dataset_id=ds.id, dags_to_queue={dag}, session=session)
+        session.commit()
+
+        updated = (
+            session.query(DatasetDagRunQueue)
+            .filter_by(dataset_id=ds.id, target_dag_id=dag.dag_id)
+            .one()
+        )
+        assert updated.created_at >= before_update
+
+    @pytest.mark.skip_if_database_isolation_mode
+    @pytest.mark.backend("postgres")
+    def test_postgres_queue_dagruns_where_guard_prevents_backwards_drift(self, session):
+        """
+        The WHERE guard on the Postgres upsert should prevent a slower transaction
+        from overwriting a newer created_at with an older one.
+        """
+        from datetime import timedelta
+
+        from airflow.utils import timezone
+
+        dsem = DatasetManager()
+        ds = DatasetModel(uri="test_postgres_where_guard")
+        dag = DagModel(dag_id="test_pg_guard_dag", is_active=True)
+        session.add_all([ds, dag])
+        session.flush()
+
+        newer_ts = timezone.utcnow()
+        session.add(DatasetDagRunQueue(dataset_id=ds.id, target_dag_id=dag.dag_id, created_at=newer_ts))
+        session.commit()
+
+        dsem._postgres_queue_dagruns(dataset_id=ds.id, dags_to_queue={dag}, session=session)
+        session.commit()
Review Comment:
`test_postgres_queue_dagruns_where_guard_prevents_backwards_drift` doesn't currently validate the WHERE-guard behavior it describes. The test sets the existing `created_at` to `newer_ts` and then calls `_postgres_queue_dagruns()`, but that method always stamps the upsert with `timezone.utcnow()`, a timestamp *newer* than `newer_ts`, so the update is expected to succeed whether or not the `WHERE (existing < excluded)` guard is present. To actually regression-test backwards-drift prevention, freeze or patch `timezone.utcnow()` (e.g. via the repo's `time_machine` fixture) so the upsert uses an *older* timestamp, then assert that `created_at` remains `newer_ts` (i.e. unchanged).
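One possible shape for that check, assuming `time_machine` is available in the test environment and reusing the names from the test above:

```python
import time_machine

# Freeze the clock *before* newer_ts so the upsert's utcnow() is older
# than the existing row's created_at.
older_ts = newer_ts - timedelta(hours=1)
with time_machine.travel(older_ts, tick=False):
    dsem._postgres_queue_dagruns(dataset_id=ds.id, dags_to_queue={dag}, session=session)
session.commit()

row = (
    session.query(DatasetDagRunQueue)
    .filter_by(dataset_id=ds.id, target_dag_id=dag.dag_id)
    .one()
)
# The WHERE (existing < excluded) guard should reject the older timestamp.
assert row.created_at == newer_ts
```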
##########
airflow/datasets/manager.py:
##########
@@ -202,8 +203,13 @@ def _queue_dagrun_if_needed(dag: DagModel) -> str | None:
    def _postgres_queue_dagruns(cls, dataset_id: int, dags_to_queue: set[DagModel], session: Session) -> None:
        from sqlalchemy.dialects.postgresql import insert
-        values = [{"target_dag_id": dag.dag_id} for dag in dags_to_queue]
-        stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset_id).on_conflict_do_nothing()
+        values = [{"target_dag_id": dag.dag_id, "created_at": timezone.utcnow()} for dag in dags_to_queue]
+        stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset_id)
+        stmt = stmt.on_conflict_do_update(
+            index_elements=["dataset_id", "target_dag_id"],
+            set_={"created_at": stmt.excluded.created_at},
+            where=(DatasetDagRunQueue.created_at < stmt.excluded.created_at),
+        )
        session.execute(stmt, values)
Review Comment:
In `_postgres_queue_dagruns`, `values = [{..., "created_at": timezone.utcnow()} for dag in dags_to_queue]` evaluates `utcnow()` once per DAG. As a result, `created_at` can differ slightly across rows produced by the same dataset event, and each row pays an unnecessary Python call. Consider computing `now = timezone.utcnow()` once and reusing it for every row in the batch, which keeps the timestamps consistent and trims the per-row overhead.
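A sketch of that change against the hunk above:

```python
# Evaluate utcnow() once so every row in the batch shares one timestamp.
now = timezone.utcnow()
values = [{"target_dag_id": dag.dag_id, "created_at": now} for dag in dags_to_queue]
```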
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]