Repository: incubator-beam Updated Branches: refs/heads/python-sdk 19e3eff91 -> dc92438fa
Compress serialized function data.

Pickled data is often quite compressible, but this is particularly useful for
concat sources generated for large expansions of filepatterns.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e80dcb11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e80dcb11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e80dcb11

Branch: refs/heads/python-sdk
Commit: e80dcb11d5df45e021be4a2c8ff5b84fa1460f91
Parents: 19e3eff
Author: Robert Bradshaw <[email protected]>
Authored: Mon Sep 26 16:24:06 2016 -0700
Committer: Robert Bradshaw <[email protected]>
Committed: Mon Sep 26 16:24:06 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/pickler.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e80dcb11/sdks/python/apache_beam/internal/pickler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py
index 898e04b..30f0b77 100644
--- a/sdks/python/apache_beam/internal/pickler.py
+++ b/sdks/python/apache_beam/internal/pickler.py
@@ -31,6 +31,7 @@ import logging
 import sys
 import traceback
 import types
+import zlib

 import dill

@@ -182,20 +183,22 @@ logging.getLogger('dill').setLevel(logging.WARN)
 # encoding. This should be cleaned up.
 def dumps(o):
   try:
-    return base64.b64encode(dill.dumps(o))
+    s = dill.dumps(o)
   except Exception:  # pylint: disable=broad-except
     dill.dill._trace(True)  # pylint: disable=protected-access
-    return base64.b64encode(dill.dumps(o))
+    s = dill.dumps(o)
   finally:
     dill.dill._trace(False)  # pylint: disable=protected-access
+  return base64.b64encode(zlib.compress(s))


-def loads(s):
+def loads(encoded):
+  s = zlib.decompress(base64.b64decode(encoded))
   try:
-    return dill.loads(base64.b64decode(s))
+    return dill.loads(s)
   except Exception:  # pylint: disable=broad-except
     dill.dill._trace(True)  # pylint: disable=protected-access
-    return dill.loads(base64.b64decode(s))
+    return dill.loads(s)
   finally:
     dill.dill._trace(False)  # pylint: disable=protected-access
