This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4bd0b56432 Refactor: less DRY in test_mapped_task_instance_endpoint (#33710)
4bd0b56432 is described below
commit 4bd0b5643263768bc858f758512d28d6bd0494b0
Author: Miroslav Šedivý <[email protected]>
AuthorDate: Thu Aug 24 22:29:35 2023 +0000
Refactor: less DRY in test_mapped_task_instance_endpoint (#33710)
---
.../test_mapped_task_instance_endpoint.py | 27 +++++++++-------------
1 file changed, 11 insertions(+), 16 deletions(-)
diff --git a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
index edba1ffd9c..e22a0b1e22 100644
--- a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import datetime as dt
+import itertools
import os
import urllib
@@ -91,8 +92,8 @@ class TestMappedTaskInstanceEndpoint:
clear_rendered_ti_fields()
def create_dag_runs_with_mapped_tasks(self, dag_maker, session, dags={}):
- for dag_id in dags:
- count = dags[dag_id]["success"] + dags[dag_id]["running"]
+ for dag_id, dag in dags.items():
+ count = dag["success"] + dag["running"]
with dag_maker(session=session, dag_id=dag_id,
start_date=DEFAULT_DATETIME_1):
task1 = BaseOperator(task_id="op1")
mapped =
MockOperator.partial(task_id="task_2").expand(arg2=task1.output)
@@ -118,22 +119,16 @@ class TestMappedTaskInstanceEndpoint:
TaskInstance.run_id == dr.run_id,
).delete()
- index = 0
- for i in range(dags[dag_id]["success"]):
- ti = TaskInstance(mapped, run_id=dr.run_id, map_index=index,
state=TaskInstanceState.SUCCESS)
- setattr(ti, "start_date", DEFAULT_DATETIME_1)
- session.add(ti)
- index += 1
- for i in range(dags[dag_id]["failed"]):
- ti = TaskInstance(mapped, run_id=dr.run_id, map_index=index,
state=TaskInstanceState.FAILED)
- setattr(ti, "start_date", DEFAULT_DATETIME_1)
- session.add(ti)
- index += 1
- for i in range(dags[dag_id]["running"]):
- ti = TaskInstance(mapped, run_id=dr.run_id, map_index=index,
state=TaskInstanceState.RUNNING)
+ for index, state in enumerate(
+ itertools.chain(
+ itertools.repeat(TaskInstanceState.SUCCESS,
dag["success"]),
+ itertools.repeat(TaskInstanceState.FAILED, dag["failed"]),
+ itertools.repeat(TaskInstanceState.RUNNING,
dag["running"]),
+ )
+ ):
+ ti = TaskInstance(mapped, run_id=dr.run_id, map_index=index,
state=state)
setattr(ti, "start_date", DEFAULT_DATETIME_1)
session.add(ti)
- index += 1
self.app.dag_bag = DagBag(os.devnull, include_examples=False)
self.app.dag_bag.dags = {dag_id: dag_maker.dag} # type: ignore