Repository: beam Updated Branches: refs/heads/master aa899e4ce -> 524165ac9
Remove _unused_options_id Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2ab6f398 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2ab6f398 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2ab6f398 Branch: refs/heads/master Commit: 2ab6f398401e592a72b6317d9247e1da7ee90cae Parents: aa899e4 Author: Maria Garcia Herrero <[email protected]> Authored: Thu Apr 20 00:20:28 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Thu Apr 20 16:56:57 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/runners/dataflow/internal/apiclient.py | 1 - sdks/python/apache_beam/runners/direct/direct_runner.py | 8 +++----- sdks/python/apache_beam/utils/value_provider.py | 2 +- sdks/python/apache_beam/utils/value_provider_test.py | 4 ++-- 4 files changed, 6 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2ab6f398/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 50f9ff4..8d44dff 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -233,7 +233,6 @@ class Environment(object): options_dict = {k: v for k, v in sdk_pipeline_options.iteritems() if v is not None} - options_dict['_options_id'] = 0 # TODO(BEAM-1999): Remove. self.proto.sdkPipelineOptions.additionalProperties.append( dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty( key='options', value=to_json_value(options_dict))) http://git-wip-us.apache.org/repos/asf/beam/blob/2ab6f398/sdks/python/apache_beam/runners/direct/direct_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index d8d8cb9..cd0447f 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -85,13 +85,11 @@ class DirectRunner(PipelineRunner): executor = Executor(self.consumer_tracking_visitor.value_to_consumers, TransformEvaluatorRegistry(evaluation_context), evaluation_context) + # DirectRunner does not support injecting + # PipelineOptions values at runtime + RuntimeValueProvider.set_runtime_options({}) # Start the executor. This is a non-blocking call, it will start the # execution in background threads and return. - - if pipeline.options: - # DirectRunner does not support RuntimeValueProviders. - RuntimeValueProvider.set_runtime_options(None, {}) - executor.start(self.consumer_tracking_visitor.root_transforms) result = DirectPipelineResult(executor, evaluation_context) http://git-wip-us.apache.org/repos/asf/beam/blob/2ab6f398/sdks/python/apache_beam/utils/value_provider.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/value_provider.py b/sdks/python/apache_beam/utils/value_provider.py index 235d257..c00d7bc 100644 --- a/sdks/python/apache_beam/utils/value_provider.py +++ b/sdks/python/apache_beam/utils/value_provider.py @@ -76,7 +76,7 @@ class RuntimeValueProvider(ValueProvider): # TODO(BEAM-1999): Remove _unused_options_id @classmethod - def set_runtime_options(cls, _unused_options_id, pipeline_options): + def set_runtime_options(cls, pipeline_options): RuntimeValueProvider.runtime_options = pipeline_options def __str__(self): http://git-wip-us.apache.org/repos/asf/beam/blob/2ab6f398/sdks/python/apache_beam/utils/value_provider_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/value_provider_test.py b/sdks/python/apache_beam/utils/value_provider_test.py index 0411dcc..1b66dd4 100644 --- a/sdks/python/apache_beam/utils/value_provider_test.py +++ b/sdks/python/apache_beam/utils/value_provider_test.py @@ -131,8 +131,8 @@ class ValueProviderTests(unittest.TestCase): # provide values at job-execution time # (options not provided here will use their default, if they have one) - RuntimeValueProvider.set_runtime_options( - None, {'vp_arg': 'abc', 'vp_pos_arg':'3.2'}) + RuntimeValueProvider.set_runtime_options({'vp_arg': 'abc', + 'vp_pos_arg':'3.2'}) self.assertTrue(options.vp_arg.is_accessible()) self.assertEqual(options.vp_arg.get(), 'abc') self.assertTrue(options.vp_arg2.is_accessible())
