vincbeck commented on code in PR #31663:
URL: https://github.com/apache/airflow/pull/31663#discussion_r1218546169
##########
airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -398,15 +420,81 @@ def consume_logs(
)
time.sleep(1)
- def await_container_completion(self, pod: V1Pod, container_name: str) ->
None:
+ def fetch_requested_container_logs(
+ self, pod: V1Pod, container_logs: list[str] | str | Literal[True],
follow_logs=False
+ ) -> list[PodLoggingStatus]:
+ """
+ Follow the logs of containers in the pod specified by input parameter
and publish
+ it to airflow logging. Returns when all the containers exit.
+ """
+ pod_logging_statuses = []
+ all_containers = self.get_container_names(pod)
+ if len(all_containers) == 0:
+ self.log.error("Could not retrieve containers for the pod: %s",
pod.metadata.name)
+ else:
+ if isinstance(container_logs, str):
+ # fetch logs only for requested container if only one
container is provided
+ if container_logs in all_containers:
+ status = self.fetch_container_logs(
+ pod=pod, container_name=container_logs,
follow=follow_logs
+ )
+ pod_logging_statuses.append(status)
+ else:
+ self.log.error(
+ "container %s whose logs were requested not found in
the pod %s",
+ container_logs,
+ pod.metadata.name,
+ )
+ elif isinstance(container_logs, bool):
+ # if True is provided, get logs for all the containers
+ if container_logs is True:
+ for container_name in all_containers:
+ status = self.fetch_container_logs(
+ pod=pod, container_name=container_name,
follow=follow_logs
+ )
+ pod_logging_statuses.append(status)
+ else:
+ self.log.error(
+ "False is not a valid value for container_logs",
+ )
Review Comment:
Is this branch needed?
##########
airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -398,15 +420,81 @@ def consume_logs(
)
time.sleep(1)
- def await_container_completion(self, pod: V1Pod, container_name: str) ->
None:
+ def fetch_requested_container_logs(
+ self, pod: V1Pod, container_logs: list[str] | str | Literal[True],
follow_logs=False
+ ) -> list[PodLoggingStatus]:
+ """
+ Follow the logs of containers in the pod specified by input parameter
and publish
+ it to airflow logging. Returns when all the containers exit.
+ """
+ pod_logging_statuses = []
+ all_containers = self.get_container_names(pod)
+ if len(all_containers) == 0:
+ self.log.error("Could not retrieve containers for the pod: %s",
pod.metadata.name)
+ else:
+ if isinstance(container_logs, str):
+ # fetch logs only for requested container if only one
container is provided
+ if container_logs in all_containers:
+ status = self.fetch_container_logs(
+ pod=pod, container_name=container_logs,
follow=follow_logs
+ )
+ pod_logging_statuses.append(status)
+ else:
+ self.log.error(
+ "container %s whose logs were requested not found in
the pod %s",
+ container_logs,
+ pod.metadata.name,
+ )
+ elif isinstance(container_logs, bool):
+ # if True is provided, get logs for all the containers
+ if container_logs is True:
+ for container_name in all_containers:
+ status = self.fetch_container_logs(
+ pod=pod, container_name=container_name,
follow=follow_logs
+ )
+ pod_logging_statuses.append(status)
+ else:
+ self.log.error(
+ "False is not a valid value for container_logs",
+ )
+ else:
+ # if a sequence of containers are provided, iterate for every
container in the pod
+ if isinstance(container_logs, Iterable):
+ for container in container_logs:
+ if container in all_containers:
+ status = self.fetch_container_logs(
+ pod=pod, container_name=container,
follow=follow_logs
+ )
+ pod_logging_statuses.append(status)
+ else:
+ self.log.error(
+ "Container %s whose logs were requests not
found in the pod %s",
+ container,
+ pod.metadata.name,
+ )
+ else:
+ self.log.error(
+ "Invalid type %s specified for container names input
parameter", type(container_logs)
+ )
Review Comment:
I think you can reduce the duplication here. Here is a suggestion:
```suggestion
if isinstance(container_logs, str):
container_logs = [container_logs]
for container in container_logs:
if container in all_containers:
status = self.fetch_container_logs(
pod=pod, container_name=container, follow=follow_logs
)
pod_logging_statuses.append(status)
else:
self.log.error(
"container %s whose logs were requested not found in
the pod %s",
container,
pod.metadata.name,
)
elif container_logs is True:
for container_name in all_containers:
status = self.fetch_container_logs(
pod=pod, container_name=container_name,
follow=follow_logs
)
pod_logging_statuses.append(status)
else:
self.log.error(
"Invalid type %s specified for container names input
parameter", type(container_logs)
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]