Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 762a2930a -> 3b6950689


Accept runners by fully qualified name.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/84fe8954
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/84fe8954
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/84fe8954

Branch: refs/heads/python-sdk
Commit: 84fe8954669ef9a30448d140b5a578c14b863819
Parents: 762a293
Author: Robert Bradshaw <rober...@google.com>
Authored: Thu Jul 14 10:53:09 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Thu Jul 14 14:15:59 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/runner.py      | 28 ++++++++++-----------
 sdks/python/apache_beam/runners/runner_test.py |  2 +-
 2 files changed, 14 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fe8954/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 55b63f3..98f9758 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -41,26 +41,24 @@ def create_runner(runner_name):
     RuntimeError: if an invalid runner name is used.
   """
   # pylint: disable=wrong-import-order, wrong-import-position
-  if runner_name == 'DirectPipelineRunner':
-    import apache_beam.runners.direct_runner
-    return apache_beam.runners.direct_runner.DirectPipelineRunner()
-  if runner_name == 'DiskCachedPipelineRunner':
-    import apache_beam.runners.direct_runner
-    return apache_beam.runners.direct_runner.DiskCachedPipelineRunner(
-    )
-  if runner_name == 'EagerPipelineRunner':
-    import apache_beam.runners.direct_runner
-    return apache_beam.runners.direct_runner.EagerPipelineRunner()
-  elif runner_name in ('DataflowPipelineRunner',
-                       'BlockingDataflowPipelineRunner'):
+  if runner_name in ('DirectPipelineRunner', 'DiskCachedPipelineRunner',
+                     'EagerPipelineRunner'):
+    runner_name = 'apache_beam.runners.direct_runner.' + runner_name
+
+  if runner_name in ('DataflowPipelineRunner',
+                     'BlockingDataflowPipelineRunner'):
     import apache_beam.runners.dataflow_runner
     return apache_beam.runners.dataflow_runner.DataflowPipelineRunner(
         blocking=runner_name == 'BlockingDataflowPipelineRunner')
+  elif '.' in runner_name:
+    module, runner = runner_name.rsplit('.', 1)
+    return getattr(__import__(module, {}, {}, [runner], -1), runner)()
   else:
-    raise RuntimeError(
+    raise ValueError(
         'Unexpected pipeline runner: %s. Valid values are '
-        'DirectPipelineRunner, DataflowPipelineRunner, EagerPipelineRunner, or '
-        'BlockingDataflowPipelineRunner.' % runner_name)
+        'DirectPipelineRunner, DataflowPipelineRunner, EagerPipelineRunner, '
+        'BlockingDataflowPipelineRunner or the fully qualified name of '
+        'a PipelineRunner subclass.' % runner_name)
 
 
 class PipelineRunner(object):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fe8954/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 20a7259..d2e70d7 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -44,7 +44,7 @@ class RunnerTest(unittest.TestCase):
     self.assertTrue(
         isinstance(create_runner('BlockingDataflowPipelineRunner'),
                    DataflowPipelineRunner))
-    self.assertRaises(RuntimeError, create_runner, 'xyz')
+    self.assertRaises(ValueError, create_runner, 'xyz')
 
   def test_remote_runner_translation(self):
     remote_runner = DataflowPipelineRunner()

Reply via email to