This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c679de6 [BEAM-9637] Add all runners to Python --runner help text.
new 0350f2e Merge pull request #13584 from ibzib/BEAM-9637
c679de6 is described below
commit c679de69d26de7472ab50fd4704cde24281205d7
Author: Kyle Weaver <[email protected]>
AuthorDate: Fri Dec 18 12:41:23 2020 -0800
[BEAM-9637] Add all runners to Python --runner help text.
---
.../python/apache_beam/options/pipeline_options.py | 19 ++++++++++++++++-
sdks/python/apache_beam/runners/runner.py | 24 +++++++---------------
2 files changed, 25 insertions(+), 18 deletions(-)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index c169586..0effcf6 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -414,13 +414,30 @@ class StandardOptions(PipelineOptions):
DEFAULT_RUNNER = 'DirectRunner'
+ ALL_KNOWN_RUNNERS = (
+ 'apache_beam.runners.dataflow.dataflow_runner.DataflowRunner',
+ 'apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner',
+ 'apache_beam.runners.direct.direct_runner.DirectRunner',
+ 'apache_beam.runners.direct.direct_runner.SwitchingDirectRunner',
+ 'apache_beam.runners.interactive.interactive_runner.InteractiveRunner',
+ 'apache_beam.runners.portability.flink_runner.FlinkRunner',
+ 'apache_beam.runners.portability.portable_runner.PortableRunner',
+ 'apache_beam.runners.portability.spark_runner.SparkRunner',
+ 'apache_beam.runners.test.TestDirectRunner',
+ 'apache_beam.runners.test.TestDataflowRunner',
+ )
+
+ KNOWN_RUNNER_NAMES = [path.split('.')[-1] for path in ALL_KNOWN_RUNNERS]
+
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--runner',
help=(
'Pipeline runner used to execute the workflow. Valid values are '
- 'DirectRunner, DataflowRunner.'))
+ 'one of %s, or the fully qualified name of a PipelineRunner '
+ 'subclass. If unspecified, defaults to %s.' %
+ (', '.join(cls.KNOWN_RUNNER_NAMES), cls.DEFAULT_RUNNER)))
# Whether to enable streaming mode.
parser.add_argument(
'--streaming',
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index ba80b27..02ed845 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -31,6 +31,8 @@ from builtins import object
from typing import TYPE_CHECKING
from typing import Optional
+from apache_beam.options.pipeline_options import StandardOptions
+
if TYPE_CHECKING:
from apache_beam import pvalue
from apache_beam import PTransform
@@ -41,22 +43,10 @@ if TYPE_CHECKING:
__all__ = ['PipelineRunner', 'PipelineState', 'PipelineResult']
-_ALL_KNOWN_RUNNERS = (
- 'apache_beam.runners.dataflow.dataflow_runner.DataflowRunner',
- 'apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner',
- 'apache_beam.runners.direct.direct_runner.DirectRunner',
- 'apache_beam.runners.direct.direct_runner.SwitchingDirectRunner',
- 'apache_beam.runners.interactive.interactive_runner.InteractiveRunner',
- 'apache_beam.runners.portability.flink_runner.FlinkRunner',
- 'apache_beam.runners.portability.portable_runner.PortableRunner',
- 'apache_beam.runners.portability.spark_runner.SparkRunner',
- 'apache_beam.runners.test.TestDirectRunner',
- 'apache_beam.runners.test.TestDataflowRunner',
-)
-
-_KNOWN_RUNNER_NAMES = [path.split('.')[-1] for path in _ALL_KNOWN_RUNNERS]
-
-_RUNNER_MAP = {path.split('.')[-1].lower(): path for path in _ALL_KNOWN_RUNNERS}
+_RUNNER_MAP = {
+ path.split('.')[-1].lower(): path
+ for path in StandardOptions.ALL_KNOWN_RUNNERS
+}
# Allow this alias, but don't make public.
_RUNNER_MAP['pythonrpcdirectrunner'] = (
@@ -110,7 +100,7 @@ def create_runner(runner_name):
raise ValueError(
'Unexpected pipeline runner: %s. Valid values are %s '
'or the fully qualified name of a PipelineRunner subclass.' %
- (runner_name, ', '.join(_KNOWN_RUNNER_NAMES)))
+ (runner_name, ', '.join(StandardOptions.KNOWN_RUNNER_NAMES)))
class PipelineRunner(object):