Lee-W commented on code in PR #59430:
URL: https://github.com/apache/airflow/pull/59430#discussion_r2660908546


##########
airflow-core/src/airflow/utils/dag_stability_checker.py:
##########
@@ -0,0 +1,510 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import ast
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+
+
+class DagStabilityCheckerResult:
+    """
+    Represents the result of stability analysis on a DAG file.
+
+    Stores detected warnings and formats them appropriately based on the 
configured check level
+    (warning or error).
+    """
+
+    def __init__(self, check_level):

Review Comment:
   ```suggestion
       def __init__(self, check_level: Literal["warning", "error"]):
   ```



##########
airflow-core/src/airflow/utils/dag_stability_checker.py:
##########
@@ -0,0 +1,510 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import ast
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+
+
+class DagStabilityCheckerResult:
+    """
+    Represents the result of stability analysis on a DAG file.

Review Comment:
   ```suggestion
       Represents the result of stability analysis on a Dag file.
   ```



##########
airflow-core/src/airflow/utils/dag_stability_checker.py:
##########
@@ -0,0 +1,510 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import ast
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+
+
+class DagStabilityCheckerResult:
+    """
+    Represents the result of stability analysis on a DAG file.
+
+    Stores detected warnings and formats them appropriately based on the 
configured check level
+    (warning or error).
+    """
+
+    def __init__(self, check_level):
+        self.check_level: str = check_level
+        self.warnings: list[RuntimeVaryingValueWarning] = []
+        self.runtime_varying_values: dict = {}
+
+    def format_warnings(self) -> str | None:
+        """Return formatted string of warning list."""
+        if not self.warnings:
+            return None
+
+        lines = [
+            "⚠️ This Dag uses runtime-variable values in Dag construction.",
+            "⚠️ It causes the Dag version to increase as values change on 
every Dag parse.",
+            "",
+        ]
+        for w in self.warnings:
+            lines.append(f"Line {w.line}, Col {w.col}")
+            lines.append(f"Code: {w.code}")
+            lines.append(f"Issue: {w.message}")
+            lines.append("")
+
+        if self.runtime_varying_values:
+            lines.append("️⚠️ Don't use the variables as arguments in DAG/Task 
constructors:")
+            # Sort by line number
+            sorted_vars = sorted(self.runtime_varying_values.items(), 
key=lambda x: x[1][0])
+            for var_name, (line, source) in sorted_vars:
+                lines.append(f"  Line {line}: '{var_name}' related '{source}'")
+            lines.append("")
+
+        return "\n".join(lines)
+
+    def get_warning_dag_format_dict(self, dag_ids):
+        """Convert warning statement to Dag warning format."""
+        from airflow.models.dagwarning import DagWarningType
+
+        if not self.warnings or self.check_level != "warning":
+            return []
+        return [
+            {
+                "dag_id": dag_id,
+                "warning_type": DagWarningType.RUNTIME_VARYING_VALUE.value,
+                "message": self.format_warnings(),
+            }
+            for dag_id in dag_ids
+        ]
+
+    def get_error_format_dict(self, file_path, bundle_path):
+        if not self.warnings or self.check_level != "error":
+            return None
+
+        relative_file_path = str(Path(file_path).relative_to(bundle_path)) if 
bundle_path else file_path
+        return {relative_file_path: self.format_warnings()}
+
+
+@dataclass
+class RuntimeVaryingValueWarning:
+    """Warning information for runtime-varying value detection."""
+
+    line: int
+    col: int
+    code: str
+    message: str
+
+
+class WarningContext(str, Enum):
+    """Context types for warnings."""
+
+    TASK_CONSTRUCTOR = "Task constructor"
+    DAG_CONSTRUCTOR = "Dag constructor"
+
+
+RUNTIME_VARYING_CALLS = [
+    ("datetime", "now"),
+    ("datetime", "today"),
+    ("datetime", "utcnow"),
+    ("date", "today"),
+    ("time", "time"),
+    ("time", "localtime"),
+    ("random", "random"),
+    ("random", "randint"),
+    ("random", "choice"),
+    ("random", "uniform"),
+    ("uuid", "uuid4"),
+    ("uuid", "uuid1"),
+    ("pendulum", "now"),
+    ("pendulum", "today"),
+    ("pendulum", "yesterday"),
+    ("pendulum", "tomorrow"),
+]
+
+
+class RuntimeVaryingValueAnalyzer:
+    """
+    Analyzer dedicated to tracking and detecting runtime-varying values.
+
+    This analyzer is responsible for identifying if a given AST node
+    contains values that change on every execution (datetime.now(), random(), 
etc.).
+    """
+
+    def __init__(self, varying_vars: dict, imports: dict, from_imports: dict):
+        self.varying_vars = varying_vars
+        self.imports = imports
+        self.from_imports = from_imports
+
+    def get_varying_source(self, node: ast.expr) -> str | None:
+        """
+        Check if an AST node contains runtime-varying values and return the 
source.
+
+        Checks:
+        - Runtime-varying function calls (datetime.now(), etc.)
+        - Runtime-varying variable references
+        - Runtime-varying values in f-strings
+        - Runtime-varying values in expressions/collections
+        """
+        # 1. Direct runtime-varying call
+        if isinstance(node, ast.Call) and self._is_runtime_varying_call(node):
+            return ast.unparse(node)
+
+        # 2. Runtime-varying variable reference
+        if isinstance(node, ast.Name) and node.id in self.varying_vars:
+            _, source = self.varying_vars[node.id]
+            return source
+
+        # 3. f-string
+        if isinstance(node, ast.JoinedStr):
+            return self._check_fstring_varying(node)
+
+        # 4. Binary operation
+        if isinstance(node, ast.BinOp):
+            return self.get_varying_source(node.left) or 
self.get_varying_source(node.right)
+
+        # 5. Collections (list/tuple/set)
+        if isinstance(node, (ast.List, ast.Tuple, ast.Set)):
+            return self._check_collection_varying(node.elts)
+
+        # 6. List comprehension
+        if isinstance(node, ast.ListComp):
+            return self.get_varying_source(node.elt)
+
+        # 7. Dictionary
+        if isinstance(node, ast.Dict):
+            return self._check_dict_varying(node)
+
+        # 8. Method call chain
+        if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
+            return self.get_varying_source(node.func.value)
+
+        return None
+
+    def _check_fstring_varying(self, node: ast.JoinedStr) -> str | None:
+        """Check for runtime-varying values inside f-strings."""
+        for value in node.values:
+            if isinstance(value, ast.FormattedValue):
+                source = self.get_varying_source(value.value)
+                if source:
+                    return source
+        return None
+
+    def _check_collection_varying(self, elements: list) -> str | None:
+        """Check for runtime-varying values in collection elements."""
+        for elt in elements:
+            source = self.get_varying_source(elt)
+            if source:
+                return source
+        return None
+
+    def _check_dict_varying(self, node: ast.Dict) -> str | None:
+        """Check for runtime-varying values in dictionary keys/values."""
+        for key, value in zip(node.keys, node.values):
+            if key:
+                source = self.get_varying_source(key)
+                if source:
+                    return source
+            if value:
+                source = self.get_varying_source(value)
+                if source:
+                    return source
+        return None
+
+    def _is_runtime_varying_call(self, node: ast.Call) -> bool:
+        """
+        Check if a call is runtime-varying.
+
+        1. Is the function itself runtime-varying?
+        2. Do the arguments contain runtime-varying values?
+        """
+        # Check if the function itself is runtime-varying
+        if isinstance(node.func, ast.Attribute):
+            if self._is_runtime_varying_attribute_call(node.func):
+                return True
+        elif isinstance(node.func, ast.Name):
+            if self._is_runtime_varying_name_call(node.func):
+                return True
+
+        # Check if arguments contain runtime-varying values
+        return self._has_varying_arguments(node)
+
+    def _has_varying_arguments(self, node: ast.Call) -> bool:
+        """Check if function arguments contain runtime-varying values."""
+        for arg in node.args:
+            if self.get_varying_source(arg):
+                return True
+
+        for kw in node.keywords:
+            if self.get_varying_source(kw.value):
+                return True
+
+        return False
+
+    def _is_runtime_varying_attribute_call(self, attr: ast.Attribute) -> bool:
+        """Check for runtime-varying calls like datetime.now()."""
+        method_name = attr.attr
+
+        if isinstance(attr.value, ast.Name):
+            module_or_alias = attr.value.id
+            actual_module = self.imports.get(module_or_alias, module_or_alias)
+
+            # If imported via "from import"
+            if module_or_alias in self.from_imports:
+                _, original_name = self.from_imports[module_or_alias]
+                actual_module = original_name
+
+            return (actual_module, method_name) in RUNTIME_VARYING_CALLS
+
+        # Nested attribute (e.g., datetime.datetime.now)
+        if isinstance(attr.value, ast.Attribute):
+            inner_attr = attr.value
+            if isinstance(inner_attr.value, ast.Name):
+                return (inner_attr.attr, method_name) in RUNTIME_VARYING_CALLS
+
+        return False
+
+    def _is_runtime_varying_name_call(self, func: ast.Name) -> bool:
+        """Check for runtime-varying calls like now() (when imported via 'from 
import')."""
+        func_name = func.id
+
+        if func_name in self.from_imports:
+            module, original_name = self.from_imports[func_name]
+            module_parts = module.split(".")
+
+            for part in module_parts:
+                if (part, original_name) in RUNTIME_VARYING_CALLS:
+                    return True
+
+        return False
+
+
+class DagTaskDetector:
+    """
+    Detector dedicated to identifying Dag and Task constructors.
+
+    This detector identifies when code is creating Dag or Task objects
+    in Airflow. It needs to handle both traditional class instantiation and 
decorator styles.
+    """
+
+    def __init__(self, from_imports: dict):
+        self.from_imports = from_imports
+        self.dag_instances: set[str] = set()
+        self.is_in_dag_context: bool = False
+
+    def is_dag_constructor(self, node: ast.Call) -> bool:
+        """Check if a call is a DAG constructor."""
+        if not isinstance(node.func, ast.Name):
+            return False
+
+        func_name = node.func.id
+
+        # "from airflow import DAG" form or "from airflow.decorator import dag"
+        if func_name in self.from_imports:
+            module, original = self.from_imports[func_name]
+            if (module == "airflow" or module.startswith("airflow.")) and 
original in ("DAG", "dag"):
+                return True
+
+        return False
+
+    def is_task_constructor(self, node: ast.Call) -> bool:
+        """
+        Check if a call is a Task constructor.
+
+        Criteria:
+        1. All calls within a DAG with block
+        2. Calls that receive a DAG instance as an argument (dag=...)
+        """
+        # Inside DAG with block
+        if self.is_in_dag_context:
+            return True
+
+        # Passing DAG instance as argument
+        for arg in node.args:
+            if isinstance(arg, ast.Name) and arg.id in self.dag_instances:
+                return True
+
+        for keyword in node.keywords:
+            if keyword.value and isinstance(keyword.value, ast.Name):
+                if keyword.value.id in self.dag_instances:
+                    return True
+
+        return False
+
+    def register_dag_instance(self, var_name: str):
+        """Register a DAG instance variable name."""
+        self.dag_instances.add(var_name)
+
+    def enter_dag_context(self):
+        """Enter a DAG with block."""
+        self.is_in_dag_context = True
+
+    def exit_dag_context(self):
+        """Exit a DAG with block."""
+        self.is_in_dag_context = False
+
+
+class AirflowRuntimeVaryingValueChecker(ast.NodeVisitor):
+    """
+    Main visitor class to detect runtime-varying value usage in Airflow 
DAG/Task.

Review Comment:
   ```suggestion
       Main visitor class to detect runtime-varying value usage in Airflow Dag/Task.
   ```



##########
airflow-core/src/airflow/utils/dag_stability_checker.py:
##########
@@ -0,0 +1,510 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import ast
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+
+
+class DagStabilityCheckerResult:
+    """
+    Represents the result of stability analysis on a DAG file.
+
+    Stores detected warnings and formats them appropriately based on the 
configured check level
+    (warning or error).
+    """
+
+    def __init__(self, check_level):
+        self.check_level: str = check_level
+        self.warnings: list[RuntimeVaryingValueWarning] = []
+        self.runtime_varying_values: dict = {}
+
+    def format_warnings(self) -> str | None:
+        """Return formatted string of warning list."""
+        if not self.warnings:
+            return None
+
+        lines = [
+            "⚠️ This Dag uses runtime-variable values in Dag construction.",
+            "⚠️ It causes the Dag version to increase as values change on 
every Dag parse.",
+            "",
+        ]
+        for w in self.warnings:
+            lines.append(f"Line {w.line}, Col {w.col}")
+            lines.append(f"Code: {w.code}")
+            lines.append(f"Issue: {w.message}")
+            lines.append("")
+
+        if self.runtime_varying_values:
+            lines.append("️⚠️ Don't use the variables as arguments in DAG/Task 
constructors:")
+            # Sort by line number
+            sorted_vars = sorted(self.runtime_varying_values.items(), 
key=lambda x: x[1][0])
+            for var_name, (line, source) in sorted_vars:
+                lines.append(f"  Line {line}: '{var_name}' related '{source}'")
+            lines.append("")

Review Comment:
   ```suggestion
               lines.extend(
                   [
                       "️⚠️ Don't use the variables as arguments in DAG/Task 
constructors:",
                       *(
                           f"  Line {line}: '{var_name}' related '{source}'"
                           for var_name, (line, source) in sorted(
                               self.runtime_varying_values.items(),
                               key=lambda x: x[1][0],
                           )
                       ),
                       "",
                   ]
               )
   ```



##########
airflow-core/src/airflow/utils/dag_stability_checker.py:
##########
@@ -0,0 +1,510 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import ast
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+
+
+class DagStabilityCheckerResult:
+    """
+    Represents the result of stability analysis on a DAG file.
+
+    Stores detected warnings and formats them appropriately based on the 
configured check level
+    (warning or error).
+    """
+
+    def __init__(self, check_level):
+        self.check_level: str = check_level
+        self.warnings: list[RuntimeVaryingValueWarning] = []
+        self.runtime_varying_values: dict = {}
+
+    def format_warnings(self) -> str | None:
+        """Return formatted string of warning list."""
+        if not self.warnings:
+            return None
+
+        lines = [
+            "⚠️ This Dag uses runtime-variable values in Dag construction.",
+            "⚠️ It causes the Dag version to increase as values change on 
every Dag parse.",
+            "",
+        ]
+        for w in self.warnings:
+            lines.append(f"Line {w.line}, Col {w.col}")
+            lines.append(f"Code: {w.code}")
+            lines.append(f"Issue: {w.message}")
+            lines.append("")

Review Comment:
   ```suggestion
               lines.extend(
                   [
                       f"Line {w.line}, Col {w.col}",
                       f"Code: {w.code}",
                       f"Issue: {w.message}",
                       "",
                   ]
               )
   ```



##########
airflow-core/src/airflow/utils/dag_stability_checker.py:
##########
@@ -0,0 +1,510 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import ast
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+
+
+class DagStabilityCheckerResult:
+    """
+    Represents the result of stability analysis on a DAG file.
+
+    Stores detected warnings and formats them appropriately based on the 
configured check level
+    (warning or error).
+    """
+
+    def __init__(self, check_level):
+        self.check_level: str = check_level
+        self.warnings: list[RuntimeVaryingValueWarning] = []
+        self.runtime_varying_values: dict = {}
+
+    def format_warnings(self) -> str | None:
+        """Return formatted string of warning list."""
+        if not self.warnings:
+            return None
+
+        lines = [
+            "⚠️ This Dag uses runtime-variable values in Dag construction.",
+            "⚠️ It causes the Dag version to increase as values change on 
every Dag parse.",
+            "",
+        ]
+        for w in self.warnings:
+            lines.append(f"Line {w.line}, Col {w.col}")
+            lines.append(f"Code: {w.code}")
+            lines.append(f"Issue: {w.message}")
+            lines.append("")
+
+        if self.runtime_varying_values:
+            lines.append("️⚠️ Don't use the variables as arguments in DAG/Task 
constructors:")
+            # Sort by line number
+            sorted_vars = sorted(self.runtime_varying_values.items(), 
key=lambda x: x[1][0])
+            for var_name, (line, source) in sorted_vars:
+                lines.append(f"  Line {line}: '{var_name}' related '{source}'")
+            lines.append("")
+
+        return "\n".join(lines)
+
+    def get_warning_dag_format_dict(self, dag_ids):
+        """Convert warning statement to Dag warning format."""
+        from airflow.models.dagwarning import DagWarningType
+
+        if not self.warnings or self.check_level != "warning":
+            return []
+        return [
+            {
+                "dag_id": dag_id,
+                "warning_type": DagWarningType.RUNTIME_VARYING_VALUE.value,
+                "message": self.format_warnings(),
+            }
+            for dag_id in dag_ids
+        ]
+
+    def get_error_format_dict(self, file_path, bundle_path):
+        if not self.warnings or self.check_level != "error":
+            return None
+
+        relative_file_path = str(Path(file_path).relative_to(bundle_path)) if 
bundle_path else file_path
+        return {relative_file_path: self.format_warnings()}
+
+
+@dataclass
+class RuntimeVaryingValueWarning:
+    """Warning information for runtime-varying value detection."""
+
+    line: int
+    col: int
+    code: str
+    message: str
+
+
+class WarningContext(str, Enum):
+    """Context types for warnings."""
+
+    TASK_CONSTRUCTOR = "Task constructor"
+    DAG_CONSTRUCTOR = "Dag constructor"
+
+
+RUNTIME_VARYING_CALLS = [
+    ("datetime", "now"),
+    ("datetime", "today"),
+    ("datetime", "utcnow"),
+    ("date", "today"),
+    ("time", "time"),
+    ("time", "localtime"),
+    ("random", "random"),
+    ("random", "randint"),
+    ("random", "choice"),
+    ("random", "uniform"),
+    ("uuid", "uuid4"),
+    ("uuid", "uuid1"),
+    ("pendulum", "now"),
+    ("pendulum", "today"),
+    ("pendulum", "yesterday"),
+    ("pendulum", "tomorrow"),
+]
+
+
+class RuntimeVaryingValueAnalyzer:
+    """
+    Analyzer dedicated to tracking and detecting runtime-varying values.
+
+    This analyzer is responsible for identifying if a given AST node
+    contains values that change on every execution (datetime.now(), random(), 
etc.).
+    """
+
+    def __init__(self, varying_vars: dict, imports: dict, from_imports: dict):
+        self.varying_vars = varying_vars
+        self.imports = imports
+        self.from_imports = from_imports
+
+    def get_varying_source(self, node: ast.expr) -> str | None:
+        """
+        Check if an AST node contains runtime-varying values and return the 
source.
+
+        Checks:
+        - Runtime-varying function calls (datetime.now(), etc.)
+        - Runtime-varying variable references
+        - Runtime-varying values in f-strings
+        - Runtime-varying values in expressions/collections
+        """
+        # 1. Direct runtime-varying call
+        if isinstance(node, ast.Call) and self._is_runtime_varying_call(node):
+            return ast.unparse(node)
+
+        # 2. Runtime-varying variable reference
+        if isinstance(node, ast.Name) and node.id in self.varying_vars:
+            _, source = self.varying_vars[node.id]
+            return source
+
+        # 3. f-string
+        if isinstance(node, ast.JoinedStr):
+            return self._check_fstring_varying(node)
+
+        # 4. Binary operation
+        if isinstance(node, ast.BinOp):
+            return self.get_varying_source(node.left) or 
self.get_varying_source(node.right)
+
+        # 5. Collections (list/tuple/set)
+        if isinstance(node, (ast.List, ast.Tuple, ast.Set)):
+            return self._check_collection_varying(node.elts)
+
+        # 6. List comprehension
+        if isinstance(node, ast.ListComp):
+            return self.get_varying_source(node.elt)
+
+        # 7. Dictionary
+        if isinstance(node, ast.Dict):
+            return self._check_dict_varying(node)
+
+        # 8. Method call chain
+        if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
+            return self.get_varying_source(node.func.value)
+
+        return None
+
+    def _check_fstring_varying(self, node: ast.JoinedStr) -> str | None:
+        """Check for runtime-varying values inside f-strings."""
+        for value in node.values:
+            if isinstance(value, ast.FormattedValue):
+                source = self.get_varying_source(value.value)
+                if source:
+                    return source
+        return None
+
+    def _check_collection_varying(self, elements: list) -> str | None:
+        """Check for runtime-varying values in collection elements."""
+        for elt in elements:
+            source = self.get_varying_source(elt)
+            if source:
+                return source
+        return None
+
+    def _check_dict_varying(self, node: ast.Dict) -> str | None:
+        """Check for runtime-varying values in dictionary keys/values."""
+        for key, value in zip(node.keys, node.values):
+            if key:
+                source = self.get_varying_source(key)
+                if source:
+                    return source
+            if value:
+                source = self.get_varying_source(value)
+                if source:
+                    return source
+        return None
+
+    def _is_runtime_varying_call(self, node: ast.Call) -> bool:
+        """
+        Check if a call is runtime-varying.
+
+        1. Is the function itself runtime-varying?
+        2. Do the arguments contain runtime-varying values?
+        """
+        # Check if the function itself is runtime-varying
+        if isinstance(node.func, ast.Attribute):
+            if self._is_runtime_varying_attribute_call(node.func):
+                return True
+        elif isinstance(node.func, ast.Name):
+            if self._is_runtime_varying_name_call(node.func):
+                return True
+
+        # Check if arguments contain runtime-varying values
+        return self._has_varying_arguments(node)
+
+    def _has_varying_arguments(self, node: ast.Call) -> bool:
+        """Check if function arguments contain runtime-varying values."""
+        for arg in node.args:
+            if self.get_varying_source(arg):
+                return True
+
+        for kw in node.keywords:
+            if self.get_varying_source(kw.value):
+                return True
+
+        return False
+
+    def _is_runtime_varying_attribute_call(self, attr: ast.Attribute) -> bool:
+        """Check for runtime-varying calls like datetime.now()."""
+        method_name = attr.attr
+
+        if isinstance(attr.value, ast.Name):
+            module_or_alias = attr.value.id
+            actual_module = self.imports.get(module_or_alias, module_or_alias)
+
+            # If imported via "from import"
+            if module_or_alias in self.from_imports:
+                _, original_name = self.from_imports[module_or_alias]
+                actual_module = original_name
+
+            return (actual_module, method_name) in RUNTIME_VARYING_CALLS
+
+        # Nested attribute (e.g., datetime.datetime.now)
+        if isinstance(attr.value, ast.Attribute):
+            inner_attr = attr.value
+            if isinstance(inner_attr.value, ast.Name):
+                return (inner_attr.attr, method_name) in RUNTIME_VARYING_CALLS
+
+        return False
+
+    def _is_runtime_varying_name_call(self, func: ast.Name) -> bool:
+        """Check for runtime-varying calls like now() (when imported via 'from 
import')."""
+        func_name = func.id
+
+        if func_name in self.from_imports:
+            module, original_name = self.from_imports[func_name]
+            module_parts = module.split(".")
+
+            for part in module_parts:
+                if (part, original_name) in RUNTIME_VARYING_CALLS:
+                    return True
+
+        return False
+
+
+class DagTaskDetector:
+    """
+    Detector dedicated to identifying Dag and Task constructors.
+
+    This detector identifies when code is creating Dag or Task objects
+    in Airflow. It needs to handle both traditional class instantiation and 
decorator styles.
+    """
+
+    def __init__(self, from_imports: dict):
+        self.from_imports = from_imports
+        self.dag_instances: set[str] = set()
+        self.is_in_dag_context: bool = False
+
+    def is_dag_constructor(self, node: ast.Call) -> bool:
+        """Check if a call is a DAG constructor."""
+        if not isinstance(node.func, ast.Name):
+            return False
+
+        func_name = node.func.id
+
+        # "from airflow import DAG" form or "from airflow.decorator import dag"
+        if func_name in self.from_imports:
+            module, original = self.from_imports[func_name]
+            if (module == "airflow" or module.startswith("airflow.")) and 
original in ("DAG", "dag"):
+                return True
+
+        return False
+
+    def is_task_constructor(self, node: ast.Call) -> bool:
+        """
+        Check if a call is a Task constructor.
+
+        Criteria:
+        1. All calls within a DAG with block
+        2. Calls that receive a DAG instance as an argument (dag=...)
+        """
+        # Inside DAG with block
+        if self.is_in_dag_context:
+            return True
+
+        # Passing DAG instance as argument
+        for arg in node.args:
+            if isinstance(arg, ast.Name) and arg.id in self.dag_instances:
+                return True
+
+        for keyword in node.keywords:
+            if keyword.value and isinstance(keyword.value, ast.Name):
+                if keyword.value.id in self.dag_instances:
+                    return True
+
+        return False
+
+    def register_dag_instance(self, var_name: str):
+        """Register a DAG instance variable name."""
+        self.dag_instances.add(var_name)
+
+    def enter_dag_context(self):
+        """Enter a DAG with block."""
+        self.is_in_dag_context = True
+
+    def exit_dag_context(self):
+        """Exit a DAG with block."""
+        self.is_in_dag_context = False

Review Comment:
   ```suggestion
           """Register a Dag instance variable name."""
           self.dag_instances.add(var_name)
   
       def enter_dag_context(self):
           """Enter a Dag with block."""
           self.is_in_dag_context = True
   
       def exit_dag_context(self):
           """Exit a Dag with block."""
           self.is_in_dag_context = False
   ```



##########
airflow-core/src/airflow/utils/dag_stability_checker.py:
##########
@@ -0,0 +1,468 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import ast
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+
+
+@dataclass
+class RuntimeVaryingValueWarning:
+    """Warning information for runtime-varying value detection."""
+
+    line: int
+    col: int
+    code: str
+    message: str
+
+
+class WarningContext(str, Enum):
+    """Context types for warnings."""
+
+    TASK_CONSTRUCTOR = "Task constructor"
+    DAG_CONSTRUCTOR = "Dag constructor"
+
+
+RUNTIME_VARYING_CALLS = [

Review Comment:
   We can move this list to the top of the file so that it is easy to find when it needs to be extended.



##########
airflow-core/src/airflow/utils/dag_stability_checker.py:
##########
@@ -0,0 +1,510 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import ast
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+
+
+class DagStabilityCheckerResult:
+    """
+    Represents the result of stability analysis on a DAG file.
+
+    Stores detected warnings and formats them appropriately based on the 
configured check level
+    (warning or error).
+    """
+
+    def __init__(self, check_level):
+        self.check_level: str = check_level
+        self.warnings: list[RuntimeVaryingValueWarning] = []
+        self.runtime_varying_values: dict = {}
+
+    def format_warnings(self) -> str | None:
+        """Return formatted string of warning list."""
+        if not self.warnings:
+            return None
+
+        lines = [
+            "⚠️ This Dag uses runtime-variable values in Dag construction.",
+            "⚠️ It causes the Dag version to increase as values change on 
every Dag parse.",
+            "",
+        ]
+        for w in self.warnings:
+            lines.append(f"Line {w.line}, Col {w.col}")
+            lines.append(f"Code: {w.code}")
+            lines.append(f"Issue: {w.message}")
+            lines.append("")
+
+        if self.runtime_varying_values:
+            lines.append("️⚠️ Don't use the variables as arguments in DAG/Task 
constructors:")
+            # Sort by line number
+            sorted_vars = sorted(self.runtime_varying_values.items(), 
key=lambda x: x[1][0])
+            for var_name, (line, source) in sorted_vars:
+                lines.append(f"  Line {line}: '{var_name}' related '{source}'")
+            lines.append("")
+
+        return "\n".join(lines)
+
+    def get_warning_dag_format_dict(self, dag_ids):
+        """Convert warning statement to Dag warning format."""
+        from airflow.models.dagwarning import DagWarningType
+
+        if not self.warnings or self.check_level != "warning":
+            return []
+        return [
+            {
+                "dag_id": dag_id,
+                "warning_type": DagWarningType.RUNTIME_VARYING_VALUE.value,
+                "message": self.format_warnings(),
+            }
+            for dag_id in dag_ids
+        ]
+
+    def get_error_format_dict(self, file_path, bundle_path):
+        if not self.warnings or self.check_level != "error":
+            return None
+
+        relative_file_path = str(Path(file_path).relative_to(bundle_path)) if 
bundle_path else file_path
+        return {relative_file_path: self.format_warnings()}
+
+
+@dataclass
+class RuntimeVaryingValueWarning:
+    """Warning information for runtime-varying value detection."""
+
+    line: int
+    col: int
+    code: str
+    message: str
+
+
+class WarningContext(str, Enum):
+    """Context types for warnings."""
+
+    TASK_CONSTRUCTOR = "Task constructor"
+    DAG_CONSTRUCTOR = "Dag constructor"
+
+
+RUNTIME_VARYING_CALLS = [
+    ("datetime", "now"),
+    ("datetime", "today"),
+    ("datetime", "utcnow"),
+    ("date", "today"),
+    ("time", "time"),
+    ("time", "localtime"),
+    ("random", "random"),
+    ("random", "randint"),
+    ("random", "choice"),
+    ("random", "uniform"),
+    ("uuid", "uuid4"),
+    ("uuid", "uuid1"),
+    ("pendulum", "now"),
+    ("pendulum", "today"),
+    ("pendulum", "yesterday"),
+    ("pendulum", "tomorrow"),
+]
+
+
+class RuntimeVaryingValueAnalyzer:
+    """
+    Analyzer dedicated to tracking and detecting runtime-varying values.
+
+    This analyzer is responsible for identifying if a given AST node
+    contains values that change on every execution (datetime.now(), random(), 
etc.).
+    """
+
+    def __init__(self, varying_vars: dict, imports: dict, from_imports: dict):

Review Comment:
   Let's make the type annotations here more specific; the bare `dict` parameters could be parameterized.



##########
airflow-core/src/airflow/utils/dag_stability_checker.py:
##########
@@ -0,0 +1,510 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import ast
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+
+
+class DagStabilityCheckerResult:
+    """
+    Represents the result of stability analysis on a DAG file.
+
+    Stores detected warnings and formats them appropriately based on the 
configured check level
+    (warning or error).
+    """
+
+    def __init__(self, check_level):
+        self.check_level: str = check_level
+        self.warnings: list[RuntimeVaryingValueWarning] = []
+        self.runtime_varying_values: dict = {}
+
+    def format_warnings(self) -> str | None:
+        """Return formatted string of warning list."""
+        if not self.warnings:
+            return None
+
+        lines = [
+            "⚠️ This Dag uses runtime-variable values in Dag construction.",
+            "⚠️ It causes the Dag version to increase as values change on 
every Dag parse.",
+            "",
+        ]
+        for w in self.warnings:
+            lines.append(f"Line {w.line}, Col {w.col}")
+            lines.append(f"Code: {w.code}")
+            lines.append(f"Issue: {w.message}")
+            lines.append("")
+
+        if self.runtime_varying_values:
+            lines.append("️⚠️ Don't use the variables as arguments in DAG/Task 
constructors:")
+            # Sort by line number
+            sorted_vars = sorted(self.runtime_varying_values.items(), 
key=lambda x: x[1][0])
+            for var_name, (line, source) in sorted_vars:
+                lines.append(f"  Line {line}: '{var_name}' related '{source}'")
+            lines.append("")
+
+        return "\n".join(lines)
+
+    def get_warning_dag_format_dict(self, dag_ids):
+        """Convert warning statement to Dag warning format."""
+        from airflow.models.dagwarning import DagWarningType
+
+        if not self.warnings or self.check_level != "warning":
+            return []
+        return [
+            {
+                "dag_id": dag_id,
+                "warning_type": DagWarningType.RUNTIME_VARYING_VALUE.value,
+                "message": self.format_warnings(),
+            }
+            for dag_id in dag_ids
+        ]
+
+    def get_error_format_dict(self, file_path, bundle_path):
+        if not self.warnings or self.check_level != "error":
+            return None
+
+        relative_file_path = str(Path(file_path).relative_to(bundle_path)) if 
bundle_path else file_path
+        return {relative_file_path: self.format_warnings()}
+
+
+@dataclass
+class RuntimeVaryingValueWarning:
+    """Warning information for runtime-varying value detection."""
+
+    line: int
+    col: int
+    code: str
+    message: str
+
+
+class WarningContext(str, Enum):
+    """Context types for warnings."""
+
+    TASK_CONSTRUCTOR = "Task constructor"
+    DAG_CONSTRUCTOR = "Dag constructor"
+
+
+RUNTIME_VARYING_CALLS = [
+    ("datetime", "now"),
+    ("datetime", "today"),
+    ("datetime", "utcnow"),
+    ("date", "today"),
+    ("time", "time"),
+    ("time", "localtime"),
+    ("random", "random"),
+    ("random", "randint"),
+    ("random", "choice"),
+    ("random", "uniform"),
+    ("uuid", "uuid4"),
+    ("uuid", "uuid1"),
+    ("pendulum", "now"),
+    ("pendulum", "today"),
+    ("pendulum", "yesterday"),
+    ("pendulum", "tomorrow"),
+]
+
+
+class RuntimeVaryingValueAnalyzer:
+    """
+    Analyzer dedicated to tracking and detecting runtime-varying values.
+
+    This analyzer is responsible for identifying if a given AST node
+    contains values that change on every execution (datetime.now(), random(), 
etc.).
+    """
+
+    def __init__(self, varying_vars: dict, imports: dict, from_imports: dict):
+        self.varying_vars = varying_vars
+        self.imports = imports
+        self.from_imports = from_imports
+
+    def get_varying_source(self, node: ast.expr) -> str | None:
+        """
+        Check if an AST node contains runtime-varying values and return the 
source.
+
+        Checks:
+        - Runtime-varying function calls (datetime.now(), etc.)
+        - Runtime-varying variable references
+        - Runtime-varying values in f-strings
+        - Runtime-varying values in expressions/collections
+        """
+        # 1. Direct runtime-varying call
+        if isinstance(node, ast.Call) and self._is_runtime_varying_call(node):
+            return ast.unparse(node)
+
+        # 2. Runtime-varying variable reference
+        if isinstance(node, ast.Name) and node.id in self.varying_vars:
+            _, source = self.varying_vars[node.id]
+            return source
+
+        # 3. f-string
+        if isinstance(node, ast.JoinedStr):
+            return self._check_fstring_varying(node)
+
+        # 4. Binary operation
+        if isinstance(node, ast.BinOp):
+            return self.get_varying_source(node.left) or 
self.get_varying_source(node.right)
+
+        # 5. Collections (list/tuple/set)
+        if isinstance(node, (ast.List, ast.Tuple, ast.Set)):
+            return self._check_collection_varying(node.elts)
+
+        # 6. List comprehension
+        if isinstance(node, ast.ListComp):
+            return self.get_varying_source(node.elt)
+
+        # 7. Dictionary
+        if isinstance(node, ast.Dict):
+            return self._check_dict_varying(node)
+
+        # 8. Method call chain
+        if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
+            return self.get_varying_source(node.func.value)
+
+        return None
+
+    def _check_fstring_varying(self, node: ast.JoinedStr) -> str | None:
+        """Check for runtime-varying values inside f-strings."""
+        for value in node.values:
+            if isinstance(value, ast.FormattedValue):
+                source = self.get_varying_source(value.value)
+                if source:

Review Comment:
   ```suggestion
                   if source := self.get_varying_source(value.value):
   ```



##########
airflow-core/src/airflow/utils/dag_stability_checker.py:
##########
@@ -0,0 +1,510 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import ast
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+
+
+class DagStabilityCheckerResult:
+    """
+    Represents the result of stability analysis on a DAG file.
+
+    Stores detected warnings and formats them appropriately based on the 
configured check level
+    (warning or error).
+    """
+
+    def __init__(self, check_level):
+        self.check_level: str = check_level
+        self.warnings: list[RuntimeVaryingValueWarning] = []
+        self.runtime_varying_values: dict = {}
+
+    def format_warnings(self) -> str | None:
+        """Return formatted string of warning list."""
+        if not self.warnings:
+            return None
+
+        lines = [
+            "⚠️ This Dag uses runtime-variable values in Dag construction.",
+            "⚠️ It causes the Dag version to increase as values change on 
every Dag parse.",
+            "",
+        ]
+        for w in self.warnings:
+            lines.append(f"Line {w.line}, Col {w.col}")
+            lines.append(f"Code: {w.code}")
+            lines.append(f"Issue: {w.message}")
+            lines.append("")
+
+        if self.runtime_varying_values:
+            lines.append("️⚠️ Don't use the variables as arguments in DAG/Task 
constructors:")
+            # Sort by line number
+            sorted_vars = sorted(self.runtime_varying_values.items(), 
key=lambda x: x[1][0])
+            for var_name, (line, source) in sorted_vars:
+                lines.append(f"  Line {line}: '{var_name}' related '{source}'")
+            lines.append("")
+
+        return "\n".join(lines)
+
+    def get_warning_dag_format_dict(self, dag_ids):

Review Comment:
   ```suggestion
       def get_warning_dag_format_dict(self, dag_ids: list[str]) -> list[dict[str, Any]]:
   ```
   
   Alternatively, make the dict part a `TypedDict`.



##########
airflow-core/src/airflow/utils/dag_stability_checker.py:
##########
@@ -0,0 +1,510 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import ast
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+
+
+class DagStabilityCheckerResult:
+    """
+    Represents the result of stability analysis on a DAG file.
+
+    Stores detected warnings and formats them appropriately based on the 
configured check level
+    (warning or error).
+    """
+
+    def __init__(self, check_level):
+        self.check_level: str = check_level
+        self.warnings: list[RuntimeVaryingValueWarning] = []
+        self.runtime_varying_values: dict = {}
+
+    def format_warnings(self) -> str | None:
+        """Return formatted string of warning list."""
+        if not self.warnings:
+            return None
+
+        lines = [
+            "⚠️ This Dag uses runtime-variable values in Dag construction.",
+            "⚠️ It causes the Dag version to increase as values change on 
every Dag parse.",
+            "",
+        ]
+        for w in self.warnings:
+            lines.append(f"Line {w.line}, Col {w.col}")
+            lines.append(f"Code: {w.code}")
+            lines.append(f"Issue: {w.message}")
+            lines.append("")
+
+        if self.runtime_varying_values:
+            lines.append("️⚠️ Don't use the variables as arguments in DAG/Task 
constructors:")
+            # Sort by line number
+            sorted_vars = sorted(self.runtime_varying_values.items(), 
key=lambda x: x[1][0])
+            for var_name, (line, source) in sorted_vars:
+                lines.append(f"  Line {line}: '{var_name}' related '{source}'")
+            lines.append("")
+
+        return "\n".join(lines)
+
+    def get_warning_dag_format_dict(self, dag_ids):
+        """Convert warning statement to Dag warning format."""
+        from airflow.models.dagwarning import DagWarningType
+
+        if not self.warnings or self.check_level != "warning":
+            return []
+        return [
+            {
+                "dag_id": dag_id,
+                "warning_type": DagWarningType.RUNTIME_VARYING_VALUE.value,
+                "message": self.format_warnings(),
+            }
+            for dag_id in dag_ids
+        ]
+
+    def get_error_format_dict(self, file_path, bundle_path):
+        if not self.warnings or self.check_level != "error":
+            return None
+
+        relative_file_path = str(Path(file_path).relative_to(bundle_path)) if 
bundle_path else file_path
+        return {relative_file_path: self.format_warnings()}
+
+
+@dataclass
+class RuntimeVaryingValueWarning:
+    """Warning information for runtime-varying value detection."""
+
+    line: int
+    col: int
+    code: str
+    message: str
+
+
+class WarningContext(str, Enum):
+    """Context types for warnings."""
+
+    TASK_CONSTRUCTOR = "Task constructor"
+    DAG_CONSTRUCTOR = "Dag constructor"
+
+
+RUNTIME_VARYING_CALLS = [
+    ("datetime", "now"),
+    ("datetime", "today"),
+    ("datetime", "utcnow"),
+    ("date", "today"),
+    ("time", "time"),
+    ("time", "localtime"),
+    ("random", "random"),
+    ("random", "randint"),
+    ("random", "choice"),
+    ("random", "uniform"),
+    ("uuid", "uuid4"),
+    ("uuid", "uuid1"),
+    ("pendulum", "now"),
+    ("pendulum", "today"),
+    ("pendulum", "yesterday"),
+    ("pendulum", "tomorrow"),
+]
+
+
+class RuntimeVaryingValueAnalyzer:
+    """
+    Analyzer dedicated to tracking and detecting runtime-varying values.
+
+    This analyzer is responsible for identifying if a given AST node
+    contains values that change on every execution (datetime.now(), random(), 
etc.).
+    """
+
+    def __init__(self, varying_vars: dict, imports: dict, from_imports: dict):
+        self.varying_vars = varying_vars
+        self.imports = imports
+        self.from_imports = from_imports
+
+    def get_varying_source(self, node: ast.expr) -> str | None:
+        """
+        Check if an AST node contains runtime-varying values and return the 
source.
+
+        Checks:
+        - Runtime-varying function calls (datetime.now(), etc.)
+        - Runtime-varying variable references
+        - Runtime-varying values in f-strings
+        - Runtime-varying values in expressions/collections
+        """
+        # 1. Direct runtime-varying call
+        if isinstance(node, ast.Call) and self._is_runtime_varying_call(node):
+            return ast.unparse(node)
+
+        # 2. Runtime-varying variable reference
+        if isinstance(node, ast.Name) and node.id in self.varying_vars:
+            _, source = self.varying_vars[node.id]
+            return source
+
+        # 3. f-string
+        if isinstance(node, ast.JoinedStr):
+            return self._check_fstring_varying(node)
+
+        # 4. Binary operation
+        if isinstance(node, ast.BinOp):
+            return self.get_varying_source(node.left) or 
self.get_varying_source(node.right)
+
+        # 5. Collections (list/tuple/set)
+        if isinstance(node, (ast.List, ast.Tuple, ast.Set)):
+            return self._check_collection_varying(node.elts)
+
+        # 6. List comprehension
+        if isinstance(node, ast.ListComp):
+            return self.get_varying_source(node.elt)
+
+        # 7. Dictionary
+        if isinstance(node, ast.Dict):
+            return self._check_dict_varying(node)
+
+        # 8. Method call chain
+        if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
+            return self.get_varying_source(node.func.value)
+
+        return None
+
+    def _check_fstring_varying(self, node: ast.JoinedStr) -> str | None:
+        """Check for runtime-varying values inside f-strings."""
+        for value in node.values:
+            if isinstance(value, ast.FormattedValue):
+                source = self.get_varying_source(value.value)
+                if source:
+                    return source
+        return None
+
+    def _check_collection_varying(self, elements: list) -> str | None:
+        """Check for runtime-varying values in collection elements."""
+        for elt in elements:
+            source = self.get_varying_source(elt)
+            if source:
+                return source
+        return None
+
+    def _check_dict_varying(self, node: ast.Dict) -> str | None:
+        """Check for runtime-varying values in dictionary keys/values."""
+        for key, value in zip(node.keys, node.values):
+            if key:
+                source = self.get_varying_source(key)
+                if source:
+                    return source
+            if value:
+                source = self.get_varying_source(value)
+                if source:
+                    return source
+        return None
+
+    def _is_runtime_varying_call(self, node: ast.Call) -> bool:

Review Comment:
   ```suggestion
       def is_runtime_varying_call(self, node: ast.Call) -> bool:
   ```
   
   Why do we need to make these methods private?



##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -2663,6 +2663,22 @@ dag_processor:
       type: boolean
       example: ~
       default: "True"
+    dag_stability_check_level:
+      description: |
+        Controls the behavior of dag stability checker performed before DAG 
parsing in the dag-processor.

Review Comment:
   ```suggestion
           Controls the behavior of Dag stability checker performed before Dag parsing in the Dag processor.
   ```



##########
airflow-core/src/airflow/utils/dag_stability_checker.py:
##########
@@ -0,0 +1,510 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import ast
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+
+
+class DagStabilityCheckerResult:
+    """
+    Represents the result of stability analysis on a DAG file.
+
+    Stores detected warnings and formats them appropriately based on the 
configured check level
+    (warning or error).
+    """
+
+    def __init__(self, check_level):
+        self.check_level: str = check_level
+        self.warnings: list[RuntimeVaryingValueWarning] = []
+        self.runtime_varying_values: dict = {}
+
+    def format_warnings(self) -> str | None:
+        """Return formatted string of warning list."""
+        if not self.warnings:
+            return None
+
+        lines = [
+            "⚠️ This Dag uses runtime-variable values in Dag construction.",
+            "⚠️ It causes the Dag version to increase as values change on 
every Dag parse.",
+            "",
+        ]
+        for w in self.warnings:
+            lines.append(f"Line {w.line}, Col {w.col}")
+            lines.append(f"Code: {w.code}")
+            lines.append(f"Issue: {w.message}")
+            lines.append("")
+
+        if self.runtime_varying_values:
+            lines.append("️⚠️ Don't use the variables as arguments in DAG/Task 
constructors:")
+            # Sort by line number
+            sorted_vars = sorted(self.runtime_varying_values.items(), 
key=lambda x: x[1][0])
+            for var_name, (line, source) in sorted_vars:
+                lines.append(f"  Line {line}: '{var_name}' related '{source}'")
+            lines.append("")
+
+        return "\n".join(lines)
+
+    def get_warning_dag_format_dict(self, dag_ids):
+        """Convert warning statement to Dag warning format."""
+        from airflow.models.dagwarning import DagWarningType
+
+        if not self.warnings or self.check_level != "warning":
+            return []
+        return [
+            {
+                "dag_id": dag_id,
+                "warning_type": DagWarningType.RUNTIME_VARYING_VALUE.value,
+                "message": self.format_warnings(),
+            }
+            for dag_id in dag_ids
+        ]
+
+    def get_error_format_dict(self, file_path, bundle_path):
+        if not self.warnings or self.check_level != "error":
+            return None
+
+        relative_file_path = str(Path(file_path).relative_to(bundle_path)) if 
bundle_path else file_path
+        return {relative_file_path: self.format_warnings()}
+
+
+@dataclass
+class RuntimeVaryingValueWarning:
+    """Warning information for runtime-varying value detection."""
+
+    line: int
+    col: int
+    code: str
+    message: str
+
+
+class WarningContext(str, Enum):
+    """Context types for warnings."""
+
+    TASK_CONSTRUCTOR = "Task constructor"
+    DAG_CONSTRUCTOR = "Dag constructor"
+
+
+RUNTIME_VARYING_CALLS = [
+    ("datetime", "now"),
+    ("datetime", "today"),
+    ("datetime", "utcnow"),
+    ("date", "today"),
+    ("time", "time"),
+    ("time", "localtime"),
+    ("random", "random"),
+    ("random", "randint"),
+    ("random", "choice"),
+    ("random", "uniform"),
+    ("uuid", "uuid4"),
+    ("uuid", "uuid1"),
+    ("pendulum", "now"),
+    ("pendulum", "today"),
+    ("pendulum", "yesterday"),
+    ("pendulum", "tomorrow"),
+]
+
+
+class RuntimeVaryingValueAnalyzer:
+    """
+    Analyzer dedicated to tracking and detecting runtime-varying values.
+
+    This analyzer is responsible for identifying if a given AST node
+    contains values that change on every execution (datetime.now(), random(), 
etc.).
+    """
+
+    def __init__(self, varying_vars: dict, imports: dict, from_imports: dict):
+        self.varying_vars = varying_vars
+        self.imports = imports
+        self.from_imports = from_imports
+
+    def get_varying_source(self, node: ast.expr) -> str | None:
+        """
+        Check if an AST node contains runtime-varying values and return the 
source.
+
+        Checks:
+        - Runtime-varying function calls (datetime.now(), etc.)
+        - Runtime-varying variable references
+        - Runtime-varying values in f-strings
+        - Runtime-varying values in expressions/collections
+        """
+        # 1. Direct runtime-varying call
+        if isinstance(node, ast.Call) and self._is_runtime_varying_call(node):
+            return ast.unparse(node)
+
+        # 2. Runtime-varying variable reference
+        if isinstance(node, ast.Name) and node.id in self.varying_vars:
+            _, source = self.varying_vars[node.id]
+            return source
+
+        # 3. f-string
+        if isinstance(node, ast.JoinedStr):
+            return self._check_fstring_varying(node)
+
+        # 4. Binary operation
+        if isinstance(node, ast.BinOp):
+            return self.get_varying_source(node.left) or 
self.get_varying_source(node.right)
+
+        # 5. Collections (list/tuple/set)
+        if isinstance(node, (ast.List, ast.Tuple, ast.Set)):
+            return self._check_collection_varying(node.elts)
+
+        # 6. List comprehension
+        if isinstance(node, ast.ListComp):
+            return self.get_varying_source(node.elt)
+
+        # 7. Dictionary
+        if isinstance(node, ast.Dict):
+            return self._check_dict_varying(node)
+
+        # 8. Method call chain
+        if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
+            return self.get_varying_source(node.func.value)
+
+        return None
+
+    def _check_fstring_varying(self, node: ast.JoinedStr) -> str | None:

Review Comment:
   If we name the first method `get_varying_source`, should this be named
something like `get_varying_fstring` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to