This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 66ffe39b0b Do not fail requeued TIs (#23846)
66ffe39b0b is described below
commit 66ffe39b0b3ae233aeb80e77eea1b2b867cc8c45
Author: Tanel Kiis <[email protected]>
AuthorDate: Wed Jun 29 01:14:09 2022 +0300
Do not fail requeued TIs (#23846)
---
airflow/jobs/scheduler_job.py | 16 +++++++++--
tests/jobs/test_scheduler_job.py | 62 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 76 insertions(+), 2 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 22ba5decb4..9bcb0c9176 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -628,7 +628,6 @@ class SchedulerJob(BaseJob):
buffer_key = ti.key.with_try_number(try_number)
state, info = event_buffer.pop(buffer_key)
- # TODO: should we fail RUNNING as well, as we do in Backfills?
if state == TaskInstanceState.QUEUED:
ti.external_executor_id = info
self.log.info("Setting external_id for %s to %s", ti, info)
@@ -664,7 +663,20 @@ class SchedulerJob(BaseJob):
ti.pid,
)
- if ti.try_number == buffer_key.try_number and ti.state == State.QUEUED:
+ # There are two scenarios why the same TI with the same try_number is queued
+ # after executor is finished with it:
+ # 1) the TI was killed externally and it had no time to mark itself failed
+ # - in this case we should mark it as failed here.
+ # 2) the TI has been requeued after getting deferred - in this case either our executor has it
+ # or the TI is queued by another job. Either ways we should not fail it.
+
+ # All of this could also happen if the state is "running",
+ # but that is handled by the zombie detection.
+
+ ti_queued = ti.try_number == buffer_key.try_number and ti.state == TaskInstanceState.QUEUED
+ ti_requeued = ti.queued_by_job_id != self.id or self.executor.has_task(ti)
+
+ if ti_queued and not ti_requeued:
Stats.incr('scheduler.tasks.killed_externally')
msg = (
"Executor reports task instance %s finished (%s) although the "
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 6e0135ade7..db6df7dfeb 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -381,6 +381,68 @@ class TestSchedulerJob:
ti1.refresh_from_db()
assert ti1.state == State.FAILED
+ @mock.patch('airflow.jobs.scheduler_job.TaskCallbackRequest')
+ @mock.patch('airflow.jobs.scheduler_job.Stats.incr')
+ def test_process_executor_events_ti_requeued(self, mock_stats_incr, mock_task_callback, dag_maker):
+ dag_id = "test_process_executor_events_ti_requeued"
+ task_id_1 = 'dummy_task'
+
+ session = settings.Session()
+ with dag_maker(dag_id=dag_id, fileloc='/test_path1/'):
+ task1 = EmptyOperator(task_id=task_id_1)
+ ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)
+
+ mock_stats_incr.reset_mock()
+
+ executor = MockExecutor(do_update=False)
+ task_callback = mock.MagicMock()
+ mock_task_callback.return_value = task_callback
+ self.scheduler_job = SchedulerJob(executor=executor)
+ self.scheduler_job.id = 1
+ self.scheduler_job.processor_agent = mock.MagicMock()
+
+ # ti is queued with another try number - do not fail it
+ ti1.state = State.QUEUED
+ ti1.queued_by_job_id = 1
+ ti1.try_number = 2
+ session.merge(ti1)
+ session.commit()
+
+ executor.event_buffer[ti1.key.with_try_number(1)] = State.SUCCESS, None
+
+ self.scheduler_job._process_executor_events(session=session)
+ ti1.refresh_from_db(session=session)
+ assert ti1.state == State.QUEUED
+ self.scheduler_job.executor.callback_sink.send.assert_not_called()
+
+ # ti is queued by another scheduler - do not fail it
+ ti1.state = State.QUEUED
+ ti1.queued_by_job_id = 2
+ session.merge(ti1)
+ session.commit()
+
+ executor.event_buffer[ti1.key] = State.SUCCESS, None
+
+ self.scheduler_job._process_executor_events(session=session)
+ ti1.refresh_from_db(session=session)
+ assert ti1.state == State.QUEUED
+ self.scheduler_job.executor.callback_sink.send.assert_not_called()
+
+ # ti is queued by this scheduler but it is handed back to the executor - do not fail it
+ ti1.state = State.QUEUED
+ ti1.queued_by_job_id = 1
+ session.merge(ti1)
+ session.commit()
+
+ executor.event_buffer[ti1.key] = State.SUCCESS, None
+ executor.has_task = mock.MagicMock(return_value=True)
+
+ self.scheduler_job._process_executor_events(session=session)
+ ti1.refresh_from_db(session=session)
+ assert ti1.state == State.QUEUED
+ self.scheduler_job.executor.callback_sink.send.assert_not_called()
+ mock_stats_incr.assert_not_called()
+
def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker):
dag_id = 'SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute'
task_id_1 = 'dummy_task'