Specification of various Window and Trigger APIs in Samza

- Defined APIs for specifying different types of windows: session, tumbling,
and global windows, plus keyed variants.
- Defined APIs for specifying early and late triggers for a window.
- Standardized all of the above window types so that each is expressed as a
combination of default, early, and late triggers.
- Defined classes for different types of trigger specifications.
- Hid the WindowState class from programmers by moving it from samza-api to
samza-operator. We can add it back later if need be.
- Removed some implementation classes in Window and Trigger. We can revisit
them later when we implement Windows.
- Added a new API for specifying time durations meaningfully in Samza.
- Added unit tests for most of the above changes.
- Miscellaneous documentation and readability changes to public APIs.

Author: vjagadish1989 <jvenk...@linkedin.com>

Reviewers: Yi Pan <nickpa...@gmail.com>, Prateek Maheshwari <pmahe...@linkedin.com>

Closes #30 from vjagadish/samza-operator-v3


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6dc33a85
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6dc33a85
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6dc33a85

Branch: refs/heads/master
Commit: 6dc33a8504e3e2335cffe3317be1c0ccb5448458
Parents: f01b286
Author: vjagadish1989 <jvenk...@linkedin.com>
Authored: Mon Jan 30 13:50:16 2017 -0800
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Mon Jan 30 13:50:16 2017 -0800

----------------------------------------------------------------------
 .../apache/samza/operators/MessageStream.java   |  46 ++-
 .../samza/operators/triggers/AnyTrigger.java    |  39 ++
 .../samza/operators/triggers/CountTrigger.java  |  38 ++
 .../triggers/DurationCharacteristic.java        |  26 ++
 .../operators/triggers/RepeatingTrigger.java    |  34 ++
 .../triggers/TimeSinceFirstMessageTrigger.java  |  46 +++
 .../triggers/TimeSinceLastMessageTrigger.java   |  44 +++
 .../samza/operators/triggers/TimeTrigger.java   |  44 +++
 .../samza/operators/triggers/Trigger.java       |  34 ++
 .../samza/operators/triggers/Triggers.java      | 108 ++++++
 .../operators/windows/AccumulationMode.java     |  34 ++
 .../samza/operators/windows/SessionWindow.java  | 102 -----
 .../samza/operators/windows/StoreFunctions.java |  67 ----
 .../apache/samza/operators/windows/Trigger.java |  94 -----
 .../samza/operators/windows/TriggerBuilder.java | 320 ----------------
 .../apache/samza/operators/windows/Window.java  |  90 ++++-
 .../samza/operators/windows/WindowFn.java       |  59 ---
 .../samza/operators/windows/WindowKey.java      |  46 +++
 .../samza/operators/windows/WindowOutput.java   |  51 ---
 .../samza/operators/windows/WindowPane.java     |  57 +++
 .../samza/operators/windows/WindowState.java    |  85 -----
 .../apache/samza/operators/windows/Windows.java | 369 ++++++++++++++++---
 .../windows/internal/WindowInternal.java        | 110 ++++++
 .../samza/operators/windows/TestTrigger.java    |  68 ----
 .../operators/windows/TestTriggerBuilder.java   | 226 ------------
 .../operators/windows/TestWindowOutput.java     |   5 +-
 .../samza/operators/windows/TestWindows.java    | 109 ------
 .../org/apache/samza/task/AsyncRunLoop.java     |   4 +-
 .../samza/operators/MessageStreamImpl.java      |  26 +-
 .../apache/samza/operators/StateStoreImpl.java  |  56 ---
 .../samza/operators/impl/OperatorImpls.java     |   5 +-
 .../impl/SessionWindowOperatorImpl.java         |  67 ----
 .../operators/impl/WindowOperatorImpl.java      |  40 ++
 .../samza/operators/spec/OperatorSpecs.java     |  31 +-
 .../operators/spec/PartialJoinOperatorSpec.java |  24 --
 .../operators/spec/WindowOperatorSpec.java      |  92 +----
 .../samza/operators/spec/WindowState.java       |  85 +++++
 .../apache/samza/operators/BroadcastTask.java   |  35 +-
 .../samza/operators/TestMessageStreamImpl.java  |  18 -
 .../samza/operators/TestStateStoreImpl.java     |  72 ----
 .../org/apache/samza/operators/WindowTask.java  |  17 +-
 .../samza/operators/impl/TestOperatorImpls.java |  11 +-
 .../operators/impl/TestSessionWindowImpl.java   | 111 ------
 .../samza/operators/spec/TestOperatorSpecs.java |  50 +--
 44 files changed, 1294 insertions(+), 1801 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index d18536b..6a2f95b 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -26,8 +26,7 @@ import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.windows.Window;
-import org.apache.samza.operators.windows.WindowOutput;
-import org.apache.samza.operators.windows.WindowState;
+import org.apache.samza.operators.windows.WindowPane;
 
 import java.util.Collection;
 
@@ -46,8 +45,8 @@ public interface MessageStream<M extends MessageEnvelope> {
    * Applies the provided 1:1 {@link MapFunction} to {@link MessageEnvelope}s in this {@link MessageStream} and returns the
    * transformed {@link MessageStream}.
    *
-   * @param mapFn  the function to transform a {@link MessageEnvelope} to another {@link MessageEnvelope}
-   * @param <TM>  the type of {@link MessageEnvelope}s in the transformed {@link MessageStream}
+   * @param mapFn the function to transform a {@link MessageEnvelope} to another {@link MessageEnvelope}
+   * @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream}
    * @return the transformed {@link MessageStream}
    */
   <TM extends MessageEnvelope> MessageStream<TM> map(MapFunction<M, TM> mapFn);
@@ -56,8 +55,8 @@ public interface MessageStream<M extends MessageEnvelope> {
    * Applies the provided 1:n {@link FlatMapFunction} to transform a {@link MessageEnvelope} in this {@link MessageStream}
    * to n {@link MessageEnvelope}s in the transformed {@link MessageStream}
    *
-   * @param flatMapFn  the function to transform a {@link MessageEnvelope} to zero or more {@link MessageEnvelope}s
-   * @param <TM>  the type of {@link MessageEnvelope}s in the transformed {@link MessageStream}
+   * @param flatMapFn the function to transform a {@link MessageEnvelope} to zero or more {@link MessageEnvelope}s
+   * @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream}
    * @return the transformed {@link MessageStream}
    */
   <TM extends MessageEnvelope> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn);
@@ -69,7 +68,7 @@ public interface MessageStream<M extends MessageEnvelope> {
    * The {@link FilterFunction} is a predicate which determines whether a {@link MessageEnvelope} in this {@link MessageStream}
    * should be retained in the transformed {@link MessageStream}.
    *
-   * @param filterFn  the predicate to filter {@link MessageEnvelope}s from this {@link MessageStream}
+   * @param filterFn the predicate to filter {@link MessageEnvelope}s from this {@link MessageStream}
    * @return the transformed {@link MessageStream}
    */
   MessageStream<M> filter(FilterFunction<M> filterFn);
@@ -78,38 +77,37 @@ public interface MessageStream<M extends MessageEnvelope> {
    * Allows sending {@link MessageEnvelope}s in this {@link MessageStream} to an output
    * {@link org.apache.samza.system.SystemStream} using the provided {@link SinkFunction}.
    *
-   * @param sinkFn  the function to send {@link MessageEnvelope}s in this stream to output systems
+   * @param sinkFn the function to send {@link MessageEnvelope}s in this stream to output systems
    */
   void sink(SinkFunction<M> sinkFn);
 
   /**
-   * Groups the {@link MessageEnvelope}s in this {@link MessageStream} according to the provided {@link Window} semantics
+   * Groups and processes the {@link MessageEnvelope}s in this {@link MessageStream} according to the provided {@link Window}
    * (e.g. tumbling, sliding or session windows) and returns the transformed {@link MessageStream} of
-   * {@link WindowOutput}s.
+   * {@link WindowPane}s.
    * <p>
    * Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows.
    *
-   * @param window  the {@link Window} to group and process {@link MessageEnvelope}s from this {@link MessageStream}
-   * @param <WK>  the type of key in the {@link WindowOutput} from the {@link Window}
-   * @param <WV>  the type of value in the {@link WindowOutput} from the {@link Window}
-   * @param <WS>  the type of window state kept in the {@link Window}
-   * @param <WM>  the type of {@link WindowOutput} in the transformed {@link MessageStream}
-   * @return  the transformed {@link MessageStream}
+   * @param window the window to group and process {@link MessageEnvelope}s from this {@link MessageStream}
+   * @param <K> the type of key in the {@link MessageEnvelope} in this {@link MessageStream}. If a key is specified,
+   *            panes are emitted per-key.
+   * @param <WV> the type of value in the {@link WindowPane} in the transformed {@link MessageStream}
+   * @param <WM> the type of {@link WindowPane} in the transformed {@link MessageStream}
+   * @return the transformed {@link MessageStream}
    */
-  <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window(
-      Window<M, WK, WV, WM> window);
+  <K, WV, WM extends WindowPane<K, WV>> MessageStream<WM> window(Window<M, K, WV, WM> window);
 
   /**
    * Joins this {@link MessageStream} with another {@link MessageStream} using the provided pairwise {@link JoinFunction}.
    * <p>
    * We currently only support 2-way joins.
    *
-   * @param otherStream  the other {@link MessageStream} to be joined with
-   * @param joinFn  the function to join {@link MessageEnvelope}s from this and the other {@link MessageStream}
-   * @param <K>  the type of join key
-   * @param <OM>  the type of {@link MessageEnvelope}s in the other stream
-   * @param <RM>  the type of {@link MessageEnvelope}s resulting from the {@code joinFn}
-   * @return  the joined {@link MessageStream}
+   * @param otherStream the other {@link MessageStream} to be joined with
+   * @param joinFn the function to join {@link MessageEnvelope}s from this and the other {@link MessageStream}
+   * @param <K> the type of join key
+   * @param <OM> the type of {@link MessageEnvelope}s in the other stream
+   * @param <RM> the type of {@link MessageEnvelope}s resulting from the {@code joinFn}
+   * @return the joined {@link MessageStream}
    */
   <K, OM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> MessageStream<RM> join(MessageStream<OM> otherStream,
       JoinFunction<M, OM, RM> joinFn);
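
The revised `window` Javadoc above says that panes are emitted per key when a key is specified. The following is a minimal, self-contained sketch of that idea for a tumbling window; it does not use the Samza classes in this commit. `Event`, `window`, and the `"<key>@<windowStartMs>"` pane naming are hypothetical, and timestamps are assumed to be non-negative epoch milliseconds.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TumblingWindowSketch {
  /** Hypothetical stand-in for a keyed message; not a Samza type. */
  static final class Event {
    final String key;
    final long timestampMs;
    final int value;

    Event(String key, long timestampMs, int value) {
      this.key = key;
      this.timestampMs = timestampMs;
      this.value = value;
    }
  }

  /** Groups event values into per-key panes named "<key>@<windowStartMs>". */
  static Map<String, List<Integer>> window(List<Event> events, Duration size) {
    long sizeMs = size.toMillis();
    Map<String, List<Integer>> panes = new LinkedHashMap<>();
    for (Event e : events) {
      // A tumbling window starts at the largest multiple of the size <= the timestamp.
      long windowStart = e.timestampMs - (e.timestampMs % sizeMs);
      panes.computeIfAbsent(e.key + "@" + windowStart, k -> new ArrayList<>()).add(e.value);
    }
    return panes;
  }

  /** Runs the sketch on a small fixed input with 10-second windows. */
  static Map<String, List<Integer>> demo() {
    List<Event> events = Arrays.asList(
        new Event("a", 1_000, 1),
        new Event("a", 9_000, 2),
        new Event("b", 3_000, 3),
        new Event("a", 12_000, 4));
    return window(events, Duration.ofSeconds(10));
  }

  public static void main(String[] args) {
    System.out.println(demo()); // {a@0=[1, 2], b@0=[3], a@10000=[4]}
  }
}
```

Key `a` produces two panes because its last event falls into the next 10-second window, while key `b` gets its own pane in the first window.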

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
new file mode 100644
index 0000000..3ca4e9a
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import java.util.List;
+
+/**
+ * A {@link Trigger} fires as soon as any of its individual triggers has fired.
+ */
+public class AnyTrigger<M extends MessageEnvelope> implements Trigger {
+
+  private final List<Trigger> triggers;
+
+  AnyTrigger(List<Trigger> triggers) {
+    this.triggers = triggers;
+  }
+
+  public List<Trigger> getTriggers() {
+    return triggers;
+  }
+}
+
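
`AnyTrigger` above only records its child triggers; the commit does not include any evaluation logic. As a rough illustration of the documented semantics ("fires as soon as any of its individual triggers has fired"), here is a self-contained sketch. The `Condition` interface and its `shouldFire` method are hypothetical and are not part of the Samza API.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

class AnyTriggerSketch {
  /** Hypothetical evaluation hook; the Trigger interface in this commit is only a marker. */
  interface Condition {
    boolean shouldFire(long messageCount, Duration sinceFirstMessage);
  }

  /** Fires once the pane holds at least n messages. */
  static Condition count(long n) {
    return (messageCount, sinceFirst) -> messageCount >= n;
  }

  /** Fires once at least d has passed since the first message in the pane. */
  static Condition timeSinceFirstMessage(Duration d) {
    return (messageCount, sinceFirst) -> sinceFirst.compareTo(d) >= 0;
  }

  /** Fires as soon as any of the given conditions would fire. */
  static Condition any(Condition... conditions) {
    List<Condition> list = Arrays.asList(conditions);
    return (messageCount, sinceFirst) ->
        list.stream().anyMatch(c -> c.shouldFire(messageCount, sinceFirst));
  }

  /** Evaluates "50 messages or 4 seconds, whichever comes first". */
  static boolean earlyFires(long messageCount, long elapsedSeconds) {
    Condition early = any(count(50), timeSinceFirstMessage(Duration.ofSeconds(4)));
    return early.shouldFire(messageCount, Duration.ofSeconds(elapsedSeconds));
  }

  public static void main(String[] args) {
    System.out.println(earlyFires(50, 1)); // true: count reached
    System.out.println(earlyFires(10, 5)); // true: time reached
    System.out.println(earlyFires(10, 1)); // false: neither reached
  }
}
```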

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
new file mode 100644
index 0000000..ba14928
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+
+/**
+ * A {@link Trigger} that fires when the number of messages in the {@link org.apache.samza.operators.windows.WindowPane}
+ * reaches the specified count.
+ */
+public class CountTrigger<M extends MessageEnvelope> implements Trigger {
+
+  private final long count;
+
+  CountTrigger(long count) {
+    this.count = count;
+  }
+
+  public long getCount() {
+    return count;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/DurationCharacteristic.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/DurationCharacteristic.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/DurationCharacteristic.java
new file mode 100644
index 0000000..36dab72
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/DurationCharacteristic.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.triggers;
+
+/**
+ * Indicates whether the associated time duration is in event time or processing time.
+ */
+public enum DurationCharacteristic {
+  PROCESSING_TIME, EVENT_TIME
+}
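
To make the event-time versus processing-time distinction concrete, the sketch below measures the same duration in both domains. It is only an illustration: `Stamped`, `timestampOf`, and `elapsed` are hypothetical names, and processing time is simplified to the arrival time recorded on each message rather than a live wall clock.

```java
import java.time.Duration;

class TimeDomainSketch {
  enum Characteristic { PROCESSING_TIME, EVENT_TIME }

  /** Hypothetical message carrying both timestamps, in epoch milliseconds. */
  static final class Stamped {
    final long eventTimeMs;   // when the event happened at the source
    final long arrivalTimeMs; // when the processor received it

    Stamped(long eventTimeMs, long arrivalTimeMs) {
      this.eventTimeMs = eventTimeMs;
      this.arrivalTimeMs = arrivalTimeMs;
    }
  }

  /** Picks the timestamp that matches the requested time domain. */
  static long timestampOf(Stamped m, Characteristic c) {
    return c == Characteristic.EVENT_TIME ? m.eventTimeMs : m.arrivalTimeMs;
  }

  /** True once the duration has elapsed between two messages in the chosen domain. */
  static boolean elapsed(Stamped first, Stamped latest, Duration duration, Characteristic c) {
    return timestampOf(latest, c) - timestampOf(first, c) >= duration.toMillis();
  }

  /** Same pair of messages, evaluated against 4 seconds in each domain. */
  static boolean[] demo() {
    Stamped first = new Stamped(0, 100_000);
    Stamped latest = new Stamped(5_000, 101_000); // 5s later in event time, 1s later on arrival
    Duration fourSeconds = Duration.ofSeconds(4);
    return new boolean[] {
        elapsed(first, latest, fourSeconds, Characteristic.EVENT_TIME),
        elapsed(first, latest, fourSeconds, Characteristic.PROCESSING_TIME)
    };
  }

  public static void main(String[] args) {
    boolean[] result = demo();
    System.out.println(result[0]); // true
    System.out.println(result[1]); // false
  }
}
```

Note that the time-based trigger classes in this commit hard-code `PROCESSING_TIME`.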

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
new file mode 100644
index 0000000..ae9564d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+
+/**
+ * A {@link Trigger} that repeats its underlying trigger forever.
+ */
+class RepeatingTrigger<M extends MessageEnvelope> implements Trigger<M> {
+
+  private final Trigger<M> trigger;
+
+  RepeatingTrigger(Trigger<M> trigger) {
+    this.trigger = trigger;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
new file mode 100644
index 0000000..13fc3cd
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+
+import java.time.Duration;
+
+/*
+ * A {@link Trigger} that fires after the specified duration has passed since the first {@link MessageEnvelope} in
+ * the window pane.
+ */
+public class TimeSinceFirstMessageTrigger<M extends MessageEnvelope> implements Trigger {
+
+  private final Duration duration;
+  private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
+
+  TimeSinceFirstMessageTrigger(Duration duration) {
+    this.duration = duration;
+  }
+
+  public Duration getDuration() {
+    return duration;
+  }
+
+  public DurationCharacteristic getCharacteristic() {
+    return characteristic;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
new file mode 100644
index 0000000..0150d86
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+
+import java.time.Duration;
+
+/*
+ * A {@link Trigger} that fires when there are no new {@link MessageEnvelope}s in the window pane for the specified duration.
+ */
+public class TimeSinceLastMessageTrigger<M extends MessageEnvelope> implements Trigger {
+
+  private final Duration duration;
+  private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
+
+  TimeSinceLastMessageTrigger(Duration duration) {
+    this.duration = duration;
+  }
+
+  public Duration getDuration() {
+    return duration;
+  }
+
+  public DurationCharacteristic getCharacteristic() {
+    return characteristic;
+  }
+}
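
A "time since last message" condition is the usual basis for session windows: a session closes once no message has arrived for the gap duration. The sketch below applies that rule to a list of ascending timestamps. It is an illustration under stated assumptions, not the Samza implementation; `sessions` and `demo` are hypothetical names, and a gap equal to the duration is treated as closing the session.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SessionGapSketch {
  /**
   * Splits ascending timestamps (epoch milliseconds) into sessions. A new session
   * starts whenever the gap since the previous timestamp is at least the given duration.
   */
  static List<List<Long>> sessions(List<Long> timestampsMs, Duration gap) {
    List<List<Long>> result = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    for (long ts : timestampsMs) {
      if (!current.isEmpty() && ts - current.get(current.size() - 1) >= gap.toMillis()) {
        result.add(current); // the gap trigger would have fired: close the session
        current = new ArrayList<>();
      }
      current.add(ts);
    }
    if (!current.isEmpty()) {
      result.add(current); // emit the trailing, still-open session
    }
    return result;
  }

  /** Runs the sketch with a 5-second gap on a small fixed input. */
  static List<List<Long>> demo() {
    return sessions(Arrays.asList(0L, 1_000L, 7_000L, 7_500L), Duration.ofSeconds(5));
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [[0, 1000], [7000, 7500]]
  }
}
```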

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
new file mode 100644
index 0000000..ed7fef7
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+
+import java.time.Duration;
+
+/*
+ * A {@link Trigger} that fires after the specified duration in processing time.
+ */
+public class TimeTrigger<M extends MessageEnvelope> implements Trigger {
+
+  private final Duration duration;
+  private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
+
+  public TimeTrigger(Duration duration) {
+    this.duration = duration;
+  }
+
+  public Duration getDuration() {
+    return duration;
+  }
+
+  public DurationCharacteristic getCharacteristic() {
+    return characteristic;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
new file mode 100644
index 0000000..6dc4f43
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+
+import org.apache.samza.operators.data.MessageEnvelope;
+
+/**
+ * Marker interface for all triggers. The firing of a trigger indicates the completion of a window pane.
+ *
+ * <p> Use the {@link Triggers} APIs to create a {@link Trigger}.
+ *
+ * @param <M> the type of the incoming {@link MessageEnvelope}
+ */
+public interface Trigger<M extends MessageEnvelope> {
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java
new file mode 100644
index 0000000..f27cfd8
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.data.MessageEnvelope;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * API for creating {@link Trigger} instances to be used with a {@link org.apache.samza.operators.windows.Window}.
+ *
+ * <p> The below example groups an input into tumbling windows of 10s and emits early results periodically every 4s in
+ * processing time, or for every 50 messages. It also specifies that window results are accumulating.
+ *
+ * <pre> {@code
+ *   MessageStream<> windowedStream = stream.window(Windows.tumblingWindow(Duration.of(10, TimeUnit.SECONDS))
+ *     .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.of(4, TimeUnit.SECONDS))))))
+ *     .accumulateFiredPanes());
+ * }</pre>
+ *
+ * @param <M> the type of input {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream}
+ */
+@InterfaceStability.Unstable
+public final class Triggers<M extends MessageEnvelope> {
+
+  private Triggers() { }
+
+  /**
+   * Creates a {@link Trigger} that fires when the number of {@link MessageEnvelope}s in the pane
+   * reaches the specified count.
+   *
+   * @param count the number of {@link MessageEnvelope}s to fire the trigger after
+   * @return the created trigger
+   */
+  public static Trigger count(long count) {
+    return new CountTrigger(count);
+  }
+
+  /**
+   * Creates a trigger that fires after the specified duration has passed since the first {@link MessageEnvelope} in
+   * the pane.
+   *
+   * @param duration the duration since the first element
+   * @return the created trigger
+   */
+  public static Trigger timeSinceFirstMessage(Duration duration) {
+    return new TimeSinceFirstMessageTrigger(duration);
+  }
+
+  /**
+   * Creates a trigger that fires when there is no new {@link MessageEnvelope} for the specified duration in the pane.
+   *
+   * @param duration the duration since the last element
+   * @return the created trigger
+   */
+  public static Trigger timeSinceLastMessage(Duration duration) {
+    return new TimeSinceLastMessageTrigger(duration);
+  }
+
+  /**
+   * Creates a trigger that fires when any of the provided triggers fire.
+   *
+   * @param <M> the type of input {@link MessageEnvelope} in the window
+   * @param triggers the individual triggers
+   * @return the created trigger
+   */
+  public static <M extends MessageEnvelope> Trigger any(Trigger<M>... triggers) {
+    List<Trigger> triggerList = new ArrayList<>();
+    for (Trigger trigger : triggers) {
+      triggerList.add(trigger);
+    }
+    return new AnyTrigger(Collections.unmodifiableList(triggerList));
+  }
+
+  /**
+   * Repeats the provided trigger forever.
+   *
+   * <p>Creating a {@link RepeatingTrigger} from an {@link AnyTrigger} is equivalent to creating an {@link AnyTrigger} from
+   * its individual {@link RepeatingTrigger}s.
+   *
+   * @param <M> the type of input {@link MessageEnvelope} in the window
+   * @param trigger the individual trigger to repeat
+   * @return the created trigger
+   */
+  public static <M extends MessageEnvelope> Trigger repeat(Trigger<M> trigger) {
+    return new RepeatingTrigger<>(trigger);
+  }
+}
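
A note on the Javadoc example in `Triggers`: the factory methods take a `java.time.Duration`, and the example builds one with `Duration.of(10, TimeUnit.SECONDS)`. In the JDK, `Duration.of` takes a `java.time.temporal.TemporalUnit`, and `java.util.concurrent.TimeUnit` does not implement that interface, so that call would not compile as written. The sketch below shows two standard JDK ways to build the 10-second and 4-second durations the example describes; the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.temporal.ChronoUnit;

class DurationExamples {
  /** Window length from the Javadoc example: 10 seconds. */
  static Duration windowLength() {
    return Duration.ofSeconds(10);
  }

  /** Early-trigger period from the Javadoc example: 4 seconds, via the two-argument factory. */
  static Duration earlyTriggerPeriod() {
    return Duration.of(4, ChronoUnit.SECONDS);
  }

  public static void main(String[] args) {
    System.out.println(windowLength());       // PT10S
    System.out.println(earlyTriggerPeriod()); // PT4S
  }
}
```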

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/AccumulationMode.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/AccumulationMode.java b/samza-api/src/main/java/org/apache/samza/operators/windows/AccumulationMode.java
new file mode 100644
index 0000000..f8c435c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/AccumulationMode.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+/**
+ * Specifies how a {@link Window} should process its previously emitted {@link WindowPane}s.
+ *
+ * <p> There are two types of {@link AccumulationMode}s:
+ * <ul>
+ *   <li> ACCUMULATING: Specifies that window panes should include all messages collected for the window (key) so far, even if they were
+ * included in previously emitted window panes.
+ *   <li> DISCARDING: Specifies that window panes should only include messages collected for this window (key) since the last emitted
+ * window pane.
+ * </ul>
+ */
+public enum AccumulationMode {
+  ACCUMULATING, DISCARDING
+}
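
Note on the two modes above: the difference is easiest to see on a concrete message sequence. The following is a minimal sketch under stated assumptions, not Samza's implementation. `AccumulationSketch`, its nested `Mode` enum, and the "fire after every N messages" rule are hypothetical and exist only to illustrate which messages end up in each pane.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: mirrors the two modes described in the Javadoc, not the Samza runtime.
final class AccumulationSketch {

  enum Mode { ACCUMULATING, DISCARDING }

  /**
   * Emits one pane after every {@code fireEvery} messages of a single window.
   * ACCUMULATING panes repeat everything seen so far; DISCARDING panes contain
   * only the messages that arrived since the previous pane.
   */
  static List<List<String>> panes(List<String> messages, int fireEvery, Mode mode) {
    List<List<String>> emitted = new ArrayList<>();
    List<String> buffer = new ArrayList<>();
    int sinceLastPane = 0;
    for (String message : messages) {
      buffer.add(message);
      sinceLastPane++;
      if (sinceLastPane == fireEvery) {
        emitted.add(new ArrayList<>(buffer)); // snapshot the pane contents
        sinceLastPane = 0;
        if (mode == Mode.DISCARDING) {
          buffer.clear(); // drop what has already been emitted
        }
      }
    }
    return emitted;
  }

  private AccumulationSketch() {}
}
```

For the messages a, b, c, d with a pane every two messages, ACCUMULATING yields the panes [a, b] and [a, b, c, d], while DISCARDING yields [a, b] and [c, d].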

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/SessionWindow.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/SessionWindow.java b/samza-api/src/main/java/org/apache/samza/operators/windows/SessionWindow.java
deleted file mode 100644
index 287025c..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/SessionWindow.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.windows;
-
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.storage.kv.Entry;
-
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-/**
- * This class defines a session window function class
- *
- * @param <M>  the type of input {@link MessageEnvelope}
- * @param <WK>  the type of session key in the session window
- * @param <WV>  the type of output value in each session window
- */
-public class SessionWindow<M extends MessageEnvelope, WK, WV> implements 
Window<M, WK, WV, WindowOutput<WK, WV>> {
-
-  /**
-   * Constructor. Made private s.t. it can only be instantiated via the static 
API methods in {@link Windows}
-   *
-   * @param sessionKeyFunction  function to get the session key from the input 
{@link MessageEnvelope}
-   * @param aggregator  function to calculate the output value based on the 
input {@link MessageEnvelope} and current output value
-   */
-  SessionWindow(Function<M, WK> sessionKeyFunction, BiFunction<M, WV, WV> 
aggregator) {
-    this.wndKeyFunction = sessionKeyFunction;
-    this.aggregator = aggregator;
-  }
-
-  /**
-   * function to calculate the window key from input {@link MessageEnvelope}
-   */
-  private final Function<M, WK> wndKeyFunction;
-
-  /**
-   * function to calculate the output value from the input {@link 
MessageEnvelope} and the current output value
-   */
-  private final BiFunction<M, WV, WV> aggregator;
-
-  /**
-   * trigger condition that determines when to send the {@link WindowOutput}
-   */
-  private Trigger<M, WindowState<WV>> trigger = null;
-
-  //TODO: need to create a set of {@link StoreFunctions} that is default to 
input {@link MessageEnvelope} type for {@link Window}
-  private StoreFunctions<M, WK, WindowState<WV>> storeFunctions = null;
-
-  /**
-   * Public API methods start here
-   */
-
-  /**
-   * Public API method to define the watermark trigger for the window operator
-   *
-   * @param wndTrigger {@link Trigger} function defines the watermark trigger 
for this {@link SessionWindow}
-   * @return The window operator w/ the defined watermark trigger
-   */
-  @Override
-  public Window<M, WK, WV, WindowOutput<WK, WV>> setTriggers(TriggerBuilder<M, 
WV> wndTrigger) {
-    this.trigger = wndTrigger.build();
-    return this;
-  }
-
-  private BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> 
getTransformFunc() {
-    // TODO: actual implementation of the main session window logic, based on 
the wndKeyFunction, aggregator, and triggers;
-    return null;
-  }
-
-  public WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>> 
getInternalWindowFn() {
-    return new WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>>() {
-
-      @Override public BiFunction<M, Entry<WK, WindowState<WV>>, 
WindowOutput<WK, WV>> getTransformFn() {
-        return SessionWindow.this.getTransformFunc();
-      }
-
-      @Override public StoreFunctions<M, WK, WindowState<WV>> getStoreFns() {
-        return SessionWindow.this.storeFunctions;
-      }
-
-      @Override public Trigger<M, WindowState<WV>> getTrigger() {
-        return SessionWindow.this.trigger;
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/StoreFunctions.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/StoreFunctions.java b/samza-api/src/main/java/org/apache/samza/operators/windows/StoreFunctions.java
deleted file mode 100644
index 0d40761..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/StoreFunctions.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.windows;
-
-import org.apache.samza.operators.data.MessageEnvelope;
-
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-/**
- * The store functions that are used by window and partial join operators to 
store and retrieve buffered {@link MessageEnvelope}s
- * and partial aggregation results.
- *
- * @param <SK>  the type of key used to store the operator state
- * @param <SS>  the type of operator state. E.g. could be the partial 
aggregation result for a window, or a buffered
- *             input {@link MessageEnvelope} from the join stream for a join
- */
-public class StoreFunctions<M extends MessageEnvelope, SK, SS> {
-  /**
-   * Function that returns the key to query in the operator state store for a 
particular {@link MessageEnvelope}.
-   * This 1:1 function only returns a single key for the incoming {@link 
MessageEnvelope}. This is sufficient to support
-   * non-overlapping windows and unique-key based joins.
-   *
-   * TODO: for windows that overlaps (i.e. sliding windows and hopping 
windows) and non-unique-key-based join,
-   * the query to the state store is usually a range scan. We need to add a 
rangeKeyFinder function
-   * (or make this function return a collection) to map from a single input 
{@link MessageEnvelope} to a range of keys in the store.
-   */
-  private final Function<M, SK> storeKeyFn;
-
-  /**
-   * Function to update the store entry based on the current operator state 
and the incoming {@link MessageEnvelope}.
-   *
-   * TODO: this is assuming a 1:1 mapping from the input {@link 
MessageEnvelope} to the store entry. When implementing sliding/hopping
-   * windows and non-unique-key-based join, we may need to include the 
corresponding state key in addition to the
-   * state value. Alternatively this can be called once for each store key for 
the {@link MessageEnvelope}.
-   */
-  private final BiFunction<M, SS, SS> stateUpdaterFn;
-
-  public StoreFunctions(Function<M, SK> storeKeyFn, BiFunction<M, SS, SS> 
stateUpdaterFn) {
-    this.storeKeyFn = storeKeyFn;
-    this.stateUpdaterFn = stateUpdaterFn;
-  }
-
-  public Function<M, SK> getStoreKeyFn() {
-    return this.storeKeyFn;
-  }
-
-  public BiFunction<M, SS, SS> getStateUpdaterFn() {
-    return this.stateUpdaterFn;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/Trigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Trigger.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Trigger.java
deleted file mode 100644
index c8b0edb..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Trigger.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.windows;
-
-import org.apache.samza.operators.data.MessageEnvelope;
-
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-/**
- * Defines the trigger functions for the window operator. This class is 
immutable.
- *
- * @param <M>  the type of {@link MessageEnvelope} in the input stream
- * @param <S>  the type of state variable in the window's state store
- */
-public class Trigger<M extends MessageEnvelope, S extends WindowState> {
-
-  /**
-   * System timer based trigger condition. This is the only guarantee that the 
window operator will proceed forward
-   */
-  private final Function<S, Boolean> timerTrigger;
-
-  /**
-   * early trigger condition that determines when to send the first output 
from the window operator
-   */
-  private final BiFunction<M, S, Boolean> earlyTrigger;
-
-  /**
-   * late trigger condition that determines when to send the updated output 
after the first one from a window operator
-   */
-  private final BiFunction<M, S, Boolean> lateTrigger;
-
-  /**
-   * the function to updated the window state when the first output is 
triggered
-   */
-  private final Function<S, S> earlyTriggerUpdater;
-
-  /**
-   * the function to updated the window state when the late output is triggered
-   */
-  private final Function<S, S> lateTriggerUpdater;
-
-  /**
-   * Private constructor to prevent instantiation
-   *
-   * @param timerTrigger  system timer trigger condition
-   * @param earlyTrigger  early trigger condition
-   * @param lateTrigger   late trigger condition
-   * @param earlyTriggerUpdater  early trigger state updater
-   * @param lateTriggerUpdater   late trigger state updater
-   */
-  private Trigger(Function<S, Boolean> timerTrigger, BiFunction<M, S, Boolean> 
earlyTrigger, BiFunction<M, S, Boolean> lateTrigger,
-      Function<S, S> earlyTriggerUpdater, Function<S, S> lateTriggerUpdater) {
-    this.timerTrigger = timerTrigger;
-    this.earlyTrigger = earlyTrigger;
-    this.lateTrigger = lateTrigger;
-    this.earlyTriggerUpdater = earlyTriggerUpdater;
-    this.lateTriggerUpdater = lateTriggerUpdater;
-  }
-
-  /**
-   * Static method to create a {@link Trigger} object
-   *
-   * @param timerTrigger  system timer trigger condition
-   * @param earlyTrigger  early trigger condition
-   * @param lateTrigger  late trigger condition
-   * @param earlyTriggerUpdater  early trigger state updater
-   * @param lateTriggerUpdater  late trigger state updater
-   * @param <M>  the type of input {@link MessageEnvelope}
-   * @param <S>  the type of window state extends {@link WindowState}
-   * @return  the {@link Trigger} function
-   */
-  public static <M extends MessageEnvelope, S extends WindowState> Trigger<M, 
S> createTrigger(Function<S, Boolean> timerTrigger,
-      BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> 
lateTrigger, Function<S, S> earlyTriggerUpdater,
-      Function<S, S> lateTriggerUpdater) {
-    return new Trigger(timerTrigger, earlyTrigger, lateTrigger, 
earlyTriggerUpdater, lateTriggerUpdater);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/TriggerBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/TriggerBuilder.java b/samza-api/src/main/java/org/apache/samza/operators/windows/TriggerBuilder.java
deleted file mode 100644
index 6336a50..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/TriggerBuilder.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.windows;
-
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
-
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-
-/**
- * This class defines a builder of {@link Trigger} object for a {@link 
Window}. The triggers are categorized into
- * three types:
- *
- * <p>
- *   early trigger: defines the condition when the first output from the 
window function is sent.
- *   late trigger: defines the condition when the updated output after the 
first output is sent.
- *   timer trigger: defines a system timeout condition to trigger output if no 
more inputs are received to enable early/late triggers
- * </p>
- *
- * If multiple conditions are defined for a specific type of trigger, the 
aggregated trigger is the disjunction
- * of each individual trigger (i.e. OR).
- *
- * @param <M>  the type of input {@link MessageEnvelope} to the {@link Window}
- * @param <V>  the type of output value from the {@link Window}
- */
-@InterfaceStability.Unstable
-public final class TriggerBuilder<M extends MessageEnvelope, V> {
-
-  /**
-   * Predicate helper to OR multiple trigger conditions
-   */
-  static class PredicateHelper {
-    static <M, S> BiFunction<M, S, Boolean> or(BiFunction<M, S, Boolean> lhs, 
BiFunction<M, S, Boolean> rhs) {
-      return (m, s) -> lhs.apply(m, s) || rhs.apply(m, s);
-    }
-
-    static <S> Function<S, Boolean> or(Function<S, Boolean> lhs, Function<S, 
Boolean> rhs) {
-      return s -> lhs.apply(s) || rhs.apply(s);
-    }
-  }
-
-  /**
-   * The early trigger condition that determines the first output from the 
{@link Window}
-   */
-  private BiFunction<M, WindowState<V>, Boolean> earlyTrigger = null;
-
-  /**
-   * The late trigger condition that determines the late output(s) from the 
{@link Window}
-   */
-  private BiFunction<M, WindowState<V>, Boolean> lateTrigger = null;
-
-  /**
-   * The system timer based trigger conditions that guarantees the {@link 
Window} proceeds forward
-   */
-  private Function<WindowState<V>, Boolean> timerTrigger = null;
-
-  /**
-   * The state updater function to be applied after the first output is 
triggered
-   */
-  private Function<WindowState<V>, WindowState<V>> earlyTriggerUpdater = 
Function.identity();
-
-  /**
-   * The state updater function to be applied after the late output is 
triggered
-   */
-  private Function<WindowState<V>, WindowState<V>> lateTriggerUpdater = 
Function.identity();
-
-  /**
-   * Helper method to add a trigger condition
-   *
-   * @param currentTrigger  current trigger condition
-   * @param newTrigger  new trigger condition
-   * @return  combined trigger condition that is {@code currentTrigger} OR 
{@code newTrigger}
-   */
-  private BiFunction<M, WindowState<V>, Boolean> addTrigger(BiFunction<M, 
WindowState<V>, Boolean> currentTrigger,
-      BiFunction<M, WindowState<V>, Boolean> newTrigger) {
-    if (currentTrigger == null) {
-      return newTrigger;
-    }
-
-    return PredicateHelper.or(currentTrigger, newTrigger);
-  }
-
-  /**
-   * Helper method to add a system timer trigger
-   *
-   * @param currentTimer  current timer condition
-   * @param newTimer  new timer condition
-   * @return  combined timer condition that is {@code currentTimer} OR {@code 
newTimer}
-   */
-  private Function<WindowState<V>, Boolean> 
addTimerTrigger(Function<WindowState<V>, Boolean> currentTimer,
-      Function<WindowState<V>, Boolean> newTimer) {
-    if (currentTimer == null) {
-      return newTimer;
-    }
-
-    return PredicateHelper.or(currentTimer, newTimer);
-  }
-
-  /**
-   * default constructor to prevent instantiation
-   */
-  private TriggerBuilder() {}
-
-  /**
-   * Constructor that set the size limit as the early trigger for a window
-   *
-   * @param sizeLimit  the number of {@link MessageEnvelope}s in a window that 
would trigger the first output
-   */
-  private TriggerBuilder(long sizeLimit) {
-    this.earlyTrigger = (m, s) -> s.getNumberMessages() > sizeLimit;
-  }
-
-  /**
-   * Constructor that set the event time length as the early trigger
-   *
-   * @param eventTimeFunction  the function that calculate the event time in 
nano-second from the input {@link MessageEnvelope}
-   * @param wndLenMs  the window length in event time in milli-second
-   */
-  private TriggerBuilder(Function<M, Long> eventTimeFunction, long wndLenMs) {
-    this.earlyTrigger = (m, s) ->
-        TimeUnit.NANOSECONDS.toMillis(Math.max(s.getLatestEventTimeNs() - 
s.getEarliestEventTimeNs(),
-            eventTimeFunction.apply(m) - s.getEarliestEventTimeNs())) > 
wndLenMs;
-  }
-
-  /**
-   * Constructor that set the special token {@link MessageEnvelope} as the 
early trigger
-   *
-   * @param tokenFunc  the function that checks whether an input {@link 
MessageEnvelope} is a token {@link MessageEnvelope} that triggers window output
-   */
-  private TriggerBuilder(Function<M, Boolean> tokenFunc) {
-    this.earlyTrigger = (m, s) -> tokenFunc.apply(m);
-  }
-
-  /**
-   * Build method that creates an {@link Trigger} object based on the trigger 
conditions set in {@link TriggerBuilder}
-   * This is kept package private and only used by {@link Windows} to convert 
the mutable {@link TriggerBuilder} object to an immutable {@link Trigger} object
-   *
-   * @return  the final {@link Trigger} object
-   */
-  Trigger<M, WindowState<V>> build() {
-    return Trigger.createTrigger(this.timerTrigger, this.earlyTrigger, 
this.lateTrigger, this.earlyTriggerUpdater, this.lateTriggerUpdater);
-  }
-
-  /**
-   * Public API methods start here
-   */
-
-
-  /**
-   * API method to allow users to set an update method to update the output 
value after the first window output is triggered
-   * by the early trigger condition
-   *
-   * @param onTriggerFunc  the method to update the output value after the 
early trigger
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> onEarlyTrigger(Function<V, V> onTriggerFunc) {
-    this.earlyTriggerUpdater = s -> {
-      s.setOutputValue(onTriggerFunc.apply(s.getOutputValue()));
-      return s;
-    };
-    return this;
-  }
-
-  /**
-   * API method to allow users to set an update method to update the output 
value after a late window output is triggered
-   * by the late trigger condition
-   *
-   * @param onTriggerFunc  the method to update the output value after the 
late trigger
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> onLateTrigger(Function<V, V> onTriggerFunc) {
-    this.lateTriggerUpdater = s -> {
-      s.setOutputValue(onTriggerFunc.apply(s.getOutputValue()));
-      return s;
-    };
-    return this;
-  }
-
-  /**
-   * API method to allow users to add a system timer trigger based on timeout 
after the last {@link MessageEnvelope} received in the window
-   *
-   * @param timeoutMs  the timeout in ms after the last {@link 
MessageEnvelope} received in the window
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> addTimeoutSinceLastMessage(long timeoutMs) {
-    this.timerTrigger = this.addTimerTrigger(this.timerTrigger,
-        s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 
timeoutMs < System.currentTimeMillis());
-    return this;
-  }
-
-  /**
-   * API method to allow users to add a system timer trigger based on the 
timeout after the first {@link MessageEnvelope} received in the window
-   *
-   * @param timeoutMs  the timeout in ms after the first {@link 
MessageEnvelope} received in the window
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> addTimeoutSinceFirstMessage(long timeoutMs) {
-    this.timerTrigger = this.addTimerTrigger(this.timerTrigger, s ->
-        TimeUnit.NANOSECONDS.toMillis(s.getFirstMessageTimeNs()) + timeoutMs < 
System.currentTimeMillis());
-    return this;
-  }
-
-  /**
-   * API method allow users to add a late trigger based on the window size 
limit
-   *
-   * @param sizeLimit  limit on the number of {@link MessageEnvelope}s in 
window
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> addLateTriggerOnSizeLimit(long sizeLimit) {
-    this.lateTrigger = this.addTrigger(this.lateTrigger, (m, s) -> 
s.getNumberMessages() > sizeLimit);
-    return this;
-  }
-
-  /**
-   * API method to allow users to define a customized late trigger function 
based on input {@link MessageEnvelope} and the window state
-   *
-   * @param lateTrigger  the late trigger condition based on input {@link 
MessageEnvelope} and the current {@link WindowState}
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> addLateTrigger(BiFunction<M, WindowState<V>, 
Boolean> lateTrigger) {
-    this.lateTrigger = this.addTrigger(this.lateTrigger, lateTrigger);
-    return this;
-  }
-
-  /**
-   * Static API method to create a {@link TriggerBuilder} w/ early trigger 
condition based on window size limit
-   *
-   * @param sizeLimit  window size limit
-   * @param <M>  the type of input {@link MessageEnvelope}
-   * @param <V>  the type of {@link Window} output value
-   * @return  the {@link TriggerBuilder} object
-   */
-  public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> 
earlyTriggerWhenExceedWndLen(long sizeLimit) {
-    return new TriggerBuilder<M, V>(sizeLimit);
-  }
-
-  /**
-   * Static API method to create a {@link TriggerBuilder} w/ early trigger 
condition based on event time window
-   *
-   *
-   * @param eventTimeFunc  the function to get the event time from the input 
{@link MessageEnvelope}
-   * @param eventTimeWndSizeMs  the event time window size in Ms
-   * @param <M>  the type of input {@link MessageEnvelope}
-   * @param <V>  the type of {@link Window} output value
-   * @return  the {@link TriggerBuilder} object
-   */
-  public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> 
earlyTriggerOnEventTime(Function<M, Long> eventTimeFunc, long 
eventTimeWndSizeMs) {
-    return new TriggerBuilder<M, V>(eventTimeFunc, eventTimeWndSizeMs);
-  }
-
-  /**
-   * Static API method to create a {@link TriggerBuilder} w/ early trigger 
condition based on token {@link MessageEnvelope}s
-   *
-   * @param tokenFunc  the function to determine whether an input {@link 
MessageEnvelope} is a window token or not
-   * @param <M>  the type of input {@link MessageEnvelope}
-   * @param <V>  the type of {@link Window} output value
-   * @return  the {@link TriggerBuilder} object
-   */
-  public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> 
earlyTriggerOnTokenMsg(Function<M, Boolean> tokenFunc) {
-    return new TriggerBuilder<M, V>(tokenFunc);
-  }
-
-  /**
-   * Static API method to allow customized early trigger condition based on 
input {@link MessageEnvelope} and the corresponding {@link WindowState}
-   *
-   * @param earlyTrigger  the user defined early trigger condition
-   * @param <M>   the input {@link MessageEnvelope} type
-   * @param <V>   the output value from the window
-   * @return   the {@link TriggerBuilder} object
-   */
-  public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> 
earlyTrigger(BiFunction<M, WindowState<V>, Boolean> earlyTrigger) {
-    TriggerBuilder<M, V> newTriggers =  new TriggerBuilder<M, V>();
-    newTriggers.earlyTrigger = 
newTriggers.addTrigger(newTriggers.earlyTrigger, earlyTrigger);
-    return newTriggers;
-  }
-
-  /**
-   * Static API method to create a {@link TriggerBuilder} w/ system timeout 
after the last {@link MessageEnvelope} received in the window
-   *
-   * @param timeoutMs  timeout in ms after the last {@link MessageEnvelope} 
received
-   * @param <M>  the type of input {@link MessageEnvelope}
-   * @param <V>  the type of {@link Window} output value
-   * @return  the {@link TriggerBuilder} object
-   */
-  public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> 
timeoutSinceLastMessage(long timeoutMs) {
-    return new TriggerBuilder<M, V>().addTimeoutSinceLastMessage(timeoutMs);
-  }
-
-  /**
-   * Static API method to create a {@link TriggerBuilder} w/ system timeout 
after the first {@link MessageEnvelope} received in the window
-   *
-   * @param timeoutMs  timeout in ms after the first {@link MessageEnvelope} 
received
-   * @param <M>  the type of input {@link MessageEnvelope}
-   * @param <V>  the type of {@link Window} output value
-   * @return  the {@link TriggerBuilder} object
-   */
-  public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> 
timeoutSinceFirstMessage(long timeoutMs) {
-    return new TriggerBuilder<M, V>().addTimeoutSinceFirstMessage(timeoutMs);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
index 56a307d..6aae940 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
@@ -18,32 +18,94 @@
  */
 package org.apache.samza.operators.windows;
 
+import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.triggers.Trigger;
 
 /**
- * The public programming interface class for window function
+ * Groups incoming {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream} into finite
+ * windows for processing.
  *
- * @param <M>  the type of input {@link MessageEnvelope}
- * @param <WK>  the type of key to the {@link Window}
- * @param <WV>  the type of output value in the {@link WindowOutput}
- * @param <WM>  the type of {@link MessageEnvelope} in the window output stream
+ * <p> A window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s
+ * that determine when results from the {@link Window} are emitted.
+ *
+ * <p> Each emitted result contains one or more {@link MessageEnvelope}s in the window and is called a {@link WindowPane}.
+ * A pane can include all {@link MessageEnvelope}s collected for the window so far or only the new {@link MessageEnvelope}s
+ * since the last emitted pane, as determined by the {@link AccumulationMode}.
+ *
+ * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window
+ * has arrived or late triggers that allow handling of late data arrivals.
+ *
+ * <p> A {@link Window} is defined as "keyed" when the incoming {@link org.apache.samza.operators.MessageStream} is first
+ * partitioned based on the provided key, and windowing is applied on the partitioned stream.
+ *
+ *                                     window wk1 (with its triggers)
+ *                                      +--------------------------------+
+ *                                      ------------+--------+-----------+
+ *                                      |           |        |           |
+ *                                      | pane 1    |pane2   |   pane3   |
+ *                                      +-----------+--------+-----------+
+ *
+ -----------------------------------
+ *incoming message stream ------+
+ -----------------------------------
+ *                                      window wk2
+ *                                      +---------------------+---------+
+ *                                      |   pane 1|   pane 2  |  pane 3 |
+ *                                      |         |           |         |
+ *                                      +---------+-----------+---------+
+ *
+ *                                      window wk3
+ *                                      +----------+-----------+---------+
+ *                                      |          |           |         |
+ *                                      | pane 1   |  pane 2   |   pane 3|
+ *                                      |          |           |         |
+ *                                      +----------+-----------+---------+
+ *
+ *
+ * <p> Use the {@link Windows} APIs to create various windows and the {@link org.apache.samza.operators.triggers.Triggers}
+ * APIs to create triggers.
+ *
+ * @param <M> the type of the input {@link MessageEnvelope}
+ * @param <K> the type of the key in the {@link MessageEnvelope} in this {@link org.apache.samza.operators.MessageStream}.
+ * @param <WV> the type of the value in the {@link WindowPane}.
+ * @param <WM> the type of the output.
  */
-public interface Window<M extends MessageEnvelope, WK, WV, WM extends WindowOutput<WK, WV>> {
+@InterfaceStability.Unstable
+public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane<K, WV>> {
 
   /**
-   * Set the triggers for this {@link Window}
+   * Set the early triggers for this {@link Window}.
+   * <p>Use the {@link org.apache.samza.operators.triggers.Triggers} APIs to create instances of {@link Trigger}
    *
-   * @param wndTrigger  trigger conditions set by the programmers
-   * @return  the {@link Window} function w/ the trigger {@code wndTrigger}
+   * @param trigger the early trigger
+   * @return the {@link Window} function with the early trigger
    */
-  Window<M, WK, WV, WM> setTriggers(TriggerBuilder<M, WV> wndTrigger);
+  Window<M, K, WV, WM> setEarlyTrigger(Trigger<M> trigger);
 
   /**
-   * Internal implementation helper to get the functions associated with this Window.
+   * Set the late triggers for this {@link Window}.
+   * <p>Use the {@link org.apache.samza.operators.triggers.Triggers} APIs to create instances of {@link Trigger}
    *
-   * <b>NOTE:</b> This is purely an internal API and should not be used directly by users.
+   * @param trigger the late trigger
+   * @return the {@link Window} function with the late trigger
+   */
+  Window<M, K, WV, WM> setLateTrigger(Trigger<M> trigger);
+
+  /**
+   * Specify how a {@link Window} should process its previously emitted {@link WindowPane}s.
+   *
+   * <p> There are two types of {@link AccumulationMode}s:
+   * <ul>
+   *  <li> ACCUMULATING: Specifies that window panes should include all messages collected for the window (key) so far, even if they were
+   * included in previously emitted window panes.
+   *  <li> DISCARDING: Specifies that window panes should only include messages collected for this window (key) since the last emitted
+   * window pane.
+   * </ul>
    *
-   * @return the functions associated with this Window.
+   * @param mode the accumulation mode
+   * @return the {@link Window} function with the specified {@link AccumulationMode}.
    */
-  WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>> getInternalWindowFn();
+  Window<M, K, WV, WM> setAccumulationMode(AccumulationMode mode);
+
 }
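
The difference between the two accumulation modes documented above can be illustrated with a small stand-alone simulation. This is only a sketch under stated assumptions: `AccumulationModeSketch`, its `Mode` enum, and `emitPanes` are hypothetical names made up for illustration and are not part of the Samza API in this commit; the "fire every N messages" rule stands in for whatever trigger the window is configured with.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-alone simulation; none of these names are Samza APIs. */
public class AccumulationModeSketch {

  /** Mirrors the two modes described in the Javadoc (assumed names). */
  public enum Mode { ACCUMULATING, DISCARDING }

  /**
   * Emits one pane after every {@code fireEvery} messages and returns all emitted panes.
   * In ACCUMULATING mode each pane contains every message seen so far;
   * in DISCARDING mode each pane contains only the messages since the previous pane.
   */
  public static List<List<String>> emitPanes(List<String> messages, int fireEvery, Mode mode) {
    List<List<String>> panes = new ArrayList<>();
    List<String> buffer = new ArrayList<>();
    int sinceLastFire = 0;
    for (String message : messages) {
      buffer.add(message);
      sinceLastFire++;
      if (sinceLastFire == fireEvery) {
        panes.add(new ArrayList<>(buffer)); // snapshot of the current pane contents
        sinceLastFire = 0;
        if (mode == Mode.DISCARDING) {
          buffer.clear(); // forget messages that were already emitted
        }
      }
    }
    return panes;
  }

  public static void main(String[] args) {
    List<String> input = Arrays.asList("a", "b", "c", "d");
    System.out.println(emitPanes(input, 2, Mode.ACCUMULATING)); // [[a, b], [a, b, c, d]]
    System.out.println(emitPanes(input, 2, Mode.DISCARDING));   // [[a, b], [c, d]]
  }
}
```

With ACCUMULATING, a downstream consumer can treat each pane as a full replacement of the previous one; with DISCARDING, it has to combine panes itself if it wants a running result.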

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/WindowFn.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowFn.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowFn.java
deleted file mode 100644
index 8878bf9..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowFn.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.windows;
-
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.storage.kv.Entry;
-
-import java.util.function.BiFunction;
-
-
-/**
- * Defines an internal representation of a window function.
- *
- * @param <M>  type of the input {@link MessageEnvelope} for the window
- * @param <WK>  type of the window key in the output {@link MessageEnvelope}
- * @param <WS>  type of the {@link WindowState} in the state store
- * @param <WM>  type of the {@link MessageEnvelope} in the output stream
- */
-public interface WindowFn<M extends MessageEnvelope, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> {
-
-  /**
-   * Get the transformation function of the {@link WindowFn}.
-   *
-   * @return  the transformation function which takes a {@link MessageEnvelope} of type {@code M} and its window state entry,
-   *          and transforms it to an {@link WindowOutput}
-   */
-  BiFunction<M, Entry<WK, WS>, WM> getTransformFn();
-
-  /**
-   * Get the state store functions for this {@link WindowFn}.
-   *
-   * @return  the state store functions
-   */
-  StoreFunctions<M, WK, WS> getStoreFns();
-
-  /**
-   * Get the trigger conditions for this {@link WindowFn}.
-   *
-   * @return  the trigger condition for this {@link WindowFn}
-   */
-  Trigger<M, WS> getTrigger();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
new file mode 100644
index 0000000..7edf3e1
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+/**
+ * Key for a {@link WindowPane} emitted from a {@link Window}.
+ *
+ * @param <K> the type of the key in the incoming {@link org.apache.samza.operators.data.MessageEnvelope}.
+ *            Windows that are not keyed have a {@link Void} key type.
+ *
+ */
+public class WindowKey<K> {
+
+  private final K key;
+
+  private final String windowId;
+
+  public WindowKey(K key, String windowId) {
+    this.key = key;
+    this.windowId = windowId;
+  }
+
+  public K getKey() {
+    return key;
+  }
+
+  public String getWindowId() {
+    return windowId;
+  }
+}
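
As added in this diff, `WindowKey` does not override `equals` or `hashCode`, so two instances built from the same key and window id compare by identity. If pane keys are ever used for map lookups or de-duplication (an assumption; the diff shows no such usage), value-based equality would be needed. A minimal sketch of that idea, using a hypothetical `ValueWindowKey` class that is not part of this commit:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical illustration; ValueWindowKey is not the WindowKey class from the commit. */
public class WindowKeyEqualitySketch {

  /** A key/windowId pair with value-based equality. */
  public static final class ValueWindowKey<K> {
    private final K key;
    private final String windowId;

    public ValueWindowKey(K key, String windowId) {
      this.key = key;
      this.windowId = windowId;
    }

    @Override
    public boolean equals(Object other) {
      if (this == other) {
        return true;
      }
      if (!(other instanceof ValueWindowKey)) {
        return false;
      }
      ValueWindowKey<?> that = (ValueWindowKey<?>) other;
      return Objects.equals(key, that.key) && Objects.equals(windowId, that.windowId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(key, windowId);
    }
  }

  /** Counts how often a pane key is seen, using a freshly constructed key each time. */
  public static int countPanes(String key, String windowId, int times) {
    Map<ValueWindowKey<String>, Integer> counts = new HashMap<>();
    for (int i = 0; i < times; i++) {
      counts.merge(new ValueWindowKey<>(key, windowId), 1, Integer::sum);
    }
    return counts.getOrDefault(new ValueWindowKey<>(key, windowId), 0);
  }

  public static void main(String[] args) {
    System.out.println(countPanes("user-1", "w-0", 3)); // prints 3
  }
}
```

With identity-based equality the same loop would create three separate map entries and the final lookup would find none of them.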

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/WindowOutput.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowOutput.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowOutput.java
deleted file mode 100644
index 63e34c8..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowOutput.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.windows;
-
-import org.apache.samza.operators.data.MessageEnvelope;
-
-
-/**
- * The type of output {@link MessageEnvelope}s in a window operator output stream.
- *
- * @param <K>  the type of key in the window output
- * @param <M>  the type of value in the window output
- */
-public final class WindowOutput<K, M> implements MessageEnvelope<K, M> {
-  private final K key;
-  private final M value;
-
-  WindowOutput(K key, M value) {
-    this.key = key;
-    this.value = value;
-  }
-
-  @Override public M getMessage() {
-    return this.value;
-  }
-
-  @Override public K getKey() {
-    return this.key;
-  }
-
-  static public <K, M> WindowOutput<K, M> of(K key, M result) {
-    return new WindowOutput<>(key, result);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
new file mode 100644
index 0000000..0388048
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+
+
+/**
+ * Specifies the result emitted from a {@link Window}.
+ *
+ * @param <K>  the type of key in the window pane
+ * @param <V>  the type of value in the window pane.
+ */
+public final class WindowPane<K, V> implements MessageEnvelope<WindowKey<K>, V> {
+
+  private final WindowKey<K> key;
+
+  private final V value;
+
+  private final AccumulationMode mode;
+
+  WindowPane(WindowKey<K> key, V value, AccumulationMode mode) {
+    this.key = key;
+    this.value = value;
+    this.mode = mode;
+  }
+
+  @Override public V getMessage() {
+    return this.value;
+  }
+
+  @Override public WindowKey<K> getKey() {
+    return this.key;
+  }
+
+  static public <K, M> WindowPane<K, M> of(WindowKey<K> key, M result) {
+    return new WindowPane<>(key, result, AccumulationMode.DISCARDING);
+  }
+}
+
+
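
A `WindowPane` pairs a `WindowKey` (message key plus window id) with the value computed for that window. The following stand-alone sketch shows roughly how results end up keyed that way for a keyed tumbling window. Everything in it is an assumption for illustration: `KeyedPaneSketch`, `Event`, `sumPerPane`, and the `key@windowStart` id format are hypothetical and do not come from the Samza code in this commit.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-alone illustration; not Samza code. */
public class KeyedPaneSketch {

  /** A minimal input event: message key, timestamp in milliseconds, and a value. */
  public static final class Event {
    final String key;
    final long timestampMs;
    final int value;

    public Event(String key, long timestampMs, int value) {
      this.key = key;
      this.timestampMs = timestampMs;
      this.value = value;
    }
  }

  /**
   * Sums event values per (key, tumbling window). Each map entry plays the role of one pane;
   * the map key "key@windowStart" stands in for a WindowKey (assumed format).
   */
  public static Map<String, Integer> sumPerPane(List<Event> events, long windowSizeMs) {
    Map<String, Integer> panes = new LinkedHashMap<>();
    for (Event event : events) {
      // Align the timestamp down to the start of its tumbling window.
      long windowStart = (event.timestampMs / windowSizeMs) * windowSizeMs;
      panes.merge(event.key + "@" + windowStart, event.value, Integer::sum);
    }
    return panes;
  }

  public static void main(String[] args) {
    List<Event> events = Arrays.asList(
        new Event("k1", 100, 1),
        new Event("k2", 200, 5),
        new Event("k1", 900, 2),
        new Event("k1", 1200, 4));
    System.out.println(sumPerPane(events, 1000)); // {k1@0=3, k2@0=5, k1@1000=4}
  }
}
```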

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowState.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowState.java
deleted file mode 100644
index 835d749..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowState.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.windows;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-
-/**
- * This interface defines the methods a window state class has to implement. The programmers are allowed to implement
- * customized window state to be stored in window state stores by implementing this interface class.
- *
- * @param <WV>  the type for window output value
- */
-@InterfaceStability.Unstable
-public interface WindowState<WV> {
-  /**
-   * Method to get the system time when the first {@link org.apache.samza.operators.data.MessageEnvelope}
-   * in the window is received
-   *
-   * @return  nano-second of system time for the first {@link org.apache.samza.operators.data.MessageEnvelope}
-   *          received in the window
-   */
-  long getFirstMessageTimeNs();
-
-  /**
-   * Method to get the system time when the last {@link org.apache.samza.operators.data.MessageEnvelope}
-   * in the window is received
-   *
-   * @return  nano-second of system time for the last {@link org.apache.samza.operators.data.MessageEnvelope}
-   *          received in the window
-   */
-  long getLastMessageTimeNs();
-
-  /**
-   * Method to get the earliest event time in the window
-   *
-   * @return  the earliest event time in nano-second in the window
-   */
-  long getEarliestEventTimeNs();
-
-  /**
-   * Method to get the latest event time in the window
-   *
-   * @return  the latest event time in nano-second in the window
-   */
-  long getLatestEventTimeNs();
-
-  /**
-   * Method to get the total number of {@link org.apache.samza.operators.data.MessageEnvelope}s received in the window
-   *
-   * @return  number of {@link org.apache.samza.operators.data.MessageEnvelope}s in the window
-   */
-  long getNumberMessages();
-
-  /**
-   * Method to get the corresponding window's output value
-   *
-   * @return  the corresponding window's output value
-   */
-  WV getOutputValue();
-
-  /**
-   * Method to set the corresponding window's output value
-   *
-   * @param value  the corresponding window's output value
-   */
-  void setOutputValue(WV value);
-
-}
