This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 88da381cc33 Fail the pipeline when a mismatched Python or Beam version
is detected. (#25313)
88da381cc33 is described below
commit 88da381cc3361d551800868372afb2f737e973b5
Author: tvalentyn <[email protected]>
AuthorDate: Tue Feb 7 16:25:36 2023 -0800
Fail the pipeline when a mismatched Python or Beam version is detected.
(#25313)
---
sdks/python/apache_beam/pipeline_test.py | 16 +++++++++++++++
.../apache_beam/runners/worker/bundle_processor.py | 23 ++++++++++++++++++++++
sdks/python/apache_beam/transforms/environments.py | 9 +++++++--
3 files changed, 46 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 98d582884e8..18ab0f091aa 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -23,6 +23,7 @@ import copy
import platform
import unittest
+import mock
import pytest
import apache_beam as beam
@@ -680,6 +681,21 @@ class PipelineTest(unittest.TestCase):
self.assertIs(pcoll2_unbounded.is_bounded, False)
self.assertIs(merged.is_bounded, False)
+ def test_incompatible_submission_and_runtime_envs_fail_pipeline(self):
+ with mock.patch(
+ 'apache_beam.transforms.environments.sdk_base_version_capability'
+ ) as base_version:
+ base_version.side_effect = [
+ f"beam:version:sdk_base:apache/beam_python3.5_sdk:2.{i}.0"
+ for i in range(100)
+ ]
+ with self.assertRaisesRegex(
+ RuntimeError,
+ 'Pipeline construction environment and pipeline runtime '
+ 'environment are not compatible.'):
+ with TestPipeline() as p:
+ _ = p | Create([None])
+
class DoFnTest(unittest.TestCase):
def test_element(self):
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index cea5062b582..d1bf50bdfce 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -67,6 +67,7 @@ from apache_beam.runners.worker import operations
from apache_beam.runners.worker import statesampler
from apache_beam.transforms import TimeDomain
from apache_beam.transforms import core
+from apache_beam.transforms import environments
from apache_beam.transforms import sideinputs
from apache_beam.transforms import userstate
from apache_beam.transforms import window
@@ -823,6 +824,27 @@ def only_element(iterable):
return element
+def _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor):
+ # type: (beam_fn_api_pb2.ProcessBundleDescriptor) -> None
+
+ runtime_sdk = environments.sdk_base_version_capability()
+ for t in process_bundle_descriptor.transforms.values():
+ env = process_bundle_descriptor.environments[t.environment_id]
+ for c in env.capabilities:
+ if (c.startswith(environments.SDK_VERSION_CAPABILITY_PREFIX) and
+ c != runtime_sdk):
+ raise RuntimeError(
+ "Pipeline construction environment and pipeline runtime "
+ "environment are not compatible. If you use a custom "
+ "container image, check that the Python interpreter minor version "
+ "and the Apache Beam version in your image match the versions "
+ "used at pipeline construction time. "
+ f"Submission environment: {c}. "
+ f"Runtime environment: {runtime_sdk}.")
+
+ # TODO: Consider warning on mismatches in versions of installed packages.
+
+
class BundleProcessor(object):
""" A class for processing bundles of elements. """
@@ -846,6 +868,7 @@ class BundleProcessor(object):
self.data_channel_factory = data_channel_factory
self.current_instruction_id = None # type: Optional[str]
+ _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor)
# There is no guarantee that the runner only set
# timer_api_service_descriptor when having timers. So this field cannot be
# used as an indicator of timers.
diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py
index 769f3d9331b..825ddef994f 100644
--- a/sdks/python/apache_beam/transforms/environments.py
+++ b/sdks/python/apache_beam/transforms/environments.py
@@ -90,8 +90,8 @@ def looks_like_json(s):
APACHE_BEAM_DOCKER_IMAGE_PREFIX = 'apache/beam'
-
APACHE_BEAM_JAVA_CONTAINER_NAME_PREFIX = 'beam_java'
+SDK_VERSION_CAPABILITY_PREFIX = 'beam:version:sdk_base:'
def is_apache_beam_container(container_image):
@@ -777,6 +777,11 @@ def python_sdk_docker_capabilities():
return python_sdk_capabilities() + [common_urns.protocols.SIBLING_WORKERS.urn]
+def sdk_base_version_capability():
+ return (
+ SDK_VERSION_CAPABILITY_PREFIX + DockerEnvironment.default_docker_image())
+
+
def _python_sdk_capabilities_iter():
# type: () -> Iterator[str]
for urn_spec in common_urns.coders.__dict__.values():
@@ -786,7 +791,7 @@ def _python_sdk_capabilities_iter():
yield common_urns.protocols.HARNESS_MONITORING_INFOS.urn
yield common_urns.protocols.WORKER_STATUS.urn
yield python_urns.PACKED_COMBINE_FN
- yield 'beam:version:sdk_base:' + DockerEnvironment.default_docker_image()
+ yield sdk_base_version_capability()
yield common_urns.sdf_components.TRUNCATE_SIZED_RESTRICTION.urn
yield common_urns.primitives.TO_STRING.urn