This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9186fc57907 Add traceback log output when sigterm was sent (#44880)
9186fc57907 is described below
commit 9186fc57907c89f2f871d54f981f2b6892920e2f
Author: VladaZakharova <[email protected]>
AuthorDate: Thu Dec 19 14:31:07 2024 +0100
Add traceback log output when sigterm was sent (#44880)
Co-authored-by: Ulada Zakharava <[email protected]>
---
airflow/models/taskinstance.py | 2 ++
tests/models/test_taskinstance.py | 21 +++++++++++++++++++++
2 files changed, 23 insertions(+)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 24ebcebcc37..9bd8e8da12a 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -26,6 +26,7 @@ import math
import operator
import os
import signal
+import traceback
from collections import defaultdict
from collections.abc import Collection, Generator, Iterable, Mapping
from datetime import timedelta
@@ -2804,6 +2805,7 @@ class TaskInstance(Base, LoggingMixin):
os._exit(1)
return
self.log.error("Received SIGTERM. Terminating subprocesses.")
+ self.log.error("Stacktrace: \n%s", "".join(traceback.format_stack()))
self.task.on_kill()
raise AirflowTaskTerminated("Task received SIGTERM signal")
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index b1b8370f188..244fa2ed5bf 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -516,6 +516,27 @@ class TestTaskInstance:
ti.run()
assert "on_failure_callback called" in caplog.text
+ def test_task_sigterm_calls_with_traceback_in_logs(self, dag_maker, caplog):
+ """
+ Test that ensures that tasks print traceback to the logs when they receive sigterm
+ """
+
+ def task_function(ti):
+ os.kill(ti.pid, signal.SIGTERM)
+
+ with dag_maker():
+ task_ = PythonOperator(
+ task_id="test_on_failure",
+ python_callable=task_function,
+ )
+
+ dr = dag_maker.create_dagrun()
+ ti = dr.task_instances[0]
+ ti.task = task_
+ with pytest.raises(AirflowTaskTerminated):
+ ti.run()
+ assert "Stacktrace: " in caplog.text
+
def test_task_sigterm_works_with_retries(self, dag_maker):
"""
Test that ensures that tasks are retried when they receive sigterm