dstandish commented on code in PR #28336:
URL: https://github.com/apache/airflow/pull/28336#discussion_r1058615241


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -168,6 +168,7 @@ class KubernetesPodOperator(BaseOperator):
     :param labels: labels to apply to the Pod. (templated)
     :param startup_timeout_seconds: timeout in seconds to startup the pod.
     :param get_logs: get the stdout of the container as logs of the tasks.
+    :param logs_timeout: timeout in seconds to read logs after container termination.

Review Comment:
   I am not sure this needs to be configurable.
   
   It reads like a setting that means "if no logs arrive for `logs_timeout` 
   seconds, then quit", but really this timeout is applied only after the pod 
   completes. In that case, 2 minutes seems fine. It's a potentially confusing 
   param, and I don't think we need to make it configurable.
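
To make the distinction concrete, here is a minimal sketch of a timeout that only starts counting once the container has terminated. It is not the PR's implementation: the function signature and its `container_running`, `finished_at`, and `now` parameters are assumptions made for illustration, and only the 120-second default comes from the diff.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def logs_available(
    container_running: bool,
    finished_at: datetime | None,
    now: datetime,
    timeout: int = 120,
) -> bool:
    """Return True while it still makes sense to keep reading logs.

    Hypothetical helper: all parameters are illustrative stand-ins for
    values that would be derived from the pod status.
    """
    if container_running:
        # The timeout is never consulted while the container runs,
        # no matter how long the container stays silent.
        return True
    if finished_at is None:
        # Not running and no termination time recorded: nothing to wait for.
        return False
    # After termination, allow `timeout` seconds to drain remaining logs.
    return now < finished_at + timedelta(seconds=timeout)


end = datetime(2022, 12, 28, 12, 0, 0, tzinfo=timezone.utc)
running = logs_available(True, None, end)
inside_window = logs_available(False, end, end + timedelta(seconds=60))
window_elapsed = logs_available(False, end, end + timedelta(seconds=121))
```

Under these assumptions a silent but still running container is never cut off, which is why the name `logs_timeout` can be misread as an inactivity timeout.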



##########
airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -91,6 +99,61 @@ def get_container_termination_message(pod: V1Pod, container_name: str):
         return container_status.state.terminated.message if container_status else None
 
 
+class PodLogsConsumer:
+    """
+    PodLogsConsumer is responsible for pulling pod logs from a stream with checking a container status before
+    reading data.
+    This class is a workaround for the issue https://github.com/apache/airflow/issues/23497
+    """
+
+    def __init__(
+        self,
+        response: HTTPResponse,
+        pod: V1Pod,
+        pod_manager: PodManager,
+        container_name: str,
+        timeout: int = 120,
+    ):
+        self.response = response
+        self.pod = pod
+        self.pod_manager = pod_manager
+        self.container_name = container_name
+        self.timeout = timeout
+
+    def __iter__(self) -> Generator[bytes, None, None]:
+        messages: list[bytes] = []
+        if self.logs_available():
+            for chunk in self.response.stream(amt=None, decode_content=True):

Review Comment:
   Does this work when `follow=False`?
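
One way to probe this question in isolation is to run the line-reassembly logic from the full hunk over a finite stream, which is roughly what a non-follow request would produce. The sketch below is an assumption-laden stand-in: `iter_lines` is a hypothetical helper, the plain list replaces the real `HTTPResponse.stream(...)` call, and the final flush of unterminated data is an addition that the visible part of the diff does not show. It says nothing about how urllib3 itself behaves when the server closes the connection.

```python
from __future__ import annotations

from typing import Iterable, Iterator


def iter_lines(chunks: Iterable[bytes]) -> Iterator[bytes]:
    """Reassemble newline-terminated lines from arbitrarily split chunks."""
    pending: list[bytes] = []
    for chunk in chunks:
        if b"\n" in chunk:
            parts = chunk.split(b"\n")
            # The first part completes whatever was buffered so far.
            yield b"".join(pending) + parts[0] + b"\n"
            # Middle parts are complete lines on their own.
            for part in parts[1:-1]:
                yield part + b"\n"
            # The last part is an incomplete line (or empty).
            pending = [parts[-1]] if parts[-1] else []
        else:
            pending.append(chunk)
    if pending:
        # Assumption: flush a final line that has no trailing newline.
        yield b"".join(pending)


# A finite stream, as a non-follow log request would eventually end.
finite = [b"first li", b"ne\nsecond\nthi", b"rd"]
lines = list(iter_lines(finite))
```

With a finite input the loop simply ends when the chunks run out, so the remaining question for `follow=False` is whether the real response object terminates its stream the same way.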



##########
airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -91,6 +99,61 @@ def get_container_termination_message(pod: V1Pod, container_name: str):
         return container_status.state.terminated.message if container_status else None
 
 
+class PodLogsConsumer:
+    """
+    PodLogsConsumer is responsible for pulling pod logs from a stream with checking a container status before
+    reading data.
+    This class is a workaround for the issue https://github.com/apache/airflow/issues/23497
+    """
+
+    def __init__(
+        self,
+        response: HTTPResponse,
+        pod: V1Pod,
+        pod_manager: PodManager,
+        container_name: str,
+        timeout: int = 120,
+    ):
+        self.response = response
+        self.pod = pod
+        self.pod_manager = pod_manager
+        self.container_name = container_name
+        self.timeout = timeout
+
+    def __iter__(self) -> Generator[bytes, None, None]:
+        messages: list[bytes] = []
+        if self.logs_available():
+            for chunk in self.response.stream(amt=None, decode_content=True):
+                if b"\n" in chunk:
+                    chunks = chunk.split(b"\n")
+                    yield b"".join(messages) + chunks[0] + b"\n"
+                    for x in chunks[1:-1]:
+                        yield x + b"\n"
+                    if chunks[-1]:
+                        messages = [chunks[-1]]
+                    else:
+                        messages = []
+                else:
+                    messages.append(chunk)
+                if not self.logs_available():

Review Comment:
   This would seem to result in a _lot_ of calls to the kube API, since the 
   check runs once per chunk read from the stream. It may be wise to limit 
   such checking to every couple of minutes. WDYT, @jedcunningham?
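
A minimal sketch of the kind of throttling suggested here, assuming the status check can be wrapped in a callable. `ThrottledCheck`, `min_interval`, and the injectable `clock` are hypothetical names introduced for illustration, not part of the PR or of the Kubernetes client.

```python
from __future__ import annotations

import time
from typing import Callable


class ThrottledCheck:
    """Cache a boolean check and re-run it at most once per interval."""

    def __init__(
        self,
        check: Callable[[], bool],
        min_interval: float = 120.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._check = check
        self._min_interval = min_interval
        self._clock = clock
        self._last_run: float | None = None
        self._last_result = True

    def __call__(self) -> bool:
        now = self._clock()
        if self._last_run is None or now - self._last_run >= self._min_interval:
            # Only here would the (expensive) kube API call happen.
            self._last_result = self._check()
            self._last_run = now
        return self._last_result


# Demonstration with a fake clock: one "chunk" every 10 seconds for 5 minutes.
calls: list[float] = []
fake_now = [0.0]


def expensive_check() -> bool:
    calls.append(fake_now[0])  # record when the real check ran
    return True


throttled = ThrottledCheck(expensive_check, min_interval=120.0, clock=lambda: fake_now[0])
for t in range(0, 300, 10):
    fake_now[0] = float(t)
    throttled()
```

In this run the wrapped check executes 3 times for 30 chunks. The trade-off is that a terminated container may go unnoticed for up to `min_interval` seconds, so the interval would need to be weighed against the post-termination timeout.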



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.