potiuk commented on code in PR #31663:
URL: https://github.com/apache/airflow/pull/31663#discussion_r1242503308
##########
airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -398,15 +417,81 @@ def consume_logs(
)
time.sleep(1)
- def await_container_completion(self, pod: V1Pod, container_name: str) ->
None:
+ def fetch_requested_container_logs(
+ self, pod: V1Pod, container_logs: list[str] | str | Literal[True],
follow_logs=False
+ ) -> list[PodLoggingStatus]:
+ """
+ Follow the logs of containers in the pod specified by input parameter
and publish
+ it to airflow logging. Returns when all the containers exit.
+ """
+ pod_logging_statuses = []
+ all_containers = self.get_container_names(pod)
+ if len(all_containers) == 0:
+ self.log.error("Could not retrieve containers for the pod: %s",
pod.metadata.name)
+ else:
+ if isinstance(container_logs, str):
+ # fetch logs only for requested container if only one
container is provided
+ if container_logs in all_containers:
+ status = self.fetch_container_logs(
+ pod=pod, container_name=container_logs,
follow=follow_logs
+ )
+ pod_logging_statuses.append(status)
+ else:
+ self.log.error(
+ "container %s whose logs were requested not found in
the pod %s",
+ container_logs,
+ pod.metadata.name,
+ )
+ elif isinstance(container_logs, bool):
+ # if True is provided, get logs for all the containers
+ if container_logs is True:
+ for container_name in all_containers:
+ status = self.fetch_container_logs(
+ pod=pod, container_name=container_name,
follow=follow_logs
+ )
+ pod_logging_statuses.append(status)
+ else:
+ self.log.error(
+ "False is not a valid value for container_logs",
+ )
+ else:
+ # if a sequence of containers are provided, iterate for every
container in the pod
+ if isinstance(container_logs, Iterable):
+ for container in container_logs:
+ if container in all_containers:
+ status = self.fetch_container_logs(
+ pod=pod, container_name=container,
follow=follow_logs
+ )
+ pod_logging_statuses.append(status)
+ else:
+ self.log.error(
+ "Container %s whose logs were requests not
found in the pod %s",
+ container,
+ pod.metadata.name,
+ )
+ else:
+ self.log.error(
+ "Invalid type %s specified for container names input
parameter", type(container_logs)
+ )
+
+ return pod_logging_statuses
+
+ def await_container_completion(self, pod: V1Pod, container_name: str) ->
bool:
"""
Waits for the given container in the given pod to be completed
:param pod: pod spec that will be monitored
:param container_name: name of the container within the pod to monitor
+ :return: if container has completed
"""
- while not self.container_is_terminated(pod=pod,
container_name=container_name):
+ while True:
+ remote_pod = self.read_pod(pod)
+ terminated = container_is_completed(remote_pod, container_name)
+ if terminated:
+ break
+ self.log.info("Waiting for container '%s' state to be completed",
container_name)
time.sleep(1)
+ return terminated
Review Comment:
I mean - why do we need to return the bool ?
##########
airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -398,15 +417,81 @@ def consume_logs(
)
time.sleep(1)
- def await_container_completion(self, pod: V1Pod, container_name: str) ->
None:
+ def fetch_requested_container_logs(
+ self, pod: V1Pod, container_logs: list[str] | str | Literal[True],
follow_logs=False
+ ) -> list[PodLoggingStatus]:
+ """
+ Follow the logs of containers in the pod specified by input parameter
and publish
+ it to airflow logging. Returns when all the containers exit.
+ """
+ pod_logging_statuses = []
+ all_containers = self.get_container_names(pod)
+ if len(all_containers) == 0:
+ self.log.error("Could not retrieve containers for the pod: %s",
pod.metadata.name)
+ else:
+ if isinstance(container_logs, str):
+ # fetch logs only for requested container if only one
container is provided
+ if container_logs in all_containers:
+ status = self.fetch_container_logs(
+ pod=pod, container_name=container_logs,
follow=follow_logs
+ )
+ pod_logging_statuses.append(status)
+ else:
+ self.log.error(
+ "container %s whose logs were requested not found in
the pod %s",
+ container_logs,
+ pod.metadata.name,
+ )
+ elif isinstance(container_logs, bool):
+ # if True is provided, get logs for all the containers
+ if container_logs is True:
+ for container_name in all_containers:
+ status = self.fetch_container_logs(
+ pod=pod, container_name=container_name,
follow=follow_logs
+ )
+ pod_logging_statuses.append(status)
+ else:
+ self.log.error(
+ "False is not a valid value for container_logs",
+ )
+ else:
+ # if a sequence of containers are provided, iterate for every
container in the pod
+ if isinstance(container_logs, Iterable):
+ for container in container_logs:
+ if container in all_containers:
+ status = self.fetch_container_logs(
+ pod=pod, container_name=container,
follow=follow_logs
+ )
+ pod_logging_statuses.append(status)
+ else:
+ self.log.error(
+ "Container %s whose logs were requests not
found in the pod %s",
+ container,
+ pod.metadata.name,
+ )
+ else:
+ self.log.error(
+ "Invalid type %s specified for container names input
parameter", type(container_logs)
+ )
+
+ return pod_logging_statuses
+
+ def await_container_completion(self, pod: V1Pod, container_name: str) ->
bool:
"""
Waits for the given container in the given pod to be completed
:param pod: pod spec that will be monitored
:param container_name: name of the container within the pod to monitor
+ :return: if container has completed
"""
- while not self.container_is_terminated(pod=pod,
container_name=container_name):
+ while True:
+ remote_pod = self.read_pod(pod)
+ terminated = container_is_completed(remote_pod, container_name)
+ if terminated:
+ break
+ self.log.info("Waiting for container '%s' state to be completed",
container_name)
time.sleep(1)
+ return terminated
Review Comment:
It will always be True right ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]