tvalentyn commented on code in PR #35725:
URL: https://github.com/apache/beam/pull/35725#discussion_r2309939135
##########
sdks/python/apache_beam/coders/coders.py:
##########
@@ -940,6 +986,30 @@ def to_type_hint(self):
     return Any


+def _should_force_use_dill(update_compatibility_version):
+  from apache_beam.transforms.util import is_v1_prior_to_v2
+
+  if not is_v1_prior_to_v2(v1=update_compatibility_version, v2="2.68.0"):
+    return False
+
+  try:
+    import dill
+    assert dill.__version__ == "0.3.1.1"
+  except Exception as e:
+    raise RuntimeError("This pipeline runs with the " \
+        "update_compatibility_version=2.67.0 flag. When running with this flag " \

Review Comment:
```suggestion
        "--update_compatibility_version=2.67.0 or earlier. When running with this option " \
```

##########
sdks/python/apache_beam/transforms/util.py:
##########
@@ -932,6 +932,18 @@ def get_window_coder(self):
     return self._window_coder


+def is_v1_prior_to_v2(v1, v2):
+  if v1 is None:
+    return False
+
+  v1 = tuple(map(int, v1.split('.')[0:3]))
+  v2 = tuple(map(int, v2.split('.')[0:3]))
+  for i in range(min(len(v1), len(v2))):

Review Comment:
The logic is flawed for cases like `3.1.0` vs `2.99.0`. You can simply `return v1 < v2`.

##########
sdks/python/apache_beam/transforms/util.py:
##########
@@ -932,6 +932,18 @@ def get_window_coder(self):
     return self._window_coder


+def is_v1_prior_to_v2(v1, v2):

Review Comment:
```suggestion
def is_v1_prior_to_v2(*, v1, v2):
```
Let's use keyword-only args (otherwise the name is a bit confusing).

##########
sdks/python/apache_beam/transforms/ptransform_test.py:
##########
@@ -738,6 +742,67 @@ def test_flatten_one_single_pcollection(self):
     result = (pcoll, ) | 'Single Flatten' >> beam.Flatten()
     assert_that(result, equal_to(input))

+  @parameterized.expand([
+      param(compat_version=None),
+      param(compat_version="2.66.0"),
+  ])
+  @pytest.mark.it_validatesrunner
+  def test_group_by_key_importable_special_types(self, compat_version):

Review Comment:
What does `importable` refer to here?
##########
sdks/python/apache_beam/coders/coder_impl.py:
##########
@@ -837,6 +886,7 @@ def decode_from_stream(self, in_, nested):
     if IntervalWindow is None:
       from apache_beam.transforms.window import IntervalWindow
     # instantiating with None is not part of the public interface
+    # pylint: disable=too-many-function-args

Review Comment:
Just curious: why was this necessary?

##########
sdks/python/apache_beam/coders/coders.py:
##########
@@ -180,7 +180,8 @@ def is_deterministic(self):
     """
     return False

-  def as_deterministic_coder(self, step_label, error_message=None):
+  def as_deterministic_coder(
+      self, step_label, error_message=None, update_compatibility_version=None):

Review Comment:
I would consider using `coders.registry` to store the `update_compatibility_version` parameter and delegating the choice of coder to the registry. This works especially well if we can decide which coder to use on the pipeline submission path and don't need to fetch the compatibility requirement from pipeline options at runtime, because the coder name would already indicate whether we are using the dill-based or the cloudpickle-based implementation. We could do some initialization of the global `coders.registry` (https://github.com/apache/beam/blob/6e89a7e2ab7ae946cb97101a3a5d2551d4869ebb/sdks/python/apache_beam/coders/typecoders.py#L213) when parsing the pipeline options at job submission. Then we might be able to avoid plumbing `update_compatibility_version` through the coder machinery and changing the APIs.
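A rough illustration of the registry-based idea. Everything here is hypothetical: `SketchCoderRegistry`, `configure_from_options`, `deterministic_coder_name`, and the coder names `DillDeterministicCoder` and `CloudpickleDeterministicCoder` are placeholders, not Beam APIs. The point it shows is that the compatibility decision is made once, at job submission, and is baked into the chosen coder name, so `as_deterministic_coder` would not need an extra `update_compatibility_version` parameter.

```python
from typing import Optional, Tuple


def _parse_version(version: str) -> Tuple[int, ...]:
  # Assumes plain integer components such as "2.67.0".
  return tuple(int(p) for p in version.split('.')[:3])


class SketchCoderRegistry:
  """Hypothetical stand-in for the global coder registry."""

  # Compatibility versions below this threshold force the dill-based coder,
  # mirroring the "2.68.0" check in _should_force_use_dill.
  _DILL_CUTOFF = (2, 68, 0)

  def __init__(self) -> None:
    self._update_compatibility_version: Optional[str] = None

  def configure_from_options(
      self, update_compatibility_version: Optional[str]) -> None:
    """Called once while parsing pipeline options at job submission."""
    self._update_compatibility_version = update_compatibility_version

  def deterministic_coder_name(self) -> str:
    """Picks the coder by name, so workers need no pipeline options."""
    v = self._update_compatibility_version
    if v is not None and _parse_version(v) < self._DILL_CUTOFF:
      return "DillDeterministicCoder"
    return "CloudpickleDeterministicCoder"


# Configure once at submission, then query while building the graph.
registry = SketchCoderRegistry()
registry.configure_from_options("2.66.0")
print(registry.deterministic_coder_name())  # DillDeterministicCoder
```

Because the chosen name travels with the pipeline graph, the runtime side would only need to resolve a coder by name, which is what would let the API change be avoided.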
##########
sdks/python/apache_beam/coders/coders.py:
##########
@@ -940,6 +986,30 @@ def to_type_hint(self):
     return Any


+def _should_force_use_dill(update_compatibility_version):
+  from apache_beam.transforms.util import is_v1_prior_to_v2
+
+  if not is_v1_prior_to_v2(v1=update_compatibility_version, v2="2.68.0"):
+    return False
+
+  try:
+    import dill
+    assert dill.__version__ == "0.3.1.1"
+  except Exception as e:
+    raise RuntimeError("This pipeline runs with the " \
+        "update_compatibility_version=2.67.0 flag. When running with this flag " \
+        "on SDKs 2.68.0 or higher, you must ensure dill==0.3.1.1 is installed. " \

Review Comment:
Does this restriction apply unconditionally, or only to pipelines that use certain coders? Or do those coders appear in the vast majority of pipelines, so the wording doesn't depend on which coders are used?
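For reference, a sketch of how the dill requirement under discussion could be checked explicitly instead of with `assert`, which Python skips when run with `-O`. It is a hypothetical rewrite for illustration (the name `should_force_use_dill` and the constants are placeholders), assuming, as the PR does, that compatibility versions below 2.68.0 require dill 0.3.1.1; the error text follows the wording suggested in the review.

```python
from typing import Optional

# Values mirror the PR under review: dill-based coders are forced for
# compatibility versions before 2.68.0, and they require this dill release.
_DILL_CUTOFF = (2, 68, 0)
_REQUIRED_DILL_VERSION = "0.3.1.1"


def _parse_version(version: str):
  # Assumes plain integer components such as "2.67.0".
  return tuple(int(p) for p in version.split('.')[:3])


def should_force_use_dill(update_compatibility_version: Optional[str]) -> bool:
  """Returns True if dill must be used; raises if it is missing or wrong."""
  if (update_compatibility_version is None or
      _parse_version(update_compatibility_version) >= _DILL_CUTOFF):
    return False

  error = (
      "This pipeline runs with --update_compatibility_version=2.67.0 or "
      "earlier. When running with this option on SDKs 2.68.0 or higher, "
      f"you must ensure dill=={_REQUIRED_DILL_VERSION} is installed.")
  try:
    import dill
  except ImportError as e:
    raise RuntimeError(error) from e
  # Explicit comparison instead of `assert`, which `python -O` would strip.
  if getattr(dill, "__version__", None) != _REQUIRED_DILL_VERSION:
    raise RuntimeError(error)
  return True
```

With this shape, `should_force_use_dill(None)` and `should_force_use_dill("2.68.0")` return `False`, while `should_force_use_dill("2.66.0")` either returns `True` or raises `RuntimeError`, depending on the installed dill.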