This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to tag v2.3.3+astro.2 in repository https://gitbox.apache.org/repos/asf/airflow.git
commit dfb4e81e33e77efd92727b0ef7db592845cc6354 Author: Ash Berlin-Taylor <[email protected]> AuthorDate: Fri Aug 5 18:36:24 2022 +0100 Ensure that zombie tasks for dags with errors get cleaned up (#25550) If there is a parse error in a DAG the zombie cleanup request never ran, which resulted in the TI never leaving running state and just continually being detected as a zombie. (Prior to AIP-45 landing, this bug/behaviour resulted in a DAG with a parse error never actually leaving the queued state.) The fix here is to _always_ make sure we run `ti.handle_failure` when we are given a request, even if we can't load the DAG. To _try_ and work as well as we can, we try to load the serialized_dag if we can, but in cases where we can't for whatever reason we also make sure TaskInstance.handle_failure is able to operate even when `self.task` is None. (cherry picked from commit 1d8507af07353e5cf29a860314b5ba5caad5cdf3) --- airflow/dag_processing/processor.py | 93 ++++++++++++++++++++++++++++------ airflow/models/log.py | 3 +- airflow/models/taskinstance.py | 8 ++- tests/conftest.py | 2 +- tests/dag_processing/test_processor.py | 41 +++++++++++++-- tests/models/test_taskinstance.py | 33 +++++++++++- 6 files changed, 157 insertions(+), 23 deletions(-) diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 469b55cfeb..df22371e6f 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -25,10 +25,10 @@ import time from contextlib import redirect_stderr, redirect_stdout, suppress from datetime import timedelta from multiprocessing.connection import Connection as MultiprocessingConnection -from typing import Iterator, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Iterator, List, Optional, Set, Tuple from setproctitle import setproctitle -from sqlalchemy import func, or_ +from sqlalchemy import exc, func, or_ from sqlalchemy.orm.session import Session from airflow import models, settings @@ 
-51,6 +51,9 @@ from airflow.utils.mixins import MultiprocessingStartMethodMixin from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import State +if TYPE_CHECKING: + from airflow.models.operator import Operator + DR = models.DagRun TI = models.TaskInstance @@ -575,7 +578,7 @@ class DagFileProcessor(LoggingMixin): self.log.debug("Processing Callback Request: %s", request) try: if isinstance(request, TaskCallbackRequest): - self._execute_task_callbacks(dagbag, request) + self._execute_task_callbacks(dagbag, request, session=session) elif isinstance(request, SlaCallbackRequest): self.manage_slas(dagbag.get_dag(request.dag_id), session=session) elif isinstance(request, DagCallbackRequest): @@ -587,7 +590,27 @@ class DagFileProcessor(LoggingMixin): request.full_filepath, ) - session.commit() + session.flush() + + def execute_callbacks_without_dag( + self, callback_requests: List[CallbackRequest], session: Session + ) -> None: + """ + Execute what callbacks we can as "best effort" when the dag cannot be found/had parse errors. + + This is important so that tasks that fail while there is a parse + error don't get stuck in the queued state.
+ """ + for request in callback_requests: + self.log.debug("Processing Callback Request: %s", request) + if isinstance(request, TaskCallbackRequest): + self._execute_task_callbacks(None, request, session) + else: + self.log.info( + "Not executing %s callback for file %s as there was a dag parse error", + request.__class__.__name__, + request.full_filepath, + ) @provide_session def _execute_dag_callbacks(self, dagbag: DagBag, request: DagCallbackRequest, session: Session): @@ -597,18 +620,51 @@ class DagFileProcessor(LoggingMixin): dagrun=dag_run, success=not request.is_failure_callback, reason=request.msg, session=session ) - def _execute_task_callbacks(self, dagbag: DagBag, request: TaskCallbackRequest): + def _execute_task_callbacks( + self, dagbag: Optional[DagBag], request: TaskCallbackRequest, session: Session + ): + if not request.is_failure_callback: + return + simple_ti = request.simple_task_instance - if simple_ti.dag_id in dagbag.dags: + ti: Optional[TI] = ( + session.query(TI) + .filter_by( + dag_id=simple_ti.dag_id, + run_id=simple_ti.run_id, + task_id=simple_ti.task_id, + map_index=simple_ti.map_index, + ) + .one_or_none() + ) + if not ti: + return + + task: Optional["Operator"] = None + + if dagbag and simple_ti.dag_id in dagbag.dags: dag = dagbag.dags[simple_ti.dag_id] if simple_ti.task_id in dag.task_ids: task = dag.get_task(simple_ti.task_id) - if request.is_failure_callback: - ti = TI(task, run_id=simple_ti.run_id, map_index=simple_ti.map_index) - # TODO: Use simple_ti to improve performance here in the future - ti.refresh_from_db() - ti.handle_failure_with_callback(error=request.msg, test_mode=self.UNIT_TEST_MODE) - self.log.info('Executed failure callback for %s in state %s', ti, ti.state) + else: + # We don't have the _real_ dag here (perhaps it had a parse error?) but we still want to run + # `handle_failure` so that the state of the TI gets progressed. 
+ # + # Since handle_failure _really_ wants a task, we make a best effort to give it one + from airflow.models.serialized_dag import SerializedDagModel + + try: + model = session.query(SerializedDagModel).get(simple_ti.dag_id) + if model: + task = model.dag.get_task(simple_ti.task_id) + except (exc.NoResultFound, TaskNotFound): + pass + if task: + ti.refresh_from_task(task) + + ti.handle_failure_with_callback(error=request.msg, test_mode=self.UNIT_TEST_MODE, session=session) + self.log.info('Executed failure callback for %s in state %s', ti, ti.state) + session.flush() @provide_session def process_file( @@ -616,7 +672,7 @@ class DagFileProcessor(LoggingMixin): file_path: str, callback_requests: List[CallbackRequest], pickle_dags: bool = False, - session: Session = None, + session: Session = NEW_SESSION, ) -> Tuple[int, int]: """ Process a Python file containing Airflow DAGs. @@ -652,12 +708,19 @@ class DagFileProcessor(LoggingMixin): else: self.log.warning("No viable dags retrieved from %s", file_path) self.update_import_errors(session, dagbag) + if callback_requests: + # If there were callback requests for this file but there was a + # parse error, we still need to progress the state of TIs, + # otherwise they might be stuck in queued/running forever!
+ self.execute_callbacks_without_dag(callback_requests, session) return 0, len(dagbag.import_errors) - self.execute_callbacks(dagbag, callback_requests) + self.execute_callbacks(dagbag, callback_requests, session) + session.commit() # Save individual DAGs in the ORM - dagbag.sync_to_db() + dagbag.sync_to_db(session) + session.commit() if pickle_dags: paused_dag_ids = DagModel.get_paused_dag_ids(dag_ids=dagbag.dag_ids) diff --git a/airflow/models/log.py b/airflow/models/log.py index b2a5639dcd..4633dd3785 100644 --- a/airflow/models/log.py +++ b/airflow/models/log.py @@ -55,7 +55,8 @@ class Log(Base): self.task_id = task_instance.task_id self.execution_date = task_instance.execution_date self.map_index = task_instance.map_index - task_owner = task_instance.task.owner + if task_instance.task: + task_owner = task_instance.task.owner if 'task_id' in kwargs: self.task_id = kwargs['task_id'] diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index debd0aa6b0..72efd4c8db 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1919,7 +1919,7 @@ class TaskInstance(Base, LoggingMixin): self.end_date = timezone.utcnow() self.set_duration() - Stats.incr(f'operator_failures_{self.task.task_type}') + Stats.incr(f'operator_failures_{self.operator}') Stats.incr('ti_failures') if not test_mode: session.add(Log(State.FAILED, self)) @@ -1943,7 +1943,8 @@ class TaskInstance(Base, LoggingMixin): task = None try: - task = self.task.unmap() + if self.task: + task = self.task.unmap() except Exception: self.log.error("Unable to unmap task, can't determine if we need to send an alert email or not") @@ -1985,6 +1986,9 @@ class TaskInstance(Base, LoggingMixin): # If a task is cleared when running, it goes into RESTARTING state and is always # eligible for retry return True + if not self.task: + # Couldn't load the task, don't know number of retries, guess: + return self.try_number <= self.max_tries return self.task.retries and 
self.try_number <= self.max_tries diff --git a/tests/conftest.py b/tests/conftest.py index b153c213d5..8447ba240f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -611,7 +611,7 @@ def dag_maker(request): if not dag_ids: return # To isolate problems here with problems from elsewhere on the session object - self.session.flush() + self.session.rollback() self.session.query(SerializedDagModel).filter( SerializedDagModel.dag_id.in_(dag_ids) diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index 1c0b164be8..2bad13e5f6 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -30,6 +30,7 @@ from airflow.configuration import TEST_DAGS_FOLDER, conf from airflow.dag_processing.manager import DagFileProcessorAgent from airflow.dag_processing.processor import DagFileProcessor from airflow.models import DagBag, DagModel, SlaMiss, TaskInstance, errors +from airflow.models.serialized_dag import SerializedDagModel from airflow.models.taskinstance import SimpleTaskInstance from airflow.operators.empty import EmptyOperator from airflow.utils import timezone @@ -386,10 +387,44 @@ class TestDagFileProcessor: full_filepath="A", simple_task_instance=SimpleTaskInstance.from_ti(ti), msg="Message" ) ] - dag_file_processor.execute_callbacks(dagbag, requests) + dag_file_processor.execute_callbacks(dagbag, requests, session) + mock_ti_handle_failure.assert_called_once_with( + error="Message", test_mode=conf.getboolean('core', 'unit_test_mode'), session=session + ) + + @pytest.mark.parametrize( + ["has_serialized_dag"], + [pytest.param(True, id="dag_in_db"), pytest.param(False, id="no_dag_found")], + ) + @patch.object(TaskInstance, 'handle_failure') + def test_execute_on_failure_callbacks_without_dag(self, mock_ti_handle_failure, has_serialized_dag): + dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False) + dag_file_processor = DagFileProcessor(dag_ids=[], 
log=mock.MagicMock()) + with create_session() as session: + session.query(TaskInstance).delete() + dag = dagbag.get_dag('example_branch_operator') + dagrun = dag.create_dagrun( + state=State.RUNNING, + execution_date=DEFAULT_DATE, + run_type=DagRunType.SCHEDULED, + session=session, + ) + task = dag.get_task(task_id='run_this_first') + ti = TaskInstance(task, run_id=dagrun.run_id, state=State.QUEUED) + session.add(ti) + + if has_serialized_dag: + assert SerializedDagModel.write_dag(dag, session=session) is True + session.flush() + + requests = [ + TaskCallbackRequest( + full_filepath="A", simple_task_instance=SimpleTaskInstance.from_ti(ti), msg="Message" + ) + ] + dag_file_processor.execute_callbacks_without_dag(requests, session) mock_ti_handle_failure.assert_called_once_with( - error="Message", - test_mode=conf.getboolean('core', 'unit_test_mode'), + error="Message", test_mode=conf.getboolean('core', 'unit_test_mode'), session=session ) def test_failure_callbacks_should_not_drop_hostname(self): diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 3990c3cbf5..b1385c1179 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -1915,7 +1915,7 @@ class TestTaskInstance: ti = TI(task=task, run_id=dr.run_id) ti.state = State.QUEUED session.merge(ti) - session.commit() + session.flush() assert ti.state == State.QUEUED assert ti.try_number == 1 ti.handle_failure("test queued ti", test_mode=True) @@ -1925,6 +1925,37 @@ class TestTaskInstance: # Check 'ti.try_number' is bumped to 2. 
This is try_number for next run assert ti.try_number == 2 + @patch.object(Stats, 'incr') + def test_handle_failure_no_task(self, Stats_incr, dag_maker): + """ + When a zombie is detected for a DAG with a parse error, we need to be able to run handle_failure + _without_ ti.task being set + """ + session = settings.Session() + with dag_maker(): + task = EmptyOperator(task_id="mytask", retries=1) + dr = dag_maker.create_dagrun() + ti = TI(task=task, run_id=dr.run_id) + ti = session.merge(ti) + ti.task = None + ti.state = State.QUEUED + session.flush() + + assert ti.task is None, "Check critical pre-condition" + + assert ti.state == State.QUEUED + assert ti.try_number == 1 + + ti.handle_failure("test queued ti", test_mode=False) + assert ti.state == State.UP_FOR_RETRY + # Assert that 'ti._try_number' is bumped from 0 to 1. This is the last/current try + assert ti._try_number == 1 + # Check 'ti.try_number' is bumped to 2. This is try_number for next run + assert ti.try_number == 2 + + Stats_incr.assert_any_call('ti_failures') + Stats_incr.assert_any_call('operator_failures_EmptyOperator') + def test_does_not_retry_on_airflow_fail_exception(self, dag_maker): def fail(): raise AirflowFailException("hopeless")
