This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 66ffe39b0b Do not fail requeued TIs (#23846)
66ffe39b0b is described below

commit 66ffe39b0b3ae233aeb80e77eea1b2b867cc8c45
Author: Tanel Kiis <[email protected]>
AuthorDate: Wed Jun 29 01:14:09 2022 +0300

    Do not fail requeued TIs (#23846)
---
 airflow/jobs/scheduler_job.py    | 16 +++++++++--
 tests/jobs/test_scheduler_job.py | 62 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 76 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 22ba5decb4..9bcb0c9176 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -628,7 +628,6 @@ class SchedulerJob(BaseJob):
             buffer_key = ti.key.with_try_number(try_number)
             state, info = event_buffer.pop(buffer_key)
 
-            # TODO: should we fail RUNNING as well, as we do in Backfills?
             if state == TaskInstanceState.QUEUED:
                 ti.external_executor_id = info
                 self.log.info("Setting external_id for %s to %s", ti, info)
@@ -664,7 +663,20 @@ class SchedulerJob(BaseJob):
                 ti.pid,
             )
 
-            if ti.try_number == buffer_key.try_number and ti.state == State.QUEUED:
+            # There are two scenarios in which the same TI with the same try_number is queued
+            # after the executor is finished with it:
+            # 1) the TI was killed externally and it had no time to mark itself failed
+            # - in this case we should mark it as failed here.
+            # 2) the TI has been requeued after getting deferred - in this case either our executor has it
+            # or the TI is queued by another job. Either way we should not fail it.
+
+            # All of this could also happen if the state is "running",
+            # but that is handled by the zombie detection.
+
+            ti_queued = ti.try_number == buffer_key.try_number and ti.state == TaskInstanceState.QUEUED
+            ti_requeued = ti.queued_by_job_id != self.id or self.executor.has_task(ti)
+
+            if ti_queued and not ti_requeued:
                 Stats.incr('scheduler.tasks.killed_externally')
                 msg = (
                     "Executor reports task instance %s finished (%s) although 
the "
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 6e0135ade7..db6df7dfeb 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -381,6 +381,68 @@ class TestSchedulerJob:
         ti1.refresh_from_db()
         assert ti1.state == State.FAILED
 
+    @mock.patch('airflow.jobs.scheduler_job.TaskCallbackRequest')
+    @mock.patch('airflow.jobs.scheduler_job.Stats.incr')
+    def test_process_executor_events_ti_requeued(self, mock_stats_incr, mock_task_callback, dag_maker):
+        dag_id = "test_process_executor_events_ti_requeued"
+        task_id_1 = 'dummy_task'
+
+        session = settings.Session()
+        with dag_maker(dag_id=dag_id, fileloc='/test_path1/'):
+            task1 = EmptyOperator(task_id=task_id_1)
+        ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)
+
+        mock_stats_incr.reset_mock()
+
+        executor = MockExecutor(do_update=False)
+        task_callback = mock.MagicMock()
+        mock_task_callback.return_value = task_callback
+        self.scheduler_job = SchedulerJob(executor=executor)
+        self.scheduler_job.id = 1
+        self.scheduler_job.processor_agent = mock.MagicMock()
+
+        # ti is queued with another try number - do not fail it
+        ti1.state = State.QUEUED
+        ti1.queued_by_job_id = 1
+        ti1.try_number = 2
+        session.merge(ti1)
+        session.commit()
+
+        executor.event_buffer[ti1.key.with_try_number(1)] = State.SUCCESS, None
+
+        self.scheduler_job._process_executor_events(session=session)
+        ti1.refresh_from_db(session=session)
+        assert ti1.state == State.QUEUED
+        self.scheduler_job.executor.callback_sink.send.assert_not_called()
+
+        # ti is queued by another scheduler - do not fail it
+        ti1.state = State.QUEUED
+        ti1.queued_by_job_id = 2
+        session.merge(ti1)
+        session.commit()
+
+        executor.event_buffer[ti1.key] = State.SUCCESS, None
+
+        self.scheduler_job._process_executor_events(session=session)
+        ti1.refresh_from_db(session=session)
+        assert ti1.state == State.QUEUED
+        self.scheduler_job.executor.callback_sink.send.assert_not_called()
+
+        # ti is queued by this scheduler but it is handed back to the executor - do not fail it
+        ti1.state = State.QUEUED
+        ti1.queued_by_job_id = 1
+        session.merge(ti1)
+        session.commit()
+
+        executor.event_buffer[ti1.key] = State.SUCCESS, None
+        executor.has_task = mock.MagicMock(return_value=True)
+
+        self.scheduler_job._process_executor_events(session=session)
+        ti1.refresh_from_db(session=session)
+        assert ti1.state == State.QUEUED
+        self.scheduler_job.executor.callback_sink.send.assert_not_called()
+        mock_stats_incr.assert_not_called()
+
     def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker):
         dag_id = 'SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute'
         task_id_1 = 'dummy_task'

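The added test can be exercised on its own. As a sketch, assuming a working Airflow development environment where the test fixtures used above (dag_maker, MockExecutor) are available, something like the following would run just that test through pytest's Python API; it is equivalent to passing the same node id on the pytest command line.

    # Hypothetical way to run only the new test from the repository root.
    import pytest

    pytest.main([
        "tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_process_executor_events_ti_requeued",
    ])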