Repository: samza Updated Branches: refs/heads/master 20b427cc8 -> 81b173246
SAMZA-1249: Fix equality for WindowKey for Non-keyed tumbling windows - Fix a `ClassCastException` and an NPE when using Tumbling window without keys - Fix equality and hashCode for `WindowKey` - Refactor the `TestWindowOperator` unit tests using simpler types and a mock `MessageCollector`. More details in `SAMZA-1249` Author: vjagadish1989 <[email protected]> Reviewers: Prateek Maheshwari <[email protected]>, Jacob Maes <[email protected]> Closes #149 from vjagadish1989/samza-1249 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/81b17324 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/81b17324 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/81b17324 Branch: refs/heads/master Commit: 81b17324689696f400f700277ef7b8faa2db484d Parents: 20b427c Author: vjagadish1989 <[email protected]> Authored: Mon May 1 13:57:25 2017 -0700 Committer: Jacob Maes <[email protected]> Committed: Mon May 1 13:57:25 2017 -0700 ---------------------------------------------------------------------- .../samza/operators/windows/WindowKey.java | 12 +- .../apache/samza/operators/windows/Windows.java | 2 +- .../samza/operators/TestWindowOperator.java | 204 ++++++++++++------- 3 files changed, 132 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/81b17324/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java index bf52724..a1e7774 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java @@ -65,21 +65,15 @@ public class WindowKey<K> { WindowKey<?> windowKey = (WindowKey<?>) o; - if (!key.equals(windowKey.key)) return false; - - if (paneId == null) { - return windowKey.paneId == null; - } - - return paneId.equals(windowKey.paneId); + if (key != null ? !key.equals(windowKey.key) : windowKey.key != null) return false; + return !(paneId != null ? !paneId.equals(windowKey.paneId) : windowKey.paneId != null); } @Override public int hashCode() { - int result = key.hashCode(); + int result = key != null ? key.hashCode() : 0; result = 31 * result + (paneId != null ? paneId.hashCode() : 0); return result; } - } http://git-wip-us.apache.org/repos/asf/samza/blob/81b17324/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java index 721b4c0..a0269cd 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java @@ -178,7 +178,7 @@ public final class Windows { */ public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration duration, Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> foldFn) { - Trigger<M> defaultTrigger = Triggers.repeat(new TimeTrigger<>(duration)); + Trigger<M> defaultTrigger = new TimeTrigger<>(duration); return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) foldFn, null, null, WindowType.TUMBLING); } http://git-wip-us.apache.org/repos/asf/samza/blob/81b17324/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java index 597244e..ca8a151 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java @@ -35,7 +35,9 @@ import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.StreamSpec; +import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamOperatorTask; @@ -55,9 +57,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TestWindowOperator { - private final MessageCollector messageCollector = mock(MessageCollector.class); private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class); - private final List<WindowPane<Integer, Collection<MessageEnvelope<Integer, Integer>>>> windowPanes = new ArrayList<>(); private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3); private Config config; private TaskContext taskContext; @@ -65,15 +65,13 @@ public class TestWindowOperator { @Before public void setup() throws Exception { - windowPanes.clear(); - config = mock(Config.class); taskContext = mock(TaskContext.class); runner = mock(ApplicationRunner.class); when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet .of(new SystemStreamPartition("kafka", "integers", new Partition(0)))); when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); - when(runner.getStreamSpec("integer-stream")).thenReturn(new StreamSpec("integer-stream", "integers", "kafka")); + when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka")); } @Test @@ -81,11 +79,13 @@ public class TestWindowOperator { StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))); + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); + TestClock testClock = new TestClock(); StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); task.init(config, taskContext); - - integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator)); + MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator)); testClock.advanceTime(Duration.ofSeconds(1)); task.window(messageCollector, taskCoordinator); @@ -107,14 +107,42 @@ public class TestWindowOperator { } @Test + public void testNonKeyedTumblingWindowsDiscardingMode() throws Exception { + + StreamApplication sgb = new TumblingWindowStreamApplication(AccumulationMode.DISCARDING, + Duration.ofSeconds(1), Triggers.repeat(Triggers.count(1000))); + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); + + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + + MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + Assert.assertEquals(windowPanes.size(), 0); + + integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator)); + Assert.assertEquals(windowPanes.size(), 0); + + testClock.advanceTime(Duration.ofSeconds(1)); + Assert.assertEquals(windowPanes.size(), 0); + + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 1); + Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 9); + } + + + @Test public void testTumblingWindowsAccumulatingMode() throws Exception { StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))); + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); TestClock testClock = new TestClock(); StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); task.init(config, taskContext); - integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator)); + MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator)); testClock.advanceTime(Duration.ofSeconds(1)); task.window(messageCollector, taskCoordinator); @@ -136,11 +164,12 @@ public class TestWindowOperator { public void testSessionWindowsDiscardingMode() throws Exception { StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500)); TestClock testClock = new TestClock(); + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); task.init(config, taskContext); - - task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); testClock.advanceTime(Duration.ofSeconds(1)); task.window(messageCollector, taskCoordinator); @@ -148,10 +177,10 @@ public class TestWindowOperator { Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1"); Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); - task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator); testClock.advanceTime(Duration.ofSeconds(1)); task.window(messageCollector, taskCoordinator); @@ -164,8 +193,8 @@ public class TestWindowOperator { Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2); Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2); - task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); testClock.advanceTime(Duration.ofSeconds(1)); task.window(messageCollector, taskCoordinator); @@ -182,17 +211,20 @@ public class TestWindowOperator { Duration.ofMillis(500)); TestClock testClock = new TestClock(); StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); + + MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); task.init(config, taskContext); - task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); testClock.advanceTime(Duration.ofSeconds(1)); - task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); testClock.advanceTime(Duration.ofSeconds(1)); task.window(messageCollector, taskCoordinator); @@ -212,16 +244,18 @@ public class TestWindowOperator { StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); task.init(config, taskContext); - task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator); + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); + MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); Assert.assertEquals(windowPanes.size(), 1); Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0"); Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY); - task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); Assert.assertEquals(windowPanes.size(), 1); @@ -233,7 +267,7 @@ public class TestWindowOperator { Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0"); Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT); - task.process(new IntegerMessageEnvelope(3, 6), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator); testClock.advanceTime(Duration.ofSeconds(1)); task.window(messageCollector, taskCoordinator); @@ -253,8 +287,10 @@ public class TestWindowOperator { StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); task.init(config, taskContext); - task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator); + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); + MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); //assert that the count trigger fired Assert.assertEquals(windowPanes.size(), 1); @@ -264,9 +300,9 @@ public class TestWindowOperator { //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger Assert.assertEquals(windowPanes.size(), 1); - task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); //advance timer by 500 more millis to enable the default trigger testClock.advanceTime(Duration.ofMillis(500)); @@ -279,7 +315,7 @@ public class TestWindowOperator { Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0"); Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 5); - task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger testClock.advanceTime(Duration.ofMillis(500)); @@ -304,28 +340,32 @@ public class TestWindowOperator { StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))))); + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); + + MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + TestClock testClock = new TestClock(); StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); task.init(config, taskContext); - task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); //assert that the count trigger fired Assert.assertEquals(windowPanes.size(), 1); //advance the timer to enable the potential triggering of the inner timeSinceFirstMessage trigger - task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); testClock.advanceTime(Duration.ofMillis(500)); //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger task.window(messageCollector, taskCoordinator); Assert.assertEquals(windowPanes.size(), 2); - task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); Assert.assertEquals(windowPanes.size(), 3); - task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); //advance timer by 500 more millis to enable the default trigger testClock.advanceTime(Duration.ofMillis(500)); task.window(messageCollector, taskCoordinator); @@ -337,10 +377,11 @@ public class TestWindowOperator { private final AccumulationMode mode; private final Duration duration; - private final Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger; + private final Trigger<IntegerEnvelope> earlyTrigger; + private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); KeyedTumblingWindowStreamApplication(AccumulationMode mode, - Duration timeDuration, Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger) { + Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) { this.mode = mode; this.duration = timeDuration; this.earlyTrigger = earlyTrigger; @@ -348,68 +389,79 @@ public class TestWindowOperator { @Override public void init(StreamGraph graph, Config config) { - MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.getInputStream("integer-stream", - (k, m) -> new MessageEnvelope(k, m)); - Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey(); + MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers", + (k, m) -> new IntegerEnvelope((Integer) k)); + Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey(); inStream .map(m -> m) .window(Windows.keyedTumblingWindow(keyFn, duration).setEarlyTrigger(earlyTrigger) .setAccumulationMode(mode)) - .map(m -> { - windowPanes.add(m); - return m; - }); + .sink((message, messageCollector, taskCoordinator) -> { + messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); + }); } } - private class KeyedSessionWindowStreamApplication implements StreamApplication { + private class TumblingWindowStreamApplication implements StreamApplication { private final AccumulationMode mode; private final Duration duration; + private final Trigger<IntegerEnvelope> earlyTrigger; + private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); - KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) { + TumblingWindowStreamApplication(AccumulationMode mode, + Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) { this.mode = mode; - this.duration = duration; + this.duration = timeDuration; + this.earlyTrigger = earlyTrigger; } @Override public void init(StreamGraph graph, Config config) { - MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.getInputStream("integer-stream", - (k, m) -> new MessageEnvelope(k, m)); - Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey(); - + MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers", + (k, m) -> new IntegerEnvelope((Integer) k)); + Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey(); inStream .map(m -> m) - .window(Windows.keyedSessionWindow(keyFn, duration) + .window(Windows.<IntegerEnvelope>tumblingWindow(duration).setEarlyTrigger(earlyTrigger) .setAccumulationMode(mode)) - .map(m -> { - windowPanes.add(m); - return m; + .sink((message, messageCollector, taskCoordinator) -> { + messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); }); } } - private class IntegerMessageEnvelope extends IncomingMessageEnvelope { - IntegerMessageEnvelope(int key, int msg) { - super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, msg); - } - } + private class KeyedSessionWindowStreamApplication implements StreamApplication { - private class MessageEnvelope<K, V> { - private final K key; - private final V value; + private final AccumulationMode mode; + private final Duration duration; + private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); - MessageEnvelope(K key, V value) { - this.key = key; - this.value = value; + KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) { + this.mode = mode; + this.duration = duration; } - public K getKey() { - return key; + @Override + public void init(StreamGraph graph, Config config) { + MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers", + (k, m) -> new IntegerEnvelope((Integer) k)); + Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey(); + + inStream + .map(m -> m) + .window(Windows.keyedSessionWindow(keyFn, duration) + .setAccumulationMode(mode)) + .sink((message, messageCollector, taskCoordinator) -> { + messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); + }); } + } + + private class IntegerEnvelope extends IncomingMessageEnvelope { - public V getValue() { - return value; + IntegerEnvelope(Integer key) { + super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, key); } } }
