This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a7b7928a2a fix: reduce irrelevant error logs for pod events. (#37944)
a7b7928a2a is described below

commit a7b7928a2a9ff22f3629f8ee421f627ab3a3c3f3
Author: Sudipto Baral <[email protected]>
AuthorDate: Thu Mar 7 08:06:46 2024 -0500

    fix: reduce irrelevant error logs for pod events. (#37944)
    
    Signed-off-by: sudipto baral <[email protected]>
---
 airflow/providers/cncf/kubernetes/operators/pod.py | 16 ++++++--
 .../cncf/kubernetes/operators/test_pod.py          | 44 +++++++++++++++++++++-
 2 files changed, 55 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 4955db7633..e926a1f49d 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -27,6 +27,7 @@ import string
 import warnings
 from collections.abc import Container
 from contextlib import AbstractContextManager
+from enum import Enum
 from functools import cached_property
 from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence
 
@@ -95,6 +96,13 @@ alphanum_lower = string.ascii_lowercase + string.digits
 KUBE_CONFIG_ENV_VAR = "KUBECONFIG"
 
 
+class PodEventType(Enum):
+    """Type of Events emitted by kubernetes pod."""
+
+    WARNING = "Warning"
+    NORMAL = "Normal"
+
+
 class PodReattachFailure(AirflowException):
     """When we expect to be able to find a pod but cannot."""
 
@@ -548,8 +556,7 @@ class KubernetesPodOperator(BaseOperator):
             )
         except PodLaunchFailedException:
             if self.log_events_on_failure:
-                for event in self.pod_manager.read_pod_events(pod).items:
-                    self.log.error("Pod Event: %s - %s", event.reason, event.message)
+                self._read_pod_events(pod, reraise=False)
             raise
 
     def extract_xcom(self, pod: k8s.V1Pod):
@@ -855,7 +862,10 @@ class KubernetesPodOperator(BaseOperator):
         """Will fetch and emit events from pod."""
         with _optionally_suppress(reraise=reraise):
             for event in self.pod_manager.read_pod_events(pod).items:
-                self.log.error("Pod Event: %s - %s", event.reason, event.message)
+                if event.type == PodEventType.NORMAL.value:
+                    self.log.info("Pod Event: %s - %s", event.reason, event.message)
+                else:
+                    self.log.error("Pod Event: %s - %s", event.reason, event.message)
 
     def is_istio_enabled(self, pod: V1Pod) -> bool:
         """Check if istio is enabled for the namespace of the pod by inspecting the namespace labels."""
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py
index 5d914c8d60..3b1d61c536 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -24,14 +24,18 @@ from unittest.mock import MagicMock, patch
 
 import pendulum
 import pytest
-from kubernetes.client import ApiClient, V1PodSecurityContext, V1PodStatus, models as k8s
+from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext, V1PodStatus, models as k8s
 from urllib3 import HTTPResponse
 
 from airflow.exceptions import AirflowException, AirflowSkipException, TaskDeferred
 from airflow.models import DAG, DagModel, DagRun, TaskInstance
 from airflow.models.xcom import XCom
 from airflow.providers.cncf.kubernetes import pod_generator
-from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator, _optionally_suppress
+from airflow.providers.cncf.kubernetes.operators.pod import (
+    KubernetesPodOperator,
+    PodEventType,
+    _optionally_suppress,
+)
 from airflow.providers.cncf.kubernetes.secret import Secret
 from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
 from airflow.providers.cncf.kubernetes.utils.pod_manager import (
@@ -2154,3 +2158,39 @@ def test_async_skip_kpo_wait_termination_with_timeout_event(mock_manager, mocked
 
     # assert that the cleanup is called
     post_complete_action.assert_called_once()
+
+
+@patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.pod_manager")
+@patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.log")
+def test_read_pod_events(mock_log, mock_pod_manager):
+    # Create a mock pod
+    pod = V1Pod()
+
+    # Create mock events
+    mock_event_normal = MagicMock()
+    mock_event_normal.type = PodEventType.NORMAL.value
+    mock_event_normal.reason = "test-reason-normal"
+    mock_event_normal.message = "test-message-normal"
+
+    mock_event_error = MagicMock()
+    mock_event_error.type = PodEventType.WARNING.value
+    mock_event_error.reason = "test-reason-error"
+    mock_event_error.message = "test-message-error"
+
+    mock_pod_manager.read_pod_events.return_value.items = [mock_event_normal, mock_event_error]
+
+    operator = KubernetesPodOperator(task_id="test-task")
+    operator._read_pod_events(pod)
+
+    # Assert that event with type `Normal` is logged as info.
+    mock_log.info.assert_called_once_with(
+        "Pod Event: %s - %s",
+        mock_event_normal.reason,
+        mock_event_normal.message,
+    )
+    # Assert that event with type `Warning` is logged as error.
+    mock_log.error.assert_called_once_with(
+        "Pod Event: %s - %s",
+        mock_event_error.reason,
+        mock_event_error.message,
+    )

Reply via email to