jedcunningham commented on code in PR #28586:
URL: https://github.com/apache/airflow/pull/28586#discussion_r1059023638


##########
airflow/executors/base_executor.py:
##########
@@ -212,16 +253,20 @@ def trigger_tasks(self, open_slots: int) -> None:
             # removed from the running set in the meantime.
             if key in self.running:
                 attempt = self.attempts[key]
-                if attempt < QUEUEING_ATTEMPTS - 1:
-                    self.attempts[key] = attempt + 1
-                    self.log.info("task %s is still running", key)
+                if attempt.can_try_again():
+                    # if it hasn't been much time since first check, let it be 
checked again next time
+                    self.log.info("queued but still running; attempt=%s 
task=%s", attempt.total_tries, key)
                     continue
-
-                # We give up and remove the task from the queue.
-                self.log.error("could not queue task %s (still running after 
%d attempts)", key, attempt)
-                del self.attempts[key]
-                del self.queued_tasks[key]
+                else:
+                    # Otherwise, we give up and remove the task from the queue.
+                    self.log.error(
+                        "could not queue task %s (still running after %d 
attempts)", key, attempt.total_tries
+                    )
+                    del self.attempts[key]
+                    del self.queued_tasks[key]

Review Comment:
   ```suggestion
   
                   # Otherwise, we give up and remove the task from the queue.
                   self.log.error("could not queue task %s (still running after 
%d attempts)", key, attempt.total_tries)
                   del self.attempts[key]
                   del self.queued_tasks[key]
   ```
   
   nit: the `else` isn't needed, since the `if` branch above already ends with `continue`.



##########
tests/executors/test_base_executor.py:
##########
@@ -114,25 +126,67 @@ def test_trigger_running_tasks(dag_maker, 
change_state_attempt):
 
     # All the tasks are now "running", so while we enqueue them again here,
     # they won't be executed again until the executor has been notified of a 
state change.
-    enqueue_tasks(executor, dagrun)
+    ti = dagrun.task_instances[0]
+    assert ti.key in executor.running
+    assert ti.key not in executor.queued_tasks
+    executor.queue_command(ti, ["airflow"])
+
+    # this is the problem we're dealing with: ti.key both queued and running
+    assert ti.key in executor.queued_tasks and ti.key in executor.running
+    assert len(executor.attempts) == 0
+    executor.trigger_tasks(open_slots)
 
-    for attempt in range(QUEUEING_ATTEMPTS + 2):
-        # On the configured attempt, we notify the executor that the task has 
succeeded.
-        if attempt == change_state_attempt:
-            executor.change_state(dagrun.task_instances[0].key, State.SUCCESS)
-            # If we have not exceeded QUEUEING_ATTEMPTS, we should expect an 
additional "execute" call
-            if attempt < QUEUEING_ATTEMPTS:
-                expected_calls += 1
+    # first trigger call after queueing again creates an attempt object
+    assert len(executor.attempts) == 1
+    assert ti.key in executor.attempts
+
+    for attempt in range(2, change_state_num + 2):
         executor.trigger_tasks(open_slots)
-        assert len(executor.execute_async.mock_calls) == expected_calls
-    if change_state_attempt < QUEUEING_ATTEMPTS:
-        assert len(executor.execute_async.mock_calls) == 
len(dagrun.task_instances) + 1
-    else:
-        assert len(executor.execute_async.mock_calls) == 
len(dagrun.task_instances)
+        if attempt <= min(can_try_num, change_state_num):
+            assert ti.key in executor.queued_tasks and ti.key in 
executor.running
+        # On the configured attempt, we notify the executor that the task has 
succeeded.
+        if attempt == change_state_num:
+            executor.change_state(ti.key, State.SUCCESS)
+            assert ti.key not in executor.running
+    # retry was ok when state changed, ti.key will be in running (for the 
second time
+    if can_try_num >= change_state_num:
+        assert ti.key in executor.running
+    else:  # otherwise, it won't be
+        assert ti.key not in executor.running
+    # either way, ti.key not in queued -- it was either removed because never 
left running
+    # or it was moved out when run 2nd time
+    assert ti.key not in executor.queued_tasks
+    assert not executor.attempts
+
+    # we expect one more "execute_async" if TI was marked successful
+    # this would move it out of running set and free the queued TI to be 
executed again
+    if second_exec is True:
+        expected_calls += 1
+
+    assert len(executor.execute_async.mock_calls) == expected_calls

Review Comment:
   There is also another `len(executor.execute_async.mock_calls)` assertion further up that I can't attach a suggestion to.



##########
airflow/executors/base_executor.py:
##########
@@ -54,6 +57,44 @@
 # Task tuple to send to be executed
 TaskTuple = Tuple[TaskInstanceKey, CommandType, Optional[str], Optional[Any]]
 
+log = logging.getLogger(__name__)
+
+
+@dataclass
+class RunningRetryAttemptType:
+    """
+    For keeping track of attempts to queue again when task still apparently 
running.
+
+    We don't want to slow down the loop, so we don't block, but we allow it to 
be
+    re-checked for at least MIN_SECONDS seconds.
+    """
+
+    MIN_SECONDS = 5

Review Comment:
   Is 5 seconds (`MIN_SECONDS`) long enough?



##########
tests/executors/test_base_executor.py:
##########
@@ -114,25 +126,67 @@ def test_trigger_running_tasks(dag_maker, 
change_state_attempt):
 
     # All the tasks are now "running", so while we enqueue them again here,
     # they won't be executed again until the executor has been notified of a 
state change.
-    enqueue_tasks(executor, dagrun)
+    ti = dagrun.task_instances[0]
+    assert ti.key in executor.running
+    assert ti.key not in executor.queued_tasks
+    executor.queue_command(ti, ["airflow"])
+
+    # this is the problem we're dealing with: ti.key both queued and running
+    assert ti.key in executor.queued_tasks and ti.key in executor.running
+    assert len(executor.attempts) == 0
+    executor.trigger_tasks(open_slots)
 
-    for attempt in range(QUEUEING_ATTEMPTS + 2):
-        # On the configured attempt, we notify the executor that the task has 
succeeded.
-        if attempt == change_state_attempt:
-            executor.change_state(dagrun.task_instances[0].key, State.SUCCESS)
-            # If we have not exceeded QUEUEING_ATTEMPTS, we should expect an 
additional "execute" call
-            if attempt < QUEUEING_ATTEMPTS:
-                expected_calls += 1
+    # first trigger call after queueing again creates an attempt object
+    assert len(executor.attempts) == 1
+    assert ti.key in executor.attempts
+
+    for attempt in range(2, change_state_num + 2):
         executor.trigger_tasks(open_slots)
-        assert len(executor.execute_async.mock_calls) == expected_calls
-    if change_state_attempt < QUEUEING_ATTEMPTS:
-        assert len(executor.execute_async.mock_calls) == 
len(dagrun.task_instances) + 1
-    else:
-        assert len(executor.execute_async.mock_calls) == 
len(dagrun.task_instances)
+        if attempt <= min(can_try_num, change_state_num):
+            assert ti.key in executor.queued_tasks and ti.key in 
executor.running
+        # On the configured attempt, we notify the executor that the task has 
succeeded.
+        if attempt == change_state_num:
+            executor.change_state(ti.key, State.SUCCESS)
+            assert ti.key not in executor.running
+    # retry was ok when state changed, ti.key will be in running (for the 
second time
+    if can_try_num >= change_state_num:
+        assert ti.key in executor.running
+    else:  # otherwise, it won't be
+        assert ti.key not in executor.running
+    # either way, ti.key not in queued -- it was either removed because never 
left running
+    # or it was moved out when run 2nd time
+    assert ti.key not in executor.queued_tasks
+    assert not executor.attempts
+
+    # we expect one more "execute_async" if TI was marked successful
+    # this would move it out of running set and free the queued TI to be 
executed again
+    if second_exec is True:
+        expected_calls += 1
+
+    assert len(executor.execute_async.mock_calls) == expected_calls

Review Comment:
   ```suggestion
       assert executor.execute_async.call_count == expected_calls
   ```



##########
tests/executors/test_base_executor.py:
##########
@@ -114,25 +126,67 @@ def test_trigger_running_tasks(dag_maker, 
change_state_attempt):
 
     # All the tasks are now "running", so while we enqueue them again here,
     # they won't be executed again until the executor has been notified of a 
state change.
-    enqueue_tasks(executor, dagrun)
+    ti = dagrun.task_instances[0]
+    assert ti.key in executor.running
+    assert ti.key not in executor.queued_tasks
+    executor.queue_command(ti, ["airflow"])
+
+    # this is the problem we're dealing with: ti.key both queued and running
+    assert ti.key in executor.queued_tasks and ti.key in executor.running
+    assert len(executor.attempts) == 0
+    executor.trigger_tasks(open_slots)
 
-    for attempt in range(QUEUEING_ATTEMPTS + 2):
-        # On the configured attempt, we notify the executor that the task has 
succeeded.
-        if attempt == change_state_attempt:
-            executor.change_state(dagrun.task_instances[0].key, State.SUCCESS)
-            # If we have not exceeded QUEUEING_ATTEMPTS, we should expect an 
additional "execute" call
-            if attempt < QUEUEING_ATTEMPTS:
-                expected_calls += 1
+    # first trigger call after queueing again creates an attempt object
+    assert len(executor.attempts) == 1
+    assert ti.key in executor.attempts
+
+    for attempt in range(2, change_state_num + 2):
         executor.trigger_tasks(open_slots)
-        assert len(executor.execute_async.mock_calls) == expected_calls
-    if change_state_attempt < QUEUEING_ATTEMPTS:
-        assert len(executor.execute_async.mock_calls) == 
len(dagrun.task_instances) + 1
-    else:
-        assert len(executor.execute_async.mock_calls) == 
len(dagrun.task_instances)
+        if attempt <= min(can_try_num, change_state_num):
+            assert ti.key in executor.queued_tasks and ti.key in 
executor.running
+        # On the configured attempt, we notify the executor that the task has 
succeeded.
+        if attempt == change_state_num:
+            executor.change_state(ti.key, State.SUCCESS)
+            assert ti.key not in executor.running
+    # retry was ok when state changed, ti.key will be in running (for the 
second time

Review Comment:
   ```suggestion
       # retry was ok when state changed, ti.key will be in running (for the 
second time)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to