Repository: beam Updated Branches: refs/heads/master 16736a6b6 -> 13db84bb0
Support unknown length iterables for IterableCoder in python SDK Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/06535afa Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/06535afa Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/06535afa Branch: refs/heads/master Commit: 06535afa29fd922a0e52724fbb8f4dc7671b46ee Parents: 16736a6 Author: Vikas Kedigehalli <[email protected]> Authored: Wed Feb 22 19:36:33 2017 -0800 Committer: Dan Halperin <[email protected]> Committed: Tue Feb 28 00:08:42 2017 -0800 ---------------------------------------------------------------------- .../org/apache/beam/fn/v1/standard_coders.yaml | 15 ++++ .../beam/sdk/coders/IterableLikeCoder.java | 6 +- .../apache/beam/sdk/coders/CommonCoderTest.java | 56 ++++++++++++++- sdks/python/apache_beam/coders/coder_impl.py | 72 ++++++++++++++++++-- .../apache_beam/coders/coders_test_common.py | 20 ++++++ sdks/python/apache_beam/coders/slow_stream.py | 3 + .../apache_beam/coders/standard_coders_test.py | 13 ++-- sdks/python/apache_beam/coders/stream.pxd | 4 +- sdks/python/apache_beam/coders/stream.pyx | 21 +++--- .../apache_beam/tests/data/standard_coders.yaml | 16 +++++ 10 files changed, 198 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml ---------------------------------------------------------------------- diff --git a/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml b/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml index 172b6d8..f37b2d3 100644 --- a/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml +++ b/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml @@ -33,6 +33,8 @@ # # It is expected that future work 
will move the `coder` field into a format that it would be # represented by the Runner API, so that it can be understood by all SDKs and harnesses. +# +# If a coder is marked non-deterministic in the coder spec, then only the decoding should be validated. coder: @@ -131,6 +133,19 @@ examples: coder: urn: "urn:beam:coders:stream:0.1" + components: [{urn: "urn:beam:coders:bytes:0.1"}] + # This is for iterables of unknown length, where the encoding is not + # deterministic. + non_deterministic: True +examples: + "\u00ff\u00ff\u00ff\u00ff\u0000": [] + "\u00ff\u00ff\u00ff\u00ff\u0001\u0003abc\u0000": ["abc"] + "\u00ff\u00ff\u00ff\u00ff\u0002\u0004ab\u0000c\u0004de\u0000f\u0000": ["ab\0c", "de\0f"] + +--- + +coder: + urn: "urn:beam:coders:stream:0.1" components: [{urn: "urn:beam:coders:global_window:0.1"}] examples: "\0\0\0\u0001": [""] http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java index da64a93..61402ac 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java @@ -211,9 +211,9 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>> elementCoder.registerByteSizeObserver(elem, observer, nestedContext); } } else { - // TODO: Update to use an accurate count depending on size and count, currently we - // are under estimating the size by up to 10 bytes per block of data since we are - // not encoding the count prefix which occurs at most once per 64k of data and is upto + // TODO: (BEAM-1537) Update to use an accurate count depending on size and count, + // currently we are under estimating the size by up to 
10 bytes per block of data since we + // are not encoding the count prefix which occurs at most once per 64k of data and is upto // 10 bytes long. Since we include the total count we can upper bound the underestimate // to be 10 / 65536 ~= 0.0153% of the actual size. observer.update(4L); http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java index 660d608..1db7a2b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java @@ -21,6 +21,9 @@ import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkNotNull; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -40,6 +43,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -83,13 +87,16 @@ public class CommonCoderTest { abstract static class CommonCoder { abstract String getUrn(); abstract List<CommonCoder> getComponents(); + abstract Boolean getNonDeterministic(); @JsonCreator static CommonCoder create( @JsonProperty("urn") String urn, - @JsonProperty("components") @Nullable List<CommonCoder> components) { + @JsonProperty("components") @Nullable List<CommonCoder> components, + 
@JsonProperty("non_deterministic") @Nullable Boolean nonDeterministic) { return new AutoValue_CommonCoderTest_CommonCoder( checkNotNull(urn, "urn"), - firstNonNull(components, Collections.<CommonCoder>emptyList())); + firstNonNull(components, Collections.<CommonCoder>emptyList()), + firstNonNull(nonDeterministic, Boolean.FALSE)); } } @@ -280,7 +287,50 @@ public class CommonCoderTest { Object testValue = convertValue(testSpec.getValue(), testSpec.getCoder(), coder); Context context = testSpec.getNested() ? Context.NESTED : Context.OUTER; byte[] encoded = CoderUtils.encodeToByteArray(coder, testValue, context); - assertThat(testSpec.toString(), encoded, equalTo(testSpec.getSerialized())); + Object decodedValue = CoderUtils.decodeFromByteArray(coder, testSpec.getSerialized(), context); + + if (!testSpec.getCoder().getNonDeterministic()) { + assertThat(testSpec.toString(), encoded, equalTo(testSpec.getSerialized())); + } + verifyDecodedValue(testSpec.getCoder(), decodedValue, testValue); + } + + private void verifyDecodedValue(CommonCoder coder, Object expectedValue, Object actualValue) { + switch (coder.getUrn()) { + case "urn:beam:coders:bytes:0.1": + assertThat(expectedValue, equalTo(actualValue)); + break; + case "urn:beam:coders:kv:0.1": + assertThat(actualValue, instanceOf(KV.class)); + verifyDecodedValue(coder.getComponents().get(0), + ((KV) expectedValue).getKey(), ((KV) actualValue).getKey()); + verifyDecodedValue(coder.getComponents().get(0), + ((KV) expectedValue).getValue(), ((KV) actualValue).getValue()); + break; + case "urn:beam:coders:varint:0.1": + assertEquals(expectedValue, actualValue); + break; + case "urn:beam:coders:interval_window:0.1": + assertEquals(expectedValue, actualValue); + break; + case "urn:beam:coders:stream:0.1": + assertThat(actualValue, instanceOf(Iterable.class)); + CommonCoder componentCoder = coder.getComponents().get(0); + Iterator<Object> expectedValueIterator = ((Iterable<Object>) expectedValue).iterator(); + for (Object 
value: (Iterable<Object>) actualValue) { + verifyDecodedValue(componentCoder, expectedValueIterator.next(), value); + } + assertFalse(expectedValueIterator.hasNext()); + break; + case "urn:beam:coders:global_window:0.1": + assertEquals(expectedValue, actualValue); + break; + case "urn:beam:coders:windowed_value:0.1": + assertEquals(expectedValue, actualValue); + break; + default: + throw new IllegalStateException("Unknown coder URN: " + coder.getUrn()); + } } /** http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/python/apache_beam/coders/coder_impl.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 2dbfae7..137d1be 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -509,7 +509,29 @@ class TupleCoderImpl(AbstractComponentCoderImpl): class SequenceCoderImpl(StreamCoderImpl): - """A coder for sequences of known length.""" + """A coder for sequences. + + If the length of the sequence is known, we encode the length as a 32 bit + ``int`` followed by the encoded elements. + + If the length of the sequence is unknown, we encode the length as ``-1`` + followed by the elements in blocks of roughly 64K bytes, each block + prefixed by the number of elements it contains. A ``0`` is encoded at the + end to indicate the end of stream. + + The resulting encoding would look like this:: + + -1 + countA element(0) element(1) ... element(countA - 1) + countB element(0) element(1) ... element(countB - 1) + ... + countX element(0) element(1) ... element(countX - 1) + 0 + + """ + + # Default buffer size of 64kB for handling iterables of unknown length.
+ _DEFAULT_BUFFER_SIZE = 64 * 1024 def __init__(self, elem_coder): self._elem_coder = elem_coder @@ -519,15 +541,46 @@ class SequenceCoderImpl(StreamCoderImpl): def encode_to_stream(self, value, out, nested): # Compatible with Java's IterableLikeCoder. - out.write_bigendian_int32(len(value)) - for elem in value: - self._elem_coder.encode_to_stream(elem, out, True) + if hasattr(value, '__len__'): + out.write_bigendian_int32(len(value)) + for elem in value: + self._elem_coder.encode_to_stream(elem, out, True) + else: + # We don't know the size without traversing it so use a fixed size buffer + # and encode as many elements as possible into it before outputting + # the size followed by the elements. + + # -1 to indicate that the length is not known. + out.write_bigendian_int32(-1) + buffer = create_OutputStream() + prev_index = index = -1 + for index, elem in enumerate(value): + self._elem_coder.encode_to_stream(elem, buffer, True) + if out.size() > self._DEFAULT_BUFFER_SIZE: + out.write_var_int64(index - prev_index) + out.write(buffer.get()) + prev_index = index + buffer = create_OutputStream() + if index > prev_index: + out.write_var_int64(index - prev_index) + out.write(buffer.get()) + out.write_var_int64(0) def decode_from_stream(self, in_stream, nested): size = in_stream.read_bigendian_int32() - return self._construct_from_sequence( - [self._elem_coder.decode_from_stream(in_stream, True) - for _ in range(size)]) + + if size >= 0: + elements = [self._elem_coder.decode_from_stream(in_stream, True) + for _ in range(size)] + else: + elements = [] + count = in_stream.read_var_int64() + while count > 0: + for _ in range(count): + elements.append(self._elem_coder.decode_from_stream(in_stream, True)) + count = in_stream.read_var_int64() + + return self._construct_from_sequence(elements) def estimate_size(self, value, nested=False): """Estimates the encoded size of the given value, in bytes.""" @@ -551,6 +604,11 @@ class SequenceCoderImpl(StreamCoderImpl): elem, 
nested=True)) estimated_size += child_size observables += child_observables + # TODO: (BEAM-1537) Update to use an accurate count depending on size and + # count, currently we are underestimating the size by up to 10 bytes + # per block of data since we are not including the count prefix which + # occurs at most once per 64k of data and is up to 10 bytes long. The upper + # bound of the underestimate is 10 / 65536 ~= 0.0153% of the actual size. return estimated_size, observables http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/python/apache_beam/coders/coders_test_common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 338f89e..6491ea8 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -249,6 +249,26 @@ class CodersTest(unittest.TestCase): coders.IterableCoder(coders.VarIntCoder()))), (1, [1, 2, 3])) + def test_iterable_coder_unknown_length(self): + # Empty + self._test_iterable_coder_of_unknown_length(0) + # Single element + self._test_iterable_coder_of_unknown_length(1) + # Multiple elements + self._test_iterable_coder_of_unknown_length(100) + # Multiple elements with underlying stream buffer overflow.
+ self._test_iterable_coder_of_unknown_length(80000) + + def _test_iterable_coder_of_unknown_length(self, count): + def iter_generator(count): + for i in range(count): + yield i + + iterable_coder = coders.IterableCoder(coders.VarIntCoder()) + self.assertItemsEqual(list(iter_generator(count)), + iterable_coder.decode( + iterable_coder.encode(iter_generator(count)))) + def test_windowed_value_coder(self): coder = coders.WindowedValueCoder(coders.VarIntCoder(), coders.GlobalWindowCoder()) http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/python/apache_beam/coders/slow_stream.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py index 5462fae..a87495c 100644 --- a/sdks/python/apache_beam/coders/slow_stream.py +++ b/sdks/python/apache_beam/coders/slow_stream.py @@ -64,6 +64,9 @@ class OutputStream(object): def get(self): return ''.join(self.data) + def size(self): + return len(self.data) + class ByteCountingOutputStream(OutputStream): """A pure Python implementation of stream.ByteCountingOutputStream.""" http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/python/apache_beam/coders/standard_coders_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py index e26d2c8..4a48ed9 100644 --- a/sdks/python/apache_beam/coders/standard_coders_test.py +++ b/sdks/python/apache_beam/coders/standard_coders_test.py @@ -94,13 +94,18 @@ class StandardCodersTest(unittest.TestCase): for expected_encoded, json_value in spec['examples'].items(): value = parse_value(json_value) expected_encoded = expected_encoded.encode('latin1') - actual_encoded = encode_nested(coder, value, nested) - if self.fix and actual_encoded != expected_encoded: - self.to_fix[spec['index'], expected_encoded] = 
actual_encoded + if not spec['coder'].get('non_deterministic', False): + actual_encoded = encode_nested(coder, value, nested) + if self.fix and actual_encoded != expected_encoded: + self.to_fix[spec['index'], expected_encoded] = actual_encoded + else: + self.assertEqual(expected_encoded, actual_encoded) + self.assertEqual(decode_nested(coder, expected_encoded, nested), + value) else: + # Only verify decoding for a non-deterministic coder self.assertEqual(decode_nested(coder, expected_encoded, nested), value) - self.assertEqual(expected_encoded, actual_encoded) def parse_coder(self, spec): return self._urn_to_coder_class[spec['urn']]( http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/python/apache_beam/coders/stream.pxd ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/stream.pxd b/sdks/python/apache_beam/coders/stream.pxd index 22ad8c1..a1eb9f7 100644 --- a/sdks/python/apache_beam/coders/stream.pxd +++ b/sdks/python/apache_beam/coders/stream.pxd @@ -20,7 +20,7 @@ cimport libc.stdint cdef class OutputStream(object): cdef char* data - cdef size_t size + cdef size_t _size cdef size_t pos cpdef write(self, bytes b, bint nested=*) @@ -32,7 +32,7 @@ cdef class OutputStream(object): cpdef write_bigendian_double(self, double d) cpdef bytes get(self) - + cpdef size_t size(self) except? -1 cdef extend(self, size_t missing) http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/python/apache_beam/coders/stream.pyx ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx index 845f289..ae24418 100644 --- a/sdks/python/apache_beam/coders/stream.pyx +++ b/sdks/python/apache_beam/coders/stream.pyx @@ -25,9 +25,9 @@ cdef class OutputStream(object): #TODO(robertwb): Consider using raw C++ streams. 
def __cinit__(self): - self.size = 1024 + self.buffer_size = 1024 self.pos = 0 - self.data = <char*>libc.stdlib.malloc(self.size) + self.data = <char*>libc.stdlib.malloc(self.buffer_size) assert self.data, "OutputStream malloc failed." def __dealloc__(self): @@ -38,13 +38,13 @@ cdef class OutputStream(object): cdef size_t blen = len(b) if nested: self.write_var_int64(blen) - if self.size < self.pos + blen: + if self.buffer_size < self.pos + blen: self.extend(blen) libc.string.memcpy(self.data + self.pos, <char*>b, blen) self.pos += blen cpdef write_byte(self, unsigned char val): - if self.size < self.pos + 1: + if self.buffer_size < self.pos + 1: self.extend(1) self.data[self.pos] = val self.pos += 1 @@ -66,7 +66,7 @@ cdef class OutputStream(object): self.write_bigendian_uint64(signed_v) cpdef write_bigendian_uint64(self, libc.stdint.uint64_t v): - if self.size < self.pos + 8: + if self.buffer_size < self.pos + 8: self.extend(8) self.data[self.pos ] = <unsigned char>(v >> 56) self.data[self.pos + 1] = <unsigned char>(v >> 48) @@ -80,7 +80,7 @@ cdef class OutputStream(object): cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v): cdef libc.stdint.uint32_t v = signed_v - if self.size < self.pos + 4: + if self.buffer_size < self.pos + 4: self.extend(4) self.data[self.pos ] = <unsigned char>(v >> 24) self.data[self.pos + 1] = <unsigned char>(v >> 16) @@ -94,10 +94,13 @@ cdef class OutputStream(object): cpdef bytes get(self): return self.data[:self.pos] + cpdef size_t size(self) except? -1: + return self.pos + cdef extend(self, size_t missing): - while missing > self.size - self.pos: - self.size *= 2 - self.data = <char*>libc.stdlib.realloc(self.data, self.size) + while missing > self.buffer_size - self.pos: + self.buffer_size *= 2 + self.data = <char*>libc.stdlib.realloc(self.data, self.buffer_size) assert self.data, "OutputStream realloc failed." 
http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/python/apache_beam/tests/data/standard_coders.yaml ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/tests/data/standard_coders.yaml b/sdks/python/apache_beam/tests/data/standard_coders.yaml index 172b6d8..790cacb 100644 --- a/sdks/python/apache_beam/tests/data/standard_coders.yaml +++ b/sdks/python/apache_beam/tests/data/standard_coders.yaml @@ -33,6 +33,9 @@ # # It is expected that future work will move the `coder` field into a format that it would be # represented by the Runner API, so that it can be understood by all SDKs and harnesses. +# +# If a coder is marked non-deterministic in the coder spec, then only the decoding should be validated. + coder: @@ -131,6 +134,19 @@ examples: coder: urn: "urn:beam:coders:stream:0.1" + components: [{urn: "urn:beam:coders:bytes:0.1"}] + # This is for iterables of unknown length, where the encoding is not + # deterministic. + non_deterministic: True +examples: + "\u00ff\u00ff\u00ff\u00ff\u0000": [] + "\u00ff\u00ff\u00ff\u00ff\u0001\u0003abc\u0000": ["abc"] + "\u00ff\u00ff\u00ff\u00ff\u0002\u0004ab\u0000c\u0004de\u0000f\u0000": ["ab\0c", "de\0f"] + +--- + +coder: + urn: "urn:beam:coders:stream:0.1" components: [{urn: "urn:beam:coders:global_window:0.1"}] examples: "\0\0\0\u0001": [""]
