This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c84f28f84aa Add option to pickle relative filepaths in cloudpickle.
(#36300)
c84f28f84aa is described below
commit c84f28f84aa4f38cb7209809fd079835c698f0d4
Author: claudevdm <[email protected]>
AuthorDate: Mon Sep 29 14:36:33 2025 -0400
Add option to pickle relative filepaths in cloudpickle. (#36300)
* Add option to pickle relative filepaths in cloudpickle.
* Use relative filepaths for deterministic coder pickling.
* Make filepath interceptor, add docstrings to CloudPickleConfig, revert
coder changes (they need to be guarded by update compat flag).
---
.../internal/cloudpickle/cloudpickle.py | 116 ++++++++++++++++-----
1 file changed, 92 insertions(+), 24 deletions(-)
diff --git a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py
b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py
index e4fbf0c72f8..b236949a24c 100644
--- a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py
+++ b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py
@@ -66,6 +66,7 @@ import io
import itertools
import logging
import opcode
+import os
import pickle
from pickle import _getattribute as _pickle_getattribute
import platform
@@ -108,9 +109,28 @@ def uuid_generator(_):
@dataclasses.dataclass
class CloudPickleConfig:
- """Configuration for cloudpickle behavior."""
+ """Configuration for cloudpickle behavior.
+
+ This class controls various aspects of how cloudpickle serializes objects.
+
+ Attributes:
+ id_generator: Callable that generates unique identifiers for dynamic
+ types. Controls isinstance semantics preservation. If None,
+ disables type tracking and isinstance relationships are not
+ preserved across pickle/unpickle cycles. If callable, generates
+ unique IDs to maintain object identity.
+ Default: uuid_generator (generates UUID hex strings).
+
+ skip_reset_dynamic_type_state: Whether to skip resetting state when
+ reconstructing dynamic types. If True, skips state reset for
+ already-reconstructed types.
+
+ filepath_interceptor: Used to modify filepaths in `co_filename` and
+ function.__globals__['__file__'].
+ """
id_generator: typing.Optional[callable] = uuid_generator
skip_reset_dynamic_type_state: bool = False
+ filepath_interceptor: typing.Optional[callable] = None
DEFAULT_CONFIG = CloudPickleConfig()
@@ -396,6 +416,27 @@ def _find_imported_submodules(code,
top_level_dependencies):
return subimports
+def get_relative_path(path):
+ """Returns the path of a filename relative to the longest matching directory
+ in sys.path.
+ Args:
+ path: The path to the file.
+ """
+ abs_path = os.path.abspath(path)
+ longest_match = ""
+
+ for dir_path in sys.path:
+ if not dir_path.endswith(os.path.sep):
+ dir_path += os.path.sep
+
+ if abs_path.startswith(dir_path) and len(dir_path) > len(longest_match):
+ longest_match = dir_path
+
+ if not longest_match:
+ return path
+ return os.path.relpath(abs_path, longest_match)
+
+
# relevant opcodes
STORE_GLOBAL = opcode.opmap["STORE_GLOBAL"]
DELETE_GLOBAL = opcode.opmap["DELETE_GLOBAL"]
@@ -608,7 +649,7 @@ def _make_typevar(
return _lookup_class_or_track(class_tracker_id, tv)
-def _decompose_typevar(obj, config):
+def _decompose_typevar(obj, config: CloudPickleConfig):
return (
obj.__name__,
obj.__bound__,
@@ -619,7 +660,7 @@ def _decompose_typevar(obj, config):
)
-def _typevar_reduce(obj, config):
+def _typevar_reduce(obj, config: CloudPickleConfig):
# TypeVar instances require the module information hence why we
# are not using the _should_pickle_by_reference directly
module_and_name = _lookup_module_and_qualname(obj, name=obj.__name__)
@@ -671,7 +712,7 @@ def _make_dict_items(obj, is_ordered=False):
# -------------------------------------------------
-def _class_getnewargs(obj, config):
+def _class_getnewargs(obj, config: CloudPickleConfig):
type_kwargs = {}
if "__module__" in obj.__dict__:
type_kwargs["__module__"] = obj.__module__
@@ -690,7 +731,7 @@ def _class_getnewargs(obj, config):
)
-def _enum_getnewargs(obj, config):
+def _enum_getnewargs(obj, config: CloudPickleConfig):
members = {e.name: e.value for e in obj}
return (
obj.__bases__,
@@ -831,7 +872,7 @@ def _enum_getstate(obj):
# these holes".
-def _code_reduce(obj):
+def _code_reduce(obj, config: CloudPickleConfig):
"""code object reducer."""
# If you are not sure about the order of arguments, take a look at help
# of the specific type from types, for example:
@@ -850,6 +891,11 @@ def _code_reduce(obj):
co_varnames = tuple(name for name in obj.co_varnames)
co_freevars = tuple(name for name in obj.co_freevars)
co_cellvars = tuple(name for name in obj.co_cellvars)
+
+ co_filename = obj.co_filename
+ if (config and config.filepath_interceptor):
+ co_filename = config.filepath_interceptor(co_filename)
+
if hasattr(obj, "co_exceptiontable"):
# Python 3.11 and later: there are some new attributes
# related to the enhanced exceptions.
@@ -864,7 +910,7 @@ def _code_reduce(obj):
obj.co_consts,
co_names,
co_varnames,
- obj.co_filename,
+ co_filename,
co_name,
obj.co_qualname,
obj.co_firstlineno,
@@ -887,7 +933,7 @@ def _code_reduce(obj):
obj.co_consts,
co_names,
co_varnames,
- obj.co_filename,
+ co_filename,
co_name,
obj.co_firstlineno,
obj.co_linetable,
@@ -908,7 +954,7 @@ def _code_reduce(obj):
obj.co_code,
obj.co_consts,
co_varnames,
- obj.co_filename,
+ co_filename,
co_name,
obj.co_firstlineno,
obj.co_lnotab,
@@ -932,7 +978,7 @@ def _code_reduce(obj):
obj.co_consts,
co_names,
co_varnames,
- obj.co_filename,
+ co_filename,
co_name,
obj.co_firstlineno,
obj.co_lnotab,
@@ -1043,7 +1089,7 @@ def _weakset_reduce(obj):
return weakref.WeakSet, (list(obj), )
-def _dynamic_class_reduce(obj, config):
+def _dynamic_class_reduce(obj, config: CloudPickleConfig):
"""Save a class that can't be referenced as a module attribute.
This method is used to serialize classes that are defined inside
@@ -1074,7 +1120,7 @@ def _dynamic_class_reduce(obj, config):
)
-def _class_reduce(obj, config):
+def _class_reduce(obj, config: CloudPickleConfig):
"""Select the reducer depending on the dynamic nature of the class obj."""
if obj is type(None): # noqa
return type, (None, )
@@ -1169,7 +1215,7 @@ def _function_setstate(obj, state):
setattr(obj, k, v)
-def _class_setstate(obj, state, skip_reset_dynamic_type_state):
+def _class_setstate(obj, state, skip_reset_dynamic_type_state=False):
# Lock while potentially modifying class state.
with _DYNAMIC_CLASS_TRACKER_LOCK:
if skip_reset_dynamic_type_state and obj in
_DYNAMIC_CLASS_STATE_TRACKER_BY_CLASS:
@@ -1240,7 +1286,6 @@ class Pickler(pickle.Pickler):
_dispatch_table[property] = _property_reduce
_dispatch_table[staticmethod] = _classmethod_reduce
_dispatch_table[CellType] = _cell_reduce
- _dispatch_table[types.CodeType] = _code_reduce
_dispatch_table[types.GetSetDescriptorType] = _getset_descriptor_reduce
_dispatch_table[types.ModuleType] = _module_reduce
_dispatch_table[types.MethodType] = _method_reduce
@@ -1300,9 +1345,15 @@ class Pickler(pickle.Pickler):
base_globals = self.globals_ref.setdefault(id(func.__globals__), {})
if base_globals == {}:
+ if "__file__" in func.__globals__:
+ # Apply normalization ONLY to the __file__ attribute
+ file_path = func.__globals__["__file__"]
+ if self.config.filepath_interceptor:
+ file_path = self.config.filepath_interceptor(file_path)
+ base_globals["__file__"] = file_path
# Add module attributes used to resolve relative imports
# instructions inside func.
- for k in ["__package__", "__name__", "__path__", "__file__"]:
+ for k in ["__package__", "__name__", "__path__"]:
if k in func.__globals__:
base_globals[k] = func.__globals__[k]
@@ -1318,15 +1369,16 @@ class Pickler(pickle.Pickler):
def dump(self, obj):
try:
return super().dump(obj)
- except RuntimeError as e:
- if len(e.args) > 0 and "recursion" in e.args[0]:
- msg = "Could not pickle object as excessively deep recursion required."
- raise pickle.PicklingError(msg) from e
- else:
- raise
+ except RecursionError as e:
+ msg = "Could not pickle object as excessively deep recursion required."
+ raise pickle.PicklingError(msg) from e
def __init__(
- self, file, protocol=None, buffer_callback=None, config=DEFAULT_CONFIG):
+ self,
+ file,
+ protocol=None,
+ buffer_callback=None,
+ config: CloudPickleConfig = DEFAULT_CONFIG):
if protocol is None:
protocol = DEFAULT_PROTOCOL
super().__init__(file, protocol=protocol, buffer_callback=buffer_callback)
@@ -1405,6 +1457,8 @@ class Pickler(pickle.Pickler):
return _class_reduce(obj, self.config)
elif isinstance(obj, typing.TypeVar): # Add this check
return _typevar_reduce(obj, self.config)
+ elif isinstance(obj, types.CodeType):
+ return _code_reduce(obj, self.config)
elif isinstance(obj, types.FunctionType):
return self._function_reduce(obj)
else:
@@ -1487,6 +1541,11 @@ class Pickler(pickle.Pickler):
dispatch[typing.TypeVar] = save_typevar
+ def save_code(self, obj, name=None):
+ return self.save_reduce(*_code_reduce(obj, self.config), obj=obj)
+
+ dispatch[types.CodeType] = save_code
+
def save_function(self, obj, name=None):
"""Registered with the dispatch to handle all function types.
@@ -1532,7 +1591,12 @@ class Pickler(pickle.Pickler):
# Shorthands similar to pickle.dump/pickle.dumps
-def dump(obj, file, protocol=None, buffer_callback=None,
config=DEFAULT_CONFIG):
+def dump(
+ obj,
+ file,
+ protocol=None,
+ buffer_callback=None,
+ config: CloudPickleConfig = DEFAULT_CONFIG):
"""Serialize obj as bytes streamed into file
protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to
@@ -1550,7 +1614,11 @@ def dump(obj, file, protocol=None, buffer_callback=None,
config=DEFAULT_CONFIG):
config=config).dump(obj)
-def dumps(obj, protocol=None, buffer_callback=None, config=DEFAULT_CONFIG):
+def dumps(
+ obj,
+ protocol=None,
+ buffer_callback=None,
+ config: CloudPickleConfig = DEFAULT_CONFIG):
"""Serialize obj as a string of bytes allocated in memory
protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to