This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new deececc Fail tasks in scheduler when executor reports they failed
(#15929)
deececc is described below
commit deececcabc080844ca89272a2e4ab1183cd51e3f
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu May 20 11:22:01 2021 +0100
Fail tasks in scheduler when executor reports they failed (#15929)
When a task fails in the executor while it is still queued in the scheduler,
the executor reports this failure, but the scheduler doesn't change the task
state, resulting in the task staying queued until the scheduler is restarted.
This commit fixes it by ensuring that when a task is reported to have failed
in the executor, the task is also failed in the scheduler.
---
airflow/jobs/scheduler_job.py | 4 +++-
tests/jobs/test_scheduler_job.py | 2 +-
2 files changed, 4 insertions(+), 2 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index e86a6e7..0a738c0 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1239,12 +1239,14 @@ class SchedulerJob(BaseJob): # pylint:
disable=too-many-instance-attributes
"task says its %s. (Info: %s) Was the task killed
externally?"
)
self.log.error(msg, ti, state, ti.state, info)
+
request = TaskCallbackRequest(
full_filepath=ti.dag_model.fileloc,
simple_task_instance=SimpleTaskInstance(ti),
msg=msg % (ti, state, ti.state, info),
)
-
+ self.log.info('Setting task instance %s state to %s as
reported by executor', ti, state)
+ ti.set_state(state)
self.processor_agent.send_callback_to_execute(request)
return len(event_buffer)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 954b395..adb3855 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -907,7 +907,7 @@ class TestSchedulerJob(unittest.TestCase):
self.scheduler_job._process_executor_events(session=session)
ti1.refresh_from_db()
- assert ti1.state == State.QUEUED
+ assert ti1.state == State.FAILED
mock_task_callback.assert_called_once_with(
full_filepath='/test_path1/',
simple_task_instance=mock.ANY,