This is an automated email from the ASF dual-hosted git repository. tvalentyn pushed a commit to branch revert-13475-no_knobs in repository https://gitbox.apache.org/repos/asf/beam.git
commit e516676c416d2efc323f77adbc8e762c1ad75d6f Author: tvalentyn <[email protected]> AuthorDate: Tue Dec 15 14:16:19 2020 -0800 Revert "Do not add unnecessary experiment use_multiple_sdk_containers. (#13475)" This reverts commit ba11ed7daa2cf6e7e4d1899f95460f0c829ad4f7. --- .../runners/dataflow/internal/apiclient.py | 7 +++ .../runners/dataflow/internal/apiclient_test.py | 54 ++++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 89da139..b9de09b 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -210,6 +210,13 @@ class Environment(object): self.debug_options.add_experiment( 'runner_harness_container_image=' + runner_harness_override) debug_options_experiments = self.debug_options.experiments + # Add use_multiple_sdk_containers flag if it's not already present. Do not + # add the flag if 'no_use_multiple_sdk_containers' is present. + # TODO: Cleanup use_multiple_sdk_containers once we deprecate Python SDK + # till version 2.4. + if ('use_multiple_sdk_containers' not in debug_options_experiments and + 'no_use_multiple_sdk_containers' not in debug_options_experiments): + debug_options_experiments.append('use_multiple_sdk_containers') # FlexRS if self.google_cloud_options.flexrs_goal == 'COST_OPTIMIZED': self.proto.flexResourceSchedulingGoal = ( diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 76efd2b..34f4988 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -786,6 +786,60 @@ class UtilTest(unittest.TestCase): self.assertEqual('key5', job.proto.labels.additionalProperties[4].key) self.assertEqual('', job.proto.labels.additionalProperties[4].value) + def test_experiment_use_multiple_sdk_containers(self): + pipeline_options = PipelineOptions([ + '--project', + 'test_project', + '--job_name', + 'test_job_name', + '--temp_location', + 'gs://test-location/temp', + '--experiments', + 'beam_fn_api' + ]) + environment = apiclient.Environment([], + pipeline_options, + 1, + FAKE_PIPELINE_URL) + self.assertIn('use_multiple_sdk_containers', environment.proto.experiments) + + pipeline_options = PipelineOptions([ + '--project', + 'test_project', + '--job_name', + 'test_job_name', + '--temp_location', + 'gs://test-location/temp', + '--experiments', + 'beam_fn_api', + '--experiments', + 'use_multiple_sdk_containers' + ]) + environment = apiclient.Environment([], + pipeline_options, + 1, + FAKE_PIPELINE_URL) + self.assertIn('use_multiple_sdk_containers', environment.proto.experiments) + + pipeline_options = PipelineOptions([ + '--project', + 'test_project', + '--job_name', + 'test_job_name', + '--temp_location', + 'gs://test-location/temp', + '--experiments', + 'beam_fn_api', + '--experiments', + 'no_use_multiple_sdk_containers' + ]) + environment = apiclient.Environment([], + pipeline_options, + 1, + FAKE_PIPELINE_URL) + self.assertNotIn( + 'use_multiple_sdk_containers', environment.proto.experiments) + @mock.patch( 'apache_beam.runners.dataflow.internal.apiclient.sys.version_info', (3, 8))
