This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a398d9d4fa KubernetesPodTrigger: add exception stack trace in
TriggerEvent (#35716)
a398d9d4fa is described below
commit a398d9d4fa38479155058b95abdcb96f9b918646
Author: Dagang Wei <[email protected]>
AuthorDate: Sat Dec 16 13:16:02 2023 -0800
KubernetesPodTrigger: add exception stack trace in TriggerEvent (#35716)
---
airflow/providers/cncf/kubernetes/operators/pod.py | 7 ++++++-
airflow/providers/cncf/kubernetes/triggers/pod.py | 2 ++
tests/providers/cncf/kubernetes/triggers/test_pod.py | 2 ++
tests/providers/google/cloud/triggers/test_kubernetes_engine.py | 2 ++
4 files changed, 12 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py
b/airflow/providers/cncf/kubernetes/operators/pod.py
index b3c10d4ff1..55fe1e4c36 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -672,6 +672,7 @@ class KubernetesPodOperator(BaseOperator):
)
def execute_complete(self, context: Context, event: dict, **kwargs):
+ self.log.debug("Triggered with event: %s", event)
pod = None
try:
pod = self.hook.get_pod(
@@ -682,7 +683,11 @@ class KubernetesPodOperator(BaseOperator):
# fetch some logs when pod is failed
if self.get_logs:
self.write_logs(pod)
- raise AirflowException(event["message"])
+ if "stack_trace" in event:
+ message = f"{event['message']}\n{event['stack_trace']}"
+ else:
+ message = event["message"]
+ raise AirflowException(message)
elif event["status"] == "success":
# fetch some logs when pod is executed successfully
if self.get_logs:
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py
b/airflow/providers/cncf/kubernetes/triggers/pod.py
index 5eda424276..b7f0348b66 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -18,6 +18,7 @@ from __future__ import annotations
import asyncio
import datetime
+import traceback
import warnings
from asyncio import CancelledError
from enum import Enum
@@ -231,6 +232,7 @@ class KubernetesPodTrigger(BaseTrigger):
"namespace": self.pod_namespace,
"status": "error",
"message": str(e),
+ "stack_trace": traceback.format_exc(),
}
)
diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py
b/tests/providers/cncf/kubernetes/triggers/test_pod.py
index 5694698d10..42a0196ed7 100644
--- a/tests/providers/cncf/kubernetes/triggers/test_pod.py
+++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py
@@ -181,12 +181,14 @@ class TestKubernetesPodTrigger:
generator = trigger.run()
actual = await generator.asend(None)
+ actual_stack_trace = actual.payload.pop("stack_trace")
assert (
TriggerEvent(
{"name": POD_NAME, "namespace": NAMESPACE, "status": "error",
"message": "Test exception"}
)
== actual
)
+ assert actual_stack_trace.startswith("Traceback (most recent call
last):")
@pytest.mark.asyncio
@mock.patch(f"{TRIGGER_PATH}.define_container_state")
diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
index 41f0a67211..b28d9418ef 100644
--- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
@@ -199,12 +199,14 @@ class TestGKEStartPodTrigger:
generator = trigger.run()
actual = await generator.asend(None)
+ actual_stack_trace = actual.payload.pop("stack_trace")
assert (
TriggerEvent(
{"name": POD_NAME, "namespace": NAMESPACE, "status": "error",
"message": "Test exception"}
)
== actual
)
+ assert actual_stack_trace.startswith("Traceback (most recent call
last):")
@pytest.mark.asyncio
@mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state")