This is an automated email from the ASF dual-hosted git repository. goenka pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 7857fa7  [BEAM-7109] Do not reconnect logging at termination
     new d866fed  Merge pull request #8367 from angoenka/logging_thread_leak
7857fa7 is described below

commit 7857fa76b391fe44ea4021d683922a84eab410d7
Author: Ankur Goenka <ankurgoe...@gmail.com>
AuthorDate: Fri Apr 19 16:14:44 2019 -0700

    [BEAM-7109] Do not reconnect logging at termination
---
 .../apache_beam/runners/worker/log_handler.py | 22 ++++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py b/sdks/python/apache_beam/runners/worker/log_handler.py
index cbd68f5..4eac50c 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler.py
@@ -24,6 +24,7 @@ import math
 import queue
 import sys
 import threading
+import time
 from builtins import range
 
 import grpc
@@ -59,6 +60,7 @@ class FnApiLogRecordHandler(logging.Handler):
 
   def __init__(self, log_service_descriptor):
     super(FnApiLogRecordHandler, self).__init__()
+    self._alive = True
     self._dropped_logs = 0
     self._log_entry_queue = queue.Queue(maxsize=self._QUEUE_SIZE)
 
@@ -66,9 +68,6 @@ class FnApiLogRecordHandler(logging.Handler):
     # Make sure the channel is ready to avoid [BEAM-4649]
     grpc.channel_ready_future(ch).result(timeout=60)
     self._log_channel = grpc.intercept_channel(ch, WorkerIdInterceptor())
-    self._logging_stub = beam_fn_api_pb2_grpc.BeamFnLoggingStub(
-        self._log_channel)
-
     self._reader = threading.Thread(
         target=lambda: self._read_log_control_messages(),
         name='read_log_control_messages')
@@ -76,6 +75,10 @@ class FnApiLogRecordHandler(logging.Handler):
     self._reader.start()
 
   def connect(self):
+    if hasattr(self, '_logging_stub'):
+      del self._logging_stub
+    self._logging_stub = beam_fn_api_pb2_grpc.BeamFnLoggingStub(
+        self._log_channel)
     return self._logging_stub.Logging(self._write_log_entries())
 
   def emit(self, record):
@@ -96,6 +99,7 @@ class FnApiLogRecordHandler(logging.Handler):
 
   def close(self):
     """Flush out all existing log entries and unregister this handler."""
+    self._alive = False
     # Acquiring the handler lock ensures ``emit`` is not run until the lock is
     # released.
     self.acquire()
@@ -122,7 +126,14 @@ class FnApiLogRecordHandler(logging.Handler):
       yield beam_fn_api_pb2.LogEntry.List(log_entries=log_entries)
 
   def _read_log_control_messages(self):
-    while True:
+    # Only reconnect when we are alive.
+    # We can drop some logs in the unlikely event of logging connection
+    # dropped(not closed) during termination when we still have logs to be sent.
+    # This case is unlikely and the chance of reconnection and successful
+    # transmission of logs is also very less as the process is terminating.
+    # I choose not to handle this case to avoid un-necessary code complexity.
+    while self._alive:
+      # Loop for reconnection.
       log_control_iterator = self.connect()
       if self._dropped_logs > 0:
         logging.warn("Dropped %d logs while logging client disconnected",
@@ -130,6 +141,7 @@ class FnApiLogRecordHandler(logging.Handler):
         self._dropped_logs = 0
       try:
         for _ in log_control_iterator:
+          # Loop for consuming messages from server.
           # TODO(vikasrk): Handle control messages.
           pass
         # iterator is closed
@@ -137,3 +149,5 @@ class FnApiLogRecordHandler(logging.Handler):
       except Exception as ex:
         print("Logging client failed: {}... resetting".format(ex),
               file=sys.stderr)
+        # Wait a bit before trying a reconnect
+        time.sleep(0.5)  # 0.5 seconds