http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/internal/clients/dataflow/message_matchers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/clients/dataflow/message_matchers.py b/sdks/python/apache_beam/internal/clients/dataflow/message_matchers.py
deleted file mode 100644
index 4dda47a..0000000
--- a/sdks/python/apache_beam/internal/clients/dataflow/message_matchers.py
+++ /dev/null
@@ -1,124 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from hamcrest.core.base_matcher import BaseMatcher
-
-
-IGNORED = object()
-
-
-class MetricStructuredNameMatcher(BaseMatcher):
-  """Matches a MetricStructuredName."""
-  def __init__(self,
-               name=IGNORED,
-               origin=IGNORED,
-               context=IGNORED):
-    """Creates a MetricsStructuredNameMatcher.
-
-    Any property not passed in to the constructor will be ignored when matching.
-
-    Args:
-      name: A string with the metric name.
-      origin: A string with the metric namespace.
-      context: A key:value dictionary that will be matched to the
-        structured name.
-    """
-    if context != IGNORED and not isinstance(context, dict):
-      raise ValueError('context must be a Python dictionary.')
-
-    self.name = name
-    self.origin = origin
-    self.context = context
-
-  def _matches(self, item):
-    if self.name != IGNORED and item.name != self.name:
-      return False
-    if self.origin != IGNORED and item.origin != self.origin:
-      return False
-    if self.context != IGNORED:
-      for key, name in self.context.iteritems():
-        if key not in item.context:
-          return False
-        if name != IGNORED and item.context[key] != name:
-          return False
-    return True
-
-  def describe_to(self, description):
-    descriptors = []
-    if self.name != IGNORED:
-      descriptors.append('name is {}'.format(self.name))
-    if self.origin != IGNORED:
-      descriptors.append('origin is {}'.format(self.origin))
-    if self.context != IGNORED:
-      descriptors.append('context is ({})'.format(str(self.context)))
-
-    item_description = ' and '.join(descriptors)
-    description.append(item_description)
-
-
-class MetricUpdateMatcher(BaseMatcher):
-  """Matches a metrics update protocol buffer."""
-  def __init__(self,
-               cumulative=IGNORED,
-               name=IGNORED,
-               scalar=IGNORED,
-               kind=IGNORED):
-    """Creates a MetricUpdateMatcher.
-
-    Any property not passed in to the constructor will be ignored when matching.
-
-    Args:
-      cumulative: A boolean.
-      name: A MetricStructuredNameMatcher object that matches the name.
-      scalar: An integer with the metric update.
-      kind: A string defining the kind of counter.
-    """
-    if name != IGNORED and not isinstance(name, MetricStructuredNameMatcher):
-      raise ValueError('name must be a MetricStructuredNameMatcher.')
-
-    self.cumulative = cumulative
-    self.name = name
-    self.scalar = scalar
-    self.kind = kind
-
-  def _matches(self, item):
-    if self.cumulative != IGNORED and item.cumulative != self.cumulative:
-      return False
-    if self.name != IGNORED and not self.name._matches(item.name):
-      return False
-    if self.kind != IGNORED and item.kind != self.kind:
-      return False
-    if self.scalar != IGNORED:
-      value_property = [p
-                        for p in item.scalar.object_value.properties
-                        if p.key == 'value']
-      int_value = value_property[0].value.integer_value
-      if self.scalar != int_value:
-        return False
-    return True
-
-  def describe_to(self, description):
-    descriptors = []
-    if self.cumulative != IGNORED:
-      descriptors.append('cumulative is {}'.format(self.cumulative))
-    if self.name != IGNORED:
-      descriptors.append('name is {}'.format(self.name))
-    if self.scalar != IGNORED:
-      descriptors.append('scalar is ({})'.format(str(self.scalar)))
-
-    item_description = ' and '.join(descriptors)
-    description.append(item_description)

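The module removed above builds PyHamcrest matchers around an IGNORED sentinel so that only the properties a caller explicitly passes participate in matching. Below is a minimal, runnable sketch of that sentinel-default pattern, independent of Beam; Record and SimpleNameMatcher are hypothetical names used only for illustration.

import collections

import hamcrest as hc
from hamcrest.core.base_matcher import BaseMatcher

IGNORED = object()  # unique sentinel: distinguishes "not passed" from None


Record = collections.namedtuple('Record', ['name', 'origin'])


class SimpleNameMatcher(BaseMatcher):
  def __init__(self, name=IGNORED, origin=IGNORED):
    self.name = name
    self.origin = origin

  def _matches(self, item):
    # Compare only the properties the caller actually supplied.
    if self.name is not IGNORED and item.name != self.name:
      return False
    if self.origin is not IGNORED and item.origin != self.origin:
      return False
    return True

  def describe_to(self, description):
    description.append_text('record with name {}'.format(self.name))


# origin is left unconstrained, so this matches on name alone.
hc.assert_that(Record('metric1', 'origin2'),
               hc.is_(SimpleNameMatcher(name='metric1')))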
http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/internal/clients/dataflow/message_matchers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/clients/dataflow/message_matchers_test.py b/sdks/python/apache_beam/internal/clients/dataflow/message_matchers_test.py
deleted file mode 100644
index ec63ce7..0000000
--- a/sdks/python/apache_beam/internal/clients/dataflow/message_matchers_test.py
+++ /dev/null
@@ -1,69 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import unittest
-
-import hamcrest as hc
-
-import apache_beam.internal.clients.dataflow as dataflow
-from apache_beam.internal.clients.dataflow import message_matchers
-from apache_beam.internal.json_value import to_json_value
-
-
-class TestMatchers(unittest.TestCase):
-
-  def test_structured_name_matcher_basic(self):
-    metric_name = dataflow.MetricStructuredName()
-    metric_name.name = 'metric1'
-    metric_name.origin = 'origin2'
-
-    matcher = message_matchers.MetricStructuredNameMatcher(
-        name='metric1',
-        origin='origin2')
-    hc.assert_that(metric_name, hc.is_(matcher))
-    with self.assertRaises(AssertionError):
-      matcher = message_matchers.MetricStructuredNameMatcher(
-          name='metric1',
-          origin='origin1')
-      hc.assert_that(metric_name, hc.is_(matcher))
-
-  def test_metric_update_basic(self):
-    metric_update = dataflow.MetricUpdate()
-    metric_update.name = dataflow.MetricStructuredName()
-    metric_update.name.name = 'metric1'
-    metric_update.name.origin = 'origin1'
-
-    metric_update.cumulative = False
-    metric_update.kind = 'sum'
-    metric_update.scalar = to_json_value(1, with_type=True)
-
-    name_matcher = message_matchers.MetricStructuredNameMatcher(
-        name='metric1',
-        origin='origin1')
-    matcher = message_matchers.MetricUpdateMatcher(
-        name=name_matcher,
-        kind='sum',
-        scalar=1)
-
-    hc.assert_that(metric_update, hc.is_(matcher))
-
-    with self.assertRaises(AssertionError):
-      matcher.kind = 'suma'
-      hc.assert_that(metric_update, hc.is_(matcher))
-
-
-if __name__ == '__main__':
-  unittest.main()

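The assertRaises(AssertionError) blocks above rely on the PyHamcrest protocol: hc.assert_that() calls the matcher's _matches(), and on a mismatch raises AssertionError with a message built from describe_to(). A minimal sketch of that protocol, using a hypothetical IsPositive matcher:

import hamcrest as hc
from hamcrest.core.base_matcher import BaseMatcher


class IsPositive(BaseMatcher):
  def _matches(self, item):
    return item > 0

  def describe_to(self, description):
    description.append_text('a positive number')


hc.assert_that(5, hc.is_(IsPositive()))   # passes silently
try:
  hc.assert_that(-1, hc.is_(IsPositive()))
except AssertionError as e:
  print(e)  # the "Expected: a positive number" text comes from describe_to()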
http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py
index 1507d88..760935d 100644
--- a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py
@@ -33,17 +33,17 @@ from apache_beam.internal import json_value
 from apache_beam.internal import pickler
 from apache_beam.pvalue import PCollectionView
 from apache_beam.runners.google_cloud_dataflow.dataflow_metrics import DataflowMetrics
+from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow as dataflow_api
+from apache_beam.runners.runner import PValueCache
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
-from apache_beam.runners.runner import PValueCache
 from apache_beam.transforms.display import DisplayData
 from apache_beam.typehints import typehints
 from apache_beam.utils import names
 from apache_beam.utils.names import PropertyNames
 from apache_beam.utils.names import TransformNames
 from apache_beam.utils.pipeline_options import StandardOptions
-from apache_beam.internal.clients import dataflow as dataflow_api
 
 
 class DataflowRunner(PipelineRunner):
@@ -154,7 +154,7 @@ class DataflowRunner(PipelineRunner):
     """Remotely executes entire pipeline or parts reachable from node."""
     # Import here to avoid adding the dependency for local running scenarios.
     # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam.internal import apiclient
+    from apache_beam.runners.google_cloud_dataflow.internal import apiclient
     self.job = apiclient.Job(pipeline.options)
 
     # The superclass's run will trigger a traversal of all reachable nodes.
@@ -235,7 +235,7 @@ class DataflowRunner(PipelineRunner):
     """Creates a Step object and adds it to the cache."""
     # Import here to avoid adding the dependency for local running scenarios.
     # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam.internal import apiclient
+    from apache_beam.runners.google_cloud_dataflow.internal import apiclient
     step = apiclient.Step(step_kind, self._get_unique_step_name())
     self.job.proto.steps.append(step.proto)
     step.add_property(PropertyNames.USER_NAME, step_label)

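Both hunks above change only where apiclient comes from; the import deliberately stays inside the method bodies so that purely local runs never load the Cloud Dataflow client stack. A small self-contained sketch of the deferred-import pattern, with json standing in for the heavy optional dependency:

def run_remotely(payload):
  # Import here so local-only code paths never pay for (or even need to have
  # installed) this dependency.
  import json  # stand-in for a heavy, optional client library
  return json.dumps(payload)


print(run_remotely({'job': 'demo'}))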
http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py
index e9b5aff..59bd2fc 100644
--- a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py
@@ -21,9 +21,9 @@ import unittest
 
 import mock
 
-from apache_beam.internal.clients import dataflow as dataflow_api
 from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowPipelineResult
 from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRuntimeException
+from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow as dataflow_api
 
 
 class DataflowRunnerTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/__init__.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py
new file mode 100644
index 0000000..db761ea
--- /dev/null
+++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py
@@ -0,0 +1,726 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Dataflow client utility functions."""
+
+import codecs
+import getpass
+import json
+import logging
+import os
+import re
+import time
+from StringIO import StringIO
+from datetime import datetime
+
+from apitools.base.py import encoding
+from apitools.base.py import exceptions
+
+from apache_beam import utils
+from apache_beam.internal.auth import get_service_credentials
+from apache_beam.internal.clients import storage
+from apache_beam.internal.json_value import to_json_value
+from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow
+from apache_beam.transforms import cy_combiners
+from apache_beam.transforms.display import DisplayData
+from apache_beam.utils import dependency
+from apache_beam.utils import retry
+from apache_beam.utils.dependency import get_required_container_version
+from apache_beam.utils.dependency import get_sdk_name_and_version
+from apache_beam.utils.names import PropertyNames
+from apache_beam.utils.pipeline_options import DebugOptions
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import StandardOptions
+from apache_beam.utils.pipeline_options import WorkerOptions
+
+
+class Step(object):
+  """Wrapper for a dataflow Step protobuf."""
+
+  def __init__(self, step_kind, step_name, additional_properties=None):
+    self.step_kind = step_kind
+    self.step_name = step_name
+    self.proto = dataflow.Step(kind=step_kind, name=step_name)
+    self.proto.properties = {}
+    self._additional_properties = []
+
+    if additional_properties is not None:
+      for (n, v, t) in additional_properties:
+        self.add_property(n, v, t)
+
+  def add_property(self, name, value, with_type=False):
+    self._additional_properties.append((name, value, with_type))
+    self.proto.properties.additionalProperties.append(
+        dataflow.Step.PropertiesValue.AdditionalProperty(
+            key=name, value=to_json_value(value, with_type=with_type)))
+
+  def _get_outputs(self):
+    """Returns a list of all output labels for a step."""
+    outputs = []
+    for p in self.proto.properties.additionalProperties:
+      if p.key == PropertyNames.OUTPUT_INFO:
+        for entry in p.value.array_value.entries:
+          for entry_prop in entry.object_value.properties:
+            if entry_prop.key == PropertyNames.OUTPUT_NAME:
+              outputs.append(entry_prop.value.string_value)
+    return outputs
+
+  def __reduce__(self):
+    """Reduce hook for pickling the Step class more easily."""
+    return (Step, (self.step_kind, self.step_name, self._additional_properties))
+
+  def get_output(self, tag=None):
+    """Returns name if it is one of the outputs or first output if name is 
None.
+
+    Args:
+      tag: tag of the output as a string or None if we want to get the
+        name of the first output.
+
+    Returns:
+      The name of the output associated with the tag or the first output
+      if tag was None.
+
+    Raises:
+      ValueError: if the tag does not exist within outputs.
+    """
+    outputs = self._get_outputs()
+    if tag is None:
+      return outputs[0]
+    else:
+      name = '%s_%s' % (PropertyNames.OUT, tag)
+      if name not in outputs:
+        raise ValueError(
+            'Cannot find named output: %s in %s.' % (name, outputs))
+      return name
+
+
+class Environment(object):
+  """Wrapper for a dataflow Environment protobuf."""
+
+  def __init__(self, packages, options, environment_version):
+    self.standard_options = options.view_as(StandardOptions)
+    self.google_cloud_options = options.view_as(GoogleCloudOptions)
+    self.worker_options = options.view_as(WorkerOptions)
+    self.debug_options = options.view_as(DebugOptions)
+    self.proto = dataflow.Environment()
+    self.proto.clusterManagerApiService = GoogleCloudOptions.COMPUTE_API_SERVICE
+    self.proto.dataset = '{}/cloud_dataflow'.format(
+        GoogleCloudOptions.BIGQUERY_API_SERVICE)
+    self.proto.tempStoragePrefix = (
+        self.google_cloud_options.temp_location.replace(
+            'gs:/',
+            GoogleCloudOptions.STORAGE_API_SERVICE))
+    # User agent information.
+    self.proto.userAgent = dataflow.Environment.UserAgentValue()
+    self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint
+
+    if self.google_cloud_options.service_account_email:
+      self.proto.serviceAccountEmail = (
+          self.google_cloud_options.service_account_email)
+
+    sdk_name, version_string = get_sdk_name_and_version()
+
+    self.proto.userAgent.additionalProperties.extend([
+        dataflow.Environment.UserAgentValue.AdditionalProperty(
+            key='name',
+            value=to_json_value(sdk_name)),
+        dataflow.Environment.UserAgentValue.AdditionalProperty(
+            key='version', value=to_json_value(version_string))])
+    # Version information.
+    self.proto.version = dataflow.Environment.VersionValue()
+    if self.standard_options.streaming:
+      job_type = 'PYTHON_STREAMING'
+    else:
+      job_type = 'PYTHON_BATCH'
+    self.proto.version.additionalProperties.extend([
+        dataflow.Environment.VersionValue.AdditionalProperty(
+            key='job_type',
+            value=to_json_value(job_type)),
+        dataflow.Environment.VersionValue.AdditionalProperty(
+            key='major', value=to_json_value(environment_version))])
+    # Experiments
+    if self.debug_options.experiments:
+      for experiment in self.debug_options.experiments:
+        self.proto.experiments.append(experiment)
+    # Worker pool(s) information.
+    package_descriptors = []
+    for package in packages:
+      package_descriptors.append(
+          dataflow.Package(
+              location='%s/%s' % (
+                  self.google_cloud_options.staging_location.replace(
+                      'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE),
+                  package),
+              name=package))
+
+    pool = dataflow.WorkerPool(
+        kind='local' if self.local else 'harness',
+        packages=package_descriptors,
+        taskrunnerSettings=dataflow.TaskRunnerSettings(
+            parallelWorkerSettings=dataflow.WorkerSettings(
+                baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT,
+                servicePath=self.google_cloud_options.dataflow_endpoint)))
+    pool.autoscalingSettings = dataflow.AutoscalingSettings()
+    # Set worker pool options received through command line.
+    if self.worker_options.num_workers:
+      pool.numWorkers = self.worker_options.num_workers
+    if self.worker_options.max_num_workers:
+      pool.autoscalingSettings.maxNumWorkers = (
+          self.worker_options.max_num_workers)
+    if self.worker_options.autoscaling_algorithm:
+      values_enum = dataflow.AutoscalingSettings.AlgorithmValueValuesEnum
+      pool.autoscalingSettings.algorithm = {
+          'NONE': values_enum.AUTOSCALING_ALGORITHM_NONE,
+          'THROUGHPUT_BASED': values_enum.AUTOSCALING_ALGORITHM_BASIC,
+      }.get(self.worker_options.autoscaling_algorithm)
+    if self.worker_options.machine_type:
+      pool.machineType = self.worker_options.machine_type
+    if self.worker_options.disk_size_gb:
+      pool.diskSizeGb = self.worker_options.disk_size_gb
+    if self.worker_options.disk_type:
+      pool.diskType = self.worker_options.disk_type
+    if self.worker_options.zone:
+      pool.zone = self.worker_options.zone
+    if self.worker_options.network:
+      pool.network = self.worker_options.network
+    if self.worker_options.worker_harness_container_image:
+      pool.workerHarnessContainerImage = (
+          self.worker_options.worker_harness_container_image)
+    else:
+      # Default to using the worker harness container image for the current SDK
+      # version.
+      pool.workerHarnessContainerImage = (
+          'dataflow.gcr.io/v1beta3/python:%s' %
+          get_required_container_version())
+    if self.worker_options.use_public_ips is not None:
+      if self.worker_options.use_public_ips:
+        pool.ipConfiguration = (
+            dataflow.WorkerPool
+            .IpConfigurationValueValuesEnum.WORKER_IP_PUBLIC)
+      else:
+        pool.ipConfiguration = (
+            dataflow.WorkerPool
+            .IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE)
+
+    if self.standard_options.streaming:
+      # Use separate data disk for streaming.
+      disk = dataflow.Disk()
+      if self.local:
+        disk.diskType = 'local'
+      # TODO(ccy): allow customization of disk.
+      pool.dataDisks.append(disk)
+    self.proto.workerPools.append(pool)
+
+    sdk_pipeline_options = options.get_all_options()
+    if sdk_pipeline_options:
+      self.proto.sdkPipelineOptions = (
+          dataflow.Environment.SdkPipelineOptionsValue())
+
+      options_dict = {k: v
+                      for k, v in sdk_pipeline_options.iteritems()
+                      if v is not None}
+      self.proto.sdkPipelineOptions.additionalProperties.append(
+          dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
+              key='options', value=to_json_value(options_dict)))
+
+      dd = DisplayData.create_from_options(options)
+      items = [item.get_dict() for item in dd.items]
+      self.proto.sdkPipelineOptions.additionalProperties.append(
+          dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
+              key='display_data', value=to_json_value(items)))
+
+
+class Job(object):
+  """Wrapper for a dataflow Job protobuf."""
+
+  def __str__(self):
+    def encode_shortstrings(input_buffer, errors='strict'):
+      """Encoder (from Unicode) that suppresses long base64 strings."""
+      original_len = len(input_buffer)
+      if original_len > 150:
+        if self.base64_str_re.match(input_buffer):
+          input_buffer = '<string of %d bytes>' % original_len
+          input_buffer = input_buffer.encode('ascii', errors=errors)
+        else:
+          matched = self.coder_str_re.match(input_buffer)
+          if matched:
+            input_buffer = '%s<string of %d bytes>' % (
+                matched.group(1), matched.end(2) - matched.start(2))
+            input_buffer = input_buffer.encode('ascii', errors=errors)
+      return input_buffer, original_len
+
+    def decode_shortstrings(input_buffer, errors='strict'):
+      """Decoder (to Unicode) that suppresses long base64 strings."""
+      shortened, length = encode_shortstrings(input_buffer, errors)
+      return unicode(shortened), length
+
+    def shortstrings_registerer(encoding_name):
+      if encoding_name == 'shortstrings':
+        return codecs.CodecInfo(name='shortstrings',
+                                encode=encode_shortstrings,
+                                decode=decode_shortstrings)
+      return None
+
+    codecs.register(shortstrings_registerer)
+
+    # Use json "dump string" method to get readable formatting;
+    # further modify it to not output too-long strings, aimed at the
+    # 10,000+ character hex-encoded "serialized_fn" values.
+    return json.dumps(
+        json.loads(encoding.MessageToJson(self.proto), encoding='shortstrings'),
+        indent=2, sort_keys=True)
+
+  @staticmethod
+  def default_job_name(job_name):
+    if job_name is None:
+      user_name = getpass.getuser().lower()
+      date_component = datetime.utcnow().strftime('%m%d%H%M%S-%f')
+      app_name = 'beamapp'
+      job_name = '{}-{}-{}'.format(app_name, user_name, date_component)
+    return job_name
+
+  def __init__(self, options):
+    self.options = options
+    self.google_cloud_options = options.view_as(GoogleCloudOptions)
+    if not self.google_cloud_options.job_name:
+      self.google_cloud_options.job_name = self.default_job_name(
+          self.google_cloud_options.job_name)
+
+    required_google_cloud_options = ['project', 'job_name', 'temp_location']
+    missing = [
+        option for option in required_google_cloud_options
+        if not getattr(self.google_cloud_options, option)]
+    if missing:
+      raise ValueError(
+          'Missing required configuration parameters: %s' % missing)
+
+    if not self.google_cloud_options.staging_location:
+      logging.info('Defaulting to the temp_location as staging_location: %s',
+                   self.google_cloud_options.temp_location)
+      (self.google_cloud_options
+       .staging_location) = self.google_cloud_options.temp_location
+
+    # Make the staging and temp locations job name and time specific. This is
+    # needed to avoid clashes between job submissions using the same staging
+    # area or team members using same job names. This method is not entirely
+    # foolproof since two job submissions with same name can happen at exactly
+    # the same time. However the window is extremely small given that
+    # time.time() has at least microseconds granularity. We add the suffix only
+    # for GCS staging locations where the potential for such clashes is high.
+    if self.google_cloud_options.staging_location.startswith('gs://'):
+      path_suffix = '%s.%f' % (self.google_cloud_options.job_name, time.time())
+      self.google_cloud_options.staging_location = utils.path.join(
+          self.google_cloud_options.staging_location, path_suffix)
+      self.google_cloud_options.temp_location = utils.path.join(
+          self.google_cloud_options.temp_location, path_suffix)
+    self.proto = dataflow.Job(name=self.google_cloud_options.job_name)
+    if self.options.view_as(StandardOptions).streaming:
+      self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING
+    else:
+      self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_BATCH
+    self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$')
+    self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$')
+
+  def json(self):
+    return encoding.MessageToJson(self.proto)
+
+  def __reduce__(self):
+    """Reduce hook for pickling the Job class more easily."""
+    return (Job, (self.options,))
+
+
+class DataflowApplicationClient(object):
+  """A Dataflow API client used by application code to create and query 
jobs."""
+
+  def __init__(self, options, environment_version):
+    """Initializes a Dataflow API client object."""
+    self.standard_options = options.view_as(StandardOptions)
+    self.google_cloud_options = options.view_as(GoogleCloudOptions)
+    self.environment_version = environment_version
+    if self.google_cloud_options.no_auth:
+      credentials = None
+    else:
+      credentials = get_service_credentials()
+    self._client = dataflow.DataflowV1b3(
+        url=self.google_cloud_options.dataflow_endpoint,
+        credentials=credentials,
+        get_credentials=(not self.google_cloud_options.no_auth))
+    self._storage_client = storage.StorageV1(
+        url='https://www.googleapis.com/storage/v1',
+        credentials=credentials,
+        get_credentials=(not self.google_cloud_options.no_auth))
+
+  # TODO(silviuc): Refactor so that retry logic can be applied.
+  @retry.no_retries  # Using no_retries marks this as an integration point.
+  def _gcs_file_copy(self, from_path, to_path):
+    to_folder, to_name = os.path.split(to_path)
+    with open(from_path, 'rb') as f:
+      self.stage_file(to_folder, to_name, f)
+
+  def stage_file(self, gcs_or_local_path, file_name, stream,
+                 mime_type='application/octet-stream'):
+    """Stages a file at a GCS or local path with stream-supplied contents."""
+    if not gcs_or_local_path.startswith('gs://'):
+      local_path = os.path.join(gcs_or_local_path, file_name)
+      logging.info('Staging file locally to %s', local_path)
+      with open(local_path, 'wb') as f:
+        f.write(stream.read())
+      return
+    gcs_location = gcs_or_local_path + '/' + file_name
+    bucket, name = gcs_location[5:].split('/', 1)
+
+    request = storage.StorageObjectsInsertRequest(
+        bucket=bucket, name=name)
+    logging.info('Starting GCS upload to %s...', gcs_location)
+    upload = storage.Upload(stream, mime_type)
+    try:
+      response = self._storage_client.objects.Insert(request, upload=upload)
+    except exceptions.HttpError as e:
+      reportable_errors = {
+          403: 'access denied',
+          404: 'bucket not found',
+      }
+      if e.status_code in reportable_errors:
+        raise IOError(('Could not upload to GCS path %s: %s. Please verify '
+                       'that credentials are valid and that you have write '
+                       'access to the specified path. Stale credentials can be '
+                       'refreshed by executing "gcloud auth login".') %
+                      (gcs_or_local_path, reportable_errors[e.status_code]))
+      raise
+    logging.info('Completed GCS upload to %s', gcs_location)
+    return response
+
+  # TODO(silviuc): Refactor so that retry logic can be applied.
+  @retry.no_retries  # Using no_retries marks this as an integration point.
+  def create_job(self, job):
+    """Creates job description. May stage and/or submit for remote 
execution."""
+    self.create_job_description(job)
+
+    # Stage and submit the job when necessary
+    dataflow_job_file = job.options.view_as(DebugOptions).dataflow_job_file
+    template_location = (
+        job.options.view_as(GoogleCloudOptions).template_location)
+
+    job_location = template_location or dataflow_job_file
+    if job_location:
+      gcs_or_local_path = os.path.dirname(job_location)
+      file_name = os.path.basename(job_location)
+      self.stage_file(gcs_or_local_path, file_name, StringIO(job.json()))
+
+    if not template_location:
+      return self.submit_job_description(job)
+    else:
+      return None
+
+  def create_job_description(self, job):
+    """Creates a job described by the workflow proto."""
+    resources = dependency.stage_job_resources(
+        job.options, file_copy=self._gcs_file_copy)
+    job.proto.environment = Environment(
+        packages=resources, options=job.options,
+        environment_version=self.environment_version).proto
+    # TODO(silviuc): Remove the debug logging eventually.
+    logging.info('JOB: %s', job)
+
+  def submit_job_description(self, job):
+    """Creates and excutes a job request."""
+    request = dataflow.DataflowProjectsJobsCreateRequest()
+    request.projectId = self.google_cloud_options.project
+    request.job = job.proto
+
+    try:
+      response = self._client.projects_jobs.Create(request)
+    except exceptions.BadStatusCodeError as e:
+      logging.error('HTTP status %d trying to create job'
+                    ' at dataflow service endpoint %s',
+                    e.response.status,
+                    self.google_cloud_options.dataflow_endpoint)
+      logging.fatal('details of server error: %s', e)
+      raise
+    logging.info('Create job: %s', response)
+    # The response is a Job proto with the id for the new job.
+    logging.info('Created job with id: [%s]', response.id)
+    logging.info(
+        'To access the Dataflow monitoring console, please navigate to '
+        'https://console.developers.google.com/project/%s/dataflow/job/%s',
+        self.google_cloud_options.project, response.id)
+
+    return response
+
+  @retry.with_exponential_backoff()  # Using retry defaults from utils/retry.py
+  def modify_job_state(self, job_id, new_state):
+    """Modify the run state of the job.
+
+    Args:
+      job_id: The id of the job.
+      new_state: A string representing the new desired state. It could be set to
+        either 'JOB_STATE_DONE', 'JOB_STATE_CANCELLED' or 'JOB_STATE_DRAINING'.
+
+    Returns:
+      True if the job was modified successfully.
+    """
+    if new_state == 'JOB_STATE_DONE':
+      new_state = dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_DONE
+    elif new_state == 'JOB_STATE_CANCELLED':
+      new_state = dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_CANCELLED
+    elif new_state == 'JOB_STATE_DRAINING':
+      new_state = dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_DRAINING
+    else:
+      # Other states could only be set by the service.
+      return False
+
+    request = dataflow.DataflowProjectsJobsUpdateRequest()
+    request.jobId = job_id
+    request.projectId = self.google_cloud_options.project
+    request.job = dataflow.Job(requestedState=new_state)
+
+    self._client.projects_jobs.Update(request)
+    return True
+
+  @retry.with_exponential_backoff()  # Using retry defaults from utils/retry.py
+  def get_job(self, job_id):
+    """Gets the job status for a submitted job.
+
+    Args:
+      job_id: A string representing the job_id for the workflow as returned
+        by a create_job() request.
+
+    Returns:
+      A Job proto. See below for interesting fields.
+
+    The Job proto returned from a get_job() request contains some interesting
+    fields:
+      currentState: An object representing the current state of the job. The
+        string representation of the object (str() result) has the following
+        possible values: JOB_STATE_UNKNOWN, JOB_STATE_STOPPED,
+        JOB_STATE_RUNNING, JOB_STATE_DONE, JOB_STATE_FAILED,
+        JOB_STATE_CANCELLED.
+      createTime: UTC time when the job was created
+        (e.g. '2015-03-10T00:01:53.074Z')
+      currentStateTime: UTC time for the current state of the job.
+    """
+    request = dataflow.DataflowProjectsJobsGetRequest()
+    request.jobId = job_id
+    request.projectId = self.google_cloud_options.project
+    response = self._client.projects_jobs.Get(request)
+    return response
+
+  @retry.with_exponential_backoff()  # Using retry defaults from utils/retry.py
+  def list_messages(
+      self, job_id, start_time=None, end_time=None, page_token=None,
+      minimum_importance=None):
+    """List messages associated with the execution of a job.
+
+    Args:
+      job_id: A string representing the job_id for the workflow as returned
+        by a create_job() request.
+      start_time: If specified, only messages generated after the start time
+        will be returned, otherwise all messages since job started will be
+        returned. The value is a string representing UTC time
+        (e.g., '2015-08-18T21:03:50.644Z')
+      end_time: If specified, only messages generated before the end time
+        will be returned, otherwise all messages up to current time will be
+        returned. The value is a string representing UTC time
+        (e.g., '2015-08-18T21:03:50.644Z')
+      page_token: A string to be used as next page token if the list call
+        returned paginated results.
+      minimum_importance: Filter for messages based on importance. The possible
+        string values in increasing order of importance are: JOB_MESSAGE_DEBUG,
+        JOB_MESSAGE_DETAILED, JOB_MESSAGE_BASIC, JOB_MESSAGE_WARNING,
+        JOB_MESSAGE_ERROR. For example, a filter set on warning will allow only
+        warnings and errors and exclude all others.
+
+    Returns:
+      A tuple consisting of a list of JobMessage instances and a
+      next page token string.
+
+    Raises:
+      RuntimeError: if an unexpected value for the message_importance argument
+        is used.
+
+    The JobMessage objects returned by the call contain the following fields:
+      id: A unique string identifier for the message.
+      time: A string representing the UTC time of the message
+        (e.g., '2015-08-18T21:03:50.644Z')
+      messageImportance: An enumeration value for the message importance. The
+        value if converted to string will have the following possible values:
+        JOB_MESSAGE_DEBUG, JOB_MESSAGE_DETAILED, JOB_MESSAGE_BASIC,
+        JOB_MESSAGE_WARNING, JOB_MESSAGE_ERROR.
+      messageText: A message string.
+    """
+    request = dataflow.DataflowProjectsJobsMessagesListRequest(
+        jobId=job_id, projectId=self.google_cloud_options.project)
+    if page_token is not None:
+      request.pageToken = page_token
+    if start_time is not None:
+      request.startTime = start_time
+    if end_time is not None:
+      request.endTime = end_time
+    if minimum_importance is not None:
+      if minimum_importance == 'JOB_MESSAGE_DEBUG':
+        request.minimumImportance = (
+            dataflow.DataflowProjectsJobsMessagesListRequest
+            .MinimumImportanceValueValuesEnum
+            .JOB_MESSAGE_DEBUG)
+      elif minimum_importance == 'JOB_MESSAGE_DETAILED':
+        request.minimumImportance = (
+            dataflow.DataflowProjectsJobsMessagesListRequest
+            .MinimumImportanceValueValuesEnum
+            .JOB_MESSAGE_DETAILED)
+      elif minimum_importance == 'JOB_MESSAGE_BASIC':
+        request.minimumImportance = (
+            dataflow.DataflowProjectsJobsMessagesListRequest
+            .MinimumImportanceValueValuesEnum
+            .JOB_MESSAGE_BASIC)
+      elif minimum_importance == 'JOB_MESSAGE_WARNING':
+        request.minimumImportance = (
+            dataflow.DataflowProjectsJobsMessagesListRequest
+            .MinimumImportanceValueValuesEnum
+            .JOB_MESSAGE_WARNING)
+      elif minimum_importance == 'JOB_MESSAGE_ERROR':
+        request.minimumImportance = (
+            dataflow.DataflowProjectsJobsMessagesListRequest
+            .MinimumImportanceValueValuesEnum
+            .JOB_MESSAGE_ERROR)
+      else:
+        raise RuntimeError(
+            'Unexpected value for minimum_importance argument: %r' %
+            minimum_importance)
+    response = self._client.projects_jobs_messages.List(request)
+    return response.jobMessages, response.nextPageToken
+
+
+class MetricUpdateTranslators(object):
+  """Translators between accumulators and dataflow metric updates."""
+
+  @staticmethod
+  def translate_boolean(accumulator, metric_update_proto):
+    metric_update_proto.boolean = accumulator.value
+
+  @staticmethod
+  def translate_scalar_mean_int(accumulator, metric_update_proto):
+    if accumulator.count:
+      metric_update_proto.integerMean = dataflow.IntegerMean()
+      metric_update_proto.integerMean.sum = to_split_int(accumulator.sum)
+      metric_update_proto.integerMean.count = to_split_int(accumulator.count)
+    else:
+      metric_update_proto.nameAndKind.kind = None
+
+  @staticmethod
+  def translate_scalar_mean_float(accumulator, metric_update_proto):
+    if accumulator.count:
+      metric_update_proto.floatingPointMean = dataflow.FloatingPointMean()
+      metric_update_proto.floatingPointMean.sum = accumulator.sum
+      metric_update_proto.floatingPointMean.count = to_split_int(
+          accumulator.count)
+    else:
+      metric_update_proto.nameAndKind.kind = None
+
+  @staticmethod
+  def translate_scalar_counter_int(accumulator, metric_update_proto):
+    metric_update_proto.integer = to_split_int(accumulator.value)
+
+  @staticmethod
+  def translate_scalar_counter_float(accumulator, metric_update_proto):
+    metric_update_proto.floatingPoint = accumulator.value
+
+
+def to_split_int(n):
+  res = dataflow.SplitInt64()
+  res.lowBits = n & 0xffffffff
+  res.highBits = n >> 32
+  return res
+
+
+def translate_distribution(distribution_update, metric_update_proto):
+  """Translate metrics DistributionUpdate to dataflow distribution update."""
+  dist_update_proto = dataflow.DistributionUpdate()
+  dist_update_proto.min = to_split_int(distribution_update.min)
+  dist_update_proto.max = to_split_int(distribution_update.max)
+  dist_update_proto.count = to_split_int(distribution_update.count)
+  dist_update_proto.sum = to_split_int(distribution_update.sum)
+  metric_update_proto.distribution = dist_update_proto
+
+
+def translate_value(value, metric_update_proto):
+  metric_update_proto.integer = to_split_int(value)
+
+
+def translate_scalar(accumulator, metric_update):
+  metric_update.scalar = to_json_value(accumulator.value, with_type=True)
+
+
+def translate_mean(accumulator, metric_update):
+  if accumulator.count:
+    metric_update.meanSum = to_json_value(accumulator.sum, with_type=True)
+    metric_update.meanCount = to_json_value(accumulator.count, with_type=True)
+  else:
+    # A denominator of 0 will raise an error in the service.
+    # What it means is we have nothing to report yet, so don't.
+    metric_update.kind = None
+
+
+# To enable a counter on the service, add it to this dictionary.
+metric_translations = {
+    cy_combiners.CountCombineFn: ('sum', translate_scalar),
+    cy_combiners.SumInt64Fn: ('sum', translate_scalar),
+    cy_combiners.MinInt64Fn: ('min', translate_scalar),
+    cy_combiners.MaxInt64Fn: ('max', translate_scalar),
+    cy_combiners.MeanInt64Fn: ('mean', translate_mean),
+    cy_combiners.SumFloatFn: ('sum', translate_scalar),
+    cy_combiners.MinFloatFn: ('min', translate_scalar),
+    cy_combiners.MaxFloatFn: ('max', translate_scalar),
+    cy_combiners.MeanFloatFn: ('mean', translate_mean),
+    cy_combiners.AllCombineFn: ('and', translate_scalar),
+    cy_combiners.AnyCombineFn: ('or', translate_scalar),
+}
+
+counter_translations = {
+    cy_combiners.CountCombineFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.SUM,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.SumInt64Fn: (
+        dataflow.NameAndKind.KindValueValuesEnum.SUM,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.MinInt64Fn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MIN,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.MaxInt64Fn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MAX,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.MeanInt64Fn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MEAN,
+        MetricUpdateTranslators.translate_scalar_mean_int),
+    cy_combiners.SumFloatFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.SUM,
+        MetricUpdateTranslators.translate_scalar_counter_float),
+    cy_combiners.MinFloatFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MIN,
+        MetricUpdateTranslators.translate_scalar_counter_float),
+    cy_combiners.MaxFloatFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MAX,
+        MetricUpdateTranslators.translate_scalar_counter_float),
+    cy_combiners.MeanFloatFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MEAN,
+        MetricUpdateTranslators.translate_scalar_mean_float),
+    cy_combiners.AllCombineFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.AND,
+        MetricUpdateTranslators.translate_boolean),
+    cy_combiners.AnyCombineFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.OR,
+        MetricUpdateTranslators.translate_boolean),
+}

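to_split_int() above packs an integer into the two 32-bit halves of a dataflow.SplitInt64 proto. The same bit arithmetic, stripped of the proto wrapper and paired with a hypothetical inverse to show the round trip:

def to_split_int(n):
  # Low 32 bits in one field, the remaining high bits in the other.
  return n & 0xffffffff, n >> 32  # (lowBits, highBits)


def from_split_int(low, high):
  # Hypothetical inverse, for illustration only.
  return (high << 32) | low


assert to_split_int(12345) == (12345, 0)
assert to_split_int(12345 << 32) == (0, 12345)
assert from_split_int(*to_split_int(2 ** 40 + 7)) == 2 ** 40 + 7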
http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py
new file mode 100644
index 0000000..bc57211
--- /dev/null
+++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Unit tests for the apiclient module."""
+
+import unittest
+
+from mock import Mock
+
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner
+from apache_beam.runners.google_cloud_dataflow.internal import apiclient
+from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow
+from apache_beam.utils.pipeline_options import PipelineOptions
+
+
+class UtilTest(unittest.TestCase):
+
+  @unittest.skip("Enable once BEAM-1080 is fixed.")
+  def test_create_application_client(self):
+    pipeline_options = PipelineOptions()
+    apiclient.DataflowApplicationClient(
+        pipeline_options,
+        DataflowRunner.BATCH_ENVIRONMENT_MAJOR_VERSION)
+
+  def test_default_job_name(self):
+    job_name = apiclient.Job.default_job_name(None)
+    regexp = 'beamapp-.*-[0-9]{10}-[0-9]{6}'
+    self.assertRegexpMatches(job_name, regexp)
+
+  def test_split_int(self):
+    number = 12345
+    split_number = apiclient.to_split_int(number)
+    self.assertEqual((split_number.lowBits, split_number.highBits),
+                     (number, 0))
+    shift_number = number << 32
+    split_number = apiclient.to_split_int(shift_number)
+    self.assertEqual((split_number.lowBits, split_number.highBits),
+                     (0, number))
+
+  def test_translate_distribution(self):
+    metric_update = dataflow.CounterUpdate()
+    distribution_update = DistributionData(16, 2, 1, 15)
+    apiclient.translate_distribution(distribution_update, metric_update)
+    self.assertEqual(metric_update.distribution.min.lowBits,
+                     distribution_update.min)
+    self.assertEqual(metric_update.distribution.max.lowBits,
+                     distribution_update.max)
+    self.assertEqual(metric_update.distribution.sum.lowBits,
+                     distribution_update.sum)
+    self.assertEqual(metric_update.distribution.count.lowBits,
+                     distribution_update.count)
+
+  def test_translate_means(self):
+    metric_update = dataflow.CounterUpdate()
+    accumulator = Mock()
+    accumulator.sum = 16
+    accumulator.count = 2
+    apiclient.MetricUpdateTranslators.translate_scalar_mean_int(accumulator,
+                                                                metric_update)
+    self.assertEqual(metric_update.integerMean.sum.lowBits, accumulator.sum)
+    self.assertEqual(metric_update.integerMean.count.lowBits, accumulator.count)
+
+    accumulator.sum = 16.0
+    accumulator.count = 2
+    apiclient.MetricUpdateTranslators.translate_scalar_mean_float(accumulator,
+                                                                  metric_update)
+    self.assertEqual(metric_update.floatingPointMean.sum, accumulator.sum)
+    self.assertEqual(
+        metric_update.floatingPointMean.count.lowBits, accumulator.count)
+
+
+if __name__ == '__main__':
+  unittest.main()

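test_default_job_name above pins down the shape produced by Job.default_job_name: 'beamapp', the lowercased user name, a 10-digit UTC timestamp, and 6 digits of microseconds. A stdlib-only sketch reproducing that format from the same strftime pattern (printed output is illustrative):

import getpass
from datetime import datetime

app_name = 'beamapp'
user_name = getpass.getuser().lower()
# '%m%d%H%M%S' yields 10 digits and '%f' yields 6, matching the test's
# regexp 'beamapp-.*-[0-9]{10}-[0-9]{6}'.
date_component = datetime.utcnow().strftime('%m%d%H%M%S-%f')
print('{}-{}-{}'.format(app_name, user_name, date_component))
# e.g. beamapp-jdoe-0106143015-123456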
http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/__init__.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py
new file mode 100644
index 0000000..a6857f8
--- /dev/null
+++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Common imports for generated dataflow client library."""
+# pylint:disable=wildcard-import
+
+import pkgutil
+
+from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow.dataflow_v1b3_messages import *
+
+from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow.dataflow_v1b3_client import *
+
+__path__ = pkgutil.extend_path(__path__, __name__)

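The pkgutil.extend_path() call above turns the generated package into an old-style namespace package: if another sys.path entry also provides a package with the same dotted name, its submodules become importable as well. The idiom in isolation, with a hypothetical directory layout:

# <path-entry-1>/mypkg/__init__.py  -- contains the two lines below
# <path-entry-2>/mypkg/extra.py     -- shipped by a different distribution
import pkgutil
__path__ = pkgutil.extend_path(__path__, __name__)
# __path__ now lists every sys.path directory containing a 'mypkg' package,
# so 'import mypkg.extra' resolves even though extra.py lives elsewhere.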
http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/dataflow_v1b3_client.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/dataflow_v1b3_client.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/dataflow_v1b3_client.py
new file mode 100644
index 0000000..4d3d525
--- /dev/null
+++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/dataflow_v1b3_client.py
@@ -0,0 +1,684 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Generated client library for dataflow version v1b3."""
+# NOTE: This file is autogenerated and should not be edited by hand.
+from apitools.base.py import base_api
+
+from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow import dataflow_v1b3_messages as messages
+
+
+class DataflowV1b3(base_api.BaseApiClient):
+  """Generated client library for service dataflow version v1b3."""
+
+  MESSAGES_MODULE = messages
+  BASE_URL = u'https://dataflow.googleapis.com/'
+
+  _PACKAGE = u'dataflow'
+  _SCOPES = [u'https://www.googleapis.com/auth/cloud-platform', u'https://www.googleapis.com/auth/userinfo.email']
+  _VERSION = u'v1b3'
+  _CLIENT_ID = '1042881264118.apps.googleusercontent.com'
+  _CLIENT_SECRET = 'x_Tw5K8nnjoRAqULM9PFAC2b'
+  _USER_AGENT = 'x_Tw5K8nnjoRAqULM9PFAC2b'
+  _CLIENT_CLASS_NAME = u'DataflowV1b3'
+  _URL_VERSION = u'v1b3'
+  _API_KEY = None
+
+  def __init__(self, url='', credentials=None,
+               get_credentials=True, http=None, model=None,
+               log_request=False, log_response=False,
+               credentials_args=None, default_global_params=None,
+               additional_http_headers=None):
+    """Create a new dataflow handle."""
+    url = url or self.BASE_URL
+    super(DataflowV1b3, self).__init__(
+        url, credentials=credentials,
+        get_credentials=get_credentials, http=http, model=model,
+        log_request=log_request, log_response=log_response,
+        credentials_args=credentials_args,
+        default_global_params=default_global_params,
+        additional_http_headers=additional_http_headers)
+    self.projects_jobs_debug = self.ProjectsJobsDebugService(self)
+    self.projects_jobs_messages = self.ProjectsJobsMessagesService(self)
+    self.projects_jobs_workItems = self.ProjectsJobsWorkItemsService(self)
+    self.projects_jobs = self.ProjectsJobsService(self)
+    self.projects_locations_jobs_messages = self.ProjectsLocationsJobsMessagesService(self)
+    self.projects_locations_jobs_workItems = self.ProjectsLocationsJobsWorkItemsService(self)
+    self.projects_locations_jobs = self.ProjectsLocationsJobsService(self)
+    self.projects_locations = self.ProjectsLocationsService(self)
+    self.projects_templates = self.ProjectsTemplatesService(self)
+    self.projects = self.ProjectsService(self)
+
+  class ProjectsJobsDebugService(base_api.BaseApiService):
+    """Service class for the projects_jobs_debug resource."""
+
+    _NAME = u'projects_jobs_debug'
+
+    def __init__(self, client):
+      super(DataflowV1b3.ProjectsJobsDebugService, self).__init__(client)
+      self._upload_configs = {
+          }
+
+    def GetConfig(self, request, global_params=None):
+      """Get encoded debug configuration for component. Not cacheable.
+
+      Args:
+        request: (DataflowProjectsJobsDebugGetConfigRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (GetDebugConfigResponse) The response message.
+      """
+      config = self.GetMethodConfig('GetConfig')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    GetConfig.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'dataflow.projects.jobs.debug.getConfig',
+        ordered_params=[u'projectId', u'jobId'],
+        path_params=[u'jobId', u'projectId'],
+        query_params=[],
+        relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/getConfig',
+        request_field=u'getDebugConfigRequest',
+        request_type_name=u'DataflowProjectsJobsDebugGetConfigRequest',
+        response_type_name=u'GetDebugConfigResponse',
+        supports_download=False,
+    )
+
+    def SendCapture(self, request, global_params=None):
+      """Send encoded debug capture data for component.
+
+      Args:
+        request: (DataflowProjectsJobsDebugSendCaptureRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (SendDebugCaptureResponse) The response message.
+      """
+      config = self.GetMethodConfig('SendCapture')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    SendCapture.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'dataflow.projects.jobs.debug.sendCapture',
+        ordered_params=[u'projectId', u'jobId'],
+        path_params=[u'jobId', u'projectId'],
+        query_params=[],
+        relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/sendCapture',
+        request_field=u'sendDebugCaptureRequest',
+        request_type_name=u'DataflowProjectsJobsDebugSendCaptureRequest',
+        response_type_name=u'SendDebugCaptureResponse',
+        supports_download=False,
+    )
+
+  class ProjectsJobsMessagesService(base_api.BaseApiService):
+    """Service class for the projects_jobs_messages resource."""
+
+    _NAME = u'projects_jobs_messages'
+
+    def __init__(self, client):
+      super(DataflowV1b3.ProjectsJobsMessagesService, self).__init__(client)
+      self._upload_configs = {
+          }
+
+    def List(self, request, global_params=None):
+      """Request the job status.
+
+      Args:
+        request: (DataflowProjectsJobsMessagesListRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ListJobMessagesResponse) The response message.
+      """
+      config = self.GetMethodConfig('List')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    List.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'dataflow.projects.jobs.messages.list',
+        ordered_params=[u'projectId', u'jobId'],
+        path_params=[u'jobId', u'projectId'],
+        query_params=[u'endTime', u'location', u'minimumImportance', u'pageSize', u'pageToken', u'startTime'],
+        relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/messages',
+        request_field='',
+        request_type_name=u'DataflowProjectsJobsMessagesListRequest',
+        response_type_name=u'ListJobMessagesResponse',
+        supports_download=False,
+    )
+
+  class ProjectsJobsWorkItemsService(base_api.BaseApiService):
+    """Service class for the projects_jobs_workItems resource."""
+
+    _NAME = u'projects_jobs_workItems'
+
+    def __init__(self, client):
+      super(DataflowV1b3.ProjectsJobsWorkItemsService, self).__init__(client)
+      self._upload_configs = {
+          }
+
+    def Lease(self, request, global_params=None):
+      """Leases a dataflow WorkItem to run.
+
+      Args:
+        request: (DataflowProjectsJobsWorkItemsLeaseRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (LeaseWorkItemResponse) The response message.
+      """
+      config = self.GetMethodConfig('Lease')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    Lease.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'dataflow.projects.jobs.workItems.lease',
+        ordered_params=[u'projectId', u'jobId'],
+        path_params=[u'jobId', u'projectId'],
+        query_params=[],
+        relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/workItems:lease',
+        request_field=u'leaseWorkItemRequest',
+        request_type_name=u'DataflowProjectsJobsWorkItemsLeaseRequest',
+        response_type_name=u'LeaseWorkItemResponse',
+        supports_download=False,
+    )
+
+    def ReportStatus(self, request, global_params=None):
+      """Reports the status of dataflow WorkItems leased by a worker.
+
+      Args:
+        request: (DataflowProjectsJobsWorkItemsReportStatusRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ReportWorkItemStatusResponse) The response message.
+      """
+      config = self.GetMethodConfig('ReportStatus')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    ReportStatus.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'dataflow.projects.jobs.workItems.reportStatus',
+        ordered_params=[u'projectId', u'jobId'],
+        path_params=[u'jobId', u'projectId'],
+        query_params=[],
+        relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/workItems:reportStatus',
+        request_field=u'reportWorkItemStatusRequest',
+        request_type_name=u'DataflowProjectsJobsWorkItemsReportStatusRequest',
+        response_type_name=u'ReportWorkItemStatusResponse',
+        supports_download=False,
+    )
+
+  class ProjectsJobsService(base_api.BaseApiService):
+    """Service class for the projects_jobs resource."""
+
+    _NAME = u'projects_jobs'
+
+    def __init__(self, client):
+      super(DataflowV1b3.ProjectsJobsService, self).__init__(client)
+      self._upload_configs = {
+          }
+
+    def Create(self, request, global_params=None):
+      """Creates a Cloud Dataflow job.
+
+      Args:
+        request: (DataflowProjectsJobsCreateRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Job) The response message.
+      """
+      config = self.GetMethodConfig('Create')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    Create.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'dataflow.projects.jobs.create',
+        ordered_params=[u'projectId'],
+        path_params=[u'projectId'],
+        query_params=[u'location', u'replaceJobId', u'view'],
+        relative_path=u'v1b3/projects/{projectId}/jobs',
+        request_field=u'job',
+        request_type_name=u'DataflowProjectsJobsCreateRequest',
+        response_type_name=u'Job',
+        supports_download=False,
+    )
+
+    def Get(self, request, global_params=None):
+      """Gets the state of the specified Cloud Dataflow job.
+
+      Args:
+        request: (DataflowProjectsJobsGetRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Job) The response message.
+      """
+      config = self.GetMethodConfig('Get')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    Get.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'dataflow.projects.jobs.get',
+        ordered_params=[u'projectId', u'jobId'],
+        path_params=[u'jobId', u'projectId'],
+        query_params=[u'location', u'view'],
+        relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}',
+        request_field='',
+        request_type_name=u'DataflowProjectsJobsGetRequest',
+        response_type_name=u'Job',
+        supports_download=False,
+    )
+
+    def GetMetrics(self, request, global_params=None):
+      """Request the job status.
+
+      Args:
+        request: (DataflowProjectsJobsGetMetricsRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (JobMetrics) The response message.
+      """
+      config = self.GetMethodConfig('GetMetrics')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    GetMetrics.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'dataflow.projects.jobs.getMetrics',
+        ordered_params=[u'projectId', u'jobId'],
+        path_params=[u'jobId', u'projectId'],
+        query_params=[u'location', u'startTime'],
+        relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/metrics',
+        request_field='',
+        request_type_name=u'DataflowProjectsJobsGetMetricsRequest',
+        response_type_name=u'JobMetrics',
+        supports_download=False,
+    )
+
+    def List(self, request, global_params=None):
+      """List the jobs of a project.
+
+      Args:
+        request: (DataflowProjectsJobsListRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ListJobsResponse) The response message.
+      """
+      config = self.GetMethodConfig('List')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    List.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'dataflow.projects.jobs.list',
+        ordered_params=[u'projectId'],
+        path_params=[u'projectId'],
+        query_params=[u'filter', u'location', u'pageSize', u'pageToken', u'view'],
+        relative_path=u'v1b3/projects/{projectId}/jobs',
+        request_field='',
+        request_type_name=u'DataflowProjectsJobsListRequest',
+        response_type_name=u'ListJobsResponse',
+        supports_download=False,
+    )
+
+    def Update(self, request, global_params=None):
+      """Updates the state of an existing Cloud Dataflow job.
+
+      Args:
+        request: (DataflowProjectsJobsUpdateRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Job) The response message.
+      """
+      config = self.GetMethodConfig('Update')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    Update.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PUT',
+        method_id=u'dataflow.projects.jobs.update',
+        ordered_params=[u'projectId', u'jobId'],
+        path_params=[u'jobId', u'projectId'],
+        query_params=[u'location'],
+        relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}',
+        request_field=u'job',
+        request_type_name=u'DataflowProjectsJobsUpdateRequest',
+        response_type_name=u'Job',
+        supports_download=False,
+    )
+
+  class ProjectsLocationsJobsMessagesService(base_api.BaseApiService):
+    """Service class for the projects_locations_jobs_messages resource."""
+
+    _NAME = u'projects_locations_jobs_messages'
+
+    def __init__(self, client):
+      super(DataflowV1b3.ProjectsLocationsJobsMessagesService, self).__init__(client)
+      self._upload_configs = {
+          }
+
+    def List(self, request, global_params=None):
+      """Request the job status.
+
+      Args:
+        request: (DataflowProjectsLocationsJobsMessagesListRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ListJobMessagesResponse) The response message.
+      """
+      config = self.GetMethodConfig('List')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    List.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'dataflow.projects.locations.jobs.messages.list',
+        ordered_params=[u'projectId', u'location', u'jobId'],
+        path_params=[u'jobId', u'location', u'projectId'],
+        query_params=[u'endTime', u'minimumImportance', u'pageSize', u'pageToken', u'startTime'],
+        relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/messages',
+        request_field='',
+        request_type_name=u'DataflowProjectsLocationsJobsMessagesListRequest',
+        response_type_name=u'ListJobMessagesResponse',
+        supports_download=False,
+    )
+
+  class ProjectsLocationsJobsWorkItemsService(base_api.BaseApiService):
+    """Service class for the projects_locations_jobs_workItems resource."""
+
+    _NAME = u'projects_locations_jobs_workItems'
+
+    def __init__(self, client):
+      super(DataflowV1b3.ProjectsLocationsJobsWorkItemsService, self).__init__(client)
+      self._upload_configs = {
+          }
+
+    def Lease(self, request, global_params=None):
+      """Leases a dataflow WorkItem to run.
+
+      Args:
+        request: (DataflowProjectsLocationsJobsWorkItemsLeaseRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (LeaseWorkItemResponse) The response message.
+      """
+      config = self.GetMethodConfig('Lease')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    Lease.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'dataflow.projects.locations.jobs.workItems.lease',
+        ordered_params=[u'projectId', u'location', u'jobId'],
+        path_params=[u'jobId', u'location', u'projectId'],
+        query_params=[],
+        relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/workItems:lease',
+        request_field=u'leaseWorkItemRequest',
+        request_type_name=u'DataflowProjectsLocationsJobsWorkItemsLeaseRequest',
+        response_type_name=u'LeaseWorkItemResponse',
+        supports_download=False,
+    )
+
+    def ReportStatus(self, request, global_params=None):
+      """Reports the status of dataflow WorkItems leased by a worker.
+
+      Args:
+        request: (DataflowProjectsLocationsJobsWorkItemsReportStatusRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ReportWorkItemStatusResponse) The response message.
+      """
+      config = self.GetMethodConfig('ReportStatus')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    ReportStatus.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'dataflow.projects.locations.jobs.workItems.reportStatus',
+        ordered_params=[u'projectId', u'location', u'jobId'],
+        path_params=[u'jobId', u'location', u'projectId'],
+        query_params=[],
+        relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/workItems:reportStatus',
+        request_field=u'reportWorkItemStatusRequest',
+        request_type_name=u'DataflowProjectsLocationsJobsWorkItemsReportStatusRequest',
+        response_type_name=u'ReportWorkItemStatusResponse',
+        supports_download=False,
+    )
+
+  class ProjectsLocationsJobsService(base_api.BaseApiService):
+    """Service class for the projects_locations_jobs resource."""
+
+    _NAME = u'projects_locations_jobs'
+
+    def __init__(self, client):
+      super(DataflowV1b3.ProjectsLocationsJobsService, self).__init__(client)
+      self._upload_configs = {
+          }
+
+    def Create(self, request, global_params=None):
+      """Creates a Cloud Dataflow job.
+
+      Args:
+        request: (DataflowProjectsLocationsJobsCreateRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Job) The response message.
+      """
+      config = self.GetMethodConfig('Create')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    Create.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'dataflow.projects.locations.jobs.create',
+        ordered_params=[u'projectId', u'location'],
+        path_params=[u'location', u'projectId'],
+        query_params=[u'replaceJobId', u'view'],
+        relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs',
+        request_field=u'job',
+        request_type_name=u'DataflowProjectsLocationsJobsCreateRequest',
+        response_type_name=u'Job',
+        supports_download=False,
+    )
+
+    def Get(self, request, global_params=None):
+      """Gets the state of the specified Cloud Dataflow job.
+
+      Args:
+        request: (DataflowProjectsLocationsJobsGetRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Job) The response message.
+      """
+      config = self.GetMethodConfig('Get')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    Get.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'dataflow.projects.locations.jobs.get',
+        ordered_params=[u'projectId', u'location', u'jobId'],
+        path_params=[u'jobId', u'location', u'projectId'],
+        query_params=[u'view'],
+        relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}',
+        request_field='',
+        request_type_name=u'DataflowProjectsLocationsJobsGetRequest',
+        response_type_name=u'Job',
+        supports_download=False,
+    )
+
+    def GetMetrics(self, request, global_params=None):
+      """Request the job status.
+
+      Args:
+        request: (DataflowProjectsLocationsJobsGetMetricsRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (JobMetrics) The response message.
+      """
+      config = self.GetMethodConfig('GetMetrics')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    GetMetrics.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'dataflow.projects.locations.jobs.getMetrics',
+        ordered_params=[u'projectId', u'location', u'jobId'],
+        path_params=[u'jobId', u'location', u'projectId'],
+        query_params=[u'startTime'],
+        relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/metrics',
+        request_field='',
+        request_type_name=u'DataflowProjectsLocationsJobsGetMetricsRequest',
+        response_type_name=u'JobMetrics',
+        supports_download=False,
+    )
+
+    def List(self, request, global_params=None):
+      """List the jobs of a project.
+
+      Args:
+        request: (DataflowProjectsLocationsJobsListRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (ListJobsResponse) The response message.
+      """
+      config = self.GetMethodConfig('List')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    List.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'GET',
+        method_id=u'dataflow.projects.locations.jobs.list',
+        ordered_params=[u'projectId', u'location'],
+        path_params=[u'location', u'projectId'],
+        query_params=[u'filter', u'pageSize', u'pageToken', u'view'],
+        relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs',
+        request_field='',
+        request_type_name=u'DataflowProjectsLocationsJobsListRequest',
+        response_type_name=u'ListJobsResponse',
+        supports_download=False,
+    )
+
+    def Update(self, request, global_params=None):
+      """Updates the state of an existing Cloud Dataflow job.
+
+      Args:
+        request: (DataflowProjectsLocationsJobsUpdateRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Job) The response message.
+      """
+      config = self.GetMethodConfig('Update')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    Update.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'PUT',
+        method_id=u'dataflow.projects.locations.jobs.update',
+        ordered_params=[u'projectId', u'location', u'jobId'],
+        path_params=[u'jobId', u'location', u'projectId'],
+        query_params=[],
+        relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}',
+        request_field=u'job',
+        request_type_name=u'DataflowProjectsLocationsJobsUpdateRequest',
+        response_type_name=u'Job',
+        supports_download=False,
+    )
+
+  class ProjectsLocationsService(base_api.BaseApiService):
+    """Service class for the projects_locations resource."""
+
+    _NAME = u'projects_locations'
+
+    def __init__(self, client):
+      super(DataflowV1b3.ProjectsLocationsService, self).__init__(client)
+      self._upload_configs = {
+          }
+
+  class ProjectsTemplatesService(base_api.BaseApiService):
+    """Service class for the projects_templates resource."""
+
+    _NAME = u'projects_templates'
+
+    def __init__(self, client):
+      super(DataflowV1b3.ProjectsTemplatesService, self).__init__(client)
+      self._upload_configs = {
+          }
+
+    def Create(self, request, global_params=None):
+      """Creates a Cloud Dataflow job from a template.
+
+      Args:
+        request: (DataflowProjectsTemplatesCreateRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (Job) The response message.
+      """
+      config = self.GetMethodConfig('Create')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    Create.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'dataflow.projects.templates.create',
+        ordered_params=[u'projectId'],
+        path_params=[u'projectId'],
+        query_params=[],
+        relative_path=u'v1b3/projects/{projectId}/templates',
+        request_field=u'createJobFromTemplateRequest',
+        request_type_name=u'DataflowProjectsTemplatesCreateRequest',
+        response_type_name=u'Job',
+        supports_download=False,
+    )
+
+  class ProjectsService(base_api.BaseApiService):
+    """Service class for the projects resource."""
+
+    _NAME = u'projects'
+
+    def __init__(self, client):
+      super(DataflowV1b3.ProjectsService, self).__init__(client)
+      self._upload_configs = {
+          }
+
+    def WorkerMessages(self, request, global_params=None):
+      """Send a worker_message to the service.
+
+      Args:
+        request: (DataflowProjectsWorkerMessagesRequest) input message
+        global_params: (StandardQueryParameters, default: None) global arguments
+      Returns:
+        (SendWorkerMessagesResponse) The response message.
+      """
+      config = self.GetMethodConfig('WorkerMessages')
+      return self._RunMethod(
+          config, request, global_params=global_params)
+
+    WorkerMessages.method_config = lambda: base_api.ApiMethodInfo(
+        http_method=u'POST',
+        method_id=u'dataflow.projects.workerMessages',
+        ordered_params=[u'projectId'],
+        path_params=[u'projectId'],
+        query_params=[],
+        relative_path=u'v1b3/projects/{projectId}/WorkerMessages',
+        request_field=u'sendWorkerMessagesRequest',
+        request_type_name=u'DataflowProjectsWorkerMessagesRequest',
+        response_type_name=u'SendWorkerMessagesResponse',
+        supports_download=False,
+    )
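
For context, a minimal usage sketch (not part of the diff above): assuming the generated file is importable as dataflow_v1b3_client alongside the dataflow_v1b3_messages module it imports, and that Google application-default credentials are available, a caller could fetch a job's state roughly like this ('my-project' and 'my-job-id' are placeholder values):

    from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow import (
        dataflow_v1b3_client,
        dataflow_v1b3_messages as messages)

    # Instantiating with no arguments falls back to BASE_URL and attempts to
    # resolve default credentials (get_credentials=True in __init__ above).
    client = dataflow_v1b3_client.DataflowV1b3()

    # projects_jobs.Get expects a DataflowProjectsJobsGetRequest message,
    # per the request_type_name in its method config.
    request = messages.DataflowProjectsJobsGetRequest(
        projectId='my-project',  # placeholder
        jobId='my-job-id')       # placeholder

    # Issues GET v1b3/projects/{projectId}/jobs/{jobId} and returns a Job message.
    job = client.projects_jobs.Get(request)
    print(job.currentState)

The per-resource attributes set in __init__ (client.projects_jobs, client.projects_jobs_messages, and so on) are the intended entry points; each wraps GetMethodConfig/_RunMethod from apitools' base_api, so all methods share the same request-message-in, response-message-out calling convention.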
