SAMZA-1108: Implementation of Windows and various kinds of Triggers

* Implemented various triggers and the orchestration logic of the window operator.
* Implemented the wire-up of the window operator and the flow of messages through the various trigger implementations.
* Added implementations for the count, time, timeSinceFirst, timeSinceLast, Any, and Repeating triggers.
Author: vjagadish1989 <jvenk...@linkedin.com> Reviewers: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>, Prateek Maheshwari <pmahe...@linkedin.com>, Chris Pettitt <cpett...@linkedin.com> Closes #66 from vjagadish1989/window-impl Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d399d6f3 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d399d6f3 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d399d6f3 Branch: refs/heads/master Commit: d399d6f3ca0fa20afcfad5864ec7f8550aa7a00f Parents: 05915bf Author: vjagadish1989 <jvenk...@linkedin.com> Authored: Sun Mar 19 12:25:40 2017 -0700 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Sun Mar 19 12:31:21 2017 -0700 ---------------------------------------------------------------------- checkstyle/checkstyle.xml | 7 +- .../operators/functions/FilterFunction.java | 2 +- .../operators/functions/FoldLeftFunction.java | 36 ++ .../samza/operators/functions/MapFunction.java | 5 +- .../samza/operators/triggers/AnyTrigger.java | 9 +- .../samza/operators/triggers/CountTrigger.java | 2 +- .../samza/operators/triggers/FiringType.java | 29 ++ .../operators/triggers/RepeatingTrigger.java | 4 + .../triggers/TimeSinceFirstMessageTrigger.java | 2 +- .../triggers/TimeSinceLastMessageTrigger.java | 3 +- .../samza/operators/triggers/TimeTrigger.java | 2 +- .../samza/operators/windows/WindowKey.java | 36 +- .../samza/operators/windows/WindowPane.java | 14 +- .../apache/samza/operators/windows/Windows.java | 153 ++------ .../windows/internal/WindowInternal.java | 65 +++- .../operators/windows/internal/WindowType.java | 24 ++ .../samza/operators/windows/TestWindowPane.java | 3 +- .../apache/samza/operators/StreamGraphImpl.java | 1 + .../org/apache/samza/operators/WindowState.java | 44 +++ .../samza/operators/impl/OperatorGraph.java | 16 +- .../samza/operators/impl/OperatorImpl.java | 30 +- .../operators/impl/PartialJoinOperatorImpl.java | 1 - 
.../apache/samza/operators/impl/TriggerKey.java | 73 ++++ .../samza/operators/impl/TriggerScheduler.java | 120 ++++++ .../operators/impl/WindowOperatorImpl.java | 290 +++++++++++++- .../operators/spec/WindowOperatorSpec.java | 11 +- .../operators/triggers/AnyTriggerImpl.java | 80 ++++ .../samza/operators/triggers/Cancellable.java | 34 ++ .../operators/triggers/CountTriggerImpl.java | 61 +++ .../triggers/RepeatingTriggerImpl.java | 67 ++++ .../TimeSinceFirstMessageTriggerImpl.java | 71 ++++ .../TimeSinceLastMessageTriggerImpl.java | 79 ++++ .../operators/triggers/TimeTriggerImpl.java | 71 ++++ .../samza/operators/triggers/TriggerImpl.java | 66 ++++ .../samza/operators/triggers/TriggerImpls.java | 53 +++ .../operators/util/InternalInMemoryStore.java | 25 +- .../apache/samza/task/StreamOperatorTask.java | 28 +- .../samza/example/PageViewCounterExample.java | 4 +- .../samza/example/RepartitionExample.java | 4 +- .../samza/example/TestBroadcastExample.java | 14 +- .../apache/samza/example/TestWindowExample.java | 8 +- .../samza/operators/impl/TestOperatorImpl.java | 7 +- .../samza/operators/impl/TestOperatorImpls.java | 3 +- .../samza/operators/spec/TestOperatorSpecs.java | 12 +- .../samza/operators/triggers/TestClock.java | 45 +++ .../operators/triggers/TestWindowOperator.java | 389 +++++++++++++++++++ 46 files changed, 1885 insertions(+), 218 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/checkstyle/checkstyle.xml ---------------------------------------------------------------------- diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index 0999fd7..775d674 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -20,7 +20,9 @@ --> <module name="Checker"> <property name="localeLanguage" value="en"/> - + <!-- allow suppression for specific files --> + <module name="SuppressionCommentFilter"/> + <module name="FileTabCharacter"/> <!-- header: 
use one star only --> @@ -32,6 +34,7 @@ <!-- code cleanup --> <module name="UnusedImports"/> + <module name="FileContentsHolder"/> <module name="RedundantImport"/> <module name="IllegalImport" /> <module name="EqualsHashCode"/> @@ -62,8 +65,8 @@ <!-- whitespace --> <module name="GenericWhitespace"/> <module name="NoWhitespaceBefore"/> - <module name="WhitespaceAfter" /> <module name="NoWhitespaceAfter"/> + <module name="WhitespaceAfter" /> <module name="WhitespaceAround"> <property name="allowEmptyConstructors" value="true"/> <property name="allowEmptyMethods" value="true"/> http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java index 58479d6..143bae0 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java @@ -31,7 +31,7 @@ public interface FilterFunction<M> extends InitableFunction { /** * Returns a boolean indicating whether this message should be retained or filtered out. - * @param message the input message to be checked + * @param message the input message to be checked. This object should not be mutated. 
* @return true if {@code message} should be retained */ boolean apply(M message); http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java new file mode 100644 index 0000000..58e88fd --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.functions; + +/** + * A fold function that incrementally combines and aggregates values for a window. + */ +public interface FoldLeftFunction<M, WV> extends InitableFunction { + + /** + * Incrementally combine and aggregate values for the window. Guaranteed to be invoked for every + * message added to the window. + * + * @param message the incoming message that is added to the window. This object should not be mutated. 
+ * @param oldValue the previous value + * @return the new value + */ + WV apply(M message, WV oldValue); +} http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java index 05a554f..b09fb99 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java @@ -31,8 +31,9 @@ import org.apache.samza.annotation.InterfaceStability; public interface MapFunction<M, OM> extends InitableFunction { /** - * Transforms the provided message into another message - * @param message the input message to be transformed + * Transforms the provided message into another message. + * + * @param message the input message to be transformed. This object should not be mutated. * @return the transformed message */ OM apply(M message); http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java index 6e134df..f52b57b 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java @@ -23,16 +23,15 @@ import java.util.List; /** * A {@link Trigger} fires as soon as any of its individual triggers has fired. 
*/ -public class AnyTrigger<M> implements Trigger { +public class AnyTrigger<M> implements Trigger<M> { - private final List<Trigger> triggers; + private final List<Trigger<M>> triggers; - AnyTrigger(List<Trigger> triggers) { + AnyTrigger(List<Trigger<M>> triggers) { this.triggers = triggers; } - public List<Trigger> getTriggers() { + public List<Trigger<M>> getTriggers() { return triggers; } } - http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java index 1cf930c..dbae3a9 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java @@ -22,7 +22,7 @@ package org.apache.samza.operators.triggers; * A {@link Trigger} that fires when the number of messages in the {@link org.apache.samza.operators.windows.WindowPane} * reaches the specified count. */ -public class CountTrigger<M> implements Trigger { +public class CountTrigger<M> implements Trigger<M> { private final long count; http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/FiringType.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/FiringType.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/FiringType.java new file mode 100644 index 0000000..49d971d --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/FiringType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.triggers; + +/** + * The type of the {@link org.apache.samza.operators.triggers.Trigger} firing. + * Firings can be either early or late or default. Late triggers are not supported currently. + */ +public enum FiringType { + EARLY, + DEFAULT, + LATE +} http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java index 7f78eb8..166d0d9 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java @@ -28,5 +28,9 @@ class RepeatingTrigger<M> implements Trigger<M> { RepeatingTrigger(Trigger<M> trigger) { this.trigger = trigger; } + + public Trigger<M> getTrigger() { + return trigger; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java 
---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java index 4de60a2..94b7769 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java @@ -26,7 +26,7 @@ import java.time.Duration; * A {@link Trigger} that fires after the specified duration has passed since the first {@link MessageEnvelope} in * the window pane. */ -public class TimeSinceFirstMessageTrigger<M> implements Trigger { +public class TimeSinceFirstMessageTrigger<M> implements Trigger<M> { private final Duration duration; private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME; http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java index 6b09625..2231fd4 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java @@ -22,8 +22,9 @@ import java.time.Duration; /* * A {@link Trigger} that fires when there are no new {@link MessageEnvelope}s in the window pane for the specified duration. 
+ * @param <M> the type of the incoming {@link MessageEnvelope} */ -public class TimeSinceLastMessageTrigger<M> implements Trigger { +public class TimeSinceLastMessageTrigger<M> implements Trigger<M> { private final Duration duration; private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME; http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java index c5875aa..d854d74 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java @@ -23,7 +23,7 @@ import java.time.Duration; /* * A {@link Trigger} that fires after the specified duration in processing time. */ -public class TimeTrigger<M> implements Trigger { +public class TimeTrigger<M> implements Trigger<M> { private final Duration duration; private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME; http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java index 14bd5ab..bf52724 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java @@ -26,14 +26,19 @@ package org.apache.samza.operators.windows; * */ public class WindowKey<K> { - + /** + * A (key,paneId) tuple uniquely identifies an emission from a window. 
For instance, in case of keyed-tumbling time windows, + * the key is provided by the keyExtractor function, and the paneId is the start of the time window boundary. In case + * of session windows, the key is provided by the keyExtractor function, and the paneId is the time at which the earliest + * message in the window arrived. + */ private final K key; private final String paneId; - public WindowKey(K key, String windowId) { + public WindowKey(K key, String paneId) { this.key = key; - this.paneId = windowId; + this.paneId = paneId; } public K getKey() { @@ -52,4 +57,29 @@ public class WindowKey<K> { } return String.format("%s%s", wndKey, paneId); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + WindowKey<?> windowKey = (WindowKey<?>) o; + + if (!key.equals(windowKey.key)) return false; + + if (paneId == null) { + return windowKey.paneId == null; + } + + return paneId.equals(windowKey.paneId); + + } + + @Override + public int hashCode() { + int result = key.hashCode(); + result = 31 * result + (paneId != null ? paneId.hashCode() : 0); + return result; + } + } http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java index 3b66bd1..3b19f8a 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java @@ -18,6 +18,8 @@ */ package org.apache.samza.operators.windows; +import org.apache.samza.operators.triggers.FiringType; + /** * Specifies the result emitted from a {@link Window}. 
* @@ -32,10 +34,16 @@ public final class WindowPane<K, V> { private final AccumulationMode mode; - WindowPane(WindowKey<K> key, V value, AccumulationMode mode) { + /** + * The type of the trigger that emitted this result. Results can be emitted from early, late or default triggers. + */ + private final FiringType type; + + public WindowPane(WindowKey<K> key, V value, AccumulationMode mode, FiringType type) { this.key = key; this.value = value; this.mode = mode; + this.type = type; } public V getMessage() { @@ -46,8 +54,8 @@ public final class WindowPane<K, V> { return this.key; } - static public <K, M> WindowPane<K, M> of(WindowKey<K> key, M result) { - return new WindowPane<>(key, result, AccumulationMode.DISCARDING); + public FiringType getFiringType() { + return type; } } http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java index 73fb5c8..9192fc1 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java @@ -20,15 +20,18 @@ package org.apache.samza.operators.windows; import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.triggers.TimeTrigger; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.triggers.Triggers; import org.apache.samza.operators.windows.internal.WindowInternal; +import org.apache.samza.operators.windows.internal.WindowType; import java.time.Duration; +import java.util.ArrayList; import java.util.Collection; -import java.util.function.BiFunction; import java.util.function.Function; +import 
java.util.function.Supplier; /** * APIs for creating different types of {@link Window}s. @@ -84,6 +87,8 @@ import java.util.function.Function; * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants of all the above window * types. * + * <p> Time granularity for windows: Currently, time durations are always measured in milliseconds. Time units of + * finer granularity are not supported. */ @InterfaceStability.Unstable public final class Windows { @@ -107,17 +112,18 @@ public final class Windows { * * @param keyFn the function to extract the window key from a message * @param interval the duration in processing time + * @param initialValue the initial value to be used for aggregations * @param foldFn the function to aggregate messages in the {@link WindowPane} * @param <M> the type of the input message * @param <WV> the type of the {@link WindowPane} output value * @param <K> the type of the key in the {@link Window} * @return the created {@link Window} function. 
*/ - public static <M, K, WV> Window<M, K, WV> - keyedTumblingWindow(Function<M, K> keyFn, Duration interval, BiFunction<M, WV, WV> foldFn) { + public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(Function<M, K> keyFn, Duration interval, + Supplier<WV> initialValue, FoldLeftFunction<M, WV> foldFn) { Trigger<M> defaultTrigger = new TimeTrigger<>(interval); - return new WindowInternal<M, K, WV>(defaultTrigger, foldFn, keyFn, null); + return new WindowInternal<M, K, WV>(defaultTrigger, initialValue, foldFn, keyFn, null, WindowType.TUMBLING); } @@ -142,11 +148,10 @@ public final class Windows { * @return the created {@link Window} function */ public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<M, K> keyFn, Duration interval) { - BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> { - c.add(m); - return c; - }; - return keyedTumblingWindow(keyFn, interval, aggregator); + FoldLeftFunction<M, Collection<M>> aggregator = createAggregator(); + + Supplier<Collection<M>> initialValue = () -> new ArrayList<>(); + return keyedTumblingWindow(keyFn, interval, initialValue, aggregator); } /** @@ -164,15 +169,16 @@ public final class Windows { * </pre> * * @param duration the duration in processing time + * @param initialValue the initial value to be used for aggregations * @param foldFn to aggregate messages in the {@link WindowPane} * @param <M> the type of the input message * @param <WV> the type of the {@link WindowPane} output value * @return the created {@link Window} function */ - public static <M, WV> Window<M, Void, WV> - tumblingWindow(Duration duration, BiFunction<M, WV, WV> foldFn) { + public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration duration, Supplier<WV> initialValue, + FoldLeftFunction<M, WV> foldFn) { Trigger<M> defaultTrigger = Triggers.repeat(new TimeTrigger<>(duration)); - return new WindowInternal<>(defaultTrigger, foldFn, null, null); + return new WindowInternal<>(defaultTrigger, initialValue, 
foldFn, null, null, WindowType.TUMBLING); } /** @@ -195,11 +201,10 @@ public final class Windows { * @return the created {@link Window} function */ public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration duration) { - BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> { - c.add(m); - return c; - }; - return tumblingWindow(duration, aggregator); + FoldLeftFunction<M, Collection<M>> aggregator = createAggregator(); + + Supplier<Collection<M>> initialValue = () -> new ArrayList<>(); + return tumblingWindow(duration, initialValue, aggregator); } /** @@ -223,15 +228,17 @@ public final class Windows { * * @param keyFn the function to extract the window key from a message * @param sessionGap the timeout gap for defining the session + * @param initialValue the initial value to be used for aggregations * @param foldFn the function to aggregate messages in the {@link WindowPane} * @param <M> the type of the input message * @param <K> the type of the key in the {@link Window} * @param <WV> the type of the output value in the {@link WindowPane} * @return the created {@link Window} function */ - public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap, BiFunction<M, WV, WV> foldFn) { + public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap, + Supplier<WV> initialValue, FoldLeftFunction<M, WV> foldFn) { Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap); - return new WindowInternal<>(defaultTrigger, foldFn, keyFn, null); + return new WindowInternal<>(defaultTrigger, initialValue, foldFn, keyFn, null, WindowType.SESSION); } /** @@ -260,114 +267,18 @@ public final class Windows { */ public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap) { - BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> { - c.add(m); - return c; - }; - return keyedSessionWindow(keyFn, sessionGap, 
aggregator); - } - + FoldLeftFunction<M, Collection<M>> aggregator = createAggregator(); - /** - * Creates a {@link Window} that groups incoming messages into a single global window. This window does not have a - * default trigger. The triggering behavior must be specified by setting an early trigger. - * - * <p>The below example computes the maximum value over a count based window. The window emits {@link WindowPane}s when - * there are either 50 messages in the window pane or when 10 seconds have passed since the first message in the pane. - * - * <pre> {@code - * MessageStream<Long> stream = ...; - * BiFunction<Long, Long, Long> maxAggregator = (m, c)-> Math.max(m, c); - * MessageStream<WindowPane<Void, Long>> windowedStream = stream.window(Windows.globalWindow(maxAggregator) - * .setEarlyTriggers(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofSeconds(10)))))) - * } - * </pre> - * - * @param foldFn the function to aggregate messages in the {@link WindowPane} - * @param <M> the type of message - * @param <WV> type of the output value in the {@link WindowPane} - * @return the created {@link Window} function. - */ - public static <M, WV> Window<M, Void, WV> globalWindow(BiFunction<M, WV, WV> foldFn) { - return new WindowInternal<>(null, foldFn, null, null); + Supplier<Collection<M>> initialValue = () -> new ArrayList<>(); + return keyedSessionWindow(keyFn, sessionGap, initialValue, aggregator); } - /** - * Creates a {@link Window} that groups incoming messages into a single global window. This window does not have a - * default trigger. The triggering behavior must be specified by setting an early trigger. - * - * The below example groups the stream into count based windows that trigger every 50 messages or every 10 minutes. 
- * <pre> {@code - * MessageStream<Long> stream = ...; - * MessageStream<WindowPane<Void, Collection<Long>> windowedStream = stream.window(Windows.globalWindow() - * .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofSeconds(10)))))) - * } - * </pre> - * - * @param <M> the type of message - * @return the created {@link Window} function. - */ - public static <M> Window<M, Void, Collection<M>> globalWindow() { - BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> { - c.add(m); - return c; - }; - return globalWindow(aggregator); - } - /** - * Returns a global {@link Window} that groups incoming messages using the provided keyFn. - * The window does not have a default trigger. The triggering behavior must be specified by setting an early - * trigger. - * - * <p> The below example groups the stream into count based windows. The window triggers every 50 messages or every - * 10 minutes. - * - * <pre> {@code - * MessageStream<UserClick> stream = ...; - * BiFunction<UserClick, Long, Long> maxAggregator = (m, c)-> Math.max(parseLongField(m), c); - * Function<UserClick, String> keyFn = ...; - * MessageStream<WindowPane<String, Long>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn, maxAggregator) - * .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.minutes(10)))))) - * } - * </pre> - * - * @param keyFn the function to extract the window key from a message - * @param foldFn the function to aggregate messages in the {@link WindowPane} - * @param <M> the type of message - * @param <K> type of the key in the {@link Window} - * @param <WV> the type of the output value in the {@link WindowPane} - * @return the created {@link Window} function - */ - public static <M, K, WV> Window<M, K, WV> keyedGlobalWindow(Function<M, K> keyFn, BiFunction<M, WV, WV> foldFn) { - return new WindowInternal<M, K, WV>(null, foldFn, keyFn, null); - } - - /** - 
* Returns a global {@link Window} that groups incoming messages using the provided keyFn. - * The window does not have a default trigger. The triggering behavior must be specified by setting an early trigger. - * - * <p> The below example groups the stream per-key into count based windows. The window triggers every 50 messages or - * every 10 minutes. - * - * <pre> {@code - * MessageStream<UserClick> stream = ...; - * Function<UserClick, String> keyFn = ...; - * MessageStream<WindowPane<String, Collection<UserClick>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn) - * .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.minutes(10)))))) - * } - * </pre> - * - * @param keyFn the function to extract the window key from a message - * @param <M> the type of message - * @param <K> the type of the key in the {@link Window} - * @return the created {@link Window} function - */ - public static <M, K> Window<M, K, Collection<M>> keyedGlobalWindow(Function<M, K> keyFn) { - BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> { + private static <M> FoldLeftFunction<M, Collection<M>> createAggregator() { + return (m, c) -> { c.add(m); return c; }; - return keyedGlobalWindow(keyFn, aggregator); } + } http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java index 9479eea..f6ac301 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java @@ -18,12 +18,13 @@ */ package org.apache.samza.operators.windows.internal; import 
org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.windows.AccumulationMode; import org.apache.samza.operators.windows.Window; -import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Supplier; /** * Internal representation of a {@link Window}. This specifies default, early and late triggers for the {@link Window} @@ -32,81 +33,105 @@ import java.util.function.Function; * Note: This class is meant to be used internally by Samza, and is not to be instantiated by programmers. * * @param <M> the type of input message - * @param <K> the type of key for the window + * @param <WK> the type of key for the window * @param <WV> the type of aggregated value in the window output */ @InterfaceStability.Unstable -public final class WindowInternal<M, K, WV> implements Window<M, K, WV> { +public final class WindowInternal<M, WK, WV> implements Window<M, WK, WV> { - private final Trigger defaultTrigger; + private final Trigger<M> defaultTrigger; + + /** + * The supplier of initial value to be used for windowed aggregations + */ + private final Supplier<WV> initializer; /* * The function that is applied each time a {@link MessageEnvelope} is added to this window. */ - private final BiFunction<M, WV, WV> foldFunction; + private final FoldLeftFunction<M, WV> foldLeftFunction; /* * The function that extracts the key from a {@link MessageEnvelope} */ - private final Function<M, K> keyExtractor; + private final Function<M, WK> keyExtractor; /* * The function that extracts the event time from a {@link MessageEnvelope} */ private final Function<M, Long> eventTimeExtractor; - private Trigger earlyTrigger; + /** + * The type of this window. Tumbling and Session windows are supported for now. 
+ */ + private final WindowType windowType; + + private Trigger<M> earlyTrigger; - private Trigger lateTrigger; + private Trigger<M> lateTrigger; private AccumulationMode mode; - public WindowInternal(Trigger defaultTrigger, BiFunction<M, WV, WV> foldFunction, Function<M, K> keyExtractor, Function<M, Long> eventTimeExtractor) { - this.foldFunction = foldFunction; + public WindowInternal(Trigger<M> defaultTrigger, Supplier<WV> initialValue, FoldLeftFunction<M, WV> foldLeftFunction, Function<M, WK> keyExtractor, Function<M, Long> eventTimeExtractor, WindowType windowType) { + this.defaultTrigger = defaultTrigger; + this.initializer = initialValue; + this.foldLeftFunction = foldLeftFunction; this.eventTimeExtractor = eventTimeExtractor; this.keyExtractor = keyExtractor; - this.defaultTrigger = defaultTrigger; + this.windowType = windowType; } @Override - public Window<M, K, WV> setEarlyTrigger(Trigger trigger) { + public Window<M, WK, WV> setEarlyTrigger(Trigger<M> trigger) { this.earlyTrigger = trigger; return this; } @Override - public Window<M, K, WV> setLateTrigger(Trigger trigger) { + public Window<M, WK, WV> setLateTrigger(Trigger<M> trigger) { this.lateTrigger = trigger; return this; } @Override - public Window<M, K, WV> setAccumulationMode(AccumulationMode mode) { + public Window<M, WK, WV> setAccumulationMode(AccumulationMode mode) { this.mode = mode; return this; } - public Trigger getDefaultTrigger() { + public Trigger<M> getDefaultTrigger() { return defaultTrigger; } - public Trigger getEarlyTrigger() { + public Trigger<M> getEarlyTrigger() { return earlyTrigger; } - public Trigger getLateTrigger() { + public Trigger<M> getLateTrigger() { return lateTrigger; } - public BiFunction<M, WV, WV> getFoldFunction() { - return foldFunction; + public Supplier<WV> getInitializer() { + return initializer; } - public Function<M, K> getKeyExtractor() { + public FoldLeftFunction<M, WV> getFoldLeftFunction() { + return foldLeftFunction; + } + + public Function<M, WK> 
getKeyExtractor() { return keyExtractor; } public Function<M, Long> getEventTimeExtractor() { return eventTimeExtractor; } + + public WindowType getWindowType() { + return windowType; + } + + public AccumulationMode getAccumulationMode() { + return mode; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowType.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowType.java b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowType.java new file mode 100644 index 0000000..409d56a --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowType.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.windows.internal; + +public enum WindowType { + TUMBLING, SESSION + //,SLIDING +} http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java index 54d0b2f..4184c9d 100644 --- a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java +++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java @@ -18,6 +18,7 @@ */ package org.apache.samza.operators.windows; +import org.apache.samza.operators.triggers.FiringType; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -26,7 +27,7 @@ import static org.junit.Assert.assertEquals; public class TestWindowPane { @Test public void testConstructor() { - WindowPane<String, Integer> wndOutput = WindowPane.of(new WindowKey<>("testMsg", null), 10); + WindowPane<String, Integer> wndOutput = new WindowPane(new WindowKey("testMsg", null), 10, AccumulationMode.DISCARDING, FiringType.EARLY); assertEquals(wndOutput.getKey().getKey(), "testMsg"); assertEquals(wndOutput.getMessage(), Integer.valueOf(10)); } http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java index f801097..1b36f76 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java @@ -22,6 +22,7 @@ import java.util.Collections; import 
java.util.HashMap; import java.util.Map; import java.util.function.Function; + import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.operators.data.MessageEnvelope; http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/WindowState.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/WindowState.java b/samza-core/src/main/java/org/apache/samza/operators/WindowState.java new file mode 100644 index 0000000..4e80862 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/WindowState.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +/** + * Wraps the value stored for a particular {@link org.apache.samza.operators.windows.WindowKey} with additional metadata. 
+ */ +public class WindowState<WV> { + + final WV wv; + /** + * Time of the first message in the window + */ + final long earliestRecvTime; + + public WindowState(WV wv, long earliestRecvTime) { + this.wv = wv; + this.earliestRecvTime = earliestRecvTime; + } + + public WV getWindowValue() { + return wv; + } + + public long getEarliestTimestamp() { + return earliestRecvTime; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java index 3efd5f5..ca8e34b 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java @@ -27,6 +27,8 @@ import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; import org.apache.samza.system.SystemStream; import org.apache.samza.task.TaskContext; +import org.apache.samza.util.Clock; +import org.apache.samza.util.SystemClock; import java.util.Collection; import java.util.Collections; @@ -52,6 +54,16 @@ public class OperatorGraph { */ private final Map<SystemStream, RootOperatorImpl> operatorGraph = new HashMap<>(); + private final Clock clock; + + public OperatorGraph(Clock clock) { + this.clock = clock; + } + + public OperatorGraph() { + this(SystemClock.instance()); + } + /** * Initialize the whole DAG of {@link OperatorImpl}s, based on the input {@link MessageStreamImpl} from the {@link org.apache.samza.operators.StreamGraph}. 
* This method will traverse each input {@link org.apache.samza.operators.MessageStream} in the {@code inputStreams} and @@ -156,14 +168,14 @@ public class OperatorGraph { * @param context the {@link TaskContext} required to instantiate operators * @return the {@link OperatorImpl} implementation instance */ - private static <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source, OperatorSpec operatorSpec, Config config, TaskContext context) { + private <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source, OperatorSpec operatorSpec, Config config, TaskContext context) { if (operatorSpec instanceof StreamOperatorSpec) { StreamOperatorSpec<M, ?> streamOpSpec = (StreamOperatorSpec<M, ?>) operatorSpec; return new StreamOperatorImpl<>(streamOpSpec, source, config, context); } else if (operatorSpec instanceof SinkOperatorSpec) { return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context); } else if (operatorSpec instanceof WindowOperatorSpec) { - return new WindowOperatorImpl<>((WindowOperatorSpec<M, ?, ?>) operatorSpec, source, config, context); + return new WindowOperatorImpl((WindowOperatorSpec<M, ?, ?>) operatorSpec, clock); } else if (operatorSpec instanceof PartialJoinOperatorSpec) { return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context); } http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index 9983307..b9a606b 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -52,41 +52,37 @@ public abstract class OperatorImpl<M, RM> { public 
abstract void onNext(M message, MessageCollector collector, TaskCoordinator coordinator); /** - * Perform the actions required on a timer tick and call the downstream operators. - * - * Overriding implementations must call {@link #propagateTimer} to propagate the timer tick to registered - * downstream operators correctly. + * Invoked at every tick. This method delegates to {@link #onTimer(MessageCollector, TaskCoordinator)} * * @param collector the {@link MessageCollector} in the context * @param coordinator the {@link TaskCoordinator} in the context */ - public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { - propagateTimer(collector, coordinator); + public final void onTick(MessageCollector collector, TaskCoordinator coordinator) { + onTimer(collector, coordinator); + nextOperators.forEach(sub -> sub.onTick(collector, coordinator)); } /** - * Helper method to propagate the output of this operator to all registered downstream operators. - * - * This method <b>must</b> be called from {@link #onNext} to propagate the operator output correctly. + * Invoked at every tick. Implementations must call {@link #propagateResult} to propagate any generated output + * to registered downstream operators. * - * @param outputMessage output message * @param collector the {@link MessageCollector} in the context * @param coordinator the {@link TaskCoordinator} in the context */ - void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) { - nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator)); + public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { } /** - * Helper method to propagate the timer tick to all registered downstream operators. + * Helper method to propagate the output of this operator to all registered downstream operators. * - * This method <b>must</b> be called from {@link #onTimer} to propagate the timer tick correctly. 
+ * This method <b>must</b> be called from {@link #onNext} and {@link #onTimer} + * to propagate the operator output correctly. * + * @param outputMessage output message * @param collector the {@link MessageCollector} in the context * @param coordinator the {@link TaskCoordinator} in the context */ - void propagateTimer(MessageCollector collector, TaskCoordinator coordinator) { - nextOperators.forEach(sub -> sub.onTimer(collector, coordinator)); + void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) { + nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator)); } - } http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java index f704f3f..b2948a3 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java @@ -93,7 +93,6 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> { thisState.deleteAll(keysToRemove); LOGGER.info("Operator ID {} onTimer self time: {} ms", opId, System.currentTimeMillis() - now); - this.propagateTimer(collector, coordinator); } } http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java new file mode 100644 index 0000000..49fefc0 --- /dev/null +++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.triggers.FiringType; +import org.apache.samza.operators.windows.WindowKey; + +/** + * Uniquely identifies a trigger firing + */ +public class TriggerKey<WK> { + private final FiringType type; + private final WindowKey<WK> key; + + public TriggerKey(FiringType type, WindowKey<WK> key) { + if (type == null) { + throw new IllegalArgumentException("Firing type cannot be null"); + } + + if (key == null) { + throw new IllegalArgumentException("WindowKey cannot be null"); + } + + this.type = type; + this.key = key; + } + + /** + * Equality is determined by both the type, and the window key. + */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TriggerKey<WK> that = (TriggerKey<WK>) o; + return type == that.type && key.equals(that.key); + } + + /** + * Hashcode is computed from the type, and the window key. 
+ */ + @Override + public int hashCode() { + int result = type.hashCode(); + result = 31 * result + key.hashCode(); + return result; + } + + public WindowKey<WK> getKey() { + return key; + } + + public FiringType getType() { + return type; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerScheduler.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerScheduler.java b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerScheduler.java new file mode 100644 index 0000000..952d9f1 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerScheduler.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.triggers.Cancellable; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.PriorityQueue; + +/** + * Allows to schedule and cancel callbacks for triggers. 
+ */ +public class TriggerScheduler<WK> { + + private static final Logger LOG = LoggerFactory.getLogger(TriggerScheduler.class); + + private final PriorityQueue<TriggerCallbackState<WK>> pendingCallbacks; + private final Clock clock; + + public TriggerScheduler(Clock clock) { + this.pendingCallbacks = new PriorityQueue<>(); + this.clock = clock; + } + + /** + * Schedule the provided runnable for execution at the specified time. + * @param runnable the provided runnable to schedule. + * @param scheduledTimeMs time at which the runnable must be scheduled for execution + * @param triggerKey a key that uniquely identifies the corresponding trigger firing. + * @return a {@link Cancellable} that can be used to cancel the execution of this runnable. + */ + public Cancellable scheduleCallback(Runnable runnable, long scheduledTimeMs, TriggerKey<WK> triggerKey) { + TriggerCallbackState<WK> timerState = new TriggerCallbackState(triggerKey, runnable, scheduledTimeMs); + pendingCallbacks.add(timerState); + LOG.trace("Scheduled a new callback: {} at {} for triggerKey {}", new Object[] {runnable, scheduledTimeMs, triggerKey}); + return timerState; + } + + /** + * Run all pending callbacks that are ready to be scheduled. A callback is defined as "ready" if its scheduledTime + * is less than or equal to {@link Clock#currentTimeMillis()} + * + * @return the list of {@link TriggerKey}s corresponding to the callbacks that were run. + */ + public List<TriggerKey<WK>> runPendingCallbacks() { + TriggerCallbackState<WK> state; + List<TriggerKey<WK>> keys = new ArrayList<>(); + long now = clock.currentTimeMillis(); + + while ((state = pendingCallbacks.peek()) != null && state.getScheduledTimeMs() <= now) { + pendingCallbacks.remove(); + state.getCallback().run(); + TriggerKey<WK> key = state.getTriggerKey(); + keys.add(key); + } + return keys; + } + + /** + * State corresponding to pending timer callbacks scheduled by various triggers. 
+ */ + private class TriggerCallbackState<WK> implements Comparable<TriggerCallbackState<WK>>, Cancellable { + + private final TriggerKey<WK> triggerKey; + private final Runnable callback; + + // the time at which the callback should trigger + private final long scheduledTimeMs; + + private TriggerCallbackState(TriggerKey<WK> triggerKey, Runnable callback, long scheduledTimeMs) { + this.triggerKey = triggerKey; + this.callback = callback; + this.scheduledTimeMs = scheduledTimeMs; + } + + private Runnable getCallback() { + return callback; + } + + private long getScheduledTimeMs() { + return scheduledTimeMs; + } + + private TriggerKey<WK> getTriggerKey() { + return triggerKey; + } + + @Override + public int compareTo(TriggerCallbackState<WK> other) { + return Long.compare(this.scheduledTimeMs, other.scheduledTimeMs); + } + + @Override + public boolean cancel() { + LOG.trace("Cancelled a callback: {} at {} for triggerKey {}", new Object[] {callback, scheduledTimeMs, triggerKey}); + return pendingCallbacks.remove(this); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java index af00553..cd3b1bc 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -1,3 +1,5 @@ +// CHECKSTYLE:OFF +// Turn off checkstyle for this class because of a checkstyle bug in handling nested typed collections /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file @@ -18,26 +20,300 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.spec.WindowOperatorSpec; +import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.triggers.RepeatingTriggerImpl; +import org.apache.samza.operators.triggers.TimeTrigger; +import org.apache.samza.operators.triggers.Trigger; +import org.apache.samza.operators.triggers.TriggerImpl; +import org.apache.samza.operators.triggers.TriggerImpls; +import org.apache.samza.operators.triggers.FiringType; +import org.apache.samza.operators.util.InternalInMemoryStore; +import org.apache.samza.operators.windows.AccumulationMode; +import org.apache.samza.operators.windows.WindowKey; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.internal.WindowInternal; +import org.apache.samza.operators.windows.internal.WindowType; +import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +/** + * Implementation of a window operator that groups messages into finite windows for processing. + * + * This class implements the processing logic for various types of windows and triggers. It tracks and manages state for + * all open windows, the active triggers that correspond to each of the windows and the pending callbacks. It provides + * an implementation of {@link TriggerScheduler} that {@link TriggerImpl}s can use to schedule and cancel callbacks. 
It + * also orchestrates the flow of messages through the various {@link TriggerImpl}s. + * + * <p> An instance of a {@link TriggerImplHandler} is created corresponding to each {@link Trigger} configured for a + * particular window. For every message added to the window, this class looks up the corresponding {@link TriggerImplHandler} + * for the trigger and invokes {@link TriggerImplHandler#onMessage(TriggerKey, Object, MessageCollector, TaskCoordinator)}. + * The {@link TriggerImplHandler} maintains the {@link TriggerImpl} instance along with whether it has been canceled yet + * or not. Then, the {@link TriggerImplHandler} invokes onMessage on its underlying {@link TriggerImpl} instance. A + * {@link TriggerImpl} instance is scoped to a window and its firing determines when results for its window are emitted. The + * {@link WindowOperatorImpl} checks if the trigger fired, and propagates the result of the firing to its downstream + * operators. + * + * @param <M> the type of the incoming message + * @param <WK> the type of the key in this {@link org.apache.samza.operators.MessageStream} + * @param <WV> the type of the value in the emitted window pane + * + */ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK, WV>> { + private static final Logger LOG = LoggerFactory.getLogger(WindowOperatorImpl.class); + private final WindowInternal<M, WK, WV> window; + private final KeyValueStore<WindowKey<WK>, WindowState<WV>> store = new InternalInMemoryStore<>(); + TriggerScheduler<WK> triggerScheduler ; + + // The trigger state corresponding to each {@link TriggerKey}. 
+ private final Map<TriggerKey<WK>, TriggerImplHandler> triggers = new HashMap<>(); + private final Clock clock; - public WindowOperatorImpl(WindowOperatorSpec spec, MessageStreamImpl<M> source, Config config, TaskContext context) { - // source, config, and context are used to initialize the window kv-store - window = spec.getWindow(); + public WindowOperatorImpl(WindowOperatorSpec<M, WK, WV> spec, Clock clock) { + this.clock = clock; + this.window = spec.getWindow(); + this.triggerScheduler= new TriggerScheduler(clock); } @Override public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + LOG.trace("Processing message envelope: {}", message); + WindowKey<WK> storeKey = getStoreKey(message); + WindowState<WV> existingState = store.get(storeKey); + WindowState<WV> newState = applyFoldFunction(existingState, message); + + LOG.trace("New window value: {}, earliest timestamp: {}", newState.getWindowValue(), newState.getEarliestTimestamp()); + store.put(storeKey, newState); + + if (window.getEarlyTrigger() != null) { + TriggerKey<WK> triggerKey = new TriggerKey<>(FiringType.EARLY, storeKey); + + getOrCreateTriggerImplWrapper(triggerKey, window.getEarlyTrigger()) + .onMessage(triggerKey, message, collector, coordinator); + } + + if (window.getDefaultTrigger() != null) { + TriggerKey<WK> triggerKey = new TriggerKey<>(FiringType.DEFAULT, storeKey); + getOrCreateTriggerImplWrapper(triggerKey, window.getDefaultTrigger()) + .onMessage(triggerKey, message, collector, coordinator); + } } -} + + @Override + public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { + List<TriggerKey<WK>> keys = triggerScheduler.runPendingCallbacks(); + + for (TriggerKey<WK> key : keys) { + TriggerImplHandler triggerImplHandler = triggers.get(key); + if (triggerImplHandler != null) { + triggerImplHandler.onTimer(key, collector, coordinator); + } + } + + } + + /** + * Get the key to be used for lookups in the store for this message. 
+ */ + private WindowKey<WK> getStoreKey(M message) { + Function<M, WK> keyExtractor = window.getKeyExtractor(); + WK key = null; + + if (keyExtractor != null) { + key = keyExtractor.apply(message); + } + + String paneId = null; + + if (window.getWindowType() == WindowType.TUMBLING) { + long triggerDurationMs = ((TimeTrigger<M>) window.getDefaultTrigger()).getDuration().toMillis(); + final long now = clock.currentTimeMillis(); + Long windowBoundary = now - now % triggerDurationMs; + paneId = windowBoundary.toString(); + } + + return new WindowKey<>(key, paneId); + } + + private WindowState<WV> applyFoldFunction(WindowState<WV> existingState, M message) { + WV wv; + long earliestTimestamp; + + if (existingState == null) { + LOG.trace("No existing state found for key"); + wv = window.getInitializer().get(); + earliestTimestamp = clock.currentTimeMillis(); + } else { + wv = existingState.getWindowValue(); + earliestTimestamp = existingState.getEarliestTimestamp(); + } + + WV newVal = window.getFoldLeftFunction().apply(message, wv); + WindowState<WV> newState = new WindowState(newVal, earliestTimestamp); + + return newState; + } + + private TriggerImplHandler getOrCreateTriggerImplWrapper(TriggerKey<WK> triggerKey, Trigger<M> trigger) { + TriggerImplHandler wrapper = triggers.get(triggerKey); + if (wrapper != null) { + LOG.trace("Returning existing trigger wrapper for {}", triggerKey); + return wrapper; + } + + LOG.trace("Creating a new trigger wrapper for {}", triggerKey); + + TriggerImpl<M, WK> triggerImpl = TriggerImpls.createTriggerImpl(trigger, clock, triggerKey); + wrapper = new TriggerImplHandler(triggerKey, triggerImpl); + triggers.put(triggerKey, wrapper); + + return wrapper; + } + + /** + * Handles trigger firings, and propagates results to downstream operators. + */ + private void onTriggerFired(TriggerKey<WK> triggerKey, MessageCollector collector, TaskCoordinator coordinator) { + LOG.trace("Trigger key {} fired." 
, triggerKey); + + TriggerImplHandler wrapper = triggers.get(triggerKey); + WindowKey<WK> windowKey = triggerKey.getKey(); + WindowState<WV> state = store.get(windowKey); + + if (state == null) { + LOG.trace("No state found for triggerKey: {}", triggerKey); + return; + } + + WindowPane<WK, WV> paneOutput = computePaneOutput(triggerKey, state); + super.propagateResult(paneOutput, collector, coordinator); + + // Handle accumulation modes. + if (window.getAccumulationMode() == AccumulationMode.DISCARDING) { + LOG.trace("Clearing state for trigger key: {}", triggerKey); + store.put(windowKey, null); + } + + // Cancel all early triggers too when the default trigger fires. Also, clean all state for the key. + // note: We don't handle late arrivals yet, so all arrivals are either early or on-time. + if (triggerKey.getType() == FiringType.DEFAULT) { + + LOG.trace("Default trigger fired. Canceling triggers for {}", triggerKey); + + cancelTrigger(triggerKey, true); + cancelTrigger(new TriggerKey(FiringType.EARLY, triggerKey.getKey()), true); + WindowKey<WK> key = triggerKey.getKey(); + store.delete(key); + } + + // Cancel non-repeating early triggers. All early triggers should be removed from the "triggers" map only after the + // firing of their corresponding default trigger. Removing them prematurely (immediately after cancellation) would + // create a new {@link TriggerImplHandler} instance at a future invocation of getOrCreateTriggerImplWrapper(). + // This would cause an already canceled trigger to fire again for the window. + + if (triggerKey.getType() == FiringType.EARLY && !wrapper.isRepeating()) { + cancelTrigger(triggerKey, false); + } + } + + /** + * Computes the pane output corresponding to a {@link TriggerKey} that fired. 
+ */ + private WindowPane<WK, WV> computePaneOutput(TriggerKey<WK> triggerKey, WindowState<WV> state) { + WindowKey<WK> windowKey = triggerKey.getKey(); + WV windowVal = state.getWindowValue(); + + // For session windows, we will create a new window key by using the time of the first message in the window as + //the paneId. + if (window.getWindowType() == WindowType.SESSION) { + windowKey = new WindowKey<>(windowKey.getKey(), Long.toString(state.getEarliestTimestamp())); + } + + // Make a defensive copy so that we are immune to further mutations on the collection + if (windowVal instanceof Collection) { + windowVal = (WV) new ArrayList<>((Collection<WV>) windowVal); + } + + WindowPane<WK, WV> paneOutput = new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType()); + LOG.trace("Emitting pane output for trigger key {}", triggerKey); + return paneOutput; + } + + /** + * Cancels the firing of the {@link TriggerImpl} identified by this {@link TriggerKey} and optionally removes it. + */ + private void cancelTrigger(TriggerKey<WK> triggerKey, boolean shouldRemove) { + TriggerImplHandler triggerImplHandler = triggers.get(triggerKey); + if (triggerImplHandler != null) { + triggerImplHandler.cancel(); + } + if (shouldRemove && triggerKey != null) { + triggers.remove(triggerKey); + } + } + + /** + * State corresponding to a created {@link TriggerImpl} instance. 
+ */ + private class TriggerImplHandler { + // The context, and the {@link TriggerImpl} instance corresponding to this triggerKey + private final TriggerImpl<M, WK> impl; + // Guard to ensure that we don't invoke onMessage or onTimer on already cancelled triggers + private boolean isCancelled = false; + + public TriggerImplHandler(TriggerKey<WK> key, TriggerImpl<M, WK> impl) { + this.impl = impl; + } + + public void onMessage(TriggerKey<WK> triggerKey, M message, MessageCollector collector, TaskCoordinator coordinator) { + if (!isCancelled) { + LOG.trace("Forwarding callbacks for {}", message); + impl.onMessage(message, triggerScheduler); + + if (impl.shouldFire()) { + // repeating trigger can trigger multiple times, So, clear the state to allow future triggerings. + if (impl instanceof RepeatingTriggerImpl) { + ((RepeatingTriggerImpl<M, WK>) impl).clear(); + } + onTriggerFired(triggerKey, collector, coordinator); + } + } + } + + public void onTimer(TriggerKey<WK> key, MessageCollector collector, TaskCoordinator coordinator) { + if (impl.shouldFire() && !isCancelled) { + LOG.trace("Triggering timer triggers"); + + // repeating trigger can trigger multiple times, So, clear the trigger to allow future triggerings. 
+ if (impl instanceof RepeatingTriggerImpl) { + ((RepeatingTriggerImpl<M, WK>) impl).clear(); + } + onTriggerFired(key, collector, coordinator); + } + } + + public void cancel() { + impl.cancel(); + isCancelled = true; + } + + public boolean isRepeating() { + return this.impl instanceof RepeatingTriggerImpl; + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java index 46417ed..6d948d7 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java @@ -19,9 +19,11 @@ package org.apache.samza.operators.spec; +import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.internal.WindowInternal; +import org.apache.samza.task.TaskContext; /** @@ -54,11 +56,18 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK } @Override + public void init(Config config, TaskContext context) { + if (window.getFoldLeftFunction() != null) { + window.getFoldLeftFunction().init(config, context); + } + } + + @Override public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() { return this.outputStream; } - public WindowInternal getWindow() { + public WindowInternal<M, WK, WV> getWindow() { return window; } http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/AnyTriggerImpl.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/triggers/AnyTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/AnyTriggerImpl.java new file mode 100644 index 0000000..a0aa384 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/AnyTriggerImpl.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.triggers; + +import org.apache.samza.operators.impl.TriggerKey; +import org.apache.samza.operators.impl.TriggerScheduler; +import org.apache.samza.util.Clock; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Implementation of an {@link AnyTrigger} + */ +public class AnyTriggerImpl<M, WK> implements TriggerImpl<M, WK> { + + private final List<Trigger<M>> triggers; + + private final List<TriggerImpl<M, WK>> triggerImpls = new ArrayList<>(); + private final Clock clock; + private boolean shouldFire = false; + + public AnyTriggerImpl(AnyTrigger<M> anyTrigger, Clock clock, TriggerKey<WK> triggerKey) { + this.triggers = anyTrigger.getTriggers(); + this.clock = clock; + for (Trigger<M> trigger : triggers) { + triggerImpls.add(TriggerImpls.createTriggerImpl(trigger, clock, triggerKey)); + } + } + + @Override + public void onMessage(M message, TriggerScheduler<WK> context) { + for (TriggerImpl<M, WK> impl : triggerImpls) { + impl.onMessage(message, context); + if (impl.shouldFire()) { + shouldFire = true; + break; + } + } + if (shouldFire) { + cancel(); + } + } + + public void cancel() { + for (Iterator<TriggerImpl<M, WK>> it = triggerImpls.iterator(); it.hasNext(); ) { + TriggerImpl<M, WK> impl = it.next(); + impl.cancel(); + it.remove(); + } + } + + @Override + public boolean shouldFire() { + for (TriggerImpl<M, WK> impl : triggerImpls) { + if (impl.shouldFire()) { + shouldFire = true; + break; + } + } + return shouldFire; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java new file mode 100644 index 0000000..ca0ba67 --- /dev/null +++ 
/**
 * Represents a task or an operation whose execution can be cancelled.
 */
public interface Cancellable {

  /**
   * Cancels the execution of this operation (if it is not scheduled for execution yet). An operation that is
   * already in progress is neither interrupted nor cancelled.
   *
   * @return the result of the cancellation
   */
  boolean cancel();
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.triggers; + +import org.apache.samza.operators.impl.TriggerKey; +import org.apache.samza.operators.impl.TriggerScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation class for a {@link CountTrigger} + */ +public class CountTriggerImpl<M, WK> implements TriggerImpl<M, WK> { + private static final Logger LOG = LoggerFactory.getLogger(CountTriggerImpl.class); + + private final long triggerCount; + private final TriggerKey<WK> triggerKey; + private long currentCount; + private boolean shouldFire = false; + + public CountTriggerImpl(CountTrigger<M> triggerCount, TriggerKey<WK> triggerKey) { + this.triggerCount = triggerCount.getCount(); + this.currentCount = 0; + this.triggerKey = triggerKey; + } + + public void onMessage(M message, TriggerScheduler<WK> context) { + currentCount++; + if (currentCount == triggerCount) { + LOG.trace("count trigger fired for {}", message); + shouldFire = true; + } + } + + @Override + public void cancel() { + //no-op + } + + @Override + public boolean shouldFire() { + return shouldFire; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/RepeatingTriggerImpl.java 
---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/RepeatingTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/RepeatingTriggerImpl.java new file mode 100644 index 0000000..39b9b40 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/RepeatingTriggerImpl.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.operators.triggers; + +import org.apache.samza.operators.impl.TriggerKey; +import org.apache.samza.operators.impl.TriggerScheduler; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation class for a {@link RepeatingTrigger} + */ +public class RepeatingTriggerImpl<M, WK> implements TriggerImpl<M, WK> { + private static final Logger LOG = LoggerFactory.getLogger(RepeatingTriggerImpl.class); + + private final Trigger<M> repeatingTrigger; + private final Clock clock; + private final TriggerKey<WK> triggerKey; + + private TriggerImpl<M, WK> currentTriggerImpl; + + public RepeatingTriggerImpl(RepeatingTrigger<M> repeatingTrigger, Clock clock, TriggerKey<WK> key) { + this.repeatingTrigger = repeatingTrigger.getTrigger(); + this.clock = clock; + this.triggerKey = key; + this.currentTriggerImpl = TriggerImpls.createTriggerImpl(this.repeatingTrigger, clock, triggerKey); + } + + @Override + public void onMessage(M message, TriggerScheduler<WK> context) { + currentTriggerImpl.onMessage(message, context); + } + + @Override + public void cancel() { + currentTriggerImpl.cancel(); + } + + public void clear() { + LOG.trace("Clearing state for repeating trigger"); + currentTriggerImpl.cancel(); + currentTriggerImpl = TriggerImpls.createTriggerImpl(repeatingTrigger, clock, triggerKey); + } + + @Override + public boolean shouldFire() { + return currentTriggerImpl.shouldFire(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTriggerImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTriggerImpl.java new file mode 100644 index 0000000..32bf988 --- /dev/null +++ 
b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTriggerImpl.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.triggers; + +import org.apache.samza.operators.impl.TriggerKey; +import org.apache.samza.operators.impl.TriggerScheduler; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation class for a {@link TimeSinceFirstMessageTrigger} + * @param <M> the type of the incoming message + */ +public class TimeSinceFirstMessageTriggerImpl<M, WK> implements TriggerImpl<M, WK> { + + private static final Logger LOG = LoggerFactory.getLogger(TimeSinceFirstMessageTriggerImpl.class); + + private final TimeSinceFirstMessageTrigger<M> trigger; + private final Clock clock; + private final TriggerKey<WK> triggerKey; + private Cancellable cancellable; + private boolean shouldFire = false; + + public TimeSinceFirstMessageTriggerImpl(TimeSinceFirstMessageTrigger<M> trigger, Clock clock, TriggerKey<WK> key) { + this.trigger = trigger; + this.clock = clock; + this.triggerKey = key; + } + + public void onMessage(M message, TriggerScheduler<WK> context) { + if (cancellable 
== null && !shouldFire) { + final long now = clock.currentTimeMillis(); + long triggerDurationMs = trigger.getDuration().toMillis(); + Long callbackTime = now + triggerDurationMs; + cancellable = context.scheduleCallback(() -> { + LOG.trace("Time since first message trigger fired"); + shouldFire = true; + }, callbackTime, triggerKey); + } + } + + @Override + public void cancel() { + if (cancellable != null) { + cancellable.cancel(); + } + } + + @Override + public boolean shouldFire() { + return shouldFire; + } +}