This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 68b3b7b468 Simplify KPO multi container log reconciliation logic (#35450)
68b3b7b468 is described below
commit 68b3b7b4683c8e06098dfa8820be18f253d55f47
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Nov 7 10:22:24 2023 -0800
Simplify KPO multi container log reconciliation logic (#35450)
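Move the logic that matches requested container names against the pod's actual containers out of `PodManager.fetch_requested_container_logs` into a new private helper, `_reconcile_requested_log_containers`, and rename the `container_logs` argument of `fetch_requested_container_logs` to `containers`. Easier to follow this way.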
---
airflow/providers/cncf/kubernetes/operators/pod.py | 2 +-
.../providers/cncf/kubernetes/utils/pod_manager.py | 69 ++++++++++++++--------
.../cncf/kubernetes/operators/test_pod.py | 4 +-
.../cncf/kubernetes/utils/test_pod_manager.py | 2 +-
4 files changed, 46 insertions(+), 31 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py
b/airflow/providers/cncf/kubernetes/operators/pod.py
index 5b58b1bedb..58e54a72d3 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -611,7 +611,7 @@ class KubernetesPodOperator(BaseOperator):
if self.get_logs:
self.pod_manager.fetch_requested_container_logs(
pod=self.pod,
- container_logs=self.container_logs,
+ containers=self.container_logs,
follow_logs=True,
)
if not self.get_logs or (
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 2d5abec0e9..75ff82fc2f 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -381,6 +381,8 @@ class PodManager(LoggingMixin):
Between when the pod starts and logs being available, there might be a
delay due to CSR not approved
and signed yet. In such situation, ApiException is thrown. This is why
we are retrying on this
specific exception.
+
+ :meta private:
"""
@tenacity.retry(
@@ -476,53 +478,68 @@ class PodManager(LoggingMixin):
else: # follow requested, but container is done
break
- def fetch_requested_container_logs(
- self, pod: V1Pod, container_logs: Iterable[str] | str | Literal[True],
follow_logs=False
- ) -> None:
- """
- Follow the logs of containers in the specified pod and publish it to
airflow logging.
-
- Returns when all the containers exit.
- """
- all_containers = self.get_container_names(pod)
- if all_containers:
- if isinstance(container_logs, str):
+ def _reconcile_requested_log_containers(
+ self, requested: Iterable[str] | str | bool, actual: list[str],
pod_name
+ ) -> list[str]:
+ """Return actual containers based on requested."""
+ containers_to_log = []
+ if actual:
+ if isinstance(requested, str):
# fetch logs only for requested container if only one
container is provided
- if container_logs in all_containers:
- self.fetch_container_logs(pod=pod,
container_name=container_logs, follow=follow_logs)
+ if requested in actual:
+ containers_to_log.append(requested)
else:
self.log.error(
"container %s whose logs were requested not found in
the pod %s",
- container_logs,
- pod.metadata.name,
+ requested,
+ pod_name,
)
- elif isinstance(container_logs, bool):
+ elif isinstance(requested, bool):
# if True is provided, get logs for all the containers
- if container_logs is True:
- for container_name in all_containers:
- self.fetch_container_logs(pod=pod,
container_name=container_name, follow=follow_logs)
+ if requested is True:
+ containers_to_log.extend(actual)
else:
self.log.error(
"False is not a valid value for container_logs",
)
else:
# if a sequence of containers are provided, iterate for every
container in the pod
- if isinstance(container_logs, Iterable):
- for container in container_logs:
- if container in all_containers:
- self.fetch_container_logs(pod=pod,
container_name=container, follow=follow_logs)
+ if isinstance(requested, Iterable):
+ for container in requested:
+ if container in actual:
+ containers_to_log.append(container)
else:
self.log.error(
"Container %s whose logs were requests not
found in the pod %s",
container,
- pod.metadata.name,
+ pod_name,
)
else:
self.log.error(
- "Invalid type %s specified for container names input
parameter", type(container_logs)
+ "Invalid type %s specified for container names input
parameter", type(requested)
)
else:
- self.log.error("Could not retrieve containers for the pod: %s",
pod.metadata.name)
+ self.log.error("Could not retrieve containers for the pod: %s",
pod_name)
+ return containers_to_log
+
+ def fetch_requested_container_logs(
+ self, pod: V1Pod, containers: Iterable[str] | str | Literal[True],
follow_logs=False
+ ) -> None:
+ """
+ Follow the logs of containers in the specified pod and publish it to
airflow logging.
+
+ Returns when all the containers exit.
+
+ :meta private:
+ """
+ all_containers = self.get_container_names(pod)
+ containers_to_log = self._reconcile_requested_log_containers(
+ requested=containers,
+ actual=all_containers,
+ pod_name=pod.metadata.name,
+ )
+ for c in containers_to_log:
+ self.fetch_container_logs(pod=pod, container_name=c,
follow=follow_logs)
def await_container_completion(self, pod: V1Pod, container_name: str) ->
None:
"""
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py
b/tests/providers/cncf/kubernetes/operators/test_pod.py
index 4ad631ac4e..ae49d7fe7d 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -1371,9 +1371,7 @@ class TestKubernetesPodOperator:
pod = self.run_pod(k)
# check that the base container is not included in the logs
- mock_fetch_log.assert_called_once_with(
- pod=pod, container_logs=["some_init_container"], follow_logs=True
- )
+ mock_fetch_log.assert_called_once_with(pod=pod,
containers=["some_init_container"], follow_logs=True)
# check that KPO waits for the base container to complete before
proceeding to extract XCom
mock_await_container_completion.assert_called_once_with(pod=pod,
container_name="base")
# check that we wait for the xcom sidecar to start before extracting
XCom
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index 11dce0e17b..e6f071559b 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -421,7 +421,7 @@ class TestPodManager:
)
self.pod_manager.fetch_requested_container_logs(
- pod=mock_pod, container_logs=container_logs, follow_logs=follow
+ pod=mock_pod, containers=container_logs, follow_logs=follow
)
calls = {tuple(x[1].values()) for x in
container_is_running.call_args_list}
pod = self.pod_manager.read_pod.return_value