CodingJonas edited a comment on issue #9310:
URL: https://github.com/apache/airflow/issues/9310#issuecomment-913696551
Sorry for not following up on my workaround, we moved to Kubernetes, so we
did not finish polishing the workaround properly. I can still add the main code
parts we used to try to fix it. Perhaps it helps you!
```python
def _run_service(self):
...
if self.enable_logging:
service_log_stream = self.cli.service_logs(self.service['ID'],
follow=True, stdout=True, stderr=True, is_tty=self.tty)
_start_logging_async(self.log, service_log_stream)
...
def _start_logging_async(logger, service_log_stream):
"""
The logging task is blocking and thus stops the operator from
recognizing in time when the service finishes.
Since the logging thread is demonized, it will automatically be
terminated once the main script finishes.
"""
p = Thread(target=_stream_logs_to_output,
args=(logger, service_log_stream),
daemon=True)
p.start()
def _stream_logs_to_output(logger, logs):
line = ''
while True:
try:
log = next(logs)
# TODO: Remove this clause once
https://github.com/docker/docker-py/issues/931 is fixed
except requests.exceptions.ConnectionError:
logger.info("Connection Error while fetching log")
# If the service log stream stopped sending messages, check if
it the service has
# terminated.
break
except StopIteration:
logger.info("StopIteration while fetching log")
# If the service log stream terminated, stop fetching logs
further.
break
else:
try:
log = log.decode()
except UnicodeDecodeError:
continue
if log == '\n':
logger.info(line)
line = ''
else:
line += log
# flush any remaining log stream
if line:
logger.info(line)
```
The only addition we did is wrapping the `_stream_logs_to_output` method in
a Thread.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]