This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7857fa7 [BEAM-7109] Do not reconnect logging at termination
new d866fed Merge pull request #8367 from angoenka/logging_thread_leak
7857fa7 is described below
commit 7857fa76b391fe44ea4021d683922a84eab410d7
Author: Ankur Goenka <[email protected]>
AuthorDate: Fri Apr 19 16:14:44 2019 -0700
[BEAM-7109] Do not reconnect logging at termination
---
.../apache_beam/runners/worker/log_handler.py | 22 ++++++++++++++++++----
1 file changed, 18 insertions(+), 4 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py b/sdks/python/apache_beam/runners/worker/log_handler.py
index cbd68f5..4eac50c 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler.py
@@ -24,6 +24,7 @@ import math
import queue
import sys
import threading
+import time
from builtins import range
import grpc
@@ -59,6 +60,7 @@ class FnApiLogRecordHandler(logging.Handler):
def __init__(self, log_service_descriptor):
super(FnApiLogRecordHandler, self).__init__()
+ self._alive = True
self._dropped_logs = 0
self._log_entry_queue = queue.Queue(maxsize=self._QUEUE_SIZE)
@@ -66,9 +68,6 @@ class FnApiLogRecordHandler(logging.Handler):
# Make sure the channel is ready to avoid [BEAM-4649]
grpc.channel_ready_future(ch).result(timeout=60)
self._log_channel = grpc.intercept_channel(ch, WorkerIdInterceptor())
- self._logging_stub = beam_fn_api_pb2_grpc.BeamFnLoggingStub(
- self._log_channel)
-
self._reader = threading.Thread(
target=lambda: self._read_log_control_messages(),
name='read_log_control_messages')
@@ -76,6 +75,10 @@ class FnApiLogRecordHandler(logging.Handler):
self._reader.start()
def connect(self):
+ if hasattr(self, '_logging_stub'):
+ del self._logging_stub
+ self._logging_stub = beam_fn_api_pb2_grpc.BeamFnLoggingStub(
+ self._log_channel)
return self._logging_stub.Logging(self._write_log_entries())
def emit(self, record):
@@ -96,6 +99,7 @@ class FnApiLogRecordHandler(logging.Handler):
def close(self):
"""Flush out all existing log entries and unregister this handler."""
+ self._alive = False
# Acquiring the handler lock ensures ``emit`` is not run until the lock is
# released.
self.acquire()
@@ -122,7 +126,14 @@ class FnApiLogRecordHandler(logging.Handler):
yield beam_fn_api_pb2.LogEntry.List(log_entries=log_entries)
def _read_log_control_messages(self):
- while True:
+ # Only reconnect when we are alive.
+ # We can drop some logs in the unlikely event of logging connection
+ # dropped(not closed) during termination when we still have logs to be sent.
+ # This case is unlikely and the chance of reconnection and successful
+ # transmission of logs is also very less as the process is terminating.
+ # I choose not to handle this case to avoid un-necessary code complexity.
+ while self._alive:
+ # Loop for reconnection.
log_control_iterator = self.connect()
if self._dropped_logs > 0:
logging.warn("Dropped %d logs while logging client disconnected",
@@ -130,6 +141,7 @@ class FnApiLogRecordHandler(logging.Handler):
self._dropped_logs = 0
try:
for _ in log_control_iterator:
+ # Loop for consuming messages from server.
# TODO(vikasrk): Handle control messages.
pass
# iterator is closed
@@ -137,3 +149,5 @@ class FnApiLogRecordHandler(logging.Handler):
except Exception as ex:
print("Logging client failed: {}... resetting".format(ex),
file=sys.stderr)
+ # Wait a bit before trying a reconnect
+ time.sleep(0.5) # 0.5 seconds