This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bd8d5eef86 Deprecate DAG.run method (#42417)
bd8d5eef86 is described below
commit bd8d5eef867384983ac5de927b24f2866f059e91
Author: Daniel Standish <[email protected]>
AuthorDate: Mon Sep 23 18:52:48 2024 -0700
Deprecate DAG.run method (#42417)
This method relies on local backfill mode, which is slated for removal in
3.0. We have suitable alternatives such as DAG.test() and triggering DAGs via
the API.
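For reference, a minimal sketch of the alternatives this message refers to
(the dag below is a placeholder and not part of this change; the sketch assumes
DAG.test() and the stable REST trigger endpoint are the intended replacements):

    # run the dag in-process, without the scheduler or the backfill job runner
    import datetime

    from airflow.decorators import task
    from airflow.models.dag import DAG

    with DAG(dag_id="example_dag", schedule=None, start_date=datetime.datetime(2024, 1, 1)) as dag:

        @task
        def hello():
            print("hello")

        hello()

    dag.test()  # executes all tasks locally in a single process

    # or trigger a run via the stable REST API (Airflow 2.x endpoint):
    #   POST /api/v1/dags/example_dag/dagRuns
    #   {"logical_date": "2024-01-01T00:00:00Z"}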
---
airflow/models/dag.py | 9 +++
tests/jobs/test_scheduler_job.py | 116 +++++++++++++++++++++++----------------
tests/models/test_dag.py | 20 ++++---
3 files changed, 90 insertions(+), 55 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index c95d11f3ef..215dae298f 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -28,6 +28,7 @@ import pickle
import sys
import time
import traceback
+import warnings
import weakref
from collections import abc, defaultdict, deque
from contextlib import ExitStack
@@ -88,6 +89,7 @@ from airflow.exceptions import (
DuplicateTaskIdFound,
FailStopDagInvalidTriggerRule,
ParamValidationError,
+ RemovedInAirflow3Warning,
TaskDeferred,
TaskNotFound,
UnknownExecutorException,
@@ -2331,6 +2333,13 @@ class DAG(LoggingMixin):
:param run_at_least_once: If true, always run the DAG at least once even
if no logical run exists within the time range.
"""
+ warnings.warn(
+ "`DAG.run()` is deprecated and will be removed in Airflow 3.0.
Consider "
+ "using `DAG.test()` instead, or trigger your dag via API.",
+ RemovedInAirflow3Warning,
+ stacklevel=2,
+ )
+
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.backfill_job_runner import BackfillJobRunner
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 372a6ae2d9..9113a2dee1 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -42,7 +42,7 @@ from airflow.callbacks.pipe_callback_sink import PipeCallbackSink
from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.datasets import Dataset
from airflow.datasets.manager import DatasetManager
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.executors.base_executor import BaseExecutor
from airflow.executors.executor_constants import MOCK_EXECUTOR
from airflow.executors.executor_loader import ExecutorLoader
@@ -2838,6 +2838,10 @@ class TestSchedulerJob:
This is hackish: a dag run is created but its tasks are
run by a backfill.
"""
+
+ # todo: AIP-78 remove along with DAG.run()
+ # this only tests the backfill job runner, not the scheduler
+
if run_kwargs is None:
run_kwargs = {}
@@ -2898,40 +2902,49 @@ class TestSchedulerJob:
"""
DagRuns with one failed and one incomplete root task -> FAILED
"""
- self.evaluate_dagrun(
- dag_id="test_dagrun_states_fail",
- expected_task_states={
- "test_dagrun_fail": State.FAILED,
- "test_dagrun_succeed": State.UPSTREAM_FAILED,
- },
- dagrun_state=State.FAILED,
- )
+ # todo: AIP-78 remove along with DAG.run()
+ # this only tests the backfill job runner, not the scheduler
+ with pytest.warns(RemovedInAirflow3Warning):
+ self.evaluate_dagrun(
+ dag_id="test_dagrun_states_fail",
+ expected_task_states={
+ "test_dagrun_fail": State.FAILED,
+ "test_dagrun_succeed": State.UPSTREAM_FAILED,
+ },
+ dagrun_state=State.FAILED,
+ )
def test_dagrun_success(self):
"""
DagRuns with one failed and one successful root task -> SUCCESS
"""
- self.evaluate_dagrun(
- dag_id="test_dagrun_states_success",
- expected_task_states={
- "test_dagrun_fail": State.FAILED,
- "test_dagrun_succeed": State.SUCCESS,
- },
- dagrun_state=State.SUCCESS,
- )
+ # todo: AIP-78 remove along with DAG.run()
+ # this only tests the backfill job runner, not the scheduler
+ with pytest.warns(RemovedInAirflow3Warning):
+ self.evaluate_dagrun(
+ dag_id="test_dagrun_states_success",
+ expected_task_states={
+ "test_dagrun_fail": State.FAILED,
+ "test_dagrun_succeed": State.SUCCESS,
+ },
+ dagrun_state=State.SUCCESS,
+ )
def test_dagrun_root_fail(self):
"""
DagRuns with one successful and one failed root task -> FAILED
"""
- self.evaluate_dagrun(
- dag_id="test_dagrun_states_root_fail",
- expected_task_states={
- "test_dagrun_succeed": State.SUCCESS,
- "test_dagrun_fail": State.FAILED,
- },
- dagrun_state=State.FAILED,
- )
+ # todo: AIP-78 remove along with DAG.run()
+ # this only tests the backfill job runner, not the scheduler
+ with pytest.warns(RemovedInAirflow3Warning):
+ self.evaluate_dagrun(
+ dag_id="test_dagrun_states_root_fail",
+ expected_task_states={
+ "test_dagrun_succeed": State.SUCCESS,
+ "test_dagrun_fail": State.FAILED,
+ },
+ dagrun_state=State.FAILED,
+ )
def test_dagrun_root_fail_unfinished(self):
"""
@@ -2952,9 +2965,12 @@ class TestSchedulerJob:
)
self.null_exec.mock_task_fail(dag_id, "test_dagrun_fail", dr.run_id)
- for _ in _mock_executor(self.null_exec):
- with pytest.raises(AirflowException):
- dag.run(start_date=dr.execution_date, end_date=dr.execution_date)
+ # todo: AIP-78 remove this test along with DAG.run()
+ # this only tests the backfill job runner, not the scheduler
+ with pytest.warns(RemovedInAirflow3Warning):
+ for _ in _mock_executor(self.null_exec):
+ with pytest.raises(AirflowException):
+ dag.run(start_date=dr.execution_date, end_date=dr.execution_date)
# Mark the successful task as never having run since we want to see if the
# dagrun will be in a running state despite having an unfinished task.
@@ -2994,16 +3010,19 @@ class TestSchedulerJob:
if ignore_first_depends_on_past=True and the dagrun execution_date
is after the start_date.
"""
- self.evaluate_dagrun(
- dag_id="test_dagrun_states_deadlock",
- expected_task_states={
- "test_depends_on_past": State.SUCCESS,
- "test_depends_on_past_2": State.SUCCESS,
- },
- dagrun_state=State.SUCCESS,
- advance_execution_date=True,
- run_kwargs=dict(ignore_first_depends_on_past=True),
- )
+ # todo: AIP-78 remove along with DAG.run()
+ # this only tests the backfill job runner, not the scheduler
+ with pytest.warns(RemovedInAirflow3Warning):
+ self.evaluate_dagrun(
+ dag_id="test_dagrun_states_deadlock",
+ expected_task_states={
+ "test_depends_on_past": State.SUCCESS,
+ "test_depends_on_past_2": State.SUCCESS,
+ },
+ dagrun_state=State.SUCCESS,
+ advance_execution_date=True,
+ run_kwargs=dict(ignore_first_depends_on_past=True),
+ )
def test_dagrun_deadlock_ignore_depends_on_past(self):
"""
@@ -3012,15 +3031,18 @@ class TestSchedulerJob:
test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date except
that start_date == execution_date so depends_on_past is irrelevant).
"""
- self.evaluate_dagrun(
- dag_id="test_dagrun_states_deadlock",
- expected_task_states={
- "test_depends_on_past": State.SUCCESS,
- "test_depends_on_past_2": State.SUCCESS,
- },
- dagrun_state=State.SUCCESS,
- run_kwargs=dict(ignore_first_depends_on_past=True),
- )
+ # todo: AIP-78 remove along with DAG.run()
+ # this only tests the backfill job runner, not the scheduler
+ with pytest.warns(RemovedInAirflow3Warning):
+ self.evaluate_dagrun(
+ dag_id="test_dagrun_states_deadlock",
+ expected_task_states={
+ "test_depends_on_past": State.SUCCESS,
+ "test_depends_on_past_2": State.SUCCESS,
+ },
+ dagrun_state=State.SUCCESS,
+ run_kwargs=dict(ignore_first_depends_on_past=True),
+ )
@pytest.mark.parametrize(
"configs",
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 90d956caeb..df4a892768 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -45,6 +45,7 @@ from airflow.exceptions import (
AirflowException,
DuplicateTaskIdFound,
ParamValidationError,
+ RemovedInAirflow3Warning,
UnknownExecutorException,
)
from airflow.executors import executor_loader
@@ -2733,14 +2734,17 @@ class TestDagModel:
@mock.patch("airflow.models.dag.run_job")
def test_dag_executors(self, run_job_mock):
- dag = DAG(dag_id="test", schedule=None)
- reload(executor_loader)
- with conf_vars({("core", "executor"): "SequentialExecutor"}):
- dag.run()
- assert isinstance(run_job_mock.call_args_list[0].kwargs["job"].executor, SequentialExecutor)
-
- dag.run(local=True)
- assert isinstance(run_job_mock.call_args_list[1].kwargs["job"].executor, LocalExecutor)
+ # todo: AIP-78 remove along with DAG.run()
+ # this only tests the backfill job runner, not the scheduler
+ with pytest.warns(RemovedInAirflow3Warning):
+ dag = DAG(dag_id="test", schedule=None)
+ reload(executor_loader)
+ with conf_vars({("core", "executor"): "SequentialExecutor"}):
+ dag.run()
+ assert isinstance(run_job_mock.call_args_list[0].kwargs["job"].executor, SequentialExecutor)
+
+ dag.run(local=True)
+ assert isinstance(run_job_mock.call_args_list[1].kwargs["job"].executor, LocalExecutor)
class TestQueries: