This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bd8d5eef86 Deprecate DAG.run method (#42417)
bd8d5eef86 is described below

commit bd8d5eef867384983ac5de927b24f2866f059e91
Author: Daniel Standish <[email protected]>
AuthorDate: Mon Sep 23 18:52:48 2024 -0700

    Deprecate DAG.run method (#42417)
    
    This method relies on local backfill mode, which is slated for removal in 
3.0. We have suitable alternatives such as DAG.test() and triggering dags via 
API.
---
 airflow/models/dag.py            |   9 +++
 tests/jobs/test_scheduler_job.py | 116 +++++++++++++++++++++++----------------
 tests/models/test_dag.py         |  20 ++++---
 3 files changed, 90 insertions(+), 55 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index c95d11f3ef..215dae298f 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -28,6 +28,7 @@ import pickle
 import sys
 import time
 import traceback
+import warnings
 import weakref
 from collections import abc, defaultdict, deque
 from contextlib import ExitStack
@@ -88,6 +89,7 @@ from airflow.exceptions import (
     DuplicateTaskIdFound,
     FailStopDagInvalidTriggerRule,
     ParamValidationError,
+    RemovedInAirflow3Warning,
     TaskDeferred,
     TaskNotFound,
     UnknownExecutorException,
@@ -2331,6 +2333,13 @@ class DAG(LoggingMixin):
         :param run_at_least_once: If true, always run the DAG at least once 
even
             if no logical run exists within the time range.
         """
+        warnings.warn(
+            "`DAG.run()` is deprecated and will be removed in Airflow 3.0. 
Consider "
+            "using `DAG.test()` instead, or trigger your dag via API.",
+            RemovedInAirflow3Warning,
+            stacklevel=2,
+        )
+
         from airflow.executors.executor_loader import ExecutorLoader
         from airflow.jobs.backfill_job_runner import BackfillJobRunner
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 372a6ae2d9..9113a2dee1 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -42,7 +42,7 @@ from airflow.callbacks.pipe_callback_sink import 
PipeCallbackSink
 from airflow.dag_processing.manager import DagFileProcessorAgent
 from airflow.datasets import Dataset
 from airflow.datasets.manager import DatasetManager
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
 from airflow.executors.base_executor import BaseExecutor
 from airflow.executors.executor_constants import MOCK_EXECUTOR
 from airflow.executors.executor_loader import ExecutorLoader
@@ -2838,6 +2838,10 @@ class TestSchedulerJob:
         This is hackish: a dag run is created but its tasks are
         run by a backfill.
         """
+
+        # todo: AIP-78 remove along with DAG.run()
+        #  this only tests the backfill job runner, not the scheduler
+
         if run_kwargs is None:
             run_kwargs = {}
 
@@ -2898,40 +2902,49 @@ class TestSchedulerJob:
         """
         DagRuns with one failed and one incomplete root task -> FAILED
         """
-        self.evaluate_dagrun(
-            dag_id="test_dagrun_states_fail",
-            expected_task_states={
-                "test_dagrun_fail": State.FAILED,
-                "test_dagrun_succeed": State.UPSTREAM_FAILED,
-            },
-            dagrun_state=State.FAILED,
-        )
+        # todo: AIP-78 remove along with DAG.run()
+        #  this only tests the backfill job runner, not the scheduler
+        with pytest.warns(RemovedInAirflow3Warning):
+            self.evaluate_dagrun(
+                dag_id="test_dagrun_states_fail",
+                expected_task_states={
+                    "test_dagrun_fail": State.FAILED,
+                    "test_dagrun_succeed": State.UPSTREAM_FAILED,
+                },
+                dagrun_state=State.FAILED,
+            )
 
     def test_dagrun_success(self):
         """
         DagRuns with one failed and one successful root task -> SUCCESS
         """
-        self.evaluate_dagrun(
-            dag_id="test_dagrun_states_success",
-            expected_task_states={
-                "test_dagrun_fail": State.FAILED,
-                "test_dagrun_succeed": State.SUCCESS,
-            },
-            dagrun_state=State.SUCCESS,
-        )
+        # todo: AIP-78 remove along with DAG.run()
+        #  this only tests the backfill job runner, not the scheduler
+        with pytest.warns(RemovedInAirflow3Warning):
+            self.evaluate_dagrun(
+                dag_id="test_dagrun_states_success",
+                expected_task_states={
+                    "test_dagrun_fail": State.FAILED,
+                    "test_dagrun_succeed": State.SUCCESS,
+                },
+                dagrun_state=State.SUCCESS,
+            )
 
     def test_dagrun_root_fail(self):
         """
         DagRuns with one successful and one failed root task -> FAILED
         """
-        self.evaluate_dagrun(
-            dag_id="test_dagrun_states_root_fail",
-            expected_task_states={
-                "test_dagrun_succeed": State.SUCCESS,
-                "test_dagrun_fail": State.FAILED,
-            },
-            dagrun_state=State.FAILED,
-        )
+        # todo: AIP-78 remove along with DAG.run()
+        #  this only tests the backfill job runner, not the scheduler
+        with pytest.warns(RemovedInAirflow3Warning):
+            self.evaluate_dagrun(
+                dag_id="test_dagrun_states_root_fail",
+                expected_task_states={
+                    "test_dagrun_succeed": State.SUCCESS,
+                    "test_dagrun_fail": State.FAILED,
+                },
+                dagrun_state=State.FAILED,
+            )
 
     def test_dagrun_root_fail_unfinished(self):
         """
@@ -2952,9 +2965,12 @@ class TestSchedulerJob:
         )
         self.null_exec.mock_task_fail(dag_id, "test_dagrun_fail", dr.run_id)
 
-        for _ in _mock_executor(self.null_exec):
-            with pytest.raises(AirflowException):
-                dag.run(start_date=dr.execution_date, 
end_date=dr.execution_date)
+        # todo: AIP-78 remove this test along with DAG.run()
+        #  this only tests the backfill job runner, not the scheduler
+        with pytest.warns(RemovedInAirflow3Warning):
+            for _ in _mock_executor(self.null_exec):
+                with pytest.raises(AirflowException):
+                    dag.run(start_date=dr.execution_date, 
end_date=dr.execution_date)
 
         # Mark the successful task as never having run since we want to see if 
the
         # dagrun will be in a running state despite having an unfinished task.
@@ -2994,16 +3010,19 @@ class TestSchedulerJob:
         if ignore_first_depends_on_past=True and the dagrun execution_date
         is after the start_date.
         """
-        self.evaluate_dagrun(
-            dag_id="test_dagrun_states_deadlock",
-            expected_task_states={
-                "test_depends_on_past": State.SUCCESS,
-                "test_depends_on_past_2": State.SUCCESS,
-            },
-            dagrun_state=State.SUCCESS,
-            advance_execution_date=True,
-            run_kwargs=dict(ignore_first_depends_on_past=True),
-        )
+        # todo: AIP-78 remove along with DAG.run()
+        #  this only tests the backfill job runner, not the scheduler
+        with pytest.warns(RemovedInAirflow3Warning):
+            self.evaluate_dagrun(
+                dag_id="test_dagrun_states_deadlock",
+                expected_task_states={
+                    "test_depends_on_past": State.SUCCESS,
+                    "test_depends_on_past_2": State.SUCCESS,
+                },
+                dagrun_state=State.SUCCESS,
+                advance_execution_date=True,
+                run_kwargs=dict(ignore_first_depends_on_past=True),
+            )
 
     def test_dagrun_deadlock_ignore_depends_on_past(self):
         """
@@ -3012,15 +3031,18 @@ class TestSchedulerJob:
         test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date except
         that start_date == execution_date so depends_on_past is irrelevant).
         """
-        self.evaluate_dagrun(
-            dag_id="test_dagrun_states_deadlock",
-            expected_task_states={
-                "test_depends_on_past": State.SUCCESS,
-                "test_depends_on_past_2": State.SUCCESS,
-            },
-            dagrun_state=State.SUCCESS,
-            run_kwargs=dict(ignore_first_depends_on_past=True),
-        )
+        # todo: AIP-78 remove along with DAG.run()
+        #  this only tests the backfill job runner, not the scheduler
+        with pytest.warns(RemovedInAirflow3Warning):
+            self.evaluate_dagrun(
+                dag_id="test_dagrun_states_deadlock",
+                expected_task_states={
+                    "test_depends_on_past": State.SUCCESS,
+                    "test_depends_on_past_2": State.SUCCESS,
+                },
+                dagrun_state=State.SUCCESS,
+                run_kwargs=dict(ignore_first_depends_on_past=True),
+            )
 
     @pytest.mark.parametrize(
         "configs",
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 90d956caeb..df4a892768 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -45,6 +45,7 @@ from airflow.exceptions import (
     AirflowException,
     DuplicateTaskIdFound,
     ParamValidationError,
+    RemovedInAirflow3Warning,
     UnknownExecutorException,
 )
 from airflow.executors import executor_loader
@@ -2733,14 +2734,17 @@ class TestDagModel:
 
     @mock.patch("airflow.models.dag.run_job")
     def test_dag_executors(self, run_job_mock):
-        dag = DAG(dag_id="test", schedule=None)
-        reload(executor_loader)
-        with conf_vars({("core", "executor"): "SequentialExecutor"}):
-            dag.run()
-            assert 
isinstance(run_job_mock.call_args_list[0].kwargs["job"].executor, 
SequentialExecutor)
-
-            dag.run(local=True)
-            assert 
isinstance(run_job_mock.call_args_list[1].kwargs["job"].executor, LocalExecutor)
+        # todo: AIP-78 remove along with DAG.run()
+        #  this only tests the backfill job runner, not the scheduler
+        with pytest.warns(RemovedInAirflow3Warning):
+            dag = DAG(dag_id="test", schedule=None)
+            reload(executor_loader)
+            with conf_vars({("core", "executor"): "SequentialExecutor"}):
+                dag.run()
+                assert 
isinstance(run_job_mock.call_args_list[0].kwargs["job"].executor, 
SequentialExecutor)
+
+                dag.run(local=True)
+                assert 
isinstance(run_job_mock.call_args_list[1].kwargs["job"].executor, LocalExecutor)
 
 
 class TestQueries:

Reply via email to