ryanthompson591 commented on a change in pull request #15472:
URL: https://github.com/apache/beam/pull/15472#discussion_r742882919



##########
File path: sdks/python/apache_beam/internal/cloudpickle_pickler.py
##########
@@ -0,0 +1,130 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Pickler for values, functions, and classes.
+
+For internal use only. No backwards compatibility guarantees.
+
+Uses the cloudpickle library to pickle data, functions, lambdas,
+and classes.
+
+dump_session and load_session are no-ops.
+"""
+
+# pytype: skip-file
+
+import base64
+import bz2
+import io
+import logging
+import sys
+import threading
+import traceback
+import types
+import zlib
+from typing import Any
+from typing import Dict
+from typing import Tuple
+from _thread import RLock as RLockType
+
+try:
+  from absl import flags
+except (ImportError, ModuleNotFoundError):
+  pass
+
+import cloudpickle
+
+# Pickling, especially unpickling, causes broken module imports on Python 3
+# if executed concurrently, see: BEAM-8651, http://bugs.python.org/issue38884.
+_pickle_lock = threading.RLock()
+import __main__ as _main_module
+
+
+def dumps(o, enable_trace=True, use_zlib=False):
+  # type: (...) -> bytes
+
+  """For internal use only; no backwards-compatibility guarantees."""
+  with _pickle_lock:
+    with io.BytesIO() as file:
+      pickler = cloudpickle.CloudPickler(file)
+      pickler.dispatch_table[RLockType] = _pickle_rlock
+      try:
+        pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags
+      except NameError:
+        pass
+      pickler.dump(o)
+      s = file.getvalue()
+      # TODO(ryanthompson): See if echoing dill.enable_trace is useful.
+
+  # Compress as compactly as possible (compresslevel=9) to decrease peak memory
+  # usage (of multiple in-memory copies) and to avoid hitting protocol buffer
+  # limits.
+  # WARNING: Be cautious about compressor change since it can lead to pipeline
+  # representation change, and can break streaming job update compatibility on
+  # runners such as Dataflow.
+  if use_zlib:
+    c = zlib.compress(s, 9)
+  else:
+    c = bz2.compress(s, compresslevel=9)
+  del s  # Free up some possibly large and no-longer-needed memory.
+
+  return base64.b64encode(c)
+
+
+def loads(encoded, enable_trace=True, use_zlib=False):
+  """For internal use only; no backwards-compatibility guarantees."""
+
+  c = base64.b64decode(encoded)
+
+  if use_zlib:
+    s = zlib.decompress(c)
+  else:
+    s = bz2.decompress(c)
+
+  del c  # Free up some possibly large and no-longer-needed memory.
+
+  with _pickle_lock:
+    unpickled = cloudpickle.loads(s)
+    return unpickled
+
+
+def _pickle_rlock(obj):
+  # Reduce an RLock to a plain constructor call; the restored lock is a
+  # fresh, unlocked RLock (the original's locked state is not preserved).
+  return _create_rlock, ()
+
+
+def _create_rlock():
+  return RLockType()
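
A minimal usage sketch of the module above (illustrative, not part of the
PR diff): unlike the standard pickle module, the cloudpickle-backed
dumps/loads can round-trip lambdas and other dynamically defined callables.

    from apache_beam.internal import cloudpickle_pickler

    # dumps() returns base64-encoded, bz2-compressed bytes by default;
    # pass use_zlib=True to both calls to use zlib instead.
    double = lambda x: x * 2
    encoded = cloudpickle_pickler.dumps(double)
    restored = cloudpickle_pickler.loads(encoded)
    assert restored(21) == 42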

Review comment:
   Yes, dill does do that:
   
   https://github.com/uqfoundation/dill/blob/282560bdb10980e4addcf60f9ef2e124fe3cdd83/dill/_dill.py#L1087
   
   What dill does that I don't plan to replicate is actually setting the
   restored locks to a locked state.
   
   I think the purpose of replicating some of this behavior is to stay
   backward compatible and avoid breaking our tests and users.
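
   To illustrate the behavior discussed above, a hypothetical snippet (not
   from the PR): with the reducer in this file, a restored RLock always
   comes back as a fresh, unlocked lock, whereas dill re-acquires it to
   mirror the original's locked state.
   
       import threading
       from apache_beam.internal import cloudpickle_pickler
   
       lock = threading.RLock()
       lock.acquire()  # pickle the lock while it is held
       restored = cloudpickle_pickler.loads(cloudpickle_pickler.dumps(lock))
       lock.release()
       # Non-blocking acquire succeeds: the restored lock starts unlocked.
       assert restored.acquire(blocking=False)
       restored.release()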
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

