This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fe5b3b3bd8 Log warning instead of raising an exception for unsupported pickle option (#34852)
2fe5b3b3bd8 is described below

commit 2fe5b3b3bd8b541bc0737c8312ecdaa975c382aa
Author: Adrian Stoll <[email protected]>
AuthorDate: Mon May 19 10:38:43 2025 -0700

    Log warning instead of raising an exception for unsupported pickle option (#34852)
    
    * Log warning instead of raising an exception for unsupported cloudpickle option.
    
    This will allow the Flume runner to unconditionally set
    enable_best_effort_deterministic_pickling to true without having to
    check which pickle library is enabled (which is slightly more
    complicated).
    
    See: #34410
    
    * Fix linter errors
---
 sdks/python/apache_beam/internal/cloudpickle_pickler.py      | 6 +++++-
 sdks/python/apache_beam/internal/cloudpickle_pickler_test.py | 6 +++++-
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py
index ccfe798c5c0..9a40c040dd9 100644
--- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py
+++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py
@@ -30,6 +30,7 @@ dump_session and load_session are no-ops.
 import base64
 import bz2
 import io
+import logging
 import sys
 import threading
 import zlib
@@ -79,6 +80,7 @@ EnumDescriptor = _get_proto_enum_descriptor_class()
 _pickle_lock = threading.RLock()
 RLOCK_TYPE = type(_pickle_lock)
 LOCK_TYPE = type(threading.Lock())
+_LOGGER = logging.getLogger(__name__)
 
 
 def _reconstruct_enum_descriptor(full_name):
@@ -116,7 +118,9 @@ def dumps(
   if enable_best_effort_determinism:
     # TODO: Add support once https://github.com/cloudpipe/cloudpickle/pull/563
     # is merged in.
-    raise NotImplementedError('This option has only been implemeneted for dill')
+    _LOGGER.warning(
+        'Ignoring unsupported option: enable_best_effort_determinism. '
+        'This has only been implemented for dill.')
   with _pickle_lock:
     with io.BytesIO() as file:
       pickler = cloudpickle.CloudPickler(file)
diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
index 402cdda12e1..b63ebd6c710 100644
--- a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
+++ b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
@@ -213,8 +213,12 @@ self.assertEqual(DataClass(datum='abc'), loads(dumps(DataClass(datum='abc'))))
     ''')
 
   def test_best_effort_determinism_not_implemented(self):
-    with self.assertRaises(NotImplementedError):
+    with self.assertLogs('apache_beam.internal.cloudpickle_pickler',
+                         "WARNING") as l:
       dumps(123, enable_best_effort_determinism=True)
+      self.assertIn(
+          'Ignoring unsupported option: enable_best_effort_determinism',
+          '\n'.join(l.output))
 
 
 if __name__ == '__main__':

Reply via email to