This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b5c64c8145 Update logic and fix false-positive of dag version 
inflation checker (#61345)
6b5c64c8145 is described below

commit 6b5c64c8145431dfaa30cef3d93021d8acc5e249
Author: Jeongwoo Do <[email protected]>
AuthorDate: Mon Mar 30 17:35:00 2026 +0900

    Update logic and fix false-positive of dag version inflation checker 
(#61345)
    
    * change logic to detect dag version inflation
    
    * fix logics
    
    * fix logic for task_name
---
 .../airflow/utils/dag_version_inflation_checker.py | 97 ++++++++++++++++-----
 .../utils/test_dag_version_inflation_checker.py    | 99 +++++++++++++++++++---
 2 files changed, 167 insertions(+), 29 deletions(-)

diff --git a/airflow-core/src/airflow/utils/dag_version_inflation_checker.py 
b/airflow-core/src/airflow/utils/dag_version_inflation_checker.py
index ca7f58b2602..f11161116ff 100644
--- a/airflow-core/src/airflow/utils/dag_version_inflation_checker.py
+++ b/airflow-core/src/airflow/utils/dag_version_inflation_checker.py
@@ -59,7 +59,7 @@ class DagVersionInflationCheckResult:
 
     def __init__(self, check_level: DagVersionInflationCheckLevel):
         self.check_level: DagVersionInflationCheckLevel = check_level
-        self.warnings: list[RuntimeVaryingValueWarning] = []
+        self.warnings: dict[int, RuntimeVaryingValueWarning] = {}
         self.runtime_varying_values: dict = {}
 
     def format_warnings(self) -> str | None:
@@ -72,7 +72,7 @@ class DagVersionInflationCheckResult:
             "It causes the Dag version to increase as values change on every 
Dag parse.",
             "",
         ]
-        for w in self.warnings:
+        for w in self.warnings.values():
             lines.extend(
                 [
                     f"Line {w.line}, Col {w.col}",
@@ -137,6 +137,7 @@ class WarningContext(str, Enum):
 
     TASK_CONSTRUCTOR = "Task constructor"
     DAG_CONSTRUCTOR = "Dag constructor"
+    TASK_DECORATOR = "Task decorator"
 
 
 class RuntimeVaryingValueAnalyzer:
@@ -305,17 +306,23 @@ class DagTaskDetector:
         self.from_imports: dict[str, tuple[str, str]] = from_imports
         self.dag_instances: set[str] = set()
         self.is_in_dag_context: bool = False
+        self.function_def_context: str | None = None
 
     def is_dag_constructor(self, node: ast.Call) -> bool:
         """Check if a call is a Dag constructor."""
-        if not isinstance(node.func, ast.Name):
-            return False
-
-        func_name = node.func.id
+        # to handle use case "from airflow import sdk" and "with sdk.DAG()"
+        if isinstance(node.func, ast.Attribute) and 
isinstance(node.func.value, ast.Name):
+            if node.func.value.id in self.from_imports:
+                module, original = self.from_imports[node.func.value.id]
+                if (module == "airflow" or module.startswith("airflow.")) and 
node.func.attr in (
+                    "DAG",
+                    "dag",
+                ):
+                    return True
 
-        # "from airflow import DAG" form or "from airflow.decorator import dag"
-        if func_name in self.from_imports:
-            module, original = self.from_imports[func_name]
+        # to handle use case "from airflow import DAG" form or "from 
airflow.decorator import dag"
+        if isinstance(node.func, ast.Name) and node.func.id in 
self.from_imports:
+            module, original = self.from_imports[node.func.id]
             if (module == "airflow" or module.startswith("airflow.")) and 
original in ("DAG", "dag"):
                 return True
 
@@ -329,8 +336,8 @@ class DagTaskDetector:
         1. All calls within a Dag with block
         2. Calls that receive a Dag instance as an argument (dag=...)
         """
-        # Inside Dag with block
-        if self.is_in_dag_context:
+        # Check whether it is nside Dag with block and has task pattern name
+        if self.is_in_dag_context and self.check_is_task_by_name(node.func):
             return True
 
         # Passing Dag instance as argument
@@ -345,6 +352,32 @@ class DagTaskDetector:
 
         return False
 
+    def is_task_decorator(self, node: ast.expr):
+        if isinstance(node, ast.Name) or isinstance(node, ast.Attribute):
+            return self.check_is_task_by_name(node)
+        if isinstance(node, ast.Call):
+            return self.is_task_decorator(node.func)
+        return False
+
+    def check_is_task_by_name(self, node: ast.expr):
+        """Check if task function has task name."""
+
+        def check_is_task_function_name(name):
+            return (
+                name.lower().endswith("operator")
+                or name.lower().endswith("task")
+                or name.lower().endswith("sensor")
+            )
+
+        if isinstance(node, ast.Name):
+            return check_is_task_function_name(node.id)
+        if isinstance(node, ast.Attribute):
+            if check_is_task_function_name(node.attr):
+                return True
+            return self.check_is_task_by_name(node.value)
+
+        return False
+
     def register_dag_instance(self, var_name: str):
         """Register a Dag instance variable name."""
         self.dag_instances.add(var_name)
@@ -375,6 +408,7 @@ class AirflowRuntimeVaryingValueChecker(ast.NodeVisitor):
         self.imports: dict[str, str] = {}
         self.from_imports: dict[str, tuple[str, str]] = {}
         self.varying_vars: dict[str, tuple[int, str]] = {}
+        self.varying_functions: dict[str, RuntimeVaryingValueWarning] = {}
         self.check_level = check_level
 
         # Helper objects
@@ -424,6 +458,9 @@ class AirflowRuntimeVaryingValueChecker(ast.NodeVisitor):
 
         Check not assign but just call the function or Dag definition via 
decorator.
         """
+        if isinstance(node.func, ast.Name) and (warning := 
self.varying_functions.get(node.func.id)):
+            self.static_check_result.warnings[warning.line] = warning
+
         if self.dag_detector.is_dag_constructor(node):
             self._check_and_warn(node, WarningContext.DAG_CONSTRUCTOR)
 
@@ -464,6 +501,10 @@ class AirflowRuntimeVaryingValueChecker(ast.NodeVisitor):
                     # check the value defined in with statement to detect 
entering Dag with block
                     is_with_dag_context = True
 
+                    # add dag variable defined in with Dag statement
+                    if item.optional_vars and isinstance(item.optional_vars, 
ast.Name):
+                        self._register_dag_instances([item.optional_vars])
+
         if is_with_dag_context:
             self.dag_detector.enter_dag_context()
 
@@ -473,6 +514,21 @@ class AirflowRuntimeVaryingValueChecker(ast.NodeVisitor):
         # Exit Dag with block
         self.dag_detector.exit_dag_context()
 
+    def visit_FunctionDef(self, node: ast.FunctionDef):
+        for decorator in node.decorator_list:
+            if self.dag_detector.is_task_decorator(decorator):
+                if isinstance(decorator, ast.Call):
+                    self._check_and_warn(decorator, 
WarningContext.TASK_DECORATOR)
+                return
+            self.visit(decorator)
+
+        self.dag_detector.function_def_context = node.name
+
+        for body in node.body:
+            self.visit(body)
+
+        self.dag_detector.function_def_context = None
+
     def _register_dag_instances(self, targets: list):
         """Register Dag instance variable names."""
         for target in targets:
@@ -489,19 +545,22 @@ class AirflowRuntimeVaryingValueChecker(ast.NodeVisitor):
     def _check_and_warn(self, call: ast.Call, context: WarningContext):
         """Check function call arguments and generate warnings."""
         if self.value_analyzer.get_varying_source(call):
-            self.static_check_result.warnings.append(
-                RuntimeVaryingValueWarning(
-                    line=call.lineno,
-                    col=call.col_offset,
-                    code=ast.unparse(call),
-                    message=self._get_warning_message(context),
-                )
+            warning = RuntimeVaryingValueWarning(
+                line=call.lineno,
+                col=call.col_offset,
+                code=ast.unparse(call),
+                message=self._get_warning_message(context),
             )
 
+            if self.dag_detector.function_def_context:
+                self.varying_functions[self.dag_detector.function_def_context] 
= warning
+            else:
+                self.static_check_result.warnings[warning.line] = warning
+
     def _get_warning_message(self, context: WarningContext) -> str:
         """Get appropriate warning message based on context."""
         if self.dag_detector.is_in_dag_context and context == 
WarningContext.TASK_CONSTRUCTOR:
-            return "Don't use runtime-varying values as function arguments 
within with Dag block"
+            return "Don't use runtime-varying values as arguments of task 
within with Dag block"
         return f"Don't use runtime-varying value as argument in 
{context.value}"
 
 
diff --git 
a/airflow-core/tests/unit/utils/test_dag_version_inflation_checker.py 
b/airflow-core/tests/unit/utils/test_dag_version_inflation_checker.py
index fdc2160328f..8d858523be4 100644
--- a/airflow-core/tests/unit/utils/test_dag_version_inflation_checker.py
+++ b/airflow-core/tests/unit/utils/test_dag_version_inflation_checker.py
@@ -363,6 +363,28 @@ class TestDAGTaskDetector:
         result = self.detector.is_task_constructor(call_node)
         assert result is True
 
+    def test_is_task_decorator__check_when_normal_decorator(self):
+        code = """
+@task(task_id='task_id')
+def test():
+    print("test")
+"""
+        call_node = ast.parse(code).body
+
+        assert isinstance(call_node[0], ast.FunctionDef)
+        assert self.detector.is_task_decorator(call_node[0].decorator_list[0]) 
is True
+
+    def test_is_task_decorator__check_when_attribute_decorator(self):
+        code = """
[email protected]
+def test():
+    print("test")
+        """
+        call_node = ast.parse(code).body
+
+        assert isinstance(call_node[0], ast.FunctionDef)
+        assert self.detector.is_task_decorator(call_node[0].decorator_list[0]) 
is True
+
     def test_enter_and_exit_dag_context(self):
         """Properly track entering and exiting Dag with-blocks."""
         assert self.detector.is_in_dag_context is False
@@ -467,7 +489,7 @@ dag = DAG(dag_id=f"dag_{datetime.now()}")
         self.checker.visit(tree)
 
         assert len(self.checker.static_check_result.warnings) == 1
-        assert any("Dag constructor" in w.message for w in 
self.checker.static_check_result.warnings)
+        assert any("Dag constructor" in w.message for w in 
self.checker.static_check_result.warnings.values())
 
     def test_visit_call__detects_task_in_dag_context(self):
         """Detect task creation inside Dag with block."""
@@ -484,7 +506,7 @@ with DAG(dag_id="test") as dag:
         self.checker.visit(tree)
 
         assert len(self.checker.static_check_result.warnings) == 1
-        assert any("PythonOperator" in w.code for w in 
self.checker.static_check_result.warnings)
+        assert any("PythonOperator" in w.code for w in 
self.checker.static_check_result.warnings.values())
 
     def test_visit_for__warns_on_varying_range(self):
         """Warn when for-loop range is runtime-varying."""
@@ -498,7 +520,7 @@ with DAG(
     schedule_interval='@daily',
 ) as dag:
     for i in [datetime.now(), "3"]:
-        task = BashOperator(
+        task = BashTask(
             task_id='print_bash_hello_{i}',
             bash_command=f'echo "Hello from DAG {i}!"',  # !problem
             dag=dag,
@@ -507,10 +529,10 @@ with DAG(
         tree = ast.parse(code)
 
         self.checker.visit(tree)
-        warnings = self.checker.static_check_result.warnings
+        warnings = self.checker.static_check_result.warnings.values()
 
         assert len(warnings) == 1
-        assert any("BashOperator" in w.code for w in warnings)
+        assert any("BashTask" in w.code for w in warnings)
 
     def test_check_and_warn__creates_warning_for_varying_arg(self):
         """Create a warning when detecting varying positional argument."""
@@ -522,7 +544,7 @@ with DAG(
         self.checker._check_and_warn(call_node, WarningContext.DAG_CONSTRUCTOR)
 
         assert len(self.checker.static_check_result.warnings) == 1
-        warning = self.checker.static_check_result.warnings[0]
+        warning = 
next(iter(self.checker.static_check_result.warnings.values()))
         assert WarningContext.DAG_CONSTRUCTOR.value in warning.message
         assert "datetime.now()" in warning.code
 
@@ -536,7 +558,7 @@ with DAG(
         self.checker._check_and_warn(call_node, 
WarningContext.TASK_CONSTRUCTOR)
 
         assert len(self.checker.static_check_result.warnings) == 1
-        warning = self.checker.static_check_result.warnings[0]
+        warning = 
next(iter(self.checker.static_check_result.warnings.values()))
         assert "dag_id" in warning.code
         assert "datetime.now()" in warning.code
 
@@ -552,7 +574,7 @@ class TestIntegrationScenarios:
         tree = ast.parse(code)
         checker = AirflowRuntimeVaryingValueChecker()
         checker.visit(tree)
-        return checker.static_check_result.warnings
+        return list(checker.static_check_result.warnings.values())
 
     def test_antipattern__dynamic_dag_id_with_timestamp(self):
         """ANTI-PATTERN: Using timestamps in Dag IDs."""
@@ -663,6 +685,7 @@ task1 >> task2
         code = """
 from airflow.decorators import dag, task
 from datetime import datetime
+from random import random
 
 @dag(dag_id=f"my_dag_{datetime.now()}") # !problem
 def my_dag_function():
@@ -677,6 +700,10 @@ def my_dag_function():
         assert len(warnings) == 1
 
     def test_dag_generated_in_for_or_function_statement(self):
+        """
+        There are runtime-varying case in create_dag function.
+        But the function doesn't use in here so doesn't make warning
+        """
         code = """
 from airflow import DAG
 from airflow.operators.bash import BashOperator
@@ -691,7 +718,7 @@ def create_dag(dag_id, task_id):
 
     with DAG(
         dag_id,
-        default_args=default_args, # !problem
+        default_args=default_args, # not problem, because the function 
create_dag not called in statement
     ) as dag:
         task1 = BashOperator(
             task_id=task_id
@@ -732,4 +759,56 @@ for i in [datetime.now(), "3"]:
     task1 >> task2 >> task3
 """
         warnings = self._check_code(code)
-        assert len(warnings) == 5
+        assert len(warnings) == 4
+
+    def test_import_dag_from_sdk_module(self):
+        code = """
+from airflow import sdk
+from airflow.providers.standard.operators.python import PythonOperator
+from time import sleep
+import datetime
+
+with sdk.DAG(
+    dag_id="test1",
+    schedule="* * * * *",
+    start_date=datetime.datetime.now(datetime.timezone.utc) - 
datetime.timedelta(minutes=1),
+    tags=["example"],
+) as dag:
+    PythonOperator(
+        task_id="test1_task",
+        python_callable=lambda: print(datetime.now()),
+    )
+"""
+        warnings = self._check_code(code)
+        assert len(warnings) == 1
+
+    def test_python_task_with_task_decorator(self):
+        code = """
+from airflow.sdk import task, DAG
+import datetime
+
+with DAG(
+    dag_id="test1",
+    schedule="* * * * *",
+    tags=["example"],
+) as dag:
+    # the function is serialized with code string byte, so it doesn't affect 
dag version
+
+    @task
+    def add_one_task(x: int):
+        from random import random
+
+        for i in range(int(random() * 50)):
+            do_task(f"Simulating work... {i+1}")
+            sleep(1)
+        return x + 1
+
+    #!problem
+    @task(task_id=datetime.now())
+    def test_task():
+        print("test")
+
+    add_one(3)
+"""
+        warnings = self._check_code(code)
+        assert len(warnings) == 1

Reply via email to