This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1e7bbf829a Remove deprecations in airflow.models.skipmixin (#41780)
1e7bbf829a is described below
commit 1e7bbf829ad6c9377f13b2c9db9e2e78a5806256
Author: Jens Scheffler <[email protected]>
AuthorDate: Tue Aug 27 14:33:59 2024 +0200
Remove deprecations in airflow.models.skipmixin (#41780)
* Remove deprecations in airflow.models.skipmixin
* Fix pytests
---
airflow/models/skipmixin.py | 33 +++------------------------------
airflow/operators/python.py | 5 +----
newsfragments/41780.significant.rst | 1 +
tests/models/test_skipmixin.py | 33 +++------------------------------
tests/operators/test_python.py | 2 --
5 files changed, 8 insertions(+), 66 deletions(-)
diff --git a/airflow/models/skipmixin.py b/airflow/models/skipmixin.py
index 1ed56a43bf..12cbdb380b 100644
--- a/airflow/models/skipmixin.py
+++ b/airflow/models/skipmixin.py
@@ -17,14 +17,13 @@
# under the License.
from __future__ import annotations
-import warnings
from types import GeneratorType
from typing import TYPE_CHECKING, Iterable, Sequence
-from sqlalchemy import select, update
+from sqlalchemy import update
from airflow.api_internal.internal_api_call import internal_api_call
-from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
+from airflow.exceptions import AirflowException
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -33,7 +32,6 @@ from airflow.utils.sqlalchemy import tuple_in_condition
from airflow.utils.state import TaskInstanceState
if TYPE_CHECKING:
- from pendulum import DateTime
from sqlalchemy import Session
from airflow.models.dagrun import DagRun
@@ -98,16 +96,13 @@ class SkipMixin(LoggingMixin):
def skip(
self,
dag_run: DagRun | DagRunPydantic,
- execution_date: DateTime,
tasks: Iterable[DAGNode],
map_index: int = -1,
):
"""Facade for compatibility for call to internal API."""
# SkipMixin may not necessarily have a task_id attribute. Only store to XCom if one is available.
task_id: str | None = getattr(self, "task_id", None)
- SkipMixin._skip(
- dag_run=dag_run, task_id=task_id, execution_date=execution_date,
tasks=tasks, map_index=map_index
- )
+ SkipMixin._skip(dag_run=dag_run, task_id=task_id, tasks=tasks,
map_index=map_index)
@staticmethod
@internal_api_call
@@ -115,7 +110,6 @@ class SkipMixin(LoggingMixin):
def _skip(
dag_run: DagRun | DagRunPydantic,
task_id: str | None,
- execution_date: DateTime,
tasks: Iterable[DAGNode],
session: Session = NEW_SESSION,
map_index: int = -1,
@@ -128,7 +122,6 @@ class SkipMixin(LoggingMixin):
are cleared.
:param dag_run: the DagRun for which to set the tasks to skipped
- :param execution_date: execution_date
:param tasks: tasks to skip (not task_ids)
:param session: db session to use
:param map_index: map_index of the current task instance
@@ -137,26 +130,6 @@ class SkipMixin(LoggingMixin):
if not task_list:
return
- if execution_date and not dag_run:
- from airflow.models.dagrun import DagRun
-
- warnings.warn(
- "Passing an execution_date to `skip()` is deprecated in favour
of passing a dag_run",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
-
- dag_run = session.scalars(
- select(DagRun).where(
- DagRun.dag_id == task_list[0].dag_id,
DagRun.execution_date == execution_date
- )
- ).one()
-
- elif execution_date and dag_run and execution_date !=
dag_run.execution_date:
- raise ValueError(
- "execution_date has a different value to
dag_run.execution_date -- please only pass dag_run"
- )
-
if dag_run is None:
raise ValueError("dag_run is required")
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 09b2644bee..47e0cf3193 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -32,7 +32,7 @@ from abc import ABCMeta, abstractmethod
from collections.abc import Container
from pathlib import Path
from tempfile import TemporaryDirectory
-from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable,
Mapping, NamedTuple, Sequence, cast
+from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable,
Mapping, NamedTuple, Sequence
import lazy_object_proxy
@@ -63,8 +63,6 @@ from airflow.utils.session import create_session
log = logging.getLogger(__name__)
if TYPE_CHECKING:
- from pendulum.datetime import DateTime
-
from airflow.serialization.enums import Encoding
from airflow.utils.context import Context
@@ -341,7 +339,6 @@ class ShortCircuitOperator(PythonOperator, SkipMixin):
self.skip(
dag_run=dag_run,
- execution_date=cast("DateTime", dag_run.execution_date),
tasks=to_skip,
map_index=context["ti"].map_index,
)
diff --git a/newsfragments/41780.significant.rst b/newsfragments/41780.significant.rst
new file mode 100644
index 0000000000..2e7aed291c
--- /dev/null
+++ b/newsfragments/41780.significant.rst
@@ -0,0 +1 @@
+Remove deprecated support for passing ``execution_date`` to ``airflow.models.skipmixin.SkipMixin.skip()``.
diff --git a/tests/models/test_skipmixin.py b/tests/models/test_skipmixin.py
index 62d8c4d059..c24bfb7c86 100644
--- a/tests/models/test_skipmixin.py
+++ b/tests/models/test_skipmixin.py
@@ -20,12 +20,11 @@ from __future__ import annotations
import datetime
from unittest.mock import Mock, patch
-import pendulum
import pytest
from airflow import settings
from airflow.decorators import task, task_group
-from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
+from airflow.exceptions import AirflowException
from airflow.models.skipmixin import SkipMixin
from airflow.models.taskinstance import TaskInstance as TI
from airflow.operators.empty import EmptyOperator
@@ -66,7 +65,7 @@ class TestSkipMixin:
execution_date=now,
state=State.FAILED,
)
- SkipMixin().skip(dag_run=dag_run, execution_date=now, tasks=tasks)
+ SkipMixin().skip(dag_run=dag_run, tasks=tasks)
session.query(TI).filter(
TI.dag_id == "dag",
@@ -76,35 +75,9 @@ class TestSkipMixin:
TI.end_date == now,
).one()
- @pytest.mark.skip_if_database_isolation_mode # Does not work in db
isolation mode
- @patch("airflow.utils.timezone.utcnow")
- def test_skip_none_dagrun(self, mock_now, dag_maker):
- now = datetime.datetime.now(tz=pendulum.timezone("UTC"))
- mock_now.return_value = now
- with dag_maker(
- "dag",
- ):
- tasks = [EmptyOperator(task_id="task")]
- dag_maker.create_dagrun(execution_date=now)
-
- with pytest.warns(
- RemovedInAirflow3Warning,
- match=r"Passing an execution_date to `skip\(\)` is deprecated in
favour of passing a dag_run",
- ):
- SkipMixin().skip(dag_run=None, execution_date=now, tasks=tasks)
-
- session = dag_maker.session
- session.query(TI).filter(
- TI.dag_id == "dag",
- TI.task_id == "task",
- TI.state == State.SKIPPED,
- TI.start_date == now,
- TI.end_date == now,
- ).one()
-
def test_skip_none_tasks(self):
session = Mock()
- SkipMixin().skip(dag_run=None, execution_date=None, tasks=[])
+ SkipMixin().skip(dag_run=None, tasks=[])
assert not session.query.called
assert not session.commit.called
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 107adcc12c..0f52f16b87 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -2098,7 +2098,6 @@ class TestShortCircuitWithTeardown:
# we can't use assert_called_with because it's a set and therefore not ordered
actual_kwargs = op1.skip.call_args.kwargs
actual_skipped = set(actual_kwargs["tasks"])
- assert actual_kwargs["execution_date"] == dagrun.logical_date
assert actual_skipped == {op3}
@pytest.mark.skip_if_database_isolation_mode # tests pure logic with
run() method, mix of pydantic and mock fails
@@ -2139,7 +2138,6 @@ class TestShortCircuitWithTeardown:
else:
assert isinstance(actual_skipped, Generator)
assert set(actual_skipped) == {op3}
- assert actual_kwargs["execution_date"] == dagrun.logical_date
@pytest.mark.parametrize(