amoghrajesh commented on code in PR #60580:
URL: https://github.com/apache/airflow/pull/60580#discussion_r2831527794


##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -371,6 +371,129 @@ def hash(cls, dag_data):
         data_json = json.dumps(data_, sort_keys=True).encode("utf-8")
         return md5(data_json).hexdigest()
 
+    # Fields to exclude from minimal version hash (noise/config that changes frequently
+    # but doesn't affect task behavior or require a new version)
+    _EXCLUDED_TASK_FIELDS: set[str] = {
+        # Display/documentation
+        "ui_color",
+        "ui_fgcolor",
+        "doc",
+        "doc_md",
+        "doc_json",
+        "doc_yaml",
+        "doc_rst",
+        # Template metadata (not actual values)
+        "template_fields",
+        "template_ext",
+        "template_fields_renderers",
+        # Other metadata
+        "render_template_as_native_obj",
+    }
+
+    _EXCLUDED_DAG_FIELDS: set[str] = {
+        # Display/documentation
+        "description",
+        "doc_md",
+        # Config
+        "render_template_as_native_obj",
+        # Location (can change without affecting logic)
+        "fileloc",
+        "relative_fileloc",
+    }
+
+    @classmethod
+    def _extract_task_group_structure(cls, task_group: dict | None) -> dict | None:
+        """Extract structural parts of task_group for minimal version hash."""
+        if not task_group or not isinstance(task_group, dict):
+            return None
+
+        children_structure: dict[str, dict | None | str] = {}
+        children = task_group.get("children", {})
+        for label, child in children.items():
+            # In JSON, tuples become lists: ("taskgroup", {...}) -> ["taskgroup", {...}]
+            if isinstance(child, (tuple, list)) and len(child) == 2:
+                child_type, child_value = child
+                if child_type == "taskgroup" and isinstance(child_value, dict):
+                    children_structure[label] = cls._extract_task_group_structure(child_value)
+                else:
+                    children_structure[label] = "task"
+            elif isinstance(child, dict):
+                children_structure[label] = cls._extract_task_group_structure(child)
+            else:
+                children_structure[label] = "task"
+
+        return {
+            "_group_id": task_group.get("_group_id"),
+            "is_mapped": task_group.get("is_mapped", False),
+            "children": children_structure,
+        }
+
+    @classmethod
+    def _extract_task_for_minimal_hash(cls, task_payload: dict) -> dict:
+        """Extract version-relevant fields from a task for minimal version hash."""
+        return {k: v for k, v in task_payload.items() if k not in cls._EXCLUDED_TASK_FIELDS}
+
+    @classmethod
+    def _data_for_minimal_version_hash(cls, dag_data: dict) -> dict:
+        """
+        Extract version-relevant data from DAG for minimal version hash.
+
+        This hash captures the "important" parts of a DAG that should trigger
+        a new version when changed:
+        - Task identity and structure (task_id, task_type, dependencies)
+        - Task execution logic (python_callable, bash_command, sql, etc.)
+        - Mapped task/group configuration
+        - Asset dependencies (inlets, outlets)
+
+        Changes to these fields indicate meaningful changes to what the DAG does.
+
+        Excluded (noise that shouldn't trigger new version):
+        - Display/documentation (ui_color, doc_md, description)
+        - Scheduling (start_date, end_date)
+        """
+        dag_section = dag_data.get("dag")
+        if not isinstance(dag_section, dict):
+            return {}
+
+        tasks = []
+        for task in dag_section.get("tasks", []):
+            if not isinstance(task, dict):
+                continue
+            task_payload = task.get("__var") if isinstance(task.get("__var"), dict) else task
+            if not isinstance(task_payload, dict):
+                continue
+            tasks.append(cls._extract_task_for_minimal_hash(task_payload))
+
+        # Extract task_group structure (only structural parts)
+        task_group_structure = cls._extract_task_group_structure(dag_section.get("task_group"))
+
+        # Extract DAG-level fields, excluding noise
+        dag_fields = {
+            k: v
+            for k, v in dag_section.items()
+            if k not in cls._EXCLUDED_DAG_FIELDS and k != "tasks" and k != "task_group"
+        }
+        dag_fields["tasks"] = tasks
+        dag_fields["task_group"] = task_group_structure
+
+        return dag_fields
+
+    @classmethod
+    def minimal_version_hash(cls, dag_data: dict) -> str:

Review Comment:
   Does this have to be a public method?



##########
airflow-core/tests/unit/models/test_serialized_dag.py:
##########
@@ -370,6 +366,150 @@ def test_new_dag_versions_are_created_if_there_is_a_dagrun(self, dag_maker, sess
         assert session.scalar(select(func.count()).select_from(DagVersion)) == 2
         assert session.scalar(select(func.count()).select_from(SDM)) == 2
 
+    def test_minimal_version_hash_keeps_version_when_only_start_date_changes(self, dag_maker, session):
+        """Changing start_date should NOT create a new version (excluded from minimal hash)."""

Review Comment:
   The test docstring says start_date is excluded from the minimal hash, but I do not see start_date in either excluded field set (_EXCLUDED_TASK_FIELDS / _EXCLUDED_DAG_FIELDS). Am I missing something?



##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -371,6 +371,129 @@ def hash(cls, dag_data):
         data_json = json.dumps(data_, sort_keys=True).encode("utf-8")
         return md5(data_json).hexdigest()
 
+    # Fields to exclude from minimal version hash (noise/config that changes frequently
+    # but doesn't affect task behavior or require a new version)
+    _EXCLUDED_TASK_FIELDS: set[str] = {
+        # Display/documentation
+        "ui_color",
+        "ui_fgcolor",
+        "doc",
+        "doc_md",
+        "doc_json",
+        "doc_yaml",
+        "doc_rst",
+        # Template metadata (not actual values)
+        "template_fields",
+        "template_ext",
+        "template_fields_renderers",
+        # Other metadata
+        "render_template_as_native_obj",
+    }
+
+    _EXCLUDED_DAG_FIELDS: set[str] = {
+        # Display/documentation
+        "description",
+        "doc_md",
+        # Config
+        "render_template_as_native_obj",
+        # Location (can change without affecting logic)
+        "fileloc",
+        "relative_fileloc",
+    }

Review Comment:
   Do we need to add start_date and trigger_rule here, or are they handled implicitly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to