Repository: samza
Updated Branches:
  refs/heads/master 202a15809 -> 1e5f30f38


http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
new file mode 100644
index 0000000..0d6141e
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl.window;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.WindowOperator;
+import org.apache.samza.operators.internal.WindowOutput;
+import org.apache.samza.operators.impl.OperatorImpl;
+import org.apache.samza.operators.impl.StateStoreImpl;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Default implementation class of a {@link WindowOperator} for a session 
window.
+ *
+ * @param <M>  the type of input {@link Message}
+ * @param <RK>  the type of window key
+ * @param <WS>  the type of window state
+ * @param <RM>  the type of aggregated value of the window
+ */
+public class SessionWindowImpl<M extends Message, RK, WS extends WindowState, 
RM extends WindowOutput<RK, ?>> extends
+    OperatorImpl<M, RM> {
+  private final WindowOperator<M, RK, WS, RM> sessWnd;
+  private StateStoreImpl<M, RK, WS> wndStore = null;
+
+  public SessionWindowImpl(WindowOperator<M, RK, WS, RM> sessWnd) {
+    this.sessWnd = sessWnd;
+  }
+
+  @Override protected void onNext(M message, MessageCollector collector, 
TaskCoordinator coordinator) {
+    Entry<RK, WS> state = this.wndStore.getState(message);
+    this.nextProcessors(this.sessWnd.getFunction().apply(message, state), 
collector, coordinator);
+    this.wndStore.updateState(message, state);
+  }
+
+  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) 
{
+    // No-op for now: this hook is to periodically check the timeout triggers to get the list of 
window states to be updated
+  }
+
+  @Override protected void init(MessageStream<M> source, TaskContext context) {
+    this.wndStore = new StateStoreImpl<>(this.sessWnd.getStoreFunctions(), 
sessWnd.getStoreName(source));
+    this.wndStore.init(context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
 
b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
new file mode 100644
index 0000000..c2f780d
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreams;
+import org.apache.samza.operators.MessageStreams.SystemMessageStream;
+import org.apache.samza.operators.data.IncomingSystemMessage;
+import org.apache.samza.operators.impl.ChainedOperators;
+import org.apache.samza.operators.task.StreamOperatorTask;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * An adaptor task class that invokes the user-implemented {@link 
StreamOperatorTask}s via {@link org.apache.samza.operators.MessageStream} 
programming APIs
+ *
+ */
+public final class StreamOperatorAdaptorTask implements StreamTask, 
InitableTask, WindowableTask {
+  /**
+   * A map with entries mapping {@link SystemStreamPartition} to {@link 
org.apache.samza.operators.impl.ChainedOperators} that takes the {@link 
SystemStreamPartition}
+   * as the input stream
+   */
+  private final Map<SystemStreamPartition, ChainedOperators> operatorChains = 
new HashMap<>();
+
+  /**
+   * Wrapped {@link StreamOperatorTask} class
+   */
+  private final StreamOperatorTask userTask;
+
+  /**
+   * Constructor that wraps the user-defined {@link StreamOperatorTask}
+   *
+   * @param userTask  the user-defined {@link StreamOperatorTask}
+   */
+  public StreamOperatorAdaptorTask(StreamOperatorTask userTask) {
+    this.userTask = userTask;
+  }
+
+  @Override
+  public final void init(Config config, TaskContext context) throws Exception {
+    if (this.userTask instanceof InitableTask) {
+      ((InitableTask) this.userTask).init(config, context);
+    }
+    Map<SystemStreamPartition, SystemMessageStream> sources = new HashMap<>();
+    context.getSystemStreamPartitions().forEach(ssp -> {
+        SystemMessageStream ds = MessageStreams.input(ssp);
+        sources.put(ssp, ds);
+      });
+    this.userTask.initOperators(sources.values());
+    sources.forEach((ssp, ds) -> operatorChains.put(ssp, 
ChainedOperators.create(ds, context)));
+  }
+
+  @Override
+  public final void process(IncomingMessageEnvelope ime, MessageCollector 
collector, TaskCoordinator coordinator) {
+    this.operatorChains.get(ime.getSystemStreamPartition()).onNext(new 
IncomingSystemMessage(ime), collector, coordinator);
+  }
+
+  @Override
+  public final void window(MessageCollector collector, TaskCoordinator 
coordinator) throws Exception {
+    this.operatorChains.forEach((ssp, chain) -> chain.onTimer(collector, 
coordinator));
+    if (this.userTask instanceof WindowableTask) {
+      ((WindowableTask) this.userTask).window(collector, coordinator);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
new file mode 100644
index 0000000..e3a70e8
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.Windows;
+import org.apache.samza.task.TaskContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.reactivestreams.Subscriber;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+
+public class TestChainedOperators {
+  Field subsField = null;
+  Field opSubsField = null;
+
+  @Before public void prep() throws NoSuchFieldException {
+    subsField = ChainedOperators.class.getDeclaredField("subscribers");
+    subsField.setAccessible(true);
+    opSubsField = OperatorImpl.class.getDeclaredField("subscribers");
+    opSubsField.setAccessible(true);
+  }
+
+  @Test public void testCreate() {
+    // an input stream with no operators attached must still yield a non-null (empty) chain
+    MessageStream<TestMessage> testStream = new MessageStream<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    ChainedOperators<TestMessage> operatorChain = 
ChainedOperators.create(testStream, mockContext);
+    assertNotNull(operatorChain);
+  }
+
+  @Test public void testLinearChain() throws IllegalAccessException {
+    // test creation of linear chain (input -> map -> session window); assertEquals takes the expected value first
+    MessageStream<TestMessage> testInput = new MessageStream<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    testInput.map(m -> 
m).window(Windows.intoSessionCounter(TestMessage::getKey));
+    ChainedOperators<TestMessage> operatorChain = 
ChainedOperators.create(testInput, mockContext);
+    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) 
subsField.get(operatorChain);
+    assertEquals(1, subsSet.size());
+    OperatorImpl<TestMessage, TestMessage> firstOpImpl = 
subsSet.iterator().next();
+    Set<Subscriber<? super ProcessorContext<TestMessage>>> subsOps = 
(Set<Subscriber<? super ProcessorContext<TestMessage>>>) 
opSubsField.get(firstOpImpl);
+    assertEquals(1, subsOps.size());
+    Subscriber<? super ProcessorContext<TestMessage>> wndOpImpl = 
subsOps.iterator().next();
+    subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) 
opSubsField.get(wndOpImpl);
+    assertEquals(0, subsOps.size());
+  }
+
+  @Test public void testBroadcastChain() throws IllegalAccessException {
+    // test creation of broadcast chain: one input fanned out to two filter branches (expected value goes first in assertEquals)
+    MessageStream<TestMessage> testInput = new MessageStream<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    testInput.filter(m -> m.getTimestamp() > 123456L).flatMap(m -> new 
ArrayList() { { this.add(m); this.add(m); } });
+    testInput.filter(m -> m.getTimestamp() < 123456L).map(m -> m);
+    ChainedOperators<TestMessage> operatorChain = 
ChainedOperators.create(testInput, mockContext);
+    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) 
subsField.get(operatorChain);
+    assertEquals(2, subsSet.size());
+    Iterator<OperatorImpl> iter = subsSet.iterator();
+    // check the first branch w/ flatMap
+    OperatorImpl<TestMessage, TestMessage> opImpl = iter.next();
+    Set<Subscriber<? super ProcessorContext<TestMessage>>> subsOps = 
(Set<Subscriber<? super ProcessorContext<TestMessage>>>) 
opSubsField.get(opImpl);
+    assertEquals(1, subsOps.size());
+    Subscriber<? super ProcessorContext<TestMessage>> flatMapImpl = 
subsOps.iterator().next();
+    subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) 
opSubsField.get(flatMapImpl);
+    assertEquals(0, subsOps.size());
+    // check the second branch w/ map
+    opImpl = iter.next();
+    subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) 
opSubsField.get(opImpl);
+    assertEquals(1, subsOps.size());
+    Subscriber<? super ProcessorContext<TestMessage>> mapImpl = 
subsOps.iterator().next();
+    subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) 
opSubsField.get(mapImpl);
+    assertEquals(0, subsOps.size());
+  }
+
+  @Test public void testJoinChain() throws IllegalAccessException {
+    // test creation of join chain: two inputs joined, then mapped (expected value goes first in assertEquals)
+    MessageStream<TestMessage> input1 = new MessageStream<>();
+    MessageStream<TestMessage> input2 = new MessageStream<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    input1.join(input2, (m1, m2) -> new TestOutputMessage(m1.getKey(), 
m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp())).map(m 
-> m);
+    // now, we create chained operators from each input sources
+    ChainedOperators<TestMessage> chain1 = ChainedOperators.create(input1, 
mockContext);
+    ChainedOperators<TestMessage> chain2 = ChainedOperators.create(input2, 
mockContext);
+    // check that those two chains will merge at map operator
+    // first branch of the join
+    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(chain1);
+    assertEquals(1, subsSet.size());
+    OperatorImpl<TestMessage, TestOutputMessage> joinOp1 = 
subsSet.iterator().next();
+    Set<Subscriber<? super ProcessorContext<TestOutputMessage>>> subsOps = 
(Set<Subscriber<? super ProcessorContext<TestOutputMessage>>>) 
opSubsField.get(joinOp1);
+    assertEquals(1, subsOps.size());
+    // the map operator consumes the common join output, where two branches 
merge
+    Subscriber<? super ProcessorContext<TestOutputMessage>> mapImpl = 
subsOps.iterator().next();
+    // second branch of the join
+    subsSet = (Set<OperatorImpl>) subsField.get(chain2);
+    assertEquals(1, subsSet.size());
+    OperatorImpl<TestMessage, TestOutputMessage> joinOp2 = 
subsSet.iterator().next();
+    assertNotSame(joinOp1, joinOp2);
+    subsOps = (Set<Subscriber<? super ProcessorContext<TestOutputMessage>>>) 
opSubsField.get(joinOp2);
+    assertEquals(1, subsOps.size());
+    // make sure that the map operator is the same
+    assertEquals(mapImpl, subsOps.iterator().next());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
new file mode 100644
index 0000000..cb4576c
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.impl.join.PartialJoinOpImpl;
+import org.apache.samza.operators.impl.window.SessionWindowImpl;
+import org.apache.samza.operators.internal.Operators.PartialJoinOperator;
+import org.apache.samza.operators.internal.Operators.SinkOperator;
+import org.apache.samza.operators.internal.Operators.StreamOperator;
+import org.apache.samza.operators.internal.Operators.WindowOperator;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestOperatorFactory {
+
+  @Test public void testGetOperator() throws NoSuchFieldException, 
IllegalAccessException {
+    // get window operator: the created impl must hold the very operator that was passed to the factory
+    WindowOperator mockWnd = mock(WindowOperator.class);
+    Entry<OperatorImpl<TestMessage, ? extends Message>, Boolean>
+        factoryEntry = OperatorFactory.<TestMessage, 
TestOutputMessage>getOperator(mockWnd);
+    assertFalse(factoryEntry.getValue());
+    OperatorImpl<TestMessage, TestOutputMessage> opImpl = 
(OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
+    assertTrue(opImpl instanceof SessionWindowImpl);
+    Field sessWndField = SessionWindowImpl.class.getDeclaredField("sessWnd");
+    sessWndField.setAccessible(true);
+    WindowOperator sessWnd = (WindowOperator) sessWndField.get(opImpl);
+    assertEquals(mockWnd, sessWnd);
+
+    // get simple operator
+    StreamOperator<TestMessage, TestOutputMessage> mockSimpleOp = 
mock(StreamOperator.class);
+    Function<TestMessage, Collection<TestOutputMessage>>  mockTxfmFn = 
mock(Function.class);
+    when(mockSimpleOp.getFunction()).thenReturn(mockTxfmFn);
+    factoryEntry = OperatorFactory.<TestMessage, 
TestOutputMessage>getOperator(mockSimpleOp);
+    opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) 
factoryEntry.getKey();
+    assertTrue(opImpl instanceof SimpleOperatorImpl);
+    Field txfmFnField = 
SimpleOperatorImpl.class.getDeclaredField("transformFn");
+    txfmFnField.setAccessible(true);
+    assertEquals(mockTxfmFn, txfmFnField.get(opImpl));
+
+    // get sink operator
+    MessageStream.VoidFunction3<TestMessage, MessageCollector, 
TaskCoordinator> sinkFn = (m, mc, tc) -> { };
+    SinkOperator<TestMessage> sinkOp = mock(SinkOperator.class);
+    when(sinkOp.getFunction()).thenReturn(sinkFn);
+    factoryEntry = OperatorFactory.<TestMessage, 
TestOutputMessage>getOperator(sinkOp);
+    opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) 
factoryEntry.getKey();
+    assertTrue(opImpl instanceof SinkOperatorImpl);
+    Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFunc");
+    sinkFnField.setAccessible(true);
+    assertEquals(sinkFn, sinkFnField.get(opImpl));
+
+    // get join operator
+    PartialJoinOperator<TestMessage, String, TestMessage, TestOutputMessage> 
joinOp = mock(PartialJoinOperator.class);
+    TestOutputMessage mockOutput = mock(TestOutputMessage.class);
+    BiFunction<TestMessage, TestMessage, TestOutputMessage> joinFn = (m1, m2) 
-> mockOutput;
+    when(joinOp.getFunction()).thenReturn(joinFn);
+    factoryEntry = OperatorFactory.<TestMessage, 
TestOutputMessage>getOperator(joinOp);
+    opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) 
factoryEntry.getKey();
+    assertTrue(opImpl instanceof PartialJoinOpImpl);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
new file mode 100644
index 0000000..4bd467d
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.reactivestreams.Subscriber;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+
+public class TestOperatorImpl {
+
+  TestMessage curInputMsg;
+  MessageCollector curCollector;
+  TaskCoordinator curCoordinator;
+
+  @Test public void testSubscribers() {
+    this.curInputMsg = null;
+    this.curCollector = null;
+    this.curCoordinator = null;
+    OperatorImpl<TestMessage, TestOutputMessage> opImpl = new 
OperatorImpl<TestMessage, TestOutputMessage>() {
+      @Override protected void onNext(TestMessage message, MessageCollector 
collector, TaskCoordinator coordinator) {
+        TestOperatorImpl.this.curInputMsg = message;
+        TestOperatorImpl.this.curCollector = collector;
+        TestOperatorImpl.this.curCoordinator = coordinator;
+      }
+    };
+    // verify subscribe() added the mockSub and nextProcessors() invoked the 
mockSub.onNext()
+    Subscriber<ProcessorContext<TestOutputMessage>> mockSub = 
mock(Subscriber.class);
+    opImpl.subscribe(mockSub);
+    TestOutputMessage xOutput = mock(TestOutputMessage.class);
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+    opImpl.nextProcessors(xOutput, mockCollector, mockCoordinator);
+    verify(mockSub, times(1)).onNext(argThat(new 
ArgumentMatcher<ProcessorContext<TestOutputMessage>>() {
+      @Override public boolean matches(Object argument) {
+        ProcessorContext<TestOutputMessage> pCntx = 
(ProcessorContext<TestOutputMessage>) argument;
+        return pCntx.getMessage().equals(xOutput) && 
pCntx.getCoordinator().equals(mockCoordinator) && 
pCntx.getCollector().equals(mockCollector);
+      }
+    }));
+    // verify onNext() is invoked correctly
+    TestMessage mockInput = mock(TestMessage.class);
+    ProcessorContext<TestMessage> inCntx = new ProcessorContext<>(mockInput, 
mockCollector, mockCoordinator);
+    opImpl.onNext(inCntx);
+    assertEquals(mockInput, this.curInputMsg);
+    assertEquals(mockCollector, this.curCollector);
+    assertEquals(mockCoordinator, this.curCoordinator);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
new file mode 100644
index 0000000..224245e
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+
+public class TestProcessorContext {
+  @Test public void testConstructor() {
+    TestMessage mockMsg = mock(TestMessage.class);
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class);
+    ProcessorContext<TestMessage> pCntx = new ProcessorContext<>(mockMsg, 
mockCollector, mockTaskCoordinator);
+    assertEquals(mockMsg, pCntx.getMessage());  // each getter must return what the constructor received (expected value first)
+    assertEquals(mockCollector, pCntx.getCollector());
+    assertEquals(mockTaskCoordinator, pCntx.getCoordinator());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
new file mode 100644
index 0000000..de029ea
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.internal.Operators.StreamOperator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.*;
+
+
+public class TestSimpleOperatorImpl {
+
+  @Test public void testSimpleOperator() {
+    StreamOperator<TestMessage, TestOutputMessage> mockOp = 
mock(StreamOperator.class);
+    Function<TestMessage, Collection<TestOutputMessage>> txfmFn = 
mock(Function.class);
+    when(mockOp.getFunction()).thenReturn(txfmFn);
+
+    SimpleOperatorImpl<TestMessage, TestOutputMessage> opImpl = spy(new 
SimpleOperatorImpl<>(mockOp));
+    TestMessage inMsg = mock(TestMessage.class);
+    TestOutputMessage outMsg = mock(TestOutputMessage.class);
+    Collection<TestOutputMessage> mockOutputs = new ArrayList<TestOutputMessage>() { {  // explicit type argument avoids a raw type
+        this.add(outMsg);
+      } };
+    when(txfmFn.apply(inMsg)).thenReturn(mockOutputs);
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+    opImpl.onNext(inMsg, mockCollector, mockCoordinator);
+    verify(txfmFn, times(1)).apply(inMsg);
+    verify(opImpl, times(1)).nextProcessors(outMsg, mockCollector, 
mockCoordinator);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
new file mode 100644
index 0000000..cdac3fc
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.internal.Operators.SinkOperator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class TestSinkOperatorImpl {
+
+  @Test public void testSinkOperator() {
+    SinkOperator<TestOutputMessage> sinkOp = mock(SinkOperator.class);
+    MessageStream.VoidFunction3<TestOutputMessage, MessageCollector, 
TaskCoordinator> sinkFn = mock(
+        MessageStream.VoidFunction3.class);
+    when(sinkOp.getFunction()).thenReturn(sinkFn);
+    SinkOperatorImpl<TestOutputMessage> sinkImpl = new 
SinkOperatorImpl<>(sinkOp);
+    TestOutputMessage mockMsg = mock(TestOutputMessage.class);
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+
+    sinkImpl.onNext(mockMsg, mockCollector, mockCoordinator);
+    verify(sinkFn, times(1)).apply(mockMsg, mockCollector, mockCoordinator);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
new file mode 100644
index 0000000..5ede757
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.internal.Operators.StoreFunctions;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.TaskContext;
+import org.junit.Test;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+
+public class TestStateStoreImpl {
+  /**
+   * Walks a {@link StateStoreImpl} through construction, {@code init()}, {@code getState()} and
+   * {@code updateState()} against mocked {@link StoreFunctions} and a mocked {@link KeyValueStore}.
+   */
+  @Test public void testStateStoreImpl() {
+    StoreFunctions<TestMessage, String, WindowState> mockStoreFunctions = mock(StoreFunctions.class);
+    // test constructor
+    StateStoreImpl<TestMessage, String, WindowState> storeImpl = new StateStoreImpl<>(mockStoreFunctions, "myStoreName");
+    TaskContext mockContext = mock(TaskContext.class);
+    KeyValueStore<String, WindowState> mockKvStore = mock(KeyValueStore.class);
+    when(mockContext.getStore("myStoreName")).thenReturn(mockKvStore);
+    // test init(): the store must be looked up by the name given to the constructor
+    storeImpl.init(mockContext);
+    verify(mockContext, times(1)).getStore("myStoreName");
+    Function<TestMessage, String> wndKeyFn = mock(Function.class);
+    when(mockStoreFunctions.getStoreKeyFinder()).thenReturn(wndKeyFn);
+    TestMessage mockMsg = mock(TestMessage.class);
+    when(wndKeyFn.apply(mockMsg)).thenReturn("myKey");
+    WindowState mockState = mock(WindowState.class);
+    when(mockKvStore.get("myKey")).thenReturn(mockState);
+    // test getState(): the key comes from the key finder, the value from the key-value store
+    Entry<String, WindowState> storeEntry = storeImpl.getState(mockMsg);
+    // assertEquals takes (expected, actual); keeping that order makes failure messages accurate
+    assertEquals("myKey", storeEntry.getKey());
+    assertEquals(mockState, storeEntry.getValue());
+    verify(wndKeyFn, times(1)).apply(mockMsg);
+    verify(mockKvStore, times(1)).get("myKey");
+    Entry<String, WindowState> oldEntry = new Entry<>("myKey", mockState);
+    WindowState mockNewState = mock(WindowState.class);
+    BiFunction<TestMessage, WindowState, WindowState> mockUpdaterFn = mock(BiFunction.class);
+    when(mockStoreFunctions.getStateUpdater()).thenReturn(mockUpdaterFn);
+    when(mockUpdaterFn.apply(mockMsg, mockState)).thenReturn(mockNewState);
+    // test updateState(): the key is kept, the value is whatever the updater returns
+    Entry<String, WindowState> newEntry = storeImpl.updateState(mockMsg, oldEntry);
+    assertEquals("myKey", newEntry.getKey());
+    assertEquals(mockNewState, newEntry.getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
new file mode 100644
index 0000000..719ab99
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl.window;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.internal.Operators.StoreFunctions;
+import org.apache.samza.operators.internal.Operators.WindowOperator;
+import org.apache.samza.operators.internal.WindowOutput;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+import java.lang.reflect.Field;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+
+public class TestSessionWindowImpl {
+  // Reflective handle on SessionWindowImpl's "wndStore" field; made accessible in prep() but not
+  // read by any test in this class.
+  Field wndStoreField = null;
+  // Reflective handle on SessionWindowImpl's "sessWnd" field; read in testConstructor().
+  Field sessWndField = null;
+
+  /**
+   * Looks up the {@code wndStore} and {@code sessWnd} fields declared on {@link SessionWindowImpl}
+   * and makes them accessible before every test.
+   *
+   * @throws NoSuchFieldException if either field is not declared on {@link SessionWindowImpl}
+   */
+  @Before public void prep() throws NoSuchFieldException {
+    wndStoreField = SessionWindowImpl.class.getDeclaredField("wndStore");
+    sessWndField = SessionWindowImpl.class.getDeclaredField("sessWnd");
+    wndStoreField.setAccessible(true);
+    sessWndField.setAccessible(true);
+  }
+
+  /**
+   * Verifies that the {@code sessWnd} field of a new {@link SessionWindowImpl} holds the
+   * {@link WindowOperator} that was passed to its constructor.
+   */
+  @Test public void testConstructor() throws IllegalAccessException, 
NoSuchFieldException {
+    // test constructing a SessionWindowImpl w/ expected mock functions
+    WindowOperator<TestMessage, String, WindowState<Integer>, 
WindowOutput<String, Integer>> wndOp = mock(WindowOperator.class);
+    SessionWindowImpl<TestMessage, String, WindowState<Integer>, 
WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp);
+    assertEquals(wndOp, sessWndField.get(sessWnd));
+  }
+
+  /**
+   * Initializes a {@link SessionWindowImpl} against mocked store functions and a mocked key-value
+   * store, pushes one message through {@code onNext()}, and verifies that the transformation
+   * function saw the message together with the old state entry, that the state updater was applied
+   * to the old state, and that the updated state was put back under the message's key.
+   */
+  @Test public void testInitAndProcess() throws IllegalAccessException {
+    WindowOperator<TestMessage, String, WindowState<Integer>, 
WindowOutput<String, Integer>> wndOp = mock(WindowOperator.class);
+    BiFunction<TestMessage, Entry<String, WindowState<Integer>>, 
WindowOutput<String, Integer>> mockTxfmFn = mock(BiFunction.class);
+    SessionWindowImpl<TestMessage, String, WindowState<Integer>, 
WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp);
+
+    // construct and init the SessionWindowImpl object
+    MessageStream<TestMessage> mockInputStrm = mock(MessageStream.class);
+    StoreFunctions<TestMessage, String, WindowState<Integer>> mockStoreFns = 
mock(StoreFunctions.class);
+    // every message maps to the same store key
+    Function<TestMessage, String> wndKeyFn = m -> "test-msg-key";
+    when(mockStoreFns.getStoreKeyFinder()).thenReturn(wndKeyFn);
+    when(wndOp.getStoreFunctions()).thenReturn(mockStoreFns);
+    when(wndOp.getStoreName(mockInputStrm)).thenReturn("test-wnd-store");
+    when(wndOp.getFunction()).thenReturn(mockTxfmFn);
+    TaskContext mockContext = mock(TaskContext.class);
+    KeyValueStore<String, WindowState<Integer>> mockKvStore = 
mock(KeyValueStore.class);
+    when(mockContext.getStore("test-wnd-store")).thenReturn(mockKvStore);
+    sessWnd.init(mockInputStrm, mockContext);
+
+    // test onNext() method. Make sure the transformation function and the 
state update functions are invoked.
+    TestMessage mockMsg = mock(TestMessage.class);
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+    BiFunction<TestMessage, WindowState<Integer>, WindowState<Integer>> 
stateUpdaterFn = mock(BiFunction.class);
+    when(mockStoreFns.getStateUpdater()).thenReturn(stateUpdaterFn);
+    WindowState<Integer> mockNewState = mock(WindowState.class);
+    WindowState<Integer> oldState = mock(WindowState.class);
+    when(mockKvStore.get("test-msg-key")).thenReturn(oldState);
+    when(stateUpdaterFn.apply(mockMsg, oldState)).thenReturn(mockNewState);
+    sessWnd.onNext(mockMsg, mockCollector, mockCoordinator);
+    // first matcher: the message itself; second matcher: an entry holding the key and the OLD state
+    verify(mockTxfmFn, times(1)).apply(argThat(new 
ArgumentMatcher<TestMessage>() {
+      @Override public boolean matches(Object argument) {
+        TestMessage xIn = (TestMessage) argument;
+        return xIn.equals(mockMsg);
+      }
+    }), argThat(new ArgumentMatcher<Entry<String, WindowState<Integer>>>() {
+      @Override public boolean matches(Object argument) {
+        Entry<String, WindowState<Integer>> xIn = (Entry<String, 
WindowState<Integer>>) argument;
+        return xIn.getKey().equals("test-msg-key") && 
xIn.getValue().equals(oldState);
+      }
+    }));
+    verify(stateUpdaterFn, times(1)).apply(mockMsg, oldState);
+    verify(mockKvStore, times(1)).put("test-msg-key", mockNewState);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java 
b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
new file mode 100644
index 0000000..d1b0a88
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreams.SystemMessageStream;
+import org.apache.samza.operators.TriggerBuilder;
+import org.apache.samza.operators.Windows;
+import org.apache.samza.operators.data.IncomingSystemMessage;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.task.StreamOperatorTask;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Collection;
+
+
+/**
+ * Example implementation of split stream tasks
+ *
+ */
+public class BroadcastOperatorTask implements StreamOperatorTask {
+  class MessageType {
+    String field1;
+    String field2;
+    String field3;
+    String field4;
+    String parKey;
+    private long timestamp;
+
+    public long getTimestamp() {
+      return this.timestamp;
+    }
+  }
+
+  class JsonMessage extends InputJsonSystemMessage<MessageType> {
+
+    JsonMessage(String key, MessageType data, Offset offset, long timestamp, 
SystemStreamPartition partition) {
+      super(key, data, offset, timestamp, partition);
+    }
+  }
+
+  @Override public void initOperators(Collection<SystemMessageStream> sources) 
{
+    sources.forEach(source -> {
+        MessageStream<JsonMessage> inputStream = 
source.map(this::getInputMessage);
+
+        inputStream.filter(this::myFilter1).
+          window(Windows.<JsonMessage, String>intoSessionCounter(
+              m -> String.format("%s-%s", m.getMessage().field1, 
m.getMessage().field2)).
+            setTriggers(TriggerBuilder.<JsonMessage, 
Integer>earlyTriggerWhenExceedWndLen(100).
+              addLateTriggerOnSizeLimit(10).
+              addTimeoutSinceLastMessage(30000)));
+
+        inputStream.filter(this::myFilter2).
+          window(Windows.<JsonMessage, String>intoSessions(
+              m -> String.format("%s-%s", m.getMessage().field3, 
m.getMessage().field4)).
+            setTriggers(TriggerBuilder.<JsonMessage, 
Collection<JsonMessage>>earlyTriggerWhenExceedWndLen(100).
+              addTimeoutSinceLastMessage(30000)));
+
+        inputStream.filter(this::myFilter3).
+          window(Windows.<JsonMessage, String, MessageType>intoSessions(
+              m -> String.format("%s-%s", m.getMessage().field3, 
m.getMessage().field4), m -> m.getMessage()).
+            setTriggers(TriggerBuilder.<JsonMessage, 
Collection<MessageType>>earlyTriggerOnEventTime(m -> m.getTimestamp(), 30000).
+              addTimeoutSinceFirstMessage(60000)));
+      }
+    );
+  }
+
+  JsonMessage getInputMessage(IncomingSystemMessage m1) {
+    return (JsonMessage) m1.getMessage();
+  }
+
+  boolean myFilter1(JsonMessage m1) {
+    // Do user defined processing here
+    return m1.getMessage().parKey.equals("key1");
+  }
+
+  boolean myFilter2(JsonMessage m1) {
+    // Do user defined processing here
+    return m1.getMessage().parKey.equals("key2");
+  }
+
+  boolean myFilter3(JsonMessage m1) {
+    return m1.getMessage().parKey.equals("key3");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
 
b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
new file mode 100644
index 0000000..88aa159
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.operators.data.InputSystemMessage;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Example input message w/ Json message body and string as the key.
+ */
+
+public class InputJsonSystemMessage<T> implements Message<String, T>, InputSystemMessage<Offset> {
+
+  private final String key;
+  private final T data;
+  private final Offset offset;
+  private final long timestamp;
+  private final SystemStreamPartition partition;
+
+  /**
+   * Stores the given values; each is returned unchanged by the matching getter.
+   *
+   * @param key       value returned by {@link #getKey()}
+   * @param data      value returned by {@link #getMessage()}
+   * @param offset    value returned by {@link #getOffset()}
+   * @param timestamp value returned by {@link #getTimestamp()}
+   * @param partition value returned by {@link #getSystemStreamPartition()}
+   */
+  InputJsonSystemMessage(String key, T data, Offset offset, long timestamp, SystemStreamPartition partition) {
+    this.key = key;
+    this.data = data;
+    this.offset = offset;
+    this.timestamp = timestamp;
+    this.partition = partition;
+  }
+
+  @Override
+  public String getKey() {
+    return key;
+  }
+
+  @Override
+  public T getMessage() {
+    return data;
+  }
+
+  @Override
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  @Override
+  public Offset getOffset() {
+    return offset;
+  }
+
+  @Override
+  public SystemStreamPartition getSystemStreamPartition() {
+    return partition;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java 
b/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java
new file mode 100644
index 0000000..f6b3ff8
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreams.SystemMessageStream;
+import org.apache.samza.operators.data.IncomingSystemMessage;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.task.StreamOperatorTask;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * Example implementation of unique key-based stream-stream join tasks
+ *
+ */
+public class JoinOperatorTask implements StreamOperatorTask {
+  /** Message body: the join key plus the list of fields accumulated so far. */
+  class MessageType {
+    String joinKey;
+    List<String> joinFields = new ArrayList<>();
+  }
+
+  /** {@link InputJsonSystemMessage} whose body is a {@link MessageType}. */
+  class JsonMessage extends InputJsonSystemMessage<MessageType> {
+
+    JsonMessage(String key, MessageType data, Offset offset, long timestamp, SystemStreamPartition partition) {
+      super(key, data, offset, timestamp, partition);
+    }
+  }
+
+  // Running result of the joins built so far; null until the first source has been mapped.
+  MessageStream<JsonMessage> joinOutput = null;
+
+  /**
+   * Maps every source to a {@link JsonMessage} stream. The first mapped stream becomes
+   * {@link #joinOutput}; each later one is joined onto it using {@link #myJoinResult}.
+   *
+   * @param sources the system message streams to join
+   */
+  @Override
+  public void initOperators(Collection<SystemMessageStream> sources) {
+    for (SystemMessageStream source : sources) {
+      MessageStream<JsonMessage> mapped = source.map(this::getInputMessage);
+      if (joinOutput != null) {
+        joinOutput = joinOutput.join(mapped, (left, right) -> this.myJoinResult(left, right));
+      } else {
+        joinOutput = mapped;
+      }
+    }
+  }
+
+  /** Wraps an incoming system message in a {@link JsonMessage} keyed by the body's {@code joinKey}. */
+  private JsonMessage getInputMessage(IncomingSystemMessage ism) {
+    String messageKey = ((MessageType) ism.getMessage()).joinKey;
+    MessageType body = (MessageType) ism.getMessage();
+    return new JsonMessage(messageKey, body, ism.getOffset(), ism.getTimestamp(), ism.getSystemStreamPartition());
+  }
+
+  /**
+   * Builds the join result: a new body carrying {@code m1}'s key and the fields of {@code m1}
+   * followed by those of {@code m2}, wrapped in a {@link JsonMessage} with {@code m1}'s timestamp
+   * and with {@code null} offset and partition.
+   */
+  JsonMessage myJoinResult(JsonMessage m1, JsonMessage m2) {
+    MessageType merged = new MessageType();
+    merged.joinKey = m1.getKey();
+    merged.joinFields.addAll(m1.getMessage().joinFields);
+    merged.joinFields.addAll(m2.getMessage().joinFields);
+    String resultKey = m1.getMessage().joinKey;
+    return new JsonMessage(resultKey, merged, null, m1.getTimestamp(), null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
 
b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
new file mode 100644
index 0000000..47d6b3a
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.impl.ChainedOperators;
+import org.apache.samza.operators.task.StreamOperatorTask;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.Partition;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+
+
+public class TestStreamOperatorAdaptorTask {
+  // Reflective handles on StreamOperatorAdaptorTask's "userTask" and "operatorChains" fields.
+  Field userTaskField = null;
+  Field chainedOpsField = null;
+
+  /**
+   * Looks up the {@code userTask} and {@code operatorChains} fields declared on
+   * {@link StreamOperatorAdaptorTask} and makes them accessible before every test.
+   *
+   * @throws NoSuchFieldException if either field is not declared on the class
+   */
+  @Before public void prep() throws NoSuchFieldException {
+    userTaskField = StreamOperatorAdaptorTask.class.getDeclaredField("userTask");
+    chainedOpsField = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
+    userTaskField.setAccessible(true);
+    chainedOpsField.setAccessible(true);
+  }
+
+
+  /**
+   * A new adaptor task must hold the user task it was given and start with an empty
+   * operator-chain map.
+   */
+  @Test public void testConstructor() throws IllegalAccessException {
+    StreamOperatorTask userTask = mock(StreamOperatorTask.class);
+    StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask);
+    StreamOperatorTask taskMemberVar = (StreamOperatorTask) userTaskField.get(adaptorTask);
+    Map<SystemStreamPartition, ChainedOperators> chainsMap =
+        (Map<SystemStreamPartition, ChainedOperators>) chainedOpsField.get(adaptorTask);
+    // assertEquals takes (expected, actual)
+    assertEquals(userTask, taskMemberVar);
+    assertTrue(chainsMap.isEmpty());
+  }
+
+  /**
+   * {@code init()} must call the user task's {@code initOperators()} once and leave one
+   * operator-chain entry per input partition reported by the task context.
+   */
+  @Test public void testInit() throws Exception {
+    StreamOperatorTask userTask = mock(StreamOperatorTask.class);
+    StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask);
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    // typed set filled with plain add() calls; avoids the raw type and the anonymous HashSet subclass
+    Set<SystemStreamPartition> testInputs = new HashSet<>();
+    testInputs.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(0)));
+    testInputs.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(1)));
+    when(mockContext.getSystemStreamPartitions()).thenReturn(testInputs);
+    adaptorTask.init(mockConfig, mockContext);
+    verify(userTask, times(1)).initOperators(Mockito.anyCollection());
+    Map<SystemStreamPartition, ChainedOperators> chainsMap =
+        (Map<SystemStreamPartition, ChainedOperators>) chainedOpsField.get(adaptorTask);
+    // assertEquals reports the actual size on failure, unlike assertTrue(size == 2)
+    assertEquals(testInputs.size(), chainsMap.size());
+    for (SystemStreamPartition input : testInputs) {
+      assertTrue(chainsMap.containsKey(input));
+    }
+  }
+
+  // TODO: window and process methods to be added after implementation of ChainedOperators.create()
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
 
b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
new file mode 100644
index 0000000..44efa6d
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.impl.ChainedOperators;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit test for {@link org.apache.samza.operators.task.StreamOperatorTask}
+ */
+public class TestStreamOperatorTasks {
+
+  private final WindowOperatorTask userTask = new WindowOperatorTask();
+
+  private final BroadcastOperatorTask splitTask = new BroadcastOperatorTask();
+
+  private final JoinOperatorTask joinTask = new JoinOperatorTask();
+
+  // Four partitions (0..3) of "my-system"/"my-topic1", reported as the task's inputs in every test.
+  private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { {
+      for (int i = 0; i < 4; i++) {
+        this.add(new SystemStreamPartition("my-system", "my-topic1", new Partition(i)));
+      }
+    } };
+
+  /** Initializes an adaptor around {@link WindowOperatorTask} and checks its operator chains. */
+  @Test public void testUserTask() throws Exception {
+    assertChainCreatedPerPartition(this.userTask);
+  }
+
+  /** Initializes an adaptor around {@link BroadcastOperatorTask} and checks its operator chains. */
+  @Test public void testSplitTask() throws Exception {
+    assertChainCreatedPerPartition(this.splitTask);
+  }
+
+  /** Initializes an adaptor around {@link JoinOperatorTask} and checks its operator chains. */
+  @Test public void testJoinTask() throws Exception {
+    assertChainCreatedPerPartition(this.joinTask);
+  }
+
+  /**
+   * Shared body of the three tests: wraps {@code task} in a {@link StreamOperatorAdaptorTask},
+   * initializes it with a context reporting {@link #inputPartitions}, and asserts that the
+   * adaptor's {@code operatorChains} map has a non-null entry for every input partition.
+   *
+   * @param task the user task to wrap
+   * @throws Exception if reflection on {@code operatorChains} fails or {@code init()} throws
+   */
+  private void assertChainCreatedPerPartition(org.apache.samza.operators.task.StreamOperatorTask task)
+      throws Exception {
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+    StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(task);
+    Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
+    pipelineMapFld.setAccessible(true);
+    // the map reference is fetched before init(), as in the original tests
+    Map<SystemStreamPartition, ChainedOperators> pipelineMap =
+        (Map<SystemStreamPartition, ChainedOperators>) pipelineMapFld.get(adaptorTask);
+
+    adaptorTask.init(mockConfig, mockContext);
+    // assertEquals takes (expected, actual)
+    assertEquals(this.inputPartitions.size(), pipelineMap.size());
+    for (SystemStreamPartition partition : this.inputPartitions) {
+      assertNotNull(pipelineMap.get(partition));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java 
b/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java
new file mode 100644
index 0000000..de7bba5
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import org.apache.samza.operators.MessageStreams.SystemMessageStream;
+import org.apache.samza.operators.TriggerBuilder;
+import org.apache.samza.operators.Windows;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.task.StreamOperatorTask;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Collection;
+
+
+/**
+ * Example implementation of a simple user-defined task w/ window operators
+ *
+ */
+public class WindowOperatorTask implements StreamOperatorTask {
+  class MessageType {
+    String field1;
+    String field2;
+  }
+
+  class JsonMessage extends InputJsonSystemMessage<MessageType> {
+
+    JsonMessage(String key, MessageType data, Offset offset, long timestamp, 
SystemStreamPartition partition) {
+      super(key, data, offset, timestamp, partition);
+    }
+  }
+
+  @Override public void initOperators(Collection<SystemMessageStream> sources) 
{
+    sources.forEach(source ->
+      source.map(m1 ->
+        new JsonMessage(
+          this.myMessageKeyFunction(m1),
+          (MessageType) m1.getMessage(),
+          m1.getOffset(),
+          m1.getTimestamp(),
+          m1.getSystemStreamPartition())).
+        window(
+          Windows.<JsonMessage, String>intoSessionCounter(
+              m -> String.format("%s-%s", m.getMessage().field1, 
m.getMessage().field2)).
+            setTriggers(TriggerBuilder.<JsonMessage, 
Integer>earlyTriggerWhenExceedWndLen(100).
+              addTimeoutSinceLastMessage(30000)))
+    );
+  }
+
+  String myMessageKeyFunction(Message<Object, Object> m) {
+    return m.getKey().toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e5f30f3/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 6ea62b4..813882c 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -20,8 +20,9 @@ include \
   'samza-api',
   'samza-elasticsearch',
   'samza-log4j',
-  'samza-shell',
-  'samza-rest'
+  'samza-operator',
+  'samza-rest',
+  'samza-shell'
 
 def scalaModules = [
         'samza-core',

Reply via email to