This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 27fe45bdfe Remove deprecated parameters from airflow (core) Operators
(#41736)
27fe45bdfe is described below
commit 27fe45bdfe25eabba24c9d4de0b2e1807ea36840
Author: Jens Scheffler <[email protected]>
AuthorDate: Tue Aug 27 15:55:41 2024 +0200
Remove deprecated parameters from airflow (core) Operators (#41736)
---
airflow/operators/datetime.py | 11 +----------
airflow/operators/trigger_dagrun.py | 12 ------------
airflow/operators/weekday.py | 11 -----------
newsfragments/41736.significant.rst | 7 +++++++
tests/operators/test_datetime.py | 16 ----------------
tests/operators/test_trigger_dagrun.py | 29 +----------------------------
tests/operators/test_weekday.py | 20 --------------------
7 files changed, 9 insertions(+), 97 deletions(-)
diff --git a/airflow/operators/datetime.py b/airflow/operators/datetime.py
index 732b380077..4455b84dd3 100644
--- a/airflow/operators/datetime.py
+++ b/airflow/operators/datetime.py
@@ -17,10 +17,9 @@
from __future__ import annotations
import datetime
-import warnings
from typing import TYPE_CHECKING, Iterable
-from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
+from airflow.exceptions import AirflowException
from airflow.operators.branch import BaseBranchOperator
from airflow.utils import timezone
@@ -56,7 +55,6 @@ class BranchDateTimeOperator(BaseBranchOperator):
target_lower: datetime.datetime | datetime.time | None,
target_upper: datetime.datetime | datetime.time | None,
use_task_logical_date: bool = False,
- use_task_execution_date: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -71,13 +69,6 @@ class BranchDateTimeOperator(BaseBranchOperator):
self.follow_task_ids_if_true = follow_task_ids_if_true
self.follow_task_ids_if_false = follow_task_ids_if_false
self.use_task_logical_date = use_task_logical_date
- if use_task_execution_date:
- self.use_task_logical_date = use_task_execution_date
- warnings.warn(
- "Parameter ``use_task_execution_date`` is deprecated. Use
``use_task_logical_date``.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
def choose_branch(self, context: Context) -> str | Iterable[str]:
if self.use_task_logical_date:
diff --git a/airflow/operators/trigger_dagrun.py
b/airflow/operators/trigger_dagrun.py
index 2521297dcf..539506ff9b 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -20,7 +20,6 @@ from __future__ import annotations
import datetime
import json
import time
-import warnings
from typing import TYPE_CHECKING, Any, Sequence, cast
from sqlalchemy import select
@@ -34,7 +33,6 @@ from airflow.exceptions import (
AirflowSkipException,
DagNotFound,
DagRunAlreadyExists,
- RemovedInAirflow3Warning,
)
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
@@ -110,7 +108,6 @@ class TriggerDagRunOperator(BaseOperator):
DAG for the same logical date already exists.
:param deferrable: If waiting for completion, whether or not to defer the
task until done,
default is ``False``.
- :param execution_date: Deprecated parameter; same as ``logical_date``.
"""
template_fields: Sequence[str] = (
@@ -139,7 +136,6 @@ class TriggerDagRunOperator(BaseOperator):
failed_states: list[str | DagRunState] | None = None,
skip_when_already_exists: bool = False,
deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
- execution_date: str | datetime.datetime | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -160,14 +156,6 @@ class TriggerDagRunOperator(BaseOperator):
self.skip_when_already_exists = skip_when_already_exists
self._defer = deferrable
- if execution_date is not None:
- warnings.warn(
- "Parameter 'execution_date' is deprecated. Use 'logical_date'
instead.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- logical_date = execution_date
-
if logical_date is not None and not isinstance(logical_date, (str,
datetime.datetime)):
type_name = type(logical_date).__name__
raise TypeError(
diff --git a/airflow/operators/weekday.py b/airflow/operators/weekday.py
index af3e332899..f59a1da134 100644
--- a/airflow/operators/weekday.py
+++ b/airflow/operators/weekday.py
@@ -17,10 +17,8 @@
# under the License.
from __future__ import annotations
-import warnings
from typing import TYPE_CHECKING, Iterable
-from airflow.exceptions import RemovedInAirflow3Warning
from airflow.operators.branch import BaseBranchOperator
from airflow.utils import timezone
from airflow.utils.weekday import WeekDay
@@ -91,7 +89,6 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
:param use_task_logical_date: If ``True``, uses task's logical date to
compare
with is_today. Execution Date is Useful for backfilling.
If ``False``, uses system's day of the week.
- :param use_task_execution_day: deprecated parameter, same effect as
`use_task_logical_date`
"""
def __init__(
@@ -101,7 +98,6 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
follow_task_ids_if_false: str | Iterable[str],
week_day: str | Iterable[str] | WeekDay | Iterable[WeekDay],
use_task_logical_date: bool = False,
- use_task_execution_day: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -109,13 +105,6 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
self.follow_task_ids_if_false = follow_task_ids_if_false
self.week_day = week_day
self.use_task_logical_date = use_task_logical_date
- if use_task_execution_day:
- self.use_task_logical_date = use_task_execution_day
- warnings.warn(
- "Parameter ``use_task_execution_day`` is deprecated. Use
``use_task_logical_date``.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
self._week_day_num = WeekDay.validate_week_day(week_day)
def choose_branch(self, context: Context) -> str | Iterable[str]:
diff --git a/newsfragments/41736.significant.rst
b/newsfragments/41736.significant.rst
new file mode 100644
index 0000000000..2c90979d80
--- /dev/null
+++ b/newsfragments/41736.significant.rst
@@ -0,0 +1,7 @@
+Removed deprecated parameters from core-operators.
+
+Parameters removed:
+
+- airflow.operators.datetime.BranchDateTimeOperator: use_task_execution_date
+- airflow.operators.trigger_dagrun.TriggerDagRunOperator: execution_date
+- airflow.operators.weekday.BranchDayOfWeekOperator: use_task_execution_day
diff --git a/tests/operators/test_datetime.py b/tests/operators/test_datetime.py
index e4294a8856..e090d11ebe 100644
--- a/tests/operators/test_datetime.py
+++ b/tests/operators/test_datetime.py
@@ -253,19 +253,3 @@ class TestBranchDateTimeOperator:
"branch_2": State.SKIPPED,
}
)
-
- def test_deprecation_warning(self):
- warning_message = (
- """Parameter ``use_task_execution_date`` is deprecated. Use
``use_task_logical_date``."""
- )
- with pytest.warns(DeprecationWarning) as warnings:
- BranchDateTimeOperator(
- task_id="warning",
- follow_task_ids_if_true="branch_1",
- follow_task_ids_if_false="branch_2",
- target_upper=timezone.datetime(2020, 7, 7, 10, 30, 0),
- target_lower=timezone.datetime(2020, 7, 7, 10, 30, 0),
- use_task_execution_date=True,
- dag=self.dag,
- )
- assert warning_message == str(warnings[0].message)
diff --git a/tests/operators/test_trigger_dagrun.py
b/tests/operators/test_trigger_dagrun.py
index 9ec22e7e7a..52a11d10e5 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -24,7 +24,7 @@ from unittest import mock
import pendulum
import pytest
-from airflow.exceptions import AirflowException, DagRunAlreadyExists,
RemovedInAirflow3Warning, TaskDeferred
+from airflow.exceptions import AirflowException, DagRunAlreadyExists,
TaskDeferred
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
@@ -383,7 +383,6 @@ class TestDagRunOperator:
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
trigger_run_id="dummy_run_id",
- execution_date=None,
reset_dag_run=False,
skip_when_already_exists=True,
)
@@ -643,32 +642,6 @@ class TestDagRunOperator:
with pytest.raises(AirflowException, match="failed with failed state"):
task.execute_complete(context={}, event=trigger.serialize())
- def test_trigger_dagrun_with_execution_date(self, dag_maker):
- """Test TriggerDagRunOperator with custom execution_date (deprecated
parameter)"""
- custom_execution_date = timezone.datetime(2021, 1, 2, 3, 4, 5)
- with dag_maker(
- TEST_DAG_ID, default_args={"owner": "airflow", "start_date":
DEFAULT_DATE}, serialized=True
- ) as dag:
- with pytest.warns(
- RemovedInAirflow3Warning,
- match="Parameter 'execution_date' is deprecated. Use
'logical_date' instead.",
- ):
- task = TriggerDagRunOperator(
- task_id="test_trigger_dagrun_with_execution_date",
- trigger_dag_id=TRIGGERED_DAG_ID,
- execution_date=custom_execution_date,
- )
- self.re_sync_triggered_dag_to_db(dag, dag_maker)
- dag_maker.create_dagrun()
- task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
-
- with create_session() as session:
- dagrun = session.query(DagRun).filter(DagRun.dag_id ==
TRIGGERED_DAG_ID).one()
- assert dagrun.external_trigger
- assert dagrun.logical_date == custom_execution_date
- assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL,
custom_execution_date)
- self.assert_extra_link(dagrun, task, session)
-
@pytest.mark.skip_if_database_isolation_mode # Known to be broken in db
isolation mode
@pytest.mark.parametrize(
argnames=["trigger_logical_date"],
diff --git a/tests/operators/test_weekday.py b/tests/operators/test_weekday.py
index 9030942c65..176246cf7b 100644
--- a/tests/operators/test_weekday.py
+++ b/tests/operators/test_weekday.py
@@ -285,23 +285,3 @@ class TestBranchDayOfWeekOperator:
for ti in tis:
if ti.task_id == "make_choice":
assert ti.xcom_pull(task_ids="make_choice") == "branch_1"
-
- def test_deprecation_warning(self, dag_maker):
- warning_message = (
- """Parameter ``use_task_execution_day`` is deprecated. Use
``use_task_logical_date``."""
- )
- with pytest.warns(DeprecationWarning) as warnings:
- with dag_maker(
- "branch_day_of_week_operator_test",
- start_date=DEFAULT_DATE,
- schedule=INTERVAL,
- serialized=True,
- ):
- BranchDayOfWeekOperator(
- task_id="week_day_warn",
- follow_task_ids_if_true="branch_1",
- follow_task_ids_if_false="branch_2",
- week_day="Monday",
- use_task_execution_day=True,
- )
- assert warning_message == str(warnings[0].message)