Repository: beam Updated Branches: refs/heads/master 17a41ab10 -> 132d4c55d
Throw specialized exception in value providers Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee92b964 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee92b964 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee92b964 Branch: refs/heads/master Commit: ee92b9642bb6b6e42bb701ab638c55539163bb69 Parents: 17a41ab Author: Sourabh Bajaj <[email protected]> Authored: Tue Apr 11 13:38:26 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Tue Apr 11 15:32:14 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/runners/dataflow/dataflow_runner.py | 6 ++++++ sdks/python/apache_beam/utils/value_provider.py | 14 ++++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ee92b964/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 1a935c1..4e81788 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -43,6 +43,7 @@ from apache_beam.runners.runner import PipelineState from apache_beam.transforms.display import DisplayData from apache_beam.typehints import typehints from apache_beam.utils.pipeline_options import StandardOptions +from apache_beam.utils.value_provider import RuntimeValueProviderError class DataflowRunner(PipelineRunner): @@ -476,6 +477,11 @@ class DataflowRunner(PipelineRunner): 'estimated_size_bytes': json_value.get_typed_value_descriptor( transform.source.estimate_size()) } + except RuntimeValueProviderError: + # Size estimation is best effort, and this error is by value provider. + logging.info( + 'Could not estimate size of source %r due to ' + \ + 'RuntimeValueProviderError', transform.source) except Exception: # pylint: disable=broad-except # Size estimation is best effort. So we log the error and continue. logging.info( http://git-wip-us.apache.org/repos/asf/beam/blob/ee92b964/sdks/python/apache_beam/utils/value_provider.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/value_provider.py b/sdks/python/apache_beam/utils/value_provider.py index a72fc4c..271202d 100644 --- a/sdks/python/apache_beam/utils/value_provider.py +++ b/sdks/python/apache_beam/utils/value_provider.py @@ -22,6 +22,15 @@ and dynamically provided values. from functools import wraps +class RuntimeValueProviderError(RuntimeError): + def __init__(self, msg): + """Class representing the errors thrown during runtime by the valueprovider + Args: + msg: Message string for the exception thrown + """ + super(RuntimeValueProviderError, self).__init__(msg) + + class ValueProvider(object): def is_accessible(self): raise NotImplementedError( @@ -67,7 +76,8 @@ class RuntimeValueProvider(ValueProvider): runtime_options = ( RuntimeValueProvider.runtime_options_map.get(self.options_id)) if runtime_options is None: - raise RuntimeError('%s.get() not called from a runtime context' % self) + raise RuntimeValueProviderError( + '%s.get() not called from a runtime context' % self) candidate = runtime_options.get(self.option_name) if candidate: @@ -104,7 +114,7 @@ def check_accessible(value_provider_list): def _f(self, *args, **kwargs): for obj in [getattr(self, vp) for vp in value_provider_list]: if not obj.is_accessible(): - raise RuntimeError('%s not accessible' % obj) + raise RuntimeValueProviderError('%s not accessible' % obj) return fnc(self, *args, **kwargs) return _f return _check_accessible
