Repository: samza
Updated Branches:
  refs/heads/master 20b427cc8 -> 81b173246


SAMZA-1249: Fix equality for WindowKey for Non-keyed tumbling windows

- Fix a `ClassCastException` and a `NullPointerException` when using a tumbling window without keys
- Fix equality and hashCode for `WindowKey`
- Refactor the `TestWindowOperator` unit tests to use simpler types and a
lambda-based `MessageCollector` that records the emitted window panes.

See `SAMZA-1249` for more details.

Author: vjagadish1989 <[email protected]>

Reviewers: Prateek Maheshwari <[email protected]>, Jacob Maes 
<[email protected]>

Closes #149 from vjagadish1989/samza-1249


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/81b17324
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/81b17324
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/81b17324

Branch: refs/heads/master
Commit: 81b17324689696f400f700277ef7b8faa2db484d
Parents: 20b427c
Author: vjagadish1989 <[email protected]>
Authored: Mon May 1 13:57:25 2017 -0700
Committer: Jacob Maes <[email protected]>
Committed: Mon May 1 13:57:25 2017 -0700

----------------------------------------------------------------------
 .../samza/operators/windows/WindowKey.java      |  12 +-
 .../apache/samza/operators/windows/Windows.java |   2 +-
 .../samza/operators/TestWindowOperator.java     | 204 ++++++++++++-------
 3 files changed, 132 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/81b17324/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java 
b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
index bf52724..a1e7774 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
@@ -65,21 +65,15 @@ public class WindowKey<K> {
 
     WindowKey<?> windowKey = (WindowKey<?>) o;
 
-    if (!key.equals(windowKey.key)) return false;
-
-    if (paneId == null) {
-      return windowKey.paneId == null;
-    }
-
-    return paneId.equals(windowKey.paneId);
+    if (key != null ? !key.equals(windowKey.key) : windowKey.key != null) 
return false;
+    return !(paneId != null ? !paneId.equals(windowKey.paneId) : 
windowKey.paneId != null);
 
   }
 
   @Override
   public int hashCode() {
-    int result = key.hashCode();
+    int result = key != null ? key.hashCode() : 0;
     result = 31 * result + (paneId != null ? paneId.hashCode() : 0);
     return result;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/81b17324/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java 
b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
index 721b4c0..a0269cd 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
@@ -178,7 +178,7 @@ public final class Windows {
    */
   public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration duration, 
Supplier<? extends WV> initialValue,
                                                            FoldLeftFunction<? 
super M, WV> foldFn) {
-    Trigger<M> defaultTrigger = Triggers.repeat(new TimeTrigger<>(duration));
+    Trigger<M> defaultTrigger = new TimeTrigger<>(duration);
     return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, 
(FoldLeftFunction<M, WV>) foldFn,
         null, null, WindowType.TUMBLING);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/81b17324/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
index 597244e..ca8a151 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
@@ -35,7 +35,9 @@ import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamOperatorTask;
@@ -55,9 +57,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestWindowOperator {
-  private final MessageCollector messageCollector = 
mock(MessageCollector.class);
   private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
-  private final List<WindowPane<Integer, Collection<MessageEnvelope<Integer, 
Integer>>>> windowPanes = new ArrayList<>();
   private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 
2, 3);
   private Config config;
   private TaskContext taskContext;
@@ -65,15 +65,13 @@ public class TestWindowOperator {
 
   @Before
   public void setup() throws Exception {
-    windowPanes.clear();
-
     config = mock(Config.class);
     taskContext = mock(TaskContext.class);
     runner = mock(ApplicationRunner.class);
     when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
         .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
     when(taskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
-    when(runner.getStreamSpec("integer-stream")).thenReturn(new 
StreamSpec("integer-stream", "integers", "kafka"));
+    when(runner.getStreamSpec("integers")).thenReturn(new 
StreamSpec("integers", "integers", "kafka"));
   }
 
   @Test
@@ -81,11 +79,13 @@ public class TestWindowOperator {
 
     StreamApplication sgb = new 
KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
         Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new 
ArrayList<>();
+
     TestClock testClock = new TestClock();
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
     task.init(config, taskContext);
-
-    integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), 
messageCollector, taskCoordinator));
+    MessageCollector messageCollector = envelope -> 
windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) 
envelope.getMessage());
+    integers.forEach(n -> task.process(new IntegerEnvelope(n), 
messageCollector, taskCoordinator));
     testClock.advanceTime(Duration.ofSeconds(1));
 
     task.window(messageCollector, taskCoordinator);
@@ -107,14 +107,42 @@ public class TestWindowOperator {
   }
 
   @Test
+  public void testNonKeyedTumblingWindowsDiscardingMode() throws Exception {
+
+    StreamApplication sgb = new 
TumblingWindowStreamApplication(AccumulationMode.DISCARDING,
+        Duration.ofSeconds(1), Triggers.repeat(Triggers.count(1000)));
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new 
ArrayList<>();
+
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    MessageCollector messageCollector = envelope -> 
windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) 
envelope.getMessage());
+    Assert.assertEquals(windowPanes.size(), 0);
+
+    integers.forEach(n -> task.process(new IntegerEnvelope(n), 
messageCollector, taskCoordinator));
+    Assert.assertEquals(windowPanes.size(), 0);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    Assert.assertEquals(windowPanes.size(), 0);
+
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 1);
+    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 9);
+  }
+
+
+  @Test
   public void testTumblingWindowsAccumulatingMode() throws Exception {
     StreamApplication sgb = new 
KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
         Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new 
ArrayList<>();
     TestClock testClock = new TestClock();
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
     task.init(config, taskContext);
 
-    integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), 
messageCollector, taskCoordinator));
+    MessageCollector messageCollector = envelope -> 
windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) 
envelope.getMessage());
+    integers.forEach(n -> task.process(new IntegerEnvelope(n), 
messageCollector, taskCoordinator));
     testClock.advanceTime(Duration.ofSeconds(1));
     task.window(messageCollector, taskCoordinator);
 
@@ -136,11 +164,12 @@ public class TestWindowOperator {
   public void testSessionWindowsDiscardingMode() throws Exception {
     StreamApplication sgb = new 
KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, 
Duration.ofMillis(500));
     TestClock testClock = new TestClock();
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new 
ArrayList<>();
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
     task.init(config, taskContext);
-
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, 
taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, 
taskCoordinator);
+    MessageCollector messageCollector = envelope -> 
windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) 
envelope.getMessage());
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     testClock.advanceTime(Duration.ofSeconds(1));
     task.window(messageCollector, taskCoordinator);
 
@@ -148,10 +177,10 @@ public class TestWindowOperator {
     Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
     Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
 
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, 
taskCoordinator);
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, 
taskCoordinator);
-    task.process(new IntegerMessageEnvelope(3, 3), messageCollector, 
taskCoordinator);
-    task.process(new IntegerMessageEnvelope(3, 3), messageCollector, 
taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
 
     testClock.advanceTime(Duration.ofSeconds(1));
     task.window(messageCollector, taskCoordinator);
@@ -164,8 +193,8 @@ public class TestWindowOperator {
     Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
     Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2);
 
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, 
taskCoordinator);
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, 
taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
 
     testClock.advanceTime(Duration.ofSeconds(1));
     task.window(messageCollector, taskCoordinator);
@@ -182,17 +211,20 @@ public class TestWindowOperator {
         Duration.ofMillis(500));
     TestClock testClock = new TestClock();
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new 
ArrayList<>();
+
+    MessageCollector messageCollector = envelope -> 
windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) 
envelope.getMessage());
     task.init(config, taskContext);
 
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, 
taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, 
taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     testClock.advanceTime(Duration.ofSeconds(1));
 
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, 
taskCoordinator);
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, 
taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
 
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, 
taskCoordinator);
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, 
taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
 
     testClock.advanceTime(Duration.ofSeconds(1));
     task.window(messageCollector, taskCoordinator);
@@ -212,16 +244,18 @@ public class TestWindowOperator {
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
     task.init(config, taskContext);
 
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, 
taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, 
taskCoordinator);
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new 
ArrayList<>();
+    MessageCollector messageCollector = envelope -> 
windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) 
envelope.getMessage());
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     Assert.assertEquals(windowPanes.size(), 1);
     Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
     Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
     Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY);
 
-    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, 
taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, 
taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, 
taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
 
     Assert.assertEquals(windowPanes.size(), 1);
 
@@ -233,7 +267,7 @@ public class TestWindowOperator {
     Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
     Assert.assertEquals(windowPanes.get(1).getFiringType(), 
FiringType.DEFAULT);
 
-    task.process(new IntegerMessageEnvelope(3, 6), messageCollector, 
taskCoordinator);
+    task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
     testClock.advanceTime(Duration.ofSeconds(1));
     task.window(messageCollector, taskCoordinator);
 
@@ -253,8 +287,10 @@ public class TestWindowOperator {
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
     task.init(config, taskContext);
 
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, 
taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, 
taskCoordinator);
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new 
ArrayList<>();
+    MessageCollector messageCollector = envelope -> 
windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) 
envelope.getMessage());
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     //assert that the count trigger fired
     Assert.assertEquals(windowPanes.size(), 1);
 
@@ -264,9 +300,9 @@ public class TestWindowOperator {
     //assert that the triggering of the count trigger cancelled the inner 
timeSinceFirstMessage trigger
     Assert.assertEquals(windowPanes.size(), 1);
 
-    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, 
taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, 
taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, 
taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
 
     //advance timer by 500 more millis to enable the default trigger
     testClock.advanceTime(Duration.ofMillis(500));
@@ -279,7 +315,7 @@ public class TestWindowOperator {
     Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
     Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 5);
 
-    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, 
taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
 
     //advance timer by 500 millis to enable the inner timeSinceFirstMessage 
trigger
     testClock.advanceTime(Duration.ofMillis(500));
@@ -304,28 +340,32 @@ public class TestWindowOperator {
 
     StreamApplication sgb = new 
KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, 
Duration.ofSeconds(1),
         Triggers.repeat(Triggers.any(Triggers.count(2), 
Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))));
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new 
ArrayList<>();
+
+    MessageCollector messageCollector = envelope -> 
windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) 
envelope.getMessage());
+
     TestClock testClock = new TestClock();
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
     task.init(config, taskContext);
 
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, 
taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
 
-    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, 
taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     //assert that the count trigger fired
     Assert.assertEquals(windowPanes.size(), 1);
 
     //advance the timer to enable the potential triggering of the inner 
timeSinceFirstMessage trigger
-    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, 
taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     testClock.advanceTime(Duration.ofMillis(500));
     //assert that the triggering of the count trigger cancelled the inner 
timeSinceFirstMessage trigger
     task.window(messageCollector, taskCoordinator);
     Assert.assertEquals(windowPanes.size(), 2);
 
-    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, 
taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, 
taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     Assert.assertEquals(windowPanes.size(), 3);
 
-    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, 
taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     //advance timer by 500 more millis to enable the default trigger
     testClock.advanceTime(Duration.ofMillis(500));
     task.window(messageCollector, taskCoordinator);
@@ -337,10 +377,11 @@ public class TestWindowOperator {
 
     private final AccumulationMode mode;
     private final Duration duration;
-    private final Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger;
+    private final Trigger<IntegerEnvelope> earlyTrigger;
+    private final SystemStream outputSystemStream = new 
SystemStream("outputSystem", "outputStream");
 
     KeyedTumblingWindowStreamApplication(AccumulationMode mode,
-        Duration timeDuration, Trigger<MessageEnvelope<Integer, Integer>> 
earlyTrigger) {
+        Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) {
       this.mode = mode;
       this.duration = timeDuration;
       this.earlyTrigger = earlyTrigger;
@@ -348,68 +389,79 @@ public class TestWindowOperator {
 
     @Override
     public void init(StreamGraph graph, Config config) {
-      MessageStream<MessageEnvelope<Integer, Integer>> inStream = 
graph.getInputStream("integer-stream",
-          (k, m) -> new MessageEnvelope(k, m));
-      Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> 
m.getKey();
+      MessageStream<IntegerEnvelope> inStream = 
graph.getInputStream("integers",
+          (k, m) -> new IntegerEnvelope((Integer) k));
+      Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
       inStream
         .map(m -> m)
         .window(Windows.keyedTumblingWindow(keyFn, 
duration).setEarlyTrigger(earlyTrigger)
           .setAccumulationMode(mode))
-        .map(m -> {
-            windowPanes.add(m);
-            return m;
-          });
+          .sink((message, messageCollector, taskCoordinator) -> {
+              messageCollector.send(new 
OutgoingMessageEnvelope(outputSystemStream, message));
+            });
     }
   }
 
-  private class KeyedSessionWindowStreamApplication implements 
StreamApplication {
+  private class TumblingWindowStreamApplication implements StreamApplication {
 
     private final AccumulationMode mode;
     private final Duration duration;
+    private final Trigger<IntegerEnvelope> earlyTrigger;
+    private final SystemStream outputSystemStream = new 
SystemStream("outputSystem", "outputStream");
 
-    KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration 
duration) {
+    TumblingWindowStreamApplication(AccumulationMode mode,
+                                         Duration timeDuration, 
Trigger<IntegerEnvelope> earlyTrigger) {
       this.mode = mode;
-      this.duration = duration;
+      this.duration = timeDuration;
+      this.earlyTrigger = earlyTrigger;
     }
 
     @Override
     public void init(StreamGraph graph, Config config) {
-      MessageStream<MessageEnvelope<Integer, Integer>> inStream = 
graph.getInputStream("integer-stream",
-          (k, m) -> new MessageEnvelope(k, m));
-      Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> 
m.getKey();
-
+      MessageStream<IntegerEnvelope> inStream = 
graph.getInputStream("integers",
+          (k, m) -> new IntegerEnvelope((Integer) k));
+      Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
       inStream
           .map(m -> m)
-          .window(Windows.keyedSessionWindow(keyFn, duration)
+          
.window(Windows.<IntegerEnvelope>tumblingWindow(duration).setEarlyTrigger(earlyTrigger)
               .setAccumulationMode(mode))
-          .map(m -> {
-              windowPanes.add(m);
-              return m;
+          .sink((message, messageCollector, taskCoordinator) -> {
+              messageCollector.send(new 
OutgoingMessageEnvelope(outputSystemStream, message));
             });
     }
   }
 
-  private class IntegerMessageEnvelope extends IncomingMessageEnvelope {
-    IntegerMessageEnvelope(int key, int msg) {
-      super(new SystemStreamPartition("kafka", "integers", new Partition(0)), 
"1", key, msg);
-    }
-  }
+  private class KeyedSessionWindowStreamApplication implements 
StreamApplication {
 
-  private class MessageEnvelope<K, V> {
-    private final K key;
-    private final V value;
+    private final AccumulationMode mode;
+    private final Duration duration;
+    private final SystemStream outputSystemStream = new 
SystemStream("outputSystem", "outputStream");
 
-    MessageEnvelope(K key, V value) {
-      this.key = key;
-      this.value = value;
+    KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration 
duration) {
+      this.mode = mode;
+      this.duration = duration;
     }
 
-    public K getKey() {
-      return key;
+    @Override
+    public void init(StreamGraph graph, Config config) {
+      MessageStream<IntegerEnvelope> inStream = 
graph.getInputStream("integers",
+          (k, m) -> new IntegerEnvelope((Integer) k));
+      Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
+
+      inStream
+          .map(m -> m)
+          .window(Windows.keyedSessionWindow(keyFn, duration)
+              .setAccumulationMode(mode))
+          .sink((message, messageCollector, taskCoordinator) -> {
+              messageCollector.send(new 
OutgoingMessageEnvelope(outputSystemStream, message));
+            });
     }
+  }
+
+  private class IntegerEnvelope extends IncomingMessageEnvelope  {
 
-    public V getValue() {
-      return value;
+    IntegerEnvelope(Integer key) {
+      super(new SystemStreamPartition("kafka", "integers", new Partition(0)), 
"1", key, key);
     }
   }
 }

Reply via email to