Repository: beam Updated Branches: refs/heads/master a628ce353 -> b67bd111e
Add cross-SDK implementations and tests of IntervalWindowCoder Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ac7c4714 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ac7c4714 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ac7c4714 Branch: refs/heads/master Commit: ac7c471473510e4f9a9281447a99ceb9552acd17 Parents: a628ce3 Author: Dan Halperin <dhalp...@google.com> Authored: Fri Feb 10 11:56:00 2017 -0800 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Feb 13 15:17:21 2017 -0800 ---------------------------------------------------------------------- .../org/apache/beam/fn/v1/standard_coders.yaml | 10 +++++++ .../transforms/windowing/IntervalWindow.java | 15 ++++++---- .../org/apache/beam/sdk/util/CoderUtils.java | 2 ++ .../apache/beam/sdk/coders/CommonCoderTest.java | 31 ++++++++++++++++++++ sdks/python/apache_beam/coders/coder_impl.py | 31 +++++++++++++++++++- sdks/python/apache_beam/coders/coders.py | 15 ++++++++++ .../apache_beam/coders/coders_test_common.py | 9 +++++- sdks/python/apache_beam/coders/slow_stream.py | 6 ++++ .../apache_beam/coders/standard_coders_test.py | 11 +++++-- .../apache_beam/tests/data/standard_coders.yaml | 10 +++++++ 10 files changed, 130 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml ---------------------------------------------------------------------- diff --git a/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml b/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml index afa92e9..948ac6b 100644 --- a/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml +++ b/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml @@ -96,3 +96,13 @@ 
nested: true examples: "\u0003abc\u0003def": {key: abc, value: def} "\u0004ab\0c\u0004de\0f": {key: "ab\0c", value: "de\0f"} + +--- + +coder: + urn: "urn:beam:coders:intervalwindow:0.1" +examples: + "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {end: 1454293425000, span: 3600000} + "\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009" : {end: 1456881825000, span: 2592000000} + "\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002" : {end: -9223372036854410, span: 365} + "\u0080\u0020\u00c4\u009b\u00a5\u00e3\u0053\u00f7\u0000" : {end: 9223372036854775, span: 0} http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java index fb0fc11..c0ad2c0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.DurationCoder; import org.apache.beam.sdk.coders.InstantCoder; +import org.apache.beam.sdk.util.CloudObject; import org.joda.time.Duration; import org.joda.time.Instant; import org.joda.time.ReadableDuration; @@ -166,10 +167,9 @@ public class IntervalWindow extends BoundedWindow /** * Encodes an {@link IntervalWindow} as a pair of its upper bound and duration. 
*/ - private static class IntervalWindowCoder extends AtomicCoder<IntervalWindow> { + public static class IntervalWindowCoder extends AtomicCoder<IntervalWindow> { - private static final IntervalWindowCoder INSTANCE = - new IntervalWindowCoder(); + private static final IntervalWindowCoder INSTANCE = new IntervalWindowCoder(); private static final Coder<Instant> instantCoder = InstantCoder.of(); private static final Coder<ReadableDuration> durationCoder = DurationCoder.of(); @@ -180,9 +180,7 @@ public class IntervalWindow extends BoundedWindow } @Override - public void encode(IntervalWindow window, - OutputStream outStream, - Context context) + public void encode(IntervalWindow window, OutputStream outStream, Context context) throws IOException, CoderException { instantCoder.encode(window.end, outStream, context.nested()); durationCoder.encode(new Duration(window.start, window.end), outStream, context); @@ -195,5 +193,10 @@ public class IntervalWindow extends BoundedWindow ReadableDuration duration = durationCoder.decode(inStream, context); return new IntervalWindow(end.minus(duration), end); } + + @Override + protected CloudObject initializeCloudObject() { + return CloudObject.forClassName("kind:interval_window"); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java index 7b93b59..5d03574 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; import 
org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.values.TypeDescriptor; /** @@ -61,6 +62,7 @@ public final class CoderUtils { .put("kind:pair", KvCoder.class) .put("kind:stream", IterableCoder.class) .put("kind:global_window", GlobalWindow.Coder.class) + .put("kind:interval_window", IntervalWindow.IntervalWindowCoder.class) .put("kind:length_prefix", LengthPrefixCoder.class) .put("kind:windowed_value", WindowedValue.FullWindowedValueCoder.class) .build(); http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java index a776fd7..ad5d9c3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java @@ -26,6 +26,8 @@ import static org.junit.Assert.fail; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.google.auto.value.AutoValue; @@ -43,8 +45,12 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder.Context; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.KV; +import org.joda.time.Duration; +import org.joda.time.Instant; import 
org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -63,6 +69,7 @@ public class CommonCoderTest { .put("urn:beam:coders:bytes:0.1", ByteCoder.class) .put("urn:beam:coders:kv:0.1", KvCoder.class) .put("urn:beam:coders:varint:0.1", VarLongCoder.class) + .put("urn:beam:coders:intervalwindow:0.1", IntervalWindowCoder.class) .build(); @AutoValue @@ -185,6 +192,12 @@ public class CommonCoderTest { } case "urn:beam:coders:varint:0.1": return ((Number) value).longValue(); + case "urn:beam:coders:intervalwindow:0.1": { + Map<String, Object> kvMap = (Map<String, Object>) value; + Instant end = new Instant(((Number) kvMap.get("end")).longValue()); + Duration span = Duration.millis(((Number) kvMap.get("span")).longValue()); + return new IntervalWindow(end.minus(span), span); + } default: throw new IllegalStateException("Unknown coder URN: " + coderSpec.getUrn()); } @@ -202,6 +215,8 @@ public class CommonCoderTest { return KvCoder.of(components.get(0), components.get(1)); case "urn:beam:coders:varint:0.1": return VarLongCoder.of(); + case "urn:beam:coders:intervalwindow:0.1": + return IntervalWindowCoder.of(); default: throw new IllegalStateException("Unknown coder URN: " + coder.getUrn()); } @@ -216,4 +231,20 @@ public class CommonCoderTest { byte[] encoded = CoderUtils.encodeToByteArray(coder, testValue, context); assertThat(testSpec.toString(), encoded, equalTo(testSpec.getSerialized())); } + + /** + * Utility for adding new entries to the common coder spec -- prints the serialized bytes of + * the given value in the given context using JSON-escaped strings. 
+ */ + private static <T> String jsonByteString(Coder<T> coder, T value, Context context) + throws CoderException { + byte[] bytes = CoderUtils.encodeToByteArray(coder, value, context); + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(JsonGenerator.Feature.ESCAPE_NON_ASCII, true); + try { + return mapper.writeValueAsString(new String(bytes, StandardCharsets.ISO_8859_1)); + } catch (JsonProcessingException e) { + throw new CoderException(String.format("Unable to encode %s with coder %s", value, coder), e); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/python/apache_beam/coders/coder_impl.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 563c444..840397a 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -348,8 +348,37 @@ class FloatCoderImpl(StreamCoderImpl): return 8 -class TimestampCoderImpl(StreamCoderImpl): +class IntervalWindowCoderImpl(StreamCoderImpl): + # TODO: Fn Harness only supports millis. Is this important enough to fix? 
+ def _to_normal_time(self, value): + """Convert "lexicographically ordered unsigned" to signed.""" + return value - (1 << 63) + + def _from_normal_time(self, value): + """Convert signed to "lexicographically ordered unsigned".""" + return value + (1 << 63) + + def encode_to_stream(self, value, out, nested): + span_micros = value.end.micros - value.start.micros + out.write_bigendian_uint64(self._from_normal_time(value.end.micros / 1000)) + out.write_var_int64(span_micros / 1000) + + def decode_from_stream(self, in_, nested): + end_millis = self._to_normal_time(in_.read_bigendian_uint64()) + start_millis = end_millis - in_.read_var_int64() + from apache_beam.transforms.window import IntervalWindow + ret = IntervalWindow(start=Timestamp(micros=start_millis * 1000), + end=Timestamp(micros=end_millis * 1000)) + return ret + def estimate_size(self, value, nested=False): + # An IntervalWindow is context-insensitive, with a timestamp (8 bytes) + # and a varint timespan. + span = value.end.micros - value.start.micros + return 8 + get_varint_size(span / 1000) + + +class TimestampCoderImpl(StreamCoderImpl): def encode_to_stream(self, value, out, nested): out.write_bigendian_int64(value.micros) http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/python/apache_beam/coders/coders.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 25af934..1d29f32 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -629,6 +629,21 @@ class GlobalWindowCoder(SingletonCoder): } +class IntervalWindowCoder(FastCoder): + """Coder for a window defined by a start timestamp and a duration.""" + + def _create_impl(self): + return coder_impl.IntervalWindowCoderImpl() + + def is_deterministic(self): + return True + + def as_cloud_object(self): + return { + '@type': 'kind:interval_window', + } + + class 
WindowedValueCoder(FastCoder): """Coder for windowed values.""" http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/python/apache_beam/coders/coders_test_common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index d75d253..7284287 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -62,7 +62,8 @@ class CodersTest(unittest.TestCase): coders.FastCoder, coders.ProtoCoder, coders.ToStringCoder, - coders.WindowCoder]) + coders.WindowCoder, + coders.IntervalWindowCoder]) assert not standard - cls.seen, standard - cls.seen assert not standard - cls.seen_nested, standard - cls.seen_nested @@ -166,6 +167,12 @@ class CodersTest(unittest.TestCase): self.check_coder(coders.TupleCoder((coders.SingletonCoder(a), coders.SingletonCoder(b))), (a, b)) + def test_interval_window_coder(self): + self.check_coder(coders.IntervalWindowCoder(), + *[window.IntervalWindow(x, y) + for x in [-2**52, 0, 2**52] + for y in range(-100, 100)]) + def test_timestamp_coder(self): self.check_coder(coders.TimestampCoder(), *[timestamp.Timestamp(micros=x) for x in range(-100, 100)]) http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/python/apache_beam/coders/slow_stream.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py index ceb1f0d..5462fae 100644 --- a/sdks/python/apache_beam/coders/slow_stream.py +++ b/sdks/python/apache_beam/coders/slow_stream.py @@ -52,6 +52,9 @@ class OutputStream(object): def write_bigendian_int64(self, v): self.write(struct.pack('>q', v)) + def write_bigendian_uint64(self, v): + self.write(struct.pack('>Q', v)) + def write_bigendian_int32(self, v): self.write(struct.pack('>i', v)) @@ -132,6 +135,9 @@ class 
InputStream(object): def read_bigendian_int64(self): return struct.unpack('>q', self.read(8))[0] + def read_bigendian_uint64(self): + return struct.unpack('>Q', self.read(8))[0] + def read_bigendian_int32(self): return struct.unpack('>i', self.read(4))[0] http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/python/apache_beam/coders/standard_coders_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py index c6fc58a..e66ec7b 100644 --- a/sdks/python/apache_beam/coders/standard_coders_test.py +++ b/sdks/python/apache_beam/coders/standard_coders_test.py @@ -27,6 +27,8 @@ import yaml from apache_beam import coders from apache_beam.coders import coder_impl +from apache_beam.utils.timestamp import Timestamp +from apache_beam.transforms.window import IntervalWindow class StandardCodersTest(unittest.TestCase): @@ -34,7 +36,8 @@ class StandardCodersTest(unittest.TestCase): _urn_to_coder_class = { 'urn:beam:coders:bytes:0.1': coders.BytesCoder, 'urn:beam:coders:varint:0.1': coders.VarIntCoder, - 'urn:beam:coders:kv:0.1': lambda k, v: coders.TupleCoder((k, v)) + 'urn:beam:coders:kv:0.1': lambda k, v: coders.TupleCoder((k, v)), + 'urn:beam:coders:intervalwindow:0.1': coders.IntervalWindowCoder, } _urn_to_json_value_parser = { @@ -42,7 +45,11 @@ class StandardCodersTest(unittest.TestCase): 'urn:beam:coders:varint:0.1': lambda x: x, 'urn:beam:coders:kv:0.1': lambda x, key_parser, value_parser: (key_parser(x['key']), - value_parser(x['value'])) + value_parser(x['value'])), + 'urn:beam:coders:intervalwindow:0.1': + lambda x: IntervalWindow( + start=Timestamp(micros=(x['end'] - x['span']) * 1000), + end=Timestamp(micros=x['end'] * 1000)), } # We must prepend an underscore to this name so that the open-source unittest 
http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/python/apache_beam/tests/data/standard_coders.yaml ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/tests/data/standard_coders.yaml b/sdks/python/apache_beam/tests/data/standard_coders.yaml index afa92e9..948ac6b 100644 --- a/sdks/python/apache_beam/tests/data/standard_coders.yaml +++ b/sdks/python/apache_beam/tests/data/standard_coders.yaml @@ -96,3 +96,13 @@ nested: true examples: "\u0003abc\u0003def": {key: abc, value: def} "\u0004ab\0c\u0004de\0f": {key: "ab\0c", value: "de\0f"} + +--- + +coder: + urn: "urn:beam:coders:intervalwindow:0.1" +examples: + "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {end: 1454293425000, span: 3600000} + "\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009" : {end: 1456881825000, span: 2592000000} + "\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002" : {end: -9223372036854410, span: 365} + "\u0080\u0020\u00c4\u009b\u00a5\u00e3\u0053\u00f7\u0000" : {end: 9223372036854775, span: 0}