This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 606a26c254 Remove deprecations in BaseOperator for Airflow 3 (#41761)
606a26c254 is described below
commit 606a26c254ec6076b2d4a0ae2e00366f1ff6bbfb
Author: Jens Scheffler <[email protected]>
AuthorDate: Tue Aug 27 15:51:03 2024 +0200
Remove deprecations in BaseOperator for Airflow 3 (#41761)
* Remove deprecations in BaseOperator for Airflow 3
* Fix pytests
---
.pre-commit-config.yaml | 1 +
airflow/config_templates/config.yml | 13 ++------
airflow/models/baseoperator.py | 66 +------------------------------------
newsfragments/41761.significant.rst | 7 ++++
tests/models/test_baseoperator.py | 24 +-------------
5 files changed, 12 insertions(+), 99 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index bf8b140198..60249525d3 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -627,6 +627,7 @@ repos:
^docs/apache-airflow-providers-cncf-kubernetes/operators.rst$|
^docs/conf.py$|
^docs/exts/removemarktransform.py$|
+ ^newsfragments/41761.significant.rst$|
^scripts/ci/pre_commit/vendor_k8s_json_schema.py$|
^tests/|
^.pre-commit-config\.yaml$|
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index a458105d28..5240cd51e0 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1090,7 +1090,7 @@ metrics:
version_added: 2.6.0
type: string
example: "\"scheduler,executor,dagrun,pool,triggerer,celery\"
- or \"^scheduler,^executor,heartbeat|timeout\""
+ or \"^scheduler,^executor,heartbeat|timeout\""
default: ""
metrics_block_list:
description: |
@@ -1104,7 +1104,7 @@ metrics:
version_added: 2.6.0
type: string
example: "\"scheduler,executor,dagrun,pool,triggerer,celery\"
- or \"^scheduler,^executor,heartbeat|timeout\""
+ or \"^scheduler,^executor,heartbeat|timeout\""
default: ""
statsd_on:
description: |
@@ -1517,15 +1517,6 @@ operators:
type: string
example: ~
default: "default"
- allow_illegal_arguments:
- description: |
- Is allowed to pass additional/unused arguments (args, kwargs) to the
BaseOperator operator.
- If set to ``False``, an exception will be thrown,
- otherwise only the console message will be displayed.
- version_added: 2.0.0
- type: string
- example: ~
- default: "False"
webserver:
description: ~
options:
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 99ea294e1e..7d18aa1474 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -57,7 +57,6 @@ from airflow.configuration import conf
from airflow.exceptions import (
AirflowException,
FailStopDagInvalidTriggerRule,
- RemovedInAirflow3Warning,
TaskDeferralError,
TaskDeferred,
)
@@ -897,7 +896,6 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
trigger_rule: str = DEFAULT_TRIGGER_RULE,
resources: dict[str, Any] | None = None,
run_as_user: str | None = None,
- task_concurrency: int | None = None,
map_index_template: str | None = None,
max_active_tis_per_dag: int | None = None,
max_active_tis_per_dagrun: int | None = None,
@@ -927,17 +925,9 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
kwargs.pop("_airflow_mapped_validation_only", None)
if kwargs:
- if not conf.getboolean("operators", "ALLOW_ILLEGAL_ARGUMENTS"):
- raise AirflowException(
- f"Invalid arguments were passed to
{self.__class__.__name__} (task_id: {task_id}). "
- f"Invalid arguments were:\n**kwargs: {kwargs}",
- )
- warnings.warn(
+ raise AirflowException(
f"Invalid arguments were passed to {self.__class__.__name__}
(task_id: {task_id}). "
- "Support for passing such arguments will be dropped in future.
"
f"Invalid arguments were:\n**kwargs: {kwargs}",
- category=RemovedInAirflow3Warning,
- stacklevel=3,
)
validate_key(task_id)
@@ -987,23 +977,6 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
raise ValueError(f"pool slots for {self.task_id}{dag_str} cannot
be less than 1")
self.sla = sla
- if trigger_rule == "dummy":
- warnings.warn(
- "dummy Trigger Rule is deprecated. Please use
`TriggerRule.ALWAYS`.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- trigger_rule = TriggerRule.ALWAYS
-
- if trigger_rule == "none_failed_or_skipped":
- warnings.warn(
- "none_failed_or_skipped Trigger Rule is deprecated. "
- "Please use `none_failed_min_one_success`.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- trigger_rule = TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
-
if not TriggerRule.is_valid(trigger_rule):
raise AirflowException(
f"The trigger_rule must be one of
{TriggerRule.all_triggers()},"
@@ -1038,14 +1011,6 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
self.priority_weight = priority_weight
self.weight_rule =
validate_and_load_priority_weight_strategy(weight_rule)
self.resources = coerce_resources(resources)
- if task_concurrency and not max_active_tis_per_dag:
- # TODO: Remove in Airflow 3.0
- warnings.warn(
- "The 'task_concurrency' parameter is deprecated. Please use
'max_active_tis_per_dag'.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- max_active_tis_per_dag = task_concurrency
self.max_active_tis_per_dag: int | None = max_active_tis_per_dag
self.max_active_tis_per_dagrun: int | None = max_active_tis_per_dagrun
self.do_xcom_push: bool = do_xcom_push
@@ -2081,32 +2046,3 @@ def chain_linear(*elements: DependencyMixin |
Sequence[DependencyMixin]):
prev_elem = [curr_elem] if isinstance(curr_elem, DependencyMixin) else
curr_elem
if not deps_set:
raise ValueError("No dependencies were set. Did you forget to expand
with `*`?")
-
-
-def __getattr__(name):
- """
- PEP-562: Lazy loaded attributes on python modules.
-
- :meta private:
- """
- path = __deprecated_imports.get(name)
- if not path:
- raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
-
- from airflow.utils.module_loading import import_string
-
- warnings.warn(
- f"Import `{__name__}.{name}` is deprecated. Please use
`{path}.{name}`.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- val = import_string(f"{path}.{name}")
-
- # Store for next time
- globals()[name] = val
- return val
-
-
-__deprecated_imports = {
- "BaseOperatorLink": "airflow.models.baseoperatorlink",
-}
diff --git a/newsfragments/41761.significant.rst
b/newsfragments/41761.significant.rst
new file mode 100644
index 0000000000..c54f53dd51
--- /dev/null
+++ b/newsfragments/41761.significant.rst
@@ -0,0 +1,7 @@
+Removed a set of deprecations in BaseOperator.
+
+- Parameter ``task_concurrency`` removed, please use
``max_active_tis_per_dag``.
+- Support for additional (not defined) arguments removed.
+- Support for trigger rule ``dummy`` removed. Please use ``always``.
+- Support for trigger rule ``none_failed_or_skipped`` removed. Please use
``none_failed_min_one_success``.
+- Support to load ``BaseOperatorLink`` via ``airflow.models.baseoperator``
module removed.
diff --git a/tests/models/test_baseoperator.py
b/tests/models/test_baseoperator.py
index 48aaf2699b..d645b221a5 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -29,7 +29,7 @@ import jinja2
import pytest
from airflow.decorators import task as task_decorator
-from airflow.exceptions import AirflowException,
FailStopDagInvalidTriggerRule, RemovedInAirflow3Warning
+from airflow.exceptions import AirflowException, FailStopDagInvalidTriggerRule
from airflow.lineage.entities import File
from airflow.models.baseoperator import (
BASEOPERATOR_ARGS_EXPECTED_TYPES,
@@ -50,7 +50,6 @@ from airflow.utils.template import literal
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.types import DagRunType
from tests.models import DEFAULT_DATE
-from tests.test_utils.config import conf_vars
from tests.test_utils.mock_operators import DeprecatedOperator, MockOperator
if TYPE_CHECKING:
@@ -158,18 +157,6 @@ class TestBaseOperator:
with pytest.raises(AirflowException, match=error_msg):
BaseOperator(task_id="test_op", priority_weight="2")
- def test_illegal_args(self):
- """
- Tests that Operators reject illegal arguments
- """
- msg = r"Invalid arguments were passed to BaseOperator \(task_id:
test_illegal_args\)"
- with conf_vars({("operators", "allow_illegal_arguments"): "True"}):
- with pytest.warns(RemovedInAirflow3Warning, match=msg):
- BaseOperator(
- task_id="test_illegal_args",
- illegal_argument_1234="hello?",
- )
-
def test_illegal_args_forbidden(self):
"""
Tests that operators raise exceptions on illegal arguments when
@@ -802,15 +789,6 @@ class TestBaseOperator:
):
BaseOperator(task_id="op1", trigger_rule="some_rule")
- @pytest.mark.parametrize(("rule"), [("dummy"), (TriggerRule.DUMMY)])
- def test_replace_dummy_trigger_rule(self, rule):
- with pytest.warns(
- DeprecationWarning, match="dummy Trigger Rule is deprecated.
Please use `TriggerRule.ALWAYS`."
- ):
- op1 = BaseOperator(task_id="op1", trigger_rule=rule)
-
- assert op1.trigger_rule == TriggerRule.ALWAYS
-
def test_weight_rule_default(self):
op = BaseOperator(task_id="test_task")
assert _DownstreamPriorityWeightStrategy() == op.weight_rule