This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 939f3dfc0551 [SPARK-50621][PYTHON] Upgrade Cloudpickle to 3.1.0
939f3dfc0551 is described below

commit 939f3dfc05513966b60df5dd2e15be60f7c9e9e0
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Fri Dec 20 09:10:35 2024 +0900

    [SPARK-50621][PYTHON] Upgrade Cloudpickle to 3.1.0
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to upgrade Cloudpickle to 3.1.0.
    
    ### Why are the changes needed?
    
    To leverage bug fixes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, several bug fixes listed in https://github.com/cloudpipe/cloudpickle/blob/v3.1.0/CHANGES.md#310
    
    ### How was this patch tested?
    
    Existing unit tests should cover this change.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49241 from HyukjinKwon/upgrade-cloudpickle.
    
    Authored-by: Hyukjin Kwon <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/cloudpickle/__init__.py         |  2 +-
 python/pyspark/cloudpickle/cloudpickle.py      | 98 +++++++++++++++++++-------
 python/pyspark/cloudpickle/cloudpickle_fast.py |  1 +
 python/pyspark/cloudpickle/compat.py           | 18 -----
 4 files changed, 76 insertions(+), 43 deletions(-)

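For context, a minimal sketch (assuming only the vendored module path
pyspark.cloudpickle) of what cloudpickle provides over the standard pickle
module: functions and classes defined in __main__ are serialized by value,
so Spark workers can reconstruct them without importing the defining module.

    from pyspark import cloudpickle

    # Defined in __main__: the standard pickle module would serialize this
    # by reference and fail in a worker process that cannot import it.
    def square(x):
        return x * x

    payload = cloudpickle.dumps(square)    # pickled by value
    restored = cloudpickle.loads(payload)
    assert restored(4) == 16
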
diff --git a/python/pyspark/cloudpickle/__init__.py b/python/pyspark/cloudpickle/__init__.py
index a3348e8b3da2..4d317916d4e4 100644
--- a/python/pyspark/cloudpickle/__init__.py
+++ b/python/pyspark/cloudpickle/__init__.py
@@ -3,7 +3,7 @@ from pyspark.cloudpickle.cloudpickle import *  # noqa
 
 __doc__ = cloudpickle.__doc__
 
-__version__ = "3.0.0"
+__version__ = "3.1.0"
 
 __all__ = [  # noqa
     "__version__",
diff --git a/python/pyspark/cloudpickle/cloudpickle.py b/python/pyspark/cloudpickle/cloudpickle.py
index eb43a9676bbb..8c50ba175e3e 100644
--- a/python/pyspark/cloudpickle/cloudpickle.py
+++ b/python/pyspark/cloudpickle/cloudpickle.py
@@ -126,7 +126,7 @@ def _lookup_class_or_track(class_tracker_id, class_def):
 
 
 def register_pickle_by_value(module):
-    """Register a module to make it functions and classes picklable by value.
+    """Register a module to make its functions and classes picklable by value.
 
     By default, functions and classes that are attributes of an importable
     module are to be pickled by reference, that is relying on re-importing
@@ -213,6 +213,7 @@ def _whichmodule(obj, name):
         # sys.modules
         if (
             module_name == "__main__"
+            or module_name == "__mp_main__"
             or module is None
             or not isinstance(module, types.ModuleType)
         ):
@@ -409,7 +410,10 @@ def _walk_global_ops(code):
 
 def _extract_class_dict(cls):
     """Retrieve a copy of the dict of a class without the inherited method."""
-    clsdict = dict(cls.__dict__)  # copy dict proxy to a dict
+    # Hack to circumvent non-predictable memoization caused by string interning.
+    # See the inline comment in _class_setstate for details.
+    clsdict = {"".join(k): cls.__dict__[k] for k in sorted(cls.__dict__)}
+
     if len(cls.__bases__) == 1:
         inherited_dict = cls.__bases__[0].__dict__
     else:
@@ -533,9 +537,15 @@ def _make_skeleton_class(
     The "extra" variable is meant to be a dict (or None) that can be used for
     forward compatibility shall the need arise.
     """
+    # We need to intern the keys of the type_kwargs dict to avoid having
+    # different pickles for the same dynamic class depending on whether it was
+    # dynamically created or reconstructed from a pickled stream.
+    type_kwargs = {sys.intern(k): v for k, v in type_kwargs.items()}
+
     skeleton_class = types.new_class(
         name, bases, {"metaclass": type_constructor}, lambda ns: ns.update(type_kwargs)
     )
+
     return _lookup_class_or_track(class_tracker_id, skeleton_class)
 
 
@@ -694,8 +704,10 @@ def _function_getstate(func):
     #   unpickling time by iterating over slotstate and calling setattr(func,
     #   slotname, slotvalue)
     slotstate = {
-        "__name__": func.__name__,
-        "__qualname__": func.__qualname__,
+        # Hack to circumvent non-predictable memoization caused by string interning.
+        # See the inline comment in _class_setstate for details.
+        "__name__": "".join(func.__name__),
+        "__qualname__": "".join(func.__qualname__),
         "__annotations__": func.__annotations__,
         "__kwdefaults__": func.__kwdefaults__,
         "__defaults__": func.__defaults__,
@@ -721,7 +733,9 @@ def _function_getstate(func):
     )
     slotstate["__globals__"] = f_globals
 
-    state = func.__dict__
+    # Hack to circumvent non-predictable memoization caused by string interning.
+    # See the inline comment in _class_setstate for details.
+    state = {"".join(k): v for k, v in func.__dict__.items()}
     return state, slotstate
 
 
@@ -802,6 +816,19 @@ def _code_reduce(obj):
     # of the specific type from types, for example:
     # >>> from types import CodeType
     # >>> help(CodeType)
+
+    # Hack to circumvent non-predictable memoization caused by string interning.
+    # See the inline comment in _class_setstate for details.
+    co_name = "".join(obj.co_name)
+
+    # Create shallow copies of these tuples to make cloudpickle payload deterministic.
+    # When creating a code object during load, copies of these four tuples are
+    # created, while in the main process, these tuples can be shared.
+    # By always creating copies, we make sure the resulting payload is deterministic.
+    co_names = tuple(name for name in obj.co_names)
+    co_varnames = tuple(name for name in obj.co_varnames)
+    co_freevars = tuple(name for name in obj.co_freevars)
+    co_cellvars = tuple(name for name in obj.co_cellvars)
     if hasattr(obj, "co_exceptiontable"):
         # Python 3.11 and later: there are some new attributes
         # related to the enhanced exceptions.
@@ -814,16 +841,16 @@ def _code_reduce(obj):
             obj.co_flags,
             obj.co_code,
             obj.co_consts,
-            obj.co_names,
-            obj.co_varnames,
+            co_names,
+            co_varnames,
             obj.co_filename,
-            obj.co_name,
+            co_name,
             obj.co_qualname,
             obj.co_firstlineno,
             obj.co_linetable,
             obj.co_exceptiontable,
-            obj.co_freevars,
-            obj.co_cellvars,
+            co_freevars,
+            co_cellvars,
         )
     elif hasattr(obj, "co_linetable"):
         # Python 3.10 and later: obj.co_lnotab is deprecated and constructor
@@ -837,14 +864,14 @@ def _code_reduce(obj):
             obj.co_flags,
             obj.co_code,
             obj.co_consts,
-            obj.co_names,
-            obj.co_varnames,
+            co_names,
+            co_varnames,
             obj.co_filename,
-            obj.co_name,
+            co_name,
             obj.co_firstlineno,
             obj.co_linetable,
-            obj.co_freevars,
-            obj.co_cellvars,
+            co_freevars,
+            co_cellvars,
         )
     elif hasattr(obj, "co_nmeta"):  # pragma: no cover
         # "nogil" Python: modified attributes from 3.9
@@ -859,15 +886,15 @@ def _code_reduce(obj):
             obj.co_flags,
             obj.co_code,
             obj.co_consts,
-            obj.co_varnames,
+            co_varnames,
             obj.co_filename,
-            obj.co_name,
+            co_name,
             obj.co_firstlineno,
             obj.co_lnotab,
             obj.co_exc_handlers,
             obj.co_jump_table,
-            obj.co_freevars,
-            obj.co_cellvars,
+            co_freevars,
+            co_cellvars,
             obj.co_free2reg,
             obj.co_cell2reg,
         )
@@ -882,14 +909,14 @@ def _code_reduce(obj):
             obj.co_flags,
             obj.co_code,
             obj.co_consts,
-            obj.co_names,
-            obj.co_varnames,
+            co_names,
+            co_varnames,
             obj.co_filename,
-            obj.co_name,
+            co_name,
             obj.co_firstlineno,
             obj.co_lnotab,
-            obj.co_freevars,
-            obj.co_cellvars,
+            co_freevars,
+            co_cellvars,
         )
     return types.CodeType, args
 
@@ -1127,7 +1154,30 @@ def _class_setstate(obj, state):
         if attrname == "_abc_impl":
             registry = attr
         else:
+            # Note: setting attribute names on a class automatically triggers their
+            # interning in CPython:
+            # https://github.com/python/cpython/blob/v3.12.0/Objects/object.c#L957
+            #
+            # This means that to get deterministic pickling for a dynamic class that
+            # was initially defined in a different Python process, the pickler
+            # needs to ensure that dynamic class and function attribute names are
+            # systematically copied into a non-interned version to avoid
+            # unpredictable pickle payloads.
+            #
+            # Indeed the Pickler's memoizer relies on physical object identity to break
+            # cycles in the reference graph of the object being serialized.
             setattr(obj, attrname, attr)
+
+    if sys.version_info >= (3, 13) and "__firstlineno__" in state:
+        # Set the Python 3.13+ only __firstlineno__ attribute one more time, as it
+        # will be automatically deleted by the `setattr(obj, attrname, attr)` call
+        # above when `attrname` is "__firstlineno__". We assume that preserving this
+        # information might be important for some users and that it is not stale in
+        # the context of cloudpickle usage, hence legitimate to propagate. Furthermore,
+        # it is necessary to do so to keep deterministic chained pickling as tested in
+        # test_deterministic_str_interning_for_chained_dynamic_class_pickling.
+        obj.__firstlineno__ = state["__firstlineno__"]
+
     if registry is not None:
         for subclass in registry:
             obj.register(subclass)
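
Most of the hunks above work around CPython string interning: attribute and
code-object names may or may not be interned depending on how an object was
created, and the Pickler's memoizer keys on object identity, so payloads for
the "same" dynamic class could differ across processes. A hedged illustration
of the trick (CPython-specific behavior; the name here is illustrative):

    import sys

    name = sys.intern("my_attr")   # attribute names are interned by CPython
    copy = "".join(name)           # rebuilt character by character: a fresh,
                                   # non-interned string object

    assert copy == name            # equal by value
    assert copy is not name        # distinct identity, so memoization no
                                   # longer depends on interning state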
diff --git a/python/pyspark/cloudpickle/cloudpickle_fast.py b/python/pyspark/cloudpickle/cloudpickle_fast.py
index 52d6732e44eb..20280f0ca354 100644
--- a/python/pyspark/cloudpickle/cloudpickle_fast.py
+++ b/python/pyspark/cloudpickle/cloudpickle_fast.py
@@ -6,6 +6,7 @@ namespace.
 
 See: tests/test_backward_compat.py
 """
+
 from . import cloudpickle
 
 
diff --git a/python/pyspark/cloudpickle/compat.py b/python/pyspark/cloudpickle/compat.py
deleted file mode 100644
index 5e9b52773d27..000000000000
--- a/python/pyspark/cloudpickle/compat.py
+++ /dev/null
@@ -1,18 +0,0 @@
-import sys
-
-
-if sys.version_info < (3, 8):
-    try:
-        import pickle5 as pickle  # noqa: F401
-        from pickle5 import Pickler  # noqa: F401
-    except ImportError:
-        import pickle  # noqa: F401
-
-        # Use the Python pickler for old CPython versions
-        from pickle import _Pickler as Pickler  # noqa: F401
-else:
-    import pickle  # noqa: F401
-
-    # Pickler will the C implementation in CPython and the Python
-    # implementation in PyPy
-    from pickle import Pickler  # noqa: F401


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
