dimberman commented on code in PR #28336:
URL: https://github.com/apache/airflow/pull/28336#discussion_r1104625619


##########
airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -91,6 +99,77 @@ def get_container_termination_message(pod: V1Pod, container_name: str):
         return container_status.state.terminated.message if container_status else None
 
 
+class PodLogsConsumer:
+    """
+    PodLogsConsumer is responsible for pulling pod logs from a stream with checking a container status before
+    reading data.
+    This class is a workaround for the issue https://github.com/apache/airflow/issues/23497
+
+    :meta private:
+    """
+
+    def __init__(
+        self,
+        response: HTTPResponse,
+        pod: V1Pod,
+        pod_manager: PodManager,
+        container_name: str,
+        post_termination_timeout: int = 120,
+        read_pod_cache_timeout: int = 120,

Review Comment:
   Can you please add descriptions of these parameters to the docstring?
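   A minimal sketch of what such a docstring could look like, using the Sphinx `:param name:` field style. The descriptions of `post_termination_timeout` and `read_pod_cache_timeout` (including the assumed unit, seconds) are inferred from the parameter names and from the `last_read_pod_at` / `read_pod_cache` attributes; they are assumptions for the PR author to confirm. The `HTTPResponse`, `V1Pod` and `PodManager` annotations are replaced with `Any` so the snippet runs without the kubernetes and urllib3 packages.

```python
from __future__ import annotations

from typing import Any


class PodLogsConsumer:
    """
    PodLogsConsumer is responsible for pulling pod logs from a stream, checking the
    container status before reading data.
    This class is a workaround for the issue https://github.com/apache/airflow/issues/23497

    :param response: HTTP response whose stream carries the container logs
    :param pod: the pod whose container logs are being read
    :param pod_manager: PodManager instance used by this consumer
    :param container_name: name of the container to read logs from
    :param post_termination_timeout: (assumed) seconds to keep reading logs after the
        container has terminated
    :param read_pod_cache_timeout: (assumed) seconds a cached pod read is reused
        before the pod is read again

    :meta private:
    """

    # Hypothetical simplification: Any replaces HTTPResponse, V1Pod and PodManager.
    # The two timeout descriptions in the docstring above are assumptions, not taken from the PR.
    def __init__(
        self,
        response: Any,
        pod: Any,
        pod_manager: Any,
        container_name: str,
        post_termination_timeout: int = 120,
        read_pod_cache_timeout: int = 120,
    ):
        self.response = response
        self.pod = pod
        self.pod_manager = pod_manager
        self.container_name = container_name
        self.post_termination_timeout = post_termination_timeout
        self.last_read_pod_at = None
        self.read_pod_cache = None
        self.read_pod_cache_timeout = read_pod_cache_timeout
```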



##########
airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -70,15 +73,20 @@ class PodPhase:
     terminal_states = {FAILED, SUCCEEDED}
 
 
+def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus | None:
+    """Retrieves container status"""
+    container_statuses = pod.status.container_statuses if pod and pod.status else None
+    if container_statuses:
+        return next((x for x in container_statuses if x.name == container_name), None)

Review Comment:
   Can you leave a short comment above this line explaining why you're using next()? I get it, but it's a bit "deep Python" and might come off as magic to someone reading this for the first time.
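   For illustration, here is the function from the diff with the kind of comment being asked for; the comment wording is only a suggestion. To keep it runnable without the kubernetes client, `V1Pod`, `V1PodStatus` and `V1ContainerStatus` are replaced by hypothetical minimal dataclasses carrying only the fields the function touches, and an explicit `return None` makes the fall-through case visible.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ContainerStatus:
    # Hypothetical stand-in for kubernetes.client.V1ContainerStatus.
    name: str


@dataclass
class PodStatus:
    # Hypothetical stand-in for V1PodStatus.
    container_statuses: list[ContainerStatus] | None = None


@dataclass
class Pod:
    # Hypothetical stand-in for V1Pod.
    status: PodStatus | None = None


def get_container_status(pod: Pod | None, container_name: str) -> ContainerStatus | None:
    """Retrieves container status"""
    container_statuses = pod.status.container_statuses if pod and pod.status else None
    if container_statuses:
        # next() takes the first item from the generator, i.e. the first status whose
        # name matches, and stops scanning there. The second argument (None) is the
        # default returned when nothing matches, instead of raising StopIteration.
        return next((x for x in container_statuses if x.name == container_name), None)
    return None
```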



##########
airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -91,6 +99,77 @@ def get_container_termination_message(pod: V1Pod, container_name: str):
         return container_status.state.terminated.message if container_status else None
 
 
+class PodLogsConsumer:
+    """
+    PodLogsConsumer is responsible for pulling pod logs from a stream with checking a container status before
+    reading data.
+    This class is a workaround for the issue https://github.com/apache/airflow/issues/23497
+
+    :meta private:
+    """
+
+    def __init__(
+        self,
+        response: HTTPResponse,
+        pod: V1Pod,
+        pod_manager: PodManager,
+        container_name: str,
+        post_termination_timeout: int = 120,
+        read_pod_cache_timeout: int = 120,
+    ):
+        self.response = response
+        self.pod = pod
+        self.pod_manager = pod_manager
+        self.container_name = container_name
+        self.post_termination_timeout = post_termination_timeout
+        self.last_read_pod_at = None
+        self.read_pod_cache = None
+        self.read_pod_cache_timeout = read_pod_cache_timeout
+
+    def __iter__(self) -> Generator[bytes, None, None]:
+        messages: list[bytes] = []
+        if self.logs_available():
+            for chunk in self.response.stream(amt=None, decode_content=True):
+                if b"\n" in chunk:
+                    chunks = chunk.split(b"\n")
+                    yield b"".join(messages) + chunks[0] + b"\n"
+                    for x in chunks[1:-1]:
+                        yield x + b"\n"
+                    if chunks[-1]:
+                        messages = [chunks[-1]]
+                    else:
+                        messages = []
+                else:
+                    messages.append(chunk)
+                if not self.logs_available():
+                    break
+        if messages:
+            yield b"".join(messages)

Review Comment:
   I'm not a fan of how deep the nesting gets over here. Can you please break out a few helper functions to simplify this code?
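   One possible way to flatten `__iter__`, sketched under assumptions: the line splitting and the availability check move into two hypothetical helpers, `_split_chunk` and `_chunks_while_available`, and the chunk stream and the `logs_available` check are passed in as arguments so the snippet runs without Kubernetes. It aims to keep the behavior of the code in the diff: complete lines are yielded with their trailing `b"\n"`, bytes without a newline are buffered, availability is checked before reading and after each chunk, and any leftover partial line is flushed at the end.

```python
from __future__ import annotations

from typing import Callable, Iterable, Iterator


def _split_chunk(chunk: bytes, pending: list[bytes]) -> tuple[list[bytes], list[bytes]]:
    """Hypothetical helper: split one chunk into complete lines plus leftover bytes.

    Returns (lines, pending); each line keeps its trailing newline and pending holds
    the bytes not yet terminated by a newline.
    """
    if b"\n" not in chunk:
        # No line break yet: keep buffering.
        return [], pending + [chunk]
    *complete, tail = chunk.split(b"\n")
    # The first complete line continues whatever was buffered from earlier chunks.
    complete[0] = b"".join(pending) + complete[0]
    lines = [line + b"\n" for line in complete]
    return lines, ([tail] if tail else [])


def _chunks_while_available(
    chunks: Iterable[bytes], logs_available: Callable[[], bool]
) -> Iterator[bytes]:
    """Hypothetical helper: yield chunks, re-checking availability after each one."""
    if not logs_available():
        return
    for chunk in chunks:
        yield chunk
        if not logs_available():
            return


def iter_log_lines(
    chunks: Iterable[bytes], logs_available: Callable[[], bool]
) -> Iterator[bytes]:
    """Hypothetical flattened version of PodLogsConsumer.__iter__."""
    pending: list[bytes] = []
    for chunk in _chunks_while_available(chunks, logs_available):
        lines, pending = _split_chunk(chunk, pending)
        yield from lines
    if pending:
        # Flush a trailing partial line that never got a newline.
        yield b"".join(pending)
```

   Inside the class, `__iter__` could then delegate with `yield from iter_log_lines(self.response.stream(amt=None, decode_content=True), self.logs_available)`, leaving each function with a single level of looping.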



-- 
This is an automated message from the Apache Git Service.