This is an automated email from the ASF dual-hosted git repository. robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new cd77c1e Allow more efficient encoding of generic iterable types. new 897ff50 Merge pull request #7427 More efficient generic iterables cd77c1e is described below commit cd77c1e244aeb765fb43ece5637f39f4e6049dfd Author: Robert Bradshaw <rober...@gmail.com> AuthorDate: Mon Jan 7 14:59:42 2019 +0100 Allow more efficient encoding of generic iterable types. --- sdks/python/apache_beam/coders/coder_impl.pxd | 5 +- sdks/python/apache_beam/coders/coder_impl.py | 64 +++++++++++++++------- .../apache_beam/runners/worker/bundle_processor.py | 7 +++ 3 files changed, 54 insertions(+), 22 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd index 9d5ac80..c5ce4e8 100644 --- a/sdks/python/apache_beam/coders/coder_impl.pxd +++ b/sdks/python/apache_beam/coders/coder_impl.pxd @@ -74,10 +74,13 @@ cdef class DeterministicFastPrimitivesCoderImpl(CoderImpl): cdef object NoneType cdef unsigned char UNKNOWN_TYPE, NONE_TYPE, INT_TYPE, FLOAT_TYPE, BOOL_TYPE cdef unsigned char BYTES_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE, DICT_TYPE -cdef unsigned char SET_TYPE +cdef unsigned char SET_TYPE, ITERABLE_LIKE_TYPE + +cdef set _ITERABLE_LIKE_TYPES cdef class FastPrimitivesCoderImpl(StreamCoderImpl): cdef CoderImpl fallback_coder_impl + cdef CoderImpl iterable_coder_impl @cython.locals(dict_value=dict, int_value=libc.stdint.int64_t, unicode_value=unicode) cpdef encode_to_stream(self, value, OutputStream stream, bint nested) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 244b82c..d419038 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -282,6 +282,14 @@ LIST_TYPE = 5 TUPLE_TYPE = 6 DICT_TYPE = 7 SET_TYPE = 8 +ITERABLE_LIKE_TYPE = 10 + + +# Types that can be encoded as iterables, but are not literally +# lists, etc. due to being lazy. The actual type is not preserved +# through encoding, only the elements. This is particularly useful +# for the value list types created in GroupByKey. +_ITERABLE_LIKE_TYPES = set() class FastPrimitivesCoderImpl(StreamCoderImpl): @@ -289,6 +297,11 @@ class FastPrimitivesCoderImpl(StreamCoderImpl): def __init__(self, fallback_coder_impl): self.fallback_coder_impl = fallback_coder_impl + self.iterable_coder_impl = IterableCoderImpl(self) + + @staticmethod + def register_iterable_like_type(t): + _ITERABLE_LIKE_TYPES.add(t) def get_estimated_size_and_observables(self, value, nested=False): if isinstance(value, observable.ObservableMixin): @@ -346,6 +359,9 @@ class FastPrimitivesCoderImpl(StreamCoderImpl): elif t is bool: stream.write_byte(BOOL_TYPE) stream.write_byte(value) + elif t in _ITERABLE_LIKE_TYPES: + stream.write_byte(ITERABLE_LIKE_TYPE) + self.iterable_coder_impl.encode_to_stream(value, stream, nested) else: stream.write_byte(UNKNOWN_TYPE) self.fallback_coder_impl.encode_to_stream(value, stream, nested) @@ -379,6 +395,8 @@ class FastPrimitivesCoderImpl(StreamCoderImpl): return v elif t == BOOL_TYPE: return not not stream.read_byte() + elif t == ITERABLE_LIKE_TYPE: + return self.iterable_coder_impl.decode_from_stream(stream, nested) elif t == UNKNOWN_TYPE: return self.fallback_coder_impl.decode_from_stream(stream, nested) else: @@ -615,6 +633,30 @@ class TupleCoderImpl(AbstractComponentCoderImpl): return tuple(components) +class _ConcatSequence(object): + def __init__(self, head, tail): + self._head = head + self._tail = tail + + def __iter__(self): + for elem in self._head: + yield elem + for elem in self._tail: + yield elem + + def __eq__(self, other): + return list(self) == list(other) + + def __hash__(self): + raise NotImplementedError + + def __reduce__(self): + return list, (list(self),) + + +FastPrimitivesCoderImpl.register_iterable_like_type(_ConcatSequence) + + class SequenceCoderImpl(StreamCoderImpl): """For internal use only; no backwards-compatibility guarantees. @@ -725,28 +767,8 @@ class SequenceCoderImpl(StreamCoderImpl): raise ValueError( 'Cannot read state-written iterable without state reader.') - class FullIterable(object): - def __init__(self, head, tail): - self._head = head - self._tail = tail - - def __iter__(self): - for elem in self._head: - yield elem - for elem in self._tail: - yield elem - - def __eq__(self, other): - return list(self) == list(other) - - def __hash__(self): - raise NotImplementedError - - def __reduce__(self): - return list, (list(self),) - state_token = in_stream.read_all(True) - elements = FullIterable( + elements = _ConcatSequence( elements, self._read_state(state_token, self._elem_coder)) return self._construct_from_sequence(elements) diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 679b8ea..b3b91ee 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -151,6 +151,10 @@ class _StateBackedIterable(object): return list, (list(self),) +coder_impl.FastPrimitivesCoderImpl.register_iterable_like_type( + _StateBackedIterable) + + class StateBackedSideInputMap(object): def __init__(self, state_handler, transform_id, tag, side_input_data, coder): self._state_handler = state_handler @@ -269,6 +273,9 @@ class _ConcatIterable(object): yield elem +coder_impl.FastPrimitivesCoderImpl.register_iterable_like_type(_ConcatIterable) + + # TODO(BEAM-5428): Implement cross-bundle state caching. class SynchronousBagRuntimeState(userstate.RuntimeState): def __init__(self, state_handler, state_key, value_coder):