This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 87f8ff1 [BEAM-9290] Support runner_harness_container_image in
released python sdks.
new cec1094 Merge pull request #10827 from
angoenka/runner_harness_image_dev
87f8ff1 is described below
commit 87f8ff1996a5d7a787968516234f32837d25135d
Author: Ankur Goenka <[email protected]>
AuthorDate: Mon Feb 10 17:28:20 2020 -0800
[BEAM-9290] Support runner_harness_container_image in released python sdks.
---
.../runners/dataflow/internal/apiclient.py | 11 +++++----
.../runners/dataflow/internal/apiclient_test.py | 26 +++++++++++++++++++++-
2 files changed, 32 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 878dc55..b8e9caf 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -184,12 +184,15 @@ class Environment(object):
])
# TODO: Use enumerated type instead of strings for job types.
if job_type.startswith('FNAPI_'):
+ self.debug_options = self.debug_options or DebugOptions()
self.debug_options.experiments = self.debug_options.experiments or []
+ if not self.debug_options.lookup_experiment(
+ 'runner_harness_container_image'):
+ runner_harness_override = (get_runner_harness_container_image())
+ if runner_harness_override:
+ self.debug_options.add_experiment(
+ 'runner_harness_container_image=' + runner_harness_override)
debug_options_experiments = self.debug_options.experiments
- runner_harness_override = (get_runner_harness_container_image())
- if runner_harness_override:
- debug_options_experiments.append(
- 'runner_harness_container_image=' + runner_harness_override)
# Add use_multiple_sdk_containers flag if it's not already present. Do
not
# add the flag if 'no_use_multiple_sdk_containers' is present.
# TODO: Cleanup use_multiple_sdk_containers once we deprecate Python SDK
diff --git
a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index faaccc4..ee643a2 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -318,7 +318,7 @@ class UtilTest(unittest.TestCase):
'apache_beam.runners.dataflow.internal.apiclient.'
'beam_version.__version__',
'2.2.0')
- def test_harness_override_present_in_released_sdks(self):
+ def test_harness_override_default_in_released_sdks(self):
pipeline_options = PipelineOptions(
['--temp_location', 'gs://any-location/temp', '--streaming'])
override = ''.join([
@@ -335,6 +335,30 @@ class UtilTest(unittest.TestCase):
@mock.patch(
'apache_beam.runners.dataflow.internal.apiclient.'
'beam_version.__version__',
+ '2.2.0')
+ def test_harness_override_custom_in_released_sdks(self):
+ pipeline_options = PipelineOptions([
+ '--temp_location',
+ 'gs://any-location/temp',
+ '--streaming',
+ '--experiments=runner_harness_container_image=fake_image'
+ ])
+ env = apiclient.Environment([], #packages
+ pipeline_options,
+ '2.0.0', #any environment version
+ FAKE_PIPELINE_URL)
+ self.assertEqual(
+ 1,
+ len([
+ x for x in env.proto.experiments
+ if x.startswith('runner_harness_container_image=')
+ ]))
+ self.assertIn(
+ 'runner_harness_container_image=fake_image', env.proto.experiments)
+
+ @mock.patch(
+ 'apache_beam.runners.dataflow.internal.apiclient.'
+ 'beam_version.__version__',
'2.2.0.rc1')
def test_harness_override_uses_base_version_in_rc_releases(self):
pipeline_options = PipelineOptions(