Repository: beam Updated Branches: refs/heads/master 3076c8695 -> eb2e6aa4f
Revert "Revert "Throw specialized exception in value providers"" This reverts commit 4854291fe38df7b624446311956d0ae0677f6b8c. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e99a3940 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e99a3940 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e99a3940 Branch: refs/heads/master Commit: e99a3940b30ec5aed8bb2ef556aad84ed5c7147c Parents: f654a79 Author: Ahmet Altay <[email protected]> Authored: Fri Apr 14 14:55:50 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Tue Apr 18 15:30:05 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/runners/dataflow/dataflow_runner.py | 6 ++++++ sdks/python/apache_beam/utils/value_provider.py | 14 ++++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e99a3940/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index bdbd2dd..24c0d6b 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -43,6 +43,7 @@ from apache_beam.runners.runner import PipelineState from apache_beam.transforms.display import DisplayData from apache_beam.typehints import typehints from apache_beam.utils.pipeline_options import StandardOptions +from apache_beam.utils.value_provider import RuntimeValueProviderError class DataflowRunner(PipelineRunner): @@ -479,6 +480,11 @@ class DataflowRunner(PipelineRunner): 'estimated_size_bytes': json_value.get_typed_value_descriptor( transform.source.estimate_size()) } + except RuntimeValueProviderError: + # Size estimation is best effort, and this error is by value provider. + logging.info( + 'Could not estimate size of source %r due to ' + \ + 'RuntimeValueProviderError', transform.source) except Exception: # pylint: disable=broad-except # Size estimation is best effort. So we log the error and continue. logging.info( http://git-wip-us.apache.org/repos/asf/beam/blob/e99a3940/sdks/python/apache_beam/utils/value_provider.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/value_provider.py b/sdks/python/apache_beam/utils/value_provider.py index a72fc4c..271202d 100644 --- a/sdks/python/apache_beam/utils/value_provider.py +++ b/sdks/python/apache_beam/utils/value_provider.py @@ -22,6 +22,15 @@ and dynamically provided values. from functools import wraps +class RuntimeValueProviderError(RuntimeError): + def __init__(self, msg): + """Class representing the errors thrown during runtime by the valueprovider + Args: + msg: Message string for the exception thrown + """ + super(RuntimeValueProviderError, self).__init__(msg) + + class ValueProvider(object): def is_accessible(self): raise NotImplementedError( @@ -67,7 +76,8 @@ class RuntimeValueProvider(ValueProvider): runtime_options = ( RuntimeValueProvider.runtime_options_map.get(self.options_id)) if runtime_options is None: - raise RuntimeError('%s.get() not called from a runtime context' % self) + raise RuntimeValueProviderError( + '%s.get() not called from a runtime context' % self) candidate = runtime_options.get(self.option_name) if candidate: @@ -104,7 +114,7 @@ def check_accessible(value_provider_list): def _f(self, *args, **kwargs): for obj in [getattr(self, vp) for vp in value_provider_list]: if not obj.is_accessible(): - raise RuntimeError('%s not accessible' % obj) + raise RuntimeValueProviderError('%s not accessible' % obj) return fnc(self, *args, **kwargs) return _f return _check_accessible
