Repository: beam
Updated Branches:
  refs/heads/master 51ef0009a -> f124081f5


Remove blocking dataflow runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c2c08dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c2c08dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c2c08dc

Branch: refs/heads/master
Commit: 9c2c08dc41971a0ca18faac77493baf886f3efa3
Parents: 51ef000
Author: Sourabh Bajaj <[email protected]>
Authored: Wed Feb 8 16:55:01 2017 -0800
Committer: Ahmet Altay <[email protected]>
Committed: Fri Feb 10 13:28:43 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/dataflow_runner.py   | 15 ++-------------
 sdks/python/apache_beam/runners/runner.py            |  7 +------
 sdks/python/apache_beam/runners/runner_test.py       |  6 ------
 .../apache_beam/utils/pipeline_options_validator.py  |  1 -
 4 files changed, 3 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9c2c08dc/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index f02e24b..a48f392 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -46,10 +46,6 @@ from apache_beam.utils.pipeline_options import StandardOptions
 from apache_beam.internal.clients import dataflow as dataflow_api
 
 
-def BlockingDataflowRunner(*args, **kwargs):
-  return DataflowRunner(*args, blocking=True, **kwargs)
-
-
 class DataflowRunner(PipelineRunner):
   """A runner that creates job graphs and submits them for remote execution.
 
@@ -66,12 +62,10 @@ class DataflowRunner(PipelineRunner):
   BATCH_ENVIRONMENT_MAJOR_VERSION = '5'
   STREAMING_ENVIRONMENT_MAJOR_VERSION = '0'
 
-  def __init__(self, cache=None, blocking=False):
+  def __init__(self, cache=None):
     # Cache of CloudWorkflowStep protos generated while the runner
     # "executes" a pipeline.
     self._cache = cache if cache is not None else PValueCache()
-    self.blocking = blocking
-    self.result = None
     self._unique_step_id = 0
 
   def _get_unique_step_name(self):
@@ -177,14 +171,9 @@ class DataflowRunner(PipelineRunner):
         pipeline.options, job_version)
 
     # Create the job
-    self.result = DataflowPipelineResult(
+    return DataflowPipelineResult(
         self.dataflow_client.create_job(self.job), self)
 
-    if self.result.has_job and self.blocking:
-      self.result.wait_until_finish()
-
-    return self.result
-
   def _get_typehint_based_encoding(self, typehint, window_coder):
     """Returns an encoding based on a typehint object."""
     return self._get_cloud_encoding(self._get_coder(typehint,

http://git-wip-us.apache.org/repos/asf/beam/blob/9c2c08dc/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index e14acb1..8cbcf65 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -27,7 +27,7 @@ import tempfile
 
 
 _KNOWN_DIRECT_RUNNERS = ('DirectRunner', 'EagerRunner')
-_KNOWN_DATAFLOW_RUNNERS = ('DataflowRunner', 'BlockingDataflowRunner')
+_KNOWN_DATAFLOW_RUNNERS = ('DataflowRunner',)
 _KNOWN_TEST_RUNNERS = ('TestDataflowRunner',)
 _ALL_KNOWN_RUNNERS = (
     _KNOWN_DIRECT_RUNNERS + _KNOWN_DATAFLOW_RUNNERS + _KNOWN_TEST_RUNNERS)
@@ -55,11 +55,6 @@ def create_runner(runner_name):
           '%s is deprecated, use %s instead.', runner_name, new_runner_name)
       runner_name = new_runner_name
 
-  # TODO(BEAM-759): Remove when all BlockingDataflowRunner references are gone.
-  if runner_name == 'BlockingDataflowRunner':
-    logging.warning(
-        'BlockingDataflowRunner is deprecated, use DataflowRunner instead.')
-
   if runner_name in _KNOWN_DIRECT_RUNNERS:
     runner_name = 'apache_beam.runners.direct.direct_runner.' + runner_name
   elif runner_name in _KNOWN_DATAFLOW_RUNNERS:

http://git-wip-us.apache.org/repos/asf/beam/blob/9c2c08dc/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 5c652a9..88807b8 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -63,9 +63,6 @@ class RunnerTest(unittest.TestCase):
         isinstance(create_runner('DataflowRunner'),
                    DataflowRunner))
     self.assertTrue(
-        isinstance(create_runner('BlockingDataflowRunner'),
-                   DataflowRunner))
-    self.assertTrue(
         isinstance(create_runner('TestDataflowRunner'),
                    TestDataflowRunner))
     self.assertRaises(ValueError, create_runner, 'xyz')
@@ -75,9 +72,6 @@ class RunnerTest(unittest.TestCase):
     self.assertTrue(
         isinstance(create_runner('DataflowPipelineRunner'),
                    DataflowRunner))
-    self.assertTrue(
-        isinstance(create_runner('BlockingDataflowPipelineRunner'),
-                   DataflowRunner))
 
   def test_remote_runner_translation(self):
     remote_runner = DataflowRunner()

http://git-wip-us.apache.org/repos/asf/beam/blob/9c2c08dc/sdks/python/apache_beam/utils/pipeline_options_validator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator.py b/sdks/python/apache_beam/utils/pipeline_options_validator.py
index 85fdc4d..d5dc43d 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator.py
@@ -106,7 +106,6 @@ class PipelineOptionsValidator(object):
     is_service_runner = (self.runner is not None and
                          type(self.runner).__name__ in [
                              'DataflowRunner',
-                             'BlockingDataflowRunner',
                              'TestDataflowRunner'])
 
     dataflow_endpoint = (

Reply via email to