Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 762a2930a -> 3b6950689
Accept runners by fully qualified name.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/84fe8954
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/84fe8954
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/84fe8954

Branch: refs/heads/python-sdk
Commit: 84fe8954669ef9a30448d140b5a578c14b863819
Parents: 762a293
Author: Robert Bradshaw <rober...@google.com>
Authored: Thu Jul 14 10:53:09 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Thu Jul 14 14:15:59 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/runner.py      | 28 ++++++++++---------
 sdks/python/apache_beam/runners/runner_test.py |  2 +-
 2 files changed, 14 insertions(+), 16 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fe8954/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 55b63f3..98f9758 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -41,26 +41,24 @@ def create_runner(runner_name):
     RuntimeError: if an invalid runner name is used.
   """
   # pylint: disable=wrong-import-order, wrong-import-position
-  if runner_name == 'DirectPipelineRunner':
-    import apache_beam.runners.direct_runner
-    return apache_beam.runners.direct_runner.DirectPipelineRunner()
-  if runner_name == 'DiskCachedPipelineRunner':
-    import apache_beam.runners.direct_runner
-    return apache_beam.runners.direct_runner.DiskCachedPipelineRunner(
-    )
-  if runner_name == 'EagerPipelineRunner':
-    import apache_beam.runners.direct_runner
-    return apache_beam.runners.direct_runner.EagerPipelineRunner()
-  elif runner_name in ('DataflowPipelineRunner',
-                       'BlockingDataflowPipelineRunner'):
+  if runner_name in ('DirectPipelineRunner', 'DiskCachedPipelineRunner',
+                     'EagerPipelineRunner'):
+    runner_name = 'apache_beam.runners.direct_runner.' + runner_name
+
+  if runner_name in ('DataflowPipelineRunner',
+                     'BlockingDataflowPipelineRunner'):
     import apache_beam.runners.dataflow_runner
     return apache_beam.runners.dataflow_runner.DataflowPipelineRunner(
         blocking=runner_name == 'BlockingDataflowPipelineRunner')
+  elif '.' in runner_name:
+    module, runner = runner_name.rsplit('.', 1)
+    return getattr(__import__(module, {}, {}, [runner], -1), runner)()
   else:
-    raise RuntimeError(
+    raise ValueError(
         'Unexpected pipeline runner: %s. Valid values are '
-        'DirectPipelineRunner, DataflowPipelineRunner, EagerPipelineRunner, or '
-        'BlockingDataflowPipelineRunner.' % runner_name)
+        'DirectPipelineRunner, DataflowPipelineRunner, EagerPipelineRunner, '
+        'BlockingDataflowPipelineRunner or the fully qualified name of '
+        'a PipelineRunner subclass.' % runner_name)
 
 
 class PipelineRunner(object):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fe8954/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 20a7259..d2e70d7 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -44,7 +44,7 @@ class RunnerTest(unittest.TestCase):
     self.assertTrue(
         isinstance(create_runner('BlockingDataflowPipelineRunner'),
                    DataflowPipelineRunner))
-    self.assertRaises(RuntimeError, create_runner, 'xyz')
+    self.assertRaises(ValueError, create_runner, 'xyz')
 
   def test_remote_runner_translation(self):
     remote_runner = DataflowPipelineRunner()