This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c84efe77a5 KPO Maintain backward compatibility for execute_complete and trigger run method (#37454)
c84efe77a5 is described below
commit c84efe77a5881d6bd554341b9bfc4712601051f2
Author: Pankaj Singh <[email protected]>
AuthorDate: Fri Feb 16 00:18:44 2024 +0530
KPO Maintain backward compatibility for execute_complete and trigger run method (#37454)
In #37279 I introduced periodic logging of the container. In the
process I also changed a few event dict key names, which is a problem
for anyone extending the KPO trigger.

Also, the execute_complete method was unused by the KPO operator itself
and was problematic for anyone using it in an extended class, since the
trigger can now emit an event even while the pod is still in an
intermediate state.

One reported issue: #37279 (comment)

In this PR I'm restoring the trigger event dict structure and
deprecating the execute_complete method.
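For anyone extending the KPO trigger, here is a minimal sketch of the
payload shapes the trigger emits after this change. The key names come
from the diff below; the values are illustrative placeholders, not real
output:

    # Illustrative TriggerEvent payload shapes (values are placeholders).
    # Every event now carries "name" and "namespace"; the extra keys
    # depend on the status.
    success = {
        "status": "success", "name": "my-pod", "namespace": "default",
        "message": "All containers inside pod have started successfully.",
    }
    failed = {
        "status": "failed", "name": "my-pod", "namespace": "default",
        "message": "pod failed",
    }
    timeout = {
        "status": "timeout", "name": "my-pod", "namespace": "default",
        "message": "Pod did not leave 'Pending' phase within specified timeout",
    }
    error = {
        "status": "error", "name": "my-pod", "namespace": "default",
        "message": "<str(exc)>",               # str of the raised exception
        "stack_trace": "<traceback output>",   # traceback.format_exc()
    }
    running = {
        "status": "running", "name": "my-pod", "namespace": "default",
        "last_log_time": None,  # DateTime of the last log line read, if any
    }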
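Existing subclasses that override execute_complete keep working but now
get an AirflowProviderDeprecationWarning; moving the override to
trigger_reentry is the forward path. A hedged sketch (MyPodOperator is
a hypothetical subclass, not part of this commit):

    # Hypothetical migration for a subclass, assuming a provider
    # version that includes this commit.
    from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

    class MyPodOperator(KubernetesPodOperator):
        def trigger_reentry(self, context, event):
            # Custom post-deferral handling goes here; the event dict
            # uses the restored key names shown above.
            self.log.info("Pod %s resumed with status %s", event["name"], event["status"])
            return super().trigger_reentry(context=context, event=event)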
---
airflow/providers/cncf/kubernetes/operators/pod.py | 150 +++++++++------------
airflow/providers/cncf/kubernetes/triggers/pod.py | 70 +++++++---
.../cncf/kubernetes/operators/test_pod.py | 34 +++--
.../providers/cncf/kubernetes/triggers/test_pod.py | 102 ++++++++------
.../cloud/triggers/test_kubernetes_engine.py | 51 +++----
5 files changed, 215 insertions(+), 192 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 73389f4038..61442a6014 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -18,6 +18,7 @@
from __future__ import annotations
+import datetime
import json
import logging
import re
@@ -30,6 +31,7 @@ from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence
import kubernetes
+from deprecated import deprecated
from kubernetes.client import CoreV1Api, V1Pod, models as k8s
from kubernetes.stream import stream
from urllib3.exceptions import HTTPError
@@ -68,7 +70,6 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager import (
EMPTY_XCOM_RESULT,
OnFinishAction,
PodLaunchFailedException,
- PodLaunchTimeoutException,
PodManager,
PodNotFoundException,
PodOperatorHookProtocol,
@@ -79,7 +80,6 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager import (
from airflow.settings import pod_mutation_hook
from airflow.utils import yaml
from airflow.utils.helpers import prune_dict, validate_key
-from airflow.utils.timezone import utcnow
from airflow.version import version as airflow_version
if TYPE_CHECKING:
@@ -656,7 +656,7 @@ class KubernetesPodOperator(BaseOperator):
def invoke_defer_method(self, last_log_time: DateTime | None = None):
"""Redefine triggers which are being used in child classes."""
- trigger_start_time = utcnow()
+ trigger_start_time = datetime.datetime.now(tz=datetime.timezone.utc)
self.defer(
trigger=KubernetesPodTrigger(
pod_name=self.pod.metadata.name, # type: ignore[union-attr]
@@ -678,117 +678,87 @@ class KubernetesPodOperator(BaseOperator):
method_name="trigger_reentry",
)
- @staticmethod
- def raise_for_trigger_status(event: dict[str, Any]) -> None:
- """Raise exception if pod is not in expected state."""
- if event["status"] == "error":
- error_type = event["error_type"]
- description = event["description"]
- if error_type == "PodLaunchTimeoutException":
- raise PodLaunchTimeoutException(description)
- else:
- raise AirflowException(description)
-
def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
"""
Point of re-entry from trigger.
- If ``logging_interval`` is None, then at this point the pod should be done and we'll just fetch
+ If ``logging_interval`` is None, then at this point, the pod should be done, and we'll just fetch
the logs and exit.
- If ``logging_interval`` is not None, it could be that the pod is still running and we'll just
+ If ``logging_interval`` is not None, it could be that the pod is still running, and we'll just
grab the latest logs and defer back to the trigger again.
"""
- remote_pod = None
+ self.pod = None
try:
- self.pod_request_obj = self.build_pod_request_obj(context)
- self.pod = self.find_pod(
- namespace=self.namespace or self.pod_request_obj.metadata.namespace,
- context=context,
- )
+ pod_name = event["name"]
+ pod_namespace = event["namespace"]
- # we try to find pod before possibly raising so that on_kill will have `pod` attr
- self.raise_for_trigger_status(event)
+ self.pod = self.hook.get_pod(pod_name, pod_namespace)
if not self.pod:
raise PodNotFoundException("Could not find pod after resuming from deferral")
- if self.get_logs:
- last_log_time = event and event.get("last_log_time")
- if last_log_time:
- self.log.info("Resuming logs read from time %r",
last_log_time)
- pod_log_status = self.pod_manager.fetch_container_logs(
- pod=self.pod,
- container_name=self.BASE_CONTAINER_NAME,
- follow=self.logging_interval is None,
- since_time=last_log_time,
- )
- if pod_log_status.running:
- self.log.info("Container still running; deferring again.")
- self.invoke_defer_method(pod_log_status.last_log_time)
-
- if self.do_xcom_push:
- result = self.extract_xcom(pod=self.pod)
- remote_pod = self.pod_manager.await_pod_completion(self.pod)
- except TaskDeferred:
- raise
- except Exception:
- self.cleanup(
- pod=self.pod or self.pod_request_obj,
- remote_pod=remote_pod,
- )
- raise
- self.cleanup(
- pod=self.pod or self.pod_request_obj,
- remote_pod=remote_pod,
- )
- if self.do_xcom_push:
- return result
-
- def execute_complete(self, context: Context, event: dict, **kwargs):
- self.log.debug("Triggered with event: %s", event)
- pod = None
- try:
- pod = self.hook.get_pod(
- event["name"],
- event["namespace"],
- )
- if self.callbacks:
+ if self.callbacks and event["status"] != "running":
self.callbacks.on_operator_resuming(
- pod=pod, event=event, client=self.client, mode=ExecutionMode.SYNC
+ pod=self.pod, event=event, client=self.client, mode=ExecutionMode.SYNC
)
+
if event["status"] in ("error", "failed", "timeout"):
- # fetch some logs when pod is failed
- if self.get_logs:
- self.write_logs(pod)
- if "stack_trace" in event:
- message = f"{event['message']}\n{event['stack_trace']}"
- else:
- message = event["message"]
if self.do_xcom_push:
- # In the event of base container failure, we need to kill the xcom sidecar.
- # We disregard xcom output and do that here
- _ = self.extract_xcom(pod=pod)
+ _ = self.extract_xcom(pod=self.pod)
+
+ message = event.get("stack_trace", event["message"])
raise AirflowException(message)
- elif event["status"] == "success":
- # fetch some logs when pod is executed successfully
+
+ elif event["status"] == "running":
if self.get_logs:
- self.write_logs(pod)
+ last_log_time = event.get("last_log_time")
+ self.log.info("Resuming logs read from time %r",
last_log_time)
+
+ pod_log_status = self.pod_manager.fetch_container_logs(
+ pod=self.pod,
+ container_name=self.BASE_CONTAINER_NAME,
+ follow=self.logging_interval is None,
+ since_time=last_log_time,
+ )
+ if pod_log_status.running:
+ self.log.info("Container still running; deferring
again.")
+ self.invoke_defer_method(pod_log_status.last_log_time)
+ else:
+ self.invoke_defer_method()
+
+ elif event["status"] == "success":
if self.do_xcom_push:
- xcom_sidecar_output = self.extract_xcom(pod=pod)
+ xcom_sidecar_output = self.extract_xcom(pod=self.pod)
return xcom_sidecar_output
+ return
+ except TaskDeferred:
+ raise
finally:
- istio_enabled = self.is_istio_enabled(pod)
- # Skip await_pod_completion when the event is 'timeout' due to the pod can hang
- # on the ErrImagePull or ContainerCreating step and it will never complete
- if event["status"] != "timeout":
- pod = self.pod_manager.await_pod_completion(pod, istio_enabled, self.base_container_name)
- if pod is not None:
- self.post_complete_action(
- pod=pod,
- remote_pod=pod,
- )
+ self._clean(event)
+
+ def _clean(self, event: dict[str, Any]):
+ if event["status"] == "running":
+ return
+ if self.get_logs:
+ self.write_logs(self.pod)
+ istio_enabled = self.is_istio_enabled(self.pod)
+ # Skip await_pod_completion when the event is 'timeout' due to the pod can hang
+ # on the ErrImagePull or ContainerCreating step and it will never complete
+ if event["status"] != "timeout":
+ self.pod = self.pod_manager.await_pod_completion(
+ self.pod, istio_enabled, self.base_container_name
+ )
+ if self.pod is not None:
+ self.post_complete_action(
+ pod=self.pod,
+ remote_pod=self.pod,
+ )
+
+ @deprecated(reason="use `trigger_reentry` instead.", category=AirflowProviderDeprecationWarning)
+ def execute_complete(self, context: Context, event: dict, **kwargs):
+ self.trigger_reentry(context=context, event=event)
def write_logs(self, pod: k8s.V1Pod):
try:
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py
index e34a73f146..c9b1e62226 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -30,10 +30,8 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager import (
OnFinishAction,
PodLaunchTimeoutException,
PodPhase,
- container_is_running,
)
from airflow.triggers.base import BaseTrigger, TriggerEvent
-from airflow.utils import timezone
if TYPE_CHECKING:
from kubernetes_asyncio.client.models import V1Pod
@@ -160,22 +158,49 @@ class KubernetesPodTrigger(BaseTrigger):
self.log.info("Checking pod %r in namespace %r.", self.pod_name,
self.pod_namespace)
try:
state = await self._wait_for_pod_start()
- if state in PodPhase.terminal_states:
+ if state == ContainerState.TERMINATED:
event = TriggerEvent(
- {"status": "done", "namespace": self.pod_namespace,
"pod_name": self.pod_name}
+ {
+ "status": "success",
+ "namespace": self.pod_namespace,
+ "name": self.pod_name,
+ "message": "All containers inside pod have started
successfully.",
+ }
+ )
+ elif state == ContainerState.FAILED:
+ event = TriggerEvent(
+ {
+ "status": "failed",
+ "namespace": self.pod_namespace,
+ "name": self.pod_name,
+ "message": "pod failed",
+ }
)
else:
event = await self._wait_for_container_completion()
yield event
+ return
+ except PodLaunchTimeoutException as e:
+ message = self._format_exception_description(e)
+ yield TriggerEvent(
+ {
+ "name": self.pod_name,
+ "namespace": self.pod_namespace,
+ "status": "timeout",
+ "message": message,
+ }
+ )
except Exception as e:
- description = self._format_exception_description(e)
yield TriggerEvent(
{
+ "name": self.pod_name,
+ "namespace": self.pod_namespace,
"status": "error",
- "error_type": e.__class__.__name__,
- "description": description,
+ "message": str(e),
+ "stack_trace": traceback.format_exc(),
}
)
+ return
def _format_exception_description(self, exc: Exception) -> Any:
if isinstance(exc, PodLaunchTimeoutException):
@@ -189,14 +214,13 @@ class KubernetesPodTrigger(BaseTrigger):
description += f"\ntrigger traceback:\n{curr_traceback}"
return description
- async def _wait_for_pod_start(self) -> Any:
+ async def _wait_for_pod_start(self) -> ContainerState:
"""Loops until pod phase leaves ``PENDING`` If timeout is reached,
throws error."""
- start_time = timezone.utcnow()
- timeout_end = start_time + datetime.timedelta(seconds=self.startup_timeout)
- while timeout_end > timezone.utcnow():
+ delta = datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time
+ while self.startup_timeout >= delta.total_seconds():
pod = await self.hook.get_pod(self.pod_name, self.pod_namespace)
if not pod.status.phase == "Pending":
- return pod.status.phase
+ return self.define_container_state(pod)
self.log.info("Still waiting for pod to start. The pod state is
%s", pod.status.phase)
await asyncio.sleep(self.poll_interval)
raise PodLaunchTimeoutException("Pod did not leave 'Pending' phase within specified timeout")
@@ -208,18 +232,30 @@ class KubernetesPodTrigger(BaseTrigger):
Waits until container is no longer in running state. If trigger is configured with a logging period,
then will emit an event to resume the task for the purpose of fetching more logs.
"""
- time_begin = timezone.utcnow()
+ time_begin = datetime.datetime.now(tz=datetime.timezone.utc)
time_get_more_logs = None
if self.logging_interval is not None:
time_get_more_logs = time_begin + datetime.timedelta(seconds=self.logging_interval)
while True:
pod = await self.hook.get_pod(self.pod_name, self.pod_namespace)
- if not container_is_running(pod=pod, container_name=self.base_container_name):
+ container_state = self.define_container_state(pod)
+ if container_state == ContainerState.TERMINATED:
+ return TriggerEvent(
+ {"status": "success", "namespace": self.pod_namespace,
"name": self.pod_name}
+ )
+ elif container_state == ContainerState.FAILED:
+ return TriggerEvent(
+ {"status": "failed", "namespace": self.pod_namespace,
"name": self.pod_name}
+ )
+ if time_get_more_logs and datetime.datetime.now(tz=datetime.timezone.utc) > time_get_more_logs:
return TriggerEvent(
- {"status": "done", "namespace": self.pod_namespace,
"pod_name": self.pod_name}
+ {
+ "status": "running",
+ "last_log_time": self.last_log_time,
+ "namespace": self.pod_namespace,
+ "name": self.pod_name,
+ }
)
- if time_get_more_logs and timezone.utcnow() > time_get_more_logs:
- return TriggerEvent({"status": "running", "last_log_time":
self.last_log_time})
await asyncio.sleep(self.poll_interval)
def _get_async_hook(self) -> AsyncKubernetesHook:
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py
index c27cd23146..faa21eb7d7 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -35,7 +35,6 @@ from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
- PodLaunchTimeoutException,
PodLoggingStatus,
PodPhase,
)
@@ -1973,41 +1972,39 @@ class TestKubernetesPodOperatorAsync:
with pytest.raises(AirflowException, match=expect_match):
k.cleanup(pod, pod)
- @mock.patch(
- "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.raise_for_trigger_status"
- )
- @mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.find_pod")
+ @mock.patch(f"{HOOK_CLASS}.get_pod")
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion")
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs")
def test_get_logs_running(
self,
fetch_container_logs,
await_pod_completion,
- find_pod,
- raise_for_trigger_status,
+ get_pod,
):
"""When logs fetch exits with status running, raise task deferred"""
pod = MagicMock()
- find_pod.return_value = pod
+ get_pod.return_value = pod
op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True)
await_pod_completion.return_value = None
fetch_container_logs.return_value = PodLoggingStatus(True, None)
with pytest.raises(TaskDeferred):
- op.trigger_reentry(create_context(op), None)
+ op.trigger_reentry(
+ create_context(op),
+ event={"name": TEST_NAME, "namespace": TEST_NAMESPACE,
"status": "running"},
+ )
fetch_container_logs.is_called_with(pod, "base")
@mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup")
- @mock.patch(
- "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.raise_for_trigger_status"
- )
@mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.find_pod")
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs")
- def test_get_logs_not_running(self, fetch_container_logs, find_pod, raise_for_trigger_status, cleanup):
+ def test_get_logs_not_running(self, fetch_container_logs, find_pod, cleanup):
pod = MagicMock()
find_pod.return_value = pod
op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True)
fetch_container_logs.return_value = PodLoggingStatus(False, None)
- op.trigger_reentry(create_context(op), None)
+ op.trigger_reentry(
+ create_context(op), event={"name": TEST_NAME, "namespace": TEST_NAMESPACE, "status": "success"}
+ )
fetch_container_logs.is_called_with(pod, "base")
@mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup")
@@ -2016,14 +2013,15 @@ class TestKubernetesPodOperatorAsync:
"""Assert that trigger_reentry raise exception in case of error"""
find_pod.return_value = MagicMock()
op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True)
- with pytest.raises(PodLaunchTimeoutException):
+ with pytest.raises(AirflowException):
context = create_context(op)
op.trigger_reentry(
context,
{
- "status": "error",
- "error_type": "PodLaunchTimeoutException",
- "description": "any message",
+ "status": "timeout",
+ "message": "any message",
+ "name": TEST_NAME,
+ "namespace": TEST_NAMESPACE,
},
)
diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py b/tests/providers/cncf/kubernetes/triggers/test_pod.py
index d12100e4e3..228daa967d 100644
--- a/tests/providers/cncf/kubernetes/triggers/test_pod.py
+++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py
@@ -114,17 +114,16 @@ class TestKubernetesPodTrigger:
}
@pytest.mark.asyncio
- @mock.patch(f"{TRIGGER_PATH}.define_container_state")
- @mock.patch(f"{TRIGGER_PATH}.hook")
- async def test_run_loop_return_success_event(self, mock_hook, mock_method, trigger):
- mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
- mock_method.return_value = ContainerState.TERMINATED
+ @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+ async def test_run_loop_return_success_event(self, mock_wait_pod, trigger):
+ mock_wait_pod.return_value = ContainerState.TERMINATED
expected_event = TriggerEvent(
{
- "pod_name": POD_NAME,
- "namespace": NAMESPACE,
- "status": "done",
+ "status": "success",
+ "namespace": "default",
+ "name": "test-pod-name",
+ "message": "All containers inside pod have started
successfully.",
}
)
actual_event = await trigger.run().asend(None)
@@ -132,16 +131,14 @@ class TestKubernetesPodTrigger:
assert actual_event == expected_event
@pytest.mark.asyncio
- @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.container_is_running")
- @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
@mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+ @mock.patch(f"{TRIGGER_PATH}.define_container_state")
@mock.patch(f"{TRIGGER_PATH}.hook")
async def test_run_loop_return_waiting_event(
- self, mock_hook, mock_method, mock_get_pod, mock_container_is_running, trigger, caplog
+ self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
):
mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
mock_method.return_value = ContainerState.WAITING
- mock_container_is_running.return_value = True
caplog.set_level(logging.INFO)
@@ -153,16 +150,14 @@ class TestKubernetesPodTrigger:
assert f"Sleeping for {POLL_INTERVAL} seconds."
@pytest.mark.asyncio
- @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.container_is_running")
- @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
@mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+ @mock.patch(f"{TRIGGER_PATH}.define_container_state")
@mock.patch(f"{TRIGGER_PATH}.hook")
async def test_run_loop_return_running_event(
- self, mock_hook, mock_method, mock_get_pod, mock_container_is_running, trigger, caplog
+ self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
):
mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
mock_method.return_value = ContainerState.RUNNING
- mock_container_is_running.return_value = True
caplog.set_level(logging.INFO)
@@ -174,9 +169,10 @@ class TestKubernetesPodTrigger:
assert f"Sleeping for {POLL_INTERVAL} seconds."
@pytest.mark.asyncio
+ @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
@mock.patch(f"{TRIGGER_PATH}.define_container_state")
@mock.patch(f"{TRIGGER_PATH}.hook")
- async def test_run_loop_return_failed_event(self, mock_hook, mock_method, trigger):
+ async def test_run_loop_return_failed_event(self, mock_hook, mock_method, mock_wait_pod, trigger):
mock_hook.get_pod.return_value = self._mock_pod_result(
mock.MagicMock(
status=mock.MagicMock(
@@ -186,21 +182,16 @@ class TestKubernetesPodTrigger:
)
mock_method.return_value = ContainerState.FAILED
- expected_event = TriggerEvent(
- {
- "pod_name": POD_NAME,
- "namespace": NAMESPACE,
- "status": "done",
- }
- )
+ expected_event = TriggerEvent({"status": "failed", "namespace": "default", "name": "test-pod-name"})
actual_event = await trigger.run().asend(None)
assert actual_event == expected_event
@pytest.mark.asyncio
+ @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
@mock.patch(f"{TRIGGER_PATH}.hook")
async def test_logging_in_trigger_when_exception_should_execute_successfully(
- self, mock_hook, trigger, caplog
+ self, mock_hook, mock_wait_pod, trigger, caplog
):
"""
Test that KubernetesPodTrigger fires the correct event in case of an error.
@@ -210,8 +201,14 @@ class TestKubernetesPodTrigger:
generator = trigger.run()
actual = await generator.asend(None)
- actual_stack_trace = actual.payload.pop("description")
- assert actual_stack_trace.startswith("Trigger KubernetesPodTrigger
failed with exception Exception")
+ actual_stack_trace = actual.payload.pop("stack_trace")
+ assert (
+ TriggerEvent(
+ {"name": POD_NAME, "namespace": NAMESPACE, "status": "error",
"message": "Test exception"}
+ )
+ == actual
+ )
+ assert actual_stack_trace.startswith("Traceback (most recent call
last):")
@pytest.mark.asyncio
@mock.patch(f"{TRIGGER_PATH}.define_container_state")
@@ -235,16 +232,24 @@ class TestKubernetesPodTrigger:
@pytest.mark.parametrize(
"logging_interval, exp_event",
[
- param(0, {"status": "running", "last_log_time": DateTime(2022, 1,
1)}, id="short_interval"),
- param(None, {"status": "done", "namespace": mock.ANY, "pod_name":
mock.ANY}, id="no_interval"),
+ param(
+ 0,
+ {
+ "status": "running",
+ "last_log_time": DateTime(2022, 1, 1),
+ "name": POD_NAME,
+ "namespace": NAMESPACE,
+ },
+ id="short_interval",
+ ),
],
)
- @mock.patch(
- "kubernetes_asyncio.client.CoreV1Api.read_namespaced_pod",
- new=get_read_pod_mock_containers([1, 1, None, None]),
- )
- @mock.patch("kubernetes_asyncio.config.load_kube_config")
- async def test_running_log_interval(self, load_kube_config, logging_interval, exp_event):
+ @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+ @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+ @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.AsyncKubernetesHook.get_pod")
+ async def test_running_log_interval(
+ self, mock_get_pod, mock_wait_pod, define_container_state, logging_interval, exp_event
+ ):
"""
If log interval given, should emit event with running status and last log time.
Otherwise, should make it to second loop and emit "done" event.
@@ -254,14 +259,15 @@ class TestKubernetesPodTrigger:
interval is None, the second "running" status will just result in continuation of the loop. And
when in the next loop we get a non-running status, the trigger fires a "done" event.
"""
+ define_container_state.return_value = "running"
trigger = KubernetesPodTrigger(
- pod_name=mock.ANY,
- pod_namespace=mock.ANY,
- trigger_start_time=mock.ANY,
- base_container_name=mock.ANY,
+ pod_name=POD_NAME,
+ pod_namespace=NAMESPACE,
+ trigger_start_time=datetime.datetime.now(tz=datetime.timezone.utc),
+ base_container_name=BASE_CONTAINER_NAME,
startup_timeout=5,
poll_interval=1,
- logging_interval=logging_interval,
+ logging_interval=1,
last_log_time=DateTime(2022, 1, 1),
)
assert await trigger.run().__anext__() == TriggerEvent(exp_event)
@@ -306,12 +312,12 @@ class TestKubernetesPodTrigger:
@pytest.mark.asyncio
@pytest.mark.parametrize("container_state", [ContainerState.WAITING,
ContainerState.UNDEFINED])
- @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+ @mock.patch(f"{TRIGGER_PATH}.define_container_state")
@mock.patch(f"{TRIGGER_PATH}.hook")
async def test_run_loop_return_timeout_event(
self, mock_hook, mock_method, trigger, caplog, container_state
):
- trigger.trigger_start_time = TRIGGER_START_TIME - datetime.timedelta(seconds=5)
+ trigger.trigger_start_time = TRIGGER_START_TIME - datetime.timedelta(minutes=2)
mock_hook.get_pod.return_value = self._mock_pod_result(
mock.MagicMock(
status=mock.MagicMock(
@@ -325,4 +331,14 @@ class TestKubernetesPodTrigger:
generator = trigger.run()
actual = await generator.asend(None)
- assert actual == TriggerEvent({"status": "done", "namespace": NAMESPACE, "pod_name": POD_NAME})
+ assert (
+ TriggerEvent(
+ {
+ "name": POD_NAME,
+ "namespace": NAMESPACE,
+ "status": "timeout",
+ "message": "Pod did not leave 'Pending' phase within
specified timeout",
+ }
+ )
+ == actual
+ )
diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
index ca7b7ba358..c6a2d4e72f 100644
--- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
@@ -108,19 +108,20 @@ class TestGKEStartPodTrigger:
}
@pytest.mark.asyncio
- @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state")
+ @mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start")
@mock.patch(f"{TRIGGER_GKE_PATH}.hook")
async def test_run_loop_return_success_event_should_execute_successfully(
- self, mock_hook, mock_method, trigger
+ self, mock_hook, mock_wait_pod, trigger
):
mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
- mock_method.return_value = ContainerState.TERMINATED
+ mock_wait_pod.return_value = ContainerState.TERMINATED
expected_event = TriggerEvent(
{
- "pod_name": POD_NAME,
+ "name": POD_NAME,
"namespace": NAMESPACE,
- "status": "done",
+ "status": "success",
+ "message": "All containers inside pod have started
successfully.",
}
)
actual_event = await trigger.run().asend(None)
@@ -128,10 +129,10 @@ class TestGKEStartPodTrigger:
assert actual_event == expected_event
@pytest.mark.asyncio
- @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state")
+ @mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start")
@mock.patch(f"{TRIGGER_GKE_PATH}.hook")
async def test_run_loop_return_failed_event_should_execute_successfully(
- self, mock_hook, mock_method, trigger
+ self, mock_hook, mock_wait_pod, trigger
):
mock_hook.get_pod.return_value = self._mock_pod_result(
mock.MagicMock(
@@ -140,13 +141,14 @@ class TestGKEStartPodTrigger:
)
)
)
- mock_method.return_value = ContainerState.FAILED
+ mock_wait_pod.return_value = ContainerState.FAILED
expected_event = TriggerEvent(
{
- "pod_name": POD_NAME,
+ "name": POD_NAME,
"namespace": NAMESPACE,
- "status": "done",
+ "status": "failed",
+ "message": "pod failed",
}
)
actual_event = await trigger.run().asend(None)
@@ -154,18 +156,15 @@ class TestGKEStartPodTrigger:
assert actual_event == expected_event
@pytest.mark.asyncio
- @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.container_is_running")
- @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
@mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start")
+ @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state")
@mock.patch(f"{TRIGGER_GKE_PATH}.hook")
async def test_run_loop_return_waiting_event_should_execute_successfully(
- self, mock_hook, mock_method, mock_get_pod, mock_container_is_running, trigger, caplog
+ self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
):
mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
- mock_method.return_value = ContainerState.RUNNING
- mock_container_is_running.return_value = True
+ mock_method.return_value = ContainerState.WAITING
- trigger.logging_interval = 10
caplog.set_level(logging.INFO)
task = asyncio.create_task(trigger.run().__anext__())
@@ -176,15 +175,13 @@ class TestGKEStartPodTrigger:
assert f"Sleeping for {POLL_INTERVAL} seconds."
@pytest.mark.asyncio
- @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.container_is_running")
- @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
@mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start")
+ @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state")
@mock.patch(f"{TRIGGER_GKE_PATH}.hook")
async def test_run_loop_return_running_event_should_execute_successfully(
- self, mock_hook, mock_method, mock_get_pod, mock_container_is_running, trigger, caplog
+ self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
):
mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
- mock_container_is_running.return_value = True
mock_method.return_value = ContainerState.RUNNING
caplog.set_level(logging.INFO)
@@ -197,9 +194,10 @@ class TestGKEStartPodTrigger:
assert f"Sleeping for {POLL_INTERVAL} seconds."
@pytest.mark.asyncio
+ @mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start")
@mock.patch(f"{TRIGGER_GKE_PATH}.hook")
async def test_logging_in_trigger_when_exception_should_execute_successfully(
- self, mock_hook, trigger, caplog
+ self, mock_hook, mock_wait_pod, trigger, caplog
):
"""
Test that GKEStartPodTrigger fires the correct event in case of an error.
@@ -208,9 +206,14 @@ class TestGKEStartPodTrigger:
generator = trigger.run()
actual = await generator.asend(None)
-
- actual_stack_trace = actual.payload.pop("description")
- assert actual_stack_trace.startswith("Trigger GKEStartPodTrigger
failed with exception Exception")
+ actual_stack_trace = actual.payload.pop("stack_trace")
+ assert (
+ TriggerEvent(
+ {"name": POD_NAME, "namespace": NAMESPACE, "status": "error",
"message": "Test exception"}
+ )
+ == actual
+ )
+ assert actual_stack_trace.startswith("Traceback (most recent call
last):")
@pytest.mark.asyncio
@mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state")