This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b73fcc6 [BEAM-11113] Switch default pickler compressor back to zlib
for Coders in python sdk
new 38feb03 Merge pull request #13183 from y1chi/BEAM-11113
b73fcc6 is described below
commit b73fcc6502624a1b19db28ec108db4a1424df2d4
Author: Yichi Zhang <[email protected]>
AuthorDate: Fri Oct 23 13:41:37 2020 -0700
[BEAM-11113] Switch default pickler compressor back to zlib for Coders in
python sdk
---
sdks/python/apache_beam/coders/coders.py | 5 +++--
sdks/python/apache_beam/internal/pickler.py | 19 +++++++++++++++----
2 files changed, 18 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index bc35779..1641ae1 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -107,12 +107,13 @@ ConstructorFn = Callable[[Optional[Any], List['Coder'],
'PipelineContext'], Any]
def serialize_coder(coder):
from apache_beam.internal import pickler
return b'%s$%s' % (
- coder.__class__.__name__.encode('utf-8'), pickler.dumps(coder))
+ coder.__class__.__name__.encode('utf-8'),
+ pickler.dumps(coder, use_zlib=True))
def deserialize_coder(serialized):
from apache_beam.internal import pickler
- return pickler.loads(serialized.split(b'$', 1)[1])
+ return pickler.loads(serialized.split(b'$', 1)[1], use_zlib=True)
# pylint: enable=wrong-import-order, wrong-import-position
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py
index c4bfb44..395d511 100644
--- a/sdks/python/apache_beam/internal/pickler.py
+++ b/sdks/python/apache_beam/internal/pickler.py
@@ -39,6 +39,7 @@ import sys
import threading
import traceback
import types
+import zlib
from typing import Any
from typing import Dict
from typing import Tuple
@@ -241,7 +242,7 @@ if 'save_module' in dir(dill.dill):
logging.getLogger('dill').setLevel(logging.WARN)
-def dumps(o, enable_trace=True):
+def dumps(o, enable_trace=True, use_zlib=False):
# type: (...) -> bytes
"""For internal use only; no backwards-compatibility guarantees."""
@@ -260,18 +261,28 @@ def dumps(o, enable_trace=True):
# Compress as compactly as possible (compresslevel=9) to decrease peak memory
# usage (of multiple in-memory copies) and to avoid hitting protocol buffer
# limits.
- c = bz2.compress(s, compresslevel=9)
+ # WARNING: Be cautious about compressor change since it can lead to pipeline
+ # representation change, and can break streaming job update compatibility on
+ # runners such as Dataflow.
+ if use_zlib:
+ c = zlib.compress(s, 9)
+ else:
+ c = bz2.compress(s, compresslevel=9)
del s # Free up some possibly large and no-longer-needed memory.
return base64.b64encode(c)
-def loads(encoded, enable_trace=True):
+def loads(encoded, enable_trace=True, use_zlib=False):
"""For internal use only; no backwards-compatibility guarantees."""
c = base64.b64decode(encoded)
- s = bz2.decompress(c)
+ if use_zlib:
+ s = zlib.decompress(c)
+ else:
+ s = bz2.decompress(c)
+
del c # Free up some possibly large and no-longer-needed memory.
with _pickle_lock_unless_py2: