This is an automated email from the ASF dual-hosted git repository. bbovenzi pushed a commit to branch mapped-instance-actions in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 9d5d8b28cb14ac6b780acb08f413f49ba0a9d93b Author: Ephraim Anierobi <[email protected]> AuthorDate: Fri Apr 15 09:20:02 2022 +0100 Apply suggestions from code review Co-authored-by: Jed Cunningham <[email protected]> Co-authored-by: Tzu-ping Chung <[email protected]> --- airflow/api/common/mark_tasks.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py index 8885ebb59e..a9e4f4812e 100644 --- a/airflow/api/common/mark_tasks.py +++ b/airflow/api/common/mark_tasks.py @@ -96,7 +96,7 @@ def set_state( tasks that did not exist. It will not create dag runs that are missing on the schedule (but it will as for subdag dag runs if needed). - :param tasks: the iterable of tasks or task, map_index tuple from which to work. + :param tasks: the iterable of tasks or (task, map_index) tuples from which to work. task.task.dag needs to be set :param run_id: the run_id of the dagrun to start looking from :param execution_date: the execution date from which to start looking(deprecated) @@ -119,9 +119,10 @@ def set_state( if execution_date and not timezone.is_localized(execution_date): raise ValueError(f"Received non-localized date {execution_date}") - t_dags = {task.dag for task in tasks if not isinstance(task, tuple)} - t_dags_2 = {item[0].dag for item in tasks if isinstance(item, tuple)} - task_dags = t_dags | t_dags_2 + task_dags = { + task[0].dag if isinstance(task, tuple) else task.dag + for task in tasks + } if len(task_dags) > 1: raise ValueError(f"Received tasks from multiple DAGs: {task_dags}") dag = next(iter(task_dags))
