Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 4bf848511 -> 8f4551c4e

Don't default to PickleCoder for sources.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/db0ecc55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/db0ecc55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/db0ecc55

Branch: refs/heads/python-sdk
Commit: db0ecc5542d6b5ce8cb175eec51cee2a03eac8d5
Parents: 4bf84851
Author: Robert Bradshaw <[email protected]>
Authored: Fri Nov 4 14:04:48 2016 -0700
Committer: Robert Bradshaw <[email protected]>
Committed: Mon Nov 7 13:26:30 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/typecoders.py |  3 +++
 sdks/python/apache_beam/io/concat_source.py  |  1 -
 sdks/python/apache_beam/io/iobase.py         | 10 +++++++---
 3 files changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db0ecc55/sdks/python/apache_beam/coders/typecoders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py
index 0681218..c6cdae7 100644
--- a/sdks/python/apache_beam/coders/typecoders.py
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -125,6 +125,9 @@ class CoderRegistry(object):
         # In some old code, None is used for Any.
         # TODO(robertwb): Clean this up.
         pass
+      elif typehint is object:
+        # We explicitly want the fallback coder.
+        pass
       elif isinstance(typehint, typehints.TypeVariable):
         # TODO(robertwb): Clean this up when type inference is fully enabled.
         pass

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db0ecc55/sdks/python/apache_beam/io/concat_source.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source.py b/sdks/python/apache_beam/io/concat_source.py
index f2bd238..1656180 100644
--- a/sdks/python/apache_beam/io/concat_source.py
+++ b/sdks/python/apache_beam/io/concat_source.py
@@ -85,7 +85,6 @@ class ConcatSource(iobase.BoundedSource):
       # to produce the same coder.
       return self._source_bundles[0].source.default_output_coder()
     else:
-      # Defaulting to PickleCoder.
       return super(ConcatSource, self).default_output_coder()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db0ecc55/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 9701964..fd46dd6 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -36,7 +36,7 @@ import random
 import uuid

 from apache_beam import pvalue
-from apache_beam.coders import PickleCoder
+from apache_beam import coders
 from apache_beam.pvalue import AsIter
 from apache_beam.pvalue import AsSingleton
 from apache_beam.transforms import core
@@ -175,8 +175,12 @@ class BoundedSource(object):
     raise NotImplementedError

   def default_output_coder(self):
-    """Coder that should be used for the records returned by the source."""
-    return PickleCoder()
+    """Coder that should be used for the records returned by the source.
+
+    Should be overridden by sources that produce objects that can be encoded
+    more efficiently than pickling.
+    """
+    return coders.registry.get_coder(object)


 class RangeTracker(object):
