This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0df12653cfab2272861a4e551268b28839d013a3 Author: Daniel Standish <[email protected]> AuthorDate: Wed Apr 23 15:44:15 2025 -0700 Remove some lingering subdag references (#49663) (cherry picked from commit b01e534195fff5dea03d000051e6469b83852f6c) --- airflow-core/src/airflow/models/dag.py | 12 ++++++------ airflow-core/src/airflow/models/dagbag.py | 4 ++-- airflow-core/src/airflow/security/permissions.py | 12 +----------- airflow-core/tests/unit/utils/test_task_group.py | 14 +++++++------- .../src/airflow/providers/fab/www/security/permissions.py | 12 +----------- task-sdk/src/airflow/sdk/definitions/dag.py | 14 +++++++------- 6 files changed, 24 insertions(+), 44 deletions(-) diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py index 13a93bd7de3..4626e0b3b46 100644 --- a/airflow-core/src/airflow/models/dag.py +++ b/airflow-core/src/airflow/models/dag.py @@ -1251,7 +1251,7 @@ class DAG(TaskSDKDag, LoggingMixin): # Clear downstream tasks that are in failed/upstream_failed state to resume them. # Flush the session so that the tasks marked success are reflected in the db. session.flush() - subdag = self.partial_subset( + subset = self.partial_subset( task_ids={task_id}, include_downstream=True, include_upstream=False, @@ -1273,9 +1273,9 @@ class DAG(TaskSDKDag, LoggingMixin): } if not future and not past: # Simple case 1: we're only dealing with exactly one run. clear_kwargs["run_id"] = run_id - subdag.clear(**clear_kwargs) + subset.clear(**clear_kwargs) elif future and past: # Simple case 2: we're clearing ALL runs. - subdag.clear(**clear_kwargs) + subset.clear(**clear_kwargs) else: # Complex cases: we may have more than one run, based on a date range. # Make 'future' and 'past' make some sense when multiple runs exist # for the same logical date. We order runs by their id and only @@ -1287,7 +1287,7 @@ class DAG(TaskSDKDag, LoggingMixin): else: clear_kwargs["end_date"] = logical_date exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id < dr_id) - subdag.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs) + subset.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs) return altered @provide_session @@ -1363,13 +1363,13 @@ class DAG(TaskSDKDag, LoggingMixin): # Clear downstream tasks that are in failed/upstream_failed state to resume them. # Flush the session so that the tasks marked success are reflected in the db. session.flush() - task_subset = self.partial_subset( + subset = self.partial_subset( task_ids=task_ids, include_downstream=True, include_upstream=False, ) - task_subset.clear( + subset.clear( start_date=start_date, end_date=end_date, only_failed=True, diff --git a/airflow-core/src/airflow/models/dagbag.py b/airflow-core/src/airflow/models/dagbag.py index 393d01ce7a2..977875b90f0 100644 --- a/airflow-core/src/airflow/models/dagbag.py +++ b/airflow-core/src/airflow/models/dagbag.py @@ -505,8 +505,8 @@ class DagBag(LoggingMixin): """ Add the DAG into the bag. - :raises: AirflowDagCycleException if a cycle is detected in this dag or its subdags. - :raises: AirflowDagDuplicatedIdException if this dag or its subdags already exists in the bag. + :raises: AirflowDagCycleException if a cycle is detected. + :raises: AirflowDagDuplicatedIdException if this dag already exists in the bag. """ check_cycle(dag) # throws if a task cycle is found diff --git a/airflow-core/src/airflow/security/permissions.py b/airflow-core/src/airflow/security/permissions.py index 647bcf0b0c6..7545fe9d930 100644 --- a/airflow-core/src/airflow/security/permissions.py +++ b/airflow-core/src/airflow/security/permissions.py @@ -95,13 +95,7 @@ PREFIX_RESOURCES_MAP = {details["prefix"]: resource for resource, details in RES def resource_name(root_dag_id: str, resource: str) -> str: - """ - Return the resource name for a DAG id. - - Note that since a sub-DAG should follow the permission of its - parent DAG, you should pass ``DagModel.root_dag_id`` to this function, - for a subdag. A normal dag should pass the ``DagModel.dag_id``. - """ + """Return the resource name for a DAG id.""" if root_dag_id in RESOURCE_DETAILS_MAP.keys(): return root_dag_id if root_dag_id.startswith(tuple(PREFIX_RESOURCES_MAP.keys())): @@ -113,10 +107,6 @@ def resource_name_for_dag(root_dag_id: str) -> str: """ Return the resource name for a DAG id. - Note that since a sub-DAG should follow the permission of its - parent DAG, you should pass ``DagModel.root_dag_id`` to this function, - for a subdag. A normal dag should pass the ``DagModel.dag_id``. - Note: This function is kept for backwards compatibility. """ if root_dag_id == RESOURCE_DAG: diff --git a/airflow-core/tests/unit/utils/test_task_group.py b/airflow-core/tests/unit/utils/test_task_group.py index 60acb78fbb4..544d81d9661 100644 --- a/airflow-core/tests/unit/utils/test_task_group.py +++ b/airflow-core/tests/unit/utils/test_task_group.py @@ -443,7 +443,7 @@ def test_sub_dag_task_group(): group234 >> group6 group234 >> task7 - subdag = dag.partial_subset(task_ids="task5", include_upstream=True, include_downstream=False) + subset = dag.partial_subset(task_ids="task5", include_upstream=True, include_downstream=False) expected_node_id = { "id": None, @@ -467,9 +467,9 @@ def test_sub_dag_task_group(): ], } - assert extract_node_id(task_group_to_dict(subdag.task_group)) == expected_node_id + assert extract_node_id(task_group_to_dict(subset.task_group)) == expected_node_id - edges = dag_edges(subdag) + edges = dag_edges(subset) assert sorted((e["source_id"], e["target_id"]) for e in edges) == [ ("group234.group34.downstream_join_id", "task5"), ("group234.group34.task3", "group234.group34.downstream_join_id"), @@ -479,19 +479,19 @@ def test_sub_dag_task_group(): ("task1", "group234.upstream_join_id"), ] - subdag_task_groups = subdag.task_group.get_task_group_dict() - assert subdag_task_groups.keys() == {None, "group234", "group234.group34"} + groups = subset.task_group.get_task_group_dict() + assert groups.keys() == {None, "group234", "group234.group34"} included_group_ids = {"group234", "group234.group34"} included_task_ids = {"group234.group34.task3", "group234.group34.task4", "task1", "task5"} - for task_group in subdag_task_groups.values(): + for task_group in groups.values(): assert task_group.upstream_group_ids.issubset(included_group_ids) assert task_group.downstream_group_ids.issubset(included_group_ids) assert task_group.upstream_task_ids.issubset(included_task_ids) assert task_group.downstream_task_ids.issubset(included_task_ids) - for task in subdag.task_group: + for task in subset.task_group: assert task.upstream_task_ids.issubset(included_task_ids) assert task.downstream_task_ids.issubset(included_task_ids) diff --git a/providers/fab/src/airflow/providers/fab/www/security/permissions.py b/providers/fab/src/airflow/providers/fab/www/security/permissions.py index 647bcf0b0c6..7545fe9d930 100644 --- a/providers/fab/src/airflow/providers/fab/www/security/permissions.py +++ b/providers/fab/src/airflow/providers/fab/www/security/permissions.py @@ -95,13 +95,7 @@ PREFIX_RESOURCES_MAP = {details["prefix"]: resource for resource, details in RES def resource_name(root_dag_id: str, resource: str) -> str: - """ - Return the resource name for a DAG id. - - Note that since a sub-DAG should follow the permission of its - parent DAG, you should pass ``DagModel.root_dag_id`` to this function, - for a subdag. A normal dag should pass the ``DagModel.dag_id``. - """ + """Return the resource name for a DAG id.""" if root_dag_id in RESOURCE_DETAILS_MAP.keys(): return root_dag_id if root_dag_id.startswith(tuple(PREFIX_RESOURCES_MAP.keys())): @@ -113,10 +107,6 @@ def resource_name_for_dag(root_dag_id: str) -> str: """ Return the resource name for a DAG id. - Note that since a sub-DAG should follow the permission of its - parent DAG, you should pass ``DagModel.root_dag_id`` to this function, - for a subdag. A normal dag should pass the ``DagModel.dag_id``. - Note: This function is kept for backwards compatibility. """ if root_dag_id == RESOURCE_DAG: diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index e9915d74cac..afe5d117a74 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -831,7 +831,7 @@ class DAG: } def filter_task_group(group, parent_group): - """Exclude tasks not included in the subdag from the given TaskGroup.""" + """Exclude tasks not included in the partial dag from the given TaskGroup.""" # We want to deepcopy _most but not all_ attributes of the task group, so we create a shallow copy # and then manually deep copy the instances. (memo argument to deepcopy only works for instances # of classes, not "native" properties of an instance) @@ -867,12 +867,12 @@ class DAG: # Removing upstream/downstream references to tasks and TaskGroups that did not make # the cut. - subdag_task_groups = dag.task_group.get_task_group_dict() - for group in subdag_task_groups.values(): - group.upstream_group_ids.intersection_update(subdag_task_groups) - group.downstream_group_ids.intersection_update(subdag_task_groups) - group.upstream_task_ids.intersection_update(dag.task_dict) - group.downstream_task_ids.intersection_update(dag.task_dict) + groups = dag.task_group.get_task_group_dict() + for g in groups.values(): + g.upstream_group_ids.intersection_update(groups) + g.downstream_group_ids.intersection_update(groups) + g.upstream_task_ids.intersection_update(dag.task_dict) + g.downstream_task_ids.intersection_update(dag.task_dict) for t in dag.tasks: # Removing upstream/downstream references to tasks that did not
