soft-enable the use of streaming flag
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51139509 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51139509 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51139509 Branch: refs/heads/DSL_SQL Commit: 51139509b65b5fa04a39c31f584a02f1a29170dc Parents: e3139a3 Author: Ahmet Altay <[email protected]> Authored: Tue Jun 6 13:34:09 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Tue Jun 6 13:56:57 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/options/pipeline_options.py | 5 +++-- .../apache_beam/options/pipeline_options_validator_test.py | 8 -------- sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 2 +- .../apache_beam/runners/dataflow/internal/apiclient.py | 2 +- 4 files changed, 5 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/51139509/sdks/python/apache_beam/options/pipeline_options.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 777926a..daef3a7 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -18,6 +18,7 @@ """Pipeline options obtained from command line parsing.""" import argparse +import warnings from apache_beam.transforms.display import HasDisplayData from apache_beam.options.value_provider import StaticValueProvider @@ -278,12 +279,12 @@ class StandardOptions(PipelineOptions): action='store_true', help='Whether to enable streaming mode.') - # TODO(BEAM-1265): Remove this error, once at least one runner supports + # TODO(BEAM-1265): Remove this warning, once at least one runner supports # streaming pipelines. def validate(self, validator): errors = [] if self.view_as(StandardOptions).streaming: - errors.append('Streaming pipelines are not supported.') + warnings.warn('Streaming pipelines are not supported.') return errors http://git-wip-us.apache.org/repos/asf/beam/blob/51139509/sdks/python/apache_beam/options/pipeline_options_validator_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/options/pipeline_options_validator_test.py b/sdks/python/apache_beam/options/pipeline_options_validator_test.py index 28fcbe3..97834cc 100644 --- a/sdks/python/apache_beam/options/pipeline_options_validator_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_validator_test.py @@ -300,14 +300,6 @@ class SetupTest(unittest.TestCase): errors = validator.validate() self.assertFalse(errors) - def test_streaming(self): - pipeline_options = PipelineOptions(['--streaming']) - runner = MockRunners.TestDataflowRunner() - validator = PipelineOptionsValidator(pipeline_options, runner) - errors = validator.validate() - - self.assertIn('Streaming pipelines are not supported.', errors) - def test_test_matcher(self): def get_validator(matcher): options = ['--project=example:example', http://git-wip-us.apache.org/repos/asf/beam/blob/51139509/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 3e0e268..62cea33 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -64,7 +64,7 @@ class DataflowRunner(PipelineRunner): # a job submission and is used by the service to establish what features # are expected by the workers. BATCH_ENVIRONMENT_MAJOR_VERSION = '6' - STREAMING_ENVIRONMENT_MAJOR_VERSION = '0' + STREAMING_ENVIRONMENT_MAJOR_VERSION = '1' def __init__(self, cache=None): # Cache of CloudWorkflowStep protos generated while the runner http://git-wip-us.apache.org/repos/asf/beam/blob/51139509/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index bfdd5e4..df1a3f2 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -145,7 +145,7 @@ class Environment(object): # Version information. self.proto.version = dataflow.Environment.VersionValue() if self.standard_options.streaming: - job_type = 'PYTHON_STREAMING' + job_type = 'FNAPI_STREAMING' else: job_type = 'PYTHON_BATCH' self.proto.version.additionalProperties.extend([
