Repository: beam
Updated Branches:
  refs/heads/master a628ce353 -> b67bd111e


Add cross-SDK implementations and tests of IntervalWindowCoder


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ac7c4714
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ac7c4714
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ac7c4714

Branch: refs/heads/master
Commit: ac7c471473510e4f9a9281447a99ceb9552acd17
Parents: a628ce3
Author: Dan Halperin <dhalp...@google.com>
Authored: Fri Feb 10 11:56:00 2017 -0800
Committer: Dan Halperin <dhalp...@google.com>
Committed: Mon Feb 13 15:17:21 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/fn/v1/standard_coders.yaml  | 10 +++++++
 .../transforms/windowing/IntervalWindow.java    | 15 ++++++----
 .../org/apache/beam/sdk/util/CoderUtils.java    |  2 ++
 .../apache/beam/sdk/coders/CommonCoderTest.java | 31 ++++++++++++++++++++
 sdks/python/apache_beam/coders/coder_impl.py    | 31 +++++++++++++++++++-
 sdks/python/apache_beam/coders/coders.py        | 15 ++++++++++
 .../apache_beam/coders/coders_test_common.py    |  9 +++++-
 sdks/python/apache_beam/coders/slow_stream.py   |  6 ++++
 .../apache_beam/coders/standard_coders_test.py  | 11 +++++--
 .../apache_beam/tests/data/standard_coders.yaml | 10 +++++++
 10 files changed, 130 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
----------------------------------------------------------------------
diff --git 
a/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
 
b/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
index afa92e9..948ac6b 100644
--- 
a/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
+++ 
b/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
@@ -96,3 +96,13 @@ nested: true
 examples:
   "\u0003abc\u0003def": {key: abc, value: def}
   "\u0004ab\0c\u0004de\0f": {key: "ab\0c", value: "de\0f"}
+
+---
+
+coder:
+  urn: "urn:beam:coders:intervalwindow:0.1"
+examples:
+  "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : 
{end: 1454293425000, span: 3600000}
+  
"\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009"
 : {end: 1456881825000, span: 2592000000}
+  "\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002" : {end: 
-9223372036854410, span: 365}
+  "\u0080\u0020\u00c4\u009b\u00a5\u00e3\u0053\u00f7\u0000" : {end: 
9223372036854775, span: 0}

http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
index fb0fc11..c0ad2c0 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.DurationCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.util.CloudObject;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.ReadableDuration;
@@ -166,10 +167,9 @@ public class IntervalWindow extends BoundedWindow
   /**
    * Encodes an {@link IntervalWindow} as a pair of its upper bound and 
duration.
    */
-  private static class IntervalWindowCoder extends AtomicCoder<IntervalWindow> 
{
+  public static class IntervalWindowCoder extends AtomicCoder<IntervalWindow> {
 
-    private static final IntervalWindowCoder INSTANCE =
-        new IntervalWindowCoder();
+    private static final IntervalWindowCoder INSTANCE = new 
IntervalWindowCoder();
 
     private static final Coder<Instant> instantCoder = InstantCoder.of();
     private static final Coder<ReadableDuration> durationCoder = 
DurationCoder.of();
@@ -180,9 +180,7 @@ public class IntervalWindow extends BoundedWindow
     }
 
     @Override
-    public void encode(IntervalWindow window,
-                       OutputStream outStream,
-                       Context context)
+    public void encode(IntervalWindow window, OutputStream outStream, Context 
context)
         throws IOException, CoderException {
       instantCoder.encode(window.end, outStream, context.nested());
       durationCoder.encode(new Duration(window.start, window.end), outStream, 
context);
@@ -195,5 +193,10 @@ public class IntervalWindow extends BoundedWindow
       ReadableDuration duration = durationCoder.decode(inStream, context);
       return new IntervalWindow(end.minus(duration), end);
     }
+
+    @Override
+    protected CloudObject initializeCloudObject() {
+      return CloudObject.forClassName("kind:interval_window");
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
index 7b93b59..5d03574 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
@@ -47,6 +47,7 @@ import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
@@ -61,6 +62,7 @@ public final class CoderUtils {
       .put("kind:pair", KvCoder.class)
       .put("kind:stream", IterableCoder.class)
       .put("kind:global_window", GlobalWindow.Coder.class)
+      .put("kind:interval_window", IntervalWindow.IntervalWindowCoder.class)
       .put("kind:length_prefix", LengthPrefixCoder.class)
       .put("kind:windowed_value", WindowedValue.FullWindowedValueCoder.class)
       .build();

http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java
index a776fd7..ad5d9c3 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java
@@ -26,6 +26,8 @@ import static org.junit.Assert.fail;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import com.google.auto.value.AutoValue;
@@ -43,8 +45,12 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -63,6 +69,7 @@ public class CommonCoderTest {
       .put("urn:beam:coders:bytes:0.1", ByteCoder.class)
       .put("urn:beam:coders:kv:0.1", KvCoder.class)
       .put("urn:beam:coders:varint:0.1", VarLongCoder.class)
+      .put("urn:beam:coders:intervalwindow:0.1", IntervalWindowCoder.class)
       .build();
 
   @AutoValue
@@ -185,6 +192,12 @@ public class CommonCoderTest {
       }
       case "urn:beam:coders:varint:0.1":
         return ((Number) value).longValue();
+      case "urn:beam:coders:intervalwindow:0.1": {
+        Map<String, Object> kvMap = (Map<String, Object>) value;
+        Instant end = new Instant(((Number) kvMap.get("end")).longValue());
+        Duration span = Duration.millis(((Number) 
kvMap.get("span")).longValue());
+        return new IntervalWindow(end.minus(span), span);
+      }
       default:
         throw new IllegalStateException("Unknown coder URN: " + 
coderSpec.getUrn());
     }
@@ -202,6 +215,8 @@ public class CommonCoderTest {
         return KvCoder.of(components.get(0), components.get(1));
       case "urn:beam:coders:varint:0.1":
         return VarLongCoder.of();
+      case "urn:beam:coders:intervalwindow:0.1":
+        return IntervalWindowCoder.of();
       default:
         throw new IllegalStateException("Unknown coder URN: " + 
coder.getUrn());
     }
@@ -216,4 +231,20 @@ public class CommonCoderTest {
     byte[] encoded = CoderUtils.encodeToByteArray(coder, testValue, context);
     assertThat(testSpec.toString(), encoded, 
equalTo(testSpec.getSerialized()));
   }
+
+  /**
+   * Utility for adding new entries to the common coder spec -- returns the serialized bytes of
+   * the given value in the given context as a JSON-escaped string.
+   */
+  private static <T> String jsonByteString(Coder<T> coder, T value, Context 
context)
+      throws CoderException {
+    byte[] bytes = CoderUtils.encodeToByteArray(coder, value, context);
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(JsonGenerator.Feature.ESCAPE_NON_ASCII, true);
+    try {
+      return mapper.writeValueAsString(new String(bytes, 
StandardCharsets.ISO_8859_1));
+    } catch (JsonProcessingException e) {
+      throw new CoderException(String.format("Unable to encode %s with coder 
%s", value, coder), e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py 
b/sdks/python/apache_beam/coders/coder_impl.py
index 563c444..840397a 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -348,8 +348,37 @@ class FloatCoderImpl(StreamCoderImpl):
     return 8
 
 
-class TimestampCoderImpl(StreamCoderImpl):
+class IntervalWindowCoderImpl(StreamCoderImpl):
+  # TODO: Fn Harness only supports millis. Is this important enough to fix?
+  def _to_normal_time(self, value):
+    """Convert "lexicographically ordered unsigned" to signed."""
+    return value - (1 << 63)
+
+  def _from_normal_time(self, value):
+    """Convert signed to "lexicographically ordered unsigned"."""
+    return value + (1 << 63)
+
+  def encode_to_stream(self, value, out, nested):
+    span_micros = value.end.micros - value.start.micros
+    out.write_bigendian_uint64(self._from_normal_time(value.end.micros / 1000))
+    out.write_var_int64(span_micros / 1000)
+
+  def decode_from_stream(self, in_, nested):
+    end_millis = self._to_normal_time(in_.read_bigendian_uint64())
+    start_millis = end_millis - in_.read_var_int64()
+    from apache_beam.transforms.window import IntervalWindow
+    ret = IntervalWindow(start=Timestamp(micros=start_millis * 1000),
+                         end=Timestamp(micros=end_millis * 1000))
+    return ret
 
+  def estimate_size(self, value, nested=False):
+    # An IntervalWindow is context-insensitive, with a timestamp (8 bytes)
+    # and a varint timespan.
+    span = value.end.micros - value.start.micros
+    return 8 + get_varint_size(span / 1000)
+
+
+class TimestampCoderImpl(StreamCoderImpl):
   def encode_to_stream(self, value, out, nested):
     out.write_bigendian_int64(value.micros)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py 
b/sdks/python/apache_beam/coders/coders.py
index 25af934..1d29f32 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -629,6 +629,21 @@ class GlobalWindowCoder(SingletonCoder):
     }
 
 
+class IntervalWindowCoder(FastCoder):
+  """Coder for a window defined by a start timestamp and a duration."""
+
+  def _create_impl(self):
+    return coder_impl.IntervalWindowCoderImpl()
+
+  def is_deterministic(self):
+    return True
+
+  def as_cloud_object(self):
+    return {
+        '@type': 'kind:interval_window',
+    }
+
+
 class WindowedValueCoder(FastCoder):
   """Coder for windowed values."""
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py 
b/sdks/python/apache_beam/coders/coders_test_common.py
index d75d253..7284287 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -62,7 +62,8 @@ class CodersTest(unittest.TestCase):
                      coders.FastCoder,
                      coders.ProtoCoder,
                      coders.ToStringCoder,
-                     coders.WindowCoder])
+                     coders.WindowCoder,
+                     coders.IntervalWindowCoder])
     assert not standard - cls.seen, standard - cls.seen
     assert not standard - cls.seen_nested, standard - cls.seen_nested
 
@@ -166,6 +167,12 @@ class CodersTest(unittest.TestCase):
     self.check_coder(coders.TupleCoder((coders.SingletonCoder(a),
                                         coders.SingletonCoder(b))), (a, b))
 
+  def test_interval_window_coder(self):
+    self.check_coder(coders.IntervalWindowCoder(),
+                     *[window.IntervalWindow(x, y)
+                       for x in [-2**52, 0, 2**52]
+                       for y in range(-100, 100)])
+
   def test_timestamp_coder(self):
     self.check_coder(coders.TimestampCoder(),
                      *[timestamp.Timestamp(micros=x) for x in range(-100, 
100)])

http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/python/apache_beam/coders/slow_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/slow_stream.py 
b/sdks/python/apache_beam/coders/slow_stream.py
index ceb1f0d..5462fae 100644
--- a/sdks/python/apache_beam/coders/slow_stream.py
+++ b/sdks/python/apache_beam/coders/slow_stream.py
@@ -52,6 +52,9 @@ class OutputStream(object):
   def write_bigendian_int64(self, v):
     self.write(struct.pack('>q', v))
 
+  def write_bigendian_uint64(self, v):
+    self.write(struct.pack('>Q', v))
+
   def write_bigendian_int32(self, v):
     self.write(struct.pack('>i', v))
 
@@ -132,6 +135,9 @@ class InputStream(object):
   def read_bigendian_int64(self):
     return struct.unpack('>q', self.read(8))[0]
 
+  def read_bigendian_uint64(self):
+    return struct.unpack('>Q', self.read(8))[0]
+
   def read_bigendian_int32(self):
     return struct.unpack('>i', self.read(4))[0]
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/python/apache_beam/coders/standard_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py 
b/sdks/python/apache_beam/coders/standard_coders_test.py
index c6fc58a..e66ec7b 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -27,6 +27,8 @@ import yaml
 
 from apache_beam import coders
 from apache_beam.coders import coder_impl
+from apache_beam.utils.timestamp import Timestamp
+from apache_beam.transforms.window import IntervalWindow
 
 
 class StandardCodersTest(unittest.TestCase):
@@ -34,7 +36,8 @@ class StandardCodersTest(unittest.TestCase):
   _urn_to_coder_class = {
       'urn:beam:coders:bytes:0.1': coders.BytesCoder,
       'urn:beam:coders:varint:0.1': coders.VarIntCoder,
-      'urn:beam:coders:kv:0.1': lambda k, v: coders.TupleCoder((k, v))
+      'urn:beam:coders:kv:0.1': lambda k, v: coders.TupleCoder((k, v)),
+      'urn:beam:coders:intervalwindow:0.1': coders.IntervalWindowCoder,
   }
 
   _urn_to_json_value_parser = {
@@ -42,7 +45,11 @@ class StandardCodersTest(unittest.TestCase):
       'urn:beam:coders:varint:0.1': lambda x: x,
       'urn:beam:coders:kv:0.1':
           lambda x, key_parser, value_parser: (key_parser(x['key']),
-                                               value_parser(x['value']))
+                                               value_parser(x['value'])),
+      'urn:beam:coders:intervalwindow:0.1':
+          lambda x: IntervalWindow(
+              start=Timestamp(micros=(x['end'] - x['span']) * 1000),
+              end=Timestamp(micros=x['end'] * 1000)),
   }
 
   # We must prepend an underscore to this name so that the open-source unittest

http://git-wip-us.apache.org/repos/asf/beam/blob/ac7c4714/sdks/python/apache_beam/tests/data/standard_coders.yaml
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/data/standard_coders.yaml 
b/sdks/python/apache_beam/tests/data/standard_coders.yaml
index afa92e9..948ac6b 100644
--- a/sdks/python/apache_beam/tests/data/standard_coders.yaml
+++ b/sdks/python/apache_beam/tests/data/standard_coders.yaml
@@ -96,3 +96,13 @@ nested: true
 examples:
   "\u0003abc\u0003def": {key: abc, value: def}
   "\u0004ab\0c\u0004de\0f": {key: "ab\0c", value: "de\0f"}
+
+---
+
+coder:
+  urn: "urn:beam:coders:intervalwindow:0.1"
+examples:
+  "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : 
{end: 1454293425000, span: 3600000}
+  
"\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009"
 : {end: 1456881825000, span: 2592000000}
+  "\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002" : {end: 
-9223372036854410, span: 365}
+  "\u0080\u0020\u00c4\u009b\u00a5\u00e3\u0053\u00f7\u0000" : {end: 
9223372036854775, span: 0}

Reply via email to