This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2011cfcc63a Fail when main session cannot be loaded. (#25617)
2011cfcc63a is described below
commit 2011cfcc63a83a8ec907052fee4abd9f533f67b5
Author: tvalentyn <[email protected]>
AuthorDate: Mon Mar 6 19:14:37 2023 -0800
Fail when main session cannot be loaded. (#25617)
---
CHANGES.md | 14 +-----------
.../apache_beam/runners/worker/sdk_worker.py | 7 +++++-
.../apache_beam/runners/worker/sdk_worker_main.py | 26 +++++++++++++++-------
3 files changed, 25 insertions(+), 22 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index d36fb2ffaa7..9437d489916 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -64,11 +64,10 @@
## New Features / Improvements
* The Flink runner now supports Flink 1.16.x
([#25046](https://github.com/apache/beam/issues/25046)).
-* X feature added (Java/Python)
([#X](https://github.com/apache/beam/issues/X)).
## Breaking Changes
-* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* If a main session fails to load, the pipeline will now fail at worker
startup. ([#25401](https://github.com/apache/beam/issues/25401)).
## Deprecations
@@ -97,7 +96,6 @@
## I/Os
-* Support for X source added (Java/Python)
([#X](https://github.com/apache/beam/issues/X)).
* Added in JmsIO a retry policy for failed publications (Java)
([#24971](https://github.com/apache/beam/issues/24971)).
* Support for `LZMA` compression/decompression of text files added to the
Python SDK ([#25316](https://github.com/apache/beam/issues/25316))
* Added ReadFrom/WriteTo Csv/Json as top-level transforms to the Python SDK.
@@ -139,11 +137,6 @@
# [2.45.0] - 2023-02-15
-## Highlights
-
-* New highly anticipated feature X added to Python SDK
([#X](https://github.com/apache/beam/issues/X)).
-* New highly anticipated feature Y added to Java SDK
([#Y](https://github.com/apache/beam/issues/Y)).
-
## I/Os
* Support for X source added (Java/Python)
([#X](https://github.com/apache/beam/issues/X)).
@@ -174,11 +167,6 @@
# [2.44.0] - 2023-01-12
-## Highlights
-
-* New highly anticipated feature X added to Python SDK
([#X](https://github.com/apache/beam/issues/X)).
-* New highly anticipated feature Y added to Java SDK
([#Y](https://github.com/apache/beam/issues/Y)).
-
## I/Os
* Support for Bigtable sink (Write and WriteBatch) added (Go)
([#23324](https://github.com/apache/beam/issues/23324)).
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py
b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index f53a8d9ec99..88beabd33ab 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -80,7 +80,6 @@ _VT = TypeVar('_VT')
_LOGGER = logging.getLogger(__name__)
DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S = 60
-
# The number of ProcessBundleRequest instruction ids the BundleProcessorCache
# will remember for not running instructions.
MAX_KNOWN_NOT_RUNNING_INSTRUCTIONS = 1000
@@ -172,12 +171,16 @@ class SdkHarness(object):
# Heap dump through status api is disabled by default
enable_heap_dump=False, # type: bool
data_sampler=None, # type: Optional[data_sampler.DataSampler]
+      # Unrecoverable SDK harness initialization error (if any)
+      # that should be reported to the runner when processing the first
bundle.
+ deferred_exception=None, # type: Optional[Exception]
):
# type: (...) -> None
self._alive = True
self._worker_index = 0
self._worker_id = worker_id
self._state_cache = StateCache(state_cache_size)
+ self._deferred_exception = deferred_exception
options = [('grpc.max_receive_message_length', -1),
('grpc.max_send_message_length', -1)]
if credentials is None:
@@ -308,6 +311,8 @@ class SdkHarness(object):
def _request_process_bundle(self, request):
# type: (beam_fn_api_pb2.InstructionRequest) -> None
+ if self._deferred_exception:
+ raise self._deferred_exception
self._bundle_processor_cache.activate(request.instruction_id)
self._request_execute(request)
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 84ec49f5262..b643034899d 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -71,6 +71,7 @@ def _import_beam_plugins(plugins):
def create_harness(environment, dry_run=False):
"""Creates SDK Fn Harness."""
+ deferred_exception = None
if 'LOGGING_API_SERVICE_DESCRIPTOR' in environment:
try:
logging_service_descriptor = endpoints_pb2.ApiServiceDescriptor()
@@ -114,15 +115,23 @@ def create_harness(environment, dry_run=False):
if pickle_library != pickler.USE_CLOUDPICKLE:
try:
_load_main_session(semi_persistent_directory)
- except CorruptMainSessionException:
+ except LoadMainSessionException:
exception_details = traceback.format_exc()
_LOGGER.error(
'Could not load main session: %s', exception_details, exc_info=True)
raise
except Exception: # pylint: disable=broad-except
+ summary = (
+ "Could not load main session. Inspect which external dependencies "
+ "are used in the main module of your pipeline. Verify that "
+ "corresponding packages are installed in the pipeline runtime "
+ "environment and their installed versions match the versions used in
"
+ "pipeline submission environment. For more information, see:
https://"
+ "beam.apache.org/documentation/sdks/python-pipeline-dependencies/")
+ _LOGGER.error(summary, exc_info=True)
exception_details = traceback.format_exc()
- _LOGGER.error(
- 'Could not load main session: %s', exception_details, exc_info=True)
+ deferred_exception = LoadMainSessionException(
+ f"{summary} {exception_details}")
_LOGGER.info(
'Pipeline_options: %s',
@@ -159,7 +168,8 @@ def create_harness(environment, dry_run=False):
profiler_factory=profiler.Profile.factory_from_options(
sdk_pipeline_options.view_as(ProfilingOptions)),
enable_heap_dump=enable_heap_dump,
- data_sampler=data_sampler)
+ data_sampler=data_sampler,
+ deferred_exception=deferred_exception)
return fn_log_handler, sdk_harness, sdk_pipeline_options
@@ -306,10 +316,9 @@ def _set_log_level_overrides(options_dict: dict) -> None:
"Error occurred when setting log level for %s: %s", module_name, e)
-class CorruptMainSessionException(Exception):
+class LoadMainSessionException(Exception):
"""
- Used to crash this worker if a main session file was provided but
- is not valid.
+ Used to crash this worker if a main session file failed to load.
"""
pass
@@ -325,7 +334,8 @@ def _load_main_session(semi_persistent_directory):
# This can happen if the worker fails to download the main session.
# Raise a fatal error and crash this worker, forcing a restart.
if os.path.getsize(session_file) == 0:
- raise CorruptMainSessionException(
+    # Potentially transient error, unclear if still happening.
+ raise LoadMainSessionException(
'Session file found, but empty: %s. Functions defined in __main__ '
'(interactive session) will almost certainly fail.' %
(session_file, ))