This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b01e534195f Remove some lingering subdag references (#49663)
b01e534195f is described below

commit b01e534195fff5dea03d000051e6469b83852f6c
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Apr 23 15:44:15 2025 -0700

    Remove some lingering subdag references (#49663)
---
 airflow-core/src/airflow/models/dag.py                     | 12 ++++++------
 airflow-core/src/airflow/models/dagbag.py                  |  4 ++--
 airflow-core/src/airflow/security/permissions.py           | 12 +-----------
 airflow-core/tests/unit/utils/test_task_group.py           | 14 +++++++-------
 .../src/airflow/providers/fab/www/security/permissions.py  | 12 +-----------
 task-sdk/src/airflow/sdk/definitions/dag.py                | 14 +++++++-------
 6 files changed, 24 insertions(+), 44 deletions(-)

diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py
index 13a93bd7de3..4626e0b3b46 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -1251,7 +1251,7 @@ class DAG(TaskSDKDag, LoggingMixin):
         # Clear downstream tasks that are in failed/upstream_failed state to resume them.
         # Flush the session so that the tasks marked success are reflected in the db.
         session.flush()
-        subdag = self.partial_subset(
+        subset = self.partial_subset(
             task_ids={task_id},
             include_downstream=True,
             include_upstream=False,
@@ -1273,9 +1273,9 @@ class DAG(TaskSDKDag, LoggingMixin):
         }
         if not future and not past:  # Simple case 1: we're only dealing with exactly one run.
             clear_kwargs["run_id"] = run_id
-            subdag.clear(**clear_kwargs)
+            subset.clear(**clear_kwargs)
         elif future and past:  # Simple case 2: we're clearing ALL runs.
-            subdag.clear(**clear_kwargs)
+            subset.clear(**clear_kwargs)
         else:  # Complex cases: we may have more than one run, based on a date range.
             # Make 'future' and 'past' make some sense when multiple runs exist
             # for the same logical date. We order runs by their id and only
@@ -1287,7 +1287,7 @@ class DAG(TaskSDKDag, LoggingMixin):
             else:
                 clear_kwargs["end_date"] = logical_date
                 exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id < dr_id)
-            subdag.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs)
+            subset.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs)
         return altered
 
     @provide_session
@@ -1363,13 +1363,13 @@ class DAG(TaskSDKDag, LoggingMixin):
             # Clear downstream tasks that are in failed/upstream_failed state to resume them.
             # Flush the session so that the tasks marked success are reflected in the db.
             session.flush()
-            task_subset = self.partial_subset(
+            subset = self.partial_subset(
                 task_ids=task_ids,
                 include_downstream=True,
                 include_upstream=False,
             )
 
-            task_subset.clear(
+            subset.clear(
                 start_date=start_date,
                 end_date=end_date,
                 only_failed=True,
diff --git a/airflow-core/src/airflow/models/dagbag.py b/airflow-core/src/airflow/models/dagbag.py
index 393d01ce7a2..977875b90f0 100644
--- a/airflow-core/src/airflow/models/dagbag.py
+++ b/airflow-core/src/airflow/models/dagbag.py
@@ -505,8 +505,8 @@ class DagBag(LoggingMixin):
         """
         Add the DAG into the bag.
 
-        :raises: AirflowDagCycleException if a cycle is detected in this dag or its subdags.
-        :raises: AirflowDagDuplicatedIdException if this dag or its subdags already exists in the bag.
+        :raises: AirflowDagCycleException if a cycle is detected.
+        :raises: AirflowDagDuplicatedIdException if this dag already exists in the bag.
         """
         check_cycle(dag)  # throws if a task cycle is found
 
diff --git a/airflow-core/src/airflow/security/permissions.py b/airflow-core/src/airflow/security/permissions.py
index 647bcf0b0c6..7545fe9d930 100644
--- a/airflow-core/src/airflow/security/permissions.py
+++ b/airflow-core/src/airflow/security/permissions.py
@@ -95,13 +95,7 @@ PREFIX_RESOURCES_MAP = {details["prefix"]: resource for resource, details in RES
 
 
 def resource_name(root_dag_id: str, resource: str) -> str:
-    """
-    Return the resource name for a DAG id.
-
-    Note that since a sub-DAG should follow the permission of its
-    parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
-    for a subdag. A normal dag should pass the ``DagModel.dag_id``.
-    """
+    """Return the resource name for a DAG id."""
     if root_dag_id in RESOURCE_DETAILS_MAP.keys():
         return root_dag_id
     if root_dag_id.startswith(tuple(PREFIX_RESOURCES_MAP.keys())):
@@ -113,10 +107,6 @@ def resource_name_for_dag(root_dag_id: str) -> str:
     """
     Return the resource name for a DAG id.
 
-    Note that since a sub-DAG should follow the permission of its
-    parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
-    for a subdag. A normal dag should pass the ``DagModel.dag_id``.
-
     Note: This function is kept for backwards compatibility.
     """
     if root_dag_id == RESOURCE_DAG:
diff --git a/airflow-core/tests/unit/utils/test_task_group.py b/airflow-core/tests/unit/utils/test_task_group.py
index 60acb78fbb4..544d81d9661 100644
--- a/airflow-core/tests/unit/utils/test_task_group.py
+++ b/airflow-core/tests/unit/utils/test_task_group.py
@@ -443,7 +443,7 @@ def test_sub_dag_task_group():
         group234 >> group6
         group234 >> task7
 
-    subdag = dag.partial_subset(task_ids="task5", include_upstream=True, include_downstream=False)
+    subset = dag.partial_subset(task_ids="task5", include_upstream=True, include_downstream=False)
 
     expected_node_id = {
         "id": None,
@@ -467,9 +467,9 @@ def test_sub_dag_task_group():
         ],
     }
 
-    assert extract_node_id(task_group_to_dict(subdag.task_group)) == expected_node_id
+    assert extract_node_id(task_group_to_dict(subset.task_group)) == expected_node_id
 
-    edges = dag_edges(subdag)
+    edges = dag_edges(subset)
     assert sorted((e["source_id"], e["target_id"]) for e in edges) == [
         ("group234.group34.downstream_join_id", "task5"),
         ("group234.group34.task3", "group234.group34.downstream_join_id"),
@@ -479,19 +479,19 @@ def test_sub_dag_task_group():
         ("task1", "group234.upstream_join_id"),
     ]
 
-    subdag_task_groups = subdag.task_group.get_task_group_dict()
-    assert subdag_task_groups.keys() == {None, "group234", "group234.group34"}
+    groups = subset.task_group.get_task_group_dict()
+    assert groups.keys() == {None, "group234", "group234.group34"}
 
     included_group_ids = {"group234", "group234.group34"}
     included_task_ids = {"group234.group34.task3", "group234.group34.task4", "task1", "task5"}
 
-    for task_group in subdag_task_groups.values():
+    for task_group in groups.values():
         assert task_group.upstream_group_ids.issubset(included_group_ids)
         assert task_group.downstream_group_ids.issubset(included_group_ids)
         assert task_group.upstream_task_ids.issubset(included_task_ids)
         assert task_group.downstream_task_ids.issubset(included_task_ids)
 
-    for task in subdag.task_group:
+    for task in subset.task_group:
         assert task.upstream_task_ids.issubset(included_task_ids)
         assert task.downstream_task_ids.issubset(included_task_ids)
 
diff --git a/providers/fab/src/airflow/providers/fab/www/security/permissions.py b/providers/fab/src/airflow/providers/fab/www/security/permissions.py
index 647bcf0b0c6..7545fe9d930 100644
--- a/providers/fab/src/airflow/providers/fab/www/security/permissions.py
+++ b/providers/fab/src/airflow/providers/fab/www/security/permissions.py
@@ -95,13 +95,7 @@ PREFIX_RESOURCES_MAP = {details["prefix"]: resource for resource, details in RES
 
 
 def resource_name(root_dag_id: str, resource: str) -> str:
-    """
-    Return the resource name for a DAG id.
-
-    Note that since a sub-DAG should follow the permission of its
-    parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
-    for a subdag. A normal dag should pass the ``DagModel.dag_id``.
-    """
+    """Return the resource name for a DAG id."""
     if root_dag_id in RESOURCE_DETAILS_MAP.keys():
         return root_dag_id
     if root_dag_id.startswith(tuple(PREFIX_RESOURCES_MAP.keys())):
@@ -113,10 +107,6 @@ def resource_name_for_dag(root_dag_id: str) -> str:
     """
     Return the resource name for a DAG id.
 
-    Note that since a sub-DAG should follow the permission of its
-    parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
-    for a subdag. A normal dag should pass the ``DagModel.dag_id``.
-
     Note: This function is kept for backwards compatibility.
     """
     if root_dag_id == RESOURCE_DAG:
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py
index e9915d74cac..afe5d117a74 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -831,7 +831,7 @@ class DAG:
         }
 
         def filter_task_group(group, parent_group):
-            """Exclude tasks not included in the subdag from the given TaskGroup."""
+            """Exclude tasks not included in the partial dag from the given TaskGroup."""
             # We want to deepcopy _most but not all_ attributes of the task group, so we create a shallow copy
             # and then manually deep copy the instances. (memo argument to deepcopy only works for instances
             # of classes, not "native" properties of an instance)
@@ -867,12 +867,12 @@ class DAG:
 
         # Removing upstream/downstream references to tasks and TaskGroups that did not make
         # the cut.
-        subdag_task_groups = dag.task_group.get_task_group_dict()
-        for group in subdag_task_groups.values():
-            group.upstream_group_ids.intersection_update(subdag_task_groups)
-            group.downstream_group_ids.intersection_update(subdag_task_groups)
-            group.upstream_task_ids.intersection_update(dag.task_dict)
-            group.downstream_task_ids.intersection_update(dag.task_dict)
+        groups = dag.task_group.get_task_group_dict()
+        for g in groups.values():
+            g.upstream_group_ids.intersection_update(groups)
+            g.downstream_group_ids.intersection_update(groups)
+            g.upstream_task_ids.intersection_update(dag.task_dict)
+            g.downstream_task_ids.intersection_update(dag.task_dict)
 
         for t in dag.tasks:
             # Removing upstream/downstream references to tasks that did not

Reply via email to