Repository: beam Updated Branches: refs/heads/master 56e4251de -> dba514079
Set the type of batch jobs to FNAPI_BATCH when beam_fn_api experiment is specified. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1fa3bfe9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1fa3bfe9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1fa3bfe9 Branch: refs/heads/master Commit: 1fa3bfe92bc59a85bfcf12c47c68206757ce238a Parents: 56e4251 Author: Valentyn Tymofieiev <[email protected]> Authored: Fri Jul 7 15:14:56 2017 -0700 Committer: Valentyn Tymofieiev <[email protected]> Committed: Fri Jul 7 15:14:56 2017 -0700 ---------------------------------------------------------------------- .../runners/dataflow/dataflow_runner.py | 16 ++--------- .../runners/dataflow/internal/apiclient.py | 29 ++++++++++++++++++-- .../runners/dataflow/internal/apiclient_test.py | 5 +--- 3 files changed, 29 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1fa3bfe9/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 57bcc5e..059e139 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -46,8 +46,8 @@ from apache_beam.runners.runner import PipelineRunner from apache_beam.runners.runner import PipelineState from apache_beam.transforms.display import DisplayData from apache_beam.typehints import typehints -from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.pipeline_options import TestOptions from apache_beam.utils.plugin import BeamPlugin 
@@ -65,12 +65,6 @@ class DataflowRunner(PipelineRunner): if blocking is set to False. """ - # Environment version information. It is passed to the service during a - # a job submission and is used by the service to establish what features - # are expected by the workers. - BATCH_ENVIRONMENT_MAJOR_VERSION = '6' - STREAMING_ENVIRONMENT_MAJOR_VERSION = '1' - # A list of PTransformOverride objects to be applied before running a pipeline # using DataflowRunner. # Currently this only works for overrides where the input and output types do @@ -268,15 +262,9 @@ class DataflowRunner(PipelineRunner): if test_options.dry_run: return None - standard_options = pipeline._options.view_as(StandardOptions) - if standard_options.streaming: - job_version = DataflowRunner.STREAMING_ENVIRONMENT_MAJOR_VERSION - else: - job_version = DataflowRunner.BATCH_ENVIRONMENT_MAJOR_VERSION - # Get a Dataflow API client and set its options self.dataflow_client = apiclient.DataflowApplicationClient( - pipeline._options, job_version) + pipeline._options) # Create the job result = DataflowPipelineResult( http://git-wip-us.apache.org/repos/asf/beam/blob/1fa3bfe9/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index edac9d7..33dfe19 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -49,6 +49,13 @@ from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.pipeline_options import WorkerOptions +# Environment version information. It is passed to the service during +# a job submission and is used by the service to establish what features +# are expected by the workers. 
+_LEGACY_ENVIRONMENT_MAJOR_VERSION = '6' +_FNAPI_ENVIRONMENT_MAJOR_VERSION = '1' + + class Step(object): """Wrapper for a dataflow Step protobuf.""" @@ -146,7 +153,10 @@ class Environment(object): if self.standard_options.streaming: job_type = 'FNAPI_STREAMING' else: - job_type = 'PYTHON_BATCH' + if _use_fnapi(options): + job_type = 'FNAPI_BATCH' + else: + job_type = 'PYTHON_BATCH' self.proto.version.additionalProperties.extend([ dataflow.Environment.VersionValue.AdditionalProperty( key='job_type', @@ -360,11 +370,16 @@ class Job(object): class DataflowApplicationClient(object): """A Dataflow API client used by application code to create and query jobs.""" - def __init__(self, options, environment_version): + def __init__(self, options): """Initializes a Dataflow API client object.""" self.standard_options = options.view_as(StandardOptions) self.google_cloud_options = options.view_as(GoogleCloudOptions) - self.environment_version = environment_version + + if _use_fnapi(options): + self.environment_version = _FNAPI_ENVIRONMENT_MAJOR_VERSION + else: + self.environment_version = _LEGACY_ENVIRONMENT_MAJOR_VERSION + if self.google_cloud_options.no_auth: credentials = None else: @@ -706,6 +721,14 @@ def translate_mean(accumulator, metric_update): metric_update.kind = None +def _use_fnapi(pipeline_options): + standard_options = pipeline_options.view_as(StandardOptions) + debug_options = pipeline_options.view_as(DebugOptions) + + return standard_options.streaming or ( + debug_options.experiments and 'beam_fn_api' in debug_options.experiments) + + # To enable a counter on the service, add it to this dictionary. 
metric_translations = { cy_combiners.CountCombineFn: ('sum', translate_scalar), http://git-wip-us.apache.org/repos/asf/beam/blob/1fa3bfe9/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 55211f7..407ffcf 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -22,7 +22,6 @@ from mock import Mock from apache_beam.metrics.cells import DistributionData from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner from apache_beam.runners.dataflow.internal.clients import dataflow # Protect against environments where apitools library is not available. @@ -40,9 +39,7 @@ class UtilTest(unittest.TestCase): @unittest.skip("Enable once BEAM-1080 is fixed.") def test_create_application_client(self): pipeline_options = PipelineOptions() - apiclient.DataflowApplicationClient( - pipeline_options, - DataflowRunner.BATCH_ENVIRONMENT_MAJOR_VERSION) + apiclient.DataflowApplicationClient(pipeline_options) def test_set_network(self): pipeline_options = PipelineOptions(
