CodingJonas edited a comment on issue #9310:
URL: https://github.com/apache/airflow/issues/9310#issuecomment-913696551


   Sorry for not following up on my workaround, we moved to Kubernetes, so we 
did not finish polishing the workaround properly. I can still add the main code 
parts we used to try to fix it. Perhaps it helps you!
   
   ```python
       def _run_service(self):
           ...
           if self.enable_logging:
               service_log_stream = self.cli.service_logs(self.service['ID'], 
follow=True, stdout=True, stderr=True, is_tty=self.tty)
               _start_logging_async(self.log, service_log_stream)
           ...
   
   def _start_logging_async(logger, service_log_stream):
       """
       The logging task is blocking and thus stops the operator from 
recognizing in time when the service finishes.
       Since the logging thread is demonized, it will automatically be 
terminated once the main script finishes.
       """
       p = Thread(target=_stream_logs_to_output,
                  args=(logger, service_log_stream),
                  daemon=True)
       p.start()
   
   def _stream_logs_to_output(logger, logs):
       line = ''
       while True:
           try:
               log = next(logs)
           # TODO: Remove this clause once 
https://github.com/docker/docker-py/issues/931 is fixed
           except requests.exceptions.ConnectionError:
               logger.info("Connection Error while fetching log")
               # If the service log stream stopped sending messages, check if 
it the service has
               # terminated.
               break
           except StopIteration:
               logger.info("StopIteration while fetching log")
               # If the service log stream terminated, stop fetching logs 
further.
               break
           else:
               try:
                   log = log.decode()
               except UnicodeDecodeError:
                   continue
               if log == '\n':
                   logger.info(line)
                   line = ''
               else:
                   line += log
       # flush any remaining log stream
       if line:
           logger.info(line)
   ```
   
   The only addition we did is wrapping the `_stream_logs_to_output` method in 
a Thread.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to