This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch users/damccorm/softenDeterminismRequirement in repository https://gitbox.apache.org/repos/asf/beam.git
commit 60bd6b4be4d68065026024522e769805e62161f0 Author: Danny McCormick <[email protected]> AuthorDate: Mon Oct 13 15:45:24 2025 -0400 Softens the GBEK determinism requirement --- sdks/python/apache_beam/transforms/util.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 5af9d904895..9ae52274cd0 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -546,13 +546,16 @@ class GroupByEncryptedKey(PTransform): pcoll.element_type).tuple_types) kv_type_hint = typehints.KV[key_type, value_type] if kv_type_hint and kv_type_hint != typehints.Any: - coder = coders.registry.get_coder(kv_type_hint).as_deterministic_coder( - f'GroupByEncryptedKey {self.label}' - 'The key coder is not deterministic. This may result in incorrect ' - 'pipeline output. This can be fixed by adding a type hint to the ' - 'operation preceding the GroupByKey step, and for custom key ' - 'classes, by writing a deterministic custom Coder. Please see the ' - 'documentation for more details.') + coder = coders.registry.get_coder(kv_type_hint) + try: + coder = coder.as_deterministic_coder() + except ValueError: + logging.warning(f'GroupByEncryptedKey {self.label}: ' + 'The key coder is not deterministic. This may result in incorrect ' + 'pipeline output. This can be fixed by adding a type hint to the ' + 'operation preceding the GroupByKey step, and for custom key ' + 'classes, by writing a deterministic custom Coder. Please see the ' + 'documentation for more details.') if not coder.is_kv_coder(): raise ValueError( 'Input elements to the transform %s with stateful DoFn must be '
