Repository: samza
Updated Branches:
  refs/heads/master 05915bfc8 -> d399d6f3c


http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java
new file mode 100644
index 0000000..8544efd
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.operators.impl.TriggerScheduler;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation class for a {@link TimeSinceLastMessageTrigger}.
+ *
+ * <p> Each message schedules a callback {@code durationMs} after the time the message was observed,
+ * cancelling the previously scheduled callback when its scheduled time has not been reached yet.
+ * Once a callback has run, {@link #shouldFire()} returns {@code true} and later messages are ignored.
+ *
+ * @param <M> the type of the incoming message
+ * @param <WK> the type parameter of the {@link TriggerKey} and {@link TriggerScheduler} used by this trigger
+ */
+public class TimeSinceLastMessageTriggerImpl<M, WK> implements TriggerImpl<M, WK> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TimeSinceLastMessageTriggerImpl.class);
+  private final TimeSinceLastMessageTrigger<M> trigger;
+  private final long durationMs;
+  private final Clock clock;
+  private final TriggerKey<WK> triggerKey;
+  // Time at which the most recently scheduled callback is due; Long.MIN_VALUE until a message arrives.
+  private long callbackTime = Long.MIN_VALUE;
+  private Cancellable cancellable = null;
+  private boolean shouldFire = false;
+
+  /**
+   * @param trigger the trigger whose duration determines the delay of the scheduled callbacks
+   * @param clock the clock used to read the current time when a message arrives
+   * @param key the key passed to the {@link TriggerScheduler} with every scheduled callback
+   */
+  public TimeSinceLastMessageTriggerImpl(TimeSinceLastMessageTrigger<M> trigger, Clock clock,
+      TriggerKey<WK> key) {
+    this.trigger = trigger;
+    this.durationMs = trigger.getDuration().toMillis();
+    this.clock = clock;
+    this.triggerKey = key;
+  }
+
+  /**
+   * Re-arms the callback so that it is due {@code durationMs} after the current time.
+   * Does nothing once the trigger is ready to fire.
+   *
+   * @param message the incoming message (not inspected)
+   * @param context the scheduler used to register the callback
+   */
+  @Override
+  public void onMessage(M message, TriggerScheduler<WK> context) {
+    if (shouldFire) {
+      // The condition is already satisfied; there is nothing left to schedule.
+      return;
+    }
+
+    long currTime = clock.currentTimeMillis();
+
+    // The previous callback is not due yet, so this message supersedes it.
+    if (currTime < callbackTime && cancellable != null) {
+      cancellable.cancel();
+    }
+
+    callbackTime = currTime + durationMs;
+    Runnable runnable = () -> {
+      LOG.trace("Time since last message trigger fired");
+      shouldFire = true;
+    };
+
+    cancellable = context.scheduleCallback(runnable, callbackTime, triggerKey);
+  }
+
+  /**
+   * Cancels the most recently scheduled callback, if any.
+   */
+  @Override
+  public void cancel() {
+    if (cancellable != null) {
+      cancellable.cancel();
+    }
+  }
+
+  /**
+   * @return {@code true} once a scheduled callback has run
+   */
+  @Override
+  public boolean shouldFire() {
+    return shouldFire;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java
new file mode 100644
index 0000000..2454ce9
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.operators.impl.TriggerScheduler;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation class for a {@link TimeTrigger}.
+ *
+ * <p> The first message schedules a single callback at the next multiple of the trigger's duration
+ * (in milliseconds, as measured by the supplied {@link Clock}). Later messages do not schedule
+ * additional callbacks. Once the callback has run, {@link #shouldFire()} returns {@code true}.
+ *
+ * @param <M> the type of the incoming message
+ * @param <WK> the type parameter of the {@link TriggerKey} and {@link TriggerScheduler} used by this trigger
+ */
+public class TimeTriggerImpl<M, WK> implements TriggerImpl<M, WK> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TimeTriggerImpl.class);
+
+  private final TimeTrigger<M> trigger;
+  private final TriggerKey<WK> triggerKey;
+  private Cancellable cancellable;
+  private final Clock clock;
+  private boolean shouldFire = false;
+
+  /**
+   * @param trigger the trigger whose duration determines when the callback is due
+   * @param clock the clock used to read the current time when a message arrives
+   * @param key the key passed to the {@link TriggerScheduler} with the scheduled callback
+   */
+  public TimeTriggerImpl(TimeTrigger<M> trigger, Clock clock, TriggerKey<WK> key) {
+    this.trigger = trigger;
+    this.clock = clock;
+    this.triggerKey = key;
+  }
+
+  /**
+   * Schedules the callback on the first message; subsequent messages are no-ops.
+   *
+   * @param message the incoming message (not inspected)
+   * @param context the scheduler used to register the callback
+   */
+  @Override
+  public void onMessage(M message, TriggerScheduler<WK> context) {
+    if (cancellable != null) {
+      // A callback has already been scheduled; skip the clock read and the arithmetic below.
+      return;
+    }
+
+    final long now = clock.currentTimeMillis();
+    final long triggerDurationMs = trigger.getDuration().toMillis();
+    // Round the current time down to a multiple of the duration, then advance by one duration.
+    final long callbackTime = (now - now % triggerDurationMs) + triggerDurationMs;
+
+    cancellable = context.scheduleCallback(() -> {
+        LOG.trace("Time trigger fired");
+        shouldFire = true;
+      }, callbackTime, triggerKey);
+  }
+
+  /**
+   * Cancels the scheduled callback, if one was scheduled.
+   */
+  @Override
+  public void cancel() {
+    if (cancellable != null) {
+      cancellable.cancel();
+    }
+  }
+
+  /**
+   * @return {@code true} once the scheduled callback has run
+   */
+  @Override
+  public boolean shouldFire() {
+    return shouldFire;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
new file mode 100644
index 0000000..705cab7
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+
+import org.apache.samza.operators.impl.TriggerScheduler;
+
+/**
+ * Implementation class for a {@link Trigger}. A {@link TriggerImpl} tracks whether the condition of
+ * its {@link Trigger} is satisfied and reports this through {@link #shouldFire()}.
+ *
+ * <p> When messages arrive in the {@code WindowOperatorImpl}, they are assigned to one or more windows.
+ * An instance of a {@link TriggerImpl} is created corresponding to each {@link Trigger} configured for
+ * a window. For every message added to the window, the {@code WindowOperatorImpl} invokes the
+ * {@link #onMessage} on its corresponding {@link TriggerImpl}s. A {@link TriggerImpl} instance is scoped
+ * to a window and its firing determines when results for its window are emitted.
+ *
+ * <p> {@link TriggerImpl}s can use the {@link TriggerScheduler} to schedule and cancel callbacks
+ * (for example, implementations of time-based triggers).
+ *
+ * <p> State management: The state maintained by {@link TriggerImpl}s is not durable across re-starts
+ * and is transient. New instances of {@link TriggerImpl} are created on a re-start.
+ *
+ * @param <M> the type of the incoming message
+ * @param <WK> the type parameter of the {@link TriggerScheduler} passed to {@link #onMessage}
+ */
+public interface TriggerImpl<M, WK> {
+
+  /**
+   * Invoked when a message is added to the window corresponding to this {@link TriggerImpl}.
+   * @param message the incoming message
+   * @param context the {@link TriggerScheduler} to schedule and cancel callbacks
+   */
+  void onMessage(M message, TriggerScheduler<WK> context);
+
+  /**
+   * Returns {@code true} if the current state of the trigger indicates that its condition
+   * is satisfied and it is ready to fire.
+   * @return if this trigger should fire.
+   */
+  boolean shouldFire();
+
+  /**
+   * Invoked when the execution of this {@link TriggerImpl} is canceled by an up-stream {@link TriggerImpl}.
+   *
+   * <p> No calls to {@link #onMessage(Object, TriggerScheduler)} or {@link #shouldFire()} will be invoked
+   * after this invocation.
+   */
+  void cancel();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java
 
b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java
new file mode 100644
index 0000000..f64a1db
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.util.Clock;
+
+/**
+ * Factory methods for instantiating {@link TriggerImpl}s from individual {@link Trigger}s.
+ */
+public class TriggerImpls {
+
+  /**
+   * Creates the {@link TriggerImpl} matching the concrete type of the supplied {@link Trigger}.
+   *
+   * @param trigger the trigger to create an implementation for
+   * @param clock the clock handed to the implementations that accept one
+   * @param triggerKey the key handed to the created implementation
+   * @param <M> the type of the incoming message
+   * @param <WK> the type parameter of the {@link TriggerKey}
+   * @return the implementation corresponding to the trigger's type
+   * @throws IllegalArgumentException if {@code trigger} is {@code null}
+   * @throws SamzaException if no implementation is defined for the trigger's type
+   */
+  public static <M, WK> TriggerImpl<M, WK> createTriggerImpl(Trigger<M> trigger, Clock clock,
+      TriggerKey<WK> triggerKey) {
+
+    if (trigger == null) {
+      throw new IllegalArgumentException("Trigger must not be null");
+    }
+
+    if (trigger instanceof CountTrigger) {
+      return new CountTriggerImpl<>((CountTrigger<M>) trigger, triggerKey);
+    } else if (trigger instanceof RepeatingTrigger) {
+      return new RepeatingTriggerImpl<>((RepeatingTrigger<M>) trigger, clock, triggerKey);
+    } else if (trigger instanceof AnyTrigger) {
+      return new AnyTriggerImpl<>((AnyTrigger<M>) trigger, clock, triggerKey);
+    } else if (trigger instanceof TimeSinceLastMessageTrigger) {
+      return new TimeSinceLastMessageTriggerImpl<>(
+          (TimeSinceLastMessageTrigger<M>) trigger, clock, triggerKey);
+    } else if (trigger instanceof TimeTrigger) {
+      // Diamond instead of the raw type, so the result is checked like the sibling branches.
+      return new TimeTriggerImpl<>((TimeTrigger<M>) trigger, clock, triggerKey);
+    } else if (trigger instanceof TimeSinceFirstMessageTrigger) {
+      return new TimeSinceFirstMessageTriggerImpl<>(
+          (TimeSinceFirstMessageTrigger<M>) trigger, clock, triggerKey);
+    }
+
+    throw new SamzaException("No implementation class defined for the trigger  "
+        + trigger.getClass().getCanonicalName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
 
b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
index e5dab80..b8672c6 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.samza.operators.util;
 
 import org.apache.samza.storage.kv.Entry;
@@ -30,24 +31,28 @@ import java.util.Map;
 
 /**
  * Implements a {@link KeyValueStore} using an in-memory Java Map.
- * @param <K>  the type of the key in the store
- * @param <V>  the type of the value in the store
+ * @param <K>  the type of key
+ * @param <V>  the type of value
+ *
+ * TODO: This class is a stop-gap until we implement persistent store creation 
from TaskContext.
  *
- * TODO HIGH prateekm: Remove when we switch to an persistent implementation 
for KeyValueStore API.
  */
 public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> {
 
-  final Map<K, V> map = new LinkedHashMap<>();
+  private final Map<K, V> map = new LinkedHashMap<>();
 
   @Override
   public V get(K key) {
+    if (key == null) {
+      throw new NullPointerException("Null key provided");
+    }
     return map.get(key);
   }
 
   @Override
   public Map<K, V> getAll(List<K> keys) {
     Map<K, V> values = new HashMap<>();
-    for (K key: keys) {
+    for (K key : keys) {
       values.put(key, map.get(key));
     }
     return values;
@@ -55,18 +60,24 @@ public class InternalInMemoryStore<K, V> implements 
KeyValueStore<K, V> {
 
   @Override
   public void put(K key, V value) {
+    if (key == null) {
+      throw new NullPointerException("Null key provided");
+    }
     map.put(key, value);
   }
 
   @Override
   public void putAll(List<Entry<K, V>> entries) {
-    for (Entry<K, V> entry: entries) {
+    for (Entry<K, V> entry : entries) {
       put(entry.getKey(), entry.getValue());
     }
   }
 
   @Override
   public void delete(K key) {
+    if (key == null) {
+      throw new NullPointerException("Null key provided");
+    }
     map.remove(key);
   }
 
@@ -119,4 +130,4 @@ public class InternalInMemoryStore<K, V> implements 
KeyValueStore<K, V> {
   public void flush() {
     //not applicable
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java 
b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 9ec8e5a..d4224c3 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -20,6 +20,8 @@ package org.apache.samza.task;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.ContextManager;
 import org.apache.samza.operators.MessageStreamImpl;
@@ -30,6 +32,9 @@ import org.apache.samza.operators.impl.OperatorGraph;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.SystemClock;
 
 
 /**
@@ -65,16 +70,27 @@ public final class StreamOperatorTask implements 
StreamTask, InitableTask, Windo
   /**
    * A mapping from each {@link SystemStream} to the root node of its operator 
chain DAG.
    */
-  private final OperatorGraph operatorGraph = new OperatorGraph();
+  private final OperatorGraph operatorGraph;
 
   private final StreamApplication graphBuilder;
 
   private final ApplicationRunner runner;
 
+  private final Clock clock;
+
   private ContextManager contextManager;
 
+  private Set<SystemStreamPartition> systemStreamPartitions;
+
   public StreamOperatorTask(StreamApplication graphBuilder, ApplicationRunner 
runner) {
+    this(graphBuilder, SystemClock.instance(), runner);
+  }
+
+  // purely for testing.
+  public StreamOperatorTask(StreamApplication graphBuilder, Clock clock, 
ApplicationRunner runner) {
     this.graphBuilder = graphBuilder;
+    this.operatorGraph = new OperatorGraph(clock);
+    this.clock = clock;
     this.runner = runner;
   }
 
@@ -85,9 +101,10 @@ public final class StreamOperatorTask implements 
StreamTask, InitableTask, Windo
     this.graphBuilder.init(streamGraph, config);
     // get the context manager of the {@link StreamGraph} and initialize the 
task-specific context
     this.contextManager = streamGraph.getContextManager();
+    this.systemStreamPartitions = context.getSystemStreamPartitions();
 
     Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>();
-    context.getSystemStreamPartitions().forEach(ssp -> {
+    systemStreamPartitions.forEach(ssp -> {
         if (!inputBySystemStream.containsKey(ssp.getSystemStream())) {
           // create mapping from the physical input {@link SystemStream} to 
the logic {@link MessageStream}
           inputBySystemStream.putIfAbsent(ssp.getSystemStream(), 
streamGraph.getInputStream(ssp.getSystemStream()));
@@ -103,8 +120,11 @@ public final class StreamOperatorTask implements 
StreamTask, InitableTask, Windo
   }
 
   @Override
-  public final void window(MessageCollector collector, TaskCoordinator 
coordinator) throws Exception {
-    this.operatorGraph.getAll().forEach(r -> r.onTimer(collector, 
coordinator));
+  public final void window(MessageCollector collector, TaskCoordinator 
coordinator)  {
+    systemStreamPartitions.forEach(ssp -> {
+        this.operatorGraph.get(ssp.getSystemStream())
+          .onTick(collector, coordinator);
+      });
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java 
b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index 543716a..6edf048 100644
--- 
a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ 
b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -33,6 +33,7 @@ import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
+import java.util.function.Supplier;
 
 
 /**
@@ -44,9 +45,10 @@ public class PageViewCounterExample implements 
StreamApplication {
 
     MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, 
new StringSerde("UTF-8"), new JsonSerde<>());
     OutputStream<MyStreamOutput> pageViewPerMemberCounters = 
graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+    Supplier<Integer> initialValue = () -> 0;
 
     pageViewEvents.
-        window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m 
-> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1).
+        window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m 
-> m.getMessage().memberId, Duration.ofSeconds(10), initialValue, (m, c) -> c + 
1).
             setEarlyTrigger(Triggers.repeat(Triggers.count(5))).
             setAccumulationMode(AccumulationMode.DISCARDING)).
         map(MyStreamOutput::new).

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java 
b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
index 729b26f..e222fe4 100644
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -31,6 +31,7 @@ import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
+import java.util.function.Supplier;
 
 
 /**
@@ -67,11 +68,12 @@ public class RepartitionExample implements 
StreamApplication {
 
     MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, 
new StringSerde("UTF-8"), new JsonSerde<>());
     OutputStream<MyStreamOutput> pageViewPerMemberCounters = 
graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+    Supplier<Integer> initialValue = () -> 0;
 
     pageViewEvents.
         partitionBy(m -> m.getMessage().memberId).
         window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
-            msg -> msg.getMessage().memberId, Duration.ofMinutes(5), (m, c) -> 
c + 1)).
+            msg -> msg.getMessage().memberId, Duration.ofMinutes(5), 
initialValue, (m, c) -> c + 1)).
         map(MyStreamOutput::new).
         sendTo(pageViewPerMemberCounters);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java 
b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
index d988270..1c30a21 100644
--- 
a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
+++ 
b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
@@ -21,13 +21,15 @@ package org.apache.samza.example;
 
 import java.time.Duration;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.Supplier;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.data.InputMessageEnvelope;
 import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
 import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.system.StreamSpec;
@@ -65,18 +67,20 @@ public class TestBroadcastExample extends TestExampleBase {
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) 
-> c + 1;
+    FoldLeftFunction<JsonMessageEnvelope, Integer> sumAggregator = (m, c) -> c 
+ 1;
+    Supplier<Integer> initialValue = () -> 0;
+
     inputs.keySet().forEach(entry -> {
         MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, 
Object, InputMessageEnvelope>createInStream(
                 new StreamSpec(entry.getSystem() + "-" + entry.getStream(), 
entry.getStream(), entry.getSystem()), null, null).map(this::getInputMessage);
 
-        
inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100),
 sumAggregator)
+        
inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100),
 initialValue, sumAggregator)
             .setLateTrigger(Triggers.any(Triggers.count(30000), 
Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
 
-        
inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100),
 sumAggregator)
+        
inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100),
 initialValue, sumAggregator)
             .setLateTrigger(Triggers.any(Triggers.count(30000), 
Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
 
-        
inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100),
 sumAggregator)
+        
inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100),
 initialValue, sumAggregator)
             .setLateTrigger(Triggers.any(Triggers.count(30000), 
Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
 
       });

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java 
b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
index 6896da5..c88df7c 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
@@ -25,13 +25,14 @@ import org.apache.samza.operators.data.InputMessageEnvelope;
 import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStreamPartition;
 
 import java.time.Duration;
-import java.util.function.BiFunction;
 import java.util.Set;
+import java.util.function.Supplier;
 
 
 /**
@@ -57,11 +58,12 @@ public class TestWindowExample extends TestExampleBase {
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) 
-> c + 1;
+    FoldLeftFunction<JsonMessageEnvelope, Integer> maxAggregator = (m, c) -> c 
+ 1;
+    Supplier<Integer> initialValue = () -> 0;
     inputs.keySet().forEach(source -> graph.<Object, Object, 
InputMessageEnvelope>createInStream(
             new StreamSpec(source.getSystem() + "-" + source.getStream(), 
source.getStream(), source.getSystem()), null, null).
         map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), 
(MessageType) m1.getMessage(), m1.getOffset(),
-            
m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200),
 maxAggregator)));
+            
m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200),
 initialValue, maxAggregator)));
 
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 361972e..5722dbd 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -50,7 +50,12 @@ public class TestOperatorImpl {
         TestOperatorImpl.this.curCollector = collector;
         TestOperatorImpl.this.curCoordinator = coordinator;
       }
-    };
+      @Override
+      public void onTimer(MessageCollector collector, TaskCoordinator 
coordinator) {
+
+      }
+
+      };
     // verify registerNextOperator() added the mockSub and propagateResult() 
invoked the mockSub.onNext()
     OperatorImpl mockSub = mock(OperatorImpl.class);
     opImpl.registerNextOperator(mockSub);

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
index 088cb00..31f6f4a 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
@@ -38,6 +38,7 @@ import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.operators.windows.internal.WindowType;
 import org.apache.samza.task.TaskContext;
 import org.junit.Before;
 import org.junit.Test;
@@ -77,7 +78,7 @@ public class TestOperatorImpls {
   public void testCreateOperator() throws NoSuchFieldException, 
IllegalAccessException, InvocationTargetException {
     // get window operator
     WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
-    WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new 
WindowInternal<>(null, null, null, null);
+    WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new 
WindowInternal<>(null, null, null, null, null, WindowType.TUMBLING);
     when(mockWnd.getWindow()).thenReturn(windowInternal);
     MessageStreamImpl<TestMessageEnvelope> mockStream = 
mock(MessageStreamImpl.class);
     Config mockConfig = mock(Config.class);

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
 
b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
index ae3d151..ec1d74c 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
@@ -25,18 +25,20 @@ import org.apache.samza.operators.TestMessageStreamImplUtil;
 import org.apache.samza.operators.TestOutputMessageEnvelope;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.internal.WindowType;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -69,17 +71,17 @@ public class TestOperatorSpecs {
   @Test
   public void testGetWindowOperator() throws Exception {
     Function<TestMessageEnvelope, String> keyExtractor = m -> "globalkey";
-    BiFunction<TestMessageEnvelope, Integer, Integer> aggregator = (m, c) -> c 
+ 1;
-
+    FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 
1;
+    Supplier<Integer> initialValue = () -> 0;
     //instantiate a window using reflection
-    WindowInternal window = new WindowInternal(null, aggregator, keyExtractor, 
null);
+    WindowInternal window = new WindowInternal(null, initialValue, aggregator, 
keyExtractor, null, WindowType.TUMBLING);
 
     StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
     MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = 
mock(MessageStreamImpl.class);
     WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, 
Integer>createWindowOperatorSpec(window, mockGraph, mockWndOut);
     assertEquals(spec.getWindow(), window);
     assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor);
-    assertEquals(spec.getWindow().getFoldFunction(), aggregator);
+    assertEquals(spec.getWindow().getFoldLeftFunction(), aggregator);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java 
b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
new file mode 100644
index 0000000..674a8f1
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.util.Clock;
+
+import java.time.Duration;
+
+/**
+ * A manually driven {@link Clock} for tests: the reported time changes only when a test
+ * explicitly adds to it through one of the {@code advanceTime} overloads.
+ */
+public class TestClock implements Clock {
+
+  // Current reading in milliseconds; starts at 1.
+  long currentTime = 1;
+
+  /**
+   * Adds the millisecond value of the given duration to the current reading.
+   *
+   * @param duration the amount of time to add
+   */
+  public void advanceTime(Duration duration) {
+    advanceTime(duration.toMillis());
+  }
+
+  /**
+   * Adds the given number of milliseconds to the current reading.
+   *
+   * @param millis the amount of time to add, in milliseconds
+   */
+  public void advanceTime(long millis) {
+    this.currentTime = this.currentTime + millis;
+  }
+
+  /**
+   * @return the current reading in milliseconds
+   */
+  @Override
+  public long currentTimeMillis() {
+    return this.currentTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java
 
b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java
new file mode 100644
index 0000000..0d720dd
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import junit.framework.Assert;
+import org.apache.samza.Partition;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestWindowOperator {
+  // Mocked collaborators handed to every task.process(..) / task.window(..) call.
+  private final MessageCollector messageCollector = 
mock(MessageCollector.class);
+  private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
+  // Panes captured by the final map(..) stage of the test applications; cleared in setup().
+  private final List<WindowPane<Integer, Collection<MessageEnvelope<Integer, 
Integer>>>> windowPanes = new ArrayList<>();
+  // Input sequence for the tumbling-window tests; each value is used as both key and message.
+  private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 
2, 3);
+  // Mocks recreated in setup() before each test.
+  private Config config;
+  private TaskContext taskContext;
+  private ApplicationRunner runner;
+
+  /**
+   * Empties the captured panes and rebuilds the mocks before each test. The mocked task context
+   * reports a single input partition: partition 0 of stream "integers" on system "kafka".
+   */
+  @Before
+  public void setup() throws Exception {
+    windowPanes.clear();
+
+    config = mock(Config.class);
+    runner = mock(ApplicationRunner.class);
+    taskContext = mock(TaskContext.class);
+
+    SystemStreamPartition inputPartition = new SystemStreamPartition("kafka", "integers", new Partition(0));
+    when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet.of(inputPartition));
+  }
+
+  /**
+   * Feeds the shared input sequence into keyed 1-second tumbling windows that use a repeating
+   * count(2) early trigger in DISCARDING mode, then advances the clock by one second and runs
+   * window(). Five panes are expected: four holding 2 messages each (keys 1, 2, 1, 2) and a
+   * final pane for key 3 holding a single message.
+   */
+  @Test
+  public void testTumblingWindowsDiscardingMode() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
+        Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator));
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
+    Assert.assertEquals(5, windowPanes.size());
+
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+    Assert.assertEquals(2,
+        ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size());
+
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
+    Assert.assertEquals(2,
+        ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size());
+
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(2).getKey().getKey());
+    Assert.assertEquals(2,
+        ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size());
+
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getKey().getKey());
+    Assert.assertEquals(2,
+        ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size());
+
+    Assert.assertEquals(Integer.valueOf(3), windowPanes.get(4).getKey().getKey());
+    Assert.assertEquals(1,
+        ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(4).getMessage()).size());
+  }
+
+  /**
+   * Same input and trigger as the discarding-mode tumbling test, but in ACCUMULATING mode.
+   * Seven panes are expected; the first four are checked in detail and the second firing for
+   * each of keys 1 and 2 is expected to hold 4 messages rather than 2.
+   */
+  @Test
+  public void testTumblingWindowsAccumulatingMode() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
+        Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator));
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
+    Assert.assertEquals(7, windowPanes.size());
+
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+    Assert.assertEquals(2,
+        ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size());
+
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
+    Assert.assertEquals(2,
+        ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size());
+
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(2).getKey().getKey());
+    Assert.assertEquals(4,
+        ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size());
+
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getKey().getKey());
+    Assert.assertEquals(4,
+        ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size());
+
+    // NOTE(review): panes 4..6 are counted above but not inspected; presumably they are the
+    // end-of-window firings for keys 1, 2 and 3 - confirm their order before asserting on them.
+  }
+
+  /**
+   * Exercises keyed session windows with a 500 ms gap in DISCARDING mode. The test clock starts
+   * at 1 and is advanced by one second after each batch of messages, so the expected pane ids
+   * are "1", "1001" and "2001". Each batch sends two messages per key and every resulting pane
+   * is expected to hold exactly 2 messages.
+   */
+  @Test
+  public void testSessionWindowsDiscardingMode() throws Exception {
+    StreamApplication sgb =
+        new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    // First batch: one session for key 1.
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
+    Assert.assertEquals(1, windowPanes.size());
+    Assert.assertEquals("1", windowPanes.get(0).getKey().getPaneId());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+
+    // Second batch: sessions for keys 2 and 3.
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(3, windowPanes.size());
+    Assert.assertEquals("1", windowPanes.get(0).getKey().getPaneId());
+    Assert.assertEquals("1001", windowPanes.get(1).getKey().getPaneId());
+    Assert.assertEquals("1001", windowPanes.get(2).getKey().getPaneId());
+    Assert.assertEquals(2,
+        ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size());
+    Assert.assertEquals(2,
+        ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size());
+    Assert.assertEquals(2,
+        ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size());
+
+    // Third batch: key 2 again; the expected pane id "2001" differs from its earlier pane "1001".
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(4, windowPanes.size());
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getKey().getKey());
+    Assert.assertEquals("2001", windowPanes.get(3).getKey().getPaneId());
+    Assert.assertEquals(2,
+        ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size());
+  }
+
+  /**
+   * Exercises keyed session windows with a 500 ms gap in ACCUMULATING mode. Two messages for
+   * key 1 are followed, one second later, by four messages for key 2; after another second
+   * window() is run once. Two panes are expected: key 1 with 2 messages and key 2 with 4.
+   */
+  @Test
+  public void testSessionWindowsAccumulatingMode() throws Exception {
+    // The mode must be ACCUMULATING here: the previous version passed DISCARDING, so this test
+    // never exercised the mode it is named after.
+    StreamApplication sgb =
+        new KeyedSessionWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofMillis(500));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
+    Assert.assertEquals(2, windowPanes.size());
+
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+    Assert.assertEquals(2,
+        ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size());
+
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
+    Assert.assertEquals(4,
+        ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size());
+  }
+
+  /**
+   * Uses a non-repeating count(2) early trigger on keyed 1-second tumbling windows in
+   * ACCUMULATING mode. The early trigger is expected to fire once (after the second message)
+   * and not again for the same pane; later panes are expected only from DEFAULT firings after
+   * the clock has been advanced and window() has run.
+   */
+  @Test
+  public void testCancelationOfOnceTrigger() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
+        Duration.ofSeconds(1), Triggers.count(2));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+
+    // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
+    Assert.assertEquals(1, windowPanes.size());
+    Assert.assertEquals("0", windowPanes.get(0).getKey().getPaneId());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+    Assert.assertEquals(FiringType.EARLY, windowPanes.get(0).getFiringType());
+
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+    // three more messages for the same key must not produce another early pane
+    Assert.assertEquals(1, windowPanes.size());
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(2, windowPanes.size());
+    Assert.assertEquals("0", windowPanes.get(0).getKey().getPaneId());
+    Assert.assertEquals("0", windowPanes.get(1).getKey().getPaneId());
+    Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(1).getFiringType());
+
+    // a single message for a new key in the next window only yields a DEFAULT pane
+    task.process(new IntegerMessageEnvelope(3, 6), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(3, windowPanes.size());
+    Assert.assertEquals(Integer.valueOf(3), windowPanes.get(2).getKey().getKey());
+    Assert.assertEquals("1000", windowPanes.get(2).getKey().getPaneId());
+    Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(2).getFiringType());
+    Assert.assertEquals(1,
+        ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size());
+  }
+
+  /**
+   * Uses any(count(2), timeSinceFirstMessage(500 ms)) as a non-repeating early trigger on keyed
+   * 1-second tumbling windows in ACCUMULATING mode. In the first window the count trigger is
+   * expected to fire; in the second window, where only one message arrives, the
+   * timeSinceFirstMessage trigger is expected to produce the EARLY pane. Each window is also
+   * expected to emit one DEFAULT pane.
+   */
+  @Test
+  public void testCancelationOfAnyTrigger() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
+        Duration.ofSeconds(1),
+        Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+    //assert that the count trigger fired
+    // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
+    Assert.assertEquals(1, windowPanes.size());
+
+    //advance the timer to enable the triggering of the inner timeSinceFirstMessage trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+
+    //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
+    // NOTE(review): task.window() is not called between the clock advance and this assertion. If
+    // timers are only evaluated inside window(), this check cannot observe a stray timer firing;
+    // consider calling task.window(..) first - confirm against the trigger scheduler.
+    Assert.assertEquals(1, windowPanes.size());
+
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+    //advance timer by 500 more millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+
+    //assert that the default trigger fired
+    Assert.assertEquals(2, windowPanes.size());
+    Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(1).getFiringType());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(1).getKey().getKey());
+    Assert.assertEquals("0", windowPanes.get(1).getKey().getPaneId());
+    Assert.assertEquals(5,
+        ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size());
+
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+    //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(3, windowPanes.size());
+    Assert.assertEquals(FiringType.EARLY, windowPanes.get(2).getFiringType());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(2).getKey().getKey());
+    Assert.assertEquals("1000", windowPanes.get(2).getKey().getPaneId());
+
+    //advance timer by > 500 millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(900));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(4, windowPanes.size());
+    Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(3).getFiringType());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(3).getKey().getKey());
+    Assert.assertEquals("1000", windowPanes.get(3).getKey().getPaneId());
+  }
+
+  /**
+   * Uses repeat(any(count(2), timeSinceFirstMessage(500 ms))) as the early trigger on keyed
+   * 1-second tumbling windows in ACCUMULATING mode and checks the running pane count after each
+   * step: 1 after two messages, 2 after one more message plus a 500 ms advance and window(),
+   * 3 after two further messages, and 4 after a final message, another 500 ms advance and
+   * window().
+   */
+  @Test
+  public void testCancelationOfRepeatingNestedTriggers() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
+        Duration.ofSeconds(1),
+        Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+    //assert that the count trigger fired
+    // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
+    Assert.assertEquals(1, windowPanes.size());
+
+    //send one more message, then advance the timer by the timeSinceFirstMessage duration
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+    //a second pane is expected here, i.e. a trigger fired again after the first count firing;
+    //presumably the repeat wrapper re-armed the inner triggers and the timeSinceFirstMessage
+    //timer started by the message above elapsed - confirm against the trigger implementation
+    Assert.assertEquals(2, windowPanes.size());
+
+    //two more messages are expected to fire the re-armed count trigger
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+    Assert.assertEquals(3, windowPanes.size());
+
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+    //advance timer by 500 more millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+    //exactly one more pane is expected (attributed by the original author to the default trigger)
+    Assert.assertEquals(4, windowPanes.size());
+  }
+
+  /**
+   * Test application that routes the "integers" input stream through a keyed tumbling window
+   * configured with the supplied duration, early trigger and accumulation mode, and records
+   * every pane that comes out of the window in the enclosing test's {@code windowPanes} list.
+   */
+  private class KeyedTumblingWindowStreamApplication implements StreamApplication {
+
+    private final StreamSpec streamSpec = new StreamSpec("integer-stream", "integers", "kafka");
+    private final AccumulationMode mode;
+    private final Duration duration;
+    private final Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger;
+
+    KeyedTumblingWindowStreamApplication(AccumulationMode mode, Duration timeDuration,
+        Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger) {
+      this.earlyTrigger = earlyTrigger;
+      this.duration = timeDuration;
+      this.mode = mode;
+    }
+
+    /**
+     * Builds the pipeline: identity map, keyed tumbling window (keyed by the envelope key),
+     * then a map stage that captures each emitted pane and passes it through unchanged.
+     */
+    @Override
+    public void init(StreamGraph graph, Config config) {
+      Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = MessageEnvelope::getKey;
+      MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(streamSpec, null, null);
+      inStream
+        .map(envelope -> envelope)
+        .window(Windows.keyedTumblingWindow(keyFn, duration)
+          .setEarlyTrigger(earlyTrigger)
+          .setAccumulationMode(mode))
+        .map(pane -> {
+            windowPanes.add(pane);
+            return pane;
+          });
+    }
+  }
+
+  /**
+   * Test application that routes the "integers" input stream through a keyed session window
+   * configured with the supplied duration and accumulation mode, and records every pane that
+   * comes out of the window in the enclosing test's {@code windowPanes} list.
+   */
+  private class KeyedSessionWindowStreamApplication implements StreamApplication {
+
+    private final StreamSpec streamSpec = new StreamSpec("integer-stream", "integers", "kafka");
+    private final AccumulationMode mode;
+    private final Duration duration;
+
+    KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) {
+      this.duration = duration;
+      this.mode = mode;
+    }
+
+    /**
+     * Builds the pipeline: identity map, keyed session window (keyed by the envelope key),
+     * then a map stage that captures each emitted pane and passes it through unchanged.
+     */
+    @Override
+    public void init(StreamGraph graph, Config config) {
+      Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = MessageEnvelope::getKey;
+      MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(streamSpec, null, null);
+
+      inStream
+          .map(envelope -> envelope)
+          .window(Windows.keyedSessionWindow(keyFn, duration)
+              .setAccumulationMode(mode))
+          .map(pane -> {
+              windowPanes.add(pane);
+              return pane;
+            });
+    }
+  }
+
+  /**
+   * Incoming envelope with integer key and message, always addressed to partition 0 of stream
+   * "integers" on system "kafka". The second constructor argument is fixed to "1" (presumably
+   * the offset - confirm against IncomingMessageEnvelope).
+   *
+   * Declared static because it never touches the enclosing test instance, which avoids holding
+   * a hidden reference to it.
+   */
+  private static class IntegerMessageEnvelope extends IncomingMessageEnvelope {
+    IntegerMessageEnvelope(int key, int msg) {
+      super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, msg);
+    }
+  }
+}

Reply via email to