Lee-W commented on code in PR #63546:
URL: https://github.com/apache/airflow/pull/63546#discussion_r3020755253
##########
tests/models/test_dag.py:
##########
@@ -3064,6 +3065,109 @@ def test_dags_needing_dagruns_datasets(self, dag_maker, session):
dag_models = query.all()
assert dag_models == [dag_model]
+ def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, session, caplog):
+ """DDRQ rows for a Dag without SerializedDagModel must be skipped (no dataset_triggered info).
+
+ Rows must remain in ``dataset_dag_run_queue`` so the scheduler can re-evaluate on a later
+ heartbeat once ``SerializedDagModel`` exists (``dags_needing_dagruns`` only drops them from
+ the in-memory candidate set, it does not delete ORM rows).
+ """
+ orphan_dag_id = "ddr_q_no_serialized_dag"
+ session.add(DatasetModel(uri="dataset_for_orphan_ddrq"))
+ session.flush()
+ dataset_id = session.query(DatasetModel.id).filter_by(uri="dataset_for_orphan_ddrq").scalar()
+ session.add(
+ DagModel(
+ dag_id=orphan_dag_id,
+ max_active_tasks=1,
+ has_task_concurrency_limits=False,
+ next_dagrun=timezone.datetime(2038, 1, 1),
+ next_dagrun_create_after=timezone.datetime(2038, 1, 2),
+ is_active=True,
+ has_import_errors=False,
+ is_paused=False,
+ )
+ )
+ session.flush()
+ session.add(DatasetDagRunQueue(dataset_id=dataset_id, target_dag_id=orphan_dag_id))
+ session.flush()
+
+ with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"):
+ _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)
+
+ assert orphan_dag_id not in dataset_triggered_dag_info
+ assert "DAGs in DDRQ but missing SerializedDagModel" in caplog.text
Review Comment:
Same as another PR
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]