This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ba393a26e9 Added function for deterministic ID for class definitions by hashing. (#36793)
6ba393a26e9 is described below

commit 6ba393a26e92b34f37083d3de0a83d1cca65eea7
Author: Praneet Nadella <[email protected]>
AuthorDate: Thu Nov 13 16:49:19 2025 -0500

    Added function for deterministic ID for class definitions by hashing. (#36793)
    
    * Added function for deterministic ID for class definitions by hashing.
    
    * Trigger CI: Rerun checks
    
    * addressing reviewer comments
---
 .../internal/cloudpickle/cloudpickle.py            | 28 ++++++++++++++++++----
 1 file changed, 24 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py
index 8ee770d6169..495e888a516 100644
--- a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py
+++ b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py
@@ -62,6 +62,7 @@ import dataclasses
 import dis
 from enum import Enum
 import functools
+import hashlib
 import io
 import itertools
 import logging
@@ -98,7 +99,7 @@ _PICKLE_BY_VALUE_MODULES = set()
 _DYNAMIC_CLASS_TRACKER_BY_CLASS = weakref.WeakKeyDictionary()
 _DYNAMIC_CLASS_TRACKER_BY_ID = weakref.WeakValueDictionary()
 _DYNAMIC_CLASS_STATE_TRACKER_BY_CLASS = weakref.WeakKeyDictionary()
-_DYNAMIC_CLASS_TRACKER_LOCK = threading.Lock()
+_DYNAMIC_CLASS_TRACKER_LOCK = threading.RLock()
 
 PYPY = platform.python_implementation() == "PyPy"
 
@@ -168,6 +169,7 @@ class CloudPickleConfig:
 
 
 DEFAULT_CONFIG = CloudPickleConfig()
+_GENERATING_SENTINEL = object()
 builtin_code_type = None
 if PYPY:
   # builtin-code objects only exist in pypy
@@ -179,10 +181,21 @@ _extract_code_globals_cache = weakref.WeakKeyDictionary()
 def _get_or_create_tracker_id(class_def, id_generator):
   with _DYNAMIC_CLASS_TRACKER_LOCK:
     class_tracker_id = _DYNAMIC_CLASS_TRACKER_BY_CLASS.get(class_def)
+    if class_tracker_id is _GENERATING_SENTINEL and id_generator:
+      raise RuntimeError(
+          f"Recursive ID generation detected for {class_def}. "
+          f"The id_generator cannot recursively request an ID for the same class."
+      )
+
     if class_tracker_id is None and id_generator is not None:
-      class_tracker_id = id_generator(class_def)
-      _DYNAMIC_CLASS_TRACKER_BY_CLASS[class_def] = class_tracker_id
-      _DYNAMIC_CLASS_TRACKER_BY_ID[class_tracker_id] = class_def
+      _DYNAMIC_CLASS_TRACKER_BY_CLASS[class_def] = _GENERATING_SENTINEL
+      try:
+        class_tracker_id = id_generator(class_def)
+        _DYNAMIC_CLASS_TRACKER_BY_CLASS[class_def] = class_tracker_id
+        _DYNAMIC_CLASS_TRACKER_BY_ID[class_tracker_id] = class_def
+      except Exception:
+        _DYNAMIC_CLASS_TRACKER_BY_CLASS.pop(class_def, None)
+        raise
   return class_tracker_id
 
 
@@ -1720,3 +1733,10 @@ load, loads = pickle.load, pickle.loads
 
 # Backward compat alias.
 CloudPickler = Pickler
+
+
+def hash_dynamic_classdef(classdef):
+  """Generates a deterministic ID by hashing the pickled class definition."""
+  hexdigest = hashlib.sha256(
+      dumps(classdef, config=CloudPickleConfig(id_generator=None))).hexdigest()
+  return hexdigest

Reply via email to