This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6ba393a26e9 Added function for deterministic ID for class definitions
by hashing. (#36793)
6ba393a26e9 is described below
commit 6ba393a26e92b34f37083d3de0a83d1cca65eea7
Author: Praneet Nadella <[email protected]>
AuthorDate: Thu Nov 13 16:49:19 2025 -0500
Added function for deterministic ID for class definitions by hashing.
(#36793)
* Added function for deterministic ID for class definitions by hashing.
* Trigger CI: Rerun checks
* addressing reviewer comments
---
.../internal/cloudpickle/cloudpickle.py | 28 ++++++++++++++++++----
1 file changed, 24 insertions(+), 4 deletions(-)
diff --git a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py
index 8ee770d6169..495e888a516 100644
--- a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py
+++ b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py
@@ -62,6 +62,7 @@ import dataclasses
import dis
from enum import Enum
import functools
+import hashlib
import io
import itertools
import logging
@@ -98,7 +99,7 @@ _PICKLE_BY_VALUE_MODULES = set()
_DYNAMIC_CLASS_TRACKER_BY_CLASS = weakref.WeakKeyDictionary()
_DYNAMIC_CLASS_TRACKER_BY_ID = weakref.WeakValueDictionary()
_DYNAMIC_CLASS_STATE_TRACKER_BY_CLASS = weakref.WeakKeyDictionary()
-_DYNAMIC_CLASS_TRACKER_LOCK = threading.Lock()
+_DYNAMIC_CLASS_TRACKER_LOCK = threading.RLock()
PYPY = platform.python_implementation() == "PyPy"
@@ -168,6 +169,7 @@ class CloudPickleConfig:
DEFAULT_CONFIG = CloudPickleConfig()
+_GENERATING_SENTINEL = object()
builtin_code_type = None
if PYPY:
# builtin-code objects only exist in pypy
@@ -179,10 +181,21 @@ _extract_code_globals_cache = weakref.WeakKeyDictionary()
def _get_or_create_tracker_id(class_def, id_generator):
  """Return the tracker ID for ``class_def``, generating one if needed.

  Looks up ``class_def`` in ``_DYNAMIC_CLASS_TRACKER_BY_CLASS``. If no ID is
  recorded and ``id_generator`` is not None, ``id_generator(class_def)`` is
  called and the result is registered in both
  ``_DYNAMIC_CLASS_TRACKER_BY_CLASS`` and ``_DYNAMIC_CLASS_TRACKER_BY_ID``.

  While ``id_generator`` runs, ``class_def`` is mapped to
  ``_GENERATING_SENTINEL``. The tracker lock is held for the whole call, so
  only a re-entrant call on the same thread (for example ``id_generator``
  pickling ``class_def`` itself) can observe that marker. The marker is never
  returned to callers.

  Args:
    class_def: The class whose tracker ID is requested.
    id_generator: Callable mapping a class to a new tracker ID, or None to
      only look up an already-recorded ID.

  Returns:
    The recorded or newly generated tracker ID. Returns None if no ID is
    recorded and either ``id_generator`` is None or an ID for ``class_def``
    is currently being generated further up the call stack.

  Raises:
    RuntimeError: If ``id_generator`` is not None while an ID for
      ``class_def`` is already being generated further up the call stack.
    Any exception raised by ``id_generator`` (after the in-progress marker
    has been removed).
  """
  with _DYNAMIC_CLASS_TRACKER_LOCK:
    class_tracker_id = _DYNAMIC_CLASS_TRACKER_BY_CLASS.get(class_def)

    if class_tracker_id is _GENERATING_SENTINEL:
      if id_generator is not None:
        raise RuntimeError(
            f"Recursive ID generation detected for {class_def}. "
            f"The id_generator cannot recursively request an ID for the same class."
        )
      # Lookup-only re-entrant call (e.g. pickling class_def while hashing
      # it): report "no ID yet" instead of leaking the private sentinel into
      # the caller (and into the pickle stream).
      return None

    if class_tracker_id is None and id_generator is not None:
      # Mark the class as in-progress so re-entrant requests are detectable.
      _DYNAMIC_CLASS_TRACKER_BY_CLASS[class_def] = _GENERATING_SENTINEL
      try:
        class_tracker_id = id_generator(class_def)
        _DYNAMIC_CLASS_TRACKER_BY_CLASS[class_def] = class_tracker_id
        _DYNAMIC_CLASS_TRACKER_BY_ID[class_tracker_id] = class_def
      except BaseException:
        # Remove the marker on any failure, including KeyboardInterrupt and
        # SystemExit; a stale marker would make every later request for this
        # class fail as "recursive".
        _DYNAMIC_CLASS_TRACKER_BY_CLASS.pop(class_def, None)
        raise
    return class_tracker_id
@@ -1720,3 +1733,10 @@ load, loads = pickle.load, pickle.loads
# Backward compat alias.
CloudPickler = Pickler
+
+
def hash_dynamic_classdef(classdef):
  """Return a hex SHA-256 digest of ``classdef`` pickled by this module.

  The class is serialized with ``dumps`` using a ``CloudPickleConfig`` whose
  ``id_generator`` is None, and the resulting bytes are hashed. Identical
  pickled bytes therefore always map to the same ID.

  NOTE(review): the ID is only as stable as the pickled bytes themselves
  (e.g. across Python or library versions); confirm that is acceptable for
  callers that rely on it.

  Args:
    classdef: The class definition to fingerprint.

  Returns:
    A 64-character lowercase hexadecimal string.
  """
  no_tracking_config = CloudPickleConfig(id_generator=None)
  pickled_bytes = dumps(classdef, config=no_tracking_config)
  digest = hashlib.sha256(pickled_bytes)
  return digest.hexdigest()