This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7857fa7  [BEAM-7109] Do not reconnect logging at termination
     new d866fed  Merge pull request #8367 from angoenka/logging_thread_leak
7857fa7 is described below

commit 7857fa76b391fe44ea4021d683922a84eab410d7
Author: Ankur Goenka <ankurgoe...@gmail.com>
AuthorDate: Fri Apr 19 16:14:44 2019 -0700

    [BEAM-7109] Do not reconnect logging at termination
---
 .../apache_beam/runners/worker/log_handler.py      | 22 ++++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py b/sdks/python/apache_beam/runners/worker/log_handler.py
index cbd68f5..4eac50c 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler.py
@@ -24,6 +24,7 @@ import math
 import queue
 import sys
 import threading
+import time
 from builtins import range
 
 import grpc
@@ -59,6 +60,7 @@ class FnApiLogRecordHandler(logging.Handler):
   def __init__(self, log_service_descriptor):
     super(FnApiLogRecordHandler, self).__init__()
 
+    self._alive = True
     self._dropped_logs = 0
     self._log_entry_queue = queue.Queue(maxsize=self._QUEUE_SIZE)
 
@@ -66,9 +68,6 @@ class FnApiLogRecordHandler(logging.Handler):
     # Make sure the channel is ready to avoid [BEAM-4649]
     grpc.channel_ready_future(ch).result(timeout=60)
     self._log_channel = grpc.intercept_channel(ch, WorkerIdInterceptor())
-    self._logging_stub = beam_fn_api_pb2_grpc.BeamFnLoggingStub(
-        self._log_channel)
-
     self._reader = threading.Thread(
         target=lambda: self._read_log_control_messages(),
         name='read_log_control_messages')
@@ -76,6 +75,10 @@ class FnApiLogRecordHandler(logging.Handler):
     self._reader.start()
 
   def connect(self):
+    if hasattr(self, '_logging_stub'):
+      del self._logging_stub
+    self._logging_stub = beam_fn_api_pb2_grpc.BeamFnLoggingStub(
+        self._log_channel)
     return self._logging_stub.Logging(self._write_log_entries())
 
   def emit(self, record):
@@ -96,6 +99,7 @@ class FnApiLogRecordHandler(logging.Handler):
 
   def close(self):
     """Flush out all existing log entries and unregister this handler."""
+    self._alive = False
     # Acquiring the handler lock ensures ``emit`` is not run until the lock is
     # released.
     self.acquire()
@@ -122,7 +126,14 @@ class FnApiLogRecordHandler(logging.Handler):
         yield beam_fn_api_pb2.LogEntry.List(log_entries=log_entries)
 
   def _read_log_control_messages(self):
-    while True:
+    # Only reconnect when we are alive.
+    # We can drop some logs in the unlikely event of logging connection
+    # dropped(not closed) during termination when we still have logs to be sent.
+    # This case is unlikely and the chance of reconnection and successful
+    # transmission of logs is also very less as the process is terminating.
+    # I choose not to handle this case to avoid un-necessary code complexity.
+    while self._alive:
+      # Loop for reconnection.
       log_control_iterator = self.connect()
       if self._dropped_logs > 0:
         logging.warn("Dropped %d logs while logging client disconnected",
@@ -130,6 +141,7 @@ class FnApiLogRecordHandler(logging.Handler):
         self._dropped_logs = 0
       try:
         for _ in log_control_iterator:
+          # Loop for consuming messages from server.
           # TODO(vikasrk): Handle control messages.
           pass
         # iterator is closed
@@ -137,3 +149,5 @@ class FnApiLogRecordHandler(logging.Handler):
       except Exception as ex:
         print("Logging client failed: {}... resetting".format(ex),
               file=sys.stderr)
+        # Wait a bit before trying a reconnect
+        time.sleep(0.5) # 0.5 seconds

Reply via email to