This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 9d66c483a5d fix: rm `skip_if` and `run_if` in python source (#41832) (#45680)
9d66c483a5d is described below
commit 9d66c483a5d012656b5a343e89d8bb538ea9644c
Author: Josix <[email protected]>
AuthorDate: Thu Jan 16 02:17:47 2025 +0800
fix: rm `skip_if` and `run_if` in python source (#41832) (#45680)
Co-authored-by: phi-friday <[email protected]>
---
airflow/utils/decorators.py | 2 +-
tests/utils/test_decorators.py | 128 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 129 insertions(+), 1 deletion(-)
diff --git a/airflow/utils/decorators.py b/airflow/utils/decorators.py
index 77a5eddaf08..e6981256ebb 100644
--- a/airflow/utils/decorators.py
+++ b/airflow/utils/decorators.py
@@ -81,7 +81,7 @@ def remove_task_decorator(python_source: str, task_decorator_name: str) -> str:
after_decorator = after_decorator[1:]
return before_decorator + after_decorator
- decorators = ["@setup", "@teardown", task_decorator_name]
+ decorators = ["@setup", "@teardown", "@task.skip_if", "@task.run_if", task_decorator_name]
for decorator in decorators:
python_source = _remove_task_decorator(python_source, decorator)
return python_source
diff --git a/tests/utils/test_decorators.py b/tests/utils/test_decorators.py
new file mode 100644
index 00000000000..19d3ec31d03
--- /dev/null
+++ b/tests/utils/test_decorators.py
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import pytest
+
+from airflow.decorators import task
+
+if TYPE_CHECKING:
+ from airflow.decorators.base import Task, TaskDecorator
+
+_CONDITION_DECORATORS = frozenset({"skip_if", "run_if"})
+_NO_SOURCE_DECORATORS = frozenset({"sensor"})
+DECORATORS = sorted(
+ set(x for x in dir(task) if not x.startswith("_")) - _CONDITION_DECORATORS - _NO_SOURCE_DECORATORS
+)
+DECORATORS_USING_SOURCE = ("external_python", "virtualenv", "branch_virtualenv", "branch_external_python")
+
+
+@pytest.fixture
+def decorator(request: pytest.FixtureRequest) -> TaskDecorator:
+ decorator_factory = getattr(task, request.param)
+
+ kwargs = {}
+ if "external" in request.param:
+ kwargs["python"] = "python3"
+ return decorator_factory(**kwargs)
+
+
+@pytest.mark.parametrize("decorator", DECORATORS_USING_SOURCE, indirect=["decorator"])
+def test_task_decorator_using_source(decorator: TaskDecorator):
+ @decorator
+ def f():
+ return ["some_task"]
+
+ assert parse_python_source(f, "decorator") == 'def f():\n return ["some_task"]\n'
+
+
+@pytest.mark.parametrize("decorator", DECORATORS, indirect=["decorator"])
+def test_skip_if(decorator: TaskDecorator):
+ @task.skip_if(lambda context: True)
+ @decorator
+ def f():
+ return "hello world"
+
+ assert parse_python_source(f, "decorator") == 'def f():\n return "hello world"\n'
+
+
+@pytest.mark.parametrize("decorator", DECORATORS, indirect=["decorator"])
+def test_run_if(decorator: TaskDecorator):
+ @task.run_if(lambda context: True)
+ @decorator
+ def f():
+ return "hello world"
+
+ assert parse_python_source(f, "decorator") == 'def f():\n return "hello world"\n'
+
+
+def test_skip_if_and_run_if():
+ @task.skip_if(lambda context: True)
+ @task.run_if(lambda context: True)
+ @task.virtualenv()
+ def f():
+ return "hello world"
+
+ assert parse_python_source(f) == 'def f():\n return "hello world"\n'
+
+
+def test_run_if_and_skip_if():
+ @task.run_if(lambda context: True)
+ @task.skip_if(lambda context: True)
+ @task.virtualenv()
+ def f():
+ return "hello world"
+
+ assert parse_python_source(f) == 'def f():\n return "hello world"\n'
+
+
+def test_skip_if_allow_decorator():
+ def non_task_decorator(func):
+ return func
+
+ @task.skip_if(lambda context: True)
+ @task.virtualenv()
+ @non_task_decorator
+ def f():
+ return "hello world"
+
+ assert parse_python_source(f) == '@non_task_decorator\ndef f():\n return "hello world"\n'
+
+
+def test_run_if_allow_decorator():
+ def non_task_decorator(func):
+ return func
+
+ @task.run_if(lambda context: True)
+ @task.virtualenv()
+ @non_task_decorator
+ def f():
+ return "hello world"
+
+ assert parse_python_source(f) == '@non_task_decorator\ndef f():\n return "hello world"\n'
+
+
+def parse_python_source(task: Task, custom_operator_name: str | None = None) -> str:
+ operator = task().operator
+ if custom_operator_name:
+ custom_operator_name = (
+ custom_operator_name if custom_operator_name.startswith("@") else f"@{custom_operator_name}"
+ )
+ operator.__dict__["custom_operator_name"] = custom_operator_name
+ return operator.get_python_source()