Repository: samza Updated Branches: refs/heads/master f2c14fa63 -> 7a2e19250
SAMZA-1494; Flush operator state at end of stream - Propagate operator messages at endOfStream to all down-stream operators. - Emit all pending windows when endOfStream is reached. - Flush all state on endOfStream irrespective of auto-commit behavior. Author: Jagadish <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes #366 from vjagadish1989/eos-flush Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7a2e1925 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7a2e1925 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7a2e1925 Branch: refs/heads/master Commit: 7a2e19250f5a74f34eef1d53de5d52399850bc89 Parents: f2c14fa Author: Jagadish <[email protected]> Authored: Tue Nov 21 12:42:17 2017 -0800 Committer: Jagadish <[email protected]> Committed: Tue Nov 21 12:42:17 2017 -0800 ---------------------------------------------------------------------- .../org/apache/samza/config/TaskConfigJava.java | 2 +- .../samza/operators/impl/OperatorImpl.java | 13 +- .../operators/impl/PartitionByOperatorImpl.java | 3 +- .../operators/impl/WindowOperatorImpl.java | 13 + .../org/apache/samza/config/TaskConfig.scala | 5 + .../apache/samza/config/TestTaskConfigJava.java | 20 + .../samza/operators/TestWindowOperator.java | 565 ---------------- .../operators/impl/TestWindowOperator.java | 677 +++++++++++++++++++ 8 files changed, 728 insertions(+), 570 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java index 0bf078e..04e15dc 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java @@ -116,7 +116,7 @@ public class TaskConfigJava extends MapConfig { Set<SystemStream> allInputSS = new HashSet<>(); TaskConfig taskConfig = TaskConfig.Config2Task(this); - allInputSS.addAll(JavaConverters.setAsJavaSetConverter(taskConfig.getInputStreams()).asJava()); + allInputSS.addAll((Set<? extends SystemStream>) JavaConverters.setAsJavaSetConverter(taskConfig.getInputStreams()).asJava()); allInputSS.addAll(getBroadcastSystemStreams()); return Collections.unmodifiableSet(allInputSS); http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index 92a563a..862e5f9 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -266,6 +266,7 @@ public abstract class OperatorImpl<M, RM> { if (eosStates.allEndOfStream()) { // all inputs have been end-of-stream, shut down the task LOG.info("All input streams have reached the end for task {}", taskName.getTaskName()); + coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK); coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK); } } @@ -279,7 +280,12 @@ public abstract class OperatorImpl<M, RM> { */ private final void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) { if (inputStreams.stream().allMatch(input -> eosStates.isEndOfStream(input))) { - handleEndOfStream(collector, coordinator); + Collection<RM> results = handleEndOfStream(collector, coordinator); + + results.forEach(rm -> + this.registeredOperators.forEach(op -> + op.onMessage(rm, collector, coordinator))); + this.registeredOperators.forEach(op -> op.onEndOfStream(collector, coordinator)); } } @@ -291,9 +297,10 @@ public abstract class OperatorImpl<M, RM> { * override this to actually propagate EOS over the wire. * @param collector message collector * @param coordinator task coordinator + * @return results to be emitted when this operator reaches end-of-stream */ - protected void handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) { - //Do nothing by default + protected Collection<RM> handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) { + return Collections.emptyList(); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java index 3811da8..424c10f 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java @@ -91,8 +91,9 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> { } @Override - protected void handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) { + protected Collection<Void> handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) { sendControlMessage(new EndOfStreamMessage(taskName), collector); + return Collections.emptyList(); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java index 7031ff9..32406cb 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -53,9 +53,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -195,6 +197,17 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje } @Override + protected Collection<WindowPane<K, Object>> handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) { + List<WindowPane<K, Object>> results = new ArrayList<>(); + Set<TriggerKey<K>> triggerKeys = new HashSet<>(triggers.keySet()); + for(TriggerKey<K> triggerKey : triggerKeys) { + Optional<WindowPane<K, Object>> triggerResult = onTriggerFired(triggerKey, collector, coordinator); + triggerResult.ifPresent(results::add); + } + return results; + } + + @Override protected void handleClose() { if (foldLeftFn != null) { foldLeftFn.close(); http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala index 2ee0032..419e15b 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala @@ -137,4 +137,9 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging { case Some(asyncCommit) => Some(asyncCommit.toBoolean) case _ => None } + + def isAutoCommitEnabled() = getOption(TaskConfig.COMMIT_MS) match { + case Some(commitMs) => commitMs.toInt > 0 + case _ => true + } } http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java index 878ca01..baf2d4f 100644 --- a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java +++ b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Set; +import com.google.common.collect.ImmutableMap; import org.apache.samza.Partition; import org.apache.samza.system.SystemStreamPartition; import org.junit.Test; @@ -70,4 +71,23 @@ public class TestTaskConfigJava { } assertTrue(invalidFormatException); } + + @Test + public void testAutoCommitConfig() { + // positive values of commit.ms => autoCommit = true + Config config1 = new MapConfig(ImmutableMap.of("task.commit.ms", "1")); + assertTrue(new TaskConfig(config1).isAutoCommitEnabled()); + + // no value for commit.ms => autoCommit = true + Config config2 = new MapConfig(ImmutableMap.of()); + assertTrue(new TaskConfig(config2).isAutoCommitEnabled()); + + // A zero value for commit.ms => autoCommit = false + Config config3 = new MapConfig(ImmutableMap.of("task.commit.ms", "0")); + assertFalse(new TaskConfig(config3).isAutoCommitEnabled()); + + // negative value for commit.ms => autoCommit = false + Config config4 = new MapConfig(ImmutableMap.of("task.commit.ms", "-1")); + assertFalse(new TaskConfig(config4).isAutoCommitEnabled()); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java deleted file mode 100644 index 8abfb92..0000000 --- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java +++ /dev/null @@ -1,565 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.operators; - - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import junit.framework.Assert; -import org.apache.samza.Partition; -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; -import org.apache.samza.container.TaskContextImpl; -import org.apache.samza.metrics.MetricsRegistryMap; -import org.apache.samza.operators.impl.store.TestInMemoryStore; -import org.apache.samza.operators.impl.store.TimeSeriesKeySerde; -import org.apache.samza.operators.triggers.FiringType; -import org.apache.samza.operators.triggers.Trigger; -import org.apache.samza.operators.triggers.Triggers; -import org.apache.samza.operators.windows.AccumulationMode; -import org.apache.samza.operators.windows.WindowPane; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.runtime.ApplicationRunner; -import org.apache.samza.serializers.IntegerSerde; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.Serde; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.StreamSpec; -import org.apache.samza.system.SystemStream; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.StreamOperatorTask; -import org.apache.samza.task.TaskCoordinator; -import org.apache.samza.testUtils.TestClock; -import org.junit.Before; -import org.junit.Test; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.function.Function; - -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class TestWindowOperator { - private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class); - private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3); - private Config config; - private TaskContextImpl taskContext; - private ApplicationRunner runner; - - @Before - public void setup() throws Exception { - config = mock(Config.class); - when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName"); - when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId"); - taskContext = mock(TaskContextImpl.class); - runner = mock(ApplicationRunner.class); - Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde()); - Serde storeValSerde = new IntegerEnvelopeSerde(); - - when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet - .of(new SystemStreamPartition("kafka", "integers", new Partition(0)))); - when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); - - when(taskContext.getStore("jobName-jobId-window-w1")) - .thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde)); - when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka")); - } - - @Test - public void testTumblingWindowsDiscardingMode() throws Exception { - - StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING, - Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))); - List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); - - TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); - task.init(config, taskContext); - MessageCollector messageCollector = - envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); - integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator)); - testClock.advanceTime(Duration.ofSeconds(1)); - - task.window(messageCollector, taskCoordinator); - Assert.assertEquals(windowPanes.size(), 5); - Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); - Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2); - - Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); - Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2); - - Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); - Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2); - - Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); - Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2); - - Assert.assertEquals(windowPanes.get(4).getKey().getKey(), new Integer(3)); - Assert.assertEquals((windowPanes.get(4).getMessage()).size(), 1); - } - - @Test - public void testNonKeyedTumblingWindowsDiscardingMode() throws Exception { - - StreamApplication sgb = new TumblingWindowStreamApplication(AccumulationMode.DISCARDING, - Duration.ofSeconds(1), Triggers.repeat(Triggers.count(1000))); - List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); - - TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); - task.init(config, taskContext); - - MessageCollector messageCollector = - envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); - Assert.assertEquals(windowPanes.size(), 0); - - integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator)); - Assert.assertEquals(windowPanes.size(), 0); - - testClock.advanceTime(Duration.ofSeconds(1)); - Assert.assertEquals(windowPanes.size(), 0); - - task.window(messageCollector, taskCoordinator); - Assert.assertEquals(windowPanes.size(), 1); - Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 9); - } - - @Test - public void testTumblingAggregatingWindowsDiscardingMode() throws Exception { - - when(taskContext.getStore("jobName-jobId-window-w1")) - .thenReturn(new TestInMemoryStore<>(new TimeSeriesKeySerde(new IntegerSerde()), new IntegerSerde())); - - StreamApplication sgb = new AggregateTumblingWindowStreamApplication(AccumulationMode.DISCARDING, - Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))); - List<WindowPane<Integer, Integer>> windowPanes = new ArrayList<>(); - - TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); - task.init(config, taskContext); - MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Integer>) envelope.getMessage()); - integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator)); - testClock.advanceTime(Duration.ofSeconds(1)); - - task.window(messageCollector, taskCoordinator); - Assert.assertEquals(windowPanes.size(), 5); - Assert.assertEquals(windowPanes.get(0).getMessage(), new Integer(2)); - Assert.assertEquals(windowPanes.get(1).getMessage(), new Integer(2)); - Assert.assertEquals(windowPanes.get(2).getMessage(), new Integer(2)); - Assert.assertEquals(windowPanes.get(3).getMessage(), new Integer(2)); - Assert.assertEquals(windowPanes.get(4).getMessage(), new Integer(1)); - } - - @Test - public void testTumblingWindowsAccumulatingMode() throws Exception { - StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, - Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))); - List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); - TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); - task.init(config, taskContext); - - MessageCollector messageCollector = - envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); - integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator)); - testClock.advanceTime(Duration.ofSeconds(1)); - task.window(messageCollector, taskCoordinator); - - Assert.assertEquals(windowPanes.size(), 7); - Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); - Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2); - - Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); - Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2); - - Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); - Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 4); - - Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); - Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 4); - } - - @Test - public void testSessionWindowsDiscardingMode() throws Exception { - StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500)); - TestClock testClock = new TestClock(); - List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); - StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); - task.init(config, taskContext); - MessageCollector messageCollector = - envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - testClock.advanceTime(Duration.ofSeconds(1)); - task.window(messageCollector, taskCoordinator); - - Assert.assertEquals(windowPanes.size(), 1); - Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1"); - Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); - - task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); - task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); - task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator); - task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator); - - testClock.advanceTime(Duration.ofSeconds(1)); - task.window(messageCollector, taskCoordinator); - - Assert.assertEquals(windowPanes.size(), 3); - Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1"); - Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "1001"); - Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1001"); - Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2); - Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2); - Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2); - - task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); - task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); - - testClock.advanceTime(Duration.ofSeconds(1)); - task.window(messageCollector, taskCoordinator); - Assert.assertEquals(windowPanes.size(), 4); - Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); - Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "2001"); - Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2); - - } - - @Test - public void testSessionWindowsAccumulatingMode() throws Exception { - StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, - Duration.ofMillis(500)); - TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); - List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); - - MessageCollector messageCollector = - envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); - task.init(config, taskContext); - - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - testClock.advanceTime(Duration.ofSeconds(1)); - - task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); - task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); - - task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); - task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); - - testClock.advanceTime(Duration.ofSeconds(1)); - task.window(messageCollector, taskCoordinator); - Assert.assertEquals(windowPanes.size(), 2); - Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2); - Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); - Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); - Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2); - Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 4); - } - - @Test - public void testCancellationOfOnceTrigger() throws Exception { - StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, - Duration.ofSeconds(1), Triggers.count(2)); - TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); - task.init(config, taskContext); - - List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); - MessageCollector messageCollector = - envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - Assert.assertEquals(windowPanes.size(), 1); - Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0"); - Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); - Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY); - - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - - Assert.assertEquals(windowPanes.size(), 1); - - testClock.advanceTime(Duration.ofSeconds(1)); - task.window(messageCollector, taskCoordinator); - - Assert.assertEquals(windowPanes.size(), 2); - Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0"); - Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0"); - Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT); - - task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator); - testClock.advanceTime(Duration.ofSeconds(1)); - task.window(messageCollector, taskCoordinator); - - Assert.assertEquals(windowPanes.size(), 3); - Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(3)); - Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000"); - Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.DEFAULT); - Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 1); - - } - - @Test - public void testCancellationOfAnyTrigger() throws Exception { - StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), - Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))); - TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); - task.init(config, taskContext); - - List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); - MessageCollector messageCollector = - envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - //assert that the count trigger fired - Assert.assertEquals(windowPanes.size(), 1); - - //advance the timer to enable the triggering of the inner timeSinceFirstMessage trigger - testClock.advanceTime(Duration.ofMillis(500)); - - //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger - Assert.assertEquals(windowPanes.size(), 1); - - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - - //advance timer by 500 more millis to enable the default trigger - testClock.advanceTime(Duration.ofMillis(500)); - task.window(messageCollector, taskCoordinator); - - //assert that the default trigger fired - Assert.assertEquals(windowPanes.size(), 2); - Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT); - Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(1)); - Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0"); - Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 5); - - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - - //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger - testClock.advanceTime(Duration.ofMillis(500)); - task.window(messageCollector, taskCoordinator); - - Assert.assertEquals(windowPanes.size(), 3); - Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.EARLY); - Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); - Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000"); - - //advance timer by > 500 millis to enable the default trigger - testClock.advanceTime(Duration.ofMillis(900)); - task.window(messageCollector, taskCoordinator); - Assert.assertEquals(windowPanes.size(), 4); - Assert.assertEquals(windowPanes.get(3).getFiringType(), FiringType.DEFAULT); - Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(1)); - Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "1000"); - } - - @Test - public void testCancelationOfRepeatingNestedTriggers() throws Exception { - - StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), - Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))))); - List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); - - MessageCollector messageCollector = - envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); - - TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); - task.init(config, taskContext); - - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - //assert that the count trigger fired - Assert.assertEquals(windowPanes.size(), 1); - - //advance the timer to enable the potential triggering of the inner timeSinceFirstMessage trigger - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - testClock.advanceTime(Duration.ofMillis(500)); - //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger - task.window(messageCollector, taskCoordinator); - Assert.assertEquals(windowPanes.size(), 2); - - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - Assert.assertEquals(windowPanes.size(), 3); - - task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); - //advance timer by 500 more millis to enable the default trigger - testClock.advanceTime(Duration.ofMillis(500)); - task.window(messageCollector, taskCoordinator); - //assert that the default trigger fired - Assert.assertEquals(windowPanes.size(), 4); - } - - private class KeyedTumblingWindowStreamApplication implements StreamApplication { - - private final AccumulationMode mode; - private final Duration duration; - private final Trigger<IntegerEnvelope> earlyTrigger; - private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); - - KeyedTumblingWindowStreamApplication(AccumulationMode mode, - Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) { - this.mode = mode; - this.duration = timeDuration; - this.earlyTrigger = earlyTrigger; - } - - @Override - public void init(StreamGraph graph, Config config) { - MessageStream<IntegerEnvelope> inStream = - graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde())) - .map(kv -> new IntegerEnvelope(kv.getKey())); - Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey(); - inStream - .map(m -> m) - .window(Windows.keyedTumblingWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde()) - .setEarlyTrigger(earlyTrigger) - .setAccumulationMode(mode), "w1") - .sink((message, messageCollector, taskCoordinator) -> { - messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); - }); - } - } - - private class TumblingWindowStreamApplication implements StreamApplication { - - private final AccumulationMode mode; - private final Duration duration; - private final Trigger<IntegerEnvelope> earlyTrigger; - private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); - - TumblingWindowStreamApplication(AccumulationMode mode, - Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) { - this.mode = mode; - this.duration = timeDuration; - this.earlyTrigger = earlyTrigger; - } - - @Override - public void init(StreamGraph graph, Config config) { - MessageStream<IntegerEnvelope> inStream = - graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde())) - .map(kv -> new IntegerEnvelope(kv.getKey())); - Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey(); - inStream - .map(m -> m) - .window(Windows.tumblingWindow(duration, new IntegerEnvelopeSerde()) - .setEarlyTrigger(earlyTrigger) - .setAccumulationMode(mode), "w1") - .sink((message, messageCollector, taskCoordinator) -> { - messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); - }); - } - } - - private class AggregateTumblingWindowStreamApplication implements StreamApplication { - - private final AccumulationMode mode; - private final Duration duration; - private final Trigger<IntegerEnvelope> earlyTrigger; - private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); - - AggregateTumblingWindowStreamApplication(AccumulationMode mode, Duration timeDuration, - Trigger<IntegerEnvelope> earlyTrigger) { - this.mode = mode; - this.duration = timeDuration; - this.earlyTrigger = earlyTrigger; - } - - @Override - public void init(StreamGraph graph, Config config) { - MessageStream<KV<Integer, Integer>> integers = graph.getInputStream("integers", - KVSerde.of(new IntegerSerde(), new IntegerSerde())); - - integers - .map(kv -> new IntegerEnvelope(kv.getKey())) - .window(Windows.<IntegerEnvelope, Integer>tumblingWindow(this.duration, () -> 0, (m, c) -> c + 1, new IntegerSerde()) - .setEarlyTrigger(earlyTrigger) - .setAccumulationMode(mode), "w1") - .sink((message, messageCollector, taskCoordinator) -> { - messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); - }); - } - } - - private class KeyedSessionWindowStreamApplication implements StreamApplication { - - private final AccumulationMode mode; - private final Duration duration; - private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); - - KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) { - this.mode = mode; - this.duration = duration; - } - - @Override - public void init(StreamGraph graph, Config config) { - MessageStream<IntegerEnvelope> inStream = - graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde())) - .map(kv -> new IntegerEnvelope(kv.getKey())); - Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey(); - - inStream - .map(m -> m) - .window(Windows.keyedSessionWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde()) - .setAccumulationMode(mode), "w1") - .sink((message, messageCollector, taskCoordinator) -> { - messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); - }); - } - } - - private class IntegerEnvelope extends IncomingMessageEnvelope { - - IntegerEnvelope(Integer key) { - super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, key); - } - } - - private class IntegerEnvelopeSerde implements Serde<IntegerEnvelope> { - private final IntegerSerde intSerde = new IntegerSerde(); - - @Override - public byte[] toBytes(IntegerEnvelope object) { - return intSerde.toBytes((Integer) object.getKey()); - } - - @Override - public IntegerEnvelope fromBytes(byte[] bytes) { - return new IntegerEnvelope(intSerde.fromBytes(bytes)); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java new file mode 100644 index 0000000..7d0c623 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java @@ -0,0 +1,677 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.impl; + + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import junit.framework.Assert; +import org.apache.samza.Partition; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.container.TaskContextImpl; +import org.apache.samza.container.TaskName; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.impl.store.TestInMemoryStore; +import org.apache.samza.operators.impl.store.TimeSeriesKeySerde; +import org.apache.samza.operators.triggers.FiringType; +import org.apache.samza.operators.triggers.Trigger; +import org.apache.samza.operators.triggers.Triggers; +import org.apache.samza.operators.windows.AccumulationMode; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.Serde; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.StreamSpec; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamOperatorTask; +import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.testUtils.TestClock; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.*; + +public class TestWindowOperator { + private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class); + private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3); + private Config config; + private TaskContextImpl taskContext; + private ApplicationRunner runner; + + @Before + public void setup() throws Exception { + config = mock(Config.class); + when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName"); + when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId"); + taskContext = mock(TaskContextImpl.class); + runner = mock(ApplicationRunner.class); + Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde()); + Serde storeValSerde = new IntegerEnvelopeSerde(); + + when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet + .of(new SystemStreamPartition("kafka", "integers", new Partition(0)))); + when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + + when(taskContext.getStore("jobName-jobId-window-w1")) + .thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde)); + when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka")); + } + + @Test + public void testTumblingWindowsDiscardingMode() throws Exception { + + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING, + Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))); + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); + + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator)); + testClock.advanceTime(Duration.ofSeconds(1)); + + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 5); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); + Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); + Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); + Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(4).getKey().getKey(), new Integer(3)); + Assert.assertEquals((windowPanes.get(4).getMessage()).size(), 1); + } + + @Test + public void testNonKeyedTumblingWindowsDiscardingMode() throws Exception { + + StreamApplication sgb = new TumblingWindowStreamApplication(AccumulationMode.DISCARDING, + Duration.ofSeconds(1), Triggers.repeat(Triggers.count(1000))); + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); + + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + Assert.assertEquals(windowPanes.size(), 0); + + integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator)); + Assert.assertEquals(windowPanes.size(), 0); + + testClock.advanceTime(Duration.ofSeconds(1)); + Assert.assertEquals(windowPanes.size(), 0); + + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 1); + Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 9); + } + + @Test + public void testTumblingAggregatingWindowsDiscardingMode() throws Exception { + + when(taskContext.getStore("jobName-jobId-window-w1")) + .thenReturn(new TestInMemoryStore<>(new TimeSeriesKeySerde(new IntegerSerde()), new IntegerSerde())); + + StreamApplication sgb = new AggregateTumblingWindowStreamApplication(AccumulationMode.DISCARDING, + Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))); + List<WindowPane<Integer, Integer>> windowPanes = new ArrayList<>(); + + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Integer>) envelope.getMessage()); + integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator)); + testClock.advanceTime(Duration.ofSeconds(1)); + + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 5); + Assert.assertEquals(windowPanes.get(0).getMessage(), new Integer(2)); + Assert.assertEquals(windowPanes.get(1).getMessage(), new Integer(2)); + Assert.assertEquals(windowPanes.get(2).getMessage(), new Integer(2)); + Assert.assertEquals(windowPanes.get(3).getMessage(), new Integer(2)); + Assert.assertEquals(windowPanes.get(4).getMessage(), new Integer(1)); + } + + @Test + public void testTumblingWindowsAccumulatingMode() throws Exception { + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, + Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))); + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator)); + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 7); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); + Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); + Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 4); + + Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); + Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 4); + } + + @Test + public void testSessionWindowsDiscardingMode() throws Exception { + StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500)); + TestClock testClock = new TestClock(); + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 1); + Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1"); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + + task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator); + + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 3); + Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1"); + Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "1001"); + Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1001"); + Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2); + Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2); + Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2); + + task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); + + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 4); + Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); + Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "2001"); + Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2); + } + + @Test + public void testSessionWindowsAccumulatingMode() throws Exception { + StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, + Duration.ofMillis(500)); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); + + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + task.init(config, taskContext); + + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + testClock.advanceTime(Duration.ofSeconds(1)); + + task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); + + task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator); + + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 2); + Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); + Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2); + Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 4); + } + + @Test + public void testCancellationOfOnceTrigger() throws Exception { + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, + Duration.ofSeconds(1), Triggers.count(2)); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 1); + Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0"); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY); + + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 1); + + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 2); + Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0"); + Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0"); + Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT); + + task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator); + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 3); + Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(3)); + Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000"); + Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.DEFAULT); + Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 1); + + } + + @Test + public void testCancellationOfAnyTrigger() throws Exception { + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), + Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + //assert that the count trigger fired + Assert.assertEquals(windowPanes.size(), 1); + + //advance the timer to enable the triggering of the inner timeSinceFirstMessage trigger + testClock.advanceTime(Duration.ofMillis(500)); + + //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger + Assert.assertEquals(windowPanes.size(), 1); + + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + + //advance timer by 500 more millis to enable the default trigger + testClock.advanceTime(Duration.ofMillis(500)); + task.window(messageCollector, taskCoordinator); + + //assert that the default trigger fired + Assert.assertEquals(windowPanes.size(), 2); + Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT); + Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0"); + Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 5); + + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + + //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger + testClock.advanceTime(Duration.ofMillis(500)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 3); + Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.EARLY); + Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000"); + + //advance timer by > 500 millis to enable the default trigger + testClock.advanceTime(Duration.ofMillis(900)); + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 4); + Assert.assertEquals(windowPanes.get(3).getFiringType(), FiringType.DEFAULT); + Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "1000"); + } + + @Test + public void testCancelationOfRepeatingNestedTriggers() throws Exception { + + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), + Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))))); + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); + + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + //assert that the count trigger fired + Assert.assertEquals(windowPanes.size(), 1); + + //advance the timer to enable the potential triggering of the inner timeSinceFirstMessage trigger + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + testClock.advanceTime(Duration.ofMillis(500)); + //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 2); + + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 3); + + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + //advance timer by 500 more millis to enable the default trigger + testClock.advanceTime(Duration.ofMillis(500)); + task.window(messageCollector, taskCoordinator); + //assert that the default trigger fired + Assert.assertEquals(windowPanes.size(), 4); + } + + @Test + public void testEndOfStreamFlushesWithEarlyTriggerFirings() throws Exception { + EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka", + "integers", new Partition(0))), Collections.emptyMap()); + + when(taskContext.getTaskName()).thenReturn(new TaskName("task 1")); + when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates); + when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class)); + + StreamApplication sgb = new TumblingWindowStreamApplication(AccumulationMode.DISCARDING, + Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))); + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); + + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + Assert.assertEquals(windowPanes.size(), 0); + + List<Integer> integerList = ImmutableList.of(1, 2, 1, 2, 1); + integerList.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator)); + + // early triggers should emit (1,2) and (1,2) in the same window. + Assert.assertEquals(windowPanes.size(), 2); + + testClock.advanceTime(Duration.ofSeconds(1)); + Assert.assertEquals(windowPanes.size(), 2); + + final IncomingMessageEnvelope endOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope( + new SystemStreamPartition("kafka", "integers", new Partition(0))); + task.process(endOfStream, messageCollector, taskCoordinator); + + // end of stream flushes the last entry (1) + Assert.assertEquals(windowPanes.size(), 3); + Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2); + verify(taskCoordinator, times(1)).commit(TaskCoordinator.RequestScope.CURRENT_TASK); + verify(taskCoordinator, times(1)).shutdown(TaskCoordinator.RequestScope.CURRENT_TASK); + } + + @Test + public void testEndOfStreamFlushesWithDefaultTriggerFirings() throws Exception { + EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka", + "integers", new Partition(0))), Collections.emptyMap()); + + when(taskContext.getTaskName()).thenReturn(new TaskName("task 1")); + when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates); + when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class)); + + StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500)); + TestClock testClock = new TestClock(); + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + testClock.advanceTime(1000); + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 1); + + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + + final IncomingMessageEnvelope endOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope( + new SystemStreamPartition("kafka", "integers", new Partition(0))); + task.process(endOfStream, messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 2); + Assert.assertEquals(windowPanes.get(0).getMessage().size(), 2); + verify(taskCoordinator, times(1)).commit(TaskCoordinator.RequestScope.CURRENT_TASK); + verify(taskCoordinator, times(1)).shutdown(TaskCoordinator.RequestScope.CURRENT_TASK); + } + + @Test + public void testEndOfStreamFlushesWithNoTriggerFirings() throws Exception { + EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka", + "integers", new Partition(0))), Collections.emptyMap()); + + when(taskContext.getTaskName()).thenReturn(new TaskName("task 1")); + when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates); + when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class)); + + StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500)); + TestClock testClock = new TestClock(); + List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>(); + StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock); + task.init(config, taskContext); + + MessageCollector messageCollector = + envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage()); + + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator); + + final IncomingMessageEnvelope endOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope( + new SystemStreamPartition("kafka", "integers", new Partition(0))); + task.process(endOfStream, messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 1); + Assert.assertEquals(windowPanes.get(0).getMessage().size(), 4); + verify(taskCoordinator, times(1)).commit(TaskCoordinator.RequestScope.CURRENT_TASK); + verify(taskCoordinator, times(1)).shutdown(TaskCoordinator.RequestScope.CURRENT_TASK); + } + + private class KeyedTumblingWindowStreamApplication implements StreamApplication { + + private final AccumulationMode mode; + private final Duration duration; + private final Trigger<IntegerEnvelope> earlyTrigger; + private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); + + KeyedTumblingWindowStreamApplication(AccumulationMode mode, + Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) { + this.mode = mode; + this.duration = timeDuration; + this.earlyTrigger = earlyTrigger; + } + + @Override + public void init(StreamGraph graph, Config config) { + MessageStream<IntegerEnvelope> inStream = + graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde())) + .map(kv -> new IntegerEnvelope(kv.getKey())); + Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey(); + inStream + .map(m -> m) + .window(Windows.keyedTumblingWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde()) + .setEarlyTrigger(earlyTrigger) + .setAccumulationMode(mode), "w1") + .sink((message, messageCollector, taskCoordinator) -> { + messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); + }); + } + } + + private class TumblingWindowStreamApplication implements StreamApplication { + + private final AccumulationMode mode; + private final Duration duration; + private final Trigger<IntegerEnvelope> earlyTrigger; + private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); + + TumblingWindowStreamApplication(AccumulationMode mode, + Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) { + this.mode = mode; + this.duration = timeDuration; + this.earlyTrigger = earlyTrigger; + } + + @Override + public void init(StreamGraph graph, Config config) { + MessageStream<IntegerEnvelope> inStream = + graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde())) + .map(kv -> new IntegerEnvelope(kv.getKey())); + Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey(); + inStream + .map(m -> m) + .window(Windows.tumblingWindow(duration, new IntegerEnvelopeSerde()) + .setEarlyTrigger(earlyTrigger) + .setAccumulationMode(mode), "w1") + .sink((message, messageCollector, taskCoordinator) -> { + messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); + }); + } + } + + private class AggregateTumblingWindowStreamApplication implements StreamApplication { + + private final AccumulationMode mode; + private final Duration duration; + private final Trigger<IntegerEnvelope> earlyTrigger; + private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); + + AggregateTumblingWindowStreamApplication(AccumulationMode mode, Duration timeDuration, + Trigger<IntegerEnvelope> earlyTrigger) { + this.mode = mode; + this.duration = timeDuration; + this.earlyTrigger = earlyTrigger; + } + + @Override + public void init(StreamGraph graph, Config config) { + MessageStream<KV<Integer, Integer>> integers = graph.getInputStream("integers", + KVSerde.of(new IntegerSerde(), new IntegerSerde())); + + integers + .map(kv -> new IntegerEnvelope(kv.getKey())) + .window(Windows.<IntegerEnvelope, Integer>tumblingWindow(this.duration, () -> 0, (m, c) -> c + 1, new IntegerSerde()) + .setEarlyTrigger(earlyTrigger) + .setAccumulationMode(mode), "w1") + .sink((message, messageCollector, taskCoordinator) -> { + messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); + }); + } + } + + private class KeyedSessionWindowStreamApplication implements StreamApplication { + + private final AccumulationMode mode; + private final Duration duration; + private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); + + KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) { + this.mode = mode; + this.duration = duration; + } + + @Override + public void init(StreamGraph graph, Config config) { + MessageStream<IntegerEnvelope> inStream = + graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde())) + .map(kv -> new IntegerEnvelope(kv.getKey())); + Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey(); + + inStream + .map(m -> m) + .window(Windows.keyedSessionWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde()) + .setAccumulationMode(mode), "w1") + .sink((message, messageCollector, taskCoordinator) -> { + messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); + }); + } + } + + private class IntegerEnvelope extends IncomingMessageEnvelope { + + IntegerEnvelope(Integer key) { + super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, key); + } + } + + private class IntegerEnvelopeSerde implements Serde<IntegerEnvelope> { + private final IntegerSerde intSerde = new IntegerSerde(); + + @Override + public byte[] toBytes(IntegerEnvelope object) { + return intSerde.toBytes((Integer) object.getKey()); + } + + @Override + public IntegerEnvelope fromBytes(byte[] bytes) { + return new IntegerEnvelope(intSerde.fromBytes(bytes)); + } + } +}
