dstandish commented on a change in pull request #21316:
URL: https://github.com/apache/airflow/pull/21316#discussion_r806371277
##########
File path: tests/executors/test_base_executor.py
##########
@@ -66,8 +68,52 @@ def test_try_adopt_task_instances(dag_maker):
BaseOperator(task_id="task_2", start_date=start_date)
BaseOperator(task_id="task_3", start_date=start_date)
- dagrun = dag_maker.create_dagrun(execution_date=date)
- tis = dagrun.task_instances
+ return dag_maker.create_dagrun(execution_date=date)
+
+def test_try_adopt_task_instances(dag_maker):
+ dagrun = setup_dagrun(dag_maker)
+ tis = dagrun.task_instances
assert {ti.task_id for ti in tis} == {"task_1", "task_2", "task_3"}
assert BaseExecutor().try_adopt_task_instances(tis) == tis
+
+
+def enqueue_tasks(executor, dagrun):
+ for task_instance in dagrun.task_instances:
+ executor.queue_command(task_instance, ["airflow"])
+
+
+def setup_trigger_tasks(dag_maker):
+ dagrun = setup_dagrun(dag_maker)
+ executor = BaseExecutor()
+ executor.execute_async = mock.Mock()
+ enqueue_tasks(executor, dagrun)
+ return executor, dagrun
+
+
[email protected]("open_slots", [1, 2, 3])
+def test_trigger_queued_tasks(dag_maker, open_slots):
+ executor, _ = setup_trigger_tasks(dag_maker)
+ executor.trigger_tasks(open_slots)
+ assert len(executor.execute_async.mock_calls) == open_slots
+
+
[email protected]("change_state_attempt", range(QUEUEING_ATTEMPTS))
+def test_trigger_running_tasks(dag_maker, change_state_attempt):
+ executor, dagrun = setup_trigger_tasks(dag_maker)
+ open_slots = triggered = len(dagrun.task_instances)
+ executor.trigger_tasks(open_slots)
+
+ # All the tasks are now running, so while we _can_ enqueue them
+ # again, they won't be triggered during `trigger_tasks` until
+ # the executor has been notified of a state change.
+ enqueue_tasks(executor, dagrun)
+ for attempt in range(QUEUEING_ATTEMPTS):
+ # On the configured attempt, we notify the executor
+ # that the task has succeeded.
+ if attempt == change_state_attempt:
+ executor.change_state(dagrun.task_instances[0].key, State.SUCCESS)
+ # We then expect an additional triggered task.
+ triggered += 1
+ executor.trigger_tasks(open_slots)
+ assert len(executor.execute_async.mock_calls) == triggered
Review comment:
```suggestion
@mark.parametrize("change_state_attempt", range(QUEUEING_ATTEMPTS + 2))
def test_trigger_running_tasks(dag_maker, change_state_attempt):
executor, dagrun = setup_trigger_tasks(dag_maker)
open_slots = 100
executor.trigger_tasks(open_slots)
expected_calls = len(dagrun.task_instances) # initially `execute_async`
called for each task
assert len(executor.execute_async.mock_calls) == expected_calls
# All the tasks are now "running", so while we enqueue them again here,
# they won't be executed again until the executor has been notified of a
state change.
enqueue_tasks(executor, dagrun)
for attempt in range(QUEUEING_ATTEMPTS + 2):
# On the configured attempt, we notify the executor that the task
has succeeded.
if attempt == change_state_attempt:
executor.change_state(dagrun.task_instances[0].key,
State.SUCCESS)
# If we have not exceeded QUEUEING_ATTEMPTS, we should expect an
additional "execute" call
if attempt < QUEUEING_ATTEMPTS:
expected_calls += 1
executor.trigger_tasks(open_slots)
assert len(executor.execute_async.mock_calls) == expected_calls
if change_state_attempt < QUEUEING_ATTEMPTS:
assert len(executor.execute_async.mock_calls) ==
len(dagrun.task_instances) + 1
else:
assert len(executor.execute_async.mock_calls) ==
len(dagrun.task_instances)
```
while trying to verify and understand the change i modified the test to make
it a bit stronger.
we can include the case where "change_state_attempt" is beyond max queue
tries.
also i remove any possibility of interaction with 'open_slots' by setting
it to a high number.
and i rename `triggered` to `expected_calls` to reduce possibility of
confusion with the airflow service `triggerer` since the two are unrelated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]