This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 44f601e  Properly handle ti state difference between executor and 
scheduler (#17819)
44f601e is described below

commit 44f601e51cd7d369292728b7a7460c56528dbf27
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Sep 21 21:16:57 2021 +0100

    Properly handle ti state difference between executor and scheduler (#17819)
    
    When a task fails to start, the executor fails it and its state in
    scheduler is queued while its state in executor is failed. Currently
    we fail this task without retries to avoid getting stuck.
    
    This PR changes this to only fail the task if the callback cannot be
    executed. This ensures the task does not get stuck
    
    closes: #16625
    
    Co-authored-by: Kaxil Naik <[email protected]>
---
 airflow/jobs/scheduler_job.py     |  26 ++++--
 airflow/models/taskinstance.py    |   6 ++
 tests/jobs/test_scheduler_job.py  | 163 ++++++++++++++++++++++++++++++--------
 tests/models/test_taskinstance.py |  18 +++++
 4 files changed, 174 insertions(+), 39 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 8ca9aea..a0dd254 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -521,14 +521,24 @@ class SchedulerJob(BaseJob):
                 )
                 self.log.error(msg, ti, state, ti.state, info)
 
-                request = TaskCallbackRequest(
-                    full_filepath=ti.dag_model.fileloc,
-                    simple_task_instance=SimpleTaskInstance(ti),
-                    msg=msg % (ti, state, ti.state, info),
-                )
-                self.log.info('Setting task instance %s state to %s as 
reported by executor', ti, state)
-                ti.set_state(state)
-                self.processor_agent.send_callback_to_execute(request)
+                # Get task from the Serialized DAG
+                try:
+                    dag = self.dagbag.get_dag(ti.dag_id)
+                    task = dag.get_task(ti.task_id)
+                except Exception:
+                    self.log.exception("Marking task instance %s as %s", ti, 
state)
+                    ti.set_state(state)
+                    continue
+                ti.task = task
+                if task.on_retry_callback or task.on_failure_callback:
+                    request = TaskCallbackRequest(
+                        full_filepath=ti.dag_model.fileloc,
+                        simple_task_instance=SimpleTaskInstance(ti),
+                        msg=msg % (ti, state, ti.state, info),
+                    )
+                    self.processor_agent.send_callback_to_execute(request)
+                else:
+                    ti.handle_failure(error=msg % (ti, state, ti.state, info), 
session=session)
 
         return len(event_buffer)
 
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 9dca1f3..c60817b 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1693,6 +1693,9 @@ class TaskInstance(Base, LoggingMixin):
         Stats.incr(f'operator_failures_{task.task_type}', 1, 1)
         Stats.incr('ti_failures')
         if not test_mode:
+            # This is needed as dag_run is lazily loaded. Without it, 
sqlalchemy errors with
+            # DetachedInstanceError error.
+            self.dag_run = self.get_dagrun(session=session)
             session.add(Log(State.FAILED, self))
 
             # Log failure duration
@@ -1719,6 +1722,9 @@ class TaskInstance(Base, LoggingMixin):
             self.state = State.FAILED
             email_for_state = task.email_on_failure
         else:
+            if self.state == State.QUEUED:
+                # We increase the try_number so as to fail the task if it 
fails to start after sometime
+                self._try_number += 1
             self.state = State.UP_FOR_RETRY
             email_for_state = task.email_on_retry
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index d2779c6..fc65c25 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -185,6 +185,7 @@ class TestSchedulerJob:
         dag_id = "test_process_executor_events"
         task_id_1 = 'dummy_task'
 
+        session = settings.Session()
         with dag_maker(dag_id=dag_id, fileloc='/test_path1/'):
             task1 = DummyOperator(task_id=task_id_1)
         ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)
@@ -196,9 +197,64 @@ class TestSchedulerJob:
         mock_task_callback.return_value = task_callback
         self.scheduler_job = SchedulerJob(executor=executor)
         self.scheduler_job.processor_agent = mock.MagicMock()
+        ti1.state = State.QUEUED
+        session.merge(ti1)
+        session.commit()
+
+        executor.event_buffer[ti1.key] = State.FAILED, None
+
+        self.scheduler_job._process_executor_events(session=session)
+        ti1.refresh_from_db(session=session)
+        assert ti1.state == State.FAILED
+        
self.scheduler_job.processor_agent.send_callback_to_execute.assert_not_called()
+        self.scheduler_job.processor_agent.reset_mock()
+
+        # ti in success state
+        ti1.state = State.SUCCESS
+        session.merge(ti1)
+        session.commit()
+        executor.event_buffer[ti1.key] = State.SUCCESS, None
+
+        self.scheduler_job._process_executor_events(session=session)
+        ti1.refresh_from_db(session=session)
+        assert ti1.state == State.SUCCESS
+        
self.scheduler_job.processor_agent.send_callback_to_execute.assert_not_called()
+        mock_stats_incr.assert_has_calls(
+            [
+                mock.call('scheduler.tasks.killed_externally'),
+                mock.call('operator_failures_DummyOperator', 1, 1),
+                mock.call('ti_failures'),
+            ],
+            any_order=True,
+        )
+
+    @mock.patch('airflow.jobs.scheduler_job.TaskCallbackRequest')
+    @mock.patch('airflow.jobs.scheduler_job.Stats.incr')
+    def test_process_executor_events_with_no_callback(self, mock_stats_incr, 
mock_task_callback, dag_maker):
+        dag_id = "test_process_executor_events_with_no_callback"
+        task_id_1 = 'dummy_task'
+
+        mock_stats_incr.reset_mock()
+        executor = MockExecutor(do_update=False)
+        task_callback = mock.MagicMock()
+        mock_task_callback.return_value = task_callback
+        self.scheduler_job = SchedulerJob(executor=executor)
+        self.scheduler_job.processor_agent = mock.MagicMock()
 
         session = settings.Session()
+        with dag_maker(dag_id=dag_id, fileloc='/test_path1/'):
+            task1 = DummyOperator(task_id=task_id_1, retries=1)
+        ti1 = dag_maker.create_dagrun(
+            run_id='dr2', execution_date=DEFAULT_DATE + timedelta(hours=1)
+        ).get_task_instance(task1.task_id)
 
+        mock_stats_incr.reset_mock()
+
+        executor = MockExecutor(do_update=False)
+        task_callback = mock.MagicMock()
+        mock_task_callback.return_value = task_callback
+        self.scheduler_job = SchedulerJob(executor=executor)
+        self.scheduler_job.processor_agent = mock.MagicMock()
         ti1.state = State.QUEUED
         session.merge(ti1)
         session.commit()
@@ -206,17 +262,9 @@ class TestSchedulerJob:
         executor.event_buffer[ti1.key] = State.FAILED, None
 
         self.scheduler_job._process_executor_events(session=session)
-        ti1.refresh_from_db()
-        assert ti1.state == State.FAILED
-        mock_task_callback.assert_called_once_with(
-            full_filepath='/test_path1/',
-            simple_task_instance=mock.ANY,
-            msg='Executor reports task instance '
-            '<TaskInstance: test_process_executor_events.dummy_task test 
[queued]> '
-            'finished (failed) although the task says its queued. (Info: None) 
'
-            'Was the task killed externally?',
-        )
-        
self.scheduler_job.processor_agent.send_callback_to_execute.assert_called_once_with(task_callback)
+        ti1.refresh_from_db(session=session)
+        assert ti1.state == State.UP_FOR_RETRY
+        
self.scheduler_job.processor_agent.send_callback_to_execute.assert_not_called()
         self.scheduler_job.processor_agent.reset_mock()
 
         # ti in success state
@@ -226,36 +274,89 @@ class TestSchedulerJob:
         executor.event_buffer[ti1.key] = State.SUCCESS, None
 
         self.scheduler_job._process_executor_events(session=session)
-        ti1.refresh_from_db()
+        ti1.refresh_from_db(session=session)
         assert ti1.state == State.SUCCESS
         
self.scheduler_job.processor_agent.send_callback_to_execute.assert_not_called()
+        mock_stats_incr.assert_has_calls(
+            [
+                mock.call('scheduler.tasks.killed_externally'),
+                mock.call('operator_failures_DummyOperator', 1, 1),
+                mock.call('ti_failures'),
+            ],
+            any_order=True,
+        )
+
+    @mock.patch('airflow.jobs.scheduler_job.TaskCallbackRequest')
+    @mock.patch('airflow.jobs.scheduler_job.Stats.incr')
+    def test_process_executor_events_with_callback(self, mock_stats_incr, 
mock_task_callback, dag_maker):
+        dag_id = "test_process_executor_events_with_callback"
+        task_id_1 = 'dummy_task'
+
+        with dag_maker(dag_id=dag_id, fileloc='/test_path1/') as dag:
+            task1 = DummyOperator(task_id=task_id_1, 
on_failure_callback=lambda x: print("hi"))
+        ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)
+
+        mock_stats_incr.reset_mock()
+
+        executor = MockExecutor(do_update=False)
+        task_callback = mock.MagicMock()
+        mock_task_callback.return_value = task_callback
+        self.scheduler_job = SchedulerJob(executor=executor)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        session = settings.Session()
+
+        ti1.state = State.QUEUED
+        session.merge(ti1)
+        session.commit()
+
+        executor.event_buffer[ti1.key] = State.FAILED, None
+
+        self.scheduler_job._process_executor_events(session=session)
+        ti1.refresh_from_db()
+        # The state will remain in queued here and
+        # will be set to failed in dag parsing process
+        assert ti1.state == State.QUEUED
+        mock_task_callback.assert_called_once_with(
+            full_filepath=dag.fileloc,
+            simple_task_instance=mock.ANY,
+            msg='Executor reports task instance '
+            '<TaskInstance: 
test_process_executor_events_with_callback.dummy_task test [queued]> '
+            'finished (failed) although the task says its queued. (Info: None) 
'
+            'Was the task killed externally?',
+        )
+        
self.scheduler_job.processor_agent.send_callback_to_execute.assert_called_once_with(task_callback)
+        self.scheduler_job.processor_agent.reset_mock()
         
mock_stats_incr.assert_called_once_with('scheduler.tasks.killed_externally')
 
-    def test_process_executor_events_uses_inmemory_try_number(self, dag_maker):
-        dag_id = "dag_id"
-        task_id = "task_id"
-        try_number = 42
+    @mock.patch('airflow.jobs.scheduler_job.TaskCallbackRequest')
+    @mock.patch('airflow.jobs.scheduler_job.Stats.incr')
+    def test_process_executor_event_missing_dag(self, mock_stats_incr, 
mock_task_callback, dag_maker, caplog):
+        dag_id = "test_process_executor_events_with_callback"
+        task_id_1 = 'dummy_task'
 
-        with dag_maker(dag_id=dag_id):
-            DummyOperator(task_id=task_id)
+        with dag_maker(dag_id=dag_id, fileloc='/test_path1/'):
+            task1 = DummyOperator(task_id=task_id_1, 
on_failure_callback=lambda x: print("hi"))
+        ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)
 
-        dr = dag_maker.create_dagrun()
+        mock_stats_incr.reset_mock()
 
-        executor = MagicMock()
+        executor = MockExecutor(do_update=False)
+        task_callback = mock.MagicMock()
+        mock_task_callback.return_value = task_callback
         self.scheduler_job = SchedulerJob(executor=executor)
-        self.scheduler_job.processor_agent = MagicMock()
-        event_buffer = {TaskInstanceKey(dag_id, task_id, dr.run_id, 
try_number): (State.SUCCESS, None)}
-        executor.get_event_buffer.return_value = event_buffer
+        self.scheduler_job.dagbag = mock.MagicMock()
+        self.scheduler_job.dagbag.get_dag.side_effect = Exception('failed')
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        session = settings.Session()
 
-        with create_session() as session:
-            ti = dr.task_instances[0]
-            ti.state = State.SUCCESS
-            session.merge(ti)
+        ti1.state = State.QUEUED
+        session.merge(ti1)
+        session.commit()
 
-        self.scheduler_job._process_executor_events()
-        # Assert that the even_buffer is empty so the task was popped using 
right
-        # task instance key
-        assert event_buffer == {}
+        executor.event_buffer[ti1.key] = State.FAILED, None
+        self.scheduler_job._process_executor_events(session=session)
+        ti1.refresh_from_db()
+        assert ti1.state == State.FAILED
 
     def test_execute_task_instances_is_paused_wont_execute(self, session, 
dag_maker):
         dag_id = 
'SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute'
diff --git a/tests/models/test_taskinstance.py 
b/tests/models/test_taskinstance.py
index 28afaa9..7cf01d1 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1683,6 +1683,24 @@ class TestTaskInstance:
         assert context_arg_3 and "task_instance" in context_arg_3
         mock_on_retry_3.assert_not_called()
 
+    def test_handle_failure_updates_queued_task_try_number(self, dag_maker):
+        session = settings.Session()
+        with dag_maker():
+            task = DummyOperator(task_id="mytask", retries=1)
+        dr = dag_maker.create_dagrun()
+        ti = TI(task=task, run_id=dr.run_id)
+        ti.state = State.QUEUED
+        session.merge(ti)
+        session.commit()
+        assert ti.state == State.QUEUED
+        assert ti.try_number == 1
+        ti.handle_failure("test queued ti", test_mode=True)
+        assert ti.state == State.UP_FOR_RETRY
+        # Assert that 'ti._try_number' is bumped from 0 to 1. This is the 
last/current try
+        assert ti._try_number == 1
+        # Check 'ti.try_number' is bumped to 2. This is try_number for next run
+        assert ti.try_number == 2
+
     def test_does_not_retry_on_airflow_fail_exception(self, dag_maker):
         def fail():
             raise AirflowFailException("hopeless")

Reply via email to