This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 606a26c254 Remove deprecations in BaseOperator for Airflow 3 (#41761)
606a26c254 is described below

commit 606a26c254ec6076b2d4a0ae2e00366f1ff6bbfb
Author: Jens Scheffler <[email protected]>
AuthorDate: Tue Aug 27 15:51:03 2024 +0200

    Remove deprecations in BaseOperator for Airflow 3 (#41761)
    
    * Remove deprecations in BaseOperator for Airflow 3
    
    * Fix pytests
---
 .pre-commit-config.yaml             |  1 +
 airflow/config_templates/config.yml | 13 ++------
 airflow/models/baseoperator.py      | 66 +------------------------------------
 newsfragments/41761.significant.rst |  7 ++++
 tests/models/test_baseoperator.py   | 24 +-------------
 5 files changed, 12 insertions(+), 99 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index bf8b140198..60249525d3 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -627,6 +627,7 @@ repos:
           ^docs/apache-airflow-providers-cncf-kubernetes/operators.rst$|
           ^docs/conf.py$|
           ^docs/exts/removemarktransform.py$|
+          ^newsfragments/41761.significant.rst$|
           ^scripts/ci/pre_commit/vendor_k8s_json_schema.py$|
           ^tests/|
           ^.pre-commit-config\.yaml$|
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index a458105d28..5240cd51e0 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1090,7 +1090,7 @@ metrics:
       version_added: 2.6.0
       type: string
       example: "\"scheduler,executor,dagrun,pool,triggerer,celery\"
-      or \"^scheduler,^executor,heartbeat|timeout\""
+        or \"^scheduler,^executor,heartbeat|timeout\""
       default: ""
     metrics_block_list:
       description: |
@@ -1104,7 +1104,7 @@ metrics:
       version_added: 2.6.0
       type: string
       example: "\"scheduler,executor,dagrun,pool,triggerer,celery\"
-      or \"^scheduler,^executor,heartbeat|timeout\""
+        or \"^scheduler,^executor,heartbeat|timeout\""
       default: ""
     statsd_on:
       description: |
@@ -1517,15 +1517,6 @@ operators:
       type: string
       example: ~
       default: "default"
-    allow_illegal_arguments:
-      description: |
-        Is allowed to pass additional/unused arguments (args, kwargs) to the BaseOperator operator.
-        If set to ``False``, an exception will be thrown,
-        otherwise only the console message will be displayed.
-      version_added: 2.0.0
-      type: string
-      example: ~
-      default: "False"
 webserver:
   description: ~
   options:
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 99ea294e1e..7d18aa1474 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -57,7 +57,6 @@ from airflow.configuration import conf
 from airflow.exceptions import (
     AirflowException,
     FailStopDagInvalidTriggerRule,
-    RemovedInAirflow3Warning,
     TaskDeferralError,
     TaskDeferred,
 )
@@ -897,7 +896,6 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
         trigger_rule: str = DEFAULT_TRIGGER_RULE,
         resources: dict[str, Any] | None = None,
         run_as_user: str | None = None,
-        task_concurrency: int | None = None,
         map_index_template: str | None = None,
         max_active_tis_per_dag: int | None = None,
         max_active_tis_per_dagrun: int | None = None,
@@ -927,17 +925,9 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
 
         kwargs.pop("_airflow_mapped_validation_only", None)
         if kwargs:
-            if not conf.getboolean("operators", "ALLOW_ILLEGAL_ARGUMENTS"):
-                raise AirflowException(
-                    f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). "
-                    f"Invalid arguments were:\n**kwargs: {kwargs}",
-                )
-            warnings.warn(
+            raise AirflowException(
                 f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). "
-                "Support for passing such arguments will be dropped in future. "
                 f"Invalid arguments were:\n**kwargs: {kwargs}",
-                category=RemovedInAirflow3Warning,
-                stacklevel=3,
             )
         validate_key(task_id)
 
@@ -987,23 +977,6 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
             raise ValueError(f"pool slots for {self.task_id}{dag_str} cannot be less than 1")
         self.sla = sla
 
-        if trigger_rule == "dummy":
-            warnings.warn(
-                "dummy Trigger Rule is deprecated. Please use `TriggerRule.ALWAYS`.",
-                RemovedInAirflow3Warning,
-                stacklevel=2,
-            )
-            trigger_rule = TriggerRule.ALWAYS
-
-        if trigger_rule == "none_failed_or_skipped":
-            warnings.warn(
-                "none_failed_or_skipped Trigger Rule is deprecated. "
-                "Please use `none_failed_min_one_success`.",
-                RemovedInAirflow3Warning,
-                stacklevel=2,
-            )
-            trigger_rule = TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
-
         if not TriggerRule.is_valid(trigger_rule):
             raise AirflowException(
                 f"The trigger_rule must be one of {TriggerRule.all_triggers()},"
@@ -1038,14 +1011,6 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
         self.priority_weight = priority_weight
         self.weight_rule = validate_and_load_priority_weight_strategy(weight_rule)
         self.resources = coerce_resources(resources)
-        if task_concurrency and not max_active_tis_per_dag:
-            # TODO: Remove in Airflow 3.0
-            warnings.warn(
-                "The 'task_concurrency' parameter is deprecated. Please use 'max_active_tis_per_dag'.",
-                RemovedInAirflow3Warning,
-                stacklevel=2,
-            )
-            max_active_tis_per_dag = task_concurrency
         self.max_active_tis_per_dag: int | None = max_active_tis_per_dag
         self.max_active_tis_per_dagrun: int | None = max_active_tis_per_dagrun
         self.do_xcom_push: bool = do_xcom_push
@@ -2081,32 +2046,3 @@ def chain_linear(*elements: DependencyMixin | Sequence[DependencyMixin]):
         prev_elem = [curr_elem] if isinstance(curr_elem, DependencyMixin) else curr_elem
     if not deps_set:
         raise ValueError("No dependencies were set. Did you forget to expand with `*`?")
-
-
-def __getattr__(name):
-    """
-    PEP-562: Lazy loaded attributes on python modules.
-
-    :meta private:
-    """
-    path = __deprecated_imports.get(name)
-    if not path:
-        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
-
-    from airflow.utils.module_loading import import_string
-
-    warnings.warn(
-        f"Import `{__name__}.{name}` is deprecated. Please use `{path}.{name}`.",
-        RemovedInAirflow3Warning,
-        stacklevel=2,
-    )
-    val = import_string(f"{path}.{name}")
-
-    # Store for next time
-    globals()[name] = val
-    return val
-
-
-__deprecated_imports = {
-    "BaseOperatorLink": "airflow.models.baseoperatorlink",
-}
diff --git a/newsfragments/41761.significant.rst b/newsfragments/41761.significant.rst
new file mode 100644
index 0000000000..c54f53dd51
--- /dev/null
+++ b/newsfragments/41761.significant.rst
@@ -0,0 +1,7 @@
+Removed a set of deprecations in BaseOperator.
+
+- Parameter ``task_concurrency`` removed, please use ``max_active_tis_per_dag``.
+- Support for additional (not defined) arguments removed.
+- Support for trigger rule ``dummy`` removed. Please use ``always``.
+- Support for trigger rule ``none_failed_or_skipped`` removed. Please use ``none_failed_min_one_success``.
+- Support to load ``BaseOperatorLink`` via ``airflow.models.baseoperator`` module removed.
diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py
index 48aaf2699b..d645b221a5 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -29,7 +29,7 @@ import jinja2
 import pytest
 
 from airflow.decorators import task as task_decorator
-from airflow.exceptions import AirflowException, FailStopDagInvalidTriggerRule, RemovedInAirflow3Warning
+from airflow.exceptions import AirflowException, FailStopDagInvalidTriggerRule
 from airflow.lineage.entities import File
 from airflow.models.baseoperator import (
     BASEOPERATOR_ARGS_EXPECTED_TYPES,
@@ -50,7 +50,6 @@ from airflow.utils.template import literal
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.types import DagRunType
 from tests.models import DEFAULT_DATE
-from tests.test_utils.config import conf_vars
 from tests.test_utils.mock_operators import DeprecatedOperator, MockOperator
 
 if TYPE_CHECKING:
@@ -158,18 +157,6 @@ class TestBaseOperator:
         with pytest.raises(AirflowException, match=error_msg):
             BaseOperator(task_id="test_op", priority_weight="2")
 
-    def test_illegal_args(self):
-        """
-        Tests that Operators reject illegal arguments
-        """
-        msg = r"Invalid arguments were passed to BaseOperator \(task_id: test_illegal_args\)"
-        with conf_vars({("operators", "allow_illegal_arguments"): "True"}):
-            with pytest.warns(RemovedInAirflow3Warning, match=msg):
-                BaseOperator(
-                    task_id="test_illegal_args",
-                    illegal_argument_1234="hello?",
-                )
-
     def test_illegal_args_forbidden(self):
         """
         Tests that operators raise exceptions on illegal arguments when
@@ -802,15 +789,6 @@ class TestBaseOperator:
         ):
             BaseOperator(task_id="op1", trigger_rule="some_rule")
 
-    @pytest.mark.parametrize(("rule"), [("dummy"), (TriggerRule.DUMMY)])
-    def test_replace_dummy_trigger_rule(self, rule):
-        with pytest.warns(
-            DeprecationWarning, match="dummy Trigger Rule is deprecated. Please use `TriggerRule.ALWAYS`."
-        ):
-            op1 = BaseOperator(task_id="op1", trigger_rule=rule)
-
-            assert op1.trigger_rule == TriggerRule.ALWAYS
-
     def test_weight_rule_default(self):
         op = BaseOperator(task_id="test_task")
         assert _DownstreamPriorityWeightStrategy() == op.weight_rule

Reply via email to