tvalentyn commented on code in PR #35725:
URL: https://github.com/apache/beam/pull/35725#discussion_r2309939135


##########
sdks/python/apache_beam/coders/coders.py:
##########
@@ -940,6 +986,30 @@ def to_type_hint(self):
     return Any
 
 
+def _should_force_use_dill(update_compatibility_version):
+  from apache_beam.transforms.util import is_v1_prior_to_v2
+
+  if not is_v1_prior_to_v2(v1=update_compatibility_version, v2="2.68.0"):
+    return False
+
+  try:
+    import dill
+    assert dill.__version__ == "0.3.1.1"
+  except Exception as e:
+    raise RuntimeError("This pipeline runs with the " \
+    "update_compatibility_version=2.67.0 flag. When running with this flag " \

Review Comment:
   ```suggestion
       "--update_compatibility_version=2.67.0 or earlier. When running with this option " \
   ```
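For illustration only, here is a minimal sketch of how the guard could read with the suggested wording. It is not the PR's code, and the names `should_force_use_dill`, `REQUIRED_DILL_VERSION` and `DILL_CUTOFF_VERSION` are hypothetical. The sketch puts the actual requested version into the message instead of hard-coding `2.67.0`. It also replaces the `assert` with an explicit comparison, because `python -O` strips asserts.

```python
from typing import Optional

# Values taken from the PR excerpt: dill 0.3.1.1 is required when an update
# compatibility version earlier than 2.68.0 is requested.
REQUIRED_DILL_VERSION = "0.3.1.1"
DILL_CUTOFF_VERSION = (2, 68, 0)


def _version_tuple(version: str) -> tuple:
  # Assumes plain numeric "MAJOR.MINOR.PATCH" prefixes.
  return tuple(map(int, version.split('.')[:3]))


def should_force_use_dill(update_compatibility_version: Optional[str]) -> bool:
  """Hypothetical variant of the PR's _should_force_use_dill."""
  if (update_compatibility_version is None or
      _version_tuple(update_compatibility_version) >= DILL_CUTOFF_VERSION):
    return False

  try:
    import dill
    installed: Optional[str] = dill.__version__
  except ImportError:
    installed = None

  # Explicit check rather than `assert`, which is removed under `python -O`.
  if installed != REQUIRED_DILL_VERSION:
    raise RuntimeError(
        "This pipeline runs with --update_compatibility_version="
        f"{update_compatibility_version}. When running with this option on "
        f"SDKs 2.68.0 or higher, you must ensure dill=={REQUIRED_DILL_VERSION} "
        f"is installed (found: {installed}).")
  return True
```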



##########
sdks/python/apache_beam/transforms/util.py:
##########
@@ -932,6 +932,18 @@ def get_window_coder(self):
     return self._window_coder
 
 
+def is_v1_prior_to_v2(v1, v2):
+  if v1 is None:
+    return False
+
+  v1 = tuple(map(int, v1.split('.')[0:3]))
+  v2 = tuple(map(int, v2.split('.')[0:3]))
+  for i in range(min(len(v1), len(v2))):

Review Comment:
   The logic is flawed for cases like `3.1.0` vs `2.99.0`. You can
   `return v1 < v2` instead, since tuples compare lexicographically.
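A minimal sketch of the suggested fix, keeping the PR's positional signature. It assumes numeric "MAJOR.MINOR.PATCH" prefixes, as the PR's parsing does, and it is not the merged implementation. The comparison is done on integer tuples rather than raw strings, because string comparison would rank `"2.9.0"` above `"2.10.0"`.

```python
from typing import Optional


def is_v1_prior_to_v2(v1: Optional[str], v2: str) -> bool:
  """Returns True if version v1 is strictly older than version v2.

  Sketch: assumes numeric "MAJOR.MINOR.PATCH" prefixes, as in the PR.
  """
  if v1 is None:
    return False
  v1_parts = tuple(map(int, v1.split('.')[0:3]))
  v2_parts = tuple(map(int, v2.split('.')[0:3]))
  # Tuples compare element by element, so (3, 1, 0) < (2, 99, 0) is False:
  # the first elements already decide the result.
  return v1_parts < v2_parts
```

For example, `is_v1_prior_to_v2("3.1.0", "2.99.0")` is `False` and `is_v1_prior_to_v2("2.9.0", "2.10.0")` is `True`.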



##########
sdks/python/apache_beam/transforms/util.py:
##########
@@ -932,6 +932,18 @@ def get_window_coder(self):
     return self._window_coder
 
 
+def is_v1_prior_to_v2(v1, v2):

Review Comment:
   ```suggestion
   def is_v1_prior_to_v2(*, v1, v2):
   ```
   Let's use keyword-only args (otherwise the name is a bit confusing).
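A small sketch of what the bare `*` buys. The body is a minimal placeholder that assumes numeric version strings. The signature is the point here: callers must name which version is which, and a positional call fails with `TypeError` instead of silently swapping the arguments.

```python
def is_v1_prior_to_v2(*, v1, v2):
  """Keyword-only variant: callers must name which version is which."""
  if v1 is None:
    return False
  return (tuple(map(int, v1.split('.')[0:3])) <
          tuple(map(int, v2.split('.')[0:3])))


# Reads unambiguously at the call site.
print(is_v1_prior_to_v2(v1="2.67.0", v2="2.68.0"))  # True

# A positional call is rejected, so the arguments cannot be swapped by accident.
try:
  is_v1_prior_to_v2("2.67.0", "2.68.0")
except TypeError as e:
  print("rejected:", e)
```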



##########
sdks/python/apache_beam/transforms/ptransform_test.py:
##########
@@ -738,6 +742,67 @@ def test_flatten_one_single_pcollection(self):
       result = (pcoll, ) | 'Single Flatten' >> beam.Flatten()
       assert_that(result, equal_to(input))
 
+  @parameterized.expand([
+      param(compat_version=None),
+      param(compat_version="2.66.0"),
+  ])
+  @pytest.mark.it_validatesrunner
+  def test_group_by_key_importable_special_types(self, compat_version):

Review Comment:
   what does `importable` refer to here?



##########
sdks/python/apache_beam/coders/coder_impl.py:
##########
@@ -837,6 +886,7 @@ def decode_from_stream(self, in_, nested):
       if IntervalWindow is None:
         from apache_beam.transforms.window import IntervalWindow
     # instantiating with None is not part of the public interface
+    # pylint: disable=too-many-function-args

Review Comment:
   just curious, why was this necessary?



##########
sdks/python/apache_beam/coders/coders.py:
##########
@@ -180,7 +180,8 @@ def is_deterministic(self):
     """
     return False
 
-  def as_deterministic_coder(self, step_label, error_message=None):
+  def as_deterministic_coder(
+      self, step_label, error_message=None, update_compatibility_version=None):

Review Comment:
   I would consider using `coders.registry` to store the
   `update_compatibility_version` parameter and delegating the choice of coder
   to the registry. This works especially well if we can decide which coder to
   use on the pipeline submission path and don't need to fetch the
   compatibility requirement from pipeline options at runtime, because the
   coder name would already determine whether we are using the dill-based or
   the cloudpickle-based implementation.
   
   We could do some initialization of the global coder.registry
   (https://github.com/apache/beam/blob/6e89a7e2ab7ae946cb97101a3a5d2551d4869ebb/sdks/python/apache_beam/coders/typecoders.py#L213)
   when we parse the pipeline options at job submission. Then we might be
   able to avoid plumbing update_compatibility_version through the coder
   machinery and changing the APIs.
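A minimal sketch of the registry-based idea. It uses a hypothetical `SketchCoderRegistry` class rather than Beam's real `CoderRegistry`, and it makes no claim about which attributes the real class has. The version is recorded once at submission, and the registry then picks a dill-based or cloudpickle-based fallback, so `as_deterministic_coder` would not need an extra parameter.

```python
from typing import Optional

# Cutoff taken from the PR excerpt.
DILL_CUTOFF_VERSION = (2, 68, 0)


class SketchCoderRegistry:
  """Hypothetical stand-in for a global coder registry; not Beam's API."""

  def __init__(self) -> None:
    self.update_compatibility_version: Optional[str] = None

  def set_update_compatibility_version(self, version: Optional[str]) -> None:
    # Intended to be called once, while pipeline options are parsed at job
    # submission, instead of passing the version through every coder call.
    self.update_compatibility_version = version

  def deterministic_fallback_name(self) -> str:
    # Decided at construction time, so the chosen coder name already says
    # which pickler is used and nothing is read from options at runtime.
    v = self.update_compatibility_version
    if v is not None and tuple(map(int, v.split('.')[:3])) < DILL_CUTOFF_VERSION:
      return "dill"
    return "cloudpickle"


registry = SketchCoderRegistry()

registry.set_update_compatibility_version("2.67.0")
print(registry.deterministic_fallback_name())  # dill
```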



##########
sdks/python/apache_beam/coders/coders.py:
##########
@@ -940,6 +986,30 @@ def to_type_hint(self):
     return Any
 
 
+def _should_force_use_dill(update_compatibility_version):
+  from apache_beam.transforms.util import is_v1_prior_to_v2
+
+  if not is_v1_prior_to_v2(v1=update_compatibility_version, v2="2.68.0"):
+    return False
+
+  try:
+    import dill
+    assert dill.__version__ == "0.3.1.1"
+  except Exception as e:
+    raise RuntimeError("This pipeline runs with the " \
+    "update_compatibility_version=2.67.0 flag. When running with this flag " \
+    "on SDKs 2.68.0 or higher, you must ensure dill==0.3.1.1 is installed. " \

Review Comment:
   Does this restriction apply unconditionally, or only to pipelines that
   require certain coders? Or do those coders appear in the vast majority of
   pipelines, so the wording doesn't depend on which coders are used?



