This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 44f601e Properly handle ti state difference between executor and scheduler (#17819)
44f601e is described below
commit 44f601e51cd7d369292728b7a7460c56528dbf27
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Sep 21 21:16:57 2021 +0100
Properly handle ti state difference between executor and scheduler (#17819)
When a task fails to start, the executor fails it, so its state in the
scheduler is queued while its state in the executor is failed. Currently
we fail this task without retries to avoid it getting stuck.
This PR changes this to only fail the task directly if the callback cannot
be executed. This ensures the task does not get stuck.
closes: #16625
Co-authored-by: Kaxil Naik <[email protected]>
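For clarity, the decision the scheduler now makes when the executor reports
failure for a still-queued task instance can be summarised in a small
standalone sketch (illustrative only: Task, TaskInstance and
process_failed_event below are simplified stand-ins with plain string
states, not the real Airflow classes or scheduler API):

    from dataclasses import dataclass
    from typing import Callable, Optional

    @dataclass
    class Task:
        # Stand-ins for the operator attributes the scheduler inspects.
        on_failure_callback: Optional[Callable] = None
        on_retry_callback: Optional[Callable] = None
        retries: int = 0

    @dataclass
    class TaskInstance:
        task: Task
        state: str = "queued"
        try_number: int = 1

        def handle_failure(self, error: str) -> None:
            # Mirrors the taskinstance.py change: a queued task that never
            # started has its try_number bumped before it is retried/failed.
            if self.state == "queued":
                self.try_number += 1
            retryable = self.try_number <= self.task.retries + 1
            self.state = "up_for_retry" if retryable else "failed"

    def process_failed_event(ti: TaskInstance, get_dag: Callable, send_callback: Callable) -> None:
        # Decide what to do when the executor reports failure for a queued ti.
        try:
            get_dag()  # stand-in for dagbag.get_dag() / dag.get_task()
        except Exception:
            # Serialized DAG is gone: all we can do is mark the ti failed.
            ti.state = "failed"
            return
        if ti.task.on_failure_callback or ti.task.on_retry_callback:
            # Let the DAG processor run the callback; it will set the state.
            send_callback("executor reported failure")
        else:
            # No callback to run, so fail (or retry) the task right here.
            ti.handle_failure("executor reported failure")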
---
airflow/jobs/scheduler_job.py | 26 ++++--
airflow/models/taskinstance.py | 6 ++
tests/jobs/test_scheduler_job.py | 163 ++++++++++++++++++++++++++++++--------
tests/models/test_taskinstance.py | 18 +++++
4 files changed, 174 insertions(+), 39 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 8ca9aea..a0dd254 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -521,14 +521,24 @@ class SchedulerJob(BaseJob):
)
self.log.error(msg, ti, state, ti.state, info)
- request = TaskCallbackRequest(
- full_filepath=ti.dag_model.fileloc,
- simple_task_instance=SimpleTaskInstance(ti),
- msg=msg % (ti, state, ti.state, info),
- )
- self.log.info('Setting task instance %s state to %s as reported by executor', ti, state)
- ti.set_state(state)
- self.processor_agent.send_callback_to_execute(request)
+ # Get task from the Serialized DAG
+ try:
+ dag = self.dagbag.get_dag(ti.dag_id)
+ task = dag.get_task(ti.task_id)
+ except Exception:
+ self.log.exception("Marking task instance %s as %s", ti, state)
+ ti.set_state(state)
+ continue
+ ti.task = task
+ if task.on_retry_callback or task.on_failure_callback:
+ request = TaskCallbackRequest(
+ full_filepath=ti.dag_model.fileloc,
+ simple_task_instance=SimpleTaskInstance(ti),
+ msg=msg % (ti, state, ti.state, info),
+ )
+ self.processor_agent.send_callback_to_execute(request)
+ else:
+ ti.handle_failure(error=msg % (ti, state, ti.state, info), session=session)
return len(event_buffer)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 9dca1f3..c60817b 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1693,6 +1693,9 @@ class TaskInstance(Base, LoggingMixin):
Stats.incr(f'operator_failures_{task.task_type}', 1, 1)
Stats.incr('ti_failures')
if not test_mode:
+ # This is needed as dag_run is lazily loaded. Without it, sqlalchemy errors with
+ # DetachedInstanceError error.
+ self.dag_run = self.get_dagrun(session=session)
session.add(Log(State.FAILED, self))
# Log failure duration
@@ -1719,6 +1722,9 @@ class TaskInstance(Base, LoggingMixin):
self.state = State.FAILED
email_for_state = task.email_on_failure
else:
+ if self.state == State.QUEUED:
+ # We increase the try_number so as to fail the task if it fails to start after sometime
+ self._try_number += 1
self.state = State.UP_FOR_RETRY
email_for_state = task.email_on_retry
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index d2779c6..fc65c25 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -185,6 +185,7 @@ class TestSchedulerJob:
dag_id = "test_process_executor_events"
task_id_1 = 'dummy_task'
+ session = settings.Session()
with dag_maker(dag_id=dag_id, fileloc='/test_path1/'):
task1 = DummyOperator(task_id=task_id_1)
ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)
@@ -196,9 +197,64 @@ class TestSchedulerJob:
mock_task_callback.return_value = task_callback
self.scheduler_job = SchedulerJob(executor=executor)
self.scheduler_job.processor_agent = mock.MagicMock()
+ ti1.state = State.QUEUED
+ session.merge(ti1)
+ session.commit()
+
+ executor.event_buffer[ti1.key] = State.FAILED, None
+
+ self.scheduler_job._process_executor_events(session=session)
+ ti1.refresh_from_db(session=session)
+ assert ti1.state == State.FAILED
+
self.scheduler_job.processor_agent.send_callback_to_execute.assert_not_called()
+ self.scheduler_job.processor_agent.reset_mock()
+
+ # ti in success state
+ ti1.state = State.SUCCESS
+ session.merge(ti1)
+ session.commit()
+ executor.event_buffer[ti1.key] = State.SUCCESS, None
+
+ self.scheduler_job._process_executor_events(session=session)
+ ti1.refresh_from_db(session=session)
+ assert ti1.state == State.SUCCESS
+
self.scheduler_job.processor_agent.send_callback_to_execute.assert_not_called()
+ mock_stats_incr.assert_has_calls(
+ [
+ mock.call('scheduler.tasks.killed_externally'),
+ mock.call('operator_failures_DummyOperator', 1, 1),
+ mock.call('ti_failures'),
+ ],
+ any_order=True,
+ )
+
+ @mock.patch('airflow.jobs.scheduler_job.TaskCallbackRequest')
+ @mock.patch('airflow.jobs.scheduler_job.Stats.incr')
+ def test_process_executor_events_with_no_callback(self, mock_stats_incr, mock_task_callback, dag_maker):
+ dag_id = "test_process_executor_events_with_no_callback"
+ task_id_1 = 'dummy_task'
+
+ mock_stats_incr.reset_mock()
+ executor = MockExecutor(do_update=False)
+ task_callback = mock.MagicMock()
+ mock_task_callback.return_value = task_callback
+ self.scheduler_job = SchedulerJob(executor=executor)
+ self.scheduler_job.processor_agent = mock.MagicMock()
session = settings.Session()
+ with dag_maker(dag_id=dag_id, fileloc='/test_path1/'):
+ task1 = DummyOperator(task_id=task_id_1, retries=1)
+ ti1 = dag_maker.create_dagrun(
+ run_id='dr2', execution_date=DEFAULT_DATE + timedelta(hours=1)
+ ).get_task_instance(task1.task_id)
+ mock_stats_incr.reset_mock()
+
+ executor = MockExecutor(do_update=False)
+ task_callback = mock.MagicMock()
+ mock_task_callback.return_value = task_callback
+ self.scheduler_job = SchedulerJob(executor=executor)
+ self.scheduler_job.processor_agent = mock.MagicMock()
ti1.state = State.QUEUED
session.merge(ti1)
session.commit()
@@ -206,17 +262,9 @@ class TestSchedulerJob:
executor.event_buffer[ti1.key] = State.FAILED, None
self.scheduler_job._process_executor_events(session=session)
- ti1.refresh_from_db()
- assert ti1.state == State.FAILED
- mock_task_callback.assert_called_once_with(
- full_filepath='/test_path1/',
- simple_task_instance=mock.ANY,
- msg='Executor reports task instance '
- '<TaskInstance: test_process_executor_events.dummy_task test [queued]> '
- 'finished (failed) although the task says its queued. (Info: None) '
- 'Was the task killed externally?',
- )
-
self.scheduler_job.processor_agent.send_callback_to_execute.assert_called_once_with(task_callback)
+ ti1.refresh_from_db(session=session)
+ assert ti1.state == State.UP_FOR_RETRY
+
self.scheduler_job.processor_agent.send_callback_to_execute.assert_not_called()
self.scheduler_job.processor_agent.reset_mock()
# ti in success state
@@ -226,36 +274,89 @@ class TestSchedulerJob:
executor.event_buffer[ti1.key] = State.SUCCESS, None
self.scheduler_job._process_executor_events(session=session)
- ti1.refresh_from_db()
+ ti1.refresh_from_db(session=session)
assert ti1.state == State.SUCCESS
self.scheduler_job.processor_agent.send_callback_to_execute.assert_not_called()
+ mock_stats_incr.assert_has_calls(
+ [
+ mock.call('scheduler.tasks.killed_externally'),
+ mock.call('operator_failures_DummyOperator', 1, 1),
+ mock.call('ti_failures'),
+ ],
+ any_order=True,
+ )
+
+ @mock.patch('airflow.jobs.scheduler_job.TaskCallbackRequest')
+ @mock.patch('airflow.jobs.scheduler_job.Stats.incr')
+ def test_process_executor_events_with_callback(self, mock_stats_incr, mock_task_callback, dag_maker):
+ dag_id = "test_process_executor_events_with_callback"
+ task_id_1 = 'dummy_task'
+
+ with dag_maker(dag_id=dag_id, fileloc='/test_path1/') as dag:
+ task1 = DummyOperator(task_id=task_id_1, on_failure_callback=lambda x: print("hi"))
+ ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)
+
+ mock_stats_incr.reset_mock()
+
+ executor = MockExecutor(do_update=False)
+ task_callback = mock.MagicMock()
+ mock_task_callback.return_value = task_callback
+ self.scheduler_job = SchedulerJob(executor=executor)
+ self.scheduler_job.processor_agent = mock.MagicMock()
+ session = settings.Session()
+
+ ti1.state = State.QUEUED
+ session.merge(ti1)
+ session.commit()
+
+ executor.event_buffer[ti1.key] = State.FAILED, None
+
+ self.scheduler_job._process_executor_events(session=session)
+ ti1.refresh_from_db()
+ # The state will remain in queued here and
+ # will be set to failed in dag parsing process
+ assert ti1.state == State.QUEUED
+ mock_task_callback.assert_called_once_with(
+ full_filepath=dag.fileloc,
+ simple_task_instance=mock.ANY,
+ msg='Executor reports task instance '
+ '<TaskInstance: test_process_executor_events_with_callback.dummy_task test [queued]> '
+ 'finished (failed) although the task says its queued. (Info: None) '
+ 'Was the task killed externally?',
+ )
+
self.scheduler_job.processor_agent.send_callback_to_execute.assert_called_once_with(task_callback)
+ self.scheduler_job.processor_agent.reset_mock()
mock_stats_incr.assert_called_once_with('scheduler.tasks.killed_externally')
- def test_process_executor_events_uses_inmemory_try_number(self, dag_maker):
- dag_id = "dag_id"
- task_id = "task_id"
- try_number = 42
+ @mock.patch('airflow.jobs.scheduler_job.TaskCallbackRequest')
+ @mock.patch('airflow.jobs.scheduler_job.Stats.incr')
+ def test_process_executor_event_missing_dag(self, mock_stats_incr, mock_task_callback, dag_maker, caplog):
+ dag_id = "test_process_executor_events_with_callback"
+ task_id_1 = 'dummy_task'
- with dag_maker(dag_id=dag_id):
- DummyOperator(task_id=task_id)
+ with dag_maker(dag_id=dag_id, fileloc='/test_path1/'):
+ task1 = DummyOperator(task_id=task_id_1, on_failure_callback=lambda x: print("hi"))
+ ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)
- dr = dag_maker.create_dagrun()
+ mock_stats_incr.reset_mock()
- executor = MagicMock()
+ executor = MockExecutor(do_update=False)
+ task_callback = mock.MagicMock()
+ mock_task_callback.return_value = task_callback
self.scheduler_job = SchedulerJob(executor=executor)
- self.scheduler_job.processor_agent = MagicMock()
- event_buffer = {TaskInstanceKey(dag_id, task_id, dr.run_id, try_number): (State.SUCCESS, None)}
- executor.get_event_buffer.return_value = event_buffer
+ self.scheduler_job.dagbag = mock.MagicMock()
+ self.scheduler_job.dagbag.get_dag.side_effect = Exception('failed')
+ self.scheduler_job.processor_agent = mock.MagicMock()
+ session = settings.Session()
- with create_session() as session:
- ti = dr.task_instances[0]
- ti.state = State.SUCCESS
- session.merge(ti)
+ ti1.state = State.QUEUED
+ session.merge(ti1)
+ session.commit()
- self.scheduler_job._process_executor_events()
- # Assert that the even_buffer is empty so the task was popped using right
- # task instance key
- assert event_buffer == {}
+ executor.event_buffer[ti1.key] = State.FAILED, None
+ self.scheduler_job._process_executor_events(session=session)
+ ti1.refresh_from_db()
+ assert ti1.state == State.FAILED
def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker):
dag_id = 'SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute'
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 28afaa9..7cf01d1 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1683,6 +1683,24 @@ class TestTaskInstance:
assert context_arg_3 and "task_instance" in context_arg_3
mock_on_retry_3.assert_not_called()
+ def test_handle_failure_updates_queued_task_try_number(self, dag_maker):
+ session = settings.Session()
+ with dag_maker():
+ task = DummyOperator(task_id="mytask", retries=1)
+ dr = dag_maker.create_dagrun()
+ ti = TI(task=task, run_id=dr.run_id)
+ ti.state = State.QUEUED
+ session.merge(ti)
+ session.commit()
+ assert ti.state == State.QUEUED
+ assert ti.try_number == 1
+ ti.handle_failure("test queued ti", test_mode=True)
+ assert ti.state == State.UP_FOR_RETRY
+ # Assert that 'ti._try_number' is bumped from 0 to 1. This is the last/current try
+ assert ti._try_number == 1
+ # Check 'ti.try_number' is bumped to 2. This is try_number for next run
+ assert ti.try_number == 2
+
def test_does_not_retry_on_airflow_fail_exception(self, dag_maker):
def fail():
raise AirflowFailException("hopeless")