This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7ea4547 Make SDK worker resilient to bad logging services. (#9214)
7ea4547 is described below
commit 7ea4547a6f2530dffc06cb4dab3cdde58c2fd0dc
Author: Robert Bradshaw <[email protected]>
AuthorDate: Thu Aug 1 19:00:54 2019 +0200
Make SDK worker resilient to bad logging services. (#9214)
---
.../apache_beam/runners/worker/log_handler.py | 26 +++++++++++++---------
.../apache_beam/runners/worker/sdk_worker_main.py | 25 ++++++++++++---------
2 files changed, 31 insertions(+), 20 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py
b/sdks/python/apache_beam/runners/worker/log_handler.py
index 4eac50c..b38aaed 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler.py
@@ -99,16 +99,22 @@ class FnApiLogRecordHandler(logging.Handler):
def close(self):
"""Flush out all existing log entries and unregister this handler."""
- self._alive = False
- # Acquiring the handler lock ensures ``emit`` is not run until the lock is
- # released.
- self.acquire()
- self._log_entry_queue.put(self._FINISHED, timeout=5)
- # wait on server to close.
- self._reader.join()
- self.release()
- # Unregister this handler.
- super(FnApiLogRecordHandler, self).close()
+ try:
+ self._alive = False
+ # Acquiring the handler lock ensures ``emit`` is not run until the lock
is
+ # released.
+ self.acquire()
+ self._log_entry_queue.put(self._FINISHED, timeout=5)
+ # wait on server to close.
+ self._reader.join()
+ self.release()
+ # Unregister this handler.
+ super(FnApiLogRecordHandler, self).close()
+ except Exception:
+ # Log rather than raising exceptions, to avoid clobbering
+ # underlying errors that may have caused this to close
+ # prematurely.
+ logging.error("Error closing the logging channel.", exc_info=True)
def _write_log_entries(self):
done = False
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 4c7bee6..81bc1b5 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -91,16 +91,21 @@ class StatusServer(object):
def main(unused_argv):
"""Main entry point for SDK Fn Harness."""
if 'LOGGING_API_SERVICE_DESCRIPTOR' in os.environ:
- logging_service_descriptor = endpoints_pb2.ApiServiceDescriptor()
- text_format.Merge(os.environ['LOGGING_API_SERVICE_DESCRIPTOR'],
- logging_service_descriptor)
-
- # Send all logs to the runner.
- fn_log_handler = FnApiLogRecordHandler(logging_service_descriptor)
- # TODO(BEAM-5468): This should be picked up from pipeline options.
- logging.getLogger().setLevel(logging.INFO)
- logging.getLogger().addHandler(fn_log_handler)
- logging.info('Logging handler created.')
+ try:
+ logging_service_descriptor = endpoints_pb2.ApiServiceDescriptor()
+ text_format.Merge(os.environ['LOGGING_API_SERVICE_DESCRIPTOR'],
+ logging_service_descriptor)
+
+ # Send all logs to the runner.
+ fn_log_handler = FnApiLogRecordHandler(logging_service_descriptor)
+ # TODO(BEAM-5468): This should be picked up from pipeline options.
+ logging.getLogger().setLevel(logging.INFO)
+ logging.getLogger().addHandler(fn_log_handler)
+ logging.info('Logging handler created.')
+ except Exception:
+ logging.error("Failed to set up logging handler, continuing without.",
+ exc_info=True)
+ fn_log_handler = None
else:
fn_log_handler = None