This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a7b7928a2a fix: reduce irrelevant error logs for pod events. (#37944)
a7b7928a2a is described below
commit a7b7928a2a9ff22f3629f8ee421f627ab3a3c3f3
Author: Sudipto Baral <[email protected]>
AuthorDate: Thu Mar 7 08:06:46 2024 -0500
fix: reduce irrelevant error logs for pod events. (#37944)
Signed-off-by: sudipto baral <[email protected]>
---
airflow/providers/cncf/kubernetes/operators/pod.py | 16 ++++++--
.../cncf/kubernetes/operators/test_pod.py | 44 +++++++++++++++++++++-
2 files changed, 55 insertions(+), 5 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 4955db7633..e926a1f49d 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -27,6 +27,7 @@ import string
import warnings
from collections.abc import Container
from contextlib import AbstractContextManager
+from enum import Enum
from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence
@@ -95,6 +96,13 @@ alphanum_lower = string.ascii_lowercase + string.digits
KUBE_CONFIG_ENV_VAR = "KUBECONFIG"
+class PodEventType(Enum):
+ """Type of Events emitted by kubernetes pod."""
+
+ WARNING = "Warning"
+ NORMAL = "Normal"
+
+
class PodReattachFailure(AirflowException):
"""When we expect to be able to find a pod but cannot."""
@@ -548,8 +556,7 @@ class KubernetesPodOperator(BaseOperator):
)
except PodLaunchFailedException:
if self.log_events_on_failure:
- for event in self.pod_manager.read_pod_events(pod).items:
- self.log.error("Pod Event: %s - %s", event.reason,
event.message)
+ self._read_pod_events(pod, reraise=False)
raise
def extract_xcom(self, pod: k8s.V1Pod):
@@ -855,7 +862,10 @@ class KubernetesPodOperator(BaseOperator):
"""Will fetch and emit events from pod."""
with _optionally_suppress(reraise=reraise):
for event in self.pod_manager.read_pod_events(pod).items:
- self.log.error("Pod Event: %s - %s", event.reason,
event.message)
+ if event.type == PodEventType.NORMAL.value:
+ self.log.info("Pod Event: %s - %s", event.reason,
event.message)
+ else:
+ self.log.error("Pod Event: %s - %s", event.reason,
event.message)
def is_istio_enabled(self, pod: V1Pod) -> bool:
"""Check if istio is enabled for the namespace of the pod by
inspecting the namespace labels."""
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py
index 5d914c8d60..3b1d61c536 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -24,14 +24,18 @@ from unittest.mock import MagicMock, patch
import pendulum
import pytest
-from kubernetes.client import ApiClient, V1PodSecurityContext, V1PodStatus,
models as k8s
+from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext,
V1PodStatus, models as k8s
from urllib3 import HTTPResponse
from airflow.exceptions import AirflowException, AirflowSkipException,
TaskDeferred
from airflow.models import DAG, DagModel, DagRun, TaskInstance
from airflow.models.xcom import XCom
from airflow.providers.cncf.kubernetes import pod_generator
-from airflow.providers.cncf.kubernetes.operators.pod import
KubernetesPodOperator, _optionally_suppress
+from airflow.providers.cncf.kubernetes.operators.pod import (
+ KubernetesPodOperator,
+ PodEventType,
+ _optionally_suppress,
+)
from airflow.providers.cncf.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
@@ -2154,3 +2158,39 @@ def test_async_skip_kpo_wait_termination_with_timeout_event(mock_manager, mocked
# assert that the cleanup is called
post_complete_action.assert_called_once()
+
+
+@patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.pod_manager")
+@patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.log")
+def test_read_pod_events(mock_log, mock_pod_manager):
+ # Create a mock pod
+ pod = V1Pod()
+
+ # Create mock events
+ mock_event_normal = MagicMock()
+ mock_event_normal.type = PodEventType.NORMAL.value
+ mock_event_normal.reason = "test-reason-normal"
+ mock_event_normal.message = "test-message-normal"
+
+ mock_event_error = MagicMock()
+ mock_event_error.type = PodEventType.WARNING.value
+ mock_event_error.reason = "test-reason-error"
+ mock_event_error.message = "test-message-error"
+
+ mock_pod_manager.read_pod_events.return_value.items = [mock_event_normal,
mock_event_error]
+
+ operator = KubernetesPodOperator(task_id="test-task")
+ operator._read_pod_events(pod)
+
+ # Assert that event with type `Normal` is logged as info.
+ mock_log.info.assert_called_once_with(
+ "Pod Event: %s - %s",
+ mock_event_normal.reason,
+ mock_event_normal.message,
+ )
+ # Assert that event with type `Warning` is logged as error.
+ mock_log.error.assert_called_once_with(
+ "Pod Event: %s - %s",
+ mock_event_error.reason,
+ mock_event_error.message,
+ )