This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 88da381cc33 Fail the pipeline when a mismatched Python or Beam version is detected. (#25313)
88da381cc33 is described below

commit 88da381cc3361d551800868372afb2f737e973b5
Author: tvalentyn <[email protected]>
AuthorDate: Tue Feb 7 16:25:36 2023 -0800

    Fail the pipeline when a mismatched Python or Beam version is detected. (#25313)
---
 sdks/python/apache_beam/pipeline_test.py           | 16 +++++++++++++++
 .../apache_beam/runners/worker/bundle_processor.py | 23 ++++++++++++++++++++++
 sdks/python/apache_beam/transforms/environments.py |  9 +++++++--
 3 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 98d582884e8..18ab0f091aa 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -23,6 +23,7 @@ import copy
 import platform
 import unittest
 
+import mock
 import pytest
 
 import apache_beam as beam
@@ -680,6 +681,21 @@ class PipelineTest(unittest.TestCase):
     self.assertIs(pcoll2_unbounded.is_bounded, False)
     self.assertIs(merged.is_bounded, False)
 
+  def test_incompatible_submission_and_runtime_envs_fail_pipeline(self):
+    with mock.patch(
+        'apache_beam.transforms.environments.sdk_base_version_capability'
+    ) as base_version:
+      base_version.side_effect = [
+          f"beam:version:sdk_base:apache/beam_python3.5_sdk:2.{i}.0"
+          for i in range(100)
+      ]
+      with self.assertRaisesRegex(
+          RuntimeError,
+          'Pipeline construction environment and pipeline runtime '
+          'environment are not compatible.'):
+        with TestPipeline() as p:
+          _ = p | Create([None])
+
 
 class DoFnTest(unittest.TestCase):
   def test_element(self):
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index cea5062b582..d1bf50bdfce 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -67,6 +67,7 @@ from apache_beam.runners.worker import operations
 from apache_beam.runners.worker import statesampler
 from apache_beam.transforms import TimeDomain
 from apache_beam.transforms import core
+from apache_beam.transforms import environments
 from apache_beam.transforms import sideinputs
 from apache_beam.transforms import userstate
 from apache_beam.transforms import window
@@ -823,6 +824,27 @@ def only_element(iterable):
   return element
 
 
+def _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor):
+  # type: (beam_fn_api_pb2.ProcessBundleDescriptor) -> None
+
+  runtime_sdk = environments.sdk_base_version_capability()
+  for t in process_bundle_descriptor.transforms.values():
+    env = process_bundle_descriptor.environments[t.environment_id]
+    for c in env.capabilities:
+      if (c.startswith(environments.SDK_VERSION_CAPABILITY_PREFIX) and
+          c != runtime_sdk):
+        raise RuntimeError(
+            "Pipeline construction environment and pipeline runtime "
+            "environment are not compatible. If you use a custom "
+            "container image, check that the Python interpreter minor version "
+            "and the Apache Beam version in your image match the versions "
+            "used at pipeline construction time. "
+            f"Submission environment: {c}. "
+            f"Runtime environment: {runtime_sdk}.")
+
+  # TODO: Consider warning on mismatches in versions of installed packages.
+
+
 class BundleProcessor(object):
   """ A class for processing bundles of elements. """
 
@@ -846,6 +868,7 @@ class BundleProcessor(object):
     self.data_channel_factory = data_channel_factory
     self.current_instruction_id = None  # type: Optional[str]
 
+    _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor)
     # There is no guarantee that the runner only set
     # timer_api_service_descriptor when having timers. So this field cannot be
     # used as an indicator of timers.
diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py
index 769f3d9331b..825ddef994f 100644
--- a/sdks/python/apache_beam/transforms/environments.py
+++ b/sdks/python/apache_beam/transforms/environments.py
@@ -90,8 +90,8 @@ def looks_like_json(s):
 
 
 APACHE_BEAM_DOCKER_IMAGE_PREFIX = 'apache/beam'
-
 APACHE_BEAM_JAVA_CONTAINER_NAME_PREFIX = 'beam_java'
+SDK_VERSION_CAPABILITY_PREFIX = 'beam:version:sdk_base:'
 
 
 def is_apache_beam_container(container_image):
@@ -777,6 +777,11 @@ def python_sdk_docker_capabilities():
   return python_sdk_capabilities() + [common_urns.protocols.SIBLING_WORKERS.urn]
 
 
+def sdk_base_version_capability():
+  return (
+      SDK_VERSION_CAPABILITY_PREFIX + DockerEnvironment.default_docker_image())
+
+
 def _python_sdk_capabilities_iter():
   # type: () -> Iterator[str]
   for urn_spec in common_urns.coders.__dict__.values():
@@ -786,7 +791,7 @@ def _python_sdk_capabilities_iter():
   yield common_urns.protocols.HARNESS_MONITORING_INFOS.urn
   yield common_urns.protocols.WORKER_STATUS.urn
   yield python_urns.PACKED_COMBINE_FN
-  yield 'beam:version:sdk_base:' + DockerEnvironment.default_docker_image()
+  yield sdk_base_version_capability()
   yield common_urns.sdf_components.TRUNCATE_SIZED_RESTRICTION.urn
   yield common_urns.primitives.TO_STRING.urn
 

Reply via email to