http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
deleted file mode 100644
index 14e6562..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.data.Message;
-
-
-public class TestOutputMessage implements Message<String, Integer> {
-  private final String key;
-  private final Integer value;
-  private final long timestamp;
-
-  public TestOutputMessage(String key, Integer value, long timestamp) {
-    this.key = key;
-    this.value = value;
-    this.timestamp = timestamp;
-  }
-
-  @Override public Integer getMessage() {
-    return this.value;
-  }
-
-  @Override public String getKey() {
-    return this.key;
-  }
-
-  @Override public long getTimestamp() {
-    return this.timestamp;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java
new file mode 100644
index 0000000..284b30b
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+
+
+public class TestOutputMessageEnvelope implements MessageEnvelope<String, Integer> {
+  private final String key;
+  private final Integer value;
+
+  public TestOutputMessageEnvelope(String key, Integer value) {
+    this.key = key;
+    this.value = value;
+  }
+
+  @Override
+  public Integer getMessage() {
+    return this.value;
+  }
+
+  @Override
+  public String getKey() {
+    return this.key;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java b/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
deleted file mode 100644
index 927b14b..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestTriggerBuilder {
-  private Field earlyTriggerField;
-  private Field lateTriggerField;
-  private Field timerTriggerField;
-  private Field earlyTriggerUpdater;
-  private Field lateTriggerUpdater;
-
-  @Before
-  public void testPrep() throws Exception {
-    this.earlyTriggerField = TriggerBuilder.class.getDeclaredField("earlyTrigger");
-    this.lateTriggerField = TriggerBuilder.class.getDeclaredField("lateTrigger");
-    this.timerTriggerField = TriggerBuilder.class.getDeclaredField("timerTrigger");
-    this.earlyTriggerUpdater = TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater");
-    this.lateTriggerUpdater = TriggerBuilder.class.getDeclaredField("lateTriggerUpdater");
-
-    this.earlyTriggerField.setAccessible(true);
-    this.lateTriggerField.setAccessible(true);
-    this.timerTriggerField.setAccessible(true);
-    this.earlyTriggerUpdater.setAccessible(true);
-    this.lateTriggerUpdater.setAccessible(true);
-  }
-
-  @Test public void testStaticCreators() throws NoSuchFieldException, IllegalAccessException {
-    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField =
-        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
-    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(triggerField.apply(null, mockState));
-    when(mockState.getNumberMessages()).thenReturn(2000L);
-    assertTrue(triggerField.apply(null, mockState));
-
-    Function<TestMessage, Boolean> tokenFunc = m -> true;
-    builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc);
-    triggerField = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    TestMessage m = mock(TestMessage.class);
-    assertTrue(triggerField.apply(m, mockState));
-
-    builder = 
TriggerBuilder.earlyTriggerOnEventTime(TestMessage::getTimestamp, 30000L);
-    triggerField = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L);
-    when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L);
-    when(m.getTimestamp()).thenReturn(19999000000L);
-    assertFalse(triggerField.apply(m, mockState));
-    when(m.getTimestamp()).thenReturn(32000000000L);
-    assertTrue(triggerField.apply(m, mockState));
-    when(m.getTimestamp()).thenReturn(1001000000L);
-    when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L);
-    assertTrue(triggerField.apply(m, mockState));
-
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> 
mockFunc = mock(BiFunction.class);
-    builder = TriggerBuilder.earlyTrigger(mockFunc);
-    triggerField = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    assertEquals(triggerField, mockFunc);
-
-    builder = TriggerBuilder.timeoutSinceFirstMessage(10000L);
-    Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
-        (Function<WindowState<Collection<TestMessage>>, Boolean>) 
this.timerTriggerField.get(builder);
-    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
-    when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
-    assertFalse(timerTrigger.apply(mockState));
-
-    builder = TriggerBuilder.timeoutSinceLastMessage(10000L);
-    timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) 
this.timerTriggerField.get(builder);
-    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the lastMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
-    when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000));
-    assertFalse(timerTrigger.apply(mockState));
-  }
-
-  @Test public void testAddTimerTriggers() throws IllegalAccessException {
-    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = 
TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.addTimeoutSinceFirstMessage(10000L);
-    // exam that both earlyTrigger and timer triggers are set up
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> 
triggerField =
-        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, 
Boolean>) this.earlyTriggerField.get(builder);
-    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(triggerField.apply(null, mockState));
-    // check the timer trigger
-    Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
-        (Function<WindowState<Collection<TestMessage>>, Boolean>) 
this.timerTriggerField.get(builder);
-    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
-    when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
-    assertFalse(timerTrigger.apply(mockState));
-
-    // exam that both early trigger and timer triggers are set up
-    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    triggerField = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(triggerField.apply(null, mockState));
-    builder.addTimeoutSinceLastMessage(20000L);
-    // check the timer trigger
-    timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) 
this.timerTriggerField.get(builder);
-    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
-    when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
-    assertFalse(timerTrigger.apply(mockState));
-  }
-
-  @Test public void testAddLateTriggers() throws IllegalAccessException {
-    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = 
TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.addLateTriggerOnSizeLimit(10000L);
-    // exam that both earlyTrigger and lateTriggers are set up
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> 
earlyTrigger =
-        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, 
Boolean>) this.earlyTriggerField.get(builder);
-    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(earlyTrigger.apply(null, mockState));
-    // check the late trigger
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> 
lateTrigger =
-        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, 
Boolean>) this.lateTriggerField.get(builder);
-    assertFalse(lateTrigger.apply(null, mockState));
-    // set the number of messages to 10001 to trigger the late trigger
-    when(mockState.getNumberMessages()).thenReturn(10001L);
-    assertTrue(lateTrigger.apply(null, mockState));
-
-    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0);
-    // exam that both earlyTrigger and lateTriggers are set up
-    earlyTrigger = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(earlyTrigger.apply(null, mockState));
-    // exam the lateTrigger
-    when(mockState.getOutputValue()).thenReturn(new ArrayList<>());
-    lateTrigger = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.lateTriggerField.get(builder);
-    assertFalse(lateTrigger.apply(null, mockState));
-    List<TestMessage> mockList = mock(ArrayList.class);
-    when(mockList.size()).thenReturn(200);
-    when(mockState.getOutputValue()).thenReturn(mockList);
-    assertTrue(lateTrigger.apply(null, mockState));
-  }
-
-  @Test public void testAddTriggerUpdater() throws IllegalAccessException {
-    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = 
TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.onEarlyTrigger(c -> {
-        c.clear();
-        return c;
-      });
-    List<TestMessage> collection = new ArrayList<TestMessage>() { {
-        for (int i = 0; i < 10; i++) {
-          this.add(new TestMessage(String.format("key-%d", i), "string-value", 
System.nanoTime()));
-        }
-      } };
-    // exam that earlyTriggerUpdater is set up
-    Function<WindowState<Collection<TestMessage>>, 
WindowState<Collection<TestMessage>>> earlyTriggerUpdater =
-        (Function<WindowState<Collection<TestMessage>>, 
WindowState<Collection<TestMessage>>>) this.earlyTriggerUpdater.get(builder);
-    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
-    when(mockState.getOutputValue()).thenReturn(collection);
-    earlyTriggerUpdater.apply(mockState);
-    assertTrue(collection.isEmpty());
-
-    collection.add(new TestMessage("key-to-stay", "string-to-stay", 
System.nanoTime()));
-    collection.add(new TestMessage("key-to-remove", "string-to-remove", 
System.nanoTime()));
-    builder.onLateTrigger(c -> {
-        c.removeIf(t -> t.getKey().equals("key-to-remove"));
-        return c;
-      });
-    // check the late trigger updater
-    Function<WindowState<Collection<TestMessage>>, 
WindowState<Collection<TestMessage>>> lateTriggerUpdater =
-        (Function<WindowState<Collection<TestMessage>>, 
WindowState<Collection<TestMessage>>>) this.lateTriggerUpdater.get(builder);
-    when(mockState.getOutputValue()).thenReturn(collection);
-    lateTriggerUpdater.apply(mockState);
-    assertTrue(collection.size() == 1);
-    assertFalse(collection.get(0).isDelete());
-    assertEquals(collection.get(0).getKey(), "key-to-stay");
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java b/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
deleted file mode 100644
index 8a25a96..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.Windows.Window;
-import org.apache.samza.operators.internal.Trigger;
-import org.apache.samza.operators.internal.WindowOutput;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestWindows {
-
-  @Test public void testSessionWindows() throws NoSuchFieldException, IllegalAccessException {
-    // test constructing the default session window
-    Window<TestMessage, String, Collection<TestMessage>, WindowOutput<String, 
Collection<TestMessage>>> testWnd = Windows.intoSessions(
-        TestMessage::getKey);
-    assertTrue(testWnd instanceof Windows.SessionWindow);
-    Field wndKeyFuncField = 
Windows.SessionWindow.class.getDeclaredField("wndKeyFunction");
-    Field aggregatorField = 
Windows.SessionWindow.class.getDeclaredField("aggregator");
-    wndKeyFuncField.setAccessible(true);
-    aggregatorField.setAccessible(true);
-    Function<TestMessage, String> wndKeyFunc = (Function<TestMessage, String>) 
wndKeyFuncField.get(testWnd);
-    assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 
0)), "test-key");
-    BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>> 
aggrFunc =
-        (BiFunction<TestMessage, Collection<TestMessage>, 
Collection<TestMessage>>) aggregatorField.get(testWnd);
-    TestMessage mockMsg = mock(TestMessage.class);
-    Collection<TestMessage> collection = aggrFunc.apply(mockMsg, new 
ArrayList<>());
-    assertTrue(collection.size() == 1);
-    assertTrue(collection.contains(mockMsg));
-
-    // test constructing the session window w/ customized session info
-    Window<TestMessage, String, Collection<Character>, WindowOutput<String, 
Collection<Character>>> testWnd2 = Windows.intoSessions(
-        m -> String.format("key-%d", m.getTimestamp()), m -> 
m.getMessage().charAt(0));
-    assertTrue(testWnd2 instanceof Windows.SessionWindow);
-    wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd2);
-    aggrFunc = (BiFunction<TestMessage, Collection<TestMessage>, 
Collection<TestMessage>>) aggregatorField.get(testWnd2);
-    assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 
0)), "key-0");
-    when(mockMsg.getMessage()).thenReturn("x-001");
-    collection = aggrFunc.apply(mockMsg, new ArrayList<>());
-    assertTrue(collection.size() == 1);
-    assertTrue(collection.contains('x'));
-
-    // test constructing session window w/ a default counter
-    Window<TestMessage, String, Integer, WindowOutput<String, Integer>> 
testCounter = Windows.intoSessionCounter(
-        m -> String.format("key-%d", m.getTimestamp()));
-    assertTrue(testCounter instanceof Windows.SessionWindow);
-    wndKeyFunc = (Function<TestMessage, String>) 
wndKeyFuncField.get(testCounter);
-    BiFunction<TestMessage, Integer, Integer> counterFn = 
(BiFunction<TestMessage, Integer, Integer>) aggregatorField.get(testCounter);
-    when(mockMsg.getTimestamp()).thenReturn(12345L);
-    assertEquals(wndKeyFunc.apply(mockMsg), "key-12345");
-    assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2));
-  }
-
-  @Test public void testSetTriggers() throws NoSuchFieldException, IllegalAccessException {
-    Window<TestMessage, String, Integer, WindowOutput<String, Integer>> 
testCounter = Windows.intoSessionCounter(
-        m -> String.format("key-%d", m.getTimestamp()));
-    // test session window w/ a trigger
-    TriggerBuilder<TestMessage, Integer> triggerBuilder = 
TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L);
-    testCounter.setTriggers(triggerBuilder);
-    Trigger<TestMessage, WindowState<Integer>> expectedTrigger = 
triggerBuilder.build();
-    Trigger<TestMessage, WindowState<Integer>> actualTrigger = 
Windows.getInternalWindowFn(testCounter).getTrigger();
-    // examine all trigger fields are expected
-    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
-    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
-    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
-    Field earlyTriggerUpdater = Trigger.class.getDeclaredField("earlyTriggerUpdater");
-    Field lateTriggerUpdater = Trigger.class.getDeclaredField("lateTriggerUpdater");
-    earlyTriggerField.setAccessible(true);
-    lateTriggerField.setAccessible(true);
-    timerTriggerField.setAccessible(true);
-    earlyTriggerUpdater.setAccessible(true);
-    lateTriggerUpdater.setAccessible(true);
-    assertEquals(earlyTriggerField.get(expectedTrigger), earlyTriggerField.get(actualTrigger));
-    assertEquals(lateTriggerField.get(expectedTrigger), lateTriggerField.get(actualTrigger));
-    assertEquals(timerTriggerField.get(expectedTrigger), timerTriggerField.get(actualTrigger));
-    assertEquals(earlyTriggerUpdater.get(expectedTrigger), earlyTriggerUpdater.get(actualTrigger));
-    assertEquals(lateTriggerUpdater.get(expectedTrigger), lateTriggerUpdater.get(actualTrigger));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
index b734e87..7bd62a7 100644
--- a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
+++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
@@ -30,9 +30,10 @@ import static org.mockito.Mockito.when;
 
 public class TestIncomingSystemMessage {
 
-  @Test public void testConstructor() {
+  @Test
+  public void testConstructor() {
     IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class);
-    IncomingSystemMessage ism = new IncomingSystemMessage(ime);
+    IncomingSystemMessageEnvelope ism = new IncomingSystemMessageEnvelope(ime);
 
     Object mockKey = mock(Object.class);
     Object mockValue = mock(Object.class);

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
index 943c47f..7838896 100644
--- a/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
+++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
@@ -28,7 +28,8 @@ import static org.mockito.Mockito.mock;
 
 public class TestLongOffset {
 
-  @Test public void testConstructor() throws Exception {
+  @Test
+  public void testConstructor() throws Exception {
     LongOffset o1 = new LongOffset("12345");
     Field offsetField = LongOffset.class.getDeclaredField("offset");
     offsetField.setAccessible(true);
@@ -47,7 +48,8 @@ public class TestLongOffset {
     }
   }
 
-  @Test public void testComparator() {
+  @Test
+  public void testComparator() {
     LongOffset o1 = new LongOffset("11111");
     Offset other = mock(Offset.class);
     try {
@@ -65,7 +67,8 @@ public class TestLongOffset {
     assertEquals(o1.compareTo(o4), 0);
   }
 
-  @Test public void testEquals() {
+  @Test
+  public void testEquals() {
     LongOffset o1 = new LongOffset("12345");
     Offset other = mock(Offset.class);
     assertFalse(o1.equals(other));

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
deleted file mode 100644
index d994486..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.internal;
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.WindowState;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestOperators {
-
-  private class TestMessage implements Message<String, Object> {
-    private final long timestamp;
-    private final String key;
-    private final Object msg;
-
-
-    TestMessage(String key, Object msg, long timestamp) {
-      this.timestamp = timestamp;
-      this.key = key;
-      this.msg = msg;
-    }
-
-    @Override public Object getMessage() {
-      return this.msg;
-    }
-
-    @Override public String getKey() {
-      return this.key;
-    }
-
-    @Override public long getTimestamp() {
-      return this.timestamp;
-    }
-  }
-
-  @Test public void testGetStreamOperator() {
-    Function<Message, Collection<TestMessage>> transformFn = m -> new 
ArrayList<TestMessage>() { {
-        this.add(new TestMessage(m.getKey().toString(), m.getMessage(), 
12345L));
-      } };
-    Operators.StreamOperator<Message, TestMessage> strmOp = 
Operators.getStreamOperator(transformFn);
-    assertEquals(strmOp.getFunction(), transformFn);
-    assertTrue(strmOp.getOutputStream() instanceof MessageStream);
-  }
-
-  @Test public void testGetSinkOperator() {
-    MessageStream.VoidFunction3<TestMessage, MessageCollector, 
TaskCoordinator> sinkFn = (m, c, t) -> { };
-    Operators.SinkOperator<TestMessage> sinkOp = 
Operators.getSinkOperator(sinkFn);
-    assertEquals(sinkOp.getFunction(), sinkFn);
-    assertTrue(sinkOp.getOutputStream() == null);
-  }
-
-  @Test public void testGetWindowOperator() {
-    WindowFn<TestMessage, String, WindowState<Integer>, WindowOutput<String, 
Integer>> windowFn = mock(WindowFn.class);
-    BiFunction<TestMessage, Entry<String, WindowState<Integer>>, 
WindowOutput<String, Integer>> xFunction = (m, e) -> null;
-    Operators.StoreFunctions<TestMessage, String, WindowState<Integer>> 
storeFns = mock(Operators.StoreFunctions.class);
-    Trigger<TestMessage, WindowState<Integer>> trigger = mock(Trigger.class);
-    MessageStream<TestMessage> mockInput = mock(MessageStream.class);
-    when(windowFn.getTransformFunc()).thenReturn(xFunction);
-    when(windowFn.getStoreFuncs()).thenReturn(storeFns);
-    when(windowFn.getTrigger()).thenReturn(trigger);
-    when(mockInput.toString()).thenReturn("mockStream1");
-
-    Operators.WindowOperator<TestMessage, String, WindowState<Integer>, 
WindowOutput<String, Integer>> windowOp = Operators.getWindowOperator(windowFn);
-    assertEquals(windowOp.getFunction(), xFunction);
-    assertEquals(windowOp.getStoreFunctions(), storeFns);
-    assertEquals(windowOp.getTrigger(), trigger);
-    assertEquals(windowOp.getStoreName(mockInput), 
String.format("input-mockStream1-wndop-%s", windowOp.toString()));
-  }
-
-  @Test public void testGetPartialJoinOperator() {
-    BiFunction<Message<Object, ?>, Message<Object, ?>, TestMessage> merger =
-        (m1, m2) -> new TestMessage(m1.getKey().toString(), m2.getMessage(),
-            Math.max(m1.getTimestamp(), m2.getTimestamp()));
-    MessageStream<TestMessage> joinOutput = new MessageStream<>();
-    Operators.PartialJoinOperator<Message<Object, ?>, Object, Message<Object, 
?>, TestMessage> partialJoin =
-        Operators.getPartialJoinOperator(merger, joinOutput);
-
-    assertEquals(partialJoin.getOutputStream(), joinOutput);
-    Message<Object, Object> m = mock(Message.class);
-    Message<Object, Object> s = mock(Message.class);
-    assertEquals(partialJoin.getFunction(), merger);
-    assertEquals(partialJoin.getSelfStoreFunctions().getStoreKeyFinder().apply(m), m.getKey());
-    assertEquals(partialJoin.getSelfStoreFunctions().getStateUpdater().apply(m, s), m);
-    assertEquals(partialJoin.getJoinStoreFunctions().getStoreKeyFinder().apply(m), m.getKey());
-    assertNull(partialJoin.getJoinStoreFunctions().getStateUpdater());
-  }
-
-  @Test public void testGetMergeOperator() {
-    MessageStream<TestMessage> output = new MessageStream<>();
-    Operators.StreamOperator<TestMessage, TestMessage> mergeOp = 
Operators.getMergeOperator(output);
-    Function<TestMessage, Collection<TestMessage>> mergeFn = t -> new 
ArrayList<TestMessage>() { {
-        this.add(t);
-      } };
-    TestMessage t = mock(TestMessage.class);
-    assertEquals(mergeOp.getFunction().apply(t), mergeFn.apply(t));
-    assertEquals(mergeOp.getOutputStream(), output);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
deleted file mode 100644
index 0f35a7c..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.internal;
-
-import org.apache.samza.operators.WindowState;
-import org.apache.samza.operators.data.Message;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class TestTrigger {
-
-  @Test public void testConstructor() throws Exception {
-    BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> 
earlyTrigger = (m, s) -> s.getOutputValue() > 1000;
-    BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> 
lateTrigger = (m, s) -> s.getOutputValue() > 1000;
-    Function<WindowState<Integer>, Boolean> timerTrigger = s -> 
TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < 
System.currentTimeMillis();
-    Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = 
s -> {
-      s.setOutputValue(0);
-      return s;
-    };
-    Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = 
s -> {
-      s.setOutputValue(1);
-      return s;
-    };
-
-    Trigger<Message<Object, Object>, WindowState<Integer>> trigger = 
Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger,
-        earlyTriggerUpdater, lateTriggerUpdater);
-
-    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
-    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
-    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
-    Field earlyTriggerUpdaterField = 
Trigger.class.getDeclaredField("earlyTriggerUpdater");
-    Field lateTriggerUpdaterField = 
Trigger.class.getDeclaredField("lateTriggerUpdater");
-    earlyTriggerField.setAccessible(true);
-    lateTriggerField.setAccessible(true);
-    timerTriggerField.setAccessible(true);
-    earlyTriggerUpdaterField.setAccessible(true);
-    lateTriggerUpdaterField.setAccessible(true);
-
-    assertEquals(earlyTrigger, earlyTriggerField.get(trigger));
-    assertEquals(timerTrigger, timerTriggerField.get(trigger));
-    assertEquals(lateTrigger, lateTriggerField.get(trigger));
-    assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger));
-    assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
 
b/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
deleted file mode 100644
index 268c9fc..0000000
--- 
a/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.internal;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-
-public class TestWindowOutput {
-
-  @Test public void testConstructor() {
-    WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10);
-    assertEquals(wndOutput.getKey(), "testMsg");
-    assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
-    assertFalse(wndOutput.isDelete());
-    assertEquals(wndOutput.getTimestamp(), 0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java 
b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java
new file mode 100644
index 0000000..a91af24
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestTrigger {
+
+  @Test
+  public void testConstructor() throws Exception {
+    BiFunction<MessageEnvelope<Object, Object>, WindowState<Integer>, Boolean> 
earlyTrigger = (m, s) -> s.getOutputValue() > 1000;
+    BiFunction<MessageEnvelope<Object, Object>, WindowState<Integer>, Boolean> 
lateTrigger = (m, s) -> s.getOutputValue() > 1000;
+    Function<WindowState<Integer>, Boolean> timerTrigger = s -> 
TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < 
System.currentTimeMillis();
+    Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = 
s -> {
+      s.setOutputValue(0);
+      return s;
+    };
+    Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = 
s -> {
+      s.setOutputValue(1);
+      return s;
+    };
+
+    Trigger<MessageEnvelope<Object, Object>, WindowState<Integer>> trigger = 
Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger,
+        earlyTriggerUpdater, lateTriggerUpdater);
+
+    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
+    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
+    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
+    Field earlyTriggerUpdaterField = 
Trigger.class.getDeclaredField("earlyTriggerUpdater");
+    Field lateTriggerUpdaterField = 
Trigger.class.getDeclaredField("lateTriggerUpdater");
+    earlyTriggerField.setAccessible(true);
+    lateTriggerField.setAccessible(true);
+    timerTriggerField.setAccessible(true);
+    earlyTriggerUpdaterField.setAccessible(true);
+    lateTriggerUpdaterField.setAccessible(true);
+
+    assertEquals(earlyTrigger, earlyTriggerField.get(trigger));
+    assertEquals(timerTrigger, timerTriggerField.get(trigger));
+    assertEquals(lateTrigger, lateTriggerField.get(trigger));
+    assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger));
+    assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java
 
b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java
new file mode 100644
index 0000000..6a9b55d
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestTriggerBuilder {
+  private Field earlyTriggerField;
+  private Field lateTriggerField;
+  private Field timerTriggerField;
+  private Field earlyTriggerUpdater;
+  private Field lateTriggerUpdater;
+
+  @Before
+  public void testPrep() throws Exception {
+    this.earlyTriggerField = 
TriggerBuilder.class.getDeclaredField("earlyTrigger");
+    this.lateTriggerField = 
TriggerBuilder.class.getDeclaredField("lateTrigger");
+    this.timerTriggerField = 
TriggerBuilder.class.getDeclaredField("timerTrigger");
+    this.earlyTriggerUpdater = 
TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater");
+    this.lateTriggerUpdater = 
TriggerBuilder.class.getDeclaredField("lateTriggerUpdater");
+
+    this.earlyTriggerField.setAccessible(true);
+    this.lateTriggerField.setAccessible(true);
+    this.timerTriggerField.setAccessible(true);
+    this.earlyTriggerUpdater.setAccessible(true);
+    this.lateTriggerUpdater.setAccessible(true);
+  }
+
+  @Test
+  public void testStaticCreators() throws NoSuchFieldException, 
IllegalAccessException {
+    TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> 
builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean> triggerField =
+        (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder);
+    WindowState<Collection<TestMessageEnvelope>> mockState = 
mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(triggerField.apply(null, mockState));
+    when(mockState.getNumberMessages()).thenReturn(2000L);
+    assertTrue(triggerField.apply(null, mockState));
+
+    Function<TestMessageEnvelope, Boolean> tokenFunc = m -> true;
+    builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc);
+    triggerField = (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder);
+    TestMessageEnvelope m = mock(TestMessageEnvelope.class);
+    assertTrue(triggerField.apply(m, mockState));
+
+    builder = TriggerBuilder.earlyTriggerOnEventTime(tm -> 
tm.getMessage().getEventTime(), 30000L);
+    triggerField = (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder);
+    when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L);
+    when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L);
+    TestMessageEnvelope.MessageType mockInnerMessage;
+    mockInnerMessage = mock(TestMessageEnvelope.MessageType.class);
+    when(mockInnerMessage.getEventTime()).thenReturn(19999000000L);
+    when(m.getMessage()).thenReturn(mockInnerMessage);
+    assertFalse(triggerField.apply(m, mockState));
+    mockInnerMessage = mock(TestMessageEnvelope.MessageType.class);
+    when(mockInnerMessage.getEventTime()).thenReturn(32000000000L);
+    when(m.getMessage()).thenReturn(mockInnerMessage);
+    assertTrue(triggerField.apply(m, mockState));
+    mockInnerMessage = mock(TestMessageEnvelope.MessageType.class);
+    when(m.getMessage()).thenReturn(mockInnerMessage);
+    when(mockInnerMessage.getEventTime()).thenReturn(1001000000L);
+    when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L);
+    assertTrue(triggerField.apply(m, mockState));
+
+    BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean> mockFunc = 
mock(BiFunction.class);
+    builder = TriggerBuilder.earlyTrigger(mockFunc);
+    triggerField = (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder);
+    assertEquals(triggerField, mockFunc);
+
+    builder = TriggerBuilder.timeoutSinceFirstMessage(10000L);
+    Function<WindowState<Collection<TestMessageEnvelope>>, Boolean> 
timerTrigger =
+        (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.timerTriggerField.get(builder);
+    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set the firstMessageTimeNs to 9 seconds earlier, leaving the test 1 
second to run the assertion before the 10-second timerTrigger fires
+    
when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()
 - 9000L));
+    assertFalse(timerTrigger.apply(mockState));
+
+    builder = TriggerBuilder.timeoutSinceLastMessage(10000L);
+    timerTrigger = (Function<WindowState<Collection<TestMessageEnvelope>>, 
Boolean>) this.timerTriggerField.get(builder);
+    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set the lastMessageTimeNs to 9 seconds earlier, leaving the test 1 
second to run the assertion before the 10-second timerTrigger fires
+    
when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()
 - 9000));
+    assertFalse(timerTrigger.apply(mockState));
+  }
+
+  @Test
+  public void testAddTimerTriggers() throws IllegalAccessException {
+    TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> 
builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.addTimeoutSinceFirstMessage(10000L);
+    // examine that both earlyTrigger and timer triggers are set up
+    BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean> triggerField =
+        (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder);
+    WindowState<Collection<TestMessageEnvelope>> mockState = 
mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(triggerField.apply(null, mockState));
+    // check the timer trigger
+    Function<WindowState<Collection<TestMessageEnvelope>>, Boolean> 
timerTrigger =
+        (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.timerTriggerField.get(builder);
+    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set the firstMessageTimeNs to 9 seconds earlier, leaving the test 1 
second to run the assertion before the 10-second timerTrigger fires
+    
when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()
 - 9000L));
+    assertFalse(timerTrigger.apply(mockState));
+
+    // examine that both early trigger and timer triggers are set up
+    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    triggerField = (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder);
+    mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(triggerField.apply(null, mockState));
+    builder.addTimeoutSinceLastMessage(20000L);
+    // check the timer trigger
+    timerTrigger = (Function<WindowState<Collection<TestMessageEnvelope>>, 
Boolean>) this.timerTriggerField.get(builder);
+    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set the lastMessageTimeNs to 9 seconds earlier, well within the 
20-second timeout, so the timerTrigger must not fire at the assertion
+    
when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()
 - 9000L));
+    assertFalse(timerTrigger.apply(mockState));
+  }
+
+  @Test
+  public void testAddLateTriggers() throws IllegalAccessException {
+    TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> 
builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.addLateTriggerOnSizeLimit(10000L);
+    // examine that both earlyTrigger and lateTriggers are set up
+    BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean> earlyTrigger =
+        (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder);
+    WindowState<Collection<TestMessageEnvelope>> mockState = 
mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(earlyTrigger.apply(null, mockState));
+    // check the late trigger
+    BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean> lateTrigger =
+        (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.lateTriggerField.get(builder);
+    assertFalse(lateTrigger.apply(null, mockState));
+    // set the number of messages to 10001 to trigger the late trigger
+    when(mockState.getNumberMessages()).thenReturn(10001L);
+    assertTrue(lateTrigger.apply(null, mockState));
+
+    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0);
+    // examine that both earlyTrigger and lateTriggers are set up
+    earlyTrigger = (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder);
+    mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(earlyTrigger.apply(null, mockState));
+    // examine the lateTrigger
+    when(mockState.getOutputValue()).thenReturn(new ArrayList<>());
+    lateTrigger = (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.lateTriggerField.get(builder);
+    assertFalse(lateTrigger.apply(null, mockState));
+    List<TestMessageEnvelope> mockList = mock(ArrayList.class);
+    when(mockList.size()).thenReturn(200);
+    when(mockState.getOutputValue()).thenReturn(mockList);
+    assertTrue(lateTrigger.apply(null, mockState));
+  }
+
+  @Test
+  public void testAddTriggerUpdater() throws IllegalAccessException {
+    TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> 
builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.onEarlyTrigger(c -> {
+        c.clear();
+        return c;
+      });
+    List<TestMessageEnvelope> collection = new 
ArrayList<TestMessageEnvelope>() { {
+        for (int i = 0; i < 10; i++) {
+          this.add(new TestMessageEnvelope(String.format("key-%d", i), 
"string-value", System.nanoTime()));
+        }
+      } };
+    // examine that earlyTriggerUpdater is set up
+    Function<WindowState<Collection<TestMessageEnvelope>>, 
WindowState<Collection<TestMessageEnvelope>>> earlyTriggerUpdater =
+        (Function<WindowState<Collection<TestMessageEnvelope>>, 
WindowState<Collection<TestMessageEnvelope>>>) 
this.earlyTriggerUpdater.get(builder);
+    WindowState<Collection<TestMessageEnvelope>> mockState = 
mock(WindowState.class);
+    when(mockState.getOutputValue()).thenReturn(collection);
+    earlyTriggerUpdater.apply(mockState);
+    assertTrue(collection.isEmpty());
+
+    collection.add(new TestMessageEnvelope("key-to-stay", "string-to-stay", 
System.nanoTime()));
+    collection.add(new TestMessageEnvelope("key-to-remove", 
"string-to-remove", System.nanoTime()));
+    builder.onLateTrigger(c -> {
+        c.removeIf(t -> t.getKey().equals("key-to-remove"));
+        return c;
+      });
+    // check the late trigger updater
+    Function<WindowState<Collection<TestMessageEnvelope>>, 
WindowState<Collection<TestMessageEnvelope>>> lateTriggerUpdater =
+        (Function<WindowState<Collection<TestMessageEnvelope>>, 
WindowState<Collection<TestMessageEnvelope>>>) 
this.lateTriggerUpdater.get(builder);
+    when(mockState.getOutputValue()).thenReturn(collection);
+    lateTriggerUpdater.apply(mockState);
+    assertTrue(collection.size() == 1);
+    assertFalse(collection.get(0).isDelete());
+    assertEquals(collection.get(0).getKey(), "key-to-stay");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
 
b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
new file mode 100644
index 0000000..7f81fc9
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+public class TestWindowOutput {
+
+  @Test
+  public void testConstructor() {
+    WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10);
+    assertEquals(wndOutput.getKey(), "testMsg");
+    assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
+    assertFalse(wndOutput.isDelete());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java 
b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java
new file mode 100644
index 0000000..26af26e
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestWindows {
+
+  @Test
+  public void testSessionWindows() throws NoSuchFieldException, 
IllegalAccessException {
+    // test constructing the default session window
+    Window<TestMessageEnvelope, String, Collection<TestMessageEnvelope>, 
WindowOutput<String, Collection<TestMessageEnvelope>>> testWnd = Windows
+        .intoSessions(
+        TestMessageEnvelope::getKey);
+    assertTrue(testWnd instanceof SessionWindow);
+    Field wndKeyFuncField = 
SessionWindow.class.getDeclaredField("wndKeyFunction");
+    Field aggregatorField = SessionWindow.class.getDeclaredField("aggregator");
+    wndKeyFuncField.setAccessible(true);
+    aggregatorField.setAccessible(true);
+    Function<TestMessageEnvelope, String> wndKeyFunc = 
(Function<TestMessageEnvelope, String>) wndKeyFuncField.get(testWnd);
+    assertEquals(wndKeyFunc.apply(new TestMessageEnvelope("test-key", 
"test-value", 0)), "test-key");
+    BiFunction<TestMessageEnvelope, Collection<TestMessageEnvelope>, 
Collection<TestMessageEnvelope>> aggrFunc =
+        (BiFunction<TestMessageEnvelope, Collection<TestMessageEnvelope>, 
Collection<TestMessageEnvelope>>) aggregatorField.get(testWnd);
+    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
+    Collection<TestMessageEnvelope> collection = aggrFunc.apply(mockMsg, new 
ArrayList<>());
+    assertTrue(collection.size() == 1);
+    assertTrue(collection.contains(mockMsg));
+
+    // test constructing the session window w/ customized session info
+    Window<TestMessageEnvelope, String, Collection<Character>, 
WindowOutput<String, Collection<Character>>> testWnd2 = Windows.intoSessions(
+        m -> String.format("key-%d", m.getMessage().getEventTime()), m -> 
m.getMessage().getValue().charAt(0));
+    assertTrue(testWnd2 instanceof SessionWindow);
+    wndKeyFunc = (Function<TestMessageEnvelope, String>) 
wndKeyFuncField.get(testWnd2);
+    aggrFunc = (BiFunction<TestMessageEnvelope, 
Collection<TestMessageEnvelope>, Collection<TestMessageEnvelope>>) 
aggregatorField.get(testWnd2);
+    assertEquals(wndKeyFunc.apply(new TestMessageEnvelope("test-key", 
"test-value", 0)), "key-0");
+    TestMessageEnvelope.MessageType mockInnerMessage = 
mock(TestMessageEnvelope.MessageType.class);
+    when(mockMsg.getMessage()).thenReturn(mockInnerMessage);
+    when(mockInnerMessage.getValue()).thenReturn("x-001");
+    collection = aggrFunc.apply(mockMsg, new ArrayList<>());
+    assertTrue(collection.size() == 1);
+    assertTrue(collection.contains('x'));
+
+    // test constructing session window w/ a default counter
+    Window<TestMessageEnvelope, String, Integer, WindowOutput<String, 
Integer>> testCounter = Windows.intoSessionCounter(
+        m -> String.format("key-%d", m.getMessage().getEventTime()));
+    assertTrue(testCounter instanceof SessionWindow);
+    wndKeyFunc = (Function<TestMessageEnvelope, String>) 
wndKeyFuncField.get(testCounter);
+    BiFunction<TestMessageEnvelope, Integer, Integer> counterFn = 
(BiFunction<TestMessageEnvelope, Integer, Integer>) 
aggregatorField.get(testCounter);
+    when(mockMsg.getMessage().getEventTime()).thenReturn(12345L);
+    assertEquals(wndKeyFunc.apply(mockMsg), "key-12345");
+    assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2));
+  }
+
+  @Test
+  public void testSetTriggers() throws NoSuchFieldException, 
IllegalAccessException {
+    Window<TestMessageEnvelope, String, Integer, WindowOutput<String, 
Integer>> testCounter = Windows.intoSessionCounter(
+        m -> String.format("key-%d", m.getMessage().getEventTime()));
+    // test session window w/ a trigger
+    TriggerBuilder<TestMessageEnvelope, Integer> triggerBuilder = 
TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L);
+    testCounter.setTriggers(triggerBuilder);
+    Trigger<TestMessageEnvelope, WindowState<Integer>> expectedTrigger = 
triggerBuilder.build();
+    Trigger<TestMessageEnvelope, WindowState<Integer>> actualTrigger = 
Windows.getInternalWindowFn(testCounter).getTrigger();
+    // examine all trigger fields are expected
+    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
+    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
+    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
+    Field earlyTriggerUpdater = 
Trigger.class.getDeclaredField("earlyTriggerUpdater");
+    Field lateTriggerUpdater = 
Trigger.class.getDeclaredField("lateTriggerUpdater");
+    earlyTriggerField.setAccessible(true);
+    lateTriggerField.setAccessible(true);
+    timerTriggerField.setAccessible(true);
+    earlyTriggerUpdater.setAccessible(true);
+    lateTriggerUpdater.setAccessible(true);
+    assertEquals(earlyTriggerField.get(expectedTrigger), 
earlyTriggerField.get(actualTrigger));
+    assertEquals(lateTriggerField.get(expectedTrigger), 
lateTriggerField.get(actualTrigger));
+    assertEquals(timerTriggerField.get(expectedTrigger), 
timerTriggerField.get(actualTrigger));
+    assertEquals(earlyTriggerUpdater.get(expectedTrigger), 
earlyTriggerUpdater.get(actualTrigger));
+    assertEquals(lateTriggerUpdater.get(expectedTrigger), 
lateTriggerUpdater.get(actualTrigger));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
new file mode 100644
index 0000000..231d3f5
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.windows.Window;
+import org.apache.samza.operators.windows.WindowFn;
+import org.apache.samza.operators.windows.WindowOutput;
+import org.apache.samza.operators.windows.WindowState;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+
+/**
+ * The implementation for input/output {@link MessageStream}s to/from the operators.
+ * Users use the {@link MessageStream} API methods to describe and chain the operator specs.
+ * <p>
+ * Every transformation method below creates an {@link OperatorSpec} and registers it with the
+ * stream(s) whose messages the operator consumes.
+ *
+ * @param <M>  type of {@link MessageEnvelope}s in this {@link MessageStream}
+ */
+public class MessageStreamImpl<M extends MessageEnvelope> implements MessageStream<M> {
+
+  /**
+   * The set of operators that consume the {@link MessageEnvelope}s in this {@link MessageStream}
+   */
+  private final Set<OperatorSpec> registeredOperatorSpecs = new HashSet<>();
+
+  /**
+   * Registers an operator that applies {@code mapFn} to each message in this stream.
+   * A {@code null} result from {@code mapFn} produces no output for that message.
+   *
+   * @param mapFn  the function transforming each input message to an output message
+   * @param <OM>  type of {@link MessageEnvelope}s in the output stream
+   * @return  the output {@link MessageStream} of the registered operator
+   */
+  @Override
+  public <OM extends MessageEnvelope> MessageStream<OM> map(MapFunction<M, OM> mapFn) {
+    OperatorSpec<OM> op = OperatorSpecs.<M, OM>createStreamOperator(m -> {
+        // A plain list avoids creating an anonymous ArrayList subclass for every message.
+        ArrayList<OM> output = new ArrayList<>();
+        OM r = mapFn.apply(m);
+        if (r != null) {
+          output.add(r);
+        }
+        return output;
+      });
+    this.registeredOperatorSpecs.add(op);
+    return op.getOutputStream();
+  }
+
+  /**
+   * Registers an operator that applies {@code flatMapFn} to each message in this stream.
+   *
+   * @param flatMapFn  the function transforming each input message to a collection of output messages
+   * @param <OM>  type of {@link MessageEnvelope}s in the output stream
+   * @return  the output {@link MessageStream} of the registered operator
+   */
+  @Override
+  public <OM extends MessageEnvelope> MessageStream<OM> flatMap(FlatMapFunction<M, OM> flatMapFn) {
+    OperatorSpec<OM> op = OperatorSpecs.createStreamOperator(flatMapFn);
+    this.registeredOperatorSpecs.add(op);
+    return op.getOutputStream();
+  }
+
+  /**
+   * Registers an operator that forwards only the messages for which {@code filterFn} returns true.
+   *
+   * @param filterFn  the predicate deciding whether a message is retained
+   * @return  the output {@link MessageStream} of the registered operator
+   */
+  @Override
+  public MessageStream<M> filter(FilterFunction<M> filterFn) {
+    OperatorSpec<M> op = OperatorSpecs.<M, M>createStreamOperator(t -> {
+        // A plain list avoids creating an anonymous ArrayList subclass for every message.
+        ArrayList<M> output = new ArrayList<>();
+        if (filterFn.apply(t)) {
+          output.add(t);
+        }
+        return output;
+      });
+    this.registeredOperatorSpecs.add(op);
+    return op.getOutputStream();
+  }
+
+  /**
+   * Registers a sink operator built from {@code sinkFn}; no output stream is returned.
+   *
+   * @param sinkFn  the function consuming each message in this stream
+   */
+  @Override
+  public void sink(SinkFunction<M> sinkFn) {
+    this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperator(sinkFn));
+  }
+
+  /**
+   * Registers a window operator built from the internal window function of {@code window}.
+   *
+   * @param window  the window definition to apply to this stream
+   * @param <WK>  type of the window key
+   * @param <WV>  type of the window value
+   * @param <WS>  type of the {@link WindowState}
+   * @param <WM>  type of the {@link WindowOutput} messages in the output stream
+   * @return  the output {@link MessageStream} of the registered window operator
+   */
+  @Override
+  public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window(
+      Window<M, WK, WV, WM> window) {
+    // The cast is unchecked: WS is not constrained by the Window type parameters.
+    OperatorSpec<WM> wndOp = OperatorSpecs.createWindowOperator(
+        (WindowFn<M, WK, WS, WM>) window.getInternalWindowFn());
+    this.registeredOperatorSpecs.add(wndOp);
+    return wndOp.getOutputStream();
+  }
+
+  /**
+   * Registers one partial join operator on {@code otherStream} and one on this stream. Both
+   * operators write to the same newly created output stream.
+   *
+   * @param otherStream  the stream to join with; cast to {@link MessageStreamImpl} internally
+   * @param joinFn  the function combining a message from this stream with one from {@code otherStream}
+   * @param <K>  type of the join key
+   * @param <JM>  type of {@link MessageEnvelope}s in {@code otherStream}
+   * @param <RM>  type of {@link MessageEnvelope}s in the joined output stream
+   * @return  the joined output {@link MessageStream}
+   */
+  @Override
+  public <K, JM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> MessageStream<RM> join(
+      MessageStream<JM> otherStream, JoinFunction<M, JM, RM> joinFn) {
+    MessageStreamImpl<RM> outputStream = new MessageStreamImpl<>();
+
+    BiFunction<M, JM, RM> parJoin1 = joinFn::apply;
+    // Same join function with the argument order swapped, for messages arriving on otherStream.
+    BiFunction<JM, M, RM> parJoin2 = (m, t1) -> joinFn.apply(t1, m);
+
+    // TODO: need to add default store functions for the two partial join functions
+
+    ((MessageStreamImpl<JM>) otherStream).registeredOperatorSpecs.add(
+        OperatorSpecs.createPartialJoinOperator(parJoin2, outputStream));
+    this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperator(parJoin1, outputStream));
+    return outputStream;
+  }
+
+  /**
+   * Registers a merge operator on this stream and on every stream in {@code otherStreams}. All
+   * merge operators write to the same newly created output stream.
+   * <p>
+   * The supplied collection is not modified.
+   *
+   * @param otherStreams  the streams to merge with this one; each is cast to {@link MessageStreamImpl}
+   * @return  the merged output {@link MessageStream}
+   */
+  @Override
+  public MessageStream<M> merge(Collection<MessageStream<M>> otherStreams) {
+    MessageStreamImpl<M> outputStream = new MessageStreamImpl<>();
+
+    // Work on a private copy: adding this stream to the caller's collection would leak a side
+    // effect to the caller and fail outright for unmodifiable collections.
+    Collection<MessageStream<M>> streamsToMerge = new ArrayList<>(otherStreams);
+    streamsToMerge.add(this);
+    streamsToMerge.forEach(other ->
+        ((MessageStreamImpl<M>) other).registeredOperatorSpecs.add(
+            OperatorSpecs.createMergeOperator(outputStream)));
+    return outputStream;
+  }
+
+  /**
+   * Gets the operator specs registered to consume the output of this {@link MessageStream}.
+   * This is an internal API and should not be exposed to users.
+   *
+   * @return  an unmodifiable collection containing all {@link OperatorSpec}s that are registered
+   *          with this {@link MessageStream}.
+   */
+  public Collection<OperatorSpec> getRegisteredOperatorSpecs() {
+    return Collections.unmodifiableSet(this.registeredOperatorSpecs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java 
b/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java
new file mode 100644
index 0000000..2572f14
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.windows.StoreFunctions;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * The base class for all state stores.
+ * <p>
+ * Wraps a {@link KeyValueStore} looked up by name from the {@link TaskContext}, and uses the
+ * supplied {@link StoreFunctions} to derive the store key from a message and to compute the
+ * updated state. {@link #init(TaskContext)} must be called before state is read or updated.
+ *
+ * @param <M>  type of input {@link MessageEnvelope}
+ * @param <SK>  type of the key in the underlying store
+ * @param <SS>  type of the state value in the underlying store
+ */
+public class StateStoreImpl<M extends MessageEnvelope, SK, SS> {
+  private final String storeName;
+  private final StoreFunctions<M, SK, SS> storeFunctions;
+  // Stays null until init(TaskContext) has been called.
+  private KeyValueStore<SK, SS> kvStore = null;
+
+  /**
+   * @param store  the functions that extract the store key and update the state
+   * @param storeName  the name under which the store is looked up in the {@link TaskContext}
+   */
+  public StateStoreImpl(StoreFunctions<M, SK, SS> store, String storeName) {
+    this.storeFunctions = store;
+    this.storeName = storeName;
+  }
+
+  /**
+   * Looks up the underlying store by name from the task context.
+   *
+   * @param context  the {@link TaskContext} that provides the store
+   */
+  @SuppressWarnings("unchecked") // the context hands back an untyped store; SK/SS are trusted here
+  public void init(TaskContext context) {
+    this.kvStore = (KeyValueStore<SK, SS>) context.getStore(this.storeName);
+  }
+
+  /**
+   * Reads the current state for the key derived from {@code m}.
+   *
+   * @param m  the message from which the store key is derived
+   * @return  an entry holding the store key and whatever the store returned for it
+   * @throws IllegalStateException  if the store has not been initialized
+   */
+  public Entry<SK, SS> getState(M m) {
+    KeyValueStore<SK, SS> store = initializedStore();
+    SK key = this.storeFunctions.getStoreKeyFn().apply(m);
+    SS state = store.get(key);
+    return new Entry<>(key, state);
+  }
+
+  /**
+   * Computes the new state from {@code m} and the old state, and writes it back under the old key.
+   *
+   * @param m  the message driving the update
+   * @param oldEntry  the entry previously obtained for this message's key
+   * @return  an entry holding the same key and the newly stored state
+   * @throws IllegalStateException  if the store has not been initialized
+   */
+  public Entry<SK, SS> updateState(M m, Entry<SK, SS> oldEntry) {
+    // Fail before running the user's updater so no work is done against a missing store.
+    KeyValueStore<SK, SS> store = initializedStore();
+    SS newValue = this.storeFunctions.getStateUpdaterFn().apply(m, oldEntry.getValue());
+    store.put(oldEntry.getKey(), newValue);
+    return new Entry<>(oldEntry.getKey(), newValue);
+  }
+
+  /**
+   * Returns the underlying store, failing with a descriptive error instead of a bare
+   * {@link NullPointerException} when the store is not available.
+   */
+  private KeyValueStore<SK, SS> initializedStore() {
+    if (this.kvStore == null) {
+      throw new IllegalStateException(
+          String.format("State store %s is not available. Was init(TaskContext) called?", this.storeName));
+    }
+    return this.kvStore;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java
new file mode 100644
index 0000000..152cd92
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.impl.OperatorImpl;
+import org.apache.samza.operators.impl.OperatorImpls;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.WindowableTask;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * A {@link StreamTask} implementation that receives {@link IncomingSystemMessageEnvelope}s and
+ * pushes them through the stream transformations that the user defined in
+ * {@link StreamOperatorTask#transform(Map)} with the {@link MessageStream} APIs.
+ * <p>
+ * This class ties the operator API implementation components together and feeds the
+ * {@link IncomingSystemMessageEnvelope}s into the transformation chains.
+ * <p>
+ * It wraps an instance of the user implemented {@link StreamOperatorTask}. During startup, its
+ * {@link #init(Config, TaskContext)} method creates one {@link MessageStreamImpl} per input
+ * {@link SystemStreamPartition} and hands them to the user's
+ * {@link StreamOperatorTask#transform(Map)} method.
+ * <p>
+ * While the user describes the stream transformations through the {@link MessageStream} API inside
+ * {@link StreamOperatorTask#transform(Map)}, the underlying {@link MessageStreamImpl} creates the
+ * matching {@link org.apache.samza.operators.spec.OperatorSpec} to record the desired
+ * transformation, and returns the output {@link MessageStream} so that transforms can be chained.
+ * <p>
+ * After the transformation DAGs have been described for all {@link MessageStream}s (i.e., once the
+ * {@link StreamOperatorTask#transform(Map)} call returns), it calls
+ * {@link OperatorImpls#createOperatorImpls(MessageStreamImpl, TaskContext)} for each input
+ * {@link MessageStreamImpl}. That call instantiates the
+ * {@link org.apache.samza.operators.impl.OperatorImpl} DAG corresponding to the
+ * {@link org.apache.samza.operators.spec.OperatorSpec} DAG and returns its root node, which this
+ * class keeps.
+ * <p>
+ * With a DAG root available for each {@link SystemStreamPartition}, the message envelopes received
+ * in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)} are
+ * passed to the matching root node. From there, each
+ * {@link org.apache.samza.operators.impl.OperatorImpl} propagates its transformed output to the
+ * next set of {@link org.apache.samza.operators.impl.OperatorImpl}s.
+ */
+public final class StreamOperatorAdaptorTask implements StreamTask, InitableTask, WindowableTask {
+
+  /**
+   * The root node of the operator chain DAG for each {@link SystemStreamPartition}.
+   */
+  private final Map<SystemStreamPartition, OperatorImpl<IncomingSystemMessageEnvelope, ? extends MessageEnvelope>>
+      operatorChains = new HashMap<>();
+
+  private final StreamOperatorTask userTask;
+
+  public StreamOperatorAdaptorTask(StreamOperatorTask userTask) {
+    this.userTask = userTask;
+  }
+
+  /**
+   * Initializes the user task if it is an {@link InitableTask}, lets the user describe the
+   * transformations on one stream per input partition, and then builds the operator chains.
+   */
+  @Override
+  public final void init(Config config, TaskContext context) throws Exception {
+    if (this.userTask instanceof InitableTask) {
+      InitableTask initableUserTask = (InitableTask) this.userTask;
+      initableUserTask.init(config, context);
+    }
+
+    Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> inputStreams = new HashMap<>();
+    for (SystemStreamPartition partition : context.getSystemStreamPartitions()) {
+      inputStreams.put(partition, new MessageStreamImpl<>());
+    }
+
+    this.userTask.transform(inputStreams);
+
+    for (Map.Entry<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> input
+        : inputStreams.entrySet()) {
+      MessageStreamImpl<IncomingSystemMessageEnvelope> source =
+          (MessageStreamImpl<IncomingSystemMessageEnvelope>) input.getValue();
+      operatorChains.put(input.getKey(), OperatorImpls.createOperatorImpls(source, context));
+    }
+  }
+
+  /**
+   * Wraps the incoming envelope and hands it to the chain root registered for its partition.
+   */
+  @Override
+  public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
+    OperatorImpl<IncomingSystemMessageEnvelope, ? extends MessageEnvelope> chainRoot =
+        this.operatorChains.get(ime.getSystemStreamPartition());
+    chainRoot.onNext(new IncomingSystemMessageEnvelope(ime), collector, coordinator);
+  }
+
+  /**
+   * Forwards the window call to the user task when it is a {@link WindowableTask}; otherwise does
+   * nothing.
+   */
+  @Override
+  public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    if (!(this.userTask instanceof WindowableTask)) {
+      return;
+    }
+    ((WindowableTask) this.userTask).window(collector, coordinator);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
deleted file mode 100644
index 1eee2dc..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.internal.Operators.Operator;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-
-/**
- * Implementation class for a chain of operators from the single input {@code 
source}
- *
- * @param <M>  type of message in the input stream {@code source}
- */
-public class ChainedOperators<M extends Message> {
-
-  private final Set<OperatorImpl> subscribers = new HashSet<>();
-
-  /**
-   * Private constructor
-   *
-   * @param source  the input source {@link MessageStream}
-   * @param context  the {@link TaskContext} object that we need to 
instantiate the state stores
-   */
-  private ChainedOperators(MessageStream<M> source, TaskContext context) {
-    // create the pipeline/topology starting from source
-    source.getSubscribers().forEach(sub -> {
-        // pass in the context s.t. stateful stream operators can initialize 
their stores
-        OperatorImpl subImpl = this.createAndSubscribe(sub, source, context);
-        this.subscribers.add(subImpl);
-      });
-  }
-
-  /**
-   * Private function to recursively instantiate the implementation of 
operators and the chains
-   *
-   * @param operator  the operator that subscribe to {@code source}
-   * @param source  the source {@link MessageStream}
-   * @param context  the context of the task
-   * @return  the implementation object of the corresponding {@code operator}
-   */
-  private OperatorImpl<M, ? extends Message> createAndSubscribe(Operator 
operator, MessageStream source,
-      TaskContext context) {
-    Entry<OperatorImpl<M, ? extends Message>, Boolean> factoryEntry = 
OperatorFactory.getOperator(operator);
-    if (factoryEntry.getValue()) {
-      // The operator has already been instantiated and we do not need to 
traverse and create the subscribers any more.
-      return factoryEntry.getKey();
-    }
-    OperatorImpl<M, ? extends Message> opImpl = factoryEntry.getKey();
-    MessageStream outStream = operator.getOutputStream();
-    Collection<Operator> subs = outStream.getSubscribers();
-    subs.forEach(sub -> {
-        OperatorImpl subImpl = this.createAndSubscribe(sub, 
operator.getOutputStream(), context);
-        opImpl.subscribe(subImpl);
-      });
-    // initialize the operator's state store
-    opImpl.init(source, context);
-    return opImpl;
-  }
-
-  /**
-   * Static method to create a {@link ChainedOperators} from the {@code 
source} stream
-   *
-   * @param source  the input source {@link MessageStream}
-   * @param context  the {@link TaskContext} object used to initialize the 
{@link StateStoreImpl}
-   * @param <M>  the type of input {@link Message}
-   * @return a {@link ChainedOperators} object takes the {@code source} as 
input
-   */
-  public static <M extends Message> ChainedOperators create(MessageStream<M> 
source, TaskContext context) {
-    return new ChainedOperators<>(source, context);
-  }
-
-  /**
-   * Method to navigate the incoming {@code message} through the processing 
chains
-   *
-   * @param message  the incoming message to this {@link ChainedOperators}
-   * @param collector  the {@link MessageCollector} object within the process 
context
-   * @param coordinator  the {@link TaskCoordinator} object within the process 
context
-   */
-  public void onNext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
-    this.subscribers.forEach(sub -> sub.onNext(message, collector, 
coordinator));
-  }
-
-  /**
-   * Method to handle timer events
-   *
-   * @param collector  the {@link MessageCollector} object within the process 
context
-   * @param coordinator  the {@link TaskCoordinator} object within the process 
context
-   */
-  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) 
{
-    long nanoTime = System.nanoTime();
-    this.subscribers.forEach(sub -> sub.onTimer(nanoTime, collector, 
coordinator));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
deleted file mode 100644
index ea90878..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.operators.WindowState;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.internal.Operators.*;
-import org.apache.samza.operators.internal.WindowOutput;
-import org.apache.samza.operators.impl.join.PartialJoinOpImpl;
-import org.apache.samza.operators.impl.window.SessionWindowImpl;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * The factory class that instantiates all implementation of {@link 
OperatorImpl} classes.
- */
-public class OperatorFactory {
-
-  /**
-   * the static operatorMap that includes all operator implementation instances
-   */
-  private static final Map<Operator, OperatorImpl<? extends Message, ? extends 
Message>> OPERATOR_MAP = new ConcurrentHashMap<>();
-
-  /**
-   * The method to actually create the implementation instances of operators
-   *
-   * @param operator  the immutable definition of {@link Operator}
-   * @param <M>  type of input {@link Message}
-   * @param <RM>  type of output {@link Message}
-   * @return  the implementation object of {@link OperatorImpl}
-   */
-  private static <M extends Message, RM extends Message> OperatorImpl<M, ? 
extends Message> createOperator(Operator<RM> operator) {
-    if (operator instanceof StreamOperator) {
-      return new SimpleOperatorImpl<>((StreamOperator<M, RM>) operator);
-    } else if (operator instanceof SinkOperator) {
-      return new SinkOperatorImpl<>((SinkOperator<M>) operator);
-    } else if (operator instanceof WindowOperator) {
-      return new SessionWindowImpl<>((WindowOperator<M, ?, ? extends 
WindowState, ? extends WindowOutput>) operator);
-    } else if (operator instanceof PartialJoinOperator) {
-      return new PartialJoinOpImpl<>((PartialJoinOperator) operator);
-    }
-    throw new IllegalArgumentException(
-        String.format("The type of operator is not supported. Operator class 
name: %s", operator.getClass().getName()));
-  }
-
-  /**
-   * The method to get the unique implementation instance of {@link Operator}
-   *
-   * @param operator  the {@link Operator} to instantiate
-   * @param <M>  type of input {@link Message}
-   * @param <RM>  type of output {@link Message}
-   * @return  A pair of entry that include the unique implementation instance 
to the {@code operator} and a boolean value indicating whether
-   *          the operator instance has already been created or not. True 
means the operator instance has already created, false means the operator
-   *          was not created.
-   */
-  public static <M extends Message, RM extends Message> Entry<OperatorImpl<M, 
? extends Message>, Boolean> getOperator(Operator<RM> operator) {
-    if (!OPERATOR_MAP.containsKey(operator)) {
-      OperatorImpl<M, ? extends Message> operatorImpl = 
OperatorFactory.createOperator(operator);
-      if (OPERATOR_MAP.putIfAbsent(operator, operatorImpl) == null) {
-        return new Entry<OperatorImpl<M, ? extends Message>, 
Boolean>(operatorImpl, false) { };
-      }
-    }
-    return new Entry<OperatorImpl<M, ? extends Message>, 
Boolean>((OperatorImpl<M, ? extends Message>) OPERATOR_MAP.get(operator), true) 
{ };
-  }
-
-}

Reply via email to