This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new deececc  Fail tasks in scheduler when executor reports they failed (#15929)
deececc is described below

commit deececcabc080844ca89272a2e4ab1183cd51e3f
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu May 20 11:22:01 2021 +0100

    Fail tasks in scheduler when executor reports they failed (#15929)
    
    When a task fails in the executor while still queued in the scheduler, the
    executor reports this failure but the scheduler doesn't change the task
    state, resulting in the task being queued until the scheduler is restarted.
    This commit fixes it by ensuring that when a task is reported to have
    failed in the executor, the task is failed in the scheduler.
---
 airflow/jobs/scheduler_job.py    | 4 +++-
 tests/jobs/test_scheduler_job.py | 2 +-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index e86a6e7..0a738c0 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1239,12 +1239,14 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
                     "task says its %s. (Info: %s) Was the task killed externally?"
                 )
                 self.log.error(msg, ti, state, ti.state, info)
+
                 request = TaskCallbackRequest(
                     full_filepath=ti.dag_model.fileloc,
                     simple_task_instance=SimpleTaskInstance(ti),
                     msg=msg % (ti, state, ti.state, info),
                 )
-
+                self.log.info('Setting task instance %s state to %s as reported by executor', ti, state)
+                ti.set_state(state)
                 self.processor_agent.send_callback_to_execute(request)
 
         return len(event_buffer)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 954b395..adb3855 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -907,7 +907,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         self.scheduler_job._process_executor_events(session=session)
         ti1.refresh_from_db()
-        assert ti1.state == State.QUEUED
+        assert ti1.state == State.FAILED
         mock_task_callback.assert_called_once_with(
             full_filepath='/test_path1/',
             simple_task_instance=mock.ANY,

Reply via email to