This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2fe5b3b3bd8 Log warning instead of raising an exception for
unsupported pickle option (#34852)
2fe5b3b3bd8 is described below
commit 2fe5b3b3bd8b541bc0737c8312ecdaa975c382aa
Author: Adrian Stoll <[email protected]>
AuthorDate: Mon May 19 10:38:43 2025 -0700
Log warning instead of raising an exception for unsupported pickle option
(#34852)
* Log warning instead of raising an exception for unsupported cloudpickle
option.
This will allow the Flume runner to unconditionally set
enable_best_effort_deterministic_pickling to true without having to
check which pickle library is enabled (which is slightly more
complicated).
See: #34410
* Fix linter errors
---
sdks/python/apache_beam/internal/cloudpickle_pickler.py | 6 +++++-
sdks/python/apache_beam/internal/cloudpickle_pickler_test.py | 6 +++++-
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py
index ccfe798c5c0..9a40c040dd9 100644
--- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py
+++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py
@@ -30,6 +30,7 @@ dump_session and load_session are no-ops.
import base64
import bz2
import io
+import logging
import sys
import threading
import zlib
@@ -79,6 +80,7 @@ EnumDescriptor = _get_proto_enum_descriptor_class()
_pickle_lock = threading.RLock()
RLOCK_TYPE = type(_pickle_lock)
LOCK_TYPE = type(threading.Lock())
+_LOGGER = logging.getLogger(__name__)
def _reconstruct_enum_descriptor(full_name):
@@ -116,7 +118,9 @@ def dumps(
if enable_best_effort_determinism:
# TODO: Add support once https://github.com/cloudpipe/cloudpickle/pull/563
# is merged in.
- raise NotImplementedError('This option has only been implemeneted for dill')
+ _LOGGER.warning(
+ 'Ignoring unsupported option: enable_best_effort_determinism. '
+ 'This has only been implemented for dill.')
with _pickle_lock:
with io.BytesIO() as file:
pickler = cloudpickle.CloudPickler(file)
diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
index 402cdda12e1..b63ebd6c710 100644
--- a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
+++ b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
@@ -213,8 +213,12 @@ self.assertEqual(DataClass(datum='abc'),
loads(dumps(DataClass(datum='abc'))))
''')
def test_best_effort_determinism_not_implemented(self):
- with self.assertRaises(NotImplementedError):
+ with self.assertLogs('apache_beam.internal.cloudpickle_pickler',
+ "WARNING") as l:
dumps(123, enable_best_effort_determinism=True)
+ self.assertIn(
+ 'Ignoring unsupported option: enable_best_effort_determinism',
+ '\n'.join(l.output))
if __name__ == '__main__':