This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 3063df77d78 Add traceback log output when sigterm was sent (#44880)
(#45077)
3063df77d78 is described below
commit 3063df77d782cb7922d7075c0ebecae9f5977526
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri Dec 20 13:50:06 2024 +0100
Add traceback log output when sigterm was sent (#44880) (#45077)
Co-authored-by: Ulada Zakharava <[email protected]>
(cherry picked from commit 9186fc57907c89f2f871d54f981f2b6892920e2f)
Co-authored-by: VladaZakharova <[email protected]>
---
airflow/models/taskinstance.py | 2 ++
tests/models/test_taskinstance.py | 21 +++++++++++++++++++++
2 files changed, 23 insertions(+)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 5a6de8e6ecf..2c1f52fc402 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -26,6 +26,7 @@ import math
import operator
import os
import signal
+import traceback
import warnings
from collections import defaultdict
from contextlib import nullcontext
@@ -3091,6 +3092,7 @@ class TaskInstance(Base, LoggingMixin):
os._exit(1)
return
self.log.error("Received SIGTERM. Terminating subprocesses.")
+ self.log.error("Stacktrace: \n%s",
"".join(traceback.format_stack()))
self.task.on_kill()
raise AirflowTaskTerminated("Task received SIGTERM signal")
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index 15a9eb1ba33..107f618a00c 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -523,6 +523,27 @@ class TestTaskInstance:
ti.run()
assert "on_failure_callback called" in caplog.text
+ def test_task_sigterm_calls_with_traceback_in_logs(self, dag_maker,
caplog):
+ """
+ Test that ensures that tasks print traceback to the logs when they
receive sigterm
+ """
+
+ def task_function(ti):
+ os.kill(ti.pid, signal.SIGTERM)
+
+ with dag_maker():
+ task_ = PythonOperator(
+ task_id="test_on_failure",
+ python_callable=task_function,
+ )
+
+ dr = dag_maker.create_dagrun()
+ ti = dr.task_instances[0]
+ ti.task = task_
+ with pytest.raises(AirflowTaskTerminated):
+ ti.run()
+ assert "Stacktrace: " in caplog.text
+
def test_task_sigterm_works_with_retries(self, dag_maker):
"""
Test that ensures that tasks are retried when they receive sigterm