This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ea7380de85 Refactor update compatibility checks and pipeline options access (#37566)
9ea7380de85 is described below

commit 9ea7380de850fafceff09cd007d3979987b5ab46
Author: claudevdm <[email protected]>
AuthorDate: Thu Feb 12 16:37:09 2026 -0500

    Refactor update compatibility checks and pipeline options access (#37566)
    
    * Refactor update compat checks.
    
    * trigger dill tests.
---
 .../trigger_files/beam_PreCommit_Python_Dill.json  |   2 +-
 sdks/python/apache_beam/coders/coders.py           |  32 ++--
 .../apache_beam/coders/coders_test_common.py       | 194 +++++++++++----------
 sdks/python/apache_beam/coders/typecoders.py       |   1 -
 .../apache_beam/io/gcp/bigquery_file_loads.py      |   4 +-
 .../apache_beam/io/gcp/bigquery_file_loads_test.py |  15 +-
 .../python/apache_beam/options/pipeline_options.py |  19 ++
 .../apache_beam/options/pipeline_options_test.py   |  66 +++++++
 sdks/python/apache_beam/pipeline_test.py           |   2 +
 sdks/python/apache_beam/transforms/external.py     |   7 +-
 sdks/python/apache_beam/transforms/ptransform.py   |   6 +-
 sdks/python/apache_beam/transforms/util.py         |  36 +---
 sdks/python/apache_beam/transforms/util_test.py    |  78 +--------
 13 files changed, 224 insertions(+), 238 deletions(-)

diff --git a/.github/trigger_files/beam_PreCommit_Python_Dill.json b/.github/trigger_files/beam_PreCommit_Python_Dill.json
index 8c604b0a135..840d064bdbb 100644
--- a/.github/trigger_files/beam_PreCommit_Python_Dill.json
+++ b/.github/trigger_files/beam_PreCommit_Python_Dill.json
@@ -1,4 +1,4 @@
 {
     "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-    "revision": 2
+    "revision": 3
 }
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index b1c607457f9..b9bee458568 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -927,16 +927,16 @@ class CloudpickleCoder(_PickleCoderBase):
 
 class DeterministicFastPrimitivesCoderV2(FastCoder):
   """Throws runtime errors when encoding non-deterministic values."""
-  def __init__(self, coder, step_label, update_compatibility_version=None):
+  def __init__(self, coder, step_label):
     self._underlying_coder = coder
     self._step_label = step_label
     self._use_relative_filepaths = True
     self._version_tag = "v2_69"
-    from apache_beam.transforms.util import is_v1_prior_to_v2
 
     # Versions prior to 2.69.0 did not use relative filepaths.
-    if update_compatibility_version and is_v1_prior_to_v2(
-        v1=update_compatibility_version, v2="2.69.0"):
+    from apache_beam.options.pipeline_options_context import get_pipeline_options
+    opts = get_pipeline_options()
+    if opts and opts.is_compat_version_prior_to("2.69.0"):
       self._version_tag = ""
       self._use_relative_filepaths = False
 
@@ -1005,20 +1005,11 @@ class DeterministicFastPrimitivesCoder(FastCoder):
     return Any
 
 
-def _should_force_use_dill(registry):
-  # force_dill_deterministic_coders is for testing purposes. If there is a
-  # DeterministicFastPrimitivesCoder in the pipeline graph but the dill
-  # encoding path is not really triggered dill does not have to be installed.
-  # and this check can be skipped.
-  if getattr(registry, 'force_dill_deterministic_coders', False):
-    return True
+def _should_force_use_dill():
+  from apache_beam.options.pipeline_options_context import get_pipeline_options
 
-  from apache_beam.transforms.util import is_v1_prior_to_v2
-  update_compat_version = registry.update_compatibility_version
-  if not update_compat_version:
-    return False
-
-  if not is_v1_prior_to_v2(v1=update_compat_version, v2="2.68.0"):
+  opts = get_pipeline_options()
+  if opts is None or not opts.is_compat_version_prior_to("2.68.0"):
     return False
 
   try:
@@ -1043,12 +1034,9 @@ def _update_compatible_deterministic_fast_primitives_coder(coder, step_label):
    - In SDK version 2.69.0 cloudpickle is used to encode "special types" with
    relative filepaths in code objects and dynamic functions.
   """
-  from apache_beam.coders import typecoders
-
-  if _should_force_use_dill(typecoders.registry):
+  if _should_force_use_dill():
     return DeterministicFastPrimitivesCoder(coder, step_label)
-  return DeterministicFastPrimitivesCoderV2(
-      coder, step_label, typecoders.registry.update_compatibility_version)
+  return DeterministicFastPrimitivesCoderV2(coder, step_label)
 
 
 class FastPrimitivesCoder(FastCoder):
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index ad742665fb8..5b7f5f65a56 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -42,6 +42,8 @@ from apache_beam.coders import coders
 from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
 from apache_beam.coders import typecoders
 from apache_beam.internal import pickler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options_context import scoped_pipeline_options
 from apache_beam.runners import pipeline_context
 from apache_beam.transforms import userstate
 from apache_beam.transforms import window
@@ -202,9 +204,6 @@ class CodersTest(unittest.TestCase):
     assert not standard - cls.seen, str(standard - cls.seen)
     assert not cls.seen_nested - standard, str(cls.seen_nested - standard)
 
-  def tearDown(self):
-    typecoders.registry.update_compatibility_version = None
-
   @classmethod
   def _observe(cls, coder):
     cls.seen.add(type(coder))
@@ -274,80 +273,82 @@ class CodersTest(unittest.TestCase):
     - In SDK version >=2.69.0 cloudpickle is used to encode "special types"
     with relative filepaths in code objects and dynamic functions.
     """
+    with scoped_pipeline_options(
+        PipelineOptions(update_compatibility_version=compat_version)):
+      coder = coders.FastPrimitivesCoder()
+      if not dill and compat_version == "2.67.0":
+        with self.assertRaises(RuntimeError):
+          coder.as_deterministic_coder(step_label="step")
+        self.skipTest('Dill not installed')
+      deterministic_coder = coder.as_deterministic_coder(step_label="step")
+
+      self.check_coder(deterministic_coder, *self.test_values_deterministic)
+      for v in self.test_values_deterministic:
+        self.check_coder(coders.TupleCoder((deterministic_coder, )), (v, ))
+      self.check_coder(
+          coders.TupleCoder(
+              (deterministic_coder, ) * len(self.test_values_deterministic)),
+          tuple(self.test_values_deterministic))
 
-    typecoders.registry.update_compatibility_version = compat_version
-    coder = coders.FastPrimitivesCoder()
-    if not dill and compat_version == "2.67.0":
-      with self.assertRaises(RuntimeError):
-        coder.as_deterministic_coder(step_label="step")
-      self.skipTest('Dill not installed')
-    deterministic_coder = coder.as_deterministic_coder(step_label="step")
-
-    self.check_coder(deterministic_coder, *self.test_values_deterministic)
-    for v in self.test_values_deterministic:
-      self.check_coder(coders.TupleCoder((deterministic_coder, )), (v, ))
-    self.check_coder(
-        coders.TupleCoder(
-            (deterministic_coder, ) * len(self.test_values_deterministic)),
-        tuple(self.test_values_deterministic))
-
-    self.check_coder(deterministic_coder, {})
-    self.check_coder(deterministic_coder, {2: 'x', 1: 'y'})
-    with self.assertRaises(TypeError):
-      self.check_coder(deterministic_coder, {1: 'x', 'y': 2})
-    self.check_coder(deterministic_coder, [1, {}])
-    with self.assertRaises(TypeError):
-      self.check_coder(deterministic_coder, [1, {1: 'x', 'y': 2}])
-
-    self.check_coder(
-        coders.TupleCoder((deterministic_coder, coder)), (1, {}), ('a', [{}]))
+      self.check_coder(deterministic_coder, {})
+      self.check_coder(deterministic_coder, {2: 'x', 1: 'y'})
+      with self.assertRaises(TypeError):
+        self.check_coder(deterministic_coder, {1: 'x', 'y': 2})
+      self.check_coder(deterministic_coder, [1, {}])
+      with self.assertRaises(TypeError):
+        self.check_coder(deterministic_coder, [1, {1: 'x', 'y': 2}])
 
-    self.check_coder(deterministic_coder, test_message.MessageA(field1='value'))
+      self.check_coder(
+          coders.TupleCoder((deterministic_coder, coder)), (1, {}), ('a', [{}]))
 
-    # Skip this test during cloudpickle. Dill monkey patches the __reduce__
-    # method for anonymous named tuples (MyNamedTuple) which is not pickleable.
-    # Since the test is parameterized the type gets colbbered.
-    if compat_version == "2.67.0":
       self.check_coder(
-          deterministic_coder, [MyNamedTuple(1, 2), MyTypedNamedTuple(1, 'a')])
+          deterministic_coder, test_message.MessageA(field1='value'))
 
-    self.check_coder(
-        deterministic_coder,
-        [AnotherNamedTuple(1, 2), MyTypedNamedTuple(1, 'a')])
+      # Skip this test during cloudpickle. Dill monkey patches the __reduce__
+      # method for anonymous named tuples (MyNamedTuple) which is not
+      # pickleable. Since the test is parameterized the type gets clobbered.
+      if compat_version == "2.67.0":
+        self.check_coder(
+            deterministic_coder,
+            [MyNamedTuple(1, 2), MyTypedNamedTuple(1, 'a')])
 
-    if dataclasses is not None:
-      self.check_coder(deterministic_coder, FrozenDataClass(1, 2))
-      self.check_coder(deterministic_coder, FrozenKwOnlyDataClass(c=1, d=2))
       self.check_coder(
-          deterministic_coder, FrozenUnInitKwOnlyDataClass(side=11))
+          deterministic_coder,
+          [AnotherNamedTuple(1, 2), MyTypedNamedTuple(1, 'a')])
 
-      with self.assertRaises(TypeError):
-        self.check_coder(deterministic_coder, UnFrozenDataClass(1, 2))
-
-      with self.assertRaises(TypeError):
+      if dataclasses is not None:
+        self.check_coder(deterministic_coder, FrozenDataClass(1, 2))
+        self.check_coder(deterministic_coder, FrozenKwOnlyDataClass(c=1, d=2))
         self.check_coder(
-            deterministic_coder, FrozenDataClass(UnFrozenDataClass(1, 2), 3))
+            deterministic_coder, FrozenUnInitKwOnlyDataClass(side=11))
+
         with self.assertRaises(TypeError):
-          self.check_coder(
-              deterministic_coder,
-              AnotherNamedTuple(UnFrozenDataClass(1, 2), 3))
+          self.check_coder(deterministic_coder, UnFrozenDataClass(1, 2))
 
-    self.check_coder(deterministic_coder, list(MyEnum))
-    self.check_coder(deterministic_coder, list(MyIntEnum))
-    self.check_coder(deterministic_coder, list(MyIntFlag))
-    self.check_coder(deterministic_coder, list(MyFlag))
+        with self.assertRaises(TypeError):
+          self.check_coder(
+              deterministic_coder, FrozenDataClass(UnFrozenDataClass(1, 2), 3))
+          with self.assertRaises(TypeError):
+            self.check_coder(
+                deterministic_coder,
+                AnotherNamedTuple(UnFrozenDataClass(1, 2), 3))
 
-    self.check_coder(
-        deterministic_coder,
-        [DefinesGetAndSetState(1), DefinesGetAndSetState((1, 2, 3))])
+      self.check_coder(deterministic_coder, list(MyEnum))
+      self.check_coder(deterministic_coder, list(MyIntEnum))
+      self.check_coder(deterministic_coder, list(MyIntFlag))
+      self.check_coder(deterministic_coder, list(MyFlag))
 
-    with self.assertRaises(TypeError):
-      self.check_coder(deterministic_coder, DefinesGetState(1))
-    with self.assertRaises(TypeError):
       self.check_coder(
-          deterministic_coder, DefinesGetAndSetState({
-              1: 'x', 'y': 2
-          }))
+          deterministic_coder,
+          [DefinesGetAndSetState(1), DefinesGetAndSetState((1, 2, 3))])
+
+      with self.assertRaises(TypeError):
+        self.check_coder(deterministic_coder, DefinesGetState(1))
+      with self.assertRaises(TypeError):
+        self.check_coder(
+            deterministic_coder, DefinesGetAndSetState({
+                1: 'x', 'y': 2
+            }))
 
   @parameterized.expand([
       param(compat_version=None),
@@ -364,28 +365,29 @@ class CodersTest(unittest.TestCase):
     - In SDK version >=2.69.0 cloudpickle is used to encode "special types"
     with relative file.
     """
-    typecoders.registry.update_compatibility_version = compat_version
-    values = [{
-        MyTypedNamedTuple(i, 'a'): MyTypedNamedTuple('a', i)
-        for i in range(10)
-    }]
+    with scoped_pipeline_options(
+        PipelineOptions(update_compatibility_version=compat_version)):
+      values = [{
+          MyTypedNamedTuple(i, 'a'): MyTypedNamedTuple('a', i)
+          for i in range(10)
+      }]
 
-    coder = coders.MapCoder(
-        coders.FastPrimitivesCoder(), coders.FastPrimitivesCoder())
+      coder = coders.MapCoder(
+          coders.FastPrimitivesCoder(), coders.FastPrimitivesCoder())
 
-    if not dill and compat_version == "2.67.0":
-      with self.assertRaises(RuntimeError):
-        coder.as_deterministic_coder(step_label="step")
-      self.skipTest('Dill not installed')
+      if not dill and compat_version == "2.67.0":
+        with self.assertRaises(RuntimeError):
+          coder.as_deterministic_coder(step_label="step")
+        self.skipTest('Dill not installed')
 
-    deterministic_coder = coder.as_deterministic_coder(step_label="step")
+      deterministic_coder = coder.as_deterministic_coder(step_label="step")
 
-    assert isinstance(
-        deterministic_coder._key_coder,
-        coders.DeterministicFastPrimitivesCoderV2 if compat_version
-        in (None, "2.68.0") else coders.DeterministicFastPrimitivesCoder)
+      assert isinstance(
+          deterministic_coder._key_coder,
+          coders.DeterministicFastPrimitivesCoderV2 if compat_version
+          in (None, "2.68.0") else coders.DeterministicFastPrimitivesCoder)
 
-    self.check_coder(deterministic_coder, *values)
+      self.check_coder(deterministic_coder, *values)
 
   def test_dill_coder(self):
     if not dill:
@@ -738,7 +740,6 @@ class CodersTest(unittest.TestCase):
 
     if sys.executable is None:
       self.skipTest('No Python interpreter found')
-    typecoders.registry.update_compatibility_version = compat_version
 
     # pylint: disable=line-too-long
     script = textwrap.dedent(
@@ -750,7 +751,8 @@ class CodersTest(unittest.TestCase):
         import logging
 
         from apache_beam.coders import coders
-        from apache_beam.coders import typecoders
+        from apache_beam.options.pipeline_options_context import scoped_pipeline_options
+        from apache_beam.options.pipeline_options import PipelineOptions
         from apache_beam.coders.coders_test_common import MyNamedTuple
         from apache_beam.coders.coders_test_common import MyTypedNamedTuple
         from apache_beam.coders.coders_test_common import MyEnum
@@ -802,20 +804,20 @@ class CodersTest(unittest.TestCase):
         ])
 
         compat_version = {'"'+ compat_version +'"' if compat_version else None}
-        typecoders.registry.update_compatibility_version = compat_version
-        coder = coders.FastPrimitivesCoder()
-        deterministic_coder = coder.as_deterministic_coder("step")
-        
-        results = dict()
-        for test_name, value in test_cases:
-            try:
-                encoded = deterministic_coder.encode(value)
-                results[test_name] = encoded
-            except Exception as e:
-              logging.warning("Encoding failed with %s", e)
-              sys.exit(1)
-        
-        sys.stdout.buffer.write(pickle.dumps(results))
+        with scoped_pipeline_options(PipelineOptions(update_compatibility_version=compat_version)):
+            coder = coders.FastPrimitivesCoder()
+            deterministic_coder = coder.as_deterministic_coder("step")
+            
+            results = dict()
+            for test_name, value in test_cases:
+                try:
+                    encoded = deterministic_coder.encode(value)
+                    results[test_name] = encoded
+                except Exception as e:
+                  logging.warning("Encoding failed with %s", e)
+                  sys.exit(1)
+            
+            sys.stdout.buffer.write(pickle.dumps(results))
                 
         
     ''')
diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py
index ef75a21ce9e..9683e00f0c2 100644
--- a/sdks/python/apache_beam/coders/typecoders.py
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -84,7 +84,6 @@ class CoderRegistry(object):
     self._coders: Dict[Any, Type[coders.Coder]] = {}
     self.custom_types: List[Any] = []
     self.register_standard_coders(fallback_coder)
-    self.update_compatibility_version = None
 
   def register_standard_coders(self, fallback_coder):
     """Register coders for all basic and composite types."""
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 76f465ddebb..738ace67a5f 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -1120,8 +1120,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
          of the load jobs would fail but not other. If any of them fails, then
          copy jobs are not triggered.
     """
-    self.reshuffle_before_load = not util.is_compat_version_prior_to(
-        p.options, "2.65.0")
+    self.reshuffle_before_load = not p.options.is_compat_version_prior_to(
+        "2.65.0")
     if self.reshuffle_before_load:
       # Ensure that TriggerLoadJob retry inputs are deterministic by breaking
       # fusion for inputs.
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 30f09ff4f56..191719e6a20 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -485,9 +485,9 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
       param(compat_version=None),
       param(compat_version="2.64.0"),
   ])
-  def test_reshuffle_before_load(self, compat_version):
-    from apache_beam.coders import typecoders
-    typecoders.registry.force_dill_deterministic_coders = True
+  @mock.patch(
+      'apache_beam.coders.coders._should_force_use_dill', return_value=True)
+  def test_reshuffle_before_load(self, mock_force_dill, compat_version):
     destination = 'project1:dataset1.table1'
 
     job_reference = bigquery_api.JobReference()
@@ -523,7 +523,6 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
 
     reshuffle_before_load = compat_version is None
     assert transform.reshuffle_before_load == reshuffle_before_load
-    typecoders.registry.force_dill_deterministic_coders = False
 
   def test_load_job_id_used(self):
     job_reference = bigquery_api.JobReference()
@@ -998,10 +997,10 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
       param(
           is_streaming=True, with_auto_sharding=True, compat_version="2.64.0"),
   ])
+  @mock.patch(
+      'apache_beam.coders.coders._should_force_use_dill', return_value=True)
   def test_triggering_frequency(
-      self, is_streaming, with_auto_sharding, compat_version):
-    from apache_beam.coders import typecoders
-    typecoders.registry.force_dill_deterministic_coders = True
+      self, mock_force_dill, is_streaming, with_auto_sharding, compat_version):
 
     destination = 'project1:dataset1.table1'
 
@@ -1108,8 +1107,6 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
           label='CheckDestinations')
       assert_that(jobs, equal_to(expected_jobs), label='CheckJobs')
 
-    typecoders.registry.force_dill_deterministic_coders = False
-
 
 class BigQueryFileLoadsIT(unittest.TestCase):
 
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 0e1012b2de6..d60d75283ea 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -668,6 +668,25 @@ class PipelineOptions(HasDisplayData):
     view._all_options = self._all_options
     return view
 
+  def is_compat_version_prior_to(self, breaking_change_version):
+    """Check if update_compatibility_version is prior to a breaking change.
+
+    Returns True if the pipeline should use old behavior (i.e., the
+    update_compatibility_version is set and is earlier than the given version).
+    Returns False if update_compatibility_version is not set or is >= the
+    breaking change version.
+
+    Args:
+      breaking_change_version: Version string (e.g., "2.72.0") at which
+        the breaking change was introduced.
+    """
+    v1 = self.view_as(StreamingOptions).update_compatibility_version
+    if v1 is None:
+      return False
+    v1_parts = (v1.split('.') + ['0', '0', '0'])[:3]
+    v2_parts = (breaking_change_version.split('.') + ['0', '0', '0'])[:3]
+    return tuple(map(int, v1_parts)) < tuple(map(int, v2_parts))
+
   def _visible_option_list(self) -> List[str]:
     return sorted(
         option for option in dir(self._visible_options) if option[0] != '_')
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py
index c683c962527..215c44156ea 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -987,6 +987,72 @@ class PipelineOptionsTest(unittest.TestCase):
                      options.get_all_options()['dataflow_service_options'])
 
 
+class CompatVersionTest(unittest.TestCase):
+  def test_is_compat_version_prior_to(self):
+    test_cases = [
+        # Basic comparison cases
+        ("1.0.0", "2.0.0", True),  # v1 < v2 in major
+        ("2.0.0", "1.0.0", False),  # v1 > v2 in major
+        ("1.1.0", "1.2.0", True),  # v1 < v2 in minor
+        ("1.2.0", "1.1.0", False),  # v1 > v2 in minor
+        ("1.0.1", "1.0.2", True),  # v1 < v2 in patch
+        ("1.0.2", "1.0.1", False),  # v1 > v2 in patch
+
+        # Equal versions
+        ("1.0.0", "1.0.0", False),  # Identical
+        ("0.0.0", "0.0.0", False),  # Both zero
+
+        # Different lengths - shorter vs longer
+        ("1.0", "1.0.0", False),  # Should be equal (1.0 = 1.0.0)
+        ("1.0", "1.0.1", True),  # 1.0.0 < 1.0.1
+        ("1.2", "1.2.0", False),  # Should be equal (1.2 = 1.2.0)
+        ("1.2", "1.2.3", True),  # 1.2.0 < 1.2.3
+        ("2", "2.0.0", False),  # Should be equal (2 = 2.0.0)
+        ("2", "2.0.1", True),  # 2.0.0 < 2.0.1
+        ("1", "2.0", True),  # 1.0.0 < 2.0.0
+
+        # Different lengths - longer vs shorter
+        ("1.0.0", "1.0", False),  # Should be equal
+        ("1.0.1", "1.0", False),  # 1.0.1 > 1.0.0
+        ("1.2.0", "1.2", False),  # Should be equal
+        ("1.2.3", "1.2", False),  # 1.2.3 > 1.2.0
+        ("2.0.0", "2", False),  # Should be equal
+        ("2.0.1", "2", False),  # 2.0.1 > 2.0.0
+        ("2.0", "1", False),  # 2.0.0 > 1.0.0
+
+        # Mixed length comparisons
+        ("1.0", "2.0.0", True),  # 1.0.0 < 2.0.0
+        ("2.0", "1.0.0", False),  # 2.0.0 > 1.0.0
+        ("1", "1.0.1", True),  # 1.0.0 < 1.0.1
+        ("1.1", "1.0.9", False),  # 1.1.0 > 1.0.9
+
+        # Large numbers
+        ("1.9.9", "2.0.0", True),  # 1.9.9 < 2.0.0
+        ("10.0.0", "9.9.9", False),  # 10.0.0 > 9.9.9
+        ("1.10.0", "1.9.0", False),  # 1.10.0 > 1.9.0
+        ("1.2.10", "1.2.9", False),  # 1.2.10 > 1.2.9
+
+        # Sequential versions
+        ("1.0.0", "1.0.1", True),
+        ("1.0.1", "1.0.2", True),
+        ("1.0.9", "1.1.0", True),
+        ("1.9.9", "2.0.0", True),
+    ]
+
+    for v1, v2, expected in test_cases:
+      options = PipelineOptions(update_compatibility_version=v1)
+      self.assertEqual(
+          options.is_compat_version_prior_to(v2),
+          expected,
+          msg=f"Failed {v1} < {v2} == {expected}")
+
+    # None case: no update_compatibility_version set
+    options_no_compat = PipelineOptions()
+    self.assertFalse(
+        options_no_compat.is_compat_version_prior_to("1.0.0"),
+        msg="Should return False when update_compatibility_version is not set")
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 3e7d083cb2f..b28fe3c3d14 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -1023,6 +1023,7 @@ class PipelineOptionsTest(unittest.TestCase):
     self.assertEqual({
         'from_dictionary',
         'get_all_options',
+        'is_compat_version_prior_to',
         'slices',
         'style',
         'view_as',
@@ -1038,6 +1039,7 @@ class PipelineOptionsTest(unittest.TestCase):
     self.assertEqual({
         'from_dictionary',
         'get_all_options',
+        'is_compat_version_prior_to',
         'style',
         'view_as',
         'display_data',
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index 21a863069e6..9469ac717df 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -48,7 +48,6 @@ from apache_beam.runners import pipeline_context
 from apache_beam.runners.portability import artifact_service
 from apache_beam.transforms import environments
 from apache_beam.transforms import ptransform
-from apache_beam.transforms.util import is_compat_version_prior_to
 from apache_beam.typehints import WithTypeHints
 from apache_beam.typehints import native_type_compatibility
 from apache_beam.typehints import row_type
@@ -499,9 +498,9 @@ class SchemaAwareExternalTransform(ptransform.PTransform):
     expansion_service = self._expansion_service
 
     if self._managed_replacement:
-      compat_version_prior_to_current = is_compat_version_prior_to(
-          pcolls.pipeline._options,
-          self._managed_replacement.update_compatibility_version)
+      compat_version_prior_to_current = (
+          pcolls.pipeline._options.is_compat_version_prior_to(
+              self._managed_replacement.update_compatibility_version))
       if not compat_version_prior_to_current:
         payload_builder = self._managed_payload_builder
         expansion_service = self._managed_expansion_service
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index d5985b6212d..06ea822aa0b 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -1062,10 +1062,8 @@ class _PTransformFnPTransform(PTransform):
     return self._fn(pcoll, *args, **kwargs)
 
   def set_options(self, options):
-    # Avoid circular import.
-    from apache_beam.transforms.util import is_compat_version_prior_to
-    self._use_backwards_compatible_label = is_compat_version_prior_to(
-        options, '2.68.0')
+    self._use_backwards_compatible_label = options.is_compat_version_prior_to(
+        '2.68.0')
 
   def default_label(self) -> str:
     # Attempt to give a reasonable name to this transform.
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index fbaab6b4ebb..770a5baec36 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -47,7 +47,6 @@ from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.metrics import Metrics
-from apache_beam.options import pipeline_options
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.pvalue import AsSideInput
@@ -1350,27 +1349,6 @@ class _IdentityWindowFn(NonMergingWindowFn):
     return self._window_coder
 
 
-def is_v1_prior_to_v2(*, v1, v2):
-  if v1 is None:
-    return False
-
-  v1_parts = (v1.split('.') + ['0', '0', '0'])[:3]
-  v2_parts = (v2.split('.') + ['0', '0', '0'])[:3]
-  return tuple(map(int, v1_parts)) < tuple(map(int, v2_parts))
-
-
-def is_compat_version_prior_to(options, breaking_change_version):
-  # This function is used in a branch statement to determine whether we should
-  # keep the old behavior prior to a breaking change or use the new behavior.
-  # - If update_compatibility_version < breaking_change_version, we will return
-  #   True and keep the old behavior.
-  update_compatibility_version = options.view_as(
-      pipeline_options.StreamingOptions).update_compatibility_version
-
-  return is_v1_prior_to_v2(
-      v1=update_compatibility_version, v2=breaking_change_version)
-
-
 def reify_metadata_default_window(
     element, timestamp=DoFn.TimestampParam, pane_info=DoFn.PaneInfoParam):
   key, value = element
@@ -1448,8 +1426,8 @@ class ReshufflePerKey(PTransform):
             for (value, timestamp) in values
         ]
 
-      if is_compat_version_prior_to(pcoll.pipeline.options,
-                                    RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION):
+      if pcoll.pipeline.options.is_compat_version_prior_to(
+          RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION):
         pre_gbk_map = Map(reify_timestamps).with_output_types(Any)
       else:
         pre_gbk_map = Map(reify_timestamps).with_input_types(
@@ -1468,8 +1446,8 @@ class ReshufflePerKey(PTransform):
         key, windowed_values = element
         return [wv.with_value((key, wv.value)) for wv in windowed_values]
 
-      if is_compat_version_prior_to(pcoll.pipeline.options,
-                                    RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION):
+      if pcoll.pipeline.options.is_compat_version_prior_to(
+          RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION):
         pre_gbk_map = Map(reify_timestamps).with_output_types(Any)
       else:
         pre_gbk_map = Map(reify_timestamps).with_input_types(
@@ -1493,7 +1471,7 @@ class ReshufflePerKey(PTransform):
     return result
 
   def expand(self, pcoll):
-    if is_compat_version_prior_to(pcoll.pipeline.options, "2.65.0"):
+    if pcoll.pipeline.options.is_compat_version_prior_to("2.65.0"):
       return self.expand_2_64_0(pcoll)
 
     windowing_saved = pcoll.windowing
@@ -1550,8 +1528,8 @@ class Reshuffle(PTransform):
 
   def expand(self, pcoll):
     # type: (pvalue.PValue) -> pvalue.PCollection
-    if is_compat_version_prior_to(pcoll.pipeline.options,
-                                  RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION):
+    if pcoll.pipeline.options.is_compat_version_prior_to(
+        RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION):
       reshuffle_step = ReshufflePerKey()
     else:
       reshuffle_step = ReshufflePerKey().with_input_types(
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index 7389568691c..98edb4cc2bd 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -1321,10 +1321,11 @@ class ReshuffleTest(unittest.TestCase):
       param(compat_version=None),
       param(compat_version="2.64.0"),
   ])
-  def test_reshuffle_custom_window_preserves_metadata(self, compat_version):
+  @mock.patch(
+      'apache_beam.coders.coders._should_force_use_dill', return_value=True)
+  def test_reshuffle_custom_window_preserves_metadata(
+      self, mock_force_dill, compat_version):
     """Tests that Reshuffle preserves pane info."""
-    from apache_beam.coders import typecoders
-    typecoders.registry.force_dill_deterministic_coders = True
     element_count = 12
     timestamp_value = timestamp.Timestamp(0)
     l = [
@@ -1418,17 +1419,17 @@ class ReshuffleTest(unittest.TestCase):
           equal_to(expected),
           label='CheckMetadataPreserved',
           reify_windows=True)
-    typecoders.registry.force_dill_deterministic_coders = False
 
   @parameterized.expand([
       param(compat_version=None),
       param(compat_version="2.64.0"),
   ])
-  def test_reshuffle_default_window_preserves_metadata(self, compat_version):
+  @mock.patch(
+      'apache_beam.coders.coders._should_force_use_dill', return_value=True)
+  def test_reshuffle_default_window_preserves_metadata(
+      self, mock_force_dill, compat_version):
     """Tests that Reshuffle preserves timestamp, window, and pane info
     metadata."""
-    from apache_beam.coders import typecoders
-    typecoders.registry.force_dill_deterministic_coders = True
     no_firing = PaneInfo(
         is_first=True,
         is_last=True,
@@ -1502,7 +1503,6 @@ class ReshuffleTest(unittest.TestCase):
           equal_to(expected),
           label='CheckMetadataPreserved',
           reify_windows=True)
-    typecoders.registry.force_dill_deterministic_coders = False
 
   @pytest.mark.it_validatesrunner
   def test_reshuffle_preserves_timestamps(self):
@@ -2521,68 +2521,6 @@ class WaitOnTest(unittest.TestCase):
           label='result')
 
 
-class CompatCheckTest(unittest.TestCase):
-  def test_is_v1_prior_to_v2(self):
-    test_cases = [
-        # Basic comparison cases
-        ("1.0.0", "2.0.0", True),  # v1 < v2 in major
-        ("2.0.0", "1.0.0", False),  # v1 > v2 in major
-        ("1.1.0", "1.2.0", True),  # v1 < v2 in minor
-        ("1.2.0", "1.1.0", False),  # v1 > v2 in minor
-        ("1.0.1", "1.0.2", True),  # v1 < v2 in patch
-        ("1.0.2", "1.0.1", False),  # v1 > v2 in patch
-
-        # Equal versions
-        ("1.0.0", "1.0.0", False),  # Identical
-        ("0.0.0", "0.0.0", False),  # Both zero
-
-        # Different lengths - shorter vs longer
-        ("1.0", "1.0.0", False),  # Should be equal (1.0 = 1.0.0)
-        ("1.0", "1.0.1", True),  # 1.0.0 < 1.0.1
-        ("1.2", "1.2.0", False),  # Should be equal (1.2 = 1.2.0)
-        ("1.2", "1.2.3", True),  # 1.2.0 < 1.2.3
-        ("2", "2.0.0", False),  # Should be equal (2 = 2.0.0)
-        ("2", "2.0.1", True),  # 2.0.0 < 2.0.1
-        ("1", "2.0", True),  # 1.0.0 < 2.0.0
-
-        # Different lengths - longer vs shorter
-        ("1.0.0", "1.0", False),  # Should be equal
-        ("1.0.1", "1.0", False),  # 1.0.1 > 1.0.0
-        ("1.2.0", "1.2", False),  # Should be equal
-        ("1.2.3", "1.2", False),  # 1.2.3 > 1.2.0
-        ("2.0.0", "2", False),  # Should be equal
-        ("2.0.1", "2", False),  # 2.0.1 > 2.0.0
-        ("2.0", "1", False),  # 2.0.0 > 1.0.0
-
-        # Mixed length comparisons
-        ("1.0", "2.0.0", True),  # 1.0.0 < 2.0.0
-        ("2.0", "1.0.0", False),  # 2.0.0 > 1.0.0
-        ("1", "1.0.1", True),  # 1.0.0 < 1.0.1
-        ("1.1", "1.0.9", False),  # 1.1.0 > 1.0.9
-
-        # Large numbers
-        ("1.9.9", "2.0.0", True),  # 1.9.9 < 2.0.0
-        ("10.0.0", "9.9.9", False),  # 10.0.0 > 9.9.9
-        ("1.10.0", "1.9.0", False),  # 1.10.0 > 1.9.0
-        ("1.2.10", "1.2.9", False),  # 1.2.10 > 1.2.9
-
-        # Sequential versions
-        ("1.0.0", "1.0.1", True),
-        ("1.0.1", "1.0.2", True),
-        ("1.0.9", "1.1.0", True),
-        ("1.9.9", "2.0.0", True),
-
-        # Null/None cases
-        (None, "1.0.0", False),  # v1 is None
-    ]
-
-    for v1, v2, expected in test_cases:
-      self.assertEqual(
-          util.is_v1_prior_to_v2(v1=v1, v2=v2),
-          expected,
-          msg=f"Failed {v1} < {v2} == {expected}")
-
-
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()


Reply via email to