This is an automated email from the ASF dual-hosted git repository.

utkarsharma pushed a commit to branch v2-9-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a4464c4bf18636c0cd0272e61ac95ece5c1fb64f
Author: Ryan Hatter <[email protected]>
AuthorDate: Sun Jun 30 17:12:08 2024 -0400

    revamp some confusing log messages (#40334)
    
    (cherry picked from commit 34e7cab949dcabdad04d201d05de2cd1a6a24b29)
---
 airflow/jobs/backfill_job_runner.py              |  7 +++-
 airflow/jobs/local_task_job_runner.py            |  5 ++-
 airflow/jobs/scheduler_job_runner.py             | 11 ++++--
 airflow/task/task_runner/standard_task_runner.py | 13 ++++---
 docs/apache-airflow/core-concepts/tasks.rst      |  7 +++-
 docs/apache-airflow/index.rst                    |  1 +
 docs/apache-airflow/troubleshooting.rst          | 48 ++++++++++++++++++++++++
 tests/jobs/test_scheduler_job.py                 |  6 +--
 8 files changed, 81 insertions(+), 17 deletions(-)

diff --git a/airflow/jobs/backfill_job_runner.py 
b/airflow/jobs/backfill_job_runner.py
index 6be72b2c95..56a5d0763a 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -304,9 +304,12 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
                 and ti.state in self.STATES_COUNT_AS_RUNNING
             ):
                 msg = (
-                    f"Executor reports task instance {ti} finished ({state}) 
although the task says its "
-                    f"{ti.state}. Was the task killed externally? Info: {info}"
+                    f"The executor reported that the task instance {ti} 
finished with state {state}, "
+                    f"but the task instance's state attribute is {ti.state}. "
+                    "Learn more: 
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally";
                 )
+                if info is not None:
+                    msg += f" Extra info: {info}"
                 self.log.error(msg)
                 ti.handle_failure(error=msg)
                 continue
diff --git a/airflow/jobs/local_task_job_runner.py 
b/airflow/jobs/local_task_job_runner.py
index bb520825f2..b9fd1c81e3 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -237,7 +237,10 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
             self.log.info("Task exited with return code %s (task deferral)", 
return_code)
             _set_task_deferred_context_var()
         else:
-            self.log.info("Task exited with return code %s", return_code)
+            message = f"Task exited with return code {return_code}"
+            if return_code == -signal.SIGKILL:
+                message += "For more information, see 
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#LocalTaskJob-killed";
+            self.log.info(message)
 
         if not (self.task_instance.test_mode or is_deferral):
             if conf.getboolean("scheduler", "schedule_after_task_execution", 
fallback=True):
diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index 0596e7f59f..caecda1703 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -763,9 +763,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
                 )
                 msg = (
-                    "Executor reports task instance %s finished (%s) although 
the "
-                    "task says it's %s. (Info: %s) Was the task killed 
externally?"
+                    "The executor reported that the task instance %s finished 
with state %s, but the task instance's state attribute is %s. "  # noqa: 
RUF100, UP031, flynt
+                    "Learn more: 
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally";
+                    % (ti, state, ti.state)
                 )
+                if info is not None:
+                    msg += " Extra info: %s" % info  # noqa: RUF100, UP031, 
flynt
-                self._task_context_logger.error(msg, ti, state, ti.state, info, ti=ti)
+                self._task_context_logger.error(msg, ti=ti)
 
                 # Get task from the Serialized DAG
@@ -781,12 +784,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     request = TaskCallbackRequest(
                         full_filepath=ti.dag_model.fileloc,
                         simple_task_instance=SimpleTaskInstance.from_ti(ti),
-                        msg=msg % (ti, state, ti.state, info),
+                        msg=msg,
                         processor_subdir=ti.dag_model.processor_subdir,
                     )
                     self.job.executor.send_callback(request)
                 else:
-                    ti.handle_failure(error=msg % (ti, state, ti.state, info), 
session=session)
+                    ti.handle_failure(error=msg, session=session)
 
         return len(event_buffer)
 
diff --git a/airflow/task/task_runner/standard_task_runner.py 
b/airflow/task/task_runner/standard_task_runner.py
index 1d8c2c9917..a7e58e8431 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -21,6 +21,7 @@ from __future__ import annotations
 
 import logging
 import os
+import signal
 from typing import TYPE_CHECKING
 
 import psutil
@@ -65,7 +66,6 @@ class StandardTaskRunner(BaseTaskRunner):
         else:
             # Start a new process group
             set_new_process_group()
-            import signal
 
             signal.signal(signal.SIGINT, signal.SIG_DFL)
             signal.signal(signal.SIGTERM, signal.SIG_DFL)
@@ -170,13 +170,14 @@ class StandardTaskRunner(BaseTaskRunner):
 
         if self._rc is None:
             # Something else reaped it before we had a chance, so let's just 
"guess" at an error code.
-            self._rc = -9
+            self._rc = -signal.SIGKILL
 
-        if self._rc == -9:
-            # If either we or psutil gives out a -9 return code, it likely 
means
-            # an OOM happened
+        if self._rc == -signal.SIGKILL:
             self.log.error(
-                "Job %s was killed before it finished (likely due to running 
out of memory)",
+                "Job %s was killed before it finished (likely due to running out of memory). "
+                "For more information, see https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#LocalTaskJob-killed",
                 self._task_instance.job_id,
             )
 
diff --git a/docs/apache-airflow/core-concepts/tasks.rst 
b/docs/apache-airflow/core-concepts/tasks.rst
index ca18196033..0e05f55bcf 100644
--- a/docs/apache-airflow/core-concepts/tasks.rst
+++ b/docs/apache-airflow/core-concepts/tasks.rst
@@ -245,7 +245,12 @@ No system runs perfectly, and task instances are expected 
to die once in a while
 
 * *Zombie tasks* are ``TaskInstances`` stuck in a ``running`` state despite 
their associated jobs being inactive
   (e.g. their process did not send a recent heartbeat as it got killed, or the 
machine died). Airflow will find these
-  periodically, clean them up, and either fail or retry the task depending on 
its settings.
+  periodically, clean them up, and either fail or retry the task depending on 
its settings. Tasks can become zombies for
+  many reasons, including:
+
+    * The Airflow worker ran out of memory and was OOMKilled.
+    * The Airflow worker failed its liveness probe, so the system (for 
example, Kubernetes) restarted the worker.
+    * The system (for example, Kubernetes) scaled down and moved an Airflow 
worker from one node to another.
 
 * *Undead tasks* are tasks that are *not* supposed to be running but are, 
often caused when you manually edit Task
   Instances via the UI. Airflow will find them periodically and terminate them.
diff --git a/docs/apache-airflow/index.rst b/docs/apache-airflow/index.rst
index 760f31fde4..f2894af103 100644
--- a/docs/apache-airflow/index.rst
+++ b/docs/apache-airflow/index.rst
@@ -151,6 +151,7 @@ so coding will always be required.
     public-airflow-interface
     best-practices
     faq
+    troubleshooting
     Release Policies <release-process>
     release_notes
     privacy_notice
diff --git a/docs/apache-airflow/troubleshooting.rst 
b/docs/apache-airflow/troubleshooting.rst
new file mode 100644
index 0000000000..076aa5501b
--- /dev/null
+++ b/docs/apache-airflow/troubleshooting.rst
@@ -0,0 +1,48 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _troubleshooting:
+
+Troubleshooting
+===============
+
+Obscure task failures
+^^^^^^^^^^^^^^^^^^^^^
+
+Task state changed externally
+-----------------------------
+
+There are many potential causes for a task's state to be changed by a 
component other than the executor, which might cause some confusion when 
reviewing task instance or scheduler logs.
+
+Below are some example scenarios that could cause a task's state to be changed by a component other than the executor:
+
+- If a task's DAG failed to parse on the worker, the scheduler may mark the 
task as failed. If confirmed, consider increasing 
:ref:`core.dagbag_import_timeout <config:core__dagbag_import_timeout>` and 
:ref:`core.dag_file_processor_timeout 
<config:core__dag_file_processor_timeout>`.
+- The scheduler will mark a task as failed if the task has been queued for 
longer than :ref:`scheduler.task_queued_timeout 
<config:scheduler__task_queued_timeout>`.
+- If a task becomes a :ref:`zombie <concepts:zombies>`, it will be marked 
failed by the scheduler.
+- A user marked the task as successful or failed in the Airflow UI.
+- An external script or process used the :doc:`Airflow REST API 
<stable-rest-api-ref>` to change the state of a task.
+
+LocalTaskJob killed
+-------------------
+
+Sometimes, Airflow or some adjacent system will kill a task instance's 
``LocalTaskJob``, causing the task instance to fail.
+
+Here are some examples that could cause such an event:
+
+- A DAG run timeout, specified by ``dagrun_timeout`` in the DAG's definition.
+- An Airflow worker running out of memory.
+  - Usually, Airflow workers that run out of memory receive a SIGKILL and are 
marked as a zombie and failed by the scheduler. However, in some scenarios, 
Airflow kills the task before that happens.
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 2c57ca2a9f..0d0af95c7d 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -385,10 +385,10 @@ class TestSchedulerJob:
             full_filepath=dag.fileloc,
             simple_task_instance=mock.ANY,
             processor_subdir=None,
-            msg="Executor reports task instance "
+            msg="The executor reported that the task instance "
             "<TaskInstance: 
test_process_executor_events_with_callback.dummy_task test [queued]> "
-            "finished (failed) although the task says it's queued. (Info: 
None) "
-            "Was the task killed externally?",
+            "finished with state failed, but the task instance's state 
attribute is queued. "
+            "Learn more: 
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally";,
         )
         
scheduler_job.executor.callback_sink.send.assert_called_once_with(task_callback)
         scheduler_job.executor.callback_sink.reset_mock()

Reply via email to