This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 85c3fbac2df Remove regexp from dag.partial_subset method and update 
callers (#47360)
85c3fbac2df is described below

commit 85c3fbac2df1fd4c6209b80ac2fd034dbdb131b6
Author: Bugra Ozturk <[email protected]>
AuthorDate: Fri Mar 7 12:51:12 2025 +0100

    Remove regexp from dag.partial_subset method and update callers (#47360)
    
    * Remove regexp from dag.partial_subset method and update callers
    
    * Remove casting and revert list checking to the previous approach
    
    * Remove unneeded casting
---
 .../api_fastapi/core_api/routes/public/task_instances.py    |  2 +-
 airflow/api_fastapi/core_api/routes/ui/grid.py              |  2 +-
 airflow/api_fastapi/core_api/routes/ui/structure.py         |  2 +-
 airflow/cli/commands/remote_commands/task_command.py        |  2 +-
 airflow/models/dag.py                                       |  6 +++---
 task-sdk/src/airflow/sdk/definitions/dag.py                 | 13 +++++--------
 tests/models/test_dag.py                                    |  6 +++---
 tests/sensors/test_external_task_sensor.py                  |  4 ++--
 tests/utils/test_task_group.py                              |  2 +-
 9 files changed, 18 insertions(+), 21 deletions(-)

diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py 
b/airflow/api_fastapi/core_api/routes/public/task_instances.py
index cc6d7e5e8cb..a43c3c70780 100644
--- a/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -649,7 +649,7 @@ def post_clear_task_instances(
     if task_ids is not None:
         task_id = [task[0] if isinstance(task, tuple) else task for task in 
task_ids]
         dag = dag.partial_subset(
-            task_ids_or_regex=task_id,
+            task_ids=task_id,
             include_downstream=downstream,
             include_upstream=upstream,
         )
diff --git a/airflow/api_fastapi/core_api/routes/ui/grid.py 
b/airflow/api_fastapi/core_api/routes/ui/grid.py
index c8f4440ec69..10705bb7c93 100644
--- a/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -134,7 +134,7 @@ def grid_data(
     if root:
         task_node_map_exclude = get_task_group_map(
             dag=dag.partial_subset(
-                task_ids_or_regex=root,
+                task_ids=root,
                 include_upstream=include_upstream,
                 include_downstream=include_downstream,
             )
diff --git a/airflow/api_fastapi/core_api/routes/ui/structure.py 
b/airflow/api_fastapi/core_api/routes/ui/structure.py
index 774952149cd..f3d13d871e4 100644
--- a/airflow/api_fastapi/core_api/routes/ui/structure.py
+++ b/airflow/api_fastapi/core_api/routes/ui/structure.py
@@ -70,7 +70,7 @@ def structure_data(
 
     if root:
         dag = dag.partial_subset(
-            task_ids_or_regex=root, include_upstream=include_upstream, 
include_downstream=include_downstream
+            task_ids=root, include_upstream=include_upstream, 
include_downstream=include_downstream
         )
 
     nodes = [task_group_to_dict(child) for child in 
dag.task_group.topological_sort()]
diff --git a/airflow/cli/commands/remote_commands/task_command.py 
b/airflow/cli/commands/remote_commands/task_command.py
index 7417c5c1fa3..32a6f4a2870 100644
--- a/airflow/cli/commands/remote_commands/task_command.py
+++ b/airflow/cli/commands/remote_commands/task_command.py
@@ -456,7 +456,7 @@ def task_clear(args) -> None:
         if args.task_regex:
             for idx, dag in enumerate(dags):
                 dags[idx] = dag.partial_subset(
-                    task_ids_or_regex=args.task_regex,
+                    task_ids=args.task_regex,
                     include_downstream=args.downstream,
                     include_upstream=args.upstream,
                 )
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index cbb73e45f84..619cc9e1d14 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1135,7 +1135,7 @@ class DAG(TaskSDKDag, LoggingMixin):
                     if not external_dag:
                         raise AirflowException(f"Could not find dag 
{tii.dag_id}")
                     downstream = external_dag.partial_subset(
-                        task_ids_or_regex=[tii.task_id],
+                        task_ids=[tii.task_id],
                         include_upstream=False,
                         include_downstream=True,
                     )
@@ -1249,7 +1249,7 @@ class DAG(TaskSDKDag, LoggingMixin):
         # Flush the session so that the tasks marked success are reflected in 
the db.
         session.flush()
         subdag = self.partial_subset(
-            task_ids_or_regex={task_id},
+            task_ids={task_id},
             include_downstream=True,
             include_upstream=False,
         )
@@ -1361,7 +1361,7 @@ class DAG(TaskSDKDag, LoggingMixin):
             # Flush the session so that the tasks marked success are reflected 
in the db.
             session.flush()
             task_subset = self.partial_subset(
-                task_ids_or_regex=task_ids,
+                task_ids=task_ids,
                 include_downstream=True,
                 include_upstream=False,
             )
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py 
b/task-sdk/src/airflow/sdk/definitions/dag.py
index d716daa0f46..9a8f93fb9af 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -28,7 +28,6 @@ from collections import abc
 from collections.abc import Collection, Iterable, MutableSet
 from datetime import datetime, timedelta
 from inspect import signature
-from re import Pattern
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -41,7 +40,6 @@ from urllib.parse import urlsplit
 
 import attrs
 import jinja2
-import re2
 from dateutil.relativedelta import relativedelta
 
 from airflow import settings
@@ -741,7 +739,7 @@ class DAG:
 
     def partial_subset(
         self,
-        task_ids_or_regex: str | Pattern | Iterable[str],
+        task_ids: str | Iterable[str],
         include_downstream=False,
         include_upstream=True,
         include_direct_upstream=False,
@@ -753,8 +751,7 @@ class DAG:
         based on a regex that should match one or many tasks, and includes
         upstream and downstream neighbours based on the flag passed.
 
-        :param task_ids_or_regex: Either a list of task_ids, or a regex to
-            match against task ids (as a string, or compiled regex pattern).
+        :param task_ids: Either a list of task_ids, or a string task_id
         :param include_downstream: Include all downstream tasks of matched
             tasks, in addition to matched tasks.
         :param include_upstream: Include all upstream tasks of matched tasks,
@@ -769,10 +766,10 @@ class DAG:
         memo = {id(self.task_dict): None, id(self.task_group): None}
         dag = copy.deepcopy(self, memo)  # type: ignore
 
-        if isinstance(task_ids_or_regex, (str, Pattern)):
-            matched_tasks = [t for t in self.tasks if 
re2.findall(task_ids_or_regex, t.task_id)]
+        if isinstance(task_ids, str):
+            matched_tasks = [t for t in self.tasks if task_ids in t.task_id]
         else:
-            matched_tasks = [t for t in self.tasks if t.task_id in 
task_ids_or_regex]
+            matched_tasks = [t for t in self.tasks if t.task_id in task_ids]
 
         also_include_ids: set[str] = set()
         for t in matched_tasks:
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 01befbc488f..fd77b62e491 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3046,7 +3046,7 @@ class TestTaskClearingSetupTeardownBehavior:
         upstream = False
         return set(
             task.dag.partial_subset(
-                task_ids_or_regex=[task.task_id],
+                task_ids=[task.task_id],
                 include_downstream=not upstream,
                 include_upstream=upstream,
             ).tasks
@@ -3058,7 +3058,7 @@ class TestTaskClearingSetupTeardownBehavior:
         upstream = True
         return set(
             task.dag.partial_subset(
-                task_ids_or_regex=task.task_id,
+                task_ids=task.task_id,
                 include_downstream=not upstream,
                 include_upstream=upstream,
             ).tasks
@@ -3069,7 +3069,7 @@ class TestTaskClearingSetupTeardownBehavior:
         """Helper to return tasks that would be cleared if **upstream** 
selected."""
         return set(
             task.dag.partial_subset(
-                task_ids_or_regex=[task.task_id],
+                task_ids=[task.task_id],
                 include_downstream=False,
                 include_upstream=False,
             ).tasks
diff --git a/tests/sensors/test_external_task_sensor.py 
b/tests/sensors/test_external_task_sensor.py
index 1b347526ec4..9bce3646c00 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -1286,7 +1286,7 @@ def clear_tasks(
     """
     Clear the task and its downstream tasks recursively for the dag in the 
given dagbag.
     """
-    partial: DAG = dag.partial_subset(task_ids_or_regex=[task.task_id], 
include_downstream=True)
+    partial: DAG = dag.partial_subset(task_ids=[task.task_id], 
include_downstream=True)
     return partial.clear(
         start_date=start_date,
         end_date=end_date,
@@ -1719,7 +1719,7 @@ def 
test_clear_overlapping_external_task_marker_mapped_tasks(dag_bag_head_tail_m
     session.flush()
 
     dag = dag.partial_subset(
-        task_ids_or_regex=["head"],
+        task_ids=["head"],
         include_downstream=True,
         include_upstream=False,
     )
diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py
index b6643348a4e..cfaeaf8288b 100644
--- a/tests/utils/test_task_group.py
+++ b/tests/utils/test_task_group.py
@@ -455,7 +455,7 @@ def test_sub_dag_task_group():
         group234 >> group6
         group234 >> task7
 
-    subdag = dag.partial_subset(task_ids_or_regex="task5", 
include_upstream=True, include_downstream=False)
+    subdag = dag.partial_subset(task_ids="task5", include_upstream=True, 
include_downstream=False)
 
     expected_node_id = {
         "id": None,

Reply via email to