This is an automated email from the ASF dual-hosted git repository.

ccy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8074c27  Update Dataflow container, select correct Python 3 container
     new 6ca6f14  Merge pull request #7711 from 
charlesccychen/update-dfpy3-container
8074c27 is described below

commit 8074c274733baee5b2be0197ffac42ee60ef4cc2
Author: Charles Chen <c...@google.com>
AuthorDate: Fri Feb 1 20:40:21 2019 -0800

    Update Dataflow container, select correct Python 3 container
---
 .../runners/dataflow/internal/apiclient.py         |  7 +++++-
 .../runners/dataflow/internal/apiclient_test.py    | 29 ++++++++++++++++------
 2 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py 
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index e0c3805..bad5222 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -873,7 +873,12 @@ def get_default_container_image_for_current_sdk(job_type):
   if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
     image_name = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python-fnapi'
   else:
-    image_name = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python'
+    if sys.version_info[0] == 2:
+      image_name = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python'
+    elif sys.version_info[0] == 3:
+      image_name = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python3'
+    else:
+      raise Exception('Dataflow only supports Python versions 2 and 3.')
   image_tag = _get_required_container_version(job_type)
   return image_name + ':' + image_tag
 
diff --git 
a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py 
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index aa0983e..d5d15f6 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -18,6 +18,7 @@
 
 from __future__ import absolute_import
 
+import sys
 import unittest
 
 import mock
@@ -259,10 +260,16 @@ class UtilTest(unittest.TestCase):
                                 pipeline_options,
                                 '2.0.0', #any environment version
                                 FAKE_PIPELINE_URL)
-    self.assertEqual(
-        env.proto.workerPools[0].workerHarnessContainerImage,
-        (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
-         '/python:' + names.BEAM_CONTAINER_VERSION))
+    if sys.version_info[0] == 3:
+      self.assertEqual(
+          env.proto.workerPools[0].workerHarnessContainerImage,
+          (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
+           '/python3:' + names.BEAM_CONTAINER_VERSION))
+    else:
+      self.assertEqual(
+          env.proto.workerPools[0].workerHarnessContainerImage,
+          (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
+           '/python:' + names.BEAM_CONTAINER_VERSION))
 
   @mock.patch('apache_beam.runners.dataflow.internal.apiclient.'
               'beam_version.__version__', '2.2.0')
@@ -286,10 +293,16 @@ class UtilTest(unittest.TestCase):
                                 pipeline_options,
                                 '2.0.0', #any environment version
                                 FAKE_PIPELINE_URL)
-    self.assertEqual(
-        env.proto.workerPools[0].workerHarnessContainerImage,
-        (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
-         '/python:2.2.0'))
+    if sys.version_info[0] == 3:
+      self.assertEqual(
+          env.proto.workerPools[0].workerHarnessContainerImage,
+          (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
+           '/python3:2.2.0'))
+    else:
+      self.assertEqual(
+          env.proto.workerPools[0].workerHarnessContainerImage,
+          (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
+           '/python:2.2.0'))
 
   def test_worker_harness_override_takes_precedence_over_sdk_defaults(self):
     # streaming, fnapi pipeline.

Reply via email to