Repository: beam
Updated Branches:
  refs/heads/master 2dd1907c6 -> bf5aa1bca


Use the SDK harness container for FnAPI jobs when worker_harness_container_image
is not specified. Add a separate image tag to use with the SDK harness container.
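
In effect, the default worker image now depends on the job type: FnAPI jobs
get the SDK harness image, while other jobs keep the legacy Dataflow worker
image, each family with its own version tag. A minimal sketch of the selection
logic (constants and image names are taken from the diff below; the standalone
helper name is hypothetical, for illustration only):

BEAM_CONTAINER_VERSION = 'beam-2.1.0-20170626'
BEAM_FNAPI_CONTAINER_VERSION = 'beam-2.1.0-20170621'

def default_container_image(job_type):
  # Hypothetical helper mirroring get_default_container_image_for_current_sdk
  # in the diff: FnAPI jobs map to the SDK harness (python-fnapi) image;
  # everything else keeps the legacy worker image and its own tag.
  if job_type in ('FNAPI_BATCH', 'FNAPI_STREAMING'):
    return 'dataflow.gcr.io/v1beta3/python-fnapi:%s' % BEAM_FNAPI_CONTAINER_VERSION
  return 'dataflow.gcr.io/v1beta3/python:%s' % BEAM_CONTAINER_VERSION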


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f46a40c2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f46a40c2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f46a40c2

Branch: refs/heads/master
Commit: f46a40c279499737bb7fb45af5e299d76f6af49b
Parents: 2dd1907
Author: Valentyn Tymofieiev <valen...@google.com>
Authored: Wed Jun 28 16:41:03 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Jun 29 10:35:53 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/internal/apiclient.py      |  6 +--
 .../runners/dataflow/internal/dependency.py     | 44 +++++++++++++++++---
 2 files changed, 39 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f46a40c2/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index df1a3f2..edac9d7 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -38,7 +38,6 @@ from apache_beam.io.filesystems import FileSystems
 from apache_beam.io.gcp.internal.clients import storage
 from apache_beam.runners.dataflow.internal import dependency
 from apache_beam.runners.dataflow.internal.clients import dataflow
-from apache_beam.runners.dataflow.internal.dependency import get_required_container_version
 from apache_beam.runners.dataflow.internal.dependency import get_sdk_name_and_version
 from apache_beam.runners.dataflow.internal.names import PropertyNames
 from apache_beam.transforms import cy_combiners
@@ -205,11 +204,8 @@ class Environment(object):
       pool.workerHarnessContainerImage = (
           self.worker_options.worker_harness_container_image)
     else:
-      # Default to using the worker harness container image for the current SDK
-      # version.
       pool.workerHarnessContainerImage = (
-          'dataflow.gcr.io/v1beta3/python:%s' %
-          get_required_container_version())
+          dependency.get_default_container_image_for_current_sdk(job_type))
     if self.worker_options.use_public_ips is not None:
       if self.worker_options.use_public_ips:
         pool.ipConfiguration = (

http://git-wip-us.apache.org/repos/asf/beam/blob/f46a40c2/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 03e1794..a40a582 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -71,9 +71,15 @@ from apache_beam.options.pipeline_options import SetupOptions
 
 
 # Update this version to the next version whenever there is a change that will
-# require changes to the execution environment.
+# require changes to the legacy Dataflow worker execution environment.
 # This should be in the beam-[version]-[date] format, date is optional.
+# BEAM_CONTAINER_VERSION and BEAM_FNAPI_CONTAINER_VERSION should coincide
+# when we make a release.
 BEAM_CONTAINER_VERSION = 'beam-2.1.0-20170626'
+# Update this version to the next version whenever there is a change that
+# requires changes to SDK harness container or SDK harness launcher.
+# This should be in the beam-[version]-[date] format, date is optional.
+BEAM_FNAPI_CONTAINER_VERSION = 'beam-2.1.0-20170621'
 
 # Standard file names used for staging files.
 WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
@@ -474,10 +480,33 @@ def _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
         'type of location: %s' % sdk_remote_location)
 
 
-def get_required_container_version():
+def get_default_container_image_for_current_sdk(job_type):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Args:
+    job_type (str): Beam job type.
+
+  Returns:
+    str: Google Cloud Dataflow container image for remote execution.
+  """
+  # TODO(tvalentyn): Use enumerated type instead of strings for job types.
+  if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
+    image_name = 'dataflow.gcr.io/v1beta3/python-fnapi'
+  else:
+    image_name = 'dataflow.gcr.io/v1beta3/python'
+  image_tag = _get_required_container_version(job_type)
+  return image_name + ':' + image_tag
+
+
+def _get_required_container_version(job_type=None):
   """For internal use only; no backwards-compatibility guarantees.
 
-  Returns the Google Cloud Dataflow container version for remote execution.
+  Args:
+    job_type (str, optional): Beam job type. Defaults to None.
+
+  Returns:
+    str: The tag of worker container images in GCR that corresponds to
+      the current version of the SDK.
   """
   # TODO(silviuc): Handle apache-beam versions when we have official releases.
   import pkg_resources as pkg
@@ -493,7 +522,10 @@ def get_required_container_version():
   except pkg.DistributionNotFound:
   # This case covers Apache Beam end-to-end testing scenarios. All these tests
     # will run with a special container version.
-    return BEAM_CONTAINER_VERSION
+    if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
+      return BEAM_FNAPI_CONTAINER_VERSION
+    else:
+      return BEAM_CONTAINER_VERSION
 
 
 def get_sdk_name_and_version():
@@ -501,7 +533,7 @@ def get_sdk_name_and_version():
 
   Returns name and version of SDK reported to Google Cloud Dataflow."""
   import pkg_resources as pkg
-  container_version = get_required_container_version()
+  container_version = _get_required_container_version()
   try:
     pkg.get_distribution(GOOGLE_PACKAGE_NAME)
     return ('Google Cloud Dataflow SDK for Python', container_version)
@@ -513,7 +545,7 @@ def get_sdk_package_name():
   """For internal use only; no backwards-compatibility guarantees.
 
   Returns the PyPI package name to be staged to Google Cloud Dataflow."""
-  container_version = get_required_container_version()
+  container_version = _get_required_container_version()
   if container_version == BEAM_CONTAINER_VERSION:
     return BEAM_PACKAGE_NAME
   return GOOGLE_PACKAGE_NAME
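
For illustration, a hypothetical call site mirroring the apiclient.py change
above (assumes apache_beam at this revision is importable; the helper is
internal, with no backwards-compatibility guarantees):

from apache_beam.runners.dataflow.internal import dependency

# When --worker_harness_container_image is not set, a FnAPI job now falls
# back to the job-type-aware default image.
image = dependency.get_default_container_image_for_current_sdk('FNAPI_BATCH')
print(image)  # e.g. dataflow.gcr.io/v1beta3/python-fnapi:beam-2.1.0-20170621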
