http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java 
b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java
new file mode 100644
index 0000000..489e5b8
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.internal;
+
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.storage.kv.Entry;
+
+import java.util.function.BiFunction;
+
+
+/**
+ * Defines an internal representation of a window function. This class SHOULD 
NOT be used by the programmer directly. It is used
+ * by the internal representation and implementation classes in operators.
+ *
+ * @param <M> type of input stream {@link Message} for window
+ * @param <WK>  type of window key in the output {@link Message}
+ * @param <WS>  type of {@link WindowState} variable in the state store
+ * @param <WM>  type of the message in the output stream
+ */
+public interface WindowFn<M extends Message, WK, WS extends WindowState, WM 
extends WindowOutput<WK, ?>> {
+
+  /**
+   * get the transformation function of the {@link WindowFn}
+   *
+   * @return  the transformation function takes type {@code M} message and the 
window state entry, then transform to an {@link WindowOutput}
+   */
+  BiFunction<M, Entry<WK, WS>, WM> getTransformFunc();
+
+  /**
+   * get the state store functions for this {@link WindowFn}
+   *
+   * @return  the collection of state store methods
+   */
+  Operators.StoreFunctions<M, WK, WS> getStoreFuncs();
+
+  /**
+   * get the trigger conditions for this {@link WindowFn}
+   *
+   * @return  the trigger condition for the {@link WindowFn} function
+   */
+  Trigger<M, WS> getTrigger();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java 
b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java
new file mode 100644
index 0000000..643b703
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.internal;
+
+import org.apache.samza.operators.data.Message;
+
+
+/**
+ * This class defines the specific type of output messages from a {@link 
Operators.WindowOperator} function
+ *
+ * @param <K>  the type of key in the output window result
+ * @param <M>  the type of value in the output window result
+ */
+public final class WindowOutput<K, M> implements Message<K, M> {
+  private final K key;
+  private final M value;
+
+  WindowOutput(K key, M aggregated) {
+    this.key = key;
+    this.value = aggregated;
+  }
+
+  @Override public M getMessage() {
+    return this.value;
+  }
+
+  @Override public K getKey() {
+    return this.key;
+  }
+
+  @Override public long getTimestamp() {
+    return 0;
+  }
+
+  static public <K, M> WindowOutput<K, M> of(K key, M result) {
+    return new WindowOutput<>(key, result);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java
 
b/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java
new file mode 100644
index 0000000..42c8f74
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.task;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.MessageStreams.SystemMessageStream;
+
+import java.util.Collection;
+
+/**
+ * This interface defines the methods that user needs to implement via the 
operator programming APIs.
+ */
[email protected]
+public interface StreamOperatorTask {
+
+  /**
+   * Defines the method for users to initialize the operator chains consuming 
from all {@link SystemMessageStream}s.
+   * Users have to implement this function to define their transformation 
logic on each of the incoming
+   * {@link SystemMessageStream}.
+   *
+   * Note that each {@link SystemMessageStream} corresponds to an input {@link 
org.apache.samza.system.SystemStreamPartition}
+   *
+   * @param sources  the collection of {@link SystemMessageStream}s that takes 
{@link org.apache.samza.operators.data.IncomingSystemMessage}
+   *                 from a {@link 
org.apache.samza.system.SystemStreamPartition}
+   */
+  void initOperators(Collection<SystemMessageStream> sources);
+
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java 
b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
index 5d066c5..7d9d56c 100644
--- a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
+++ b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
@@ -19,12 +19,13 @@
 
 package org.apache.samza.config;
 
-import static org.junit.Assert.*;
+import org.junit.Test;
 
-import java.util.Map;
 import java.util.HashMap;
+import java.util.Map;
 
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 public class TestConfig {
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java 
b/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java
new file mode 100644
index 0000000..8c56287
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.Message;
+
+
+public class TestMessage implements Message<String, String> {
+
+  private final String key;
+  private final String value;
+  private final long timestamp;
+
+  TestMessage(String key, String value, long timestamp) {
+    this.key = key;
+    this.value = value;
+    this.timestamp = timestamp;
+  }
+
+  @Override public String getMessage() {
+    return this.value;
+  }
+
+  @Override public String getKey() {
+    return this.key;
+  }
+
+  @Override public long getTimestamp() {
+    return this.timestamp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java 
b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java
new file mode 100644
index 0000000..4dbe233
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.internal.Operators.*;
+import org.apache.samza.operators.internal.WindowOutput;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestMessageStream {
+
+  @Test public void testMap() {
+    MessageStream<TestMessage> inputStream = new MessageStream<>();
+    Function<TestMessage, TestOutputMessage> xMap = m -> new 
TestOutputMessage(m.getKey(), m.getMessage().length() + 1, m.getTimestamp() + 
2);
+    MessageStream<TestOutputMessage> outputStream = inputStream.map(xMap);
+    Collection<Operator> subs = inputStream.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestOutputMessage> mapOp = subs.iterator().next();
+    assertTrue(mapOp instanceof StreamOperator);
+    assertEquals(mapOp.getOutputStream(), outputStream);
+    // assert that the transformation function is what we defined above
+    TestMessage xTestMsg = mock(TestMessage.class);
+    when(xTestMsg.getKey()).thenReturn("test-msg-key");
+    when(xTestMsg.getMessage()).thenReturn("123456789");
+    when(xTestMsg.getTimestamp()).thenReturn(12345L);
+    Collection<TestOutputMessage> cOutputMsg = ((StreamOperator<TestMessage, 
TestOutputMessage>) mapOp).getFunction().apply(xTestMsg);
+    assertEquals(cOutputMsg.size(), 1);
+    TestOutputMessage outputMessage = cOutputMsg.iterator().next();
+    assertEquals(outputMessage.getKey(), xTestMsg.getKey());
+    assertEquals(outputMessage.getMessage(), 
Integer.valueOf(xTestMsg.getMessage().length() + 1));
+    assertEquals(outputMessage.getTimestamp(), xTestMsg.getTimestamp() + 2);
+  }
+
+  @Test public void testFlatMap() {
+    MessageStream<TestMessage> inputStream = new MessageStream<>();
+    Set<TestOutputMessage> flatOuts = new HashSet<TestOutputMessage>() { {
+        this.add(mock(TestOutputMessage.class));
+        this.add(mock(TestOutputMessage.class));
+        this.add(mock(TestOutputMessage.class));
+      } };
+    Function<TestMessage, Collection<TestOutputMessage>> xFlatMap = m -> 
flatOuts;
+    MessageStream<TestOutputMessage> outputStream = 
inputStream.flatMap(xFlatMap);
+    Collection<Operator> subs = inputStream.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestOutputMessage> flatMapOp = subs.iterator().next();
+    assertTrue(flatMapOp instanceof StreamOperator);
+    assertEquals(flatMapOp.getOutputStream(), outputStream);
+    // assert that the transformation function is what we defined above
+    assertEquals(((StreamOperator<TestMessage, TestOutputMessage>) 
flatMapOp).getFunction(), xFlatMap);
+  }
+
+  @Test public void testFilter() {
+    MessageStream<TestMessage> inputStream = new MessageStream<>();
+    Function<TestMessage, Boolean> xFilter = m -> m.getTimestamp() > 123456L;
+    MessageStream<TestMessage> outputStream = inputStream.filter(xFilter);
+    Collection<Operator> subs = inputStream.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> filterOp = subs.iterator().next();
+    assertTrue(filterOp instanceof StreamOperator);
+    assertEquals(filterOp.getOutputStream(), outputStream);
+    // assert that the transformation function is what we defined above
+    Function<TestMessage, Collection<TestMessage>> txfmFn = 
((StreamOperator<TestMessage, TestMessage>) filterOp).getFunction();
+    TestMessage mockMsg = mock(TestMessage.class);
+    when(mockMsg.getTimestamp()).thenReturn(11111L);
+    Collection<TestMessage> output = txfmFn.apply(mockMsg);
+    assertTrue(output.isEmpty());
+    when(mockMsg.getTimestamp()).thenReturn(999999L);
+    output = txfmFn.apply(mockMsg);
+    assertEquals(output.size(), 1);
+    assertEquals(output.iterator().next(), mockMsg);
+  }
+
+  @Test public void testSink() {
+    MessageStream<TestMessage> inputStream = new MessageStream<>();
+    MessageStream.VoidFunction3<TestMessage, MessageCollector, 
TaskCoordinator> xSink = (m, mc, tc) -> {
+      mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", 
"test-stream"), m.getMessage()));
+      tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+    };
+    inputStream.sink(xSink);
+    Collection<Operator> subs = inputStream.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> sinkOp = subs.iterator().next();
+    assertTrue(sinkOp instanceof SinkOperator);
+    assertEquals(((SinkOperator) sinkOp).getFunction(), xSink);
+    assertNull(((SinkOperator) sinkOp).getOutputStream());
+  }
+
+  @Test public void testWindow() {
+    MessageStream<TestMessage> inputStream = new MessageStream<>();
+    Windows.SessionWindow<TestMessage, String, Integer> window = 
mock(Windows.SessionWindow.class);
+    MessageStream<WindowOutput<String, Integer>> outStream = 
inputStream.window(window);
+    Collection<Operator> subs = inputStream.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> wndOp = subs.iterator().next();
+    assertTrue(wndOp instanceof WindowOperator);
+    assertEquals(((WindowOperator) wndOp).getOutputStream(), outStream);
+  }
+
+  @Test public void testJoin() {
+    MessageStream<TestMessage> source1 = new MessageStream<>();
+    MessageStream<TestMessage> source2 = new MessageStream<>();
+    BiFunction<TestMessage, TestMessage, TestOutputMessage> joiner = (m1, m2) 
-> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + 
m2.getMessage().length(), m1.getTimestamp());
+    MessageStream<TestOutputMessage> joinOutput = source1.join(source2, 
joiner);
+    Collection<Operator> subs = source1.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> joinOp1 = subs.iterator().next();
+    assertTrue(joinOp1 instanceof PartialJoinOperator);
+    assertEquals(((PartialJoinOperator) joinOp1).getOutputStream(), 
joinOutput);
+    subs = source2.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> joinOp2 = subs.iterator().next();
+    assertTrue(joinOp2 instanceof PartialJoinOperator);
+    assertEquals(((PartialJoinOperator) joinOp2).getOutputStream(), 
joinOutput);
+    TestMessage joinMsg1 = new TestMessage("test-join-1", "join-msg-001", 
11111L);
+    TestMessage joinMsg2 = new TestMessage("test-join-2", "join-msg-002", 
22222L);
+    TestOutputMessage xOut = (TestOutputMessage) ((PartialJoinOperator) 
joinOp1).getFunction().apply(joinMsg1, joinMsg2);
+    assertEquals(xOut.getKey(), "test-join-1");
+    assertEquals(xOut.getMessage(), Integer.valueOf(24));
+    assertEquals(xOut.getTimestamp(), 11111L);
+    xOut = (TestOutputMessage) ((PartialJoinOperator) 
joinOp2).getFunction().apply(joinMsg2, joinMsg1);
+    assertEquals(xOut.getKey(), "test-join-1");
+    assertEquals(xOut.getMessage(), Integer.valueOf(24));
+    assertEquals(xOut.getTimestamp(), 11111L);
+  }
+
+  @Test public void testMerge() {
+    MessageStream<TestMessage> merge1 = new MessageStream<>();
+    Collection<MessageStream<TestMessage>> others = new 
ArrayList<MessageStream<TestMessage>>() { {
+        this.add(new MessageStream<>());
+        this.add(new MessageStream<>());
+      } };
+    MessageStream<TestMessage> mergeOutput = merge1.merge(others);
+    validateMergeOperator(merge1, mergeOutput);
+
+    others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
+  }
+
+  private void validateMergeOperator(MessageStream<TestMessage> mergeSource, 
MessageStream<TestMessage> mergeOutput) {
+    Collection<Operator> subs = mergeSource.getSubscribers();
+    assertEquals(subs.size(), 1);
+    Operator<TestMessage> mergeOp = subs.iterator().next();
+    assertTrue(mergeOp instanceof StreamOperator);
+    assertEquals(((StreamOperator) mergeOp).getOutputStream(), mergeOutput);
+    TestMessage mockMsg = mock(TestMessage.class);
+    Collection<TestMessage> outputs = ((StreamOperator<TestMessage, 
TestMessage>) mergeOp).getFunction().apply(mockMsg);
+    assertEquals(outputs.size(), 1);
+    assertEquals(outputs.iterator().next(), mockMsg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java 
b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java
new file mode 100644
index 0000000..c5fcceb
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestMessageStreams {
+
+  @Test public void testInput() {
+    SystemStreamPartition ssp = new SystemStreamPartition("my-system", 
"my-stream", new Partition(0));
+    MessageStreams.SystemMessageStream mSysStream = MessageStreams.input(ssp);
+    assertEquals(mSysStream.getSystemStreamPartition(), ssp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java 
b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
new file mode 100644
index 0000000..14e6562
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.Message;
+
+
+public class TestOutputMessage implements Message<String, Integer> {
+  private final String key;
+  private final Integer value;
+  private final long timestamp;
+
+  public TestOutputMessage(String key, Integer value, long timestamp) {
+    this.key = key;
+    this.value = value;
+    this.timestamp = timestamp;
+  }
+
+  @Override public Integer getMessage() {
+    return this.value;
+  }
+
+  @Override public String getKey() {
+    return this.key;
+  }
+
+  @Override public long getTimestamp() {
+    return this.timestamp;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java 
b/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
new file mode 100644
index 0000000..927b14b
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestTriggerBuilder {
+  private Field earlyTriggerField;
+  private Field lateTriggerField;
+  private Field timerTriggerField;
+  private Field earlyTriggerUpdater;
+  private Field lateTriggerUpdater;
+
+  @Before
+  public void testPrep() throws Exception {
+    this.earlyTriggerField = 
TriggerBuilder.class.getDeclaredField("earlyTrigger");
+    this.lateTriggerField = 
TriggerBuilder.class.getDeclaredField("lateTrigger");
+    this.timerTriggerField = 
TriggerBuilder.class.getDeclaredField("timerTrigger");
+    this.earlyTriggerUpdater = 
TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater");
+    this.lateTriggerUpdater = 
TriggerBuilder.class.getDeclaredField("lateTriggerUpdater");
+
+    this.earlyTriggerField.setAccessible(true);
+    this.lateTriggerField.setAccessible(true);
+    this.timerTriggerField.setAccessible(true);
+    this.earlyTriggerUpdater.setAccessible(true);
+    this.lateTriggerUpdater.setAccessible(true);
+  }
+
+  @Test public void testStaticCreators() throws NoSuchFieldException, 
IllegalAccessException {
+    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = 
TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> 
triggerField =
+        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, 
Boolean>) this.earlyTriggerField.get(builder);
+    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(triggerField.apply(null, mockState));
+    when(mockState.getNumberMessages()).thenReturn(2000L);
+    assertTrue(triggerField.apply(null, mockState));
+
+    Function<TestMessage, Boolean> tokenFunc = m -> true;
+    builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc);
+    triggerField = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.earlyTriggerField.get(builder);
+    TestMessage m = mock(TestMessage.class);
+    assertTrue(triggerField.apply(m, mockState));
+
+    builder = 
TriggerBuilder.earlyTriggerOnEventTime(TestMessage::getTimestamp, 30000L);
+    triggerField = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.earlyTriggerField.get(builder);
+    when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L);
+    when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L);
+    when(m.getTimestamp()).thenReturn(19999000000L);
+    assertFalse(triggerField.apply(m, mockState));
+    when(m.getTimestamp()).thenReturn(32000000000L);
+    assertTrue(triggerField.apply(m, mockState));
+    when(m.getTimestamp()).thenReturn(1001000000L);
+    when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L);
+    assertTrue(triggerField.apply(m, mockState));
+
+    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> 
mockFunc = mock(BiFunction.class);
+    builder = TriggerBuilder.earlyTrigger(mockFunc);
+    triggerField = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.earlyTriggerField.get(builder);
+    assertEquals(triggerField, mockFunc);
+
+    builder = TriggerBuilder.timeoutSinceFirstMessage(10000L);
+    Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
+        (Function<WindowState<Collection<TestMessage>>, Boolean>) 
this.timerTriggerField.get(builder);
+    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 
second to fire up the timerTrigger before assertion
+    
when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()
 - 9000L));
+    assertFalse(timerTrigger.apply(mockState));
+
+    builder = TriggerBuilder.timeoutSinceLastMessage(10000L);
+    timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) 
this.timerTriggerField.get(builder);
+    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set the lastMessageTimeNs to 9 second earlier, giving the test 1 second 
to fire up the timerTrigger before assertion
+    
when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()
 - 9000));
+    assertFalse(timerTrigger.apply(mockState));
+  }
+
+  @Test public void testAddTimerTriggers() throws IllegalAccessException {
+    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = 
TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.addTimeoutSinceFirstMessage(10000L);
+    // exam that both earlyTrigger and timer triggers are set up
+    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> 
triggerField =
+        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, 
Boolean>) this.earlyTriggerField.get(builder);
+    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(triggerField.apply(null, mockState));
+    // check the timer trigger
+    Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
+        (Function<WindowState<Collection<TestMessage>>, Boolean>) 
this.timerTriggerField.get(builder);
+    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 
second to fire up the timerTrigger before assertion
+    
when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()
 - 9000L));
+    assertFalse(timerTrigger.apply(mockState));
+
+    // exam that both early trigger and timer triggers are set up
+    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    triggerField = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.earlyTriggerField.get(builder);
+    mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(triggerField.apply(null, mockState));
+    builder.addTimeoutSinceLastMessage(20000L);
+    // check the timer trigger
+    timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) 
this.timerTriggerField.get(builder);
+    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 
second to fire up the timerTrigger before assertion
+    
when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()
 - 9000L));
+    assertFalse(timerTrigger.apply(mockState));
+  }
+
+  @Test public void testAddLateTriggers() throws IllegalAccessException {
+    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = 
TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.addLateTriggerOnSizeLimit(10000L);
+    // exam that both earlyTrigger and lateTriggers are set up
+    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> 
earlyTrigger =
+        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, 
Boolean>) this.earlyTriggerField.get(builder);
+    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(earlyTrigger.apply(null, mockState));
+    // check the late trigger
+    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> 
lateTrigger =
+        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, 
Boolean>) this.lateTriggerField.get(builder);
+    assertFalse(lateTrigger.apply(null, mockState));
+    // set the number of messages to 10001 to trigger the late trigger
+    when(mockState.getNumberMessages()).thenReturn(10001L);
+    assertTrue(lateTrigger.apply(null, mockState));
+
+    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0);
+    // exam that both earlyTrigger and lateTriggers are set up
+    earlyTrigger = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.earlyTriggerField.get(builder);
+    mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(earlyTrigger.apply(null, mockState));
+    // exam the lateTrigger
+    when(mockState.getOutputValue()).thenReturn(new ArrayList<>());
+    lateTrigger = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.lateTriggerField.get(builder);
+    assertFalse(lateTrigger.apply(null, mockState));
+    List<TestMessage> mockList = mock(ArrayList.class);
+    when(mockList.size()).thenReturn(200);
+    when(mockState.getOutputValue()).thenReturn(mockList);
+    assertTrue(lateTrigger.apply(null, mockState));
+  }
+
+  @Test public void testAddTriggerUpdater() throws IllegalAccessException {
+    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = 
TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.onEarlyTrigger(c -> {
+        c.clear();
+        return c;
+      });
+    List<TestMessage> collection = new ArrayList<TestMessage>() { {
+        for (int i = 0; i < 10; i++) {
+          this.add(new TestMessage(String.format("key-%d", i), "string-value", 
System.nanoTime()));
+        }
+      } };
+    // exam that earlyTriggerUpdater is set up
+    Function<WindowState<Collection<TestMessage>>, 
WindowState<Collection<TestMessage>>> earlyTriggerUpdater =
+        (Function<WindowState<Collection<TestMessage>>, 
WindowState<Collection<TestMessage>>>) this.earlyTriggerUpdater.get(builder);
+    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
+    when(mockState.getOutputValue()).thenReturn(collection);
+    earlyTriggerUpdater.apply(mockState);
+    assertTrue(collection.isEmpty());
+
+    collection.add(new TestMessage("key-to-stay", "string-to-stay", 
System.nanoTime()));
+    collection.add(new TestMessage("key-to-remove", "string-to-remove", 
System.nanoTime()));
+    builder.onLateTrigger(c -> {
+        c.removeIf(t -> t.getKey().equals("key-to-remove"));
+        return c;
+      });
+    // check the late trigger updater
+    Function<WindowState<Collection<TestMessage>>, 
WindowState<Collection<TestMessage>>> lateTriggerUpdater =
+        (Function<WindowState<Collection<TestMessage>>, 
WindowState<Collection<TestMessage>>>) this.lateTriggerUpdater.get(builder);
+    when(mockState.getOutputValue()).thenReturn(collection);
+    lateTriggerUpdater.apply(mockState);
+    assertTrue(collection.size() == 1);
+    assertFalse(collection.get(0).isDelete());
+    assertEquals(collection.get(0).getKey(), "key-to-stay");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java 
b/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
new file mode 100644
index 0000000..8a25a96
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.Windows.Window;
+import org.apache.samza.operators.internal.Trigger;
+import org.apache.samza.operators.internal.WindowOutput;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestWindows {
+
+  @Test public void testSessionWindows() throws NoSuchFieldException, 
IllegalAccessException {
+    // test constructing the default session window
+    Window<TestMessage, String, Collection<TestMessage>, WindowOutput<String, 
Collection<TestMessage>>> testWnd = Windows.intoSessions(
+        TestMessage::getKey);
+    assertTrue(testWnd instanceof Windows.SessionWindow);
+    Field wndKeyFuncField = 
Windows.SessionWindow.class.getDeclaredField("wndKeyFunction");
+    Field aggregatorField = 
Windows.SessionWindow.class.getDeclaredField("aggregator");
+    wndKeyFuncField.setAccessible(true);
+    aggregatorField.setAccessible(true);
+    Function<TestMessage, String> wndKeyFunc = (Function<TestMessage, String>) 
wndKeyFuncField.get(testWnd);
+    assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 
0)), "test-key");
+    BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>> 
aggrFunc =
+        (BiFunction<TestMessage, Collection<TestMessage>, 
Collection<TestMessage>>) aggregatorField.get(testWnd);
+    TestMessage mockMsg = mock(TestMessage.class);
+    Collection<TestMessage> collection = aggrFunc.apply(mockMsg, new 
ArrayList<>());
+    assertTrue(collection.size() == 1);
+    assertTrue(collection.contains(mockMsg));
+
+    // test constructing the session window w/ customized session info
+    Window<TestMessage, String, Collection<Character>, WindowOutput<String, 
Collection<Character>>> testWnd2 = Windows.intoSessions(
+        m -> String.format("key-%d", m.getTimestamp()), m -> 
m.getMessage().charAt(0));
+    assertTrue(testWnd2 instanceof Windows.SessionWindow);
+    wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd2);
+    aggrFunc = (BiFunction<TestMessage, Collection<TestMessage>, 
Collection<TestMessage>>) aggregatorField.get(testWnd2);
+    assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 
0)), "key-0");
+    when(mockMsg.getMessage()).thenReturn("x-001");
+    collection = aggrFunc.apply(mockMsg, new ArrayList<>());
+    assertTrue(collection.size() == 1);
+    assertTrue(collection.contains('x'));
+
+    // test constructing session window w/ a default counter
+    Window<TestMessage, String, Integer, WindowOutput<String, Integer>> 
testCounter = Windows.intoSessionCounter(
+        m -> String.format("key-%d", m.getTimestamp()));
+    assertTrue(testCounter instanceof Windows.SessionWindow);
+    wndKeyFunc = (Function<TestMessage, String>) 
wndKeyFuncField.get(testCounter);
+    BiFunction<TestMessage, Integer, Integer> counterFn = 
(BiFunction<TestMessage, Integer, Integer>) aggregatorField.get(testCounter);
+    when(mockMsg.getTimestamp()).thenReturn(12345L);
+    assertEquals(wndKeyFunc.apply(mockMsg), "key-12345");
+    assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2));
+  }
+
+  @Test public void testSetTriggers() throws NoSuchFieldException, 
IllegalAccessException {
+    Window<TestMessage, String, Integer, WindowOutput<String, Integer>> 
testCounter = Windows.intoSessionCounter(
+        m -> String.format("key-%d", m.getTimestamp()));
+    // test session window w/ a trigger
+    TriggerBuilder<TestMessage, Integer> triggerBuilder = 
TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L);
+    testCounter.setTriggers(triggerBuilder);
+    Trigger<TestMessage, WindowState<Integer>> expectedTrigger = 
triggerBuilder.build();
+    Trigger<TestMessage, WindowState<Integer>> actualTrigger = 
Windows.getInternalWindowFn(testCounter).getTrigger();
+    // examine all trigger fields are expected
+    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
+    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
+    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
+    Field earlyTriggerUpdater = 
Trigger.class.getDeclaredField("earlyTriggerUpdater");
+    Field lateTriggerUpdater = 
Trigger.class.getDeclaredField("lateTriggerUpdater");
+    earlyTriggerField.setAccessible(true);
+    lateTriggerField.setAccessible(true);
+    timerTriggerField.setAccessible(true);
+    earlyTriggerUpdater.setAccessible(true);
+    lateTriggerUpdater.setAccessible(true);
+    assertEquals(earlyTriggerField.get(expectedTrigger), 
earlyTriggerField.get(actualTrigger));
+    assertEquals(lateTriggerField.get(expectedTrigger), 
lateTriggerField.get(actualTrigger));
+    assertEquals(timerTriggerField.get(expectedTrigger), 
timerTriggerField.get(actualTrigger));
+    assertEquals(earlyTriggerUpdater.get(expectedTrigger), 
earlyTriggerUpdater.get(actualTrigger));
+    assertEquals(lateTriggerUpdater.get(expectedTrigger), 
lateTriggerUpdater.get(actualTrigger));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
 
b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
new file mode 100644
index 0000000..b734e87
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestIncomingSystemMessage {
+
+  @Test public void testConstructor() {
+    IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class);
+    IncomingSystemMessage ism = new IncomingSystemMessage(ime);
+
+    Object mockKey = mock(Object.class);
+    Object mockValue = mock(Object.class);
+    LongOffset testOffset = new LongOffset("12345");
+    SystemStreamPartition mockSsp = mock(SystemStreamPartition.class);
+
+    when(ime.getKey()).thenReturn(mockKey);
+    when(ime.getMessage()).thenReturn(mockValue);
+    when(ime.getSystemStreamPartition()).thenReturn(mockSsp);
+    when(ime.getOffset()).thenReturn("12345");
+
+    assertEquals(ism.getKey(), mockKey);
+    assertEquals(ism.getMessage(), mockValue);
+    assertEquals(ism.getSystemStreamPartition(), mockSsp);
+    assertEquals(ism.getOffset(), testOffset);
+    assertFalse(ism.isDelete());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java 
b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
new file mode 100644
index 0000000..943c47f
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+
+public class TestLongOffset {
+
+  @Test public void testConstructor() throws Exception {
+    LongOffset o1 = new LongOffset("12345");
+    Field offsetField = LongOffset.class.getDeclaredField("offset");
+    offsetField.setAccessible(true);
+    Long x = (Long) offsetField.get(o1);
+    assertEquals(x.longValue(), 12345L);
+
+    o1 = new LongOffset("012345");
+    x = (Long) offsetField.get(o1);
+    assertEquals(x.longValue(), 12345L);
+
+    try {
+      o1 = new LongOffset("xyz");
+      fail("Constructor of LongOffset should have failed w/ mal-formatted 
numbers");
+    } catch (NumberFormatException nfe) {
+      // expected
+    }
+  }
+
+  @Test public void testComparator() {
+    LongOffset o1 = new LongOffset("11111");
+    Offset other = mock(Offset.class);
+    try {
+      o1.compareTo(other);
+      fail("compareTo() should have have failed when comparing to an object of 
a different class");
+    } catch (IllegalArgumentException iae) {
+      // expected
+    }
+
+    LongOffset o2 = new LongOffset("-10000");
+    assertEquals(o1.compareTo(o2), 1);
+    LongOffset o3 = new LongOffset("22222");
+    assertEquals(o1.compareTo(o3), -1);
+    LongOffset o4 = new LongOffset("11111");
+    assertEquals(o1.compareTo(o4), 0);
+  }
+
+  @Test public void testEquals() {
+    LongOffset o1 = new LongOffset("12345");
+    Offset other = mock(Offset.class);
+    assertFalse(o1.equals(other));
+
+    LongOffset o2 = new LongOffset("0012345");
+    assertTrue(o1.equals(o2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
 
b/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
new file mode 100644
index 0000000..d994486
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.internal;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestOperators {
+
+  private class TestMessage implements Message<String, Object> {
+    private final long timestamp;
+    private final String key;
+    private final Object msg;
+
+
+    TestMessage(String key, Object msg, long timestamp) {
+      this.timestamp = timestamp;
+      this.key = key;
+      this.msg = msg;
+    }
+
+    @Override public Object getMessage() {
+      return this.msg;
+    }
+
+    @Override public String getKey() {
+      return this.key;
+    }
+
+    @Override public long getTimestamp() {
+      return this.timestamp;
+    }
+  }
+
+  @Test public void testGetStreamOperator() {
+    Function<Message, Collection<TestMessage>> transformFn = m -> new 
ArrayList<TestMessage>() { {
+        this.add(new TestMessage(m.getKey().toString(), m.getMessage(), 
12345L));
+      } };
+    Operators.StreamOperator<Message, TestMessage> strmOp = 
Operators.getStreamOperator(transformFn);
+    assertEquals(strmOp.getFunction(), transformFn);
+    assertTrue(strmOp.getOutputStream() instanceof MessageStream);
+  }
+
+  @Test public void testGetSinkOperator() {
+    MessageStream.VoidFunction3<TestMessage, MessageCollector, 
TaskCoordinator> sinkFn = (m, c, t) -> { };
+    Operators.SinkOperator<TestMessage> sinkOp = 
Operators.getSinkOperator(sinkFn);
+    assertEquals(sinkOp.getFunction(), sinkFn);
+    assertTrue(sinkOp.getOutputStream() == null);
+  }
+
+  @Test public void testGetWindowOperator() {
+    WindowFn<TestMessage, String, WindowState<Integer>, WindowOutput<String, 
Integer>> windowFn = mock(WindowFn.class);
+    BiFunction<TestMessage, Entry<String, WindowState<Integer>>, 
WindowOutput<String, Integer>> xFunction = (m, e) -> null;
+    Operators.StoreFunctions<TestMessage, String, WindowState<Integer>> 
storeFns = mock(Operators.StoreFunctions.class);
+    Trigger<TestMessage, WindowState<Integer>> trigger = mock(Trigger.class);
+    MessageStream<TestMessage> mockInput = mock(MessageStream.class);
+    when(windowFn.getTransformFunc()).thenReturn(xFunction);
+    when(windowFn.getStoreFuncs()).thenReturn(storeFns);
+    when(windowFn.getTrigger()).thenReturn(trigger);
+    when(mockInput.toString()).thenReturn("mockStream1");
+
+    Operators.WindowOperator<TestMessage, String, WindowState<Integer>, 
WindowOutput<String, Integer>> windowOp = Operators.getWindowOperator(windowFn);
+    assertEquals(windowOp.getFunction(), xFunction);
+    assertEquals(windowOp.getStoreFunctions(), storeFns);
+    assertEquals(windowOp.getTrigger(), trigger);
+    assertEquals(windowOp.getStoreName(mockInput), 
String.format("input-mockStream1-wndop-%s", windowOp.toString()));
+  }
+
+  @Test public void testGetPartialJoinOperator() {
+    BiFunction<Message<Object, ?>, Message<Object, ?>, TestMessage> merger =
+        (m1, m2) -> new TestMessage(m1.getKey().toString(), m2.getMessage(),
+            Math.max(m1.getTimestamp(), m2.getTimestamp()));
+    MessageStream<TestMessage> joinOutput = new MessageStream<>();
+    Operators.PartialJoinOperator<Message<Object, ?>, Object, Message<Object, 
?>, TestMessage> partialJoin =
+        Operators.getPartialJoinOperator(merger, joinOutput);
+
+    assertEquals(partialJoin.getOutputStream(), joinOutput);
+    Message<Object, Object> m = mock(Message.class);
+    Message<Object, Object> s = mock(Message.class);
+    assertEquals(partialJoin.getFunction(), merger);
+    
assertEquals(partialJoin.getSelfStoreFunctions().getStoreKeyFinder().apply(m), 
m.getKey());
+    
assertEquals(partialJoin.getSelfStoreFunctions().getStateUpdater().apply(m, s), 
m);
+    
assertEquals(partialJoin.getJoinStoreFunctions().getStoreKeyFinder().apply(m), 
m.getKey());
+    assertNull(partialJoin.getJoinStoreFunctions().getStateUpdater());
+  }
+
+  @Test public void testGetMergeOperator() {
+    MessageStream<TestMessage> output = new MessageStream<>();
+    Operators.StreamOperator<TestMessage, TestMessage> mergeOp = 
Operators.getMergeOperator(output);
+    Function<TestMessage, Collection<TestMessage>> mergeFn = t -> new 
ArrayList<TestMessage>() { {
+        this.add(t);
+      } };
+    TestMessage t = mock(TestMessage.class);
+    assertEquals(mergeOp.getFunction().apply(t), mergeFn.apply(t));
+    assertEquals(mergeOp.getOutputStream(), output);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java 
b/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
new file mode 100644
index 0000000..0f35a7c
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.internal;
+
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.data.Message;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestTrigger {
+
+  @Test public void testConstructor() throws Exception {
+    BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> 
earlyTrigger = (m, s) -> s.getOutputValue() > 1000;
+    BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> 
lateTrigger = (m, s) -> s.getOutputValue() > 1000;
+    Function<WindowState<Integer>, Boolean> timerTrigger = s -> 
TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < 
System.currentTimeMillis();
+    Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = 
s -> {
+      s.setOutputValue(0);
+      return s;
+    };
+    Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = 
s -> {
+      s.setOutputValue(1);
+      return s;
+    };
+
+    Trigger<Message<Object, Object>, WindowState<Integer>> trigger = 
Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger,
+        earlyTriggerUpdater, lateTriggerUpdater);
+
+    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
+    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
+    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
+    Field earlyTriggerUpdaterField = 
Trigger.class.getDeclaredField("earlyTriggerUpdater");
+    Field lateTriggerUpdaterField = 
Trigger.class.getDeclaredField("lateTriggerUpdater");
+    earlyTriggerField.setAccessible(true);
+    lateTriggerField.setAccessible(true);
+    timerTriggerField.setAccessible(true);
+    earlyTriggerUpdaterField.setAccessible(true);
+    lateTriggerUpdaterField.setAccessible(true);
+
+    assertEquals(earlyTrigger, earlyTriggerField.get(trigger));
+    assertEquals(timerTrigger, timerTriggerField.get(trigger));
+    assertEquals(lateTrigger, lateTriggerField.get(trigger));
+    assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger));
+    assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
 
b/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
new file mode 100644
index 0000000..268c9fc
--- /dev/null
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.internal;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+public class TestWindowOutput {
+
+  @Test public void testConstructor() {
+    WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10);
+    assertEquals(wndOutput.getKey(), "testMsg");
+    assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
+    assertFalse(wndOutput.isDelete());
+    assertEquals(wndOutput.getTimestamp(), 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/README.md
----------------------------------------------------------------------
diff --git a/samza-operator/README.md b/samza-operator/README.md
new file mode 100644
index 0000000..15d2092
--- /dev/null
+++ b/samza-operator/README.md
@@ -0,0 +1,17 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+samza-operator is an experimental module that is under development (SAMZA-552).

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
new file mode 100644
index 0000000..1eee2dc
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.Operator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * Implementation class for a chain of operators from the single input {@code 
source}
+ *
+ * @param <M>  type of message in the input stream {@code source}
+ */
+public class ChainedOperators<M extends Message> {
+
+  private final Set<OperatorImpl> subscribers = new HashSet<>();
+
+  /**
+   * Private constructor
+   *
+   * @param source  the input source {@link MessageStream}
+   * @param context  the {@link TaskContext} object that we need to 
instantiate the state stores
+   */
+  private ChainedOperators(MessageStream<M> source, TaskContext context) {
+    // create the pipeline/topology starting from source
+    source.getSubscribers().forEach(sub -> {
+        // pass in the context s.t. stateful stream operators can initialize 
their stores
+        OperatorImpl subImpl = this.createAndSubscribe(sub, source, context);
+        this.subscribers.add(subImpl);
+      });
+  }
+
+  /**
+   * Private function to recursively instantiate the implementation of 
operators and the chains
+   *
+   * @param operator  the operator that subscribe to {@code source}
+   * @param source  the source {@link MessageStream}
+   * @param context  the context of the task
+   * @return  the implementation object of the corresponding {@code operator}
+   */
+  private OperatorImpl<M, ? extends Message> createAndSubscribe(Operator 
operator, MessageStream source,
+      TaskContext context) {
+    Entry<OperatorImpl<M, ? extends Message>, Boolean> factoryEntry = 
OperatorFactory.getOperator(operator);
+    if (factoryEntry.getValue()) {
+      // The operator has already been instantiated and we do not need to 
traverse and create the subscribers any more.
+      return factoryEntry.getKey();
+    }
+    OperatorImpl<M, ? extends Message> opImpl = factoryEntry.getKey();
+    MessageStream outStream = operator.getOutputStream();
+    Collection<Operator> subs = outStream.getSubscribers();
+    subs.forEach(sub -> {
+        OperatorImpl subImpl = this.createAndSubscribe(sub, 
operator.getOutputStream(), context);
+        opImpl.subscribe(subImpl);
+      });
+    // initialize the operator's state store
+    opImpl.init(source, context);
+    return opImpl;
+  }
+
+  /**
+   * Static method to create a {@link ChainedOperators} from the {@code 
source} stream
+   *
+   * @param source  the input source {@link MessageStream}
+   * @param context  the {@link TaskContext} object used to initialize the 
{@link StateStoreImpl}
+   * @param <M>  the type of input {@link Message}
+   * @return a {@link ChainedOperators} object takes the {@code source} as 
input
+   */
+  public static <M extends Message> ChainedOperators create(MessageStream<M> 
source, TaskContext context) {
+    return new ChainedOperators<>(source, context);
+  }
+
+  /**
+   * Method to navigate the incoming {@code message} through the processing 
chains
+   *
+   * @param message  the incoming message to this {@link ChainedOperators}
+   * @param collector  the {@link MessageCollector} object within the process 
context
+   * @param coordinator  the {@link TaskCoordinator} object within the process 
context
+   */
+  public void onNext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
+    this.subscribers.forEach(sub -> sub.onNext(message, collector, 
coordinator));
+  }
+
+  /**
+   * Method to handle timer events
+   *
+   * @param collector  the {@link MessageCollector} object within the process 
context
+   * @param coordinator  the {@link TaskCoordinator} object within the process 
context
+   */
+  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) 
{
+    long nanoTime = System.nanoTime();
+    this.subscribers.forEach(sub -> sub.onTimer(nanoTime, collector, 
coordinator));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
new file mode 100644
index 0000000..ea90878
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.*;
+import org.apache.samza.operators.internal.WindowOutput;
+import org.apache.samza.operators.impl.join.PartialJoinOpImpl;
+import org.apache.samza.operators.impl.window.SessionWindowImpl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * The factory class that instantiates all implementation of {@link 
OperatorImpl} classes.
+ */
+public class OperatorFactory {
+
+  /**
+   * the static operatorMap that includes all operator implementation instances
+   */
+  private static final Map<Operator, OperatorImpl<? extends Message, ? extends 
Message>> OPERATOR_MAP = new ConcurrentHashMap<>();
+
+  /**
+   * The method to actually create the implementation instances of operators
+   *
+   * @param operator  the immutable definition of {@link Operator}
+   * @param <M>  type of input {@link Message}
+   * @param <RM>  type of output {@link Message}
+   * @return  the implementation object of {@link OperatorImpl}
+   */
+  private static <M extends Message, RM extends Message> OperatorImpl<M, ? 
extends Message> createOperator(Operator<RM> operator) {
+    if (operator instanceof StreamOperator) {
+      return new SimpleOperatorImpl<>((StreamOperator<M, RM>) operator);
+    } else if (operator instanceof SinkOperator) {
+      return new SinkOperatorImpl<>((SinkOperator<M>) operator);
+    } else if (operator instanceof WindowOperator) {
+      return new SessionWindowImpl<>((WindowOperator<M, ?, ? extends 
WindowState, ? extends WindowOutput>) operator);
+    } else if (operator instanceof PartialJoinOperator) {
+      return new PartialJoinOpImpl<>((PartialJoinOperator) operator);
+    }
+    throw new IllegalArgumentException(
+        String.format("The type of operator is not supported. Operator class 
name: %s", operator.getClass().getName()));
+  }
+
+  /**
+   * The method to get the unique implementation instance of {@link Operator}
+   *
+   * @param operator  the {@link Operator} to instantiate
+   * @param <M>  type of input {@link Message}
+   * @param <RM>  type of output {@link Message}
+   * @return  A pair of entry that include the unique implementation instance 
to the {@code operator} and a boolean value indicating whether
+   *          the operator instance has already been created or not. True 
means the operator instance has already created, false means the operator
+   *          was not created.
+   */
+  public static <M extends Message, RM extends Message> Entry<OperatorImpl<M, 
? extends Message>, Boolean> getOperator(Operator<RM> operator) {
+    if (!OPERATOR_MAP.containsKey(operator)) {
+      OperatorImpl<M, ? extends Message> operatorImpl = 
OperatorFactory.createOperator(operator);
+      if (OPERATOR_MAP.putIfAbsent(operator, operatorImpl) == null) {
+        return new Entry<OperatorImpl<M, ? extends Message>, 
Boolean>(operatorImpl, false) { };
+      }
+    }
+    return new Entry<OperatorImpl<M, ? extends Message>, 
Boolean>((OperatorImpl<M, ? extends Message>) OPERATOR_MAP.get(operator), true) 
{ };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
new file mode 100644
index 0000000..efa6a96
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.reactivestreams.Processor;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * Abstract base class for all stream operator implementation classes.
+ */
+public abstract class OperatorImpl<M extends Message, RM extends Message>
+    implements Processor<ProcessorContext<M>, ProcessorContext<RM>> {
+
+  private final Set<Subscriber<? super ProcessorContext<RM>>> subscribers = 
new HashSet<>();
+
+  @Override public void subscribe(Subscriber<? super ProcessorContext<RM>> s) {
+    // Only add once
+    subscribers.add(s);
+  }
+
+  @Override public void onSubscribe(Subscription s) {
+
+  }
+
+  @Override public void onNext(ProcessorContext<M> o) {
+
+    onNext(o.getMessage(), o.getCollector(), o.getCoordinator());
+  }
+
+  @Override public void onError(Throwable t) {
+
+  }
+
+  @Override public void onComplete() {
+
+  }
+
+  /**
+   * Default method for timer event
+   *
+   * @param nanoTime  the system nano-second when the timer event is triggered
+   * @param collector  the {@link MessageCollector} in the context
+   * @param coordinator  the {@link TaskCoordinator} in the context
+   */
+  public void onTimer(long nanoTime, MessageCollector collector, 
TaskCoordinator coordinator) {
+    this.subscribers.forEach(sub -> ((OperatorImpl) sub).onTimer(nanoTime, 
collector, coordinator));
+  }
+
+  /**
+   * Each sub-class will implement this method to actually perform the 
transformation and call the downstream subscribers.
+   *
+   * @param message  the input {@link Message}
+   * @param collector  the {@link MessageCollector} in the context
+   * @param coordinator  the {@link TaskCoordinator} in the context
+   */
+  protected abstract void onNext(M message, MessageCollector collector, 
TaskCoordinator coordinator);
+
+  /**
+   * Stateful operators will need to override this method to initialize the 
operators
+   *
+   * @param source  the source that this {@link OperatorImpl} object subscribe 
to
+   * @param context  the task context to initialize the operators within
+   */
+  protected void init(MessageStream<M> source, TaskContext context) {};
+
+  /**
+   * Method to trigger all downstream operators that consumes the output 
{@link MessageStream}
+   * from this operator
+   *
+   * @param omsg  output {@link Message}
+   * @param collector  the {@link MessageCollector} in the context
+   * @param coordinator  the {@link TaskCoordinator} in the context
+   */
+  protected void nextProcessors(RM omsg, MessageCollector collector, 
TaskCoordinator coordinator) {
+    subscribers.forEach(sub ->
+      sub.onNext(new ProcessorContext<>(omsg, collector, coordinator))
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
new file mode 100644
index 0000000..cc7ef2b
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Wrapper class to be used by {@link OperatorImpl}
+ *
+ * @param <M>  Type of input stream {@link Message}
+ */
+public class ProcessorContext<M extends Message> {
+  private final M message;
+  private final MessageCollector collector;
+  private final TaskCoordinator coordinator;
+
+  ProcessorContext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
+    this.message = message;
+    this.collector = collector;
+    this.coordinator = coordinator;
+  }
+
+  M getMessage() {
+    return this.message;
+  }
+
+  MessageCollector getCollector() {
+    return this.collector;
+  }
+
+  TaskCoordinator getCoordinator() {
+    return this.coordinator;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
new file mode 100644
index 0000000..b0f4f27
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.StreamOperator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.Collection;
+import java.util.function.Function;
+
+
+/**
+ * Base class for all implementation of operators
+ *
+ * @param <M>  type of message in the input stream
+ * @param <RM>  type of message in the output stream
+ */
+public class SimpleOperatorImpl<M extends Message, RM extends Message> extends 
OperatorImpl<M, RM> {
+
+  private final Function<M, Collection<RM>> transformFn;
+
+  SimpleOperatorImpl(StreamOperator<M, RM> op) {
+    super();
+    this.transformFn = op.getFunction();
+  }
+
+  @Override protected void onNext(M imsg, MessageCollector collector, 
TaskCoordinator coordinator) {
+    // actually calling the transform function and then for each output, call 
nextProcessors()
+    this.transformFn.apply(imsg).forEach(r -> this.nextProcessors(r, 
collector, coordinator));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
new file mode 100644
index 0000000..a8a639e
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.internal.Operators.SinkOperator;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Implementation for {@link SinkOperator}
+ */
+public class SinkOperatorImpl<M extends Message> extends OperatorImpl<M, 
Message> {
+  private final MessageStream.VoidFunction3<M, MessageCollector, 
TaskCoordinator> sinkFunc;
+
+  SinkOperatorImpl(SinkOperator<M> sinkOp) {
+    this.sinkFunc = sinkOp.getFunction();
+  }
+
+  @Override protected void onNext(M message, MessageCollector collector, 
TaskCoordinator coordinator) {
+    this.sinkFunc.apply(message, collector, coordinator);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
new file mode 100644
index 0000000..7840b5b
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.StoreFunctions;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * The base class for all state stores
+ */
+public class StateStoreImpl<M extends Message, SK, SS> {
+  private final String storeName;
+  private final StoreFunctions<M, SK, SS> storeFunctions;
+  private KeyValueStore<SK, SS> kvStore = null;
+
+  public StateStoreImpl(StoreFunctions<M, SK, SS> store, String storeName) {
+    this.storeFunctions = store;
+    this.storeName = storeName;
+  }
+
+  public void init(TaskContext context) {
+    this.kvStore = (KeyValueStore<SK, SS>) context.getStore(this.storeName);
+  }
+
+  public Entry<SK, SS> getState(M m) {
+    SK key = this.storeFunctions.getStoreKeyFinder().apply(m);
+    SS state = this.kvStore.get(key);
+    return new Entry<>(key, state);
+  }
+
+  public Entry<SK, SS> updateState(M m, Entry<SK, SS> oldEntry) {
+    SS newValue = this.storeFunctions.getStateUpdater().apply(m, 
oldEntry.getValue());
+    this.kvStore.put(oldEntry.getKey(), newValue);
+    return new Entry<>(oldEntry.getKey(), newValue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
new file mode 100644
index 0000000..4238d45
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl.join;
+
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.PartialJoinOperator;
+import org.apache.samza.operators.impl.OperatorImpl;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Implementation of a {@link PartialJoinOperator}. This class implements 
function
+ * that only takes in one input stream among all inputs to the join and 
generate the join output.
+ *
+ * @param <M>  Type of input stream {@link Message}
+ * @param <RM>  Type of join output stream {@link Message}
+ */
+public class PartialJoinOpImpl<M extends Message<K, ?>, K, JM extends 
Message<K, ?>, RM extends Message> extends OperatorImpl<M, RM> {
+
+  public PartialJoinOpImpl(PartialJoinOperator<M, K, JM, RM> joinOp) {
+    // TODO: implement PartialJoinOpImpl constructor
+  }
+
+  @Override protected void onNext(M message, MessageCollector collector, 
TaskCoordinator coordinator) {
+    // TODO: implement PartialJoinOpImpl processing logic
+  }
+}

Reply via email to