http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java 
b/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java
new file mode 100644
index 0000000..47a37dc
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api;
+
+import org.apache.samza.operators.api.Windows.Window;
+import org.apache.samza.operators.api.internal.Trigger;
+import org.apache.samza.operators.api.internal.WindowOutput;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for the session-window factory methods on {@link Windows} and for attaching triggers to a window.
+ *
+ * The tests read private fields through reflection, so they are coupled to the field names used by
+ * {@code Windows.SessionWindow} and {@link Trigger}.
+ */
+public class TestWindows {
+
+  /**
+   * Builds session windows through each factory method and checks, via the private {@code wndKeyFunction} and
+   * {@code aggregator} fields, that the key function and aggregator behave as this test expects.
+   *
+   * @throws NoSuchFieldException if {@code Windows.SessionWindow} no longer declares the reflected fields
+   * @throws IllegalAccessException if the reflected fields cannot be read
+   */
+  @Test public void testSessionWindows() throws NoSuchFieldException, IllegalAccessException {
+    Field wndKeyFuncField = Windows.SessionWindow.class.getDeclaredField("wndKeyFunction");
+    Field aggregatorField = Windows.SessionWindow.class.getDeclaredField("aggregator");
+    wndKeyFuncField.setAccessible(true);
+    aggregatorField.setAccessible(true);
+
+    // test constructing the default session window
+    Window<TestMessage, String, Collection<TestMessage>, WindowOutput<String, Collection<TestMessage>>> testWnd =
+        Windows.intoSessions(TestMessage::getKey);
+    assertTrue(testWnd instanceof Windows.SessionWindow);
+    Function<TestMessage, String> wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd);
+    assertEquals("test-key", wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)));
+    BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>> aggrFunc =
+        (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd);
+    TestMessage mockMsg = mock(TestMessage.class);
+    Collection<TestMessage> collection = aggrFunc.apply(mockMsg, new ArrayList<>());
+    assertEquals(1, collection.size());
+    assertTrue(collection.contains(mockMsg));
+
+    // test constructing the session window w/ customized session info
+    Window<TestMessage, String, Collection<Character>, WindowOutput<String, Collection<Character>>> testWnd2 =
+        Windows.intoSessions(m -> String.format("key-%d", m.getTimestamp()), m -> m.getMessage().charAt(0));
+    assertTrue(testWnd2 instanceof Windows.SessionWindow);
+    wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd2);
+    // This window aggregates Characters, so the aggregator is typed accordingly instead of being
+    // squeezed into the Collection<TestMessage> type used for the default window above.
+    BiFunction<TestMessage, Collection<Character>, Collection<Character>> charAggrFunc =
+        (BiFunction<TestMessage, Collection<Character>, Collection<Character>>) aggregatorField.get(testWnd2);
+    assertEquals("key-0", wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)));
+    when(mockMsg.getMessage()).thenReturn("x-001");
+    Collection<Character> chars = charAggrFunc.apply(mockMsg, new ArrayList<>());
+    assertEquals(1, chars.size());
+    assertTrue(chars.contains('x'));
+
+    // test constructing session window w/ a default counter
+    Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter =
+        Windows.intoSessionCounter(m -> String.format("key-%d", m.getTimestamp()));
+    assertTrue(testCounter instanceof Windows.SessionWindow);
+    wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testCounter);
+    BiFunction<TestMessage, Integer, Integer> counterFn =
+        (BiFunction<TestMessage, Integer, Integer>) aggregatorField.get(testCounter);
+    when(mockMsg.getTimestamp()).thenReturn(12345L);
+    assertEquals("key-12345", wndKeyFunc.apply(mockMsg));
+    assertEquals(Integer.valueOf(2), counterFn.apply(mockMsg, 1));
+  }
+
+  /**
+   * Sets a trigger builder on a session counter window and checks that the trigger held by the window's internal
+   * window function has the same private field values as a trigger built directly from the same builder.
+   *
+   * @throws NoSuchFieldException if {@link Trigger} no longer declares one of the reflected fields
+   * @throws IllegalAccessException if a reflected field cannot be read
+   */
+  @Test public void testSetTriggers() throws NoSuchFieldException, IllegalAccessException {
+    Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter =
+        Windows.intoSessionCounter(m -> String.format("key-%d", m.getTimestamp()));
+    // test session window w/ a trigger
+    TriggerBuilder<TestMessage, Integer> triggerBuilder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L);
+    testCounter.setTriggers(triggerBuilder);
+    Trigger<TestMessage, WindowState<Integer>> expectedTrigger = triggerBuilder.build();
+    Trigger<TestMessage, WindowState<Integer>> actualTrigger = Windows.getInternalWindowFn(testCounter).getTrigger();
+
+    // examine all trigger fields are expected; the field name goes into the failure message
+    // so a mismatch identifies which part of the trigger differs
+    String[] triggerFieldNames = {
+      "earlyTrigger", "lateTrigger", "timerTrigger", "earlyTriggerUpdater", "lateTriggerUpdater"
+    };
+    for (String fieldName : triggerFieldNames) {
+      Field field = Trigger.class.getDeclaredField(fieldName);
+      field.setAccessible(true);
+      assertEquals("Trigger field '" + fieldName + "' differs",
+          field.get(expectedTrigger), field.get(actualTrigger));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java
new file mode 100644
index 0000000..e953078
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api.data;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit test for {@link IncomingSystemMessage}, which is constructed here around an {@link IncomingMessageEnvelope}.
+ */
+public class TestIncomingSystemMessage {
+
+  /**
+   * Wraps a mocked envelope and checks that key, message, system-stream-partition and offset read from the
+   * wrapper match what the envelope is stubbed to return, and that the message is not flagged as a delete.
+   */
+  @Test public void testConstructor() {
+    IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class);
+    IncomingSystemMessage ism = new IncomingSystemMessage(ime);
+
+    Object mockKey = mock(Object.class);
+    Object mockValue = mock(Object.class);
+    // single source for the raw offset so the stubbed value and the expected LongOffset cannot drift apart
+    String rawOffset = "12345";
+    LongOffset testOffset = new LongOffset(rawOffset);
+    SystemStreamPartition mockSsp = mock(SystemStreamPartition.class);
+
+    // The envelope is stubbed only after the wrapper has been constructed, so the assertions below
+    // can pass only if the wrapper reads from the envelope when its getters are called.
+    when(ime.getKey()).thenReturn(mockKey);
+    when(ime.getMessage()).thenReturn(mockValue);
+    when(ime.getSystemStreamPartition()).thenReturn(mockSsp);
+    when(ime.getOffset()).thenReturn(rawOffset);
+
+    assertEquals(mockKey, ism.getKey());
+    assertEquals(mockValue, ism.getMessage());
+    assertEquals(mockSsp, ism.getSystemStreamPartition());
+    assertEquals(testOffset, ism.getOffset());
+    assertFalse(ism.isDelete());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java
new file mode 100644
index 0000000..10775ec
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api.data;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * Unit tests for {@link LongOffset}: parsing in the constructor, ordering via {@code compareTo()} and equality.
+ */
+public class TestLongOffset {
+
+  /**
+   * Checks that the constructor parses decimal strings (with or without a leading zero) into the private
+   * {@code offset} field and rejects non-numeric input with a {@link NumberFormatException}.
+   *
+   * @throws Exception if the private {@code offset} field cannot be located or read via reflection
+   */
+  @Test public void testConstructor() throws Exception {
+    LongOffset o1 = new LongOffset("12345");
+    Field offsetField = LongOffset.class.getDeclaredField("offset");
+    offsetField.setAccessible(true);
+    Long x = (Long) offsetField.get(o1);
+    assertEquals(12345L, x.longValue());
+
+    o1 = new LongOffset("012345");
+    x = (Long) offsetField.get(o1);
+    assertEquals(12345L, x.longValue());
+
+    try {
+      new LongOffset("xyz");
+      fail("Constructor of LongOffset should have failed w/ mal-formatted numbers");
+    } catch (NumberFormatException nfe) {
+      // expected
+    }
+  }
+
+  /**
+   * Checks that {@code compareTo()} rejects an {@link Offset} of a different class with an
+   * {@link IllegalArgumentException} and orders {@link LongOffset} instances by numeric value.
+   */
+  @Test public void testComparator() {
+    LongOffset o1 = new LongOffset("11111");
+    Offset other = mock(Offset.class);
+    try {
+      o1.compareTo(other);
+      fail("compareTo() should have failed when comparing to an object of a different class");
+    } catch (IllegalArgumentException iae) {
+      // expected
+    }
+
+    // Only the sign of compareTo() is specified by the Comparable contract, so assert on the sign
+    // rather than on the exact values 1 / -1.
+    LongOffset o2 = new LongOffset("-10000");
+    assertTrue(o1.compareTo(o2) > 0);
+    LongOffset o3 = new LongOffset("22222");
+    assertTrue(o1.compareTo(o3) < 0);
+    LongOffset o4 = new LongOffset("11111");
+    assertEquals(0, o1.compareTo(o4));
+  }
+
+  /**
+   * Checks that a {@link LongOffset} is not equal to an {@link Offset} of another class and is equal to another
+   * {@link LongOffset} parsed from a string with the same numeric value.
+   */
+  @Test public void testEquals() {
+    LongOffset o1 = new LongOffset("12345");
+    Offset other = mock(Offset.class);
+    assertFalse(o1.equals(other));
+
+    LongOffset o2 = new LongOffset("0012345");
+    assertTrue(o1.equals(o2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
new file mode 100644
index 0000000..65c37e9
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api.internal;
+
+import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.api.WindowState;
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for the factory methods on {@link Operators}.
+ */
+public class TestOperators {
+
+  /**
+   * Minimal immutable {@link Message} used as the input/output type in these tests.
+   * Declared static because it never touches the enclosing test instance.
+   */
+  private static class TestMessage implements Message<String, Object> {
+    private final long timestamp;
+    private final String key;
+    private final Object msg;
+
+    TestMessage(String key, Object msg, long timestamp) {
+      this.timestamp = timestamp;
+      this.key = key;
+      this.msg = msg;
+    }
+
+    @Override public Object getMessage() {
+      return this.msg;
+    }
+
+    @Override public String getKey() {
+      return this.key;
+    }
+
+    @Override public long getTimestamp() {
+      return this.timestamp;
+    }
+  }
+
+  /**
+   * Checks that a stream operator exposes the transformation function it was created with and has an output
+   * stream.
+   */
+  @Test public void testGetStreamOperator() {
+    Function<Message, Collection<TestMessage>> transformFn = m -> {
+      Collection<TestMessage> outputs = new ArrayList<>();
+      outputs.add(new TestMessage(m.getKey().toString(), m.getMessage(), 12345L));
+      return outputs;
+    };
+    Operators.StreamOperator<Message, TestMessage> strmOp = Operators.getStreamOperator(transformFn);
+    assertEquals(transformFn, strmOp.getFunction());
+    assertTrue(strmOp.getOutputStream() instanceof MessageStream);
+  }
+
+  /**
+   * Checks that a sink operator exposes the sink function it was created with and has no output stream.
+   */
+  @Test public void testGetSinkOperator() {
+    MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> sinkFn = (m, c, t) -> {};
+    Operators.SinkOperator<TestMessage> sinkOp = Operators.getSinkOperator(sinkFn);
+    assertEquals(sinkFn, sinkOp.getFunction());
+    assertNull(sinkOp.getOutputStream());
+  }
+
+  /**
+   * Checks that a window operator exposes the transform function, store functions and trigger of the
+   * {@link WindowFn} it was created from, and that its store name is derived from the input stream's and the
+   * operator's own {@code toString()}.
+   */
+  @Test public void testGetWindowOperator() {
+    WindowFn<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowFn = mock(WindowFn.class);
+    BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> xFunction =
+        (m, e) -> null;
+    Operators.StoreFunctions<TestMessage, String, WindowState<Integer>> storeFns =
+        mock(Operators.StoreFunctions.class);
+    Trigger<TestMessage, WindowState<Integer>> trigger = mock(Trigger.class);
+    MessageStream<TestMessage> mockInput = mock(MessageStream.class);
+    when(windowFn.getTransformFunc()).thenReturn(xFunction);
+    when(windowFn.getStoreFuncs()).thenReturn(storeFns);
+    when(windowFn.getTrigger()).thenReturn(trigger);
+    when(mockInput.toString()).thenReturn("mockStream1");
+
+    Operators.WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowOp =
+        Operators.getWindowOperator(windowFn);
+    assertEquals(xFunction, windowOp.getFunction());
+    assertEquals(storeFns, windowOp.getStoreFunctions());
+    assertEquals(trigger, windowOp.getTrigger());
+    assertEquals(String.format("input-mockStream1-wndop-%s", windowOp.toString()), windowOp.getStoreName(mockInput));
+  }
+
+  /**
+   * Checks that a partial join operator exposes the merge function and output stream it was created with, and
+   * exercises the store-key finder and state updater of its self/join store functions.
+   */
+  @Test public void testGetPartialJoinOperator() {
+    BiFunction<Message<Object, ?>, Message<Object, ?>, TestMessage> merger =
+        (m1, m2) -> new TestMessage(m1.getKey().toString(), m2.getMessage(),
+            Math.max(m1.getTimestamp(), m2.getTimestamp()));
+    MessageStream<TestMessage> joinOutput = new MessageStream<>();
+    Operators.PartialJoinOperator<Message<Object, ?>, Object, Message<Object, ?>, TestMessage> partialJoin =
+        Operators.getPartialJoinOperator(merger, joinOutput);
+
+    assertEquals(joinOutput, partialJoin.getOutputStream());
+    Message<Object, Object> m = mock(Message.class);
+    Message<Object, Object> s = mock(Message.class);
+    assertEquals(merger, partialJoin.getFunction());
+    // NOTE(review): m.getKey() is not stubbed, so the expected key is whatever the bare mock returns
+    // (presumably null); stubbing a concrete key would make these key-finder checks stronger.
+    assertEquals(m.getKey(), partialJoin.getSelfStoreFunctions().getStoreKeyFinder().apply(m));
+    assertEquals(m, partialJoin.getSelfStoreFunctions().getStateUpdater().apply(m, s));
+    assertEquals(m.getKey(), partialJoin.getJoinStoreFunctions().getStoreKeyFinder().apply(m));
+    assertNull(partialJoin.getJoinStoreFunctions().getStateUpdater());
+  }
+
+  /**
+   * Checks that a merge operator's function wraps its input in a single-element collection and that the operator
+   * exposes the output stream it was created with.
+   */
+  @Test public void testGetMergeOperator() {
+    MessageStream<TestMessage> output = new MessageStream<>();
+
+    Operators.StreamOperator<TestMessage, TestMessage> mergeOp = Operators.getMergeOperator(output);
+    TestMessage t = mock(TestMessage.class);
+    Collection<TestMessage> expectedOutputs = new ArrayList<>();
+    expectedOutputs.add(t);
+    assertEquals(expectedOutputs, mergeOp.getFunction().apply(t));
+    assertEquals(output, mergeOp.getOutputStream());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java
new file mode 100644
index 0000000..727276a
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api.internal;
+
+import org.apache.samza.operators.api.WindowState;
+import org.apache.samza.operators.api.data.Message;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Unit test for {@link Trigger#createTrigger}, verified by reading the trigger's private fields via reflection.
+ */
+public class TestTrigger {
+
+  /**
+   * Reads a private field declared on {@link Trigger} from the given instance.
+   *
+   * @param target the trigger instance to read from
+   * @param fieldName name of the field as declared in {@link Trigger}
+   * @return the current value of the field
+   * @throws Exception if the field does not exist or cannot be accessed
+   */
+  private static Object readTriggerField(Object target, String fieldName) throws Exception {
+    Field field = Trigger.class.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    return field.get(target);
+  }
+
+  /**
+   * Creates a trigger from five functions and checks that each one is stored in the field of the matching name.
+   *
+   * @throws Exception if a reflected field cannot be located or read
+   */
+  @Test public void testConstructor() throws Exception {
+    BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> earlyFn =
+        (m, s) -> s.getOutputValue() > 1000;
+    BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> lateFn =
+        (m, s) -> s.getOutputValue() > 1000;
+    Function<WindowState<Integer>, Boolean> timerFn =
+        s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < System.currentTimeMillis();
+    Function<WindowState<Integer>, WindowState<Integer>> earlyUpdaterFn = s -> {
+      s.setOutputValue(0);
+      return s;
+    };
+    Function<WindowState<Integer>, WindowState<Integer>> lateUpdaterFn = s -> {
+      s.setOutputValue(1);
+      return s;
+    };
+
+    Trigger<Message<Object, Object>, WindowState<Integer>> created =
+        Trigger.createTrigger(timerFn, earlyFn, lateFn, earlyUpdaterFn, lateUpdaterFn);
+
+    assertEquals(earlyFn, readTriggerField(created, "earlyTrigger"));
+    assertEquals(timerFn, readTriggerField(created, "timerTrigger"));
+    assertEquals(lateFn, readTriggerField(created, "lateTrigger"));
+    assertEquals(earlyUpdaterFn, readTriggerField(created, "earlyTriggerUpdater"));
+    assertEquals(lateUpdaterFn, readTriggerField(created, "lateTriggerUpdater"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java
new file mode 100644
index 0000000..f3cf0e0
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api.internal;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+/**
+ * Unit test for the {@link WindowOutput#of} factory method.
+ */
+public class TestWindowOutput {
+
+  /**
+   * Checks that a {@link WindowOutput} created via {@code of(key, message)} returns that key and message, is not
+   * flagged as a delete, and reports a timestamp of 0.
+   */
+  @Test public void testConstructor() {
+    WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10);
+    assertEquals("testMsg", wndOutput.getKey());
+    assertEquals(Integer.valueOf(10), wndOutput.getMessage());
+    assertFalse(wndOutput.isDelete());
+    assertEquals(0L, wndOutput.getTimestamp());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
new file mode 100644
index 0000000..9445f3a
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.api.TestMessage;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.reactivestreams.Subscriber;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit test for the subscribe / publish plumbing of {@link OperatorImpl}.
+ */
+public class TestOperatorImpl {
+
+  // Arguments most recently handed to the anonymous operator's onNext(); written by the operator created in the test.
+  TestMessage curInputMsg;
+  MessageCollector curCollector;
+  TaskCoordinator curCoordinator;
+
+  /**
+   * Checks two things: that {@code nextProcessors()} forwards an output message, collector and coordinator to a
+   * subscribed {@link Subscriber} wrapped in a {@link ProcessorContext}, and that {@code onNext(ProcessorContext)}
+   * unpacks the context into the three-argument {@code onNext()} of the operator.
+   */
+  @Test public void testSubscribers() {
+    this.curInputMsg = null;
+    this.curCollector = null;
+    this.curCoordinator = null;
+    OperatorImpl<TestMessage, TestOutputMessage> operator = new OperatorImpl<TestMessage, TestOutputMessage>() {
+      @Override protected void onNext(TestMessage message, MessageCollector collector, TaskCoordinator coordinator) {
+        // record what the operator was invoked with so the outer test can assert on it
+        TestOperatorImpl.this.curInputMsg = message;
+        TestOperatorImpl.this.curCollector = collector;
+        TestOperatorImpl.this.curCoordinator = coordinator;
+      }
+    };
+
+    // verify subscribe() added the subscriber and nextProcessors() invoked its onNext()
+    Subscriber<ProcessorContext<TestOutputMessage>> downstream = mock(Subscriber.class);
+    operator.subscribe(downstream);
+    TestOutputMessage outputMsg = mock(TestOutputMessage.class);
+    MessageCollector collectorMock = mock(MessageCollector.class);
+    TaskCoordinator coordinatorMock = mock(TaskCoordinator.class);
+    operator.nextProcessors(outputMsg, collectorMock, coordinatorMock);
+    ArgumentMatcher<ProcessorContext<TestOutputMessage>> carriesOutput =
+        new ArgumentMatcher<ProcessorContext<TestOutputMessage>>() {
+          @Override public boolean matches(Object argument) {
+            ProcessorContext<TestOutputMessage> ctx = (ProcessorContext<TestOutputMessage>) argument;
+            if (!ctx.getMessage().equals(outputMsg)) {
+              return false;
+            }
+            if (!ctx.getCoordinator().equals(coordinatorMock)) {
+              return false;
+            }
+            return ctx.getCollector().equals(collectorMock);
+          }
+        };
+    verify(downstream, times(1)).onNext(argThat(carriesOutput));
+
+    // verify onNext() is invoked correctly
+    TestMessage inputMsg = mock(TestMessage.class);
+    ProcessorContext<TestMessage> inputContext = new ProcessorContext<>(inputMsg, collectorMock, coordinatorMock);
+    operator.onNext(inputContext);
+    assertEquals(inputMsg, this.curInputMsg);
+    assertEquals(collectorMock, this.curCollector);
+    assertEquals(coordinatorMock, this.curCoordinator);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java
new file mode 100644
index 0000000..4bcf767
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.api.data.Message;
+
+
+/**
+ * Immutable {@link Message} with a {@code String} key and an {@code Integer} payload, used as the output message
+ * type by the operator implementation tests in this package.
+ */
+class TestOutputMessage implements Message<String, Integer> {
+  private final String key;
+  private final Integer value;
+  private final long timestamp;
+
+  /**
+   * @param key the message key returned by {@link #getKey()}
+   * @param value the payload returned by {@link #getMessage()}
+   * @param timestamp the value returned by {@link #getTimestamp()}
+   */
+  TestOutputMessage(String key, Integer value, long timestamp) {
+    this.timestamp = timestamp;
+    this.value = value;
+    this.key = key;
+  }
+
+  /** @return the payload supplied at construction */
+  @Override public Integer getMessage() {
+    return value;
+  }
+
+  /** @return the key supplied at construction */
+  @Override public String getKey() {
+    return key;
+  }
+
+  /** @return the timestamp supplied at construction */
+  @Override public long getTimestamp() {
+    return timestamp;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
new file mode 100644
index 0000000..14796fc
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.api.TestMessage;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * Unit test for the {@link ProcessorContext} constructor and getters.
+ */
+public class TestProcessorContext {
+
+  /**
+   * Checks that a {@link ProcessorContext} returns the message, collector and coordinator it was constructed with.
+   */
+  @Test public void testConstructor() {
+    TestMessage mockMsg = mock(TestMessage.class);
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class);
+    ProcessorContext<TestMessage> pCntx = new ProcessorContext<>(mockMsg, mockCollector, mockTaskCoordinator);
+    assertEquals(mockMsg, pCntx.getMessage());
+    assertEquals(mockCollector, pCntx.getCollector());
+    assertEquals(mockTaskCoordinator, pCntx.getCoordinator());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
new file mode 100644
index 0000000..50154f0
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.api.TestMessage;
+import org.apache.samza.operators.api.internal.Operators.StreamOperator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit test for {@link SimpleOperatorImpl}.
+ */
+public class TestSimpleOperatorImpl {
+
+  /**
+   * Verifies that {@code onNext()} applies the operator's transformation function to the
+   * input message once, and passes the produced output message to {@code nextProcessors()}
+   * together with the collector and coordinator.
+   */
+  @Test
+  public void testSimpleOperator() {
+    StreamOperator<TestMessage, TestOutputMessage> mockOp = mock(StreamOperator.class);
+    Function<TestMessage, Collection<TestOutputMessage>> txfmFn = mock(Function.class);
+    when(mockOp.getFunction()).thenReturn(txfmFn);
+
+    // Spy on the real implementation so the call to nextProcessors() can be verified.
+    SimpleOperatorImpl<TestMessage, TestOutputMessage> opImpl = spy(new SimpleOperatorImpl<>(mockOp));
+    TestMessage inMsg = mock(TestMessage.class);
+    TestOutputMessage outMsg = mock(TestOutputMessage.class);
+    // A plain typed list instead of a raw, double-brace-initialized anonymous ArrayList subclass.
+    Collection<TestOutputMessage> mockOutputs = new ArrayList<>();
+    mockOutputs.add(outMsg);
+    when(txfmFn.apply(inMsg)).thenReturn(mockOutputs);
+
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+    opImpl.onNext(inMsg, mockCollector, mockCoordinator);
+
+    verify(txfmFn, times(1)).apply(inMsg);
+    verify(opImpl, times(1)).nextProcessors(outMsg, mockCollector, mockCoordinator);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
new file mode 100644
index 0000000..eb8a23a
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.api.internal.Operators.SinkOperator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit test for {@link SinkOperatorImpl}.
+ */
+public class TestSinkOperatorImpl {
+
+  /**
+   * Checks that a message passed to {@code onNext()} is forwarded, together with the
+   * collector and the coordinator, to the operator's sink function exactly once.
+   */
+  @Test
+  public void testSinkOperator() {
+    // Arguments that will be handed to onNext().
+    TestOutputMessage message = mock(TestOutputMessage.class);
+    MessageCollector collector = mock(MessageCollector.class);
+    TaskCoordinator coordinator = mock(TaskCoordinator.class);
+
+    // Operator definition whose function is a mocked sink.
+    MessageStream.VoidFunction3<TestOutputMessage, MessageCollector, TaskCoordinator> sinkFunction =
+        mock(MessageStream.VoidFunction3.class);
+    SinkOperator<TestOutputMessage> operatorSpec = mock(SinkOperator.class);
+    when(operatorSpec.getFunction()).thenReturn(sinkFunction);
+
+    SinkOperatorImpl<TestOutputMessage> operatorImpl = new SinkOperatorImpl<>(operatorSpec);
+    operatorImpl.onNext(message, collector, coordinator);
+
+    verify(sinkFunction, times(1)).apply(message, collector, coordinator);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
new file mode 100644
index 0000000..eb8937a
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.api.TestMessage;
+import org.apache.samza.operators.api.WindowState;
+import org.apache.samza.operators.api.internal.Operators.StoreFunctions;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.TaskContext;
+import org.junit.Test;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit test for {@link StateStoreImpl}.
+ */
+public class TestStateStoreImpl {
+
+  /**
+   * Walks a {@link StateStoreImpl} through construction, {@code init()}, {@code getState()}
+   * and {@code updateState()} against mocked store functions and a mocked key-value store.
+   */
+  @Test
+  public void testStateStoreImpl() {
+    StoreFunctions<TestMessage, String, WindowState> mockStoreFunctions = mock(StoreFunctions.class);
+    // test constructor
+    StateStoreImpl<TestMessage, String, WindowState> storeImpl =
+        new StateStoreImpl<>(mockStoreFunctions, "myStoreName");
+    TaskContext mockContext = mock(TaskContext.class);
+    KeyValueStore<String, WindowState> mockKvStore = mock(KeyValueStore.class);
+    when(mockContext.getStore("myStoreName")).thenReturn(mockKvStore);
+    // test init()
+    storeImpl.init(mockContext);
+    verify(mockContext, times(1)).getStore("myStoreName");
+    Function<TestMessage, String> wndKeyFn = mock(Function.class);
+    when(mockStoreFunctions.getStoreKeyFinder()).thenReturn(wndKeyFn);
+    TestMessage mockMsg = mock(TestMessage.class);
+    when(wndKeyFn.apply(mockMsg)).thenReturn("myKey");
+    WindowState mockState = mock(WindowState.class);
+    when(mockKvStore.get("myKey")).thenReturn(mockState);
+    // test getState()
+    Entry<String, WindowState> storeEntry = storeImpl.getState(mockMsg);
+    // assertEquals takes (expected, actual); this order keeps failure messages truthful.
+    assertEquals("myKey", storeEntry.getKey());
+    assertEquals(mockState, storeEntry.getValue());
+    verify(wndKeyFn, times(1)).apply(mockMsg);
+    verify(mockKvStore, times(1)).get("myKey");
+    Entry<String, WindowState> oldEntry = new Entry<>("myKey", mockState);
+    WindowState mockNewState = mock(WindowState.class);
+    BiFunction<TestMessage, WindowState, WindowState> mockUpdaterFn = mock(BiFunction.class);
+    when(mockStoreFunctions.getStateUpdater()).thenReturn(mockUpdaterFn);
+    when(mockUpdaterFn.apply(mockMsg, mockState)).thenReturn(mockNewState);
+    // test updateState()
+    Entry<String, WindowState> newEntry = storeImpl.updateState(mockMsg, oldEntry);
+    assertEquals("myKey", newEntry.getKey());
+    assertEquals(mockNewState, newEntry.getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
new file mode 100644
index 0000000..10ee2c7
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl.data.serializers;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.operators.impl.data.avro.AvroData;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for the serde created by {@link SqlAvroSerdeFactory}, using a small "Order"
+ * Avro record schema with an int {@code id}, a string {@code product} and an int
+ * {@code quantity}.
+ */
+public class SqlAvroSerdeTest {
+  public static final String ORDER_SCHEMA = "{\"namespace\": \"org.apache.samza.operators\",\n" +
+      " \"type\": \"record\",\n" +
+      " \"name\": \"Order\",\n" +
+      " \"fields\": [\n" +
+      "     {\"name\": \"id\", \"type\": \"int\"},\n" +
+      "     {\"name\": \"product\",  \"type\": \"string\"},\n" +
+      "     {\"name\": \"quantity\", \"type\": \"int\"}\n" +
+      " ]\n" +
+      "}";
+
+  public static Schema orderSchema = Schema.parse(ORDER_SCHEMA);
+
+  // Serde under test, configured with ORDER_SCHEMA through sqlAvroSerdeTestConfig().
+  private static final Serde serde = new SqlAvroSerdeFactory().getSerde("sqlAvro", sqlAvroSerdeTestConfig());
+
+  /**
+   * Deserializes an Avro-encoded sample order and checks the schema types of the result.
+   */
+  @Test
+  public void testSqlAvroSerdeDeserialization() throws IOException {
+    AvroData decodedDatum = (AvroData) serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema));
+
+    assertOrderSchemaTypes(decodedDatum);
+  }
+
+  /**
+   * Deserializes a sample order, serializes it again with the serde, deserializes the
+   * serde's own output and checks the schema types of the final result.
+   */
+  @Test
+  public void testSqlAvroSerialization() throws IOException {
+    AvroData decodedDatumOriginal = (AvroData) serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema));
+    byte[] encodedDatum = serde.toBytes(decodedDatumOriginal);
+
+    AvroData decodedDatum = (AvroData) serde.fromBytes(encodedDatum);
+
+    assertOrderSchemaTypes(decodedDatum);
+  }
+
+  /**
+   * Asserts that the datum is a STRUCT whose {@code id} and {@code quantity} fields are
+   * INTEGER and whose {@code product} field is STRING.
+   *
+   * <p>Uses {@code assertEquals(expected, actual)} rather than {@code assertTrue(a == b)} so a
+   * failure reports which type was actually found.
+   *
+   * @param datum the decoded order to inspect
+   */
+  private static void assertOrderSchemaTypes(AvroData datum) {
+    Assert.assertEquals(
+        org.apache.samza.operators.api.data.Schema.Type.STRUCT,
+        datum.schema().getType());
+    Assert.assertEquals(
+        org.apache.samza.operators.api.data.Schema.Type.INTEGER,
+        datum.getFieldData("id").schema().getType());
+    Assert.assertEquals(
+        org.apache.samza.operators.api.data.Schema.Type.INTEGER,
+        datum.getFieldData("quantity").schema().getType());
+    Assert.assertEquals(
+        org.apache.samza.operators.api.data.Schema.Type.STRING,
+        datum.getFieldData("product").schema().getType());
+  }
+
+  /**
+   * Builds the serde configuration: the order schema under the key
+   * {@code serializers.sqlAvro.schema}.
+   */
+  private static Config sqlAvroSerdeTestConfig() {
+    Map<String, String> config = new HashMap<String, String>();
+    config.put("serializers.sqlAvro.schema", ORDER_SCHEMA);
+
+    return new MapConfig(config);
+  }
+
+  /**
+   * Encodes a record with Avro's binary encoder.
+   *
+   * @param datum the record to encode
+   * @param avroSchema the schema used by the datum writer
+   * @return the encoded bytes
+   * @throws IOException if writing or flushing the encoder fails
+   */
+  private static byte[] encodeMessage(GenericRecord datum, Schema avroSchema) throws IOException {
+    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(avroSchema);
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    BinaryEncoder encoder = new BinaryEncoder(output);
+    writer.write(datum, encoder);
+    encoder.flush();
+
+    return output.toByteArray();
+  }
+
+  /**
+   * Creates the sample order used by both tests: id 1, product "paint", quantity 3.
+   */
+  private static GenericRecord sampleOrderRecord() {
+    GenericData.Record datum = new GenericData.Record(orderSchema);
+    datum.put("id", 1);
+    datum.put("product", "paint");
+    datum.put("quantity", 3);
+
+    return datum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
new file mode 100644
index 0000000..6947464
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl.window;
+
+import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.api.TestMessage;
+import org.apache.samza.operators.api.WindowState;
+import org.apache.samza.operators.api.internal.Operators.StoreFunctions;
+import org.apache.samza.operators.api.internal.Operators.WindowOperator;
+import org.apache.samza.operators.api.internal.WindowOutput;
+import org.apache.samza.operators.impl.StateStoreImpl;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+import java.lang.reflect.Field;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit tests for {@link SessionWindowImpl}.
+ */
+public class TestSessionWindowImpl {
+  // Reflection handles on SessionWindowImpl's "wndStore" and "txfmFunction" fields; set in prep().
+  Field wndStoreField = null;
+  Field txfmFnField = null;
+
+  /**
+   * Looks up the two fields the tests inspect and makes them accessible.
+   */
+  @Before
+  public void prep() throws NoSuchFieldException {
+    wndStoreField = SessionWindowImpl.class.getDeclaredField("wndStore");
+    txfmFnField = SessionWindowImpl.class.getDeclaredField("txfmFunction");
+    wndStoreField.setAccessible(true);
+    txfmFnField.setAccessible(true);
+  }
+
+  /**
+   * Constructs a {@link SessionWindowImpl} from a mocked window operator, checks that it holds
+   * the operator's transformation function, and checks that its state store reads from the
+   * key-value store registered under the operator's store name.
+   */
+  @Test
+  public void testConstructor() throws IllegalAccessException {
+    // test constructing a SessionWindowImpl w/ expected mock functions
+    MessageStream<TestMessage> mockInputStrm = mock(MessageStream.class);
+    WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp =
+        mock(WindowOperator.class);
+    StoreFunctions<TestMessage, String, WindowState<Integer>> mockStoreFns = mock(StoreFunctions.class);
+    when(wndOp.getStoreFunctions()).thenReturn(mockStoreFns);
+    when(wndOp.getStoreName(mockInputStrm)).thenReturn("test-wnd-store");
+    BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn =
+        mock(BiFunction.class);
+    when(wndOp.getFunction()).thenReturn(mockTxfmFn);
+    SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd =
+        new SessionWindowImpl<>(wndOp, mockInputStrm);
+    // Cast to the same generic type as mockTxfmFn, the object this field is expected to hold.
+    BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> txfmFn =
+        (BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>>)
+            txfmFnField.get(sessWnd);
+    assertEquals(mockTxfmFn, txfmFn);
+    StateStoreImpl<TestMessage, String, WindowState<Integer>> storeImpl =
+        (StateStoreImpl<TestMessage, String, WindowState<Integer>>) wndStoreField.get(sessWnd);
+
+    // test init() and make sure the wndStore is initialized as expected
+    TestMessage mockMsg = mock(TestMessage.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    KeyValueStore<String, WindowState<Integer>> mockKvStore = mock(KeyValueStore.class);
+    when(mockContext.getStore("test-wnd-store")).thenReturn(mockKvStore);
+    Function<TestMessage, String> wndKeyFn = m -> "test-msg-key";
+    when(mockStoreFns.getStoreKeyFinder()).thenReturn(wndKeyFn);
+    WindowState<Integer> mockState = mock(WindowState.class);
+    when(mockKvStore.get("test-msg-key")).thenReturn(mockState);
+    storeImpl.init(mockContext);
+    Entry<String, WindowState<Integer>> stateEntry = storeImpl.getState(mockMsg);
+    verify(mockStoreFns, times(1)).getStoreKeyFinder();
+    verify(mockKvStore, times(1)).get("test-msg-key");
+    // assertEquals takes (expected, actual); this order keeps failure messages truthful.
+    assertEquals("test-msg-key", stateEntry.getKey());
+    assertEquals(mockState, stateEntry.getValue());
+  }
+
+  /**
+   * Initializes a {@link SessionWindowImpl} and sends it one message, then verifies that the
+   * transformation function saw the message with the old state entry, that the state updater
+   * was applied to the old state, and that the new state was written back to the store.
+   */
+  @Test
+  public void testInitAndProcess() {
+    MessageStream<TestMessage> mockInputStrm = mock(MessageStream.class);
+    WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp =
+        mock(WindowOperator.class);
+    StoreFunctions<TestMessage, String, WindowState<Integer>> mockStoreFns = mock(StoreFunctions.class);
+    Function<TestMessage, String> wndKeyFn = m -> "test-msg-key";
+    when(mockStoreFns.getStoreKeyFinder()).thenReturn(wndKeyFn);
+    when(wndOp.getStoreFunctions()).thenReturn(mockStoreFns);
+    when(wndOp.getStoreName(mockInputStrm)).thenReturn("test-wnd-store");
+    BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn =
+        mock(BiFunction.class);
+    when(wndOp.getFunction()).thenReturn(mockTxfmFn);
+
+    // construct and init the SessionWindowImpl object
+    SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd =
+        new SessionWindowImpl<>(wndOp, mockInputStrm);
+    TaskContext mockContext = mock(TaskContext.class);
+    KeyValueStore<String, WindowState<Integer>> mockKvStore = mock(KeyValueStore.class);
+    when(mockContext.getStore("test-wnd-store")).thenReturn(mockKvStore);
+    sessWnd.init(mockContext);
+
+    // test onNext() method. Make sure the right methods are invoked.
+    TestMessage mockMsg = mock(TestMessage.class);
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+    BiFunction<TestMessage, WindowState<Integer>, WindowState<Integer>> stateUpdaterFn = mock(BiFunction.class);
+    when(mockStoreFns.getStateUpdater()).thenReturn(stateUpdaterFn);
+    WindowState<Integer> mockNewState = mock(WindowState.class);
+    WindowState<Integer> oldState = mock(WindowState.class);
+    when(mockKvStore.get("test-msg-key")).thenReturn(oldState);
+    when(stateUpdaterFn.apply(mockMsg, oldState)).thenReturn(mockNewState);
+    sessWnd.onNext(mockMsg, mockCollector, mockCoordinator);
+    verify(mockTxfmFn, times(1)).apply(argThat(new ArgumentMatcher<TestMessage>() {
+      @Override public boolean matches(Object argument) {
+        TestMessage xIn = (TestMessage) argument;
+        return xIn.equals(mockMsg);
+      }
+    }), argThat(new ArgumentMatcher<Entry<String, WindowState<Integer>>>() {
+      @Override public boolean matches(Object argument) {
+        // The entry passed to the function must carry the message key and the pre-update state.
+        Entry<String, WindowState<Integer>> xIn = (Entry<String, WindowState<Integer>>) argument;
+        return xIn.getKey().equals("test-msg-key") && xIn.getValue().equals(oldState);
+      }
+    }));
+    verify(stateUpdaterFn, times(1)).apply(mockMsg, oldState);
+    verify(mockKvStore, times(1)).put("test-msg-key", mockNewState);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java 
b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
new file mode 100644
index 0000000..91b0074
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.api.MessageStreams.SystemMessageStream;
+import org.apache.samza.operators.api.Windows;
+import org.apache.samza.operators.api.TriggerBuilder;
+import org.apache.samza.operators.api.data.IncomingSystemMessage;
+import org.apache.samza.operators.api.data.Offset;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Collection;
+
+
+/**
+ * Example implementation of split stream tasks
+ *
+ */
+public class BroadcastOperatorTask implements StreamOperatorTask {
+  // Message body used by this example: four string fields, a partition key and a timestamp.
+  class MessageType {
+    String field1;
+    String field2;
+    String field3;
+    String field4;
+    String parKey;
+    // No code in this class assigns this field.
+    private long timestamp;
+
+    public long getTimestamp() { return this.timestamp; }
+  }
+
+  // Input message whose body is a MessageType; all constructor arguments go straight to the superclass.
+  class JsonMessage extends InputJsonSystemMessage<MessageType> {
+
+    JsonMessage(String key, MessageType data, Offset offset, long timestamp, 
SystemStreamPartition partition) {
+      super(key, data, offset, timestamp, partition);
+    }
+  }
+
+  /**
+   * For every source stream, maps the incoming messages to {@link JsonMessage} and attaches
+   * three independent filter-then-window pipelines to the mapped stream.
+   *
+   * @param sources the system message streams to attach operators to
+   */
+  @Override public void initOperators(Collection<SystemMessageStream> sources) 
{
+    sources.forEach(source -> {
+          MessageStream<JsonMessage> inputStream = 
source.map(this::getInputMessage);
+
+          // Pipeline 1: messages passing myFilter1, session counter keyed by "field1-field2".
+          inputStream.filter(this::myFilter1).
+              window(Windows.<JsonMessage, String>intoSessionCounter(
+                  m -> String.format("%s-%s", m.getMessage().field1, 
m.getMessage().field2)).
+                  setTriggers(TriggerBuilder.<JsonMessage, 
Integer>earlyTriggerWhenExceedWndLen(100).
+                      addLateTriggerOnSizeLimit(10).
+                      addTimeoutSinceLastMessage(30000)));
+
+          // Pipeline 2: messages passing myFilter2, sessions keyed by "field3-field4".
+          inputStream.filter(this::myFilter2).
+              window(Windows.<JsonMessage, String>intoSessions(
+                  m -> String.format("%s-%s", m.getMessage().field3, 
m.getMessage().field4)).
+                  setTriggers(TriggerBuilder.<JsonMessage, 
Collection<JsonMessage>>earlyTriggerWhenExceedWndLen(100).
+                      addTimeoutSinceLastMessage(30000)));
+
+          // Pipeline 3: messages passing myFilter3, sessions keyed by "field3-field4" with the
+          // message body passed as the second function to intoSessions.
+          inputStream.filter(this::myFilter3).
+              window(Windows.<JsonMessage, String, MessageType>intoSessions(
+                  m -> String.format("%s-%s", m.getMessage().field3, 
m.getMessage().field4), m -> m.getMessage()).
+                  setTriggers(TriggerBuilder
+                      .<JsonMessage, 
Collection<MessageType>>earlyTriggerOnEventTime(m -> m.getTimestamp(), 30000).
+                          addTimeoutSinceFirstMessage(60000)));
+    }
+    );
+  }
+
+  /**
+   * Wraps an incoming system message in a {@link JsonMessage}, using the string form of its
+   * key, its offset, its system stream partition and the event time from getEventTime().
+   */
+  // NOTE(review): the same payload is cast to both MessageType and GenericRecord below;
+  // confirm a single object can satisfy both casts, otherwise one of them throws.
+  JsonMessage getInputMessage(IncomingSystemMessage m1) {
+    return new JsonMessage(
+        m1.getKey().toString(),
+        (MessageType) m1.getMessage(),
+        m1.getOffset(),
+        this.getEventTime((GenericRecord)m1.getMessage()),
+        m1.getSystemStreamPartition());
+  }
+
+  // Reads the "event_time" field of the record and unboxes it as a long.
+  long getEventTime(GenericRecord msg) {
+    return (Long) msg.get("event_time");
+  }
+
+  // Accepts messages whose parKey equals "key1".
+  boolean myFilter1(JsonMessage m1) {
+    // Do user defined processing here
+    return m1.getMessage().parKey.equals("key1");
+  }
+
+  // Accepts messages whose parKey equals "key2".
+  boolean myFilter2(JsonMessage m1) {
+    // Do user defined processing here
+    return m1.getMessage().parKey.equals("key2");
+  }
+
+  // Accepts messages whose parKey equals "key3".
+  boolean myFilter3(JsonMessage m1) {
+    return m1.getMessage().parKey.equals("key3");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
 
b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
new file mode 100644
index 0000000..5e710b2
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.operators.api.data.InputSystemMessage;
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.data.Offset;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Example input message w/ avro message body and string as the key.
+ */
+
+public class InputJsonSystemMessage<T> implements Message<String, T>, 
InputSystemMessage<Offset> {
+
+  private final String key;
+  private final T data;
+  private final Offset offset;
+  private final long timestamp;
+  private final SystemStreamPartition partition;
+
+  InputJsonSystemMessage(String key, T data, Offset offset, long timestamp, 
SystemStreamPartition partition) {
+    this.key = key;
+    this.data = data;
+    this.offset = offset;
+    this.timestamp = timestamp;
+    this.partition = partition;
+  }
+
+  @Override public T getMessage() {
+    return this.data;
+  }
+
+  @Override public String getKey() {
+    return this.key;
+  }
+
+  @Override public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  @Override public Offset getOffset() { return this.offset; }
+
+  @Override public SystemStreamPartition getSystemStreamPartition() { return 
this.partition; }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java 
b/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java
new file mode 100644
index 0000000..825f4c4
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.api.MessageStreams.SystemMessageStream;
+import org.apache.samza.operators.api.data.IncomingSystemMessage;
+import org.apache.samza.operators.api.data.Offset;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * Example implementation of unique key-based stream-stream join tasks
+ *
+ */
+public class JoinOperatorTask implements StreamOperatorTask {
+  class MessageType {
+    String joinKey;
+    List<String> joinFields = new ArrayList<>();
+  }
+
+  class JsonMessage extends InputJsonSystemMessage<MessageType> {
+
+    JsonMessage(String key, MessageType data, Offset offset, long timestamp, 
SystemStreamPartition partition) {
+      super(key, data, offset, timestamp, partition);
+    }
+  }
+
+  MessageStream<JsonMessage> joinOutput = null;
+
+  @Override public void initOperators(Collection<SystemMessageStream> sources) 
{
+    sources.forEach(source -> {
+      MessageStream<JsonMessage> newSource = source.map(this::getInputMessage);
+      if (joinOutput == null) {
+        joinOutput = newSource;
+      } else {
+        joinOutput = joinOutput.join(newSource, (m1, m2) -> 
this.myJoinResult(m1, m2));
+      }
+    });
+  }
+
+  private JsonMessage getInputMessage(IncomingSystemMessage ism) {
+    return new JsonMessage(
+        ((MessageType)ism.getMessage()).joinKey,
+        (MessageType) ism.getMessage(),
+        ism.getOffset(),
+        ism.getTimestamp(),
+        ism.getSystemStreamPartition());
+  }
+
+  JsonMessage myJoinResult(JsonMessage m1, JsonMessage m2) {
+    MessageType newJoinMsg = new MessageType();
+    newJoinMsg.joinKey = m1.getKey();
+    newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
+    newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
+    return new JsonMessage(m1.getMessage().joinKey, newJoinMsg, null, 
m1.getTimestamp(), null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
 
b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
new file mode 100644
index 0000000..306425e
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.impl.ChainedOperators;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.Partition;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit tests for {@link StreamOperatorAdaptorTask}. The adaptor's private {@code userTask}
+ * and {@code operatorChains} fields are inspected through reflection.
+ */
+public class TestStreamOperatorAdaptorTask {
+  // Reflective handles on the adaptor's private fields; resolved before each test in prep().
+  Field userTaskField = null;
+  Field chainedOpsField = null;
+
+  @Before public void prep() throws NoSuchFieldException {
+    userTaskField = StreamOperatorAdaptorTask.class.getDeclaredField("userTask");
+    chainedOpsField = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
+    userTaskField.setAccessible(true);
+    chainedOpsField.setAccessible(true);
+  }
+
+  /**
+   * Reads the private {@code operatorChains} map of the given adaptor task.
+   *
+   * @param adaptorTask the task to inspect
+   * @return the map instance currently held by the task
+   * @throws IllegalAccessException if the reflective read is rejected
+   */
+  @SuppressWarnings("unchecked") // the tests cast the field to this map type; the cast itself is unchecked
+  private Map<SystemStreamPartition, ChainedOperators> readOperatorChains(StreamOperatorAdaptorTask adaptorTask)
+      throws IllegalAccessException {
+    return (Map<SystemStreamPartition, ChainedOperators>) chainedOpsField.get(adaptorTask);
+  }
+
+  /** The constructor must keep the user task and start with no operator chains. */
+  @Test public void testConstructor() throws IllegalAccessException {
+    StreamOperatorTask userTask = mock(StreamOperatorTask.class);
+    StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask);
+    StreamOperatorTask taskMemberVar = (StreamOperatorTask) userTaskField.get(adaptorTask);
+    Map<SystemStreamPartition, ChainedOperators> chainsMap = readOperatorChains(adaptorTask);
+    // JUnit's contract is assertEquals(expected, actual).
+    assertEquals(userTask, taskMemberVar);
+    assertTrue(chainsMap.isEmpty());
+  }
+
+  /** init() must call the user task once and register one chain per input partition. */
+  @Test public void testInit() throws Exception {
+    StreamOperatorTask userTask = mock(StreamOperatorTask.class);
+    StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask);
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    // Plain typed set instead of a raw-typed double-brace initializer.
+    Set<SystemStreamPartition> testInputs = new HashSet<>();
+    testInputs.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(0)));
+    testInputs.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(1)));
+    when(mockContext.getSystemStreamPartitions()).thenReturn(testInputs);
+    adaptorTask.init(mockConfig, mockContext);
+    verify(userTask, times(1)).initOperators(Mockito.anyCollection());
+    Map<SystemStreamPartition, ChainedOperators> chainsMap = readOperatorChains(adaptorTask);
+    // assertEquals reports both values on failure, unlike assertTrue(size == 2).
+    assertEquals(2, chainsMap.size());
+    for (SystemStreamPartition input : testInputs) {
+      assertTrue(chainsMap.containsKey(input));
+    }
+  }
+
+  // TODO: add tests for window() and process() once ChainedOperators.create() is implemented
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
 
b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
new file mode 100644
index 0000000..d6181ea
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.impl.ChainedOperators;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for the example {@link StreamOperatorTask} implementations: each one is wrapped
+ * in a {@link StreamOperatorAdaptorTask} and, after {@code init}, the adaptor must hold one
+ * operator chain per input partition.
+ */
+public class TestStreamOperatorTasks {
+
+  // Number of input partitions handed to every adaptor task under test.
+  private static final int PARTITION_COUNT = 4;
+
+  private final WindowOperatorTask userTask = new WindowOperatorTask();
+
+  private final BroadcastOperatorTask splitTask = new BroadcastOperatorTask();
+
+  private final JoinOperatorTask joinTask = new JoinOperatorTask();
+
+  private final Set<SystemStreamPartition> inputPartitions = createInputPartitions();
+
+  /**
+   * Builds the input partition set with a plain {@link HashSet}; avoids the anonymous
+   * subclass (and its captured test instance) that double-brace initialization creates.
+   */
+  private static Set<SystemStreamPartition> createInputPartitions() {
+    Set<SystemStreamPartition> partitions = new HashSet<>();
+    for (int i = 0; i < PARTITION_COUNT; i++) {
+      partitions.add(new SystemStreamPartition("my-system", "my-topic1", new Partition(i)));
+    }
+    return partitions;
+  }
+
+  @Test public void testUserTask() throws Exception {
+    assertChainCreatedPerPartition(new StreamOperatorAdaptorTask(this.userTask));
+  }
+
+  @Test public void testSplitTask() throws Exception {
+    assertChainCreatedPerPartition(new StreamOperatorAdaptorTask(this.splitTask));
+  }
+
+  @Test public void testJoinTask() throws Exception {
+    assertChainCreatedPerPartition(new StreamOperatorAdaptorTask(this.joinTask));
+  }
+
+  /**
+   * Initializes the adaptor task against a mocked context exposing {@code inputPartitions}
+   * and asserts that its private {@code operatorChains} map has a non-null entry for each.
+   *
+   * @param adaptorTask the freshly constructed adaptor task to initialize and inspect
+   * @throws Exception if reflection or {@code init} fails
+   */
+  private void assertChainCreatedPerPartition(StreamOperatorAdaptorTask adaptorTask) throws Exception {
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+    Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
+    pipelineMapFld.setAccessible(true);
+    // Read before init(), as the original tests did: init() is expected to fill this same map.
+    @SuppressWarnings("unchecked")
+    Map<SystemStreamPartition, ChainedOperators> pipelineMap =
+        (Map<SystemStreamPartition, ChainedOperators>) pipelineMapFld.get(adaptorTask);
+
+    adaptorTask.init(mockConfig, mockContext);
+    // JUnit's contract is assertEquals(expected, actual).
+    assertEquals(PARTITION_COUNT, pipelineMap.size());
+    for (SystemStreamPartition partition : this.inputPartitions) {
+      assertNotNull(pipelineMap.get(partition));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java 
b/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java
new file mode 100644
index 0000000..11186ea
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import org.apache.samza.operators.api.MessageStreams.SystemMessageStream;
+import org.apache.samza.operators.api.TriggerBuilder;
+import org.apache.samza.operators.api.Windows;
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.data.Offset;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Collection;
+
+
+/**
+ * Example of a simple user-defined task that applies a window operator to each input stream.
+ */
+public class WindowOperatorTask implements StreamOperatorTask {
+  class MessageType {
+    String field1;
+    String field2;
+  }
+
+  class JsonMessage extends InputJsonSystemMessage<MessageType> {
+
+    JsonMessage(
+        final String key,
+        final MessageType data,
+        final Offset offset,
+        final long timestamp,
+        final SystemStreamPartition partition) {
+      super(key, data, offset, timestamp, partition);
+    }
+  }
+
+  /**
+   * For every source: maps each incoming message to a {@link JsonMessage} keyed by
+   * {@link #myMessageKeyFunction(Message)}, then applies a session-counter window whose
+   * session key is {@code "<field1>-<field2>"} of the payload. The window's triggers are
+   * built with {@code earlyTriggerWhenExceedWndLen(100)} and
+   * {@code addTimeoutSinceLastMessage(30000)}.
+   *
+   * @param sources the input streams to attach the operators to
+   */
+  @Override
+  public void initOperators(Collection<SystemMessageStream> sources) {
+    for (SystemMessageStream stream : sources) {
+      stream
+          .map(incoming -> new JsonMessage(
+              this.myMessageKeyFunction(incoming),
+              (MessageType) incoming.getMessage(),
+              incoming.getOffset(),
+              incoming.getTimestamp(),
+              incoming.getSystemStreamPartition()))
+          .window(Windows.<JsonMessage, String>intoSessionCounter(
+                  msg -> String.format("%s-%s", msg.getMessage().field1, msg.getMessage().field2))
+              .setTriggers(TriggerBuilder.<JsonMessage, Integer>earlyTriggerWhenExceedWndLen(100)
+                  .addTimeoutSinceLastMessage(30000)));
+    }
+  }
+
+  /**
+   * Derives the message key as the string form of the incoming message's key.
+   *
+   * @param m the message whose key is read
+   * @return {@code m.getKey().toString()}
+   */
+  String myMessageKeyFunction(Message<Object, Object> m) {
+    final Object key = m.getKey();
+    return key.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java
 
b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java
index fbb5c59..ea9ee57 100644
--- 
a/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java
+++ 
b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java
@@ -38,7 +38,8 @@ public class TestAvroSchemaConverter {
       " ]\n" +
       "}";
 
-  public static final Schema simpleRecord = new 
Schema.Parser().parse(SIMPLE_RECORD_SCHEMA);
+  public static final Schema simpleRecord = Schema.parse(SIMPLE_RECORD_SCHEMA);
+
   @Test
   public void testSimpleAvroRecord(){
     RelDataTypeFactory relDataTypeFactory = new 
SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/README.md
----------------------------------------------------------------------
diff --git a/samza-sql-core/README.md b/samza-sql-core/README.md
deleted file mode 100644
index 72464dc..0000000
--- a/samza-sql-core/README.md
+++ /dev/null
@@ -1,17 +0,0 @@
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-samza-sql is an experimental module that is under development (SAMZA-390).

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java 
b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java
deleted file mode 100644
index d1b8409..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.data;
-
-import java.util.List;
-import java.util.Map;
-
-
-public interface Data {
-
-  Schema schema();
-
-  Object value();
-
-  int intValue();
-
-  long longValue();
-
-  float floatValue();
-
-  double doubleValue();
-
-  boolean booleanValue();
-
-  String strValue();
-
-  byte[] bytesValue();
-
-  List<Object> arrayValue();
-
-  Map<Object, Object> mapValue();
-
-  Data getElement(int index);
-
-  Data getFieldData(String fldName);
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 
b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
deleted file mode 100644
index 80ba455..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.data;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * This class defines the name scheme for the collective data entities in 
Samza Stream SQL, i.e. tables and streams.
- */
-public class EntityName {
-  /**
-   * {@code EntityType} defines the types of the entity names
-   *
-   */
-  private enum EntityType {
-    TABLE,
-    STREAM
-  };
-
-  /**
-   * Type of the entity name
-   */
-  private final EntityType type;
-
-  /**
-   * Formatted name of the entity.
-   *
-   * <p>This formatted name of the entity should be unique identifier for the 
corresponding table/stream in the system.
-   * e.g. for a Kafka system stream named "mystream", the formatted name 
should be "kafka:mystream".
-   */
-  private final String name;
-
-  /**
-   * Static map of already allocated table names
-   */
-  private static Map<String, EntityName> tables = new HashMap<String, 
EntityName>();
-
-  /**
-   * Static map of already allocated stream names
-   */
-  private static Map<String, EntityName> streams = new HashMap<String, 
EntityName>();
-
-  /**
-   * Private ctor to create entity names
-   *
-   * @param type Type of the entity name
-   * @param name Formatted name of the entity
-   */
-  private EntityName(EntityType type, String name) {
-    this.type = type;
-    this.name = name;
-  }
-
-  @Override
-  public String toString() {
-    return String.format("%s:%s", this.type, this.name);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other instanceof EntityName) {
-      EntityName otherEntity = (EntityName) other;
-      return this.type.equals(otherEntity.type) && 
this.name.equals(otherEntity.name);
-    }
-    return false;
-  }
-
-  /**
-   * Check to see whether this entity name is for a table
-   *
-   * @return true if the entity type is {@code EntityType.TABLE}; false 
otherwise
-   */
-  public boolean isTable() {
-    return this.type.equals(EntityType.TABLE);
-  }
-
-  /**
-   * Check to see whether this entity name is for a stream
-   *
-   * @return true if the entity type is {@code EntityType.STREAM}; false 
otherwise
-   */
-  public boolean isStream() {
-    return this.type.equals(EntityType.STREAM);
-  }
-
-  /**
-   * Get the formatted entity name
-   *
-   * @return The formatted entity name
-   */
-  public String getName() {
-    return this.name;
-  }
-
-  /**
-   * Static method to get the instance of {@code EntityName} with type {@code 
EntityType.TABLE}
-   *
-   * @param name The formatted entity name of the relation
-   * @return A <code>EntityName</code> for a relation
-   */
-  public static EntityName getTableName(String name) {
-    if (tables.get(name) == null) {
-      tables.put(name, new EntityName(EntityType.TABLE, name));
-    }
-    return tables.get(name);
-  }
-
-  /**
-   * Static method to get the instance of <code>EntityName</code> with type 
<code>EntityType.STREAM</code>
-   *
-   * @param name The formatted entity name of the stream
-   * @return A <code>EntityName</code> for a stream
-   */
-  public static EntityName getStreamName(String name) {
-    if (streams.get(name) == null) {
-      streams.put(name, new EntityName(EntityType.STREAM, name));
-    }
-    return streams.get(name);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java 
b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java
deleted file mode 100644
index 72816a3..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.data;
-
-import org.apache.samza.storage.kv.KeyValueStore;
-
-
-/**
- * This class defines the general interface of {@code Relation}, which is 
defined as a map of {@link org.apache.samza.sql.api.data.Tuple}.
- *
- * <p>The interface is defined as an extension to {@link 
org.apache.samza.storage.kv.KeyValueStore}.
- *
- */
-
-public interface Relation<K> extends KeyValueStore<K, Tuple> {
-
-  /**
-   * Get the name of the relation
-   *
-   * @return The relation name
-   */
-  EntityName getName();
-}

Reply via email to