This is an automated email from the ASF dual-hosted git repository. uranusjr pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 7ccbe4e7ea A manual run can't look like a scheduled one (#28397)
7ccbe4e7ea is described below

commit 7ccbe4e7eaa529641052779a89e34d54c5a20f72
Author: Tzu-ping Chung <uranu...@gmail.com>
AuthorDate: Thu Dec 22 09:54:26 2022 +0800

    A manual run can't look like a scheduled one (#28397)

    Fix https://github.com/apache/airflow/issues/27818
---
 airflow/models/dag.py    | 23 ++++++++++++++++++-----
 tests/models/test_dag.py | 20 ++++++++++++++++++++
 2 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index c08a7f236f..6385634963 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2549,7 +2549,7 @@ class DAG(LoggingMixin):
         external_trigger: bool | None = False,
         conf: dict | None = None,
         run_type: DagRunType | None = None,
-        session=NEW_SESSION,
+        session: Session = NEW_SESSION,
         dag_hash: str | None = None,
         creating_job_id: int | None = None,
         data_interval: tuple[datetime, datetime] | None = None,
@@ -2586,14 +2586,27 @@ class DAG(LoggingMixin):
             else:
                 data_interval = self.infer_automated_data_interval(logical_date)
 
+        if run_type is None or isinstance(run_type, DagRunType):
+            pass
+        elif isinstance(run_type, str):  # Compatibility: run_type used to be a str.
+            run_type = DagRunType(run_type)
+        else:
+            raise ValueError(f"`run_type` should be a DagRunType, not {type(run_type)}")
+
         if run_id:  # Infer run_type from run_id if needed.
             if not isinstance(run_id, str):
                 raise ValueError(f"`run_id` should be a str, not {type(run_id)}")
-            if not run_type:
-                run_type = DagRunType.from_run_id(run_id)
+            inferred_run_type = DagRunType.from_run_id(run_id)
+            if run_type is None:
+                # No explicit type given, use the inferred type.
+                run_type = inferred_run_type
+            elif run_type == DagRunType.MANUAL and inferred_run_type != DagRunType.MANUAL:
+                # Prevent a manual run from using an ID that looks like a scheduled run.
+                raise ValueError(
+                    f"A {run_type.value} DAG run cannot use ID {run_id!r} since it "
+                    f"is reserved for {inferred_run_type.value} runs"
+                )
         elif run_type and logical_date is not None:  # Generate run_id from run_type and execution_date.
-            if not isinstance(run_type, DagRunType):
-                raise ValueError(f"`run_type` should be a DagRunType, not {type(run_type)}")
             run_id = self.timetable.generate_run_id(
                 run_type=run_type, logical_date=logical_date, data_interval=data_interval
             )
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 6d3ca77bbe..9f1ba1e743 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3147,3 +3147,23 @@ def test_dag_uses_timetable_for_run_id(session):
     )
 
     assert dag_run.run_id == "abc"
+
+
+@pytest.mark.parametrize(
+    "run_id_type",
+    [DagRunType.BACKFILL_JOB, DagRunType.SCHEDULED, DagRunType.DATASET_TRIGGERED],
+)
+def test_create_dagrun_disallow_manual_to_use_automated_run_id(run_id_type: DagRunType) -> None:
+    dag = DAG(dag_id="test", start_date=DEFAULT_DATE, schedule="@daily")
+    run_id = run_id_type.generate_run_id(DEFAULT_DATE)
+    with pytest.raises(ValueError) as ctx:
+        dag.create_dagrun(
+            run_type=DagRunType.MANUAL,
+            run_id=run_id,
+            execution_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            state=DagRunState.QUEUED,
+        )
+    assert str(ctx.value) == (
+        f"A manual DAG run cannot use ID {run_id!r} since it is reserved for {run_id_type.value} runs"
+    )