http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index efa6a96..c77914e 100644
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -18,88 +18,60 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.data.Message;
 import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
-import org.reactivestreams.Processor;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
 
 import java.util.HashSet;
 import java.util.Set;
 
 
 /**
- * Abstract base class for all stream operator implementation classes.
+ * Abstract base class for all stream operator implementations.
  */
-public abstract class OperatorImpl<M extends Message, RM extends Message>
-    implements Processor<ProcessorContext<M>, ProcessorContext<RM>> {
+public abstract class OperatorImpl<M extends MessageEnvelope, RM extends 
MessageEnvelope> {
 
-  private final Set<Subscriber<? super ProcessorContext<RM>>> subscribers = 
new HashSet<>();
-
-  @Override public void subscribe(Subscriber<? super ProcessorContext<RM>> s) {
-    // Only add once
-    subscribers.add(s);
-  }
-
-  @Override public void onSubscribe(Subscription s) {
-
-  }
-
-  @Override public void onNext(ProcessorContext<M> o) {
-
-    onNext(o.getMessage(), o.getCollector(), o.getCoordinator());
-  }
-
-  @Override public void onError(Throwable t) {
-
-  }
-
-  @Override public void onComplete() {
+  private final Set<OperatorImpl<RM, ? extends MessageEnvelope>> nextOperators 
= new HashSet<>();
 
+  /**
+   * Register the next operator in the chain that this operator should 
propagate its output to.
+   * @param nextOperator  the next operator in the chain.
+   */
+  void registerNextOperator(OperatorImpl<RM, ? extends MessageEnvelope> 
nextOperator) {
+    nextOperators.add(nextOperator);
   }
 
   /**
-   * Default method for timer event
+   * Initialize the initial state for stateful operators.
    *
-   * @param nanoTime  the system nano-second when the timer event is triggered
-   * @param collector  the {@link MessageCollector} in the context
-   * @param coordinator  the {@link TaskCoordinator} in the context
+   * @param source  the source that this {@link OperatorImpl} operator is 
registered with
+   * @param context  the task context to initialize the operator implementation
    */
-  public void onTimer(long nanoTime, MessageCollector collector, 
TaskCoordinator coordinator) {
-    this.subscribers.forEach(sub -> ((OperatorImpl) sub).onTimer(nanoTime, 
collector, coordinator));
-  }
+  public void init(MessageStream<M> source, TaskContext context) {}
 
   /**
-   * Each sub-class will implement this method to actually perform the 
transformation and call the downstream subscribers.
+   * Perform the transformation required for this operator and call the 
downstream operators.
+   *
+   * Must call {@link #propagateResult} to propage the output to registered 
downstream operators correctly.
    *
-   * @param message  the input {@link Message}
+   * @param message  the input {@link MessageEnvelope}
    * @param collector  the {@link MessageCollector} in the context
    * @param coordinator  the {@link TaskCoordinator} in the context
    */
-  protected abstract void onNext(M message, MessageCollector collector, 
TaskCoordinator coordinator);
+  public abstract void onNext(M message, MessageCollector collector, 
TaskCoordinator coordinator);
 
   /**
-   * Stateful operators will need to override this method to initialize the 
operators
+   * Helper method to propagate the output of this operator to all registered 
downstream operators.
    *
-   * @param source  the source that this {@link OperatorImpl} object subscribe 
to
-   * @param context  the task context to initialize the operators within
-   */
-  protected void init(MessageStream<M> source, TaskContext context) {};
-
-  /**
-   * Method to trigger all downstream operators that consumes the output 
{@link MessageStream}
-   * from this operator
+   * This method <b>must</b> be called from {@link #onNext} to propagate the 
operator output correctly.
    *
-   * @param omsg  output {@link Message}
+   * @param outputMessage  output {@link MessageEnvelope}
    * @param collector  the {@link MessageCollector} in the context
    * @param coordinator  the {@link TaskCoordinator} in the context
    */
-  protected void nextProcessors(RM omsg, MessageCollector collector, 
TaskCoordinator coordinator) {
-    subscribers.forEach(sub ->
-      sub.onNext(new ProcessorContext<>(omsg, collector, coordinator))
-    );
+  void propagateResult(RM outputMessage, MessageCollector collector, 
TaskCoordinator coordinator) {
+    nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, 
coordinator));
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java
new file mode 100644
index 0000000..79446be
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.windows.WindowOutput;
+import org.apache.samza.operators.windows.WindowState;
+import org.apache.samza.task.TaskContext;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Instantiates the DAG of {@link OperatorImpl}s corresponding to the {@link 
OperatorSpec}s for a
+ * {@link MessageStreamImpl}
+ */
+public class OperatorImpls {
+
+  /**
+   * Holds the mapping between the {@link OperatorSpec} and {@link 
OperatorImpl}s instances.
+   */
+  private static final Map<OperatorSpec, OperatorImpl> OPERATOR_IMPLS = new 
ConcurrentHashMap<>();
+
+  /**
+   * Traverses the DAG of {@link OperatorSpec}s starting from the provided 
{@link MessageStreamImpl},
+   * creates the corresponding DAG of {@link OperatorImpl}s, and returns its 
root {@link RootOperatorImpl} node.
+   *
+   * @param source  the input {@link MessageStreamImpl} to instantiate {@link 
OperatorImpl}s for
+   * @param <M>  the type of {@link MessageEnvelope}s in the {@code source} 
{@link MessageStream}
+   * @param context  the {@link TaskContext} required to instantiate operators
+   * @return  root node for the {@link OperatorImpl} DAG
+   */
+  public static <M extends MessageEnvelope> RootOperatorImpl 
createOperatorImpls(MessageStreamImpl<M> source, TaskContext context) {
+    // since the source message stream might have multiple operator specs 
registered on it,
+    // create a new root node as a single point of entry for the DAG.
+    RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>();
+    // create the pipeline/topology starting from the source
+    source.getRegisteredOperatorSpecs().forEach(registeredOperator -> {
+        // pass in the source and context s.t. stateful stream operators can 
initialize their stores
+        OperatorImpl<M, ? extends MessageEnvelope> operatorImpl =
+            createAndRegisterOperatorImpl(registeredOperator, source, context);
+        rootOperator.registerNextOperator(operatorImpl);
+      });
+    return rootOperator;
+  }
+
+  /**
+   * Helper method to recursively traverse the {@link OperatorSpec} DAG and 
instantiate and link the corresponding
+   * {@link OperatorImpl}s.
+   *
+   * @param operatorSpec  the operatorSpec registered with the {@code source}
+   * @param source  the source {@link MessageStreamImpl}
+   * @param context  the context of the task
+   * @return  the operator implementation for the operatorSpec
+   */
+  private static <M extends MessageEnvelope> OperatorImpl<M, ? extends 
MessageEnvelope> createAndRegisterOperatorImpl(OperatorSpec operatorSpec,
+      MessageStream source, TaskContext context) {
+    if (!OPERATOR_IMPLS.containsKey(operatorSpec)) {
+      OperatorImpl<M, ? extends MessageEnvelope> operatorImpl = 
createOperatorImpl(operatorSpec);
+      if (OPERATOR_IMPLS.putIfAbsent(operatorSpec, operatorImpl) == null) {
+        // this is the first time we've added the operatorImpl corresponding 
to the operatorSpec,
+        // so traverse and initialize and register the rest of the DAG.
+        MessageStream<? extends MessageEnvelope> outStream = 
operatorSpec.getOutputStream();
+        Collection<OperatorSpec> registeredSpecs = ((MessageStreamImpl) 
outStream).getRegisteredOperatorSpecs();
+        registeredSpecs.forEach(registeredSpec -> {
+            OperatorImpl subImpl = 
createAndRegisterOperatorImpl(registeredSpec, outStream, context);
+            operatorImpl.registerNextOperator(subImpl);
+          });
+        operatorImpl.init(source, context);
+        return operatorImpl;
+      }
+    }
+
+    // the implementation corresponding to operatorSpec has already been 
instantiated
+    // and registered, so we do not need to traverse the DAG further.
+    return OPERATOR_IMPLS.get(operatorSpec);
+  }
+
+  /**
+   * Creates a new {@link OperatorImpl} instance for the provided {@link 
OperatorSpec}.
+   *
+   * @param operatorSpec  the immutable {@link OperatorSpec} definition.
+   * @param <M>  type of input {@link MessageEnvelope}
+   * @return  the {@link OperatorImpl} implementation instance
+   */
+  protected static <M extends MessageEnvelope> OperatorImpl<M, ? extends 
MessageEnvelope> createOperatorImpl(OperatorSpec operatorSpec) {
+    if (operatorSpec instanceof StreamOperatorSpec) {
+      return new StreamOperatorImpl<>((StreamOperatorSpec<M, ? extends 
MessageEnvelope>) operatorSpec);
+    } else if (operatorSpec instanceof SinkOperatorSpec) {
+      return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec);
+    } else if (operatorSpec instanceof WindowOperatorSpec) {
+      return new SessionWindowOperatorImpl<>((WindowOperatorSpec<M, ?, ? 
extends WindowState, ? extends WindowOutput>) operatorSpec);
+    } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
+      return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) 
operatorSpec);
+    }
+    throw new IllegalArgumentException(
+        String.format("Unsupported OperatorSpec: %s", 
operatorSpec.getClass().getName()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
new file mode 100644
index 0000000..90569b4
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Implementation of a {@link PartialJoinOperatorSpec}. This class implements 
function
+ * that only takes in one input stream among all inputs to the join and 
generate the join output.
+ *
+ * @param <M>  type of {@link MessageEnvelope}s in the input stream
+ * @param <JM>  type of {@link MessageEnvelope}s in the stream to join with
+ * @param <RM>  type of {@link MessageEnvelope}s in the joined stream
+ */
+class PartialJoinOperatorImpl<M extends MessageEnvelope<K, ?>, K, JM extends 
MessageEnvelope<K, ?>, RM extends MessageEnvelope>
+    extends OperatorImpl<M, RM> {
+
+  PartialJoinOperatorImpl(PartialJoinOperatorSpec<M, K, JM, RM> joinOp) {
+    // TODO: implement PartialJoinOperatorImpl constructor
+  }
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
+    // TODO: implement PartialJoinOperatorImpl processing logic
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
deleted file mode 100644
index cc7ef2b..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * Wrapper class to be used by {@link OperatorImpl}
- *
- * @param <M>  Type of input stream {@link Message}
- */
-public class ProcessorContext<M extends Message> {
-  private final M message;
-  private final MessageCollector collector;
-  private final TaskCoordinator coordinator;
-
-  ProcessorContext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
-    this.message = message;
-    this.collector = collector;
-    this.coordinator = coordinator;
-  }
-
-  M getMessage() {
-    return this.message;
-  }
-
-  MessageCollector getCollector() {
-    return this.collector;
-  }
-
-  TaskCoordinator getCoordinator() {
-    return this.coordinator;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
new file mode 100644
index 0000000..7132b86
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * A no-op operator implementation that forwards incoming {@link 
MessageEnvelope}s to all of its subscribers.
+ * @param <M>  type of incoming {@link MessageEnvelope}s
+ */
+final class RootOperatorImpl<M extends MessageEnvelope> extends 
OperatorImpl<M, M> {
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
+    this.propagateResult(message, collector, coordinator);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
new file mode 100644
index 0000000..e8a635c
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StateStoreImpl;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.windows.WindowState;
+import org.apache.samza.operators.windows.WindowOutput;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Default implementation class of a {@link WindowOperatorSpec} for a session 
window.
+ *
+ * @param <M>  the type of input {@link MessageEnvelope}
+ * @param <RK>  the type of window key
+ * @param <WS>  the type of window state
+ * @param <RM>  the type of aggregated value of the window
+ */
+class SessionWindowOperatorImpl<M extends MessageEnvelope, RK, WS extends 
WindowState, RM extends WindowOutput<RK, ?>>
+    extends OperatorImpl<M, RM> {
+
+  private final WindowOperatorSpec<M, RK, WS, RM> windowSpec;
+  private StateStoreImpl<M, RK, WS> stateStore = null;
+
+  SessionWindowOperatorImpl(WindowOperatorSpec<M, RK, WS, RM> windowSpec) {
+    this.windowSpec = windowSpec;
+  }
+
+  @Override
+  public void init(MessageStream<M> source, TaskContext context) {
+    this.stateStore = new StateStoreImpl<>(this.windowSpec.getStoreFns(), 
windowSpec.getStoreName(source));
+    this.stateStore.init(context);
+  }
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
+    Entry<RK, WS> state = this.stateStore.getState(message);
+    this.propagateResult(this.windowSpec.getTransformFn().apply(message, 
state), collector, coordinator);
+    this.stateStore.updateState(message, state);
+  }
+
+  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) 
{
+    // This is to periodically check the timeout triggers to get the list of 
window states to be updated
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
deleted file mode 100644
index b0f4f27..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.internal.Operators.StreamOperator;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-import java.util.Collection;
-import java.util.function.Function;
-
-
-/**
- * Base class for all implementation of operators
- *
- * @param <M>  type of message in the input stream
- * @param <RM>  type of message in the output stream
- */
-public class SimpleOperatorImpl<M extends Message, RM extends Message> extends 
OperatorImpl<M, RM> {
-
-  private final Function<M, Collection<RM>> transformFn;
-
-  SimpleOperatorImpl(StreamOperator<M, RM> op) {
-    super();
-    this.transformFn = op.getFunction();
-  }
-
-  @Override protected void onNext(M imsg, MessageCollector collector, 
TaskCoordinator coordinator) {
-    // actually calling the transform function and then for each output, call 
nextProcessors()
-    this.transformFn.apply(imsg).forEach(r -> this.nextProcessors(r, 
collector, coordinator));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
index a8a639e..abed03f 100644
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -18,24 +18,26 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.internal.Operators.SinkOperator;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 
 
 /**
- * Implementation for {@link SinkOperator}
+ * Implementation for {@link SinkOperatorSpec}
  */
-public class SinkOperatorImpl<M extends Message> extends OperatorImpl<M, 
Message> {
-  private final MessageStream.VoidFunction3<M, MessageCollector, 
TaskCoordinator> sinkFunc;
+class SinkOperatorImpl<M extends MessageEnvelope> extends OperatorImpl<M, 
MessageEnvelope> {
 
-  SinkOperatorImpl(SinkOperator<M> sinkOp) {
-    this.sinkFunc = sinkOp.getFunction();
+  private final SinkFunction<M> sinkFn;
+
+  SinkOperatorImpl(SinkOperatorSpec<M> sinkOp) {
+    this.sinkFn = sinkOp.getSinkFn();
   }
 
-  @Override protected void onNext(M message, MessageCollector collector, 
TaskCoordinator coordinator) {
-    this.sinkFunc.apply(message, collector, coordinator);
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
+    this.sinkFn.apply(message, collector, coordinator);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
deleted file mode 100644
index 7840b5b..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.internal.Operators.StoreFunctions;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.task.TaskContext;
-
-
-/**
- * The base class for all state stores
- */
-public class StateStoreImpl<M extends Message, SK, SS> {
-  private final String storeName;
-  private final StoreFunctions<M, SK, SS> storeFunctions;
-  private KeyValueStore<SK, SS> kvStore = null;
-
-  public StateStoreImpl(StoreFunctions<M, SK, SS> store, String storeName) {
-    this.storeFunctions = store;
-    this.storeName = storeName;
-  }
-
-  public void init(TaskContext context) {
-    this.kvStore = (KeyValueStore<SK, SS>) context.getStore(this.storeName);
-  }
-
-  public Entry<SK, SS> getState(M m) {
-    SK key = this.storeFunctions.getStoreKeyFinder().apply(m);
-    SS state = this.kvStore.get(key);
-    return new Entry<>(key, state);
-  }
-
-  public Entry<SK, SS> updateState(M m, Entry<SK, SS> oldEntry) {
-    SS newValue = this.storeFunctions.getStateUpdater().apply(m, 
oldEntry.getValue());
-    this.kvStore.put(oldEntry.getKey(), newValue);
-    return new Entry<>(oldEntry.getKey(), newValue);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
new file mode 100644
index 0000000..3a5c56e
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * A StreamOperator that accepts a 1:n transform function and applies it to 
each incoming {@link MessageEnvelope}.
+ *
+ * @param <M>  type of {@link MessageEnvelope} in the input stream
+ * @param <RM>  type of {@link MessageEnvelope} in the output stream
+ */
+class StreamOperatorImpl<M extends MessageEnvelope, RM extends 
MessageEnvelope> extends OperatorImpl<M, RM> {
+
+  private final FlatMapFunction<M, RM> transformFn;
+
+  StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOperatorSpec) {
+    this.transformFn = streamOperatorSpec.getTransformFn();
+  }
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
+    // call the transform function and then for each output call 
propagateResult()
+    this.transformFn.apply(message).forEach(r -> this.propagateResult(r, 
collector, coordinator));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
deleted file mode 100644
index 4238d45..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl.join;
-
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.internal.Operators.PartialJoinOperator;
-import org.apache.samza.operators.impl.OperatorImpl;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * Implementation of a {@link PartialJoinOperator}. This class implements 
function
- * that only takes in one input stream among all inputs to the join and 
generate the join output.
- *
- * @param <M>  Type of input stream {@link Message}
- * @param <RM>  Type of join output stream {@link Message}
- */
-public class PartialJoinOpImpl<M extends Message<K, ?>, K, JM extends 
Message<K, ?>, RM extends Message> extends OperatorImpl<M, RM> {
-
-  public PartialJoinOpImpl(PartialJoinOperator<M, K, JM, RM> joinOp) {
-    // TODO: implement PartialJoinOpImpl constructor
-  }
-
-  @Override protected void onNext(M message, MessageCollector collector, 
TaskCoordinator coordinator) {
-    // TODO: implement PartialJoinOpImpl processing logic
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
deleted file mode 100644
index 0d6141e..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl.window;
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.WindowState;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.internal.Operators.WindowOperator;
-import org.apache.samza.operators.internal.WindowOutput;
-import org.apache.samza.operators.impl.OperatorImpl;
-import org.apache.samza.operators.impl.StateStoreImpl;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * Default implementation class of a {@link WindowOperator} for a session 
window.
- *
- * @param <M>  the type of input {@link Message}
- * @param <RK>  the type of window key
- * @param <WS>  the type of window state
- * @param <RM>  the type of aggregated value of the window
- */
-public class SessionWindowImpl<M extends Message, RK, WS extends WindowState, 
RM extends WindowOutput<RK, ?>> extends
-    OperatorImpl<M, RM> {
-  private final WindowOperator<M, RK, WS, RM> sessWnd;
-  private StateStoreImpl<M, RK, WS> wndStore = null;
-
-  public SessionWindowImpl(WindowOperator<M, RK, WS, RM> sessWnd) {
-    this.sessWnd = sessWnd;
-  }
-
-  @Override protected void onNext(M message, MessageCollector collector, 
TaskCoordinator coordinator) {
-    Entry<RK, WS> state = this.wndStore.getState(message);
-    this.nextProcessors(this.sessWnd.getFunction().apply(message, state), 
collector, coordinator);
-    this.wndStore.updateState(message, state);
-  }
-
-  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) 
{
-    // This is to periodically check the timeout triggers to get the list of 
window states to be updated
-  }
-
-  @Override protected void init(MessageStream<M> source, TaskContext context) {
-    this.wndStore = new StateStoreImpl<>(this.sessWnd.getStoreFunctions(), 
sessWnd.getStoreName(source));
-    this.wndStore.init(context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
new file mode 100644
index 0000000..8b75cdc
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.data.MessageEnvelope;
+
+
+/**
+ * A stateless serializable stream operator specification that holds all the 
information required
+ * to transform the input {@link MessageStream} and produce the output {@link 
MessageStream}.
+ */
+public interface OperatorSpec<OM extends MessageEnvelope> {
+
+  /**
+   * Get the output stream containing transformed {@link MessageEnvelope} 
produced by this operator.
+   * @return  the output stream containing transformed {@link MessageEnvelope} 
produced by this operator.
+   */
+  MessageStream<OM> getOutputStream();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
new file mode 100644
index 0000000..f622b34
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.windows.WindowState;
+import org.apache.samza.operators.windows.WindowFn;
+import org.apache.samza.operators.windows.WindowOutput;
+
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.function.BiFunction;
+
+
+/**
+ * Factory methods for creating {@link OperatorSpec} instances.
+ */
+public class OperatorSpecs {
+
+  private OperatorSpecs() {}
+
+  private static String getOperatorId() {
+    // TODO: need to change the IDs to be a consistent, durable IDs that can 
be recovered across container and job restarts
+    return UUID.randomUUID().toString();
+  }
+
+  /**
+   * Creates a {@link StreamOperatorSpec}.
+   *
+   * @param transformFn  the transformation function
+   * @param <M>  type of input {@link MessageEnvelope}
+   * @param <OM>  type of output {@link MessageEnvelope}
+   * @return  the {@link StreamOperatorSpec}
+   */
+  public static <M extends MessageEnvelope, OM extends MessageEnvelope> 
StreamOperatorSpec<M, OM> createStreamOperator(
+      FlatMapFunction<M, OM> transformFn) {
+    return new StreamOperatorSpec<>(transformFn);
+  }
+
+  /**
+   * Creates a {@link SinkOperatorSpec}.
+   *
+   * @param sinkFn  the sink function
+   * @param <M>  type of input {@link MessageEnvelope}
+   * @return  the {@link SinkOperatorSpec}
+   */
+  public static <M extends MessageEnvelope> SinkOperatorSpec<M> 
createSinkOperator(SinkFunction<M> sinkFn) {
+    return new SinkOperatorSpec<>(sinkFn);
+  }
+
+  /**
+   * Creates a {@link WindowOperatorSpec}.
+   *
+   * @param windowFn  the {@link WindowFn} function
+   * @param <M>  type of input {@link MessageEnvelope}
+   * @param <WK>  type of window key
+   * @param <WS>  type of {@link WindowState}
+   * @param <WM>  type of output {@link WindowOutput}
+   * @return  the {@link WindowOperatorSpec}
+   */
+  public static <M extends MessageEnvelope, WK, WS extends WindowState, WM 
extends WindowOutput<WK, ?>> WindowOperatorSpec<M, WK, WS, WM> 
createWindowOperator(
+      WindowFn<M, WK, WS, WM> windowFn) {
+    return new WindowOperatorSpec<>(windowFn, OperatorSpecs.getOperatorId());
+  }
+
+  /**
+   * Creates a {@link PartialJoinOperatorSpec}.
+   *
+   * @param partialJoinFn  the join function
+   * @param joinOutput  the output {@link MessageStreamImpl}
+   * @param <M>  type of input {@link MessageEnvelope}
+   * @param <K>  type of join key
+   * @param <JM>  the type of {@link MessageEnvelope} in the other join stream
+   * @param <OM>  the type of {@link MessageEnvelope} in the join output
+   * @return  the {@link PartialJoinOperatorSpec}
+   */
+  public static <M extends MessageEnvelope<K, ?>, K, JM extends 
MessageEnvelope<K, ?>, OM extends MessageEnvelope> PartialJoinOperatorSpec<M, 
K, JM, OM> createPartialJoinOperator(
+      BiFunction<M, JM, OM> partialJoinFn, MessageStreamImpl<OM> joinOutput) {
+    return new PartialJoinOperatorSpec<>(partialJoinFn, joinOutput, 
OperatorSpecs.getOperatorId());
+  }
+
+  /**
+   * Creates a {@link StreamOperatorSpec} with a merger function.
+   *
+   * @param mergeOutput  the output {@link MessageStreamImpl} from the merger
+   * @param <M>  the type of input {@link MessageEnvelope}
+   * @return  the {@link StreamOperatorSpec} for the merge
+   */
+  public static <M extends MessageEnvelope> StreamOperatorSpec<M, M> 
createMergeOperator(MessageStreamImpl<M> mergeOutput) {
+    return new StreamOperatorSpec<M, M>(t ->
+      new ArrayList<M>() { {
+          this.add(t);
+        } },
+      mergeOutput);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
new file mode 100644
index 0000000..f74f35d
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.windows.StoreFunctions;
+
+import java.util.function.BiFunction;
+
+
+/**
+ * Spec for the partial join operator that takes {@link MessageEnvelope}s from 
one input stream, joins with buffered
+ * {@link MessageEnvelope}s from another stream, and produces join results to 
an output {@link MessageStreamImpl}.
+ *
+ * @param <M>  the type of input {@link MessageEnvelope}
+ * @param <K>  the type of join key
+ * @param <JM>  the type of {@link MessageEnvelope} in the other join stream
+ * @param <RM>  the type of {@link MessageEnvelope} in the join output stream
+ */
+public class PartialJoinOperatorSpec<M extends MessageEnvelope<K, ?>, K, JM 
extends MessageEnvelope<K, ?>, RM extends MessageEnvelope>
+    implements OperatorSpec<RM> {
+
+  private final MessageStreamImpl<RM> joinOutput;
+
+  /**
+   * The transformation function of {@link PartialJoinOperatorSpec} that takes 
an input {@link MessageEnvelope} of
+   * type {@code M}, joins with a stream of buffered {@link MessageEnvelope}s 
of type {@code JM} from another stream,
+   * and generates a joined result {@link MessageEnvelope} of type {@code RM}.
+   */
+  private final BiFunction<M, JM, RM> transformFn;
+
+  /**
+   * The {@link MessageEnvelope} store functions that read the buffered {@link 
MessageEnvelope}s from the other
+   * stream in the join.
+   */
+  private final StoreFunctions<JM, K, JM> joinStoreFns;
+
+  /**
+   * The {@link MessageEnvelope} store functions that save the buffered {@link 
MessageEnvelope} of this
+   * {@link MessageStreamImpl} in the join.
+   */
+  private final StoreFunctions<M, K, M> selfStoreFns;
+
+  /**
+   * The unique ID for this operator.
+   */
+  private final String operatorId;
+
+  /**
+   * Default constructor for a {@link PartialJoinOperatorSpec}.
+   *
+   * @param partialJoinFn  partial join function that take type {@code M} of 
input {@link MessageEnvelope} and join
+   *                       w/ type {@code JM} of buffered {@link 
MessageEnvelope} from another stream
+   * @param joinOutput  the output {@link MessageStreamImpl} of the join 
results
+   */
+  PartialJoinOperatorSpec(BiFunction<M, JM, RM> partialJoinFn, 
MessageStreamImpl<RM> joinOutput, String operatorId) {
+    this.joinOutput = joinOutput;
+    this.transformFn = partialJoinFn;
+    // Read-only join store, no creator/updater functions required.
+    this.joinStoreFns = new StoreFunctions<>(m -> m.getKey(), null);
+    // Buffered message envelope store for this input stream.
+    this.selfStoreFns = new StoreFunctions<>(m -> m.getKey(), (m, s1) -> m);
+    this.operatorId = operatorId;
+  }
+
+  @Override
+  public String toString() {
+    return this.operatorId;
+  }
+
+  @Override
+  public MessageStreamImpl<RM> getOutputStream() {
+    return this.joinOutput;
+  }
+
+  public StoreFunctions<JM, K, JM> getJoinStoreFns() {
+    return this.joinStoreFns;
+  }
+
+  public StoreFunctions<M, K, M> getSelfStoreFns() {
+    return this.selfStoreFns;
+  }
+
+  public BiFunction<M, JM, RM> getTransformFn() {
+    return this.transformFn;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
new file mode 100644
index 0000000..4348bc0
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.MessageStreamImpl;
+
+
+/**
+ * The spec for a sink operator that accepts user-defined logic to output a 
{@link MessageStreamImpl} to an external
+ * system. This is a terminal operator and does allows further operator 
chaining.
+ *
+ * @param <M>  the type of input {@link MessageEnvelope}
+ */
+public class SinkOperatorSpec<M extends MessageEnvelope> implements 
OperatorSpec {
+
+  /**
+   * The user-defined sink function
+   */
+  private final SinkFunction<M> sinkFn;
+
+  /**
+   * Default constructor for a {@link SinkOperatorSpec}.
+   *
+   * @param sinkFn  a user defined {@link SinkFunction} that will be called 
with the output {@link MessageEnvelope},
+   *                the output {@link org.apache.samza.task.MessageCollector} 
and the
+   *                {@link org.apache.samza.task.TaskCoordinator}.
+   */
+  SinkOperatorSpec(SinkFunction<M> sinkFn) {
+    this.sinkFn = sinkFn;
+  }
+
+  /**
+   * This is a terminal operator and doesn't allow further operator chaining.
+   * @return  null
+   */
+  @Override
+  public MessageStreamImpl getOutputStream() {
+    return null;
+  }
+
+  public SinkFunction<M> getSinkFn() {
+    return this.sinkFn;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
new file mode 100644
index 0000000..ed18da4
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.MessageStreamImpl;
+
+
+/**
+ * The spec for a linear stream operator that outputs 0 or more {@link 
MessageEnvelope}s for each input {@link MessageEnvelope}.
+ *
+ * @param <M>  the type of input {@link MessageEnvelope}
+ * @param <OM>  the type of output {@link MessageEnvelope}
+ */
+public class StreamOperatorSpec<M extends MessageEnvelope, OM extends 
MessageEnvelope> implements OperatorSpec<OM> {
+
+  private final MessageStreamImpl<OM> outputStream;
+
+  private final FlatMapFunction<M, OM> transformFn;
+
+  /**
+   * Default constructor for a {@link StreamOperatorSpec}.
+   *
+   * @param transformFn  the transformation function that transforms each 
input {@link MessageEnvelope} into a collection
+   *                     of output {@link MessageEnvelope}s
+   */
+  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn) {
+    this(transformFn, new MessageStreamImpl<>());
+  }
+
+  /**
+   * Constructor for a {@link StreamOperatorSpec} that accepts an output 
{@link MessageStreamImpl}.
+   *
+   * @param transformFn  the transformation function
+   * @param outputStream  the output {@link MessageStreamImpl}
+   */
+  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl<OM> 
outputStream) {
+    this.outputStream = outputStream;
+    this.transformFn = transformFn;
+  }
+
+  @Override
+  public MessageStreamImpl<OM> getOutputStream() {
+    return this.outputStream;
+  }
+
+  public FlatMapFunction<M, OM> getTransformFn() {
+    return this.transformFn;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
new file mode 100644
index 0000000..2f5b1e7
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.windows.StoreFunctions;
+import org.apache.samza.operators.windows.Trigger;
+import org.apache.samza.operators.windows.WindowFn;
+import org.apache.samza.operators.windows.WindowOutput;
+import org.apache.samza.operators.windows.WindowState;
+import org.apache.samza.storage.kv.Entry;
+
+import java.util.function.BiFunction;
+
+
+/**
+ * Defines a window operator that takes one {@link MessageStreamImpl} as an 
input, accumulates the window state,
+ * and generates an output {@link MessageStreamImpl} with output type {@code 
WM} which extends {@link WindowOutput}
+ *
+ * @param <M>  the type of input {@link MessageEnvelope}
+ * @param <WK>  the type of key in the output {@link MessageEnvelope} from the 
{@link WindowOperatorSpec} function
+ * @param <WS>  the type of window state in the {@link WindowOperatorSpec} 
function
+ * @param <WM>  the type of window output {@link MessageEnvelope}
+ */
+public class WindowOperatorSpec<M extends MessageEnvelope, WK, WS extends 
WindowState, WM extends WindowOutput<WK, ?>> implements
+    OperatorSpec<WM> {
+
+  /**
+   * The output {@link MessageStream}.
+   */
+  private final MessageStreamImpl<WM> outputStream;
+
+  /**
+   * The window transformation function that takes {@link MessageEnvelope}s 
from one input stream, aggregates with the window
+   * state(s) from the window state store, and generate output {@link 
MessageEnvelope}s for the output stream.
+   */
+  private final BiFunction<M, Entry<WK, WS>, WM> transformFn;
+
+  /**
+   * The state store functions for the {@link WindowOperatorSpec}.
+   */
+  private final StoreFunctions<M, WK, WS> storeFns;
+
+  /**
+   * The window trigger.
+   */
+  private final Trigger<M, WS> trigger;
+
+  /**
+   * The unique ID of this operator.
+   */
+  private final String operatorId;
+
+  /**
+   * Constructor for {@link WindowOperatorSpec}.
+   *
+   * @param windowFn  the window function
+   * @param operatorId  auto-generated unique ID of this operator
+   */
+  WindowOperatorSpec(WindowFn<M, WK, WS, WM> windowFn, String operatorId) {
+    this.outputStream = new MessageStreamImpl<>();
+    this.transformFn = windowFn.getTransformFn();
+    this.storeFns = windowFn.getStoreFns();
+    this.trigger = windowFn.getTrigger();
+    this.operatorId = operatorId;
+  }
+
+  @Override
+  public String toString() {
+    return this.operatorId;
+  }
+
+  @Override
+  public MessageStreamImpl<WM> getOutputStream() {
+    return this.outputStream;
+  }
+
+  public StoreFunctions<M, WK, WS> getStoreFns() {
+    return this.storeFns;
+  }
+
+  public BiFunction<M, Entry<WK, WS>, WM> getTransformFn() {
+    return this.transformFn;
+  }
+
+  public Trigger<M, WS> getTrigger() {
+    return this.trigger;
+  }
+
+  /**
+   * Method to generate the window operator's state store name
+   * TODO HIGH pmaheshw: should this be here?
+   *
+   * @param inputStream the input {@link MessageStreamImpl} to this state store
+   * @return   the persistent store name of the window operator
+   */
+  public String getStoreName(MessageStream<M> inputStream) {
+    //TODO: need to get the persistent name of ds and the operator in a 
serialized form
+    return String.format("input-%s-wndop-%s", inputStream.toString(), 
this.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
 
b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
deleted file mode 100644
index c2f780d..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.task;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreams;
-import org.apache.samza.operators.MessageStreams.SystemMessageStream;
-import org.apache.samza.operators.data.IncomingSystemMessage;
-import org.apache.samza.operators.impl.ChainedOperators;
-import org.apache.samza.operators.task.StreamOperatorTask;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * An adaptor task class that invoke the user-implemented (@link 
StreamOperatorTask}s via {@link org.apache.samza.operators.MessageStream} 
programming APIs
- *
- */
-public final class StreamOperatorAdaptorTask implements StreamTask, 
InitableTask, WindowableTask {
-  /**
-   * A map with entries mapping {@link SystemStreamPartition} to {@link 
org.apache.samza.operators.impl.ChainedOperators} that takes the {@link 
SystemStreamPartition}
-   * as the input stream
-   */
-  private final Map<SystemStreamPartition, ChainedOperators> operatorChains = 
new HashMap<>();
-
-  /**
-   * Wrapped {@link StreamOperatorTask} class
-   */
-  private final StreamOperatorTask userTask;
-
-  /**
-   * Constructor that wraps the user-defined {@link StreamOperatorTask}
-   *
-   * @param userTask  the user-defined {@link StreamOperatorTask}
-   */
-  public StreamOperatorAdaptorTask(StreamOperatorTask userTask) {
-    this.userTask = userTask;
-  }
-
-  @Override
-  public final void init(Config config, TaskContext context) throws Exception {
-    if (this.userTask instanceof InitableTask) {
-      ((InitableTask) this.userTask).init(config, context);
-    }
-    Map<SystemStreamPartition, SystemMessageStream> sources = new HashMap<>();
-    context.getSystemStreamPartitions().forEach(ssp -> {
-        SystemMessageStream ds = MessageStreams.input(ssp);
-        sources.put(ssp, ds);
-      });
-    this.userTask.initOperators(sources.values());
-    sources.forEach((ssp, ds) -> operatorChains.put(ssp, 
ChainedOperators.create(ds, context)));
-  }
-
-  @Override
-  public final void process(IncomingMessageEnvelope ime, MessageCollector 
collector, TaskCoordinator coordinator) {
-    this.operatorChains.get(ime.getSystemStreamPartition()).onNext(new 
IncomingSystemMessage(ime), collector, coordinator);
-  }
-
-  @Override
-  public final void window(MessageCollector collector, TaskCoordinator 
coordinator) throws Exception {
-    this.operatorChains.forEach((ssp, chain) -> chain.onTimer(collector, 
coordinator));
-    if (this.userTask instanceof WindowableTask) {
-      ((WindowableTask) this.userTask).window(collector, coordinator);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java 
b/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java
new file mode 100644
index 0000000..e45d068
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators;
+
+
+import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.windows.TriggerBuilder;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Collection;
+import java.util.Map;
+
+
+/**
+ * Example implementation of split stream tasks
+ *
+ */
+public class BroadcastTask implements StreamOperatorTask {
+  class MessageType {
+    String field1;
+    String field2;
+    String field3;
+    String field4;
+    String parKey;
+    private long timestamp;
+
+    public long getTimestamp() {
+      return this.timestamp;
+    }
+  }
+
+  class JsonMessageEnvelope extends 
JsonIncomingSystemMessageEnvelope<MessageType> {
+    JsonMessageEnvelope(String key, MessageType data, Offset offset, 
SystemStreamPartition partition) {
+      super(key, data, offset, partition);
+    }
+  }
+
+  @Override
+  public void transform(Map<SystemStreamPartition, 
MessageStream<IncomingSystemMessageEnvelope>> messageStreams) {
+    messageStreams.values().forEach(entry -> {
+        MessageStream<JsonMessageEnvelope> inputStream = 
entry.map(this::getInputMessage);
+
+        inputStream.filter(this::myFilter1).
+          window(Windows.<JsonMessageEnvelope, String>intoSessionCounter(
+              m -> String.format("%s-%s", m.getMessage().field1, 
m.getMessage().field2)).
+            setTriggers(TriggerBuilder.<JsonMessageEnvelope, 
Integer>earlyTriggerWhenExceedWndLen(100).
+              addLateTriggerOnSizeLimit(10).
+              addTimeoutSinceLastMessage(30000)));
+
+        inputStream.filter(this::myFilter2).
+          window(Windows.<JsonMessageEnvelope, String>intoSessions(
+              m -> String.format("%s-%s", m.getMessage().field3, 
m.getMessage().field4)).
+            setTriggers(TriggerBuilder.<JsonMessageEnvelope, 
Collection<JsonMessageEnvelope>>earlyTriggerWhenExceedWndLen(100).
+              addTimeoutSinceLastMessage(30000)));
+
+        inputStream.filter(this::myFilter3).
+          window(Windows.<JsonMessageEnvelope, String, 
MessageType>intoSessions(
+              m -> String.format("%s-%s", m.getMessage().field3, 
m.getMessage().field4), m -> m.getMessage()).
+            setTriggers(TriggerBuilder.<JsonMessageEnvelope, 
Collection<MessageType>>earlyTriggerOnEventTime(m -> 
m.getMessage().getTimestamp(), 30000).
+              addTimeoutSinceFirstMessage(60000)));
+      });
+  }
+
+  JsonMessageEnvelope getInputMessage(IncomingSystemMessageEnvelope m1) {
+    return (JsonMessageEnvelope) m1.getMessage();
+  }
+
+  boolean myFilter1(JsonMessageEnvelope m1) {
+    // Do user defined processing here
+    return m1.getMessage().parKey.equals("key1");
+  }
+
+  boolean myFilter2(JsonMessageEnvelope m1) {
+    // Do user defined processing here
+    return m1.getMessage().parKey.equals("key2");
+  }
+
+  boolean myFilter3(JsonMessageEnvelope m1) {
+    return m1.getMessage().parKey.equals("key3");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java 
b/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java
new file mode 100644
index 0000000..1b10609
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Example implementation of unique key-based stream-stream join tasks
+ *
+ */
+public class JoinTask implements StreamOperatorTask {
+  class MessageType {
+    String joinKey;
+    List<String> joinFields = new ArrayList<>();
+  }
+
+  class JsonMessageEnvelope extends 
JsonIncomingSystemMessageEnvelope<MessageType> {
+    JsonMessageEnvelope(String key, MessageType data, Offset offset, 
SystemStreamPartition partition) {
+      super(key, data, offset, partition);
+    }
+  }
+
+  MessageStream<JsonMessageEnvelope> joinOutput = null;
+
+  @Override
+  public void transform(Map<SystemStreamPartition, 
MessageStream<IncomingSystemMessageEnvelope>> messageStreams) {
+    messageStreams.values().forEach(messageStream -> {
+        MessageStream<JsonMessageEnvelope> newSource = 
messageStream.map(this::getInputMessage);
+        if (joinOutput == null) {
+          joinOutput = newSource;
+        } else {
+          joinOutput = joinOutput.join(newSource, (m1, m2) -> 
this.myJoinResult(m1, m2));
+        }
+      });
+  }
+
+  private JsonMessageEnvelope getInputMessage(IncomingSystemMessageEnvelope 
ism) {
+    return new JsonMessageEnvelope(
+        ((MessageType) ism.getMessage()).joinKey,
+        (MessageType) ism.getMessage(),
+        ism.getOffset(),
+        ism.getSystemStreamPartition());
+  }
+
+  JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope 
m2) {
+    MessageType newJoinMsg = new MessageType();
+    newJoinMsg.joinKey = m1.getKey();
+    newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
+    newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
+    return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, 
null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java
new file mode 100644
index 0000000..61bb32a
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.impl.OperatorImpl;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.TaskContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class TestFluentStreamAdaptorTask {
+  Field userTaskField = null;
+  Field operatorChainsField = null;
+
+  @Before
+  public void prep() throws NoSuchFieldException {
+    userTaskField = 
StreamOperatorAdaptorTask.class.getDeclaredField("userTask");
+    operatorChainsField = 
StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
+    userTaskField.setAccessible(true);
+    operatorChainsField.setAccessible(true);
+  }
+
+  @Test
+  public void testConstructor() throws IllegalAccessException {
+    StreamOperatorTask userTask = mock(StreamOperatorTask.class);
+    StreamOperatorAdaptorTask adaptorTask = new 
StreamOperatorAdaptorTask(userTask);
+    StreamOperatorTask taskMemberVar = (StreamOperatorTask) 
userTaskField.get(adaptorTask);
+    Map<SystemStreamPartition, OperatorImpl> chainsMap = 
(Map<SystemStreamPartition, OperatorImpl>) operatorChainsField.get(adaptorTask);
+    assertEquals(taskMemberVar, userTask);
+    assertTrue(chainsMap.isEmpty());
+  }
+
+  @Test
+  public void testInit() throws Exception {
+    StreamOperatorTask userTask = mock(StreamOperatorTask.class);
+    StreamOperatorAdaptorTask adaptorTask = new 
StreamOperatorAdaptorTask(userTask);
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    Set<SystemStreamPartition> testInputs = new HashSet() { {
+        this.add(new SystemStreamPartition("test-sys", "test-strm", new 
Partition(0)));
+        this.add(new SystemStreamPartition("test-sys", "test-strm", new 
Partition(1)));
+      } };
+    when(mockContext.getSystemStreamPartitions()).thenReturn(testInputs);
+    adaptorTask.init(mockConfig, mockContext);
+    verify(userTask, times(1)).transform(Mockito.anyMap());
+    Map<SystemStreamPartition, OperatorImpl> chainsMap = 
(Map<SystemStreamPartition, OperatorImpl>) operatorChainsField.get(adaptorTask);
+    assertTrue(chainsMap.size() == 2);
+    assertTrue(chainsMap.containsKey(testInputs.toArray()[0]));
+    assertTrue(chainsMap.containsKey(testInputs.toArray()[1]));
+  }
+
+  // TODO: window and process methods to be added after implementation of 
ChainedOperators.create()
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java
new file mode 100644
index 0000000..d804bf8
--- /dev/null
+++ 
b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+
+import org.apache.samza.operators.impl.OperatorImpl;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.TaskContext;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit test for {@link StreamOperatorTask}
+ */
+public class TestFluentStreamTasks {
+
+  private final WindowTask userTask = new WindowTask();
+
+  private final BroadcastTask splitTask = new BroadcastTask();
+
+  private final JoinTask joinTask = new JoinTask();
+
+  private final Set<SystemStreamPartition> inputPartitions = new 
HashSet<SystemStreamPartition>() { {
+      for (int i = 0; i < 4; i++) {
+        this.add(new SystemStreamPartition("my-system", "my-topic1", new 
Partition(i)));
+      }
+    } };
+
+  @Test
+  public void testUserTask() throws Exception {
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    
when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+    StreamOperatorAdaptorTask adaptorTask = new 
StreamOperatorAdaptorTask(this.userTask);
+    Field pipelineMapFld = 
StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
+    pipelineMapFld.setAccessible(true);
+    Map<SystemStreamPartition, OperatorImpl> pipelineMap =
+        (Map<SystemStreamPartition, OperatorImpl>) 
pipelineMapFld.get(adaptorTask);
+
+    adaptorTask.init(mockConfig, mockContext);
+    assertEquals(pipelineMap.size(), 4);
+    this.inputPartitions.forEach(partition -> {
+        assertNotNull(pipelineMap.get(partition));
+      });
+  }
+
+  @Test
+  public void testSplitTask() throws Exception {
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    
when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+    StreamOperatorAdaptorTask adaptorTask = new 
StreamOperatorAdaptorTask(this.splitTask);
+    Field pipelineMapFld = 
StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
+    pipelineMapFld.setAccessible(true);
+    Map<SystemStreamPartition, OperatorImpl> pipelineMap =
+        (Map<SystemStreamPartition, OperatorImpl>) 
pipelineMapFld.get(adaptorTask);
+
+    adaptorTask.init(mockConfig, mockContext);
+    assertEquals(pipelineMap.size(), 4);
+    this.inputPartitions.forEach(partition -> {
+        assertNotNull(pipelineMap.get(partition));
+      });
+  }
+
+  @Test
+  public void testJoinTask() throws Exception {
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    
when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+    StreamOperatorAdaptorTask adaptorTask = new 
StreamOperatorAdaptorTask(this.joinTask);
+    Field pipelineMapFld = 
StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
+    pipelineMapFld.setAccessible(true);
+    Map<SystemStreamPartition, OperatorImpl> pipelineMap =
+        (Map<SystemStreamPartition, OperatorImpl>) 
pipelineMapFld.get(adaptorTask);
+
+    adaptorTask.init(mockConfig, mockContext);
+    assertEquals(pipelineMap.size(), 4);
+    this.inputPartitions.forEach(partition -> {
+        assertNotNull(pipelineMap.get(partition));
+      });
+  }
+
+
+}

Reply via email to