This is an automated email from the ASF dual-hosted git repository. utkarsharma pushed a commit to branch v2-9-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit a4464c4bf18636c0cd0272e61ac95ece5c1fb64f Author: Ryan Hatter <[email protected]> AuthorDate: Sun Jun 30 17:12:08 2024 -0400 revamp some confusing log messages (#40334) (cherry picked from commit 34e7cab949dcabdad04d201d05de2cd1a6a24b29) --- airflow/jobs/backfill_job_runner.py | 7 +++- airflow/jobs/local_task_job_runner.py | 5 ++- airflow/jobs/scheduler_job_runner.py | 11 ++++-- airflow/task/task_runner/standard_task_runner.py | 13 ++++--- docs/apache-airflow/core-concepts/tasks.rst | 7 +++- docs/apache-airflow/index.rst | 1 + docs/apache-airflow/troubleshooting.rst | 48 ++++++++++++++++++++++++ tests/jobs/test_scheduler_job.py | 6 +-- 8 files changed, 81 insertions(+), 17 deletions(-) diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index 6be72b2c95..56a5d0763a 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -304,9 +304,12 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin): and ti.state in self.STATES_COUNT_AS_RUNNING ): msg = ( - f"Executor reports task instance {ti} finished ({state}) although the task says its " - f"{ti.state}. Was the task killed externally? Info: {info}" + f"The executor reported that the task instance {ti} finished with state {state}, " + f"but the task instance's state attribute is {ti.state}. 
" + "Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally" ) + if info is not None: + msg += f" Extra info: {info}" self.log.error(msg) ti.handle_failure(error=msg) continue diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py index bb520825f2..b9fd1c81e3 100644 --- a/airflow/jobs/local_task_job_runner.py +++ b/airflow/jobs/local_task_job_runner.py @@ -237,7 +237,10 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin): self.log.info("Task exited with return code %s (task deferral)", return_code) _set_task_deferred_context_var() else: - self.log.info("Task exited with return code %s", return_code) + message = f"Task exited with return code {return_code}" + if return_code == -signal.SIGKILL: + message += "For more information, see https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#LocalTaskJob-killed" + self.log.info(message) if not (self.task_instance.test_mode or is_deferral): if conf.getboolean("scheduler", "schedule_after_task_execution", fallback=True): diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 0596e7f59f..caecda1703 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -763,9 +763,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): tags={"dag_id": ti.dag_id, "task_id": ti.task_id}, ) msg = ( - "Executor reports task instance %s finished (%s) although the " - "task says it's %s. (Info: %s) Was the task killed externally?" + "The executor reported that the task instance %s finished with state %s, but the task instance's state attribute is %s. 
" # noqa: RUF100, UP031, flynt + "Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally" + % (ti, state, ti.state) ) + if info is not None: + msg += " Extra info: %s" % info # noqa: RUF100, UP031, flynt self._task_context_logger.error(msg, ti, state, ti.state, info, ti=ti) # Get task from the Serialized DAG @@ -781,12 +784,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): request = TaskCallbackRequest( full_filepath=ti.dag_model.fileloc, simple_task_instance=SimpleTaskInstance.from_ti(ti), - msg=msg % (ti, state, ti.state, info), + msg=msg, processor_subdir=ti.dag_model.processor_subdir, ) self.job.executor.send_callback(request) else: - ti.handle_failure(error=msg % (ti, state, ti.state, info), session=session) + ti.handle_failure(error=msg, session=session) return len(event_buffer) diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py index 1d8c2c9917..a7e58e8431 100644 --- a/airflow/task/task_runner/standard_task_runner.py +++ b/airflow/task/task_runner/standard_task_runner.py @@ -21,6 +21,7 @@ from __future__ import annotations import logging import os +import signal from typing import TYPE_CHECKING import psutil @@ -65,7 +66,6 @@ class StandardTaskRunner(BaseTaskRunner): else: # Start a new process group set_new_process_group() - import signal signal.signal(signal.SIGINT, signal.SIG_DFL) signal.signal(signal.SIGTERM, signal.SIG_DFL) @@ -170,13 +170,14 @@ class StandardTaskRunner(BaseTaskRunner): if self._rc is None: # Something else reaped it before we had a chance, so let's just "guess" at an error code. 
- self._rc = -9 + self._rc = -signal.SIGKILL - if self._rc == -9: - # If either we or psutil gives out a -9 return code, it likely means - # an OOM happened + if self._rc == -signal.SIGKILL: self.log.error( - "Job %s was killed before it finished (likely due to running out of memory)", + ( + "Job %s was killed before it finished (likely due to running out of memory)", + "For more information, see https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#LocalTaskJob-killed", + ), self._task_instance.job_id, ) diff --git a/docs/apache-airflow/core-concepts/tasks.rst b/docs/apache-airflow/core-concepts/tasks.rst index ca18196033..0e05f55bcf 100644 --- a/docs/apache-airflow/core-concepts/tasks.rst +++ b/docs/apache-airflow/core-concepts/tasks.rst @@ -245,7 +245,12 @@ No system runs perfectly, and task instances are expected to die once in a while * *Zombie tasks* are ``TaskInstances`` stuck in a ``running`` state despite their associated jobs being inactive (e.g. their process did not send a recent heartbeat as it got killed, or the machine died). Airflow will find these - periodically, clean them up, and either fail or retry the task depending on its settings. + periodically, clean them up, and either fail or retry the task depending on its settings. Tasks can become zombies for + many reasons, including: + + * The Airflow worker ran out of memory and was OOMKilled. + * The Airflow worker failed its liveness probe, so the system (for example, Kubernetes) restarted the worker. + * The system (for example, Kubernetes) scaled down and moved an Airflow worker from one node to another. * *Undead tasks* are tasks that are *not* supposed to be running but are, often caused when you manually edit Task Instances via the UI. Airflow will find them periodically and terminate them. 
diff --git a/docs/apache-airflow/index.rst b/docs/apache-airflow/index.rst index 760f31fde4..f2894af103 100644 --- a/docs/apache-airflow/index.rst +++ b/docs/apache-airflow/index.rst @@ -151,6 +151,7 @@ so coding will always be required. public-airflow-interface best-practices faq + troubleshooting Release Policies <release-process> release_notes privacy_notice diff --git a/docs/apache-airflow/troubleshooting.rst b/docs/apache-airflow/troubleshooting.rst new file mode 100644 index 0000000000..076aa5501b --- /dev/null +++ b/docs/apache-airflow/troubleshooting.rst @@ -0,0 +1,48 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _troubleshooting: + +Troubleshooting +=============== + +Obscure task failures +^^^^^^^^^^^^^^^^^^^^^ + +Task state changed externally +----------------------------- + +There are many potential causes for a task's state to be changed by a component other than the executor, which might cause some confusion when reviewing task instance or scheduler logs. + +Below are some example scenarios that could cause a task's state to change by a component other than the executor: + +- If a task's DAG failed to parse on the worker, the scheduler may mark the task as failed. 
If confirmed, consider increasing :ref:`core.dagbag_import_timeout <config:core__dagbag_import_timeout>` and :ref:`core.dag_file_processor_timeout <config:core__dag_file_processor_timeout>`. +- The scheduler will mark a task as failed if the task has been queued for longer than :ref:`scheduler.task_queued_timeout <config:scheduler__task_queued_timeout>`. +- If a task becomes a :ref:`zombie <concepts:zombies>`, it will be marked failed by the scheduler. +- A user marked the task as successful or failed in the Airflow UI. +- An external script or process used the :doc:`Airflow REST API <stable-rest-api-ref>` to change the state of a task. + +LocalTaskJob killed +------------------- + +Sometimes, Airflow or some adjacent system will kill a task instance's ``LocalTaskJob``, causing the task instance to fail. + +Here are some examples that could cause such an event: + +- A DAG run timeout, specified by ``dagrun_timeout`` in the DAG's definition. +- An Airflow worker running out of memory + - Usually, Airflow workers that run out of memory receive a SIGKILL and are marked as a zombie and failed by the scheduler. However, in some scenarios, Airflow kills the task before that happens. diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 2c57ca2a9f..0d0af95c7d 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -385,10 +385,10 @@ class TestSchedulerJob: full_filepath=dag.fileloc, simple_task_instance=mock.ANY, processor_subdir=None, - msg="Executor reports task instance " + msg="The executor reported that the task instance " "<TaskInstance: test_process_executor_events_with_callback.dummy_task test [queued]> " - "finished (failed) although the task says it's queued. (Info: None) " - "Was the task killed externally?", + "finished with state failed, but the task instance's state attribute is queued. 
" + "Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally", ) scheduler_job.executor.callback_sink.send.assert_called_once_with(task_callback) scheduler_job.executor.callback_sink.reset_mock()