This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6319d519c99 Loosen environment validation checks for RCs (#28100)
6319d519c99 is described below
commit 6319d519c99aac3471827b2c24d32957fe78eb69
Author: tvalentyn <[email protected]>
AuthorDate: Tue Aug 22 11:55:20 2023 -0700
Loosen environment validation checks for RCs (#28100)
---
.../apache_beam/runners/worker/bundle_processor.py | 13 ++++++++++++-
.../runners/worker/bundle_processor_test.py | 20 ++++++++++++++++++++
2 files changed, 32 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py
b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index b3ca92aa9a1..935ba83709c 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -828,6 +828,17 @@ def only_element(iterable):
return element
+def _environments_compatible(submission, runtime):
+ # type: (str, str) -> bool
+ if submission == runtime:
+ return True
+ if 'rc' in submission and runtime in submission:
+ # TODO(https://github.com/apache/beam/issues/28084): Loosen
+ # the check for RCs until RC containers install the matching version.
+ return True
+ return False
+
+
def _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor):
# type: (beam_fn_api_pb2.ProcessBundleDescriptor) -> None
@@ -836,7 +847,7 @@ def
_verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor):
env = process_bundle_descriptor.environments[t.environment_id]
for c in env.capabilities:
if (c.startswith(environments.SDK_VERSION_CAPABILITY_PREFIX) and
- c != runtime_sdk):
+ not _environments_compatible(c, runtime_sdk)):
raise RuntimeError(
"Pipeline construction environment and pipeline runtime "
"environment are not compatible. If you use a custom "
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
b/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
index 673c1cea111..292b8431063 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
@@ -402,5 +402,25 @@ class DataSamplingTest(unittest.TestCase):
data_sampler.stop()
+class EnvironmentCompatibilityTest(unittest.TestCase):
+ def test_rc_environments_are_compatible_with_released_images(self):
+ # TODO(https://github.com/apache/beam/issues/28084): remove when
+ # resolved.
+ self.assertTrue(
+ bundle_processor._environments_compatible(
+ "beam:version:sdk_base:apache/beam_python3.5_sdk:2.1.0rc1",
+ "beam:version:sdk_base:apache/beam_python3.5_sdk:2.1.0"))
+
+ def test_user_modified_sdks_need_to_be_installed_in_runtime_env(self):
+ self.assertFalse(
+ bundle_processor._environments_compatible(
+ "beam:version:sdk_base:apache/beam_python3.5_sdk:2.1.0-custom",
+ "beam:version:sdk_base:apache/beam_python3.5_sdk:2.1.0"))
+ self.assertTrue(
+ bundle_processor._environments_compatible(
+ "beam:version:sdk_base:apache/beam_python3.5_sdk:2.1.0-custom",
+ "beam:version:sdk_base:apache/beam_python3.5_sdk:2.1.0-custom"))
+
+
if __name__ == '__main__':
unittest.main()