This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b01e534195f Remove some lingering subdag references (#49663)
b01e534195f is described below
commit b01e534195fff5dea03d000051e6469b83852f6c
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Apr 23 15:44:15 2025 -0700
Remove some lingering subdag references (#49663)
---
airflow-core/src/airflow/models/dag.py | 12 ++++++------
airflow-core/src/airflow/models/dagbag.py | 4 ++--
airflow-core/src/airflow/security/permissions.py | 12 +-----------
airflow-core/tests/unit/utils/test_task_group.py | 14 +++++++-------
.../src/airflow/providers/fab/www/security/permissions.py | 12 +-----------
task-sdk/src/airflow/sdk/definitions/dag.py | 14 +++++++-------
6 files changed, 24 insertions(+), 44 deletions(-)
diff --git a/airflow-core/src/airflow/models/dag.py
b/airflow-core/src/airflow/models/dag.py
index 13a93bd7de3..4626e0b3b46 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -1251,7 +1251,7 @@ class DAG(TaskSDKDag, LoggingMixin):
# Clear downstream tasks that are in failed/upstream_failed state to
resume them.
# Flush the session so that the tasks marked success are reflected in
the db.
session.flush()
- subdag = self.partial_subset(
+ subset = self.partial_subset(
task_ids={task_id},
include_downstream=True,
include_upstream=False,
@@ -1273,9 +1273,9 @@ class DAG(TaskSDKDag, LoggingMixin):
}
if not future and not past: # Simple case 1: we're only dealing with
exactly one run.
clear_kwargs["run_id"] = run_id
- subdag.clear(**clear_kwargs)
+ subset.clear(**clear_kwargs)
elif future and past: # Simple case 2: we're clearing ALL runs.
- subdag.clear(**clear_kwargs)
+ subset.clear(**clear_kwargs)
else: # Complex cases: we may have more than one run, based on a date
range.
# Make 'future' and 'past' make some sense when multiple runs exist
# for the same logical date. We order runs by their id and only
@@ -1287,7 +1287,7 @@ class DAG(TaskSDKDag, LoggingMixin):
else:
clear_kwargs["end_date"] = logical_date
exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id <
dr_id)
-            subdag.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs)
+            subset.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs)
return altered
@provide_session
@@ -1363,13 +1363,13 @@ class DAG(TaskSDKDag, LoggingMixin):
# Clear downstream tasks that are in failed/upstream_failed state
to resume them.
# Flush the session so that the tasks marked success are reflected
in the db.
session.flush()
- task_subset = self.partial_subset(
+ subset = self.partial_subset(
task_ids=task_ids,
include_downstream=True,
include_upstream=False,
)
- task_subset.clear(
+ subset.clear(
start_date=start_date,
end_date=end_date,
only_failed=True,
diff --git a/airflow-core/src/airflow/models/dagbag.py
b/airflow-core/src/airflow/models/dagbag.py
index 393d01ce7a2..977875b90f0 100644
--- a/airflow-core/src/airflow/models/dagbag.py
+++ b/airflow-core/src/airflow/models/dagbag.py
@@ -505,8 +505,8 @@ class DagBag(LoggingMixin):
"""
Add the DAG into the bag.
-        :raises: AirflowDagCycleException if a cycle is detected in this dag or its subdags.
-        :raises: AirflowDagDuplicatedIdException if this dag or its subdags already exists in the bag.
+        :raises: AirflowDagCycleException if a cycle is detected.
+        :raises: AirflowDagDuplicatedIdException if this dag already exists in the bag.
"""
check_cycle(dag) # throws if a task cycle is found
diff --git a/airflow-core/src/airflow/security/permissions.py
b/airflow-core/src/airflow/security/permissions.py
index 647bcf0b0c6..7545fe9d930 100644
--- a/airflow-core/src/airflow/security/permissions.py
+++ b/airflow-core/src/airflow/security/permissions.py
@@ -95,13 +95,7 @@ PREFIX_RESOURCES_MAP = {details["prefix"]: resource for
resource, details in RES
def resource_name(root_dag_id: str, resource: str) -> str:
- """
- Return the resource name for a DAG id.
-
- Note that since a sub-DAG should follow the permission of its
- parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
- for a subdag. A normal dag should pass the ``DagModel.dag_id``.
- """
+ """Return the resource name for a DAG id."""
if root_dag_id in RESOURCE_DETAILS_MAP.keys():
return root_dag_id
if root_dag_id.startswith(tuple(PREFIX_RESOURCES_MAP.keys())):
@@ -113,10 +107,6 @@ def resource_name_for_dag(root_dag_id: str) -> str:
"""
Return the resource name for a DAG id.
- Note that since a sub-DAG should follow the permission of its
- parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
- for a subdag. A normal dag should pass the ``DagModel.dag_id``.
-
Note: This function is kept for backwards compatibility.
"""
if root_dag_id == RESOURCE_DAG:
diff --git a/airflow-core/tests/unit/utils/test_task_group.py
b/airflow-core/tests/unit/utils/test_task_group.py
index 60acb78fbb4..544d81d9661 100644
--- a/airflow-core/tests/unit/utils/test_task_group.py
+++ b/airflow-core/tests/unit/utils/test_task_group.py
@@ -443,7 +443,7 @@ def test_sub_dag_task_group():
group234 >> group6
group234 >> task7
-    subdag = dag.partial_subset(task_ids="task5", include_upstream=True, include_downstream=False)
+    subset = dag.partial_subset(task_ids="task5", include_upstream=True, include_downstream=False)
expected_node_id = {
"id": None,
@@ -467,9 +467,9 @@ def test_sub_dag_task_group():
],
}
-    assert extract_node_id(task_group_to_dict(subdag.task_group)) == expected_node_id
+    assert extract_node_id(task_group_to_dict(subset.task_group)) == expected_node_id
- edges = dag_edges(subdag)
+ edges = dag_edges(subset)
assert sorted((e["source_id"], e["target_id"]) for e in edges) == [
("group234.group34.downstream_join_id", "task5"),
("group234.group34.task3", "group234.group34.downstream_join_id"),
@@ -479,19 +479,19 @@ def test_sub_dag_task_group():
("task1", "group234.upstream_join_id"),
]
- subdag_task_groups = subdag.task_group.get_task_group_dict()
- assert subdag_task_groups.keys() == {None, "group234", "group234.group34"}
+ groups = subset.task_group.get_task_group_dict()
+ assert groups.keys() == {None, "group234", "group234.group34"}
included_group_ids = {"group234", "group234.group34"}
    included_task_ids = {"group234.group34.task3", "group234.group34.task4", "task1", "task5"}
- for task_group in subdag_task_groups.values():
+ for task_group in groups.values():
assert task_group.upstream_group_ids.issubset(included_group_ids)
assert task_group.downstream_group_ids.issubset(included_group_ids)
assert task_group.upstream_task_ids.issubset(included_task_ids)
assert task_group.downstream_task_ids.issubset(included_task_ids)
- for task in subdag.task_group:
+ for task in subset.task_group:
assert task.upstream_task_ids.issubset(included_task_ids)
assert task.downstream_task_ids.issubset(included_task_ids)
diff --git
a/providers/fab/src/airflow/providers/fab/www/security/permissions.py
b/providers/fab/src/airflow/providers/fab/www/security/permissions.py
index 647bcf0b0c6..7545fe9d930 100644
--- a/providers/fab/src/airflow/providers/fab/www/security/permissions.py
+++ b/providers/fab/src/airflow/providers/fab/www/security/permissions.py
@@ -95,13 +95,7 @@ PREFIX_RESOURCES_MAP = {details["prefix"]: resource for
resource, details in RES
def resource_name(root_dag_id: str, resource: str) -> str:
- """
- Return the resource name for a DAG id.
-
- Note that since a sub-DAG should follow the permission of its
- parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
- for a subdag. A normal dag should pass the ``DagModel.dag_id``.
- """
+ """Return the resource name for a DAG id."""
if root_dag_id in RESOURCE_DETAILS_MAP.keys():
return root_dag_id
if root_dag_id.startswith(tuple(PREFIX_RESOURCES_MAP.keys())):
@@ -113,10 +107,6 @@ def resource_name_for_dag(root_dag_id: str) -> str:
"""
Return the resource name for a DAG id.
- Note that since a sub-DAG should follow the permission of its
- parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
- for a subdag. A normal dag should pass the ``DagModel.dag_id``.
-
Note: This function is kept for backwards compatibility.
"""
if root_dag_id == RESOURCE_DAG:
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py
b/task-sdk/src/airflow/sdk/definitions/dag.py
index e9915d74cac..afe5d117a74 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -831,7 +831,7 @@ class DAG:
}
def filter_task_group(group, parent_group):
-            """Exclude tasks not included in the subdag from the given TaskGroup."""
+            """Exclude tasks not included in the partial dag from the given TaskGroup."""
# We want to deepcopy _most but not all_ attributes of the task
group, so we create a shallow copy
# and then manually deep copy the instances. (memo argument to
deepcopy only works for instances
# of classes, not "native" properties of an instance)
@@ -867,12 +867,12 @@ class DAG:
# Removing upstream/downstream references to tasks and TaskGroups that
did not make
# the cut.
- subdag_task_groups = dag.task_group.get_task_group_dict()
- for group in subdag_task_groups.values():
- group.upstream_group_ids.intersection_update(subdag_task_groups)
- group.downstream_group_ids.intersection_update(subdag_task_groups)
- group.upstream_task_ids.intersection_update(dag.task_dict)
- group.downstream_task_ids.intersection_update(dag.task_dict)
+ groups = dag.task_group.get_task_group_dict()
+ for g in groups.values():
+ g.upstream_group_ids.intersection_update(groups)
+ g.downstream_group_ids.intersection_update(groups)
+ g.upstream_task_ids.intersection_update(dag.task_dict)
+ g.downstream_task_ids.intersection_update(dag.task_dict)
for t in dag.tasks:
# Removing upstream/downstream references to tasks that did not