This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ccbe4e7ea A manual run can't look like a scheduled one (#28397)
7ccbe4e7ea is described below

commit 7ccbe4e7eaa529641052779a89e34d54c5a20f72
Author: Tzu-ping Chung <uranu...@gmail.com>
AuthorDate: Thu Dec 22 09:54:26 2022 +0800

    A manual run can't look like a scheduled one (#28397)
    
    Fix https://github.com/apache/airflow/issues/27818
---
 airflow/models/dag.py    | 23 ++++++++++++++++++-----
 tests/models/test_dag.py | 20 ++++++++++++++++++++
 2 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index c08a7f236f..6385634963 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2549,7 +2549,7 @@ class DAG(LoggingMixin):
         external_trigger: bool | None = False,
         conf: dict | None = None,
         run_type: DagRunType | None = None,
-        session=NEW_SESSION,
+        session: Session = NEW_SESSION,
         dag_hash: str | None = None,
         creating_job_id: int | None = None,
         data_interval: tuple[datetime, datetime] | None = None,
@@ -2586,14 +2586,27 @@ class DAG(LoggingMixin):
             else:
                 data_interval = 
self.infer_automated_data_interval(logical_date)
 
+        if run_type is None or isinstance(run_type, DagRunType):
+            pass
+        elif isinstance(run_type, str):  # Compatibility: run_type used to be 
a str.
+            run_type = DagRunType(run_type)
+        else:
+            raise ValueError(f"`run_type` should be a DagRunType, not 
{type(run_type)}")
+
         if run_id:  # Infer run_type from run_id if needed.
             if not isinstance(run_id, str):
                 raise ValueError(f"`run_id` should be a str, not 
{type(run_id)}")
-            if not run_type:
-                run_type = DagRunType.from_run_id(run_id)
+            inferred_run_type = DagRunType.from_run_id(run_id)
+            if run_type is None:
+                # No explicit type given, use the inferred type.
+                run_type = inferred_run_type
+            elif run_type == DagRunType.MANUAL and inferred_run_type != 
DagRunType.MANUAL:
+                # Prevent a manual run from using an ID that looks like a 
scheduled run.
+                raise ValueError(
+                    f"A {run_type.value} DAG run cannot use ID {run_id!r} 
since it "
+                    f"is reserved for {inferred_run_type.value} runs"
+                )
         elif run_type and logical_date is not None:  # Generate run_id from 
run_type and execution_date.
-            if not isinstance(run_type, DagRunType):
-                raise ValueError(f"`run_type` should be a DagRunType, not 
{type(run_type)}")
             run_id = self.timetable.generate_run_id(
                 run_type=run_type, logical_date=logical_date, 
data_interval=data_interval
             )
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 6d3ca77bbe..9f1ba1e743 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3147,3 +3147,23 @@ def test_dag_uses_timetable_for_run_id(session):
     )
 
     assert dag_run.run_id == "abc"
+
+
+@pytest.mark.parametrize(
+    "run_id_type",
+    [DagRunType.BACKFILL_JOB, DagRunType.SCHEDULED, 
DagRunType.DATASET_TRIGGERED],
+)
+def test_create_dagrun_disallow_manual_to_use_automated_run_id(run_id_type: 
DagRunType) -> None:
+    dag = DAG(dag_id="test", start_date=DEFAULT_DATE, schedule="@daily")
+    run_id = run_id_type.generate_run_id(DEFAULT_DATE)
+    with pytest.raises(ValueError) as ctx:
+        dag.create_dagrun(
+            run_type=DagRunType.MANUAL,
+            run_id=run_id,
+            execution_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            state=DagRunState.QUEUED,
+        )
+    assert str(ctx.value) == (
+        f"A manual DAG run cannot use ID {run_id!r} since it is reserved for 
{run_id_type.value} runs"
+    )

Reply via email to