This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a398d9d4fa KubernetesPodTrigger: add exception stack trace in 
TriggerEvent (#35716)
a398d9d4fa is described below

commit a398d9d4fa38479155058b95abdcb96f9b918646
Author: Dagang Wei <[email protected]>
AuthorDate: Sat Dec 16 13:16:02 2023 -0800

    KubernetesPodTrigger: add exception stack trace in TriggerEvent (#35716)
---
 airflow/providers/cncf/kubernetes/operators/pod.py              | 7 ++++++-
 airflow/providers/cncf/kubernetes/triggers/pod.py               | 2 ++
 tests/providers/cncf/kubernetes/triggers/test_pod.py            | 2 ++
 tests/providers/google/cloud/triggers/test_kubernetes_engine.py | 2 ++
 4 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py 
b/airflow/providers/cncf/kubernetes/operators/pod.py
index b3c10d4ff1..55fe1e4c36 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -672,6 +672,7 @@ class KubernetesPodOperator(BaseOperator):
         )
 
     def execute_complete(self, context: Context, event: dict, **kwargs):
+        self.log.debug("Triggered with event: %s", event)
         pod = None
         try:
             pod = self.hook.get_pod(
@@ -682,7 +683,11 @@ class KubernetesPodOperator(BaseOperator):
                 # fetch some logs when pod is failed
                 if self.get_logs:
                     self.write_logs(pod)
-                raise AirflowException(event["message"])
+                if "stack_trace" in event:
+                    message = f"{event['message']}\n{event['stack_trace']}"
+                else:
+                    message = event["message"]
+                raise AirflowException(message)
             elif event["status"] == "success":
                 # fetch some logs when pod is executed successfully
                 if self.get_logs:
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py 
b/airflow/providers/cncf/kubernetes/triggers/pod.py
index 5eda424276..b7f0348b66 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 import asyncio
 import datetime
+import traceback
 import warnings
 from asyncio import CancelledError
 from enum import Enum
@@ -231,6 +232,7 @@ class KubernetesPodTrigger(BaseTrigger):
                     "namespace": self.pod_namespace,
                     "status": "error",
                     "message": str(e),
+                    "stack_trace": traceback.format_exc(),
                 }
             )
 
diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py 
b/tests/providers/cncf/kubernetes/triggers/test_pod.py
index 5694698d10..42a0196ed7 100644
--- a/tests/providers/cncf/kubernetes/triggers/test_pod.py
+++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py
@@ -181,12 +181,14 @@ class TestKubernetesPodTrigger:
 
         generator = trigger.run()
         actual = await generator.asend(None)
+        actual_stack_trace = actual.payload.pop("stack_trace")
         assert (
             TriggerEvent(
                 {"name": POD_NAME, "namespace": NAMESPACE, "status": "error", 
"message": "Test exception"}
             )
             == actual
         )
+        assert actual_stack_trace.startswith("Traceback (most recent call 
last):")
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}.define_container_state")
diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py 
b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
index 41f0a67211..b28d9418ef 100644
--- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
@@ -199,12 +199,14 @@ class TestGKEStartPodTrigger:
 
         generator = trigger.run()
         actual = await generator.asend(None)
+        actual_stack_trace = actual.payload.pop("stack_trace")
         assert (
             TriggerEvent(
                 {"name": POD_NAME, "namespace": NAMESPACE, "status": "error", 
"message": "Test exception"}
             )
             == actual
         )
+        assert actual_stack_trace.startswith("Traceback (most recent call 
last):")
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state")

Reply via email to