This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0640e6d595 KPO Maintain backward compatibility for execute_complete 
and trigger run method (#37363)
0640e6d595 is described below

commit 0640e6d595c01dd96f2b90812a546bc091f87743
Author: Pankaj Singh <[email protected]>
AuthorDate: Thu Feb 15 00:34:40 2024 +0530

    KPO Maintain backward compatibility for execute_complete and trigger run 
method (#37363)
    
    * Restore KPO trigger event param name
    
    * sync with execute_complete
    
    * change trigger emit state done to failed/scuccess
    
    * Add deprecation warning
    
    * Address review feedback
    
    * Fix tests and cleanup
    
    * Fix tests
    
    * Remove test log stmpt
    
    * Fix tests
    
    ---------
    
    Co-authored-by: Pankaj Koti <[email protected]>
---
 airflow/providers/cncf/kubernetes/operators/pod.py | 150 +++++++++------------
 airflow/providers/cncf/kubernetes/triggers/pod.py  |  70 +++++++---
 .../cncf/kubernetes/operators/test_pod.py          |  34 +++--
 .../providers/cncf/kubernetes/triggers/test_pod.py |  92 +++++++------
 .../cloud/triggers/test_kubernetes_engine.py       |  51 +++----
 5 files changed, 208 insertions(+), 189 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py 
b/airflow/providers/cncf/kubernetes/operators/pod.py
index 73389f4038..61442a6014 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -18,6 +18,7 @@
 
 from __future__ import annotations
 
+import datetime
 import json
 import logging
 import re
@@ -30,6 +31,7 @@ from functools import cached_property
 from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence
 
 import kubernetes
+from deprecated import deprecated
 from kubernetes.client import CoreV1Api, V1Pod, models as k8s
 from kubernetes.stream import stream
 from urllib3.exceptions import HTTPError
@@ -68,7 +70,6 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager 
import (
     EMPTY_XCOM_RESULT,
     OnFinishAction,
     PodLaunchFailedException,
-    PodLaunchTimeoutException,
     PodManager,
     PodNotFoundException,
     PodOperatorHookProtocol,
@@ -79,7 +80,6 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager 
import (
 from airflow.settings import pod_mutation_hook
 from airflow.utils import yaml
 from airflow.utils.helpers import prune_dict, validate_key
-from airflow.utils.timezone import utcnow
 from airflow.version import version as airflow_version
 
 if TYPE_CHECKING:
@@ -656,7 +656,7 @@ class KubernetesPodOperator(BaseOperator):
 
     def invoke_defer_method(self, last_log_time: DateTime | None = None):
         """Redefine triggers which are being used in child classes."""
-        trigger_start_time = utcnow()
+        trigger_start_time = datetime.datetime.now(tz=datetime.timezone.utc)
         self.defer(
             trigger=KubernetesPodTrigger(
                 pod_name=self.pod.metadata.name,  # type: ignore[union-attr]
@@ -678,117 +678,87 @@ class KubernetesPodOperator(BaseOperator):
             method_name="trigger_reentry",
         )
 
-    @staticmethod
-    def raise_for_trigger_status(event: dict[str, Any]) -> None:
-        """Raise exception if pod is not in expected state."""
-        if event["status"] == "error":
-            error_type = event["error_type"]
-            description = event["description"]
-            if error_type == "PodLaunchTimeoutException":
-                raise PodLaunchTimeoutException(description)
-            else:
-                raise AirflowException(description)
-
     def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
         """
         Point of re-entry from trigger.
 
-        If ``logging_interval`` is None, then at this point the pod should be 
done and we'll just fetch
+        If ``logging_interval`` is None, then at this point, the pod should be 
done, and we'll just fetch
         the logs and exit.
 
-        If ``logging_interval`` is not None, it could be that the pod is still 
running and we'll just
+        If ``logging_interval`` is not None, it could be that the pod is still 
running, and we'll just
         grab the latest logs and defer back to the trigger again.
         """
-        remote_pod = None
+        self.pod = None
         try:
-            self.pod_request_obj = self.build_pod_request_obj(context)
-            self.pod = self.find_pod(
-                namespace=self.namespace or 
self.pod_request_obj.metadata.namespace,
-                context=context,
-            )
+            pod_name = event["name"]
+            pod_namespace = event["namespace"]
 
-            # we try to find pod before possibly raising so that on_kill will 
have `pod` attr
-            self.raise_for_trigger_status(event)
+            self.pod = self.hook.get_pod(pod_name, pod_namespace)
 
             if not self.pod:
                 raise PodNotFoundException("Could not find pod after resuming 
from deferral")
 
-            if self.get_logs:
-                last_log_time = event and event.get("last_log_time")
-                if last_log_time:
-                    self.log.info("Resuming logs read from time %r", 
last_log_time)
-                pod_log_status = self.pod_manager.fetch_container_logs(
-                    pod=self.pod,
-                    container_name=self.BASE_CONTAINER_NAME,
-                    follow=self.logging_interval is None,
-                    since_time=last_log_time,
-                )
-                if pod_log_status.running:
-                    self.log.info("Container still running; deferring again.")
-                    self.invoke_defer_method(pod_log_status.last_log_time)
-
-            if self.do_xcom_push:
-                result = self.extract_xcom(pod=self.pod)
-            remote_pod = self.pod_manager.await_pod_completion(self.pod)
-        except TaskDeferred:
-            raise
-        except Exception:
-            self.cleanup(
-                pod=self.pod or self.pod_request_obj,
-                remote_pod=remote_pod,
-            )
-            raise
-        self.cleanup(
-            pod=self.pod or self.pod_request_obj,
-            remote_pod=remote_pod,
-        )
-        if self.do_xcom_push:
-            return result
-
-    def execute_complete(self, context: Context, event: dict, **kwargs):
-        self.log.debug("Triggered with event: %s", event)
-        pod = None
-        try:
-            pod = self.hook.get_pod(
-                event["name"],
-                event["namespace"],
-            )
-            if self.callbacks:
+            if self.callbacks and event["status"] != "running":
                 self.callbacks.on_operator_resuming(
-                    pod=pod, event=event, client=self.client, 
mode=ExecutionMode.SYNC
+                    pod=self.pod, event=event, client=self.client, 
mode=ExecutionMode.SYNC
                 )
+
             if event["status"] in ("error", "failed", "timeout"):
-                # fetch some logs when pod is failed
-                if self.get_logs:
-                    self.write_logs(pod)
-                if "stack_trace" in event:
-                    message = f"{event['message']}\n{event['stack_trace']}"
-                else:
-                    message = event["message"]
                 if self.do_xcom_push:
-                    # In the event of base container failure, we need to kill 
the xcom sidecar.
-                    # We disregard xcom output and do that here
-                    _ = self.extract_xcom(pod=pod)
+                    _ = self.extract_xcom(pod=self.pod)
+
+                message = event.get("stack_trace", event["message"])
                 raise AirflowException(message)
-            elif event["status"] == "success":
-                # fetch some logs when pod is executed successfully
+
+            elif event["status"] == "running":
                 if self.get_logs:
-                    self.write_logs(pod)
+                    last_log_time = event.get("last_log_time")
+                    self.log.info("Resuming logs read from time %r", 
last_log_time)
+
+                    pod_log_status = self.pod_manager.fetch_container_logs(
+                        pod=self.pod,
+                        container_name=self.BASE_CONTAINER_NAME,
+                        follow=self.logging_interval is None,
+                        since_time=last_log_time,
+                    )
 
+                    if pod_log_status.running:
+                        self.log.info("Container still running; deferring 
again.")
+                        self.invoke_defer_method(pod_log_status.last_log_time)
+                else:
+                    self.invoke_defer_method()
+
+            elif event["status"] == "success":
                 if self.do_xcom_push:
-                    xcom_sidecar_output = self.extract_xcom(pod=pod)
+                    xcom_sidecar_output = self.extract_xcom(pod=self.pod)
                     return xcom_sidecar_output
+                return
+        except TaskDeferred:
+            raise
         finally:
-            istio_enabled = self.is_istio_enabled(pod)
-            # Skip await_pod_completion when the event is 'timeout' due to the 
pod can hang
-            # on the ErrImagePull or ContainerCreating step and it will never 
complete
-            if event["status"] != "timeout":
-                pod = self.pod_manager.await_pod_completion(pod, 
istio_enabled, self.base_container_name)
-            if pod is not None:
-                self.post_complete_action(
-                    pod=pod,
-                    remote_pod=pod,
-                )
+            self._clean(event)
+
+    def _clean(self, event: dict[str, Any]):
+        if event["status"] == "running":
+            return
+        if self.get_logs:
+            self.write_logs(self.pod)
+        istio_enabled = self.is_istio_enabled(self.pod)
+        # Skip await_pod_completion when the event is 'timeout' due to the pod 
can hang
+        # on the ErrImagePull or ContainerCreating step and it will never 
complete
+        if event["status"] != "timeout":
+            self.pod = self.pod_manager.await_pod_completion(
+                self.pod, istio_enabled, self.base_container_name
+            )
+        if self.pod is not None:
+            self.post_complete_action(
+                pod=self.pod,
+                remote_pod=self.pod,
+            )
+
+    @deprecated(reason="use `trigger_reentry` instead.", 
category=AirflowProviderDeprecationWarning)
+    def execute_complete(self, context: Context, event: dict, **kwargs):
+        self.trigger_reentry(context=context, event=event)
 
     def write_logs(self, pod: k8s.V1Pod):
         try:
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py 
b/airflow/providers/cncf/kubernetes/triggers/pod.py
index e34a73f146..c9b1e62226 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -30,10 +30,8 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager 
import (
     OnFinishAction,
     PodLaunchTimeoutException,
     PodPhase,
-    container_is_running,
 )
 from airflow.triggers.base import BaseTrigger, TriggerEvent
-from airflow.utils import timezone
 
 if TYPE_CHECKING:
     from kubernetes_asyncio.client.models import V1Pod
@@ -160,22 +158,49 @@ class KubernetesPodTrigger(BaseTrigger):
         self.log.info("Checking pod %r in namespace %r.", self.pod_name, 
self.pod_namespace)
         try:
             state = await self._wait_for_pod_start()
-            if state in PodPhase.terminal_states:
+            if state == ContainerState.TERMINATED:
                 event = TriggerEvent(
-                    {"status": "done", "namespace": self.pod_namespace, 
"pod_name": self.pod_name}
+                    {
+                        "status": "success",
+                        "namespace": self.pod_namespace,
+                        "name": self.pod_name,
+                        "message": "All containers inside pod have started 
successfully.",
+                    }
+                )
+            elif state == ContainerState.FAILED:
+                event = TriggerEvent(
+                    {
+                        "status": "failed",
+                        "namespace": self.pod_namespace,
+                        "name": self.pod_name,
+                        "message": "pod failed",
+                    }
                 )
             else:
                 event = await self._wait_for_container_completion()
             yield event
+            return
+        except PodLaunchTimeoutException as e:
+            message = self._format_exception_description(e)
+            yield TriggerEvent(
+                {
+                    "name": self.pod_name,
+                    "namespace": self.pod_namespace,
+                    "status": "timeout",
+                    "message": message,
+                }
+            )
         except Exception as e:
-            description = self._format_exception_description(e)
             yield TriggerEvent(
                 {
+                    "name": self.pod_name,
+                    "namespace": self.pod_namespace,
                     "status": "error",
-                    "error_type": e.__class__.__name__,
-                    "description": description,
+                    "message": str(e),
+                    "stack_trace": traceback.format_exc(),
                 }
             )
+            return
 
     def _format_exception_description(self, exc: Exception) -> Any:
         if isinstance(exc, PodLaunchTimeoutException):
@@ -189,14 +214,13 @@ class KubernetesPodTrigger(BaseTrigger):
         description += f"\ntrigger traceback:\n{curr_traceback}"
         return description
 
-    async def _wait_for_pod_start(self) -> Any:
+    async def _wait_for_pod_start(self) -> ContainerState:
         """Loops until pod phase leaves ``PENDING`` If timeout is reached, 
throws error."""
-        start_time = timezone.utcnow()
-        timeout_end = start_time + 
datetime.timedelta(seconds=self.startup_timeout)
-        while timeout_end > timezone.utcnow():
+        delta = datetime.datetime.now(tz=datetime.timezone.utc) - 
self.trigger_start_time
+        while self.startup_timeout >= delta.total_seconds():
             pod = await self.hook.get_pod(self.pod_name, self.pod_namespace)
             if not pod.status.phase == "Pending":
-                return pod.status.phase
+                return self.define_container_state(pod)
             self.log.info("Still waiting for pod to start. The pod state is 
%s", pod.status.phase)
             await asyncio.sleep(self.poll_interval)
         raise PodLaunchTimeoutException("Pod did not leave 'Pending' phase 
within specified timeout")
@@ -208,18 +232,30 @@ class KubernetesPodTrigger(BaseTrigger):
         Waits until container is no longer in running state. If trigger is 
configured with a logging period,
         then will emit an event to resume the task for the purpose of fetching 
more logs.
         """
-        time_begin = timezone.utcnow()
+        time_begin = datetime.datetime.now(tz=datetime.timezone.utc)
         time_get_more_logs = None
         if self.logging_interval is not None:
             time_get_more_logs = time_begin + 
datetime.timedelta(seconds=self.logging_interval)
         while True:
             pod = await self.hook.get_pod(self.pod_name, self.pod_namespace)
-            if not container_is_running(pod=pod, 
container_name=self.base_container_name):
+            container_state = self.define_container_state(pod)
+            if container_state == ContainerState.TERMINATED:
+                return TriggerEvent(
+                    {"status": "success", "namespace": self.pod_namespace, 
"name": self.pod_name}
+                )
+            elif container_state == ContainerState.FAILED:
+                return TriggerEvent(
+                    {"status": "failed", "namespace": self.pod_namespace, 
"name": self.pod_name}
+                )
+            if time_get_more_logs and 
datetime.datetime.now(tz=datetime.timezone.utc) > time_get_more_logs:
                 return TriggerEvent(
-                    {"status": "done", "namespace": self.pod_namespace, 
"pod_name": self.pod_name}
+                    {
+                        "status": "running",
+                        "last_log_time": self.last_log_time,
+                        "namespace": self.pod_namespace,
+                        "name": self.pod_name,
+                    }
                 )
-            if time_get_more_logs and timezone.utcnow() > time_get_more_logs:
-                return TriggerEvent({"status": "running", "last_log_time": 
self.last_log_time})
             await asyncio.sleep(self.poll_interval)
 
     def _get_async_hook(self) -> AsyncKubernetesHook:
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py 
b/tests/providers/cncf/kubernetes/operators/test_pod.py
index c27cd23146..faa21eb7d7 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -35,7 +35,6 @@ from airflow.providers.cncf.kubernetes.operators.pod import 
KubernetesPodOperato
 from airflow.providers.cncf.kubernetes.secret import Secret
 from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
 from airflow.providers.cncf.kubernetes.utils.pod_manager import (
-    PodLaunchTimeoutException,
     PodLoggingStatus,
     PodPhase,
 )
@@ -1973,41 +1972,39 @@ class TestKubernetesPodOperatorAsync:
         with pytest.raises(AirflowException, match=expect_match):
             k.cleanup(pod, pod)
 
-    @mock.patch(
-        
"airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.raise_for_trigger_status"
-    )
-    
@mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.find_pod")
+    @mock.patch(f"{HOOK_CLASS}.get_pod")
     
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion")
     
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs")
     def test_get_logs_running(
         self,
         fetch_container_logs,
         await_pod_completion,
-        find_pod,
-        raise_for_trigger_status,
+        get_pod,
     ):
         """When logs fetch exits with status running, raise task deferred"""
         pod = MagicMock()
-        find_pod.return_value = pod
+        get_pod.return_value = pod
         op = KubernetesPodOperator(task_id="test_task", name="test-pod", 
get_logs=True)
         await_pod_completion.return_value = None
         fetch_container_logs.return_value = PodLoggingStatus(True, None)
         with pytest.raises(TaskDeferred):
-            op.trigger_reentry(create_context(op), None)
+            op.trigger_reentry(
+                create_context(op),
+                event={"name": TEST_NAME, "namespace": TEST_NAMESPACE, 
"status": "running"},
+            )
         fetch_container_logs.is_called_with(pod, "base")
 
     
@mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup")
-    @mock.patch(
-        
"airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.raise_for_trigger_status"
-    )
     
@mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.find_pod")
     
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs")
-    def test_get_logs_not_running(self, fetch_container_logs, find_pod, 
raise_for_trigger_status, cleanup):
+    def test_get_logs_not_running(self, fetch_container_logs, find_pod, 
cleanup):
         pod = MagicMock()
         find_pod.return_value = pod
         op = KubernetesPodOperator(task_id="test_task", name="test-pod", 
get_logs=True)
         fetch_container_logs.return_value = PodLoggingStatus(False, None)
-        op.trigger_reentry(create_context(op), None)
+        op.trigger_reentry(
+            create_context(op), event={"name": TEST_NAME, "namespace": 
TEST_NAMESPACE, "status": "success"}
+        )
         fetch_container_logs.is_called_with(pod, "base")
 
     
@mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup")
@@ -2016,14 +2013,15 @@ class TestKubernetesPodOperatorAsync:
         """Assert that trigger_reentry raise exception in case of error"""
         find_pod.return_value = MagicMock()
         op = KubernetesPodOperator(task_id="test_task", name="test-pod", 
get_logs=True)
-        with pytest.raises(PodLaunchTimeoutException):
+        with pytest.raises(AirflowException):
             context = create_context(op)
             op.trigger_reentry(
                 context,
                 {
-                    "status": "error",
-                    "error_type": "PodLaunchTimeoutException",
-                    "description": "any message",
+                    "status": "timeout",
+                    "message": "any message",
+                    "name": TEST_NAME,
+                    "namespace": TEST_NAMESPACE,
                 },
             )
 
diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py 
b/tests/providers/cncf/kubernetes/triggers/test_pod.py
index d12100e4e3..bed52811fc 100644
--- a/tests/providers/cncf/kubernetes/triggers/test_pod.py
+++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py
@@ -122,9 +122,10 @@ class TestKubernetesPodTrigger:
 
         expected_event = TriggerEvent(
             {
-                "pod_name": POD_NAME,
-                "namespace": NAMESPACE,
-                "status": "done",
+                "status": "success",
+                "namespace": "default",
+                "name": "test-pod-name",
+                "message": "All containers inside pod have started 
successfully.",
             }
         )
         actual_event = await trigger.run().asend(None)
@@ -132,16 +133,11 @@ class TestKubernetesPodTrigger:
         assert actual_event == expected_event
 
     @pytest.mark.asyncio
-    
@mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.container_is_running")
-    
@mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
-    @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
     @mock.patch(f"{TRIGGER_PATH}.hook")
-    async def test_run_loop_return_waiting_event(
-        self, mock_hook, mock_method, mock_get_pod, mock_container_is_running, 
trigger, caplog
-    ):
+    async def test_run_loop_return_waiting_event(self, mock_hook, mock_method, 
trigger, caplog):
         mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.MagicMock())
         mock_method.return_value = ContainerState.WAITING
-        mock_container_is_running.return_value = True
 
         caplog.set_level(logging.INFO)
 
@@ -153,16 +149,11 @@ class TestKubernetesPodTrigger:
         assert f"Sleeping for {POLL_INTERVAL} seconds."
 
     @pytest.mark.asyncio
-    
@mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.container_is_running")
-    
@mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
-    @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
     @mock.patch(f"{TRIGGER_PATH}.hook")
-    async def test_run_loop_return_running_event(
-        self, mock_hook, mock_method, mock_get_pod, mock_container_is_running, 
trigger, caplog
-    ):
+    async def test_run_loop_return_running_event(self, mock_hook, mock_method, 
trigger, caplog):
         mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.MagicMock())
         mock_method.return_value = ContainerState.RUNNING
-        mock_container_is_running.return_value = True
 
         caplog.set_level(logging.INFO)
 
@@ -187,11 +178,7 @@ class TestKubernetesPodTrigger:
         mock_method.return_value = ContainerState.FAILED
 
         expected_event = TriggerEvent(
-            {
-                "pod_name": POD_NAME,
-                "namespace": NAMESPACE,
-                "status": "done",
-            }
+            {"status": "failed", "namespace": "default", "name": 
"test-pod-name", "message": "pod failed"}
         )
         actual_event = await trigger.run().asend(None)
 
@@ -210,8 +197,14 @@ class TestKubernetesPodTrigger:
 
         generator = trigger.run()
         actual = await generator.asend(None)
-        actual_stack_trace = actual.payload.pop("description")
-        assert actual_stack_trace.startswith("Trigger KubernetesPodTrigger 
failed with exception Exception")
+        actual_stack_trace = actual.payload.pop("stack_trace")
+        assert (
+            TriggerEvent(
+                {"name": POD_NAME, "namespace": NAMESPACE, "status": "error", 
"message": "Test exception"}
+            )
+            == actual
+        )
+        assert actual_stack_trace.startswith("Traceback (most recent call 
last):")
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}.define_container_state")
@@ -235,16 +228,24 @@ class TestKubernetesPodTrigger:
     @pytest.mark.parametrize(
         "logging_interval, exp_event",
         [
-            param(0, {"status": "running", "last_log_time": DateTime(2022, 1, 
1)}, id="short_interval"),
-            param(None, {"status": "done", "namespace": mock.ANY, "pod_name": 
mock.ANY}, id="no_interval"),
+            param(
+                0,
+                {
+                    "status": "running",
+                    "last_log_time": DateTime(2022, 1, 1),
+                    "name": POD_NAME,
+                    "namespace": NAMESPACE,
+                },
+                id="short_interval",
+            ),
         ],
     )
-    @mock.patch(
-        "kubernetes_asyncio.client.CoreV1Api.read_namespaced_pod",
-        new=get_read_pod_mock_containers([1, 1, None, None]),
-    )
-    @mock.patch("kubernetes_asyncio.config.load_kube_config")
-    async def test_running_log_interval(self, load_kube_config, 
logging_interval, exp_event):
+    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+    
@mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.AsyncKubernetesHook.get_pod")
+    async def test_running_log_interval(
+        self, mock_get_pod, mock_wait_for_pod_start, define_container_state, 
logging_interval, exp_event
+    ):
         """
         If log interval given, should emit event with running status and last 
log time.
         Otherwise, should make it to second loop and emit "done" event.
@@ -254,14 +255,15 @@ class TestKubernetesPodTrigger:
         interval is None, the second "running" status will just result in 
continuation of the loop.  And
         when in the next loop we get a non-running status, the trigger fires a 
"done" event.
         """
+        define_container_state.return_value = "running"
         trigger = KubernetesPodTrigger(
-            pod_name=mock.ANY,
-            pod_namespace=mock.ANY,
-            trigger_start_time=mock.ANY,
-            base_container_name=mock.ANY,
+            pod_name=POD_NAME,
+            pod_namespace=NAMESPACE,
+            trigger_start_time=datetime.datetime.now(tz=datetime.timezone.utc),
+            base_container_name=BASE_CONTAINER_NAME,
             startup_timeout=5,
             poll_interval=1,
-            logging_interval=logging_interval,
+            logging_interval=1,
             last_log_time=DateTime(2022, 1, 1),
         )
         assert await trigger.run().__anext__() == TriggerEvent(exp_event)
@@ -306,12 +308,12 @@ class TestKubernetesPodTrigger:
 
     @pytest.mark.asyncio
     @pytest.mark.parametrize("container_state", [ContainerState.WAITING, 
ContainerState.UNDEFINED])
-    @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
     @mock.patch(f"{TRIGGER_PATH}.hook")
     async def test_run_loop_return_timeout_event(
         self, mock_hook, mock_method, trigger, caplog, container_state
     ):
-        trigger.trigger_start_time = TRIGGER_START_TIME - 
datetime.timedelta(seconds=5)
+        trigger.trigger_start_time = TRIGGER_START_TIME - 
datetime.timedelta(minutes=2)
         mock_hook.get_pod.return_value = self._mock_pod_result(
             mock.MagicMock(
                 status=mock.MagicMock(
@@ -325,4 +327,14 @@ class TestKubernetesPodTrigger:
 
         generator = trigger.run()
         actual = await generator.asend(None)
-        assert actual == TriggerEvent({"status": "done", "namespace": 
NAMESPACE, "pod_name": POD_NAME})
+        assert (
+            TriggerEvent(
+                {
+                    "name": POD_NAME,
+                    "namespace": NAMESPACE,
+                    "status": "timeout",
+                    "message": "Pod did not leave 'Pending' phase within 
specified timeout",
+                }
+            )
+            == actual
+        )
diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py 
b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
index ca7b7ba358..c6a2d4e72f 100644
--- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
@@ -108,19 +108,20 @@ class TestGKEStartPodTrigger:
         }
 
     @pytest.mark.asyncio
-    @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start")
     @mock.patch(f"{TRIGGER_GKE_PATH}.hook")
     async def test_run_loop_return_success_event_should_execute_successfully(
-        self, mock_hook, mock_method, trigger
+        self, mock_hook, mock_wait_pod, trigger
     ):
         mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.MagicMock())
-        mock_method.return_value = ContainerState.TERMINATED
+        mock_wait_pod.return_value = ContainerState.TERMINATED
 
         expected_event = TriggerEvent(
             {
-                "pod_name": POD_NAME,
+                "name": POD_NAME,
                 "namespace": NAMESPACE,
-                "status": "done",
+                "status": "success",
+                "message": "All containers inside pod have started 
successfully.",
             }
         )
         actual_event = await trigger.run().asend(None)
@@ -128,10 +129,10 @@ class TestGKEStartPodTrigger:
         assert actual_event == expected_event
 
     @pytest.mark.asyncio
-    @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start")
     @mock.patch(f"{TRIGGER_GKE_PATH}.hook")
     async def test_run_loop_return_failed_event_should_execute_successfully(
-        self, mock_hook, mock_method, trigger
+        self, mock_hook, mock_wait_pod, trigger
     ):
         mock_hook.get_pod.return_value = self._mock_pod_result(
             mock.MagicMock(
@@ -140,13 +141,14 @@ class TestGKEStartPodTrigger:
                 )
             )
         )
-        mock_method.return_value = ContainerState.FAILED
+        mock_wait_pod.return_value = ContainerState.FAILED
 
         expected_event = TriggerEvent(
             {
-                "pod_name": POD_NAME,
+                "name": POD_NAME,
                 "namespace": NAMESPACE,
-                "status": "done",
+                "status": "failed",
+                "message": "pod failed",
             }
         )
         actual_event = await trigger.run().asend(None)
@@ -154,18 +156,15 @@ class TestGKEStartPodTrigger:
         assert actual_event == expected_event
 
     @pytest.mark.asyncio
-    
@mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.container_is_running")
-    
@mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
     @mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start")
+    @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state")
     @mock.patch(f"{TRIGGER_GKE_PATH}.hook")
     async def test_run_loop_return_waiting_event_should_execute_successfully(
-        self, mock_hook, mock_method, mock_get_pod, mock_container_is_running, 
trigger, caplog
+        self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
     ):
         mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.MagicMock())
-        mock_method.return_value = ContainerState.RUNNING
-        mock_container_is_running.return_value = True
+        mock_method.return_value = ContainerState.WAITING
 
-        trigger.logging_interval = 10
         caplog.set_level(logging.INFO)
 
         task = asyncio.create_task(trigger.run().__anext__())
@@ -176,15 +175,13 @@ class TestGKEStartPodTrigger:
         assert f"Sleeping for {POLL_INTERVAL} seconds."
 
     @pytest.mark.asyncio
-    
@mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.container_is_running")
-    
@mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
     @mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start")
+    @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state")
     @mock.patch(f"{TRIGGER_GKE_PATH}.hook")
     async def test_run_loop_return_running_event_should_execute_successfully(
-        self, mock_hook, mock_method, mock_get_pod, mock_container_is_running, 
trigger, caplog
+        self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
     ):
         mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.MagicMock())
-        mock_container_is_running.return_value = True
         mock_method.return_value = ContainerState.RUNNING
 
         caplog.set_level(logging.INFO)
@@ -197,9 +194,10 @@ class TestGKEStartPodTrigger:
         assert f"Sleeping for {POLL_INTERVAL} seconds."
 
     @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start")
     @mock.patch(f"{TRIGGER_GKE_PATH}.hook")
     async def 
test_logging_in_trigger_when_exception_should_execute_successfully(
-        self, mock_hook, trigger, caplog
+        self, mock_hook, mock_wait_pod, trigger, caplog
     ):
         """
         Test that GKEStartPodTrigger fires the correct event in case of an 
error.
@@ -208,9 +206,14 @@ class TestGKEStartPodTrigger:
 
         generator = trigger.run()
         actual = await generator.asend(None)
-
-        actual_stack_trace = actual.payload.pop("description")
-        assert actual_stack_trace.startswith("Trigger GKEStartPodTrigger 
failed with exception Exception")
+        actual_stack_trace = actual.payload.pop("stack_trace")
+        assert (
+            TriggerEvent(
+                {"name": POD_NAME, "namespace": NAMESPACE, "status": "error", 
"message": "Test exception"}
+            )
+            == actual
+        )
+        assert actual_stack_trace.startswith("Traceback (most recent call 
last):")
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state")


Reply via email to