Repository: samza Updated Branches: refs/heads/master 05915bfc8 -> d399d6f3c
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java new file mode 100644 index 0000000..8544efd --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.operators.impl.TriggerScheduler;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation class for a {@link TimeSinceLastMessageTrigger}.
+ *
+ * <p> Each message that arrives before the trigger has fired schedules a callback {@code durationMs} after
+ * that message, cancelling a previously scheduled callback whose time has not yet been reached. The trigger
+ * reports that it should fire once a scheduled callback has run.
+ *
+ * @param <M> the type of the incoming message
+ * @param <WK> the type parameter of the {@link TriggerKey} passed to the {@link TriggerScheduler}
+ */
+public class TimeSinceLastMessageTriggerImpl<M, WK> implements TriggerImpl<M, WK> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TimeSinceLastMessageTriggerImpl.class);
+  private final TimeSinceLastMessageTrigger<M> trigger;
+  private final long durationMs;
+  private final Clock clock;
+  private final TriggerKey<WK> triggerKey;
+  // Sentinel for "nothing scheduled yet". It must be the long minimum (not Integer.MIN_VALUE) so that no
+  // clock reading can ever compare as earlier than it.
+  private long callbackTime = Long.MIN_VALUE;
+  private Cancellable cancellable = null;
+  private boolean shouldFire = false;
+
+  /**
+   * Creates the trigger implementation.
+   *
+   * @param trigger the trigger whose duration, in milliseconds, is used as the inactivity gap
+   * @param clock the source of the current time
+   * @param key the key passed along with every callback that is scheduled
+   */
+  public TimeSinceLastMessageTriggerImpl(TimeSinceLastMessageTrigger<M> trigger, Clock clock, TriggerKey<WK> key) {
+    this.trigger = trigger;
+    this.durationMs = trigger.getDuration().toMillis();
+    this.clock = clock;
+    this.triggerKey = key;
+  }
+
+  /**
+   * Re-arms the callback for {@code durationMs} after the current time. Messages that arrive after the
+   * trigger has been marked as ready to fire are ignored.
+   *
+   * @param message the incoming message (not inspected)
+   * @param context the {@link TriggerScheduler} used to schedule the callback
+   */
+  @Override
+  public void onMessage(M message, TriggerScheduler<WK> context) {
+    if (!shouldFire) {
+      long currTime = clock.currentTimeMillis();
+
+      // The previously scheduled callback is still in the future: drop it before scheduling the new one.
+      if (currTime < callbackTime && cancellable != null) {
+        cancellable.cancel();
+      }
+
+      callbackTime = currTime + durationMs;
+      Runnable runnable = () -> {
+        LOG.trace("Time since last message trigger fired");
+        shouldFire = true;
+      };
+
+      cancellable = context.scheduleCallback(runnable, callbackTime, triggerKey);
+    }
+  }
+
+  /**
+   * Cancels the most recently scheduled callback, if there is one.
+   */
+  @Override
+  public void cancel() {
+    if (cancellable != null) {
+      cancellable.cancel();
+    }
+  }
+
+  /**
+   * @return {@code true} once a scheduled callback has run
+   */
+  @Override
+  public boolean shouldFire() {
+    return shouldFire;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java
----------------------------------------------------------------------
diff --git
a/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java new file mode 100644 index 0000000..2454ce9 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.operators.impl.TriggerScheduler;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation class for a {@link TimeTrigger}.
+ *
+ * <p> The first message schedules a single callback at the next multiple of the trigger duration (in
+ * milliseconds, as reported by the {@link Clock}); the trigger reports that it should fire once that
+ * callback has run.
+ *
+ * @param <M> the type of the incoming message
+ * @param <WK> the type parameter of the {@link TriggerKey} passed to the {@link TriggerScheduler}
+ */
+public class TimeTriggerImpl<M, WK> implements TriggerImpl<M, WK> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TimeTriggerImpl.class);
+
+  private final TimeTrigger<M> trigger;
+  private final TriggerKey<WK> triggerKey;
+  private Cancellable cancellable;
+  private final Clock clock;
+  private boolean shouldFire = false;
+
+  /**
+   * Creates the trigger implementation.
+   *
+   * @param trigger the trigger that supplies the duration
+   * @param clock the source of the current time
+   * @param key the key passed along with the scheduled callback
+   */
+  public TimeTriggerImpl(TimeTrigger<M> trigger, Clock clock, TriggerKey<WK> key) {
+    this.trigger = trigger;
+    this.clock = clock;
+    this.triggerKey = key;
+  }
+
+  /**
+   * Schedules the callback on the first message; later messages are no-ops because the callback is
+   * only ever scheduled once.
+   *
+   * @param message the incoming message (not inspected)
+   * @param context the {@link TriggerScheduler} used to schedule the callback
+   */
+  @Override
+  public void onMessage(M message, TriggerScheduler<WK> context) {
+    if (cancellable != null) {
+      // Already scheduled: skip the clock read and the boundary arithmetic entirely.
+      return;
+    }
+
+    final long now = clock.currentTimeMillis();
+    final long triggerDurationMs = trigger.getDuration().toMillis();
+    // Round the current time down to a multiple of the duration, then step to the next boundary.
+    final long callbackTime = (now - now % triggerDurationMs) + triggerDurationMs;
+
+    cancellable = context.scheduleCallback(() -> {
+      LOG.trace("Time trigger fired");
+      shouldFire = true;
+    }, callbackTime, triggerKey);
+  }
+
+  /**
+   * Cancels the scheduled callback, if there is one.
+   */
+  @Override
+  public void cancel() {
+    if (cancellable != null) {
+      cancellable.cancel();
+    }
+  }
+
+  /**
+   * @return {@code true} once the scheduled callback has run
+   */
+  @Override
+  public boolean shouldFire() {
+    return shouldFire;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
new file mode 100644
index 0000000..705cab7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.triggers; + + +import org.apache.samza.operators.impl.TriggerScheduler; + +/** + * Implementation class for a {@link Trigger}. A {@link TriggerImpl} is used with a + * callback which is invoked when the trigger fires. + * + * <p> When messages arrive in the {@code WindowOperatorImpl}, they are assigned to one or more windows. An + * instance of a {@link TriggerImpl} is created corresponding to each {@link Trigger} configured for a window. For every + * message added to the window, the {@code WindowOperatorImpl} invokes {@link #onMessage} on its corresponding + * {@link TriggerImpl}s. A {@link TriggerImpl} instance is scoped to a window and its firing determines when results for + * its window are emitted. + * + * <p> {@link TriggerImpl}s can use the {@link TriggerScheduler} to schedule and cancel callbacks (for example, implementations + * of time-based triggers). + * + * <p> State management: The state maintained by {@link TriggerImpl}s is transient and is not durable across re-starts. + * New instances of {@link TriggerImpl} are created on a re-start. 
+ *
+ * @param <M> the type of the messages handed to {@link #onMessage}
+ * @param <WK> the type parameter of the {@link TriggerScheduler} handed to {@link #onMessage}
+ */
+public interface TriggerImpl<M, WK> {
+
+  /**
+   * Called each time a message is added to the window that this {@link TriggerImpl} belongs to.
+   *
+   * @param message the message that was just added
+   * @param context the {@link TriggerScheduler} through which callbacks can be scheduled and cancelled
+   */
+  void onMessage(M message, TriggerScheduler<WK> context);
+
+  /**
+   * Reports whether the trigger's condition is currently satisfied, i.e. whether it is ready to fire.
+   *
+   * @return {@code true} if this trigger should fire
+   */
+  boolean shouldFire();
+
+  /**
+   * Called when an up-stream {@link TriggerImpl} cancels the execution of this {@link TriggerImpl}.
+   *
+   * After this call, neither {@link #onMessage(Object, TriggerScheduler)} nor {@link #shouldFire()} will be
+   * invoked again.
+   */
+  void cancel();
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java
new file mode 100644
index 0000000..f64a1db
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.util.Clock;
+
+/**
+ * Factory methods for instantiating {@link TriggerImpl}s from individual {@link Trigger}s.
+ */
+public class TriggerImpls {
+
+  /**
+   * Creates the {@link TriggerImpl} that corresponds to the concrete type of the given {@link Trigger}.
+   *
+   * @param trigger the trigger to create an implementation for
+   * @param clock the clock handed to the implementations that take one
+   * @param triggerKey the key handed to the created implementation
+   * @param <M> the type of the incoming message
+   * @param <WK> the type parameter of the {@link TriggerKey}
+   * @return the implementation matching the trigger's type
+   * @throws IllegalArgumentException if {@code trigger} is {@code null}
+   * @throws SamzaException if no implementation is defined for the trigger's type
+   */
+  public static <M, WK> TriggerImpl<M, WK> createTriggerImpl(Trigger<M> trigger, Clock clock, TriggerKey<WK> triggerKey) {
+
+    if (trigger == null) {
+      throw new IllegalArgumentException("Trigger must not be null");
+    }
+
+    if (trigger instanceof CountTrigger) {
+      return new CountTriggerImpl<>((CountTrigger<M>) trigger, triggerKey);
+    } else if (trigger instanceof RepeatingTrigger) {
+      return new RepeatingTriggerImpl<>((RepeatingTrigger<M>) trigger, clock, triggerKey);
+    } else if (trigger instanceof AnyTrigger) {
+      return new AnyTriggerImpl<>((AnyTrigger<M>) trigger, clock, triggerKey);
+    } else if (trigger instanceof TimeSinceLastMessageTrigger) {
+      return new TimeSinceLastMessageTriggerImpl<>((TimeSinceLastMessageTrigger<M>) trigger, clock, triggerKey);
+    } else if (trigger instanceof TimeTrigger) {
+      // Diamond instead of the raw type, so the result is checked against TriggerImpl<M, WK> like the other branches.
+      return new TimeTriggerImpl<>((TimeTrigger<M>) trigger, clock, triggerKey);
+    } else if (trigger instanceof TimeSinceFirstMessageTrigger) {
+      return new TimeSinceFirstMessageTriggerImpl<>((TimeSinceFirstMessageTrigger<M>) trigger, clock, triggerKey);
+    }
+
+    throw new SamzaException("No implementation class defined for the trigger " + trigger.getClass().getCanonicalName());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java index e5dab80..b8672c6 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java +++ b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.samza.operators.util; import org.apache.samza.storage.kv.Entry; @@ -30,24 +31,28 @@ import java.util.Map; /** * Implements a {@link KeyValueStore} using an in-memory Java Map. - * @param <K> the type of the key in the store - * @param <V> the type of the value in the store + * @param <K> the type of key + * @param <V> the type of value + * + * TODO: This class is a stop-gap until we implement persistent store creation from TaskContext. * - * TODO HIGH prateekm: Remove when we switch to an persistent implementation for KeyValueStore API. 
*/ public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> { - final Map<K, V> map = new LinkedHashMap<>(); + private final Map<K, V> map = new LinkedHashMap<>(); @Override public V get(K key) { + if (key == null) { + throw new NullPointerException("Null key provided"); + } return map.get(key); } @Override public Map<K, V> getAll(List<K> keys) { Map<K, V> values = new HashMap<>(); - for (K key: keys) { + for (K key : keys) { values.put(key, map.get(key)); } return values; @@ -55,18 +60,24 @@ public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> { @Override public void put(K key, V value) { + if (key == null) { + throw new NullPointerException("Null key provided"); + } map.put(key, value); } @Override public void putAll(List<Entry<K, V>> entries) { - for (Entry<K, V> entry: entries) { + for (Entry<K, V> entry : entries) { put(entry.getKey(), entry.getValue()); } } @Override public void delete(K key) { + if (key == null) { + throw new NullPointerException("Null key provided"); + } map.remove(key); } @@ -119,4 +130,4 @@ public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> { public void flush() { //not applicable } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java index 9ec8e5a..d4224c3 100644 --- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java +++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@ -20,6 +20,8 @@ package org.apache.samza.task; import java.util.HashMap; import java.util.Map; +import java.util.Set; + import org.apache.samza.config.Config; import org.apache.samza.operators.ContextManager; import 
org.apache.samza.operators.MessageStreamImpl; @@ -30,6 +32,9 @@ import org.apache.samza.operators.impl.OperatorGraph; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Clock; +import org.apache.samza.util.SystemClock; /** @@ -65,16 +70,27 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo /** * A mapping from each {@link SystemStream} to the root node of its operator chain DAG. */ - private final OperatorGraph operatorGraph = new OperatorGraph(); + private final OperatorGraph operatorGraph; private final StreamApplication graphBuilder; private final ApplicationRunner runner; + private final Clock clock; + private ContextManager contextManager; + private Set<SystemStreamPartition> systemStreamPartitions; + public StreamOperatorTask(StreamApplication graphBuilder, ApplicationRunner runner) { + this(graphBuilder, SystemClock.instance(), runner); + } + + // purely for testing. 
+ public StreamOperatorTask(StreamApplication graphBuilder, Clock clock, ApplicationRunner runner) { this.graphBuilder = graphBuilder; + this.operatorGraph = new OperatorGraph(clock); + this.clock = clock; this.runner = runner; } @@ -85,9 +101,10 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo this.graphBuilder.init(streamGraph, config); // get the context manager of the {@link StreamGraph} and initialize the task-specific context this.contextManager = streamGraph.getContextManager(); + this.systemStreamPartitions = context.getSystemStreamPartitions(); Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>(); - context.getSystemStreamPartitions().forEach(ssp -> { + systemStreamPartitions.forEach(ssp -> { if (!inputBySystemStream.containsKey(ssp.getSystemStream())) { // create mapping from the physical input {@link SystemStream} to the logic {@link MessageStream} inputBySystemStream.putIfAbsent(ssp.getSystemStream(), streamGraph.getInputStream(ssp.getSystemStream())); @@ -103,8 +120,11 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo } @Override - public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception { - this.operatorGraph.getAll().forEach(r -> r.onTimer(collector, coordinator)); + public final void window(MessageCollector collector, TaskCoordinator coordinator) { + systemStreamPartitions.forEach(ssp -> { + this.operatorGraph.get(ssp.getSystemStream()) + .onTick(collector, coordinator); + }); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java index 543716a..6edf048 100644 --- 
a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java @@ -33,6 +33,7 @@ import org.apache.samza.system.StreamSpec; import org.apache.samza.util.CommandLine; import java.time.Duration; +import java.util.function.Supplier; /** @@ -44,9 +45,10 @@ public class PageViewCounterExample implements StreamApplication { MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>()); + Supplier<Integer> initialValue = () -> 0; pageViewEvents. - window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1). + window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), initialValue, (m, c) -> c + 1). setEarlyTrigger(Triggers.repeat(Triggers.count(5))). setAccumulationMode(AccumulationMode.DISCARDING)). map(MyStreamOutput::new). 
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java index 729b26f..e222fe4 100644 --- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java @@ -31,6 +31,7 @@ import org.apache.samza.system.StreamSpec; import org.apache.samza.util.CommandLine; import java.time.Duration; +import java.util.function.Supplier; /** @@ -67,11 +68,12 @@ public class RepartitionExample implements StreamApplication { MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>()); + Supplier<Integer> initialValue = () -> 0; pageViewEvents. partitionBy(m -> m.getMessage().memberId). window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow( - msg -> msg.getMessage().memberId, Duration.ofMinutes(5), (m, c) -> c + 1)). + msg -> msg.getMessage().memberId, Duration.ofMinutes(5), initialValue, (m, c) -> c + 1)). map(MyStreamOutput::new). 
sendTo(pageViewPerMemberCounters); http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java index d988270..1c30a21 100644 --- a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java @@ -21,13 +21,15 @@ package org.apache.samza.example; import java.time.Duration; import java.util.Set; -import java.util.function.BiFunction; +import java.util.function.Supplier; + import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.data.InputMessageEnvelope; import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.triggers.Triggers; import org.apache.samza.operators.windows.Windows; import org.apache.samza.system.StreamSpec; @@ -65,18 +67,20 @@ public class TestBroadcastExample extends TestExampleBase { @Override public void init(StreamGraph graph, Config config) { - BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1; + FoldLeftFunction<JsonMessageEnvelope, Integer> sumAggregator = (m, c) -> c + 1; + Supplier<Integer> initialValue = () -> 0; + inputs.keySet().forEach(entry -> { MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream( new StreamSpec(entry.getSystem() + "-" + entry.getStream(), entry.getStream(), entry.getSystem()), null, null).map(this::getInputMessage); - 
inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) + inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator) .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); - inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) + inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator) .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); - inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) + inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator) .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); }); http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java index 6896da5..c88df7c 100644 --- a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java @@ -25,13 +25,14 @@ import org.apache.samza.operators.data.InputMessageEnvelope; import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.windows.Windows; import org.apache.samza.system.StreamSpec; import 
org.apache.samza.system.SystemStreamPartition; import java.time.Duration; -import java.util.function.BiFunction; import java.util.Set; +import java.util.function.Supplier; /** @@ -57,11 +58,12 @@ public class TestWindowExample extends TestExampleBase { @Override public void init(StreamGraph graph, Config config) { - BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1; + FoldLeftFunction<JsonMessageEnvelope, Integer> maxAggregator = (m, c) -> c + 1; + Supplier<Integer> initialValue = () -> 0; inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream( new StreamSpec(source.getSystem() + "-" + source.getStream(), source.getStream(), source.getSystem()), null, null). map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(), - m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator))); + m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), initialValue, maxAggregator))); } http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index 361972e..5722dbd 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -50,7 +50,12 @@ public class TestOperatorImpl { TestOperatorImpl.this.curCollector = collector; TestOperatorImpl.this.curCoordinator = coordinator; } - }; + @Override + public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { + + } + + }; // verify registerNextOperator() added the mockSub and propagateResult() 
invoked the mockSub.onNext() OperatorImpl mockSub = mock(OperatorImpl.class); opImpl.registerNextOperator(mockSub); http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java index 088cb00..31f6f4a 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java @@ -38,6 +38,7 @@ import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.operators.windows.Windows; import org.apache.samza.operators.windows.internal.WindowInternal; +import org.apache.samza.operators.windows.internal.WindowType; import org.apache.samza.task.TaskContext; import org.junit.Before; import org.junit.Test; @@ -77,7 +78,7 @@ public class TestOperatorImpls { public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException { // get window operator WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class); - WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null); + WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null, null, WindowType.TUMBLING); when(mockWnd.getWindow()).thenReturn(windowInternal); MessageStreamImpl<TestMessageEnvelope> mockStream = mock(MessageStreamImpl.class); Config mockConfig = mock(Config.class); http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java 
---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java index ae3d151..ec1d74c 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java @@ -25,18 +25,20 @@ import org.apache.samza.operators.TestMessageStreamImplUtil; import org.apache.samza.operators.TestOutputMessageEnvelope; import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.windows.internal.WindowInternal; import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.internal.WindowType; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; import java.util.ArrayList; import java.util.Collection; -import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -69,17 +71,17 @@ public class TestOperatorSpecs { @Test public void testGetWindowOperator() throws Exception { Function<TestMessageEnvelope, String> keyExtractor = m -> "globalkey"; - BiFunction<TestMessageEnvelope, Integer, Integer> aggregator = (m, c) -> c + 1; - + FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1; + Supplier<Integer> initialValue = () -> 0; //instantiate a window using reflection - WindowInternal window = new WindowInternal(null, aggregator, keyExtractor, null); + WindowInternal window = new 
WindowInternal(null, initialValue, aggregator, keyExtractor, null, WindowType.TUMBLING); StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class); WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockGraph, mockWndOut); assertEquals(spec.getWindow(), window); assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor); - assertEquals(spec.getWindow().getFoldFunction(), aggregator); + assertEquals(spec.getWindow().getFoldLeftFunction(), aggregator); } @Test http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java new file mode 100644 index 0000000..674a8f1 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.triggers; + +import org.apache.samza.util.Clock; + +import java.time.Duration; + +/** + * A manually driven {@link Clock} for tests: the reported time changes only when a caller + * explicitly adds to it. + */ +public class TestClock implements Clock { + + /** Current reading of the clock in milliseconds; starts at 1. */ + long currentTime = 1; + + /** + * Adds the given duration, converted to milliseconds, to the current reading. + * + * @param duration the amount of time to add + */ + public void advanceTime(Duration duration) { + final long deltaMs = duration.toMillis(); + this.currentTime = this.currentTime + deltaMs; + } + + /** + * Adds the given number of milliseconds to the current reading. + * + * @param millis the number of milliseconds to add + */ + public void advanceTime(long millis) { + this.currentTime = this.currentTime + millis; + } + + /** + * @return the current reading of this clock in milliseconds + */ + @Override + public long currentTimeMillis() { + return this.currentTime; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java new file mode 100644 index 0000000..0d720dd --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.operators.triggers; + + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import junit.framework.Assert; +import org.apache.samza.Partition; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.windows.AccumulationMode; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.StreamSpec; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamOperatorTask; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.function.Function; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Exercises windowing and trigger behaviour by driving a {@link StreamOperatorTask} with a {@link TestClock}, + * so time changes only when a test advances it. The stream applications defined at the bottom of this class + * append every pane they receive to {@code windowPanes}, which the tests then inspect. + */ +public class TestWindowOperator { + private final MessageCollector messageCollector = mock(MessageCollector.class); + private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class); + /** Panes captured by the test applications, in the order they were received; cleared in {@link #setup()}. */ + private final List<WindowPane<Integer, Collection<MessageEnvelope<Integer, Integer>>>> windowPanes = new ArrayList<>(); + /** Input for the tumbling window tests; each value is used as both the key and the message. */ + private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3); + private Config config; + private TaskContext taskContext; + private ApplicationRunner runner; + + /** + * Clears the captured panes and creates fresh mocks. The mocked task context reports a single + * SystemStreamPartition built from "kafka", "integers" and Partition 0. + */ + @Before + public void setup() throws Exception { + windowPanes.clear(); + + config = mock(Config.class); + taskContext = mock(TaskContext.class); + runner =
mock(ApplicationRunner.class); + when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet + .of(new SystemStreamPartition("kafka", "integers", new Partition(0)))); + + } + + /** + * One-second keyed tumbling window in DISCARDING mode with a repeating count(2) early trigger. + * Expects five panes: four holding two messages each (keys 1, 2, 1, 2) and a last pane for key 3 + * holding a single message. + */ + @Test + public void testTumblingWindowsDiscardingMode() throws Exception { + + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); + task.init(config, taskContext); + + integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator)); + testClock.advanceTime(Duration.ofSeconds(1)); + + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 5); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(4).getKey().getKey(), new Integer(3)); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(4).getMessage()).size(), 1); + } + + /** + * Same input and trigger as the discarding tumbling test, but in ACCUMULATING mode. + * Expects seven panes in total; only the first four are inspected, and the second pane + * for each of keys 1 and 2 is expected to hold four messages. + */ + @Test + public void testTumblingWindowsAccumulatingMode() throws Exception { + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); + task.init(config, taskContext); + + integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator)); + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 7); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size(), 4); + + Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size(), 4); + } + + /** + * Keyed session window with a 500 ms gap in DISCARDING mode. After each burst of messages the clock + * is advanced by one second and window() is invoked; one new pane per key in the burst is expected. + * The asserted pane ids ("1", "1001", "2001") equal the TestClock reading at which each burst was processed. + */ + @Test + public void testSessionWindowsDiscardingMode() throws Exception { + StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500)); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); + task.init(config, taskContext); + + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 1); + Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1"); + 
Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator); + + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 3); + Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1"); + Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "1001"); + Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1001"); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 2); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size(), 2); + + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 4); + Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); + Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "2001"); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size(), 2); + + } + + /** + * Keyed session window with a 500 ms gap where window() is invoked only once, after all messages + * have been processed. Expects two panes: key 1 with two messages and key 2 with four messages. + * NOTE(review): despite the test name, the application is constructed with AccumulationMode.DISCARDING; + * confirm whether ACCUMULATING was intended. + */ + @Test + public void testSessionWindowsAccumulatingMode() throws Exception { + StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500)); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new
StreamOperatorTask(sgb, testClock, runner); + task.init(config, taskContext); + + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + testClock.advanceTime(Duration.ofSeconds(1)); + + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 2); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 4); + } + + /** + * One-second keyed tumbling window with a non-repeating count(2) early trigger. + * Expects a single EARLY pane after the second message, no further pane for later messages + * processed before the clock moves, and a DEFAULT pane once the clock has been advanced by one second + * and window() is invoked. A message for a different key afterwards is expected to yield a + * DEFAULT pane with pane id "1000". + */ + @Test + public void testCancelationOfOnceTrigger() throws Exception { + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), Triggers.count(2)); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); + task.init(config, taskContext); + + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 1); + Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0"); + 
Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY); + + task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 1); + + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 2); + Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0"); + Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0"); + Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT); + + task.process(new IntegerMessageEnvelope(3, 6), messageCollector, taskCoordinator); + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 3); + Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(3)); + Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000"); + Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.DEFAULT); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size(), 1); + + } + + /** + * One-second keyed tumbling window whose early trigger is any(count(2), timeSinceFirstMessage(500 ms)). + * Expects one pane once two messages have been processed, a DEFAULT pane with five messages after the + * window duration has elapsed, and then, for a later message, an EARLY pane followed by a DEFAULT pane, + * both with pane id "1000". + */ + @Test + public void testCancelationOfAnyTrigger() throws Exception { + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), + Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); + task.init(config, taskContext); + + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 2), messageCollector,
taskCoordinator); + //assert that the count trigger fired + Assert.assertEquals(windowPanes.size(), 1); + + //advance the timer to enable the triggering of the inner timeSinceFirstMessage trigger + testClock.advanceTime(Duration.ofMillis(500)); + + //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger + Assert.assertEquals(windowPanes.size(), 1); + + task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); + + //advance timer by 500 more millis to enable the default trigger + testClock.advanceTime(Duration.ofMillis(500)); + task.window(messageCollector, taskCoordinator); + + //assert that the default trigger fired + Assert.assertEquals(windowPanes.size(), 2); + Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT); + Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0"); + Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 5); + + task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); + + //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger + testClock.advanceTime(Duration.ofMillis(500)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 3); + Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.EARLY); + Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000"); + + //advance timer by > 500 millis to enable the default trigger + testClock.advanceTime(Duration.ofMillis(900)); + task.window(messageCollector, taskCoordinator); + 
Assert.assertEquals(windowPanes.size(), 4); + Assert.assertEquals(windowPanes.get(3).getFiringType(), FiringType.DEFAULT); + Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "1000"); + } + + /** + * One-second keyed tumbling window whose early trigger is repeat(any(count(2), timeSinceFirstMessage(500 ms))). + * The asserted pane count grows to 1 after two messages, to 2 after a third message followed by a + * 500 ms advance and window(), to 3 after two more messages, and to 4 after one more message followed by + * another 500 ms advance and window(). + */ + @Test + public void testCancelationOfRepeatingNestedTriggers() throws Exception { + + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), + Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))))); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); + task.init(config, taskContext); + + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + + task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator); + //assert that the count trigger fired + Assert.assertEquals(windowPanes.size(), 1); + + //advance the timer to enable the potential triggering of the inner timeSinceFirstMessage trigger + task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); + testClock.advanceTime(Duration.ofMillis(500)); + //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 2); + + task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 3); + + task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); + //advance timer by 500 more millis to enable the default trigger + testClock.advanceTime(Duration.ofMillis(500)); + task.window(messageCollector, taskCoordinator); + //assert that the default trigger fired + Assert.assertEquals(windowPanes.size(), 4); + 
} + + /** + * Test application that maps the input stream through an identity function, applies a keyed tumbling + * window (keyed by the message key) with the configured duration, early trigger and accumulation mode, + * and appends every resulting pane to {@code windowPanes} of the enclosing test. + */ + private class KeyedTumblingWindowStreamApplication implements StreamApplication { + + private final StreamSpec streamSpec = new StreamSpec("integer-stream", "integers", "kafka"); + private final AccumulationMode mode; + private final Duration duration; + private final Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger; + + KeyedTumblingWindowStreamApplication(AccumulationMode mode, Duration timeDuration, Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger) { + this.mode = mode; + this.duration = timeDuration; + this.earlyTrigger = earlyTrigger; + } + + @Override + public void init(StreamGraph graph, Config config) { + MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(streamSpec, null, null); + Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey(); + inStream + .map(m -> m) + .window(Windows.keyedTumblingWindow(keyFn, duration).setEarlyTrigger(earlyTrigger) + .setAccumulationMode(mode)) + .map(m -> { + windowPanes.add(m); + return m; + }); + } + } + + /** + * Test application that maps the input stream through an identity function, applies a keyed session + * window (keyed by the message key) with the configured duration and accumulation mode, and appends + * every resulting pane to {@code windowPanes} of the enclosing test. No early trigger is set. + */ + private class KeyedSessionWindowStreamApplication implements StreamApplication { + + private final StreamSpec streamSpec = new StreamSpec("integer-stream", "integers", "kafka"); + private final AccumulationMode mode; + private final Duration duration; + + KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) { + this.mode = mode; + this.duration = duration; + } + + @Override + public void init(StreamGraph graph, Config config) { + MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(streamSpec, null, null); + Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey(); + + inStream + .map(m -> m) + .window(Windows.keyedSessionWindow(keyFn, duration) + .setAccumulationMode(mode)) + .map(m -> { + windowPanes.add(m); + return m; + }); + } + } + + /** + * Incoming envelope carrying an integer key and an integer message; the SystemStreamPartition arguments + * and the "1" argument passed to the superclass constructor are fixed. + */ + private class IntegerMessageEnvelope extends IncomingMessageEnvelope { + IntegerMessageEnvelope(int key, int msg) { + super(new
SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, msg); + } + } +}
