This is an automated email from the ASF dual-hosted git repository.

jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d9f5d4723cf Add customizable log prefix and formatter to KubernetesPodOperator (#53598)
d9f5d4723cf is described below

commit d9f5d4723cfdb0733c4623fbd518c5d4bd3a7e31
Author: Gary Hsu <[email protected]>
AuthorDate: Sun Aug 24 11:03:46 2025 +0800

    Add customizable log prefix and formatter to KubernetesPodOperator (#53598)
    
    * Add custom log formatting and prefix control to KubernetesPodOperator
    
    - Introduce a `log_prefix` parameter to toggle the container name prefix in logs
    - Add a `log_formatter` parameter for custom log message formatting
    - Update PodManager to support log_prefix and log_formatter in log fetching
    
    * Add tests for log prefix and custom formatter in KubernetesPodOperator
    
    - Add test_log_prefix_enabled to verify default log_prefix=True behavior
    - Add test_log_prefix_disabled to confirm that log_prefix=False removes the container name prefix
    - Add test_custom_log_formatter to validate custom log_formatter functionality
    
    * Enhance KubernetesPodOperator logging with flexible prefix and formatter options
    
    Consolidate the log prefix and formatter tests into a single parameterized
    test, test_log_output_configurations, covering:
    - log_prefix=True: includes the container name prefix ([base]).
    - log_prefix=False: excludes the container name prefix.
    - Custom log formatter: applies user-defined formatting (e.g., "CUSTOM[base]: message");
      when a formatter is supplied, the prefix flag is ignored.
    
    * Remove @pytest.mark.asyncio
    
    Co-authored-by: LIU ZHE YOU <[email protected]>
    
    * Modify test_log_output_configurations to improve log message capturing:
        - Replace caplog with targeted mocking of PodManager logger
    
    * Add custom log formatting support to KubernetesPodOperator tests
    
    * feat(kubernetes): Add log formatting options and refactor log handling in KubernetesPodOperator
    
    - Rename the log_prefix parameter to container_name_log_prefix_enabled; it controls whether the container name prefix is added to each log line
    
    - Extract duplicated log formatting logic into a private _log_message method in PodManager
    
    - Update docstrings with detailed parameter descriptions
    
    ---------
    
    Co-authored-by: LIU ZHE YOU <[email protected]>
---
 .../tests/kubernetes_tests/test_base.py            |  2 +-
 .../test_kubernetes_pod_operator.py                | 61 ++++++++++++++++++-
 .../providers/cncf/kubernetes/operators/pod.py     | 15 +++++
 .../providers/cncf/kubernetes/utils/pod_manager.py | 71 ++++++++++++++++++----
 .../unit/cncf/kubernetes/operators/test_pod.py     | 19 +++++-
 .../kubernetes/operators/test_spark_kubernetes.py  |  2 +
 6 files changed, 153 insertions(+), 17 deletions(-)

diff --git a/kubernetes-tests/tests/kubernetes_tests/test_base.py 
b/kubernetes-tests/tests/kubernetes_tests/test_base.py
index 842b9e108c0..0ee8d49c727 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_base.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_base.py
@@ -51,7 +51,7 @@ print()
 
 class StringContainingId(str):
     def __eq__(self, other):
-        return self in other
+        return self in other.strip() or self in other
 
 
 class BaseK8STest:
diff --git 
a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_pod_operator.py 
b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_pod_operator.py
index 2e8bda014d4..3f74aa805bb 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -552,10 +552,11 @@ class TestKubernetesPodOperatorSystem:
                 task_id=str(uuid4()),
                 in_cluster=False,
                 do_xcom_push=False,
+                container_name_log_prefix_enabled=False,
             )
             context = create_context(k)
             k.execute(context=context)
-            mock_logger.info.assert_any_call("[%s] %s", "base", 
StringContainingId("retrieved from mount"))
+            mock_logger.info.assert_any_call("%s", 
StringContainingId("retrieved from mount"))
             actual_pod = self.api_client.sanitize_for_serialization(k.pod)
             self.expected_pod["spec"]["containers"][0]["args"] = args
             self.expected_pod["spec"]["containers"][0]["volumeMounts"] = [
@@ -1428,6 +1429,64 @@ class TestKubernetesPodOperatorSystem:
             < calls_args.find(marker_from_main_container)
         )
 
+    @pytest.mark.parametrize(
+        "log_prefix_enabled, log_formatter, expected_log_message_check",
+        [
+            pytest.param(
+                True,
+                None,
+                lambda marker, record_message: f"[base] {marker}" in 
record_message,
+                id="log_prefix_enabled",
+            ),
+            pytest.param(
+                False,
+                None,
+                lambda marker, record_message: marker in record_message and 
"[base]" not in record_message,
+                id="log_prefix_disabled",
+            ),
+            pytest.param(
+                False,  # Ignored when log_formatter is provided
+                lambda container_name, message: f"CUSTOM[{container_name}]: 
{message}",
+                lambda marker, record_message: f"CUSTOM[base]: {marker}" in 
record_message,
+                id="custom_log_formatter",
+            ),
+        ],
+    )
+    def test_log_output_configurations(self, log_prefix_enabled, 
log_formatter, expected_log_message_check):
+        """
+        Tests various log output configurations 
(container_name_log_prefix_enabled, log_formatter)
+        for KubernetesPodOperator.
+        """
+        marker = f"test_log_{uuid4()}"
+        k = KubernetesPodOperator(
+            namespace="default",
+            image="busybox",
+            cmds=["sh", "-cx"],
+            arguments=[f"echo {marker}"],
+            labels={"test_label": "test"},
+            task_id=str(uuid4()),
+            in_cluster=False,
+            do_xcom_push=False,
+            get_logs=True,
+            container_name_log_prefix_enabled=log_prefix_enabled,
+            log_formatter=log_formatter,
+        )
+
+        # Test the _log_message method directly
+        logger = 
logging.getLogger("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager")
+        with mock.patch.object(logger, "info") as mock_info:
+            k.pod_manager._log_message(
+                message=marker,
+                container_name="base",
+                container_name_log_prefix_enabled=log_prefix_enabled,
+                log_formatter=log_formatter,
+            )
+
+            # Check that the message was logged with the expected format
+            mock_info.assert_called_once()
+            logged_message = mock_info.call_args[0][1]  # Second argument is 
the message
+            assert expected_log_message_check(marker, logged_message)
+
 
 # TODO: Task SDK: https://github.com/apache/airflow/issues/45438
 @pytest.mark.skip(reason="AIP-72: Secret Masking yet to be implemented")
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index 11ca739510f..76d3b4eb3ff 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -235,6 +235,11 @@ class KubernetesPodOperator(BaseOperator):
         resuming to fetch the latest logs. If ``None``, then the task will 
remain in deferred state until pod
         is done, and no logs will be visible until that time.
     :param trigger_kwargs: additional keyword parameters passed to the trigger
+    :param container_name_log_prefix_enabled: if True, will prefix container 
name to each log line.
+        Default to True.
+    :param log_formatter: custom log formatter function that takes two string 
arguments:
+        the first string is the container_name and the second string is the 
message_to_log.
+        The function should return a formatted string. If None, the default 
formatting will be used.
     """
 
     # !!! Changes in KubernetesPodOperator's arguments should be also 
reflected in !!!
@@ -343,6 +348,8 @@ class KubernetesPodOperator(BaseOperator):
         progress_callback: Callable[[str], None] | None = None,
         logging_interval: int | None = None,
         trigger_kwargs: dict | None = None,
+        container_name_log_prefix_enabled: bool = True,
+        log_formatter: Callable[[str, str], str] | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -438,6 +445,8 @@ class KubernetesPodOperator(BaseOperator):
         self._progress_callback = progress_callback
         self.callbacks = [] if not callbacks else callbacks if 
isinstance(callbacks, list) else [callbacks]
         self._killed: bool = False
+        self.container_name_log_prefix_enabled = 
container_name_log_prefix_enabled
+        self.log_formatter = log_formatter
 
     @cached_property
     def _incluster_namespace(self):
@@ -751,6 +760,8 @@ class KubernetesPodOperator(BaseOperator):
                     pod=pod,
                     init_containers=self.init_container_logs,
                     follow_logs=True,
+                    
container_name_log_prefix_enabled=self.container_name_log_prefix_enabled,
+                    log_formatter=self.log_formatter,
                 )
         except kubernetes.client.exceptions.ApiException as exc:
             self._handle_api_exception(exc, pod)
@@ -767,6 +778,8 @@ class KubernetesPodOperator(BaseOperator):
                     pod=pod,
                     containers=self.container_logs,
                     follow_logs=True,
+                    
container_name_log_prefix_enabled=self.container_name_log_prefix_enabled,
+                    log_formatter=self.log_formatter,
                 )
             if not self.get_logs or (
                 self.container_logs is not True and self.base_container_name 
not in self.container_logs
@@ -915,6 +928,8 @@ class KubernetesPodOperator(BaseOperator):
                         container_name=self.base_container_name,
                         follow=follow,
                         since_time=last_log_time,
+                        
container_name_log_prefix_enabled=self.container_name_log_prefix_enabled,
+                        log_formatter=self.log_formatter,
                     )
 
                     self.invoke_defer_method(pod_log_status.last_log_time)
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index b426a17ccc2..a7886b89917 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -23,7 +23,7 @@ import enum
 import json
 import math
 import time
-from collections.abc import Generator, Iterable
+from collections.abc import Callable, Generator, Iterable
 from contextlib import closing, suppress
 from dataclasses import dataclass
 from datetime import timedelta
@@ -456,6 +456,26 @@ class PodManager(LoggingMixin):
 
             await asyncio.sleep(check_interval)
 
+    def _log_message(
+        self,
+        message: str,
+        container_name: str,
+        container_name_log_prefix_enabled: bool,
+        log_formatter: Callable[[str, str], str] | None,
+    ) -> None:
+        """Log a message with appropriate formatting."""
+        if is_log_group_marker(message):
+            print(message)
+        else:
+            if log_formatter:
+                formatted_message = log_formatter(container_name, message)
+                self.log.info("%s", formatted_message)
+            else:
+                log_message = (
+                    f"[{container_name}] {message}" if 
container_name_log_prefix_enabled else message
+                )
+                self.log.info("%s", log_message)
+
     def fetch_container_logs(
         self,
         pod: V1Pod,
@@ -464,6 +484,8 @@ class PodManager(LoggingMixin):
         follow=False,
         since_time: DateTime | None = None,
         post_termination_timeout: int = 120,
+        container_name_log_prefix_enabled: bool = True,
+        log_formatter: Callable[[str, str], str] | None = None,
     ) -> PodLoggingStatus:
         """
         Follow the logs of container and stream to airflow logging.
@@ -529,10 +551,12 @@ class PodManager(LoggingMixin):
                                             line=line, client=self._client, 
mode=ExecutionMode.SYNC
                                         )
                                 if message_to_log is not None:
-                                    if is_log_group_marker(message_to_log):
-                                        print(message_to_log)
-                                    else:
-                                        self.log.info("[%s] %s", 
container_name, message_to_log)
+                                    self._log_message(
+                                        message_to_log,
+                                        container_name,
+                                        container_name_log_prefix_enabled,
+                                        log_formatter,
+                                    )
                                 last_captured_timestamp = message_timestamp
                                 message_to_log = message
                                 message_timestamp = line_timestamp
@@ -548,10 +572,9 @@ class PodManager(LoggingMixin):
                                 line=line, client=self._client, 
mode=ExecutionMode.SYNC
                             )
                     if message_to_log is not None:
-                        if is_log_group_marker(message_to_log):
-                            print(message_to_log)
-                        else:
-                            self.log.info("[%s] %s", container_name, 
message_to_log)
+                        self._log_message(
+                            message_to_log, container_name, 
container_name_log_prefix_enabled, log_formatter
+                        )
                     last_captured_timestamp = message_timestamp
             except TimeoutError as e:
                 # in case of timeout, increment return time by 2 seconds to 
avoid
@@ -637,7 +660,12 @@ class PodManager(LoggingMixin):
         return containers_to_log
 
     def fetch_requested_init_container_logs(
-        self, pod: V1Pod, init_containers: Iterable[str] | str | Literal[True] 
| None, follow_logs=False
+        self,
+        pod: V1Pod,
+        init_containers: Iterable[str] | str | Literal[True] | None,
+        follow_logs=False,
+        container_name_log_prefix_enabled: bool = True,
+        log_formatter: Callable[[str, str], str] | None = None,
     ) -> list[PodLoggingStatus]:
         """
         Follow the logs of containers in the specified pod and publish it to 
airflow logging.
@@ -657,12 +685,23 @@ class PodManager(LoggingMixin):
         containers_to_log = sorted(containers_to_log, key=lambda cn: 
all_containers.index(cn))
         for c in containers_to_log:
             self._await_init_container_start(pod=pod, container_name=c)
-            status = self.fetch_container_logs(pod=pod, container_name=c, 
follow=follow_logs)
+            status = self.fetch_container_logs(
+                pod=pod,
+                container_name=c,
+                follow=follow_logs,
+                
container_name_log_prefix_enabled=container_name_log_prefix_enabled,
+                log_formatter=log_formatter,
+            )
             pod_logging_statuses.append(status)
         return pod_logging_statuses
 
     def fetch_requested_container_logs(
-        self, pod: V1Pod, containers: Iterable[str] | str | Literal[True], 
follow_logs=False
+        self,
+        pod: V1Pod,
+        containers: Iterable[str] | str | Literal[True],
+        follow_logs=False,
+        container_name_log_prefix_enabled: bool = True,
+        log_formatter: Callable[[str, str], str] | None = None,
     ) -> list[PodLoggingStatus]:
         """
         Follow the logs of containers in the specified pod and publish it to 
airflow logging.
@@ -679,7 +718,13 @@ class PodManager(LoggingMixin):
             pod_name=pod.metadata.name,
         )
         for c in containers_to_log:
-            status = self.fetch_container_logs(pod=pod, container_name=c, 
follow=follow_logs)
+            status = self.fetch_container_logs(
+                pod=pod,
+                container_name=c,
+                follow=follow_logs,
+                
container_name_log_prefix_enabled=container_name_log_prefix_enabled,
+                log_formatter=log_formatter,
+            )
             pod_logging_statuses.append(status)
         return pod_logging_statuses
 
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index 62ff20ea771..5641d00e9d4 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -1733,7 +1733,13 @@ class TestKubernetesPodOperator:
         pod, _ = self.run_pod(k)
 
         # check that the base container is not included in the logs
-        mock_fetch_log.assert_called_once_with(pod=pod, 
containers=["some_init_container"], follow_logs=True)
+        mock_fetch_log.assert_called_once_with(
+            pod=pod,
+            containers=["some_init_container"],
+            follow_logs=True,
+            container_name_log_prefix_enabled=True,
+            log_formatter=None,
+        )
         # check that KPO waits for the base container to complete before 
proceeding to extract XCom
         mock_await_container_completion.assert_called_once_with(
             pod=pod, container_name="base", polling_time=1
@@ -1999,7 +2005,16 @@ class TestKubernetesPodOperator:
 
         if get_logs:
             fetch_requested_container_logs.assert_has_calls(
-                [mock.call(pod=pod, containers=k.container_logs, 
follow_logs=True)] * 3
+                [
+                    mock.call(
+                        pod=pod,
+                        containers=k.container_logs,
+                        follow_logs=True,
+                        container_name_log_prefix_enabled=True,
+                        log_formatter=None,
+                    )
+                ]
+                * 3
             )
         else:
             mock_await_container_completion.assert_has_calls(
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py
 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py
index ed20be1f80f..bf5673d706b 100644
--- 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py
+++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py
@@ -729,6 +729,8 @@ class TestSparkKubernetesOperator:
             pod=op.pod,
             containers="spark-kubernetes-driver",
             follow_logs=True,
+            container_name_log_prefix_enabled=True,
+            log_formatter=None,
         )
 
     @pytest.mark.asyncio

Reply via email to