dstandish commented on a change in pull request #21316:
URL: https://github.com/apache/airflow/pull/21316#discussion_r806371277



##########
File path: tests/executors/test_base_executor.py
##########
@@ -66,8 +68,52 @@ def test_try_adopt_task_instances(dag_maker):
         BaseOperator(task_id="task_2", start_date=start_date)
         BaseOperator(task_id="task_3", start_date=start_date)
 
-    dagrun = dag_maker.create_dagrun(execution_date=date)
-    tis = dagrun.task_instances
+    return dag_maker.create_dagrun(execution_date=date)
+
 
+def test_try_adopt_task_instances(dag_maker):
+    dagrun = setup_dagrun(dag_maker)
+    tis = dagrun.task_instances
     assert {ti.task_id for ti in tis} == {"task_1", "task_2", "task_3"}
     assert BaseExecutor().try_adopt_task_instances(tis) == tis
+
+
+def enqueue_tasks(executor, dagrun):
+    for task_instance in dagrun.task_instances:
+        executor.queue_command(task_instance, ["airflow"])
+
+
+def setup_trigger_tasks(dag_maker):
+    dagrun = setup_dagrun(dag_maker)
+    executor = BaseExecutor()
+    executor.execute_async = mock.Mock()
+    enqueue_tasks(executor, dagrun)
+    return executor, dagrun
+
+
+@mark.parametrize("open_slots", [1, 2, 3])
+def test_trigger_queued_tasks(dag_maker, open_slots):
+    executor, _ = setup_trigger_tasks(dag_maker)
+    executor.trigger_tasks(open_slots)
+    assert len(executor.execute_async.mock_calls) == open_slots
+
+
+@mark.parametrize("change_state_attempt", range(QUEUEING_ATTEMPTS))
+def test_trigger_running_tasks(dag_maker, change_state_attempt):
+    executor, dagrun = setup_trigger_tasks(dag_maker)
+    open_slots = triggered = len(dagrun.task_instances)
+    executor.trigger_tasks(open_slots)
+
+    # All the tasks are now running, so while we _can_ enqueue them
+    # again, they won't be triggered during `trigger_tasks` until
+    # the executor has been notified of a state change.
+    enqueue_tasks(executor, dagrun)
+    for attempt in range(QUEUEING_ATTEMPTS):
+        # On the configured attempt, we notify the executor
+        # that the task has succeeded.
+        if attempt == change_state_attempt:
+            executor.change_state(dagrun.task_instances[0].key, State.SUCCESS)
+            # We then expect an additional triggered task.
+            triggered += 1
+        executor.trigger_tasks(open_slots)
+        assert len(executor.execute_async.mock_calls) == triggered

Review comment:
       ```suggestion
   @mark.parametrize("change_state_attempt", range(QUEUEING_ATTEMPTS + 2))
   def test_trigger_running_tasks(dag_maker, change_state_attempt):
       executor, dagrun = setup_trigger_tasks(dag_maker)
       open_slots = 100
       executor.trigger_tasks(open_slots)
       expected_calls = len(dagrun.task_instances)  # initially `execute_async` 
called for each task
       assert len(executor.execute_async.mock_calls) == expected_calls
   
       # All the tasks are now "running", so while we enqueue them again here,
       # they won't be executed again until the executor has been notified of a 
state change.
       enqueue_tasks(executor, dagrun)
   
       for attempt in range(QUEUEING_ATTEMPTS + 2):
           # On the configured attempt, we notify the executor that the task 
has succeeded.
           if attempt == change_state_attempt:
               executor.change_state(dagrun.task_instances[0].key, 
State.SUCCESS)
               # If we have not exceeded QUEUEING_ATTEMPTS, we should expect an 
additional "execute" call
               if attempt < QUEUEING_ATTEMPTS:
                   expected_calls += 1
           executor.trigger_tasks(open_slots)
           assert len(executor.execute_async.mock_calls) == expected_calls
       if change_state_attempt < QUEUEING_ATTEMPTS:
           assert len(executor.execute_async.mock_calls) == 
len(dagrun.task_instances) + 1
       else:
           assert len(executor.execute_async.mock_calls) == 
len(dagrun.task_instances)
   ```
   
   While trying to verify and understand the change, I modified the test to make 
it a bit stronger.
   We can include the case where `change_state_attempt` is beyond the maximum 
number of queueing attempts (`QUEUEING_ATTEMPTS`).
   I also removed any possibility of interaction with `open_slots` by setting 
it to a high number.
   Finally, I renamed `triggered` to `expected_calls` to reduce the possibility of 
confusion with the Airflow `triggerer` service, since the two are unrelated.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to