hussein-awala commented on code in PR #30665:
URL: https://github.com/apache/airflow/pull/30665#discussion_r1167973633


##########
airflow/providers/docker/operators/docker_swarm.py:
##########
@@ -178,31 +178,39 @@ def _has_service_terminated(self) -> bool:
     def _stream_logs_to_output(self) -> None:
         if not self.service:
             raise Exception("The 'service' should be initialized before!")
-        logs = self.cli.service_logs(
-            self.service["ID"], follow=True, stdout=True, stderr=True, is_tty=self.tty
-        )
-        line = ""
-        while True:
-            try:
-                log = next(logs)
-            except StopIteration:
-                # If the service log stream terminated, stop fetching logs further.
-                break
-            else:
-                try:
-                    log = log.decode()
-                except UnicodeDecodeError:
-                    continue
-                if log == "\n":
-                    self.log.info(line)
-                    line = ""
-                else:
-                    line += log
-        # flush any remaining log stream
-        if line:
-            self.log.info(line)
+        logsBuffer = LogsBuffer()
+        while not self._has_service_terminated():
+            sleep(5) # Avoid overflooding the API
+            logs = self.cli.service_logs(
+                self.service["ID"], follow=False, stdout=True, stderr=True, is_tty=self.tty
+            )
+            logs = b''.join(logs)
+            print_lines =logsBuffer.increase(logs)
+            if print_lines != None:
+                self.log.info(print_lines)
+        self.log.info(logsBuffer.chars_remaining)
 
     def on_kill(self) -> None:
         if self.hook.client_created and self.service is not None:
             self.log.info("Removing docker service: %s", self.service["ID"])
             self.cli.remove_service(self.service["ID"])
+
+
+# A custom logs bytes buffer that keeps the current state of the
+# docker service logs, as these get continuously polled by the client.
+class LogsBuffer:

Review Comment:
   Can you turn these comments into a docstring and delete them?



##########
airflow/providers/docker/operators/docker_swarm.py:
##########
@@ -178,31 +178,39 @@ def _has_service_terminated(self) -> bool:
     def _stream_logs_to_output(self) -> None:
         if not self.service:
             raise Exception("The 'service' should be initialized before!")
-        logs = self.cli.service_logs(
-            self.service["ID"], follow=True, stdout=True, stderr=True, is_tty=self.tty
-        )
-        line = ""
-        while True:
-            try:
-                log = next(logs)
-            except StopIteration:
-                # If the service log stream terminated, stop fetching logs further.
-                break
-            else:
-                try:
-                    log = log.decode()
-                except UnicodeDecodeError:
-                    continue
-                if log == "\n":
-                    self.log.info(line)
-                    line = ""
-                else:
-                    line += log
-        # flush any remaining log stream
-        if line:
-            self.log.info(line)
+        logsBuffer = LogsBuffer()
+        while not self._has_service_terminated():
+            sleep(5) # Avoid overflooding the API
+            logs = self.cli.service_logs(
+                self.service["ID"], follow=False, stdout=True, stderr=True, is_tty=self.tty
+            )
+            logs = b''.join(logs)
+            print_lines =logsBuffer.increase(logs)
+            if print_lines != None:
+                self.log.info(print_lines)
+        self.log.info(logsBuffer.chars_remaining)
 
     def on_kill(self) -> None:
         if self.hook.client_created and self.service is not None:
             self.log.info("Removing docker service: %s", self.service["ID"])
             self.cli.remove_service(self.service["ID"])
+
+
+# A custom logs bytes buffer that keeps the current state of the
+# docker service logs, as these get continuously polled by the client.
+class LogsBuffer:
+    def __init__(self) -> None:
+        self.cur_buffer = bytes()

Review Comment:
   You could store the last printed index instead of keeping the whole buffer in memory. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to