This is an automated email from the ASF dual-hosted git repository. bbovenzi pushed a commit to branch mapped-instance-actions in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 7ea4ace57dcd80920f63a9cd439eed53da5d67da Author: Ephraim Anierobi <[email protected]> AuthorDate: Wed Apr 13 21:03:34 2022 +0100 fixup! fixup! fixup! fixup! fixup! Allow marking/clearing mapped taskinstances from the UI --- airflow/api/common/mark_tasks.py | 3 +-- airflow/models/dag.py | 21 ++++++++++++--------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py index 84fd48f4e4..8885ebb59e 100644 --- a/airflow/api/common/mark_tasks.py +++ b/airflow/api/common/mark_tasks.py @@ -271,8 +271,7 @@ def _iter_existing_dag_run_infos(dag: DAG, run_ids: List[str]) -> Iterator[_DagR yield _DagRunInfo(dag_run.logical_date, dag.get_run_data_interval(dag_run)) -@provide_session -def find_task_relatives(tasks, downstream, upstream, session: SASession = NEW_SESSION): +def find_task_relatives(tasks, downstream, upstream): """Yield task ids and optionally ancestor and descendant ids.""" for item in tasks: if isinstance(item, tuple): diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 931fd469d7..8856a841d3 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1370,8 +1370,8 @@ class DAG(LoggingMixin): def _get_task_instances( self, *, - task_ids: Iterable[str], - task_ids_and_map_indexes: Optional[Iterable[Tuple[str, int]]], + task_ids, + task_ids_and_map_indexes, start_date: Optional[datetime], end_date: Optional[datetime], run_id: Optional[str], @@ -1380,7 +1380,7 @@ class DAG(LoggingMixin): include_parentdag: bool, include_dependent_dags: bool, exclude_task_ids: Collection[str], - exclude_task_ids_and_map_indexes: Collection[Tuple[str, int]], + exclude_task_ids_and_map_indexes, session: Session, dag_bag: Optional["DagBag"] = ..., ) -> Iterable[TaskInstance]: @@ -1390,8 +1390,8 @@ class DAG(LoggingMixin): def _get_task_instances( self, *, - task_ids: Iterable[str], - task_ids_and_map_indexes: Optional[Iterable[Tuple[str, int]]], + task_ids, + task_ids_and_map_indexes, as_pk_tuple: Literal[True], start_date: Optional[datetime], end_date: Optional[datetime], @@ -1401,7 +1401,7 @@ class DAG(LoggingMixin): include_parentdag: bool, include_dependent_dags: bool, exclude_task_ids: Collection[str], - exclude_task_ids_and_map_indexes: Collection[Tuple[str, int]], + exclude_task_ids_and_map_indexes, session: Session, dag_bag: Optional["DagBag"] = ..., recursion_depth: int = ..., @@ -1413,8 +1413,8 @@ class DAG(LoggingMixin): def _get_task_instances( self, *, - task_ids: Iterable[str], - task_ids_and_map_indexes: Optional[Iterable[Tuple[str, int]]], + task_ids, + task_ids_and_map_indexes, as_pk_tuple: Literal[True, None] = None, start_date: Optional[datetime], end_date: Optional[datetime], @@ -1424,7 +1424,7 @@ class DAG(LoggingMixin): include_parentdag: bool, include_dependent_dags: bool, exclude_task_ids: Collection[str], - exclude_task_ids_and_map_indexes: Collection[Tuple[str, int]], + exclude_task_ids_and_map_indexes, session: Session, dag_bag: Optional["DagBag"] = None, recursion_depth: int = 0, @@ -1587,6 +1587,7 @@ class DAG(LoggingMixin): ) result.update( downstream._get_task_instances( + task_ids=None, task_ids_and_map_indexes=None, run_id=tii.run_id, start_date=None, @@ -1596,6 +1597,7 @@ class DAG(LoggingMixin): include_dependent_dags=include_dependent_dags, include_parentdag=False, as_pk_tuple=True, + exclude_task_ids=exclude_task_ids, exclude_task_ids_and_map_indexes=exclude_task_ids_and_map_indexes, dag_bag=dag_bag, session=session, @@ -1800,6 +1802,7 @@ class DAG(LoggingMixin): Clears a set of task instances associated with the current dag for a specified date range. + :param task_ids: List of task ids to clear :param task_ids_and_map_indexes: List of tuple of task_id, map_index to clear :param start_date: The minimum execution_date to clear :param end_date: The maximum execution_date to clear
