jedcunningham commented on code in PR #28336:
URL: https://github.com/apache/airflow/pull/28336#discussion_r1059470652


##########
airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -91,6 +99,61 @@ def get_container_termination_message(pod: V1Pod, 
container_name: str):
         return container_status.state.terminated.message if container_status 
else None
 
 
+class PodLogsConsumer:
+    """
+    PodLogsConsumer is responsible for pulling pod logs from a stream with 
checking a container status before
+    reading data.
+    This class is a workaround for the issue 
https://github.com/apache/airflow/issues/23497
+    """
+
+    def __init__(
+        self,
+        response: HTTPResponse,
+        pod: V1Pod,
+        pod_manager: PodManager,
+        container_name: str,
+        timeout: int = 120,
+    ):
+        self.response = response
+        self.pod = pod
+        self.pod_manager = pod_manager
+        self.container_name = container_name
+        self.timeout = timeout
+
+    def __iter__(self) -> Generator[bytes, None, None]:
+        messages: list[bytes] = []
+        if self.logs_available():
+            for chunk in self.response.stream(amt=None, decode_content=True):
+                if b"\n" in chunk:
+                    chunks = chunk.split(b"\n")
+                    yield b"".join(messages) + chunks[0] + b"\n"
+                    for x in chunks[1:-1]:
+                        yield x + b"\n"
+                    if chunks[-1]:
+                        messages = [chunks[-1]]
+                    else:
+                        messages = []
+                else:
+                    messages.append(chunk)
+                if not self.logs_available():

Review Comment:
   This whole approach seems a little backwards to me, if I'm understanding 
things correctly. _Sometimes_ we can't continue fetching logs after 2 minutes 
(or whatever), so we _always_ stop trying then?
   
   Would setting a timeout on the request itself guard us from getting stuck 
forever? Maybe `_request_timeout` on `read_namespaced_pod_log`? If not, I feel 
like we should find a way to bail once we know we've hit it instead of just not 
attempting it in the first place.



##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -168,6 +168,7 @@ class KubernetesPodOperator(BaseOperator):
     :param labels: labels to apply to the Pod. (templated)
     :param startup_timeout_seconds: timeout in seconds to startup the pod.
     :param get_logs: get the stdout of the container as logs of the tasks.
+    :param logs_timeout: timeout in seconds to read logs after container 
termination.

Review Comment:
   I think a constant is enough for this case. Really don't think it should be 
in the user facing api.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to