This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2011cfcc63a Fail when main session cannot be loaded. (#25617)
2011cfcc63a is described below

commit 2011cfcc63a83a8ec907052fee4abd9f533f67b5
Author: tvalentyn <[email protected]>
AuthorDate: Mon Mar 6 19:14:37 2023 -0800

    Fail when main session cannot be loaded. (#25617)
---
 CHANGES.md                                         | 14 +-----------
 .../apache_beam/runners/worker/sdk_worker.py       |  7 +++++-
 .../apache_beam/runners/worker/sdk_worker_main.py  | 26 +++++++++++++++-------
 3 files changed, 25 insertions(+), 22 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index d36fb2ffaa7..9437d489916 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -64,11 +64,10 @@
 ## New Features / Improvements
 
 * The Flink runner now supports Flink 1.16.x 
([#25046](https://github.com/apache/beam/issues/25046)).
-* X feature added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
 
 ## Breaking Changes
 
-* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* If a main session fails to load, the pipeline will now fail at worker 
startup. ([#25401](https://github.com/apache/beam/issues/25401)).
 
 ## Deprecations
 
@@ -97,7 +96,6 @@
 
 ## I/Os
 
-* Support for X source added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
 * Added in JmsIO a retry policy for failed publications (Java) 
([#24971](https://github.com/apache/beam/issues/24971)).
 * Support for `LZMA` compression/decompression of text files added to the 
Python SDK ([#25316](https://github.com/apache/beam/issues/25316))
 * Added ReadFrom/WriteTo Csv/Json as top-level transforms to the Python SDK.
@@ -139,11 +137,6 @@
 
 # [2.45.0] - 2023-02-15
 
-## Highlights
-
-* New highly anticipated feature X added to Python SDK 
([#X](https://github.com/apache/beam/issues/X)).
-* New highly anticipated feature Y added to Java SDK 
([#Y](https://github.com/apache/beam/issues/Y)).
-
 ## I/Os
 
 * Support for X source added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
@@ -174,11 +167,6 @@
 
 # [2.44.0] - 2023-01-12
 
-## Highlights
-
-* New highly anticipated feature X added to Python SDK 
([#X](https://github.com/apache/beam/issues/X)).
-* New highly anticipated feature Y added to Java SDK 
([#Y](https://github.com/apache/beam/issues/Y)).
-
 ## I/Os
 
 * Support for Bigtable sink (Write and WriteBatch) added (Go) 
([#23324](https://github.com/apache/beam/issues/23324)).
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py 
b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index f53a8d9ec99..88beabd33ab 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -80,7 +80,6 @@ _VT = TypeVar('_VT')
 _LOGGER = logging.getLogger(__name__)
 
 DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S = 60
-
 # The number of ProcessBundleRequest instruction ids the BundleProcessorCache
 # will remember for not running instructions.
 MAX_KNOWN_NOT_RUNNING_INSTRUCTIONS = 1000
@@ -172,12 +171,16 @@ class SdkHarness(object):
       # Heap dump through status api is disabled by default
       enable_heap_dump=False,  # type: bool
       data_sampler=None,  # type: Optional[data_sampler.DataSampler]
+      # Unrecoverable SDK harness initialization error (if any)
+      # that should be reported to the runner when processing the first 
bundle.
+      deferred_exception=None, # type: Optional[Exception]
   ):
     # type: (...) -> None
     self._alive = True
     self._worker_index = 0
     self._worker_id = worker_id
     self._state_cache = StateCache(state_cache_size)
+    self._deferred_exception = deferred_exception
     options = [('grpc.max_receive_message_length', -1),
                ('grpc.max_send_message_length', -1)]
     if credentials is None:
@@ -308,6 +311,8 @@ class SdkHarness(object):
 
   def _request_process_bundle(self, request):
     # type: (beam_fn_api_pb2.InstructionRequest) -> None
+    if self._deferred_exception:
+      raise self._deferred_exception
     self._bundle_processor_cache.activate(request.instruction_id)
     self._request_execute(request)
 
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py 
b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 84ec49f5262..b643034899d 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -71,6 +71,7 @@ def _import_beam_plugins(plugins):
 def create_harness(environment, dry_run=False):
   """Creates SDK Fn Harness."""
 
+  deferred_exception = None
   if 'LOGGING_API_SERVICE_DESCRIPTOR' in environment:
     try:
       logging_service_descriptor = endpoints_pb2.ApiServiceDescriptor()
@@ -114,15 +115,23 @@ def create_harness(environment, dry_run=False):
   if pickle_library != pickler.USE_CLOUDPICKLE:
     try:
       _load_main_session(semi_persistent_directory)
-    except CorruptMainSessionException:
+    except LoadMainSessionException:
       exception_details = traceback.format_exc()
       _LOGGER.error(
           'Could not load main session: %s', exception_details, exc_info=True)
       raise
     except Exception:  # pylint: disable=broad-except
+      summary = (
+          "Could not load main session. Inspect which external dependencies "
+          "are used in the main module of your pipeline. Verify that "
+          "corresponding packages are installed in the pipeline runtime "
+          "environment and their installed versions match the versions used in 
"
+          "pipeline submission environment. For more information, see: 
https://"
+          "beam.apache.org/documentation/sdks/python-pipeline-dependencies/")
+      _LOGGER.error(summary, exc_info=True)
       exception_details = traceback.format_exc()
-      _LOGGER.error(
-          'Could not load main session: %s', exception_details, exc_info=True)
+      deferred_exception = LoadMainSessionException(
+          f"{summary} {exception_details}")
 
   _LOGGER.info(
       'Pipeline_options: %s',
@@ -159,7 +168,8 @@ def create_harness(environment, dry_run=False):
       profiler_factory=profiler.Profile.factory_from_options(
           sdk_pipeline_options.view_as(ProfilingOptions)),
       enable_heap_dump=enable_heap_dump,
-      data_sampler=data_sampler)
+      data_sampler=data_sampler,
+      deferred_exception=deferred_exception)
   return fn_log_handler, sdk_harness, sdk_pipeline_options
 
 
@@ -306,10 +316,9 @@ def _set_log_level_overrides(options_dict: dict) -> None:
           "Error occurred when setting log level for %s: %s", module_name, e)
 
 
-class CorruptMainSessionException(Exception):
+class LoadMainSessionException(Exception):
   """
-  Used to crash this worker if a main session file was provided but
-  is not valid.
+  Used to crash this worker if a main session file failed to load.
   """
   pass
 
@@ -325,7 +334,8 @@ def _load_main_session(semi_persistent_directory):
       # This can happen if the worker fails to download the main session.
       # Raise a fatal error and crash this worker, forcing a restart.
       if os.path.getsize(session_file) == 0:
-        raise CorruptMainSessionException(
+        # Potentially transient error, unclear if still happening.
+        raise LoadMainSessionException(
             'Session file found, but empty: %s. Functions defined in __main__ '
             '(interactive session) will almost certainly fail.' %
             (session_file, ))

Reply via email to