This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 85c3fbac2df Remove regexp from dag.partial_subset method and update callers (#47360)
85c3fbac2df is described below
commit 85c3fbac2df1fd4c6209b80ac2fd034dbdb131b6
Author: Bugra Ozturk <[email protected]>
AuthorDate: Fri Mar 7 12:51:12 2025 +0100
Remove regexp from dag.partial_subset method and update callers (#47360)
* Remove regexp from dag.partial_subset method and update callers
* Remove casting and convert list checking to old method
* Remove not needed casting
---
.../api_fastapi/core_api/routes/public/task_instances.py | 2 +-
airflow/api_fastapi/core_api/routes/ui/grid.py | 2 +-
airflow/api_fastapi/core_api/routes/ui/structure.py | 2 +-
airflow/cli/commands/remote_commands/task_command.py | 2 +-
airflow/models/dag.py | 6 +++---
task-sdk/src/airflow/sdk/definitions/dag.py | 13 +++++--------
tests/models/test_dag.py | 6 +++---
tests/sensors/test_external_task_sensor.py | 4 ++--
tests/utils/test_task_group.py | 2 +-
9 files changed, 18 insertions(+), 21 deletions(-)
diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py
b/airflow/api_fastapi/core_api/routes/public/task_instances.py
index cc6d7e5e8cb..a43c3c70780 100644
--- a/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -649,7 +649,7 @@ def post_clear_task_instances(
if task_ids is not None:
task_id = [task[0] if isinstance(task, tuple) else task for task in
task_ids]
dag = dag.partial_subset(
- task_ids_or_regex=task_id,
+ task_ids=task_id,
include_downstream=downstream,
include_upstream=upstream,
)
diff --git a/airflow/api_fastapi/core_api/routes/ui/grid.py
b/airflow/api_fastapi/core_api/routes/ui/grid.py
index c8f4440ec69..10705bb7c93 100644
--- a/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -134,7 +134,7 @@ def grid_data(
if root:
task_node_map_exclude = get_task_group_map(
dag=dag.partial_subset(
- task_ids_or_regex=root,
+ task_ids=root,
include_upstream=include_upstream,
include_downstream=include_downstream,
)
diff --git a/airflow/api_fastapi/core_api/routes/ui/structure.py
b/airflow/api_fastapi/core_api/routes/ui/structure.py
index 774952149cd..f3d13d871e4 100644
--- a/airflow/api_fastapi/core_api/routes/ui/structure.py
+++ b/airflow/api_fastapi/core_api/routes/ui/structure.py
@@ -70,7 +70,7 @@ def structure_data(
if root:
dag = dag.partial_subset(
- task_ids_or_regex=root, include_upstream=include_upstream,
include_downstream=include_downstream
+ task_ids=root, include_upstream=include_upstream,
include_downstream=include_downstream
)
nodes = [task_group_to_dict(child) for child in
dag.task_group.topological_sort()]
diff --git a/airflow/cli/commands/remote_commands/task_command.py
b/airflow/cli/commands/remote_commands/task_command.py
index 7417c5c1fa3..32a6f4a2870 100644
--- a/airflow/cli/commands/remote_commands/task_command.py
+++ b/airflow/cli/commands/remote_commands/task_command.py
@@ -456,7 +456,7 @@ def task_clear(args) -> None:
if args.task_regex:
for idx, dag in enumerate(dags):
dags[idx] = dag.partial_subset(
- task_ids_or_regex=args.task_regex,
+ task_ids=args.task_regex,
include_downstream=args.downstream,
include_upstream=args.upstream,
)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index cbb73e45f84..619cc9e1d14 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1135,7 +1135,7 @@ class DAG(TaskSDKDag, LoggingMixin):
if not external_dag:
raise AirflowException(f"Could not find dag
{tii.dag_id}")
downstream = external_dag.partial_subset(
- task_ids_or_regex=[tii.task_id],
+ task_ids=[tii.task_id],
include_upstream=False,
include_downstream=True,
)
@@ -1249,7 +1249,7 @@ class DAG(TaskSDKDag, LoggingMixin):
# Flush the session so that the tasks marked success are reflected in
the db.
session.flush()
subdag = self.partial_subset(
- task_ids_or_regex={task_id},
+ task_ids={task_id},
include_downstream=True,
include_upstream=False,
)
@@ -1361,7 +1361,7 @@ class DAG(TaskSDKDag, LoggingMixin):
# Flush the session so that the tasks marked success are reflected
in the db.
session.flush()
task_subset = self.partial_subset(
- task_ids_or_regex=task_ids,
+ task_ids=task_ids,
include_downstream=True,
include_upstream=False,
)
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py
b/task-sdk/src/airflow/sdk/definitions/dag.py
index d716daa0f46..9a8f93fb9af 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -28,7 +28,6 @@ from collections import abc
from collections.abc import Collection, Iterable, MutableSet
from datetime import datetime, timedelta
from inspect import signature
-from re import Pattern
from typing import (
TYPE_CHECKING,
Any,
@@ -41,7 +40,6 @@ from urllib.parse import urlsplit
import attrs
import jinja2
-import re2
from dateutil.relativedelta import relativedelta
from airflow import settings
@@ -741,7 +739,7 @@ class DAG:
def partial_subset(
self,
- task_ids_or_regex: str | Pattern | Iterable[str],
+ task_ids: str | Iterable[str],
include_downstream=False,
include_upstream=True,
include_direct_upstream=False,
@@ -753,8 +751,7 @@ class DAG:
based on a regex that should match one or many tasks, and includes
upstream and downstream neighbours based on the flag passed.
- :param task_ids_or_regex: Either a list of task_ids, or a regex to
- match against task ids (as a string, or compiled regex pattern).
+ :param task_ids: Either a collection of task_ids (matched exactly), or a single string that is matched as a substring of each task_id
:param include_downstream: Include all downstream tasks of matched
tasks, in addition to matched tasks.
:param include_upstream: Include all upstream tasks of matched tasks,
@@ -769,10 +766,10 @@ class DAG:
memo = {id(self.task_dict): None, id(self.task_group): None}
dag = copy.deepcopy(self, memo) # type: ignore
- if isinstance(task_ids_or_regex, (str, Pattern)):
- matched_tasks = [t for t in self.tasks if
re2.findall(task_ids_or_regex, t.task_id)]
+ if isinstance(task_ids, str):
+ matched_tasks = [t for t in self.tasks if task_ids in t.task_id]
else:
- matched_tasks = [t for t in self.tasks if t.task_id in
task_ids_or_regex]
+ matched_tasks = [t for t in self.tasks if t.task_id in task_ids]
also_include_ids: set[str] = set()
for t in matched_tasks:
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 01befbc488f..fd77b62e491 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3046,7 +3046,7 @@ class TestTaskClearingSetupTeardownBehavior:
upstream = False
return set(
task.dag.partial_subset(
- task_ids_or_regex=[task.task_id],
+ task_ids=[task.task_id],
include_downstream=not upstream,
include_upstream=upstream,
).tasks
@@ -3058,7 +3058,7 @@ class TestTaskClearingSetupTeardownBehavior:
upstream = True
return set(
task.dag.partial_subset(
- task_ids_or_regex=task.task_id,
+ task_ids=task.task_id,
include_downstream=not upstream,
include_upstream=upstream,
).tasks
@@ -3069,7 +3069,7 @@ class TestTaskClearingSetupTeardownBehavior:
"""Helper to return tasks that would be cleared if **upstream**
selected."""
return set(
task.dag.partial_subset(
- task_ids_or_regex=[task.task_id],
+ task_ids=[task.task_id],
include_downstream=False,
include_upstream=False,
).tasks
diff --git a/tests/sensors/test_external_task_sensor.py
b/tests/sensors/test_external_task_sensor.py
index 1b347526ec4..9bce3646c00 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -1286,7 +1286,7 @@ def clear_tasks(
"""
Clear the task and its downstream tasks recursively for the dag in the
given dagbag.
"""
- partial: DAG = dag.partial_subset(task_ids_or_regex=[task.task_id],
include_downstream=True)
+ partial: DAG = dag.partial_subset(task_ids=[task.task_id],
include_downstream=True)
return partial.clear(
start_date=start_date,
end_date=end_date,
@@ -1719,7 +1719,7 @@ def
test_clear_overlapping_external_task_marker_mapped_tasks(dag_bag_head_tail_m
session.flush()
dag = dag.partial_subset(
- task_ids_or_regex=["head"],
+ task_ids=["head"],
include_downstream=True,
include_upstream=False,
)
diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py
index b6643348a4e..cfaeaf8288b 100644
--- a/tests/utils/test_task_group.py
+++ b/tests/utils/test_task_group.py
@@ -455,7 +455,7 @@ def test_sub_dag_task_group():
group234 >> group6
group234 >> task7
- subdag = dag.partial_subset(task_ids_or_regex="task5",
include_upstream=True, include_downstream=False)
+ subdag = dag.partial_subset(task_ids="task5", include_upstream=True,
include_downstream=False)
expected_node_id = {
"id": None,