Repository: beam Updated Branches: refs/heads/master 51ef0009a -> f124081f5
Remove blocking dataflow runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c2c08dc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c2c08dc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c2c08dc Branch: refs/heads/master Commit: 9c2c08dc41971a0ca18faac77493baf886f3efa3 Parents: 51ef000 Author: Sourabh Bajaj <[email protected]> Authored: Wed Feb 8 16:55:01 2017 -0800 Committer: Ahmet Altay <[email protected]> Committed: Fri Feb 10 13:28:43 2017 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/runners/dataflow_runner.py | 15 ++------------- sdks/python/apache_beam/runners/runner.py | 7 +------ sdks/python/apache_beam/runners/runner_test.py | 6 ------ .../apache_beam/utils/pipeline_options_validator.py | 1 - 4 files changed, 3 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9c2c08dc/sdks/python/apache_beam/runners/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index f02e24b..a48f392 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -46,10 +46,6 @@ from apache_beam.utils.pipeline_options import StandardOptions from apache_beam.internal.clients import dataflow as dataflow_api -def BlockingDataflowRunner(*args, **kwargs): - return DataflowRunner(*args, blocking=True, **kwargs) - - class DataflowRunner(PipelineRunner): """A runner that creates job graphs and submits them for remote execution. @@ -66,12 +62,10 @@ class DataflowRunner(PipelineRunner): BATCH_ENVIRONMENT_MAJOR_VERSION = '5' STREAMING_ENVIRONMENT_MAJOR_VERSION = '0' - def __init__(self, cache=None, blocking=False): + def __init__(self, cache=None): # Cache of CloudWorkflowStep protos generated while the runner # "executes" a pipeline. self._cache = cache if cache is not None else PValueCache() - self.blocking = blocking - self.result = None self._unique_step_id = 0 def _get_unique_step_name(self): @@ -177,14 +171,9 @@ class DataflowRunner(PipelineRunner): pipeline.options, job_version) # Create the job - self.result = DataflowPipelineResult( + return DataflowPipelineResult( self.dataflow_client.create_job(self.job), self) - if self.result.has_job and self.blocking: - self.result.wait_until_finish() - - return self.result - def _get_typehint_based_encoding(self, typehint, window_coder): """Returns an encoding based on a typehint object.""" return self._get_cloud_encoding(self._get_coder(typehint, http://git-wip-us.apache.org/repos/asf/beam/blob/9c2c08dc/sdks/python/apache_beam/runners/runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index e14acb1..8cbcf65 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -27,7 +27,7 @@ import tempfile _KNOWN_DIRECT_RUNNERS = ('DirectRunner', 'EagerRunner') -_KNOWN_DATAFLOW_RUNNERS = ('DataflowRunner', 'BlockingDataflowRunner') +_KNOWN_DATAFLOW_RUNNERS = ('DataflowRunner',) _KNOWN_TEST_RUNNERS = ('TestDataflowRunner',) _ALL_KNOWN_RUNNERS = ( _KNOWN_DIRECT_RUNNERS + _KNOWN_DATAFLOW_RUNNERS + _KNOWN_TEST_RUNNERS) @@ -55,11 +55,6 @@ def create_runner(runner_name): '%s is deprecated, use %s instead.', runner_name, new_runner_name) runner_name = new_runner_name - # TODO(BEAM-759): Remove when all BlockingDataflowRunner references are gone. - if runner_name == 'BlockingDataflowRunner': - logging.warning( - 'BlockingDataflowRunner is deprecated, use DataflowRunner instead.') - if runner_name in _KNOWN_DIRECT_RUNNERS: runner_name = 'apache_beam.runners.direct.direct_runner.' + runner_name elif runner_name in _KNOWN_DATAFLOW_RUNNERS: http://git-wip-us.apache.org/repos/asf/beam/blob/9c2c08dc/sdks/python/apache_beam/runners/runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index 5c652a9..88807b8 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -63,9 +63,6 @@ class RunnerTest(unittest.TestCase): isinstance(create_runner('DataflowRunner'), DataflowRunner)) self.assertTrue( - isinstance(create_runner('BlockingDataflowRunner'), - DataflowRunner)) - self.assertTrue( isinstance(create_runner('TestDataflowRunner'), TestDataflowRunner)) self.assertRaises(ValueError, create_runner, 'xyz') @@ -75,9 +72,6 @@ class RunnerTest(unittest.TestCase): self.assertTrue( isinstance(create_runner('DataflowPipelineRunner'), DataflowRunner)) - self.assertTrue( - isinstance(create_runner('BlockingDataflowPipelineRunner'), - DataflowRunner)) def test_remote_runner_translation(self): remote_runner = DataflowRunner() http://git-wip-us.apache.org/repos/asf/beam/blob/9c2c08dc/sdks/python/apache_beam/utils/pipeline_options_validator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator.py b/sdks/python/apache_beam/utils/pipeline_options_validator.py index 85fdc4d..d5dc43d 100644 --- a/sdks/python/apache_beam/utils/pipeline_options_validator.py +++ b/sdks/python/apache_beam/utils/pipeline_options_validator.py @@ -106,7 +106,6 @@ class PipelineOptionsValidator(object): is_service_runner = (self.runner is not None and type(self.runner).__name__ in [ 'DataflowRunner', - 'BlockingDataflowRunner', 'TestDataflowRunner']) dataflow_endpoint = (
