This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch mapped-instance-actions
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7ea4ace57dcd80920f63a9cd439eed53da5d67da
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Wed Apr 13 21:03:34 2022 +0100

    fixup! fixup! fixup! fixup! fixup! Allow marking/clearing mapped taskinstances from the UI
---
 airflow/api/common/mark_tasks.py |  3 +--
 airflow/models/dag.py            | 21 ++++++++++++---------
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index 84fd48f4e4..8885ebb59e 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -271,8 +271,7 @@ def _iter_existing_dag_run_infos(dag: DAG, run_ids: List[str]) -> Iterator[_DagR
         yield _DagRunInfo(dag_run.logical_date, dag.get_run_data_interval(dag_run))
 
 
-@provide_session
-def find_task_relatives(tasks, downstream, upstream, session: SASession = NEW_SESSION):
+def find_task_relatives(tasks, downstream, upstream):
     """Yield task ids and optionally ancestor and descendant ids."""
     for item in tasks:
         if isinstance(item, tuple):
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 931fd469d7..8856a841d3 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1370,8 +1370,8 @@ class DAG(LoggingMixin):
     def _get_task_instances(
         self,
         *,
-        task_ids: Iterable[str],
-        task_ids_and_map_indexes: Optional[Iterable[Tuple[str, int]]],
+        task_ids,
+        task_ids_and_map_indexes,
         start_date: Optional[datetime],
         end_date: Optional[datetime],
         run_id: Optional[str],
@@ -1380,7 +1380,7 @@ class DAG(LoggingMixin):
         include_parentdag: bool,
         include_dependent_dags: bool,
         exclude_task_ids: Collection[str],
-        exclude_task_ids_and_map_indexes: Collection[Tuple[str, int]],
+        exclude_task_ids_and_map_indexes,
         session: Session,
         dag_bag: Optional["DagBag"] = ...,
     ) -> Iterable[TaskInstance]:
@@ -1390,8 +1390,8 @@ class DAG(LoggingMixin):
     def _get_task_instances(
         self,
         *,
-        task_ids: Iterable[str],
-        task_ids_and_map_indexes: Optional[Iterable[Tuple[str, int]]],
+        task_ids,
+        task_ids_and_map_indexes,
         as_pk_tuple: Literal[True],
         start_date: Optional[datetime],
         end_date: Optional[datetime],
@@ -1401,7 +1401,7 @@ class DAG(LoggingMixin):
         include_parentdag: bool,
         include_dependent_dags: bool,
         exclude_task_ids: Collection[str],
-        exclude_task_ids_and_map_indexes: Collection[Tuple[str, int]],
+        exclude_task_ids_and_map_indexes,
         session: Session,
         dag_bag: Optional["DagBag"] = ...,
         recursion_depth: int = ...,
@@ -1413,8 +1413,8 @@ class DAG(LoggingMixin):
     def _get_task_instances(
         self,
         *,
-        task_ids: Iterable[str],
-        task_ids_and_map_indexes: Optional[Iterable[Tuple[str, int]]],
+        task_ids,
+        task_ids_and_map_indexes,
         as_pk_tuple: Literal[True, None] = None,
         start_date: Optional[datetime],
         end_date: Optional[datetime],
@@ -1424,7 +1424,7 @@ class DAG(LoggingMixin):
         include_parentdag: bool,
         include_dependent_dags: bool,
         exclude_task_ids: Collection[str],
-        exclude_task_ids_and_map_indexes: Collection[Tuple[str, int]],
+        exclude_task_ids_and_map_indexes,
         session: Session,
         dag_bag: Optional["DagBag"] = None,
         recursion_depth: int = 0,
@@ -1587,6 +1587,7 @@ class DAG(LoggingMixin):
                     )
                     result.update(
                         downstream._get_task_instances(
+                            task_ids=None,
                             task_ids_and_map_indexes=None,
                             run_id=tii.run_id,
                             start_date=None,
@@ -1596,6 +1597,7 @@ class DAG(LoggingMixin):
                             include_dependent_dags=include_dependent_dags,
                             include_parentdag=False,
                             as_pk_tuple=True,
+                            exclude_task_ids=exclude_task_ids,
                             exclude_task_ids_and_map_indexes=exclude_task_ids_and_map_indexes,
                             dag_bag=dag_bag,
                             session=session,
@@ -1800,6 +1802,7 @@ class DAG(LoggingMixin):
         Clears a set of task instances associated with the current dag for
         a specified date range.
 
+        :param task_ids: List of task ids to clear
         :param task_ids_and_map_indexes: List of tuple of task_id, map_index to clear
         :param start_date: The minimum execution_date to clear
         :param end_date: The maximum execution_date to clear

Reply via email to