This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 335f64c6830 support grouping of log lines for KubernetesPodOperator
(#44428)
335f64c6830 is described below
commit 335f64c6830b3d67028d38e915f2f69cd99ae3fa
Author: karunpoudel <[email protected]>
AuthorDate: Wed Nov 27 22:30:44 2024 -0500
support grouping of log lines for KubernetesPodOperator (#44428)
---
.../providers/cncf/kubernetes/utils/pod_manager.py | 15 +++++++++++++--
1 file changed, 13 insertions(+), 2 deletions(-)
diff --git
a/providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
b/providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index e123ef0b0d2..bda798cc928 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -451,7 +451,10 @@ class PodManager(LoggingMixin):
line=line, client=self._client,
mode=ExecutionMode.SYNC
)
if message_to_log is not None:
- self.log.info("[%s] %s", container_name,
message_to_log)
+ if is_log_group_marker(message_to_log):
+ print(message_to_log)
+ else:
+ self.log.info("[%s] %s",
container_name, message_to_log)
last_captured_timestamp = message_timestamp
message_to_log = message
message_timestamp = line_timestamp
@@ -467,7 +470,10 @@ class PodManager(LoggingMixin):
line=line, client=self._client,
mode=ExecutionMode.SYNC
)
if message_to_log is not None:
- self.log.info("[%s] %s", container_name,
message_to_log)
+ if is_log_group_marker(message_to_log):
+ print(message_to_log)
+ else:
+ self.log.info("[%s] %s", container_name,
message_to_log)
last_captured_timestamp = message_timestamp
except TimeoutError as e:
# in case of timeout, increment return time by 2 seconds to
avoid
@@ -820,3 +826,8 @@ class OnFinishAction(str, enum.Enum):
KEEP_POD = "keep_pod"
DELETE_POD = "delete_pod"
DELETE_SUCCEEDED_POD = "delete_succeeded_pod"
+
+
+def is_log_group_marker(line: str) -> bool:
+ """Check if the line is a log group marker like `::group::` or
`::endgroup::`."""
+ return line.startswith("::group::") or line.startswith("::endgroup::")