This is an automated email from the ASF dual-hosted git repository.
jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d9f5d4723cf Add customizable log prefix and formatter to
KubernetesPodOperator (#53598)
d9f5d4723cf is described below
commit d9f5d4723cfdb0733c4623fbd518c5d4bd3a7e31
Author: Gary Hsu <[email protected]>
AuthorDate: Sun Aug 24 11:03:46 2025 +0800
Add customizable log prefix and formatter to KubernetesPodOperator (#53598)
* Add custom log formatting and prefix control to KubernetesPodOperator
- Introduce log_prefix parameter to toggle container name prefix in logs
- Add log_formatter parameter for custom log message formatting
- Update PodManager to support log_prefix and log_formatter in log fetching
* Add tests for log prefix and custom formatter in KubernetesPodOperator
- Add test_log_prefix_enabled to verify default log_prefix=True behavior
- Add test_log_prefix_disabled to confirm log_prefix=False removes
container prefix
- Add test_custom_log_formatter to validate custom log_formatter
functionality
* Enhance KubernetesPodOperator logging with flexible prefix and formatter
options
Consolidated log prefix and formatter tests into a single parameterized
test_log_output_configurations, covering:
log_prefix=True: Includes container name prefix ([base]).
log_prefix=False: Excludes container name prefix.
Custom log formatter: Applies user-defined formatting (e.g., CUSTOM[base]:
message).
* Remove @pytest.mark.asyncio
Co-authored-by: LIU ZHE YOU <[email protected]>
* Modify test_log_output_configurations to improve log message capturing:
- Replace caplog with targeted mocking of PodManager logger
* Add custom log formatting support to KubernetesPodOperator tests
* feat(kubernetes): Add log formatting options and refactor log handling in
KubernetesPodOperator
- Rename the log_prefix parameter to container_name_log_prefix_enabled; it
controls the container name prefix in logs
- Extract duplicate log formatting logic into _log_message private method
in PodManager
- Update docstrings with detailed parameter descriptions
---------
Co-authored-by: LIU ZHE YOU <[email protected]>
---
.../tests/kubernetes_tests/test_base.py | 2 +-
.../test_kubernetes_pod_operator.py | 61 ++++++++++++++++++-
.../providers/cncf/kubernetes/operators/pod.py | 15 +++++
.../providers/cncf/kubernetes/utils/pod_manager.py | 71 ++++++++++++++++++----
.../unit/cncf/kubernetes/operators/test_pod.py | 19 +++++-
.../kubernetes/operators/test_spark_kubernetes.py | 2 +
6 files changed, 153 insertions(+), 17 deletions(-)
diff --git a/kubernetes-tests/tests/kubernetes_tests/test_base.py
b/kubernetes-tests/tests/kubernetes_tests/test_base.py
index 842b9e108c0..0ee8d49c727 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_base.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_base.py
@@ -51,7 +51,7 @@ print()
class StringContainingId(str):
def __eq__(self, other):
- return self in other
+ return self in other.strip() or self in other
class BaseK8STest:
diff --git
a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_pod_operator.py
b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_pod_operator.py
index 2e8bda014d4..3f74aa805bb 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -552,10 +552,11 @@ class TestKubernetesPodOperatorSystem:
task_id=str(uuid4()),
in_cluster=False,
do_xcom_push=False,
+ container_name_log_prefix_enabled=False,
)
context = create_context(k)
k.execute(context=context)
- mock_logger.info.assert_any_call("[%s] %s", "base",
StringContainingId("retrieved from mount"))
+ mock_logger.info.assert_any_call("%s",
StringContainingId("retrieved from mount"))
actual_pod = self.api_client.sanitize_for_serialization(k.pod)
self.expected_pod["spec"]["containers"][0]["args"] = args
self.expected_pod["spec"]["containers"][0]["volumeMounts"] = [
@@ -1428,6 +1429,64 @@ class TestKubernetesPodOperatorSystem:
< calls_args.find(marker_from_main_container)
)
+ @pytest.mark.parametrize(
+ "log_prefix_enabled, log_formatter, expected_log_message_check",
+ [
+ pytest.param(
+ True,
+ None,
+ lambda marker, record_message: f"[base] {marker}" in
record_message,
+ id="log_prefix_enabled",
+ ),
+ pytest.param(
+ False,
+ None,
+ lambda marker, record_message: marker in record_message and
"[base]" not in record_message,
+ id="log_prefix_disabled",
+ ),
+ pytest.param(
+ False, # Ignored when log_formatter is provided
+ lambda container_name, message: f"CUSTOM[{container_name}]:
{message}",
+ lambda marker, record_message: f"CUSTOM[base]: {marker}" in
record_message,
+ id="custom_log_formatter",
+ ),
+ ],
+ )
+ def test_log_output_configurations(self, log_prefix_enabled,
log_formatter, expected_log_message_check):
+ """
+ Tests various log output configurations
(container_name_log_prefix_enabled, log_formatter)
+ for KubernetesPodOperator.
+ """
+ marker = f"test_log_{uuid4()}"
+ k = KubernetesPodOperator(
+ namespace="default",
+ image="busybox",
+ cmds=["sh", "-cx"],
+ arguments=[f"echo {marker}"],
+ labels={"test_label": "test"},
+ task_id=str(uuid4()),
+ in_cluster=False,
+ do_xcom_push=False,
+ get_logs=True,
+ container_name_log_prefix_enabled=log_prefix_enabled,
+ log_formatter=log_formatter,
+ )
+
+ # Test the _log_message method directly
+ logger =
logging.getLogger("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager")
+ with mock.patch.object(logger, "info") as mock_info:
+ k.pod_manager._log_message(
+ message=marker,
+ container_name="base",
+ container_name_log_prefix_enabled=log_prefix_enabled,
+ log_formatter=log_formatter,
+ )
+
+ # Check that the message was logged with the expected format
+ mock_info.assert_called_once()
+ logged_message = mock_info.call_args[0][1] # Second argument is
the message
+ assert expected_log_message_check(marker, logged_message)
+
# TODO: Task SDK: https://github.com/apache/airflow/issues/45438
@pytest.mark.skip(reason="AIP-72: Secret Masking yet to be implemented")
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index 11ca739510f..76d3b4eb3ff 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -235,6 +235,11 @@ class KubernetesPodOperator(BaseOperator):
resuming to fetch the latest logs. If ``None``, then the task will
remain in deferred state until pod
is done, and no logs will be visible until that time.
:param trigger_kwargs: additional keyword parameters passed to the trigger
+ :param container_name_log_prefix_enabled: if True, will prefix container
name to each log line.
+ Default to True.
+ :param log_formatter: custom log formatter function that takes two string
arguments:
+ the first string is the container_name and the second string is the
message_to_log.
+ The function should return a formatted string. If None, the default
formatting will be used.
"""
# !!! Changes in KubernetesPodOperator's arguments should be also
reflected in !!!
@@ -343,6 +348,8 @@ class KubernetesPodOperator(BaseOperator):
progress_callback: Callable[[str], None] | None = None,
logging_interval: int | None = None,
trigger_kwargs: dict | None = None,
+ container_name_log_prefix_enabled: bool = True,
+ log_formatter: Callable[[str, str], str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -438,6 +445,8 @@ class KubernetesPodOperator(BaseOperator):
self._progress_callback = progress_callback
self.callbacks = [] if not callbacks else callbacks if
isinstance(callbacks, list) else [callbacks]
self._killed: bool = False
+ self.container_name_log_prefix_enabled =
container_name_log_prefix_enabled
+ self.log_formatter = log_formatter
@cached_property
def _incluster_namespace(self):
@@ -751,6 +760,8 @@ class KubernetesPodOperator(BaseOperator):
pod=pod,
init_containers=self.init_container_logs,
follow_logs=True,
+
container_name_log_prefix_enabled=self.container_name_log_prefix_enabled,
+ log_formatter=self.log_formatter,
)
except kubernetes.client.exceptions.ApiException as exc:
self._handle_api_exception(exc, pod)
@@ -767,6 +778,8 @@ class KubernetesPodOperator(BaseOperator):
pod=pod,
containers=self.container_logs,
follow_logs=True,
+
container_name_log_prefix_enabled=self.container_name_log_prefix_enabled,
+ log_formatter=self.log_formatter,
)
if not self.get_logs or (
self.container_logs is not True and self.base_container_name
not in self.container_logs
@@ -915,6 +928,8 @@ class KubernetesPodOperator(BaseOperator):
container_name=self.base_container_name,
follow=follow,
since_time=last_log_time,
+
container_name_log_prefix_enabled=self.container_name_log_prefix_enabled,
+ log_formatter=self.log_formatter,
)
self.invoke_defer_method(pod_log_status.last_log_time)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index b426a17ccc2..a7886b89917 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -23,7 +23,7 @@ import enum
import json
import math
import time
-from collections.abc import Generator, Iterable
+from collections.abc import Callable, Generator, Iterable
from contextlib import closing, suppress
from dataclasses import dataclass
from datetime import timedelta
@@ -456,6 +456,26 @@ class PodManager(LoggingMixin):
await asyncio.sleep(check_interval)
+ def _log_message(
+ self,
+ message: str,
+ container_name: str,
+ container_name_log_prefix_enabled: bool,
+ log_formatter: Callable[[str, str], str] | None,
+ ) -> None:
+ """Log a message with appropriate formatting."""
+ if is_log_group_marker(message):
+ print(message)
+ else:
+ if log_formatter:
+ formatted_message = log_formatter(container_name, message)
+ self.log.info("%s", formatted_message)
+ else:
+ log_message = (
+ f"[{container_name}] {message}" if
container_name_log_prefix_enabled else message
+ )
+ self.log.info("%s", log_message)
+
def fetch_container_logs(
self,
pod: V1Pod,
@@ -464,6 +484,8 @@ class PodManager(LoggingMixin):
follow=False,
since_time: DateTime | None = None,
post_termination_timeout: int = 120,
+ container_name_log_prefix_enabled: bool = True,
+ log_formatter: Callable[[str, str], str] | None = None,
) -> PodLoggingStatus:
"""
Follow the logs of container and stream to airflow logging.
@@ -529,10 +551,12 @@ class PodManager(LoggingMixin):
line=line, client=self._client,
mode=ExecutionMode.SYNC
)
if message_to_log is not None:
- if is_log_group_marker(message_to_log):
- print(message_to_log)
- else:
- self.log.info("[%s] %s",
container_name, message_to_log)
+ self._log_message(
+ message_to_log,
+ container_name,
+ container_name_log_prefix_enabled,
+ log_formatter,
+ )
last_captured_timestamp = message_timestamp
message_to_log = message
message_timestamp = line_timestamp
@@ -548,10 +572,9 @@ class PodManager(LoggingMixin):
line=line, client=self._client,
mode=ExecutionMode.SYNC
)
if message_to_log is not None:
- if is_log_group_marker(message_to_log):
- print(message_to_log)
- else:
- self.log.info("[%s] %s", container_name,
message_to_log)
+ self._log_message(
+ message_to_log, container_name,
container_name_log_prefix_enabled, log_formatter
+ )
last_captured_timestamp = message_timestamp
except TimeoutError as e:
# in case of timeout, increment return time by 2 seconds to
avoid
@@ -637,7 +660,12 @@ class PodManager(LoggingMixin):
return containers_to_log
def fetch_requested_init_container_logs(
- self, pod: V1Pod, init_containers: Iterable[str] | str | Literal[True]
| None, follow_logs=False
+ self,
+ pod: V1Pod,
+ init_containers: Iterable[str] | str | Literal[True] | None,
+ follow_logs=False,
+ container_name_log_prefix_enabled: bool = True,
+ log_formatter: Callable[[str, str], str] | None = None,
) -> list[PodLoggingStatus]:
"""
Follow the logs of containers in the specified pod and publish it to
airflow logging.
@@ -657,12 +685,23 @@ class PodManager(LoggingMixin):
containers_to_log = sorted(containers_to_log, key=lambda cn:
all_containers.index(cn))
for c in containers_to_log:
self._await_init_container_start(pod=pod, container_name=c)
- status = self.fetch_container_logs(pod=pod, container_name=c,
follow=follow_logs)
+ status = self.fetch_container_logs(
+ pod=pod,
+ container_name=c,
+ follow=follow_logs,
+
container_name_log_prefix_enabled=container_name_log_prefix_enabled,
+ log_formatter=log_formatter,
+ )
pod_logging_statuses.append(status)
return pod_logging_statuses
def fetch_requested_container_logs(
- self, pod: V1Pod, containers: Iterable[str] | str | Literal[True],
follow_logs=False
+ self,
+ pod: V1Pod,
+ containers: Iterable[str] | str | Literal[True],
+ follow_logs=False,
+ container_name_log_prefix_enabled: bool = True,
+ log_formatter: Callable[[str, str], str] | None = None,
) -> list[PodLoggingStatus]:
"""
Follow the logs of containers in the specified pod and publish it to
airflow logging.
@@ -679,7 +718,13 @@ class PodManager(LoggingMixin):
pod_name=pod.metadata.name,
)
for c in containers_to_log:
- status = self.fetch_container_logs(pod=pod, container_name=c,
follow=follow_logs)
+ status = self.fetch_container_logs(
+ pod=pod,
+ container_name=c,
+ follow=follow_logs,
+
container_name_log_prefix_enabled=container_name_log_prefix_enabled,
+ log_formatter=log_formatter,
+ )
pod_logging_statuses.append(status)
return pod_logging_statuses
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index 62ff20ea771..5641d00e9d4 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -1733,7 +1733,13 @@ class TestKubernetesPodOperator:
pod, _ = self.run_pod(k)
# check that the base container is not included in the logs
- mock_fetch_log.assert_called_once_with(pod=pod,
containers=["some_init_container"], follow_logs=True)
+ mock_fetch_log.assert_called_once_with(
+ pod=pod,
+ containers=["some_init_container"],
+ follow_logs=True,
+ container_name_log_prefix_enabled=True,
+ log_formatter=None,
+ )
# check that KPO waits for the base container to complete before
proceeding to extract XCom
mock_await_container_completion.assert_called_once_with(
pod=pod, container_name="base", polling_time=1
@@ -1999,7 +2005,16 @@ class TestKubernetesPodOperator:
if get_logs:
fetch_requested_container_logs.assert_has_calls(
- [mock.call(pod=pod, containers=k.container_logs,
follow_logs=True)] * 3
+ [
+ mock.call(
+ pod=pod,
+ containers=k.container_logs,
+ follow_logs=True,
+ container_name_log_prefix_enabled=True,
+ log_formatter=None,
+ )
+ ]
+ * 3
)
else:
mock_await_container_completion.assert_has_calls(
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py
index ed20be1f80f..bf5673d706b 100644
---
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py
+++
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py
@@ -729,6 +729,8 @@ class TestSparkKubernetesOperator:
pod=op.pod,
containers="spark-kubernetes-driver",
follow_logs=True,
+ container_name_log_prefix_enabled=True,
+ log_formatter=None,
)
@pytest.mark.asyncio