SAMZA-1054: Refactor Operator APIs

Some suggestions for an Operator API refactor and misc. cleanup. It does 
contain some implementation changes, mostly due to deleted, extracted or merged 
classes. (e.g. OperatorFactory + ChainedOperators == OperatorImpls).

Since git marked several moved classes as (delete + new) instead, it's probably 
best to apply the diff locally and  browse the code in an IDE.

Some of the changes, in no particular order:
* Extracted XFunction interfaces into a .functions package in -api.
* -api's internal.Operators is now the -operators's spec.* package. Extracted 
interfaces and classes. Factory methods are now in OperatorSpecs.
* -api's MessageStreams is now -api's MessageStream interface and -operators's 
MessageStreamImpl.
* -api's internal.Windows classes are now in -api's .window package. Extracted 
interfaces and classes, but no implementation changes.
* OperatorFactory + ChainedOperators is now OperatorImpls, which is used from 
StreamOperatorAdaptorTask.
* Added a NoOpOperatorImpl, which acts as the root node for the OperatorImpl 
DAG returned by OperatorImpls.
* Removed usages of reactivestreams APIs since current code looks simpler 
without them. We can add them back when we need features like backpressure etc.
* Removed the InputSystemMessage interface.
* Made field names consistent (e.g Fn suffix for functions everywhere etc.).
* Some method/class visibility changes due to moved classes.
* General documentation changes, mostly to make public APIs clearer.

There are additional questions/tasks that we can address in future RBs:
* Updating Window and Trigger APIs.
* Merging samza-operator into samza-core.
* Questions about Message timestamp and Offset comparison semantics.
* Questions about OperatorSpec serialization (e.g. ID generation).
* Questions about StateStoreImpl and StoreFunctions.

Author: Prateek Maheshwari <pmahe...@linkedin.com>

Reviewers: Yi Pan <nickpa...@gmail.com>, Jagadish <jagadish1...@gmail.com>

Closes #25 from prateekm/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/00543804
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/00543804
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/00543804

Branch: refs/heads/master
Commit: 00543804b3c32f1cbea0212e4a94e360b5a324cc
Parents: a980c96
Author: Prateek Maheshwari <pmahe...@linkedin.com>
Authored: Thu Dec 1 14:50:52 2016 -0800
Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>
Committed: Thu Dec 1 14:50:52 2016 -0800

----------------------------------------------------------------------
 .../apache/samza/operators/MessageStream.java   | 197 +++-----
 .../apache/samza/operators/MessageStreams.java  |  81 ----
 .../samza/operators/StreamOperatorTask.java     |  51 ++
 .../apache/samza/operators/TriggerBuilder.java  | 323 -------------
 .../org/apache/samza/operators/WindowState.java |  81 ----
 .../org/apache/samza/operators/Windows.java     | 203 --------
 .../operators/data/IncomingSystemMessage.java   |  76 ---
 .../data/IncomingSystemMessageEnvelope.java     |  63 +++
 .../operators/data/InputSystemMessage.java      |  45 --
 .../apache/samza/operators/data/Message.java    |  64 ---
 .../samza/operators/data/MessageEnvelope.java   |  54 +++
 .../operators/functions/FilterFunction.java     |  40 ++
 .../operators/functions/FlatMapFunction.java    |  44 ++
 .../samza/operators/functions/JoinFunction.java |  44 ++
 .../samza/operators/functions/MapFunction.java  |  41 ++
 .../samza/operators/functions/SinkFunction.java |  46 ++
 .../samza/operators/internal/Operators.java     | 469 -------------------
 .../samza/operators/internal/Trigger.java       |  95 ----
 .../samza/operators/internal/WindowFn.java      |  60 ---
 .../samza/operators/internal/WindowOutput.java  |  55 ---
 .../operators/task/StreamOperatorTask.java      |  45 --
 .../samza/operators/windows/SessionWindow.java  | 102 ++++
 .../samza/operators/windows/StoreFunctions.java |  67 +++
 .../apache/samza/operators/windows/Trigger.java |  94 ++++
 .../samza/operators/windows/TriggerBuilder.java | 320 +++++++++++++
 .../apache/samza/operators/windows/Window.java  |  49 ++
 .../samza/operators/windows/WindowFn.java       |  59 +++
 .../samza/operators/windows/WindowOutput.java   |  51 ++
 .../samza/operators/windows/WindowState.java    |  85 ++++
 .../apache/samza/operators/windows/Windows.java | 100 ++++
 .../org/apache/samza/operators/TestMessage.java |  47 --
 .../samza/operators/TestMessageEnvelope.java    |  61 +++
 .../samza/operators/TestMessageStream.java      | 180 -------
 .../samza/operators/TestMessageStreams.java     |  35 --
 .../samza/operators/TestOutputMessage.java      |  47 --
 .../operators/TestOutputMessageEnvelope.java    |  43 ++
 .../samza/operators/TestTriggerBuilder.java     | 214 ---------
 .../org/apache/samza/operators/TestWindows.java | 106 -----
 .../data/TestIncomingSystemMessage.java         |   5 +-
 .../samza/operators/data/TestLongOffset.java    |   9 +-
 .../samza/operators/internal/TestOperators.java | 128 -----
 .../samza/operators/internal/TestTrigger.java   |  68 ---
 .../operators/internal/TestWindowOutput.java    |  36 --
 .../samza/operators/windows/TestTrigger.java    |  68 +++
 .../operators/windows/TestTriggerBuilder.java   | 226 +++++++++
 .../operators/windows/TestWindowOutput.java     |  36 ++
 .../samza/operators/windows/TestWindows.java    | 109 +++++
 .../samza/operators/MessageStreamImpl.java      | 134 ++++++
 .../apache/samza/operators/StateStoreImpl.java  |  56 +++
 .../operators/StreamOperatorAdaptorTask.java    | 105 +++++
 .../samza/operators/impl/ChainedOperators.java  | 119 -----
 .../samza/operators/impl/OperatorFactory.java   |  85 ----
 .../samza/operators/impl/OperatorImpl.java      |  76 +--
 .../samza/operators/impl/OperatorImpls.java     | 125 +++++
 .../operators/impl/PartialJoinOperatorImpl.java |  46 ++
 .../samza/operators/impl/ProcessorContext.java  |  53 ---
 .../samza/operators/impl/RootOperatorImpl.java  |  36 ++
 .../impl/SessionWindowOperatorImpl.java         |  67 +++
 .../operators/impl/SimpleOperatorImpl.java      |  49 --
 .../samza/operators/impl/SinkOperatorImpl.java  |  22 +-
 .../samza/operators/impl/StateStoreImpl.java    |  56 ---
 .../operators/impl/StreamOperatorImpl.java      |  47 ++
 .../operators/impl/join/PartialJoinOpImpl.java  |  44 --
 .../impl/window/SessionWindowImpl.java          |  65 ---
 .../samza/operators/spec/OperatorSpec.java      |  37 ++
 .../samza/operators/spec/OperatorSpecs.java     | 116 +++++
 .../operators/spec/PartialJoinOperatorSpec.java | 104 ++++
 .../samza/operators/spec/SinkOperatorSpec.java  |  62 +++
 .../operators/spec/StreamOperatorSpec.java      |  67 +++
 .../operators/spec/WindowOperatorSpec.java      | 119 +++++
 .../samza/task/StreamOperatorAdaptorTask.java   |  85 ----
 .../apache/samza/operators/BroadcastTask.java   | 101 ++++
 .../org/apache/samza/operators/JoinTask.java    |  77 +++
 .../operators/TestFluentStreamAdaptorTask.java  |  85 ++++
 .../samza/operators/TestFluentStreamTasks.java  | 112 +++++
 .../samza/operators/TestMessageStreamImpl.java  | 203 ++++++++
 .../samza/operators/TestStateStoreImpl.java     |  72 +++
 .../org/apache/samza/operators/WindowTask.java  |  70 +++
 .../data/JsonIncomingSystemMessageEnvelope.java |  60 +++
 .../operators/impl/TestChainedOperators.java    | 129 -----
 .../operators/impl/TestOperatorFactory.java     |  93 ----
 .../samza/operators/impl/TestOperatorImpl.java  |  48 +-
 .../samza/operators/impl/TestOperatorImpls.java | 183 ++++++++
 .../operators/impl/TestProcessorContext.java    |  40 --
 .../operators/impl/TestSessionWindowImpl.java   | 111 +++++
 .../operators/impl/TestSimpleOperatorImpl.java  |  55 ---
 .../operators/impl/TestSinkOperatorImpl.java    |  25 +-
 .../operators/impl/TestStateStoreImpl.java      |  69 ---
 .../operators/impl/TestStreamOperatorImpl.java  |  60 +++
 .../impl/window/TestSessionWindowImpl.java      | 105 -----
 .../samza/operators/spec/TestOperatorSpecs.java | 114 +++++
 .../samza/task/BroadcastOperatorTask.java       | 102 ----
 .../samza/task/InputJsonSystemMessage.java      |  67 ---
 .../org/apache/samza/task/JoinOperatorTask.java |  80 ----
 .../task/TestStreamOperatorAdaptorTask.java     |  80 ----
 .../samza/task/TestStreamOperatorTasks.java     | 105 -----
 .../apache/samza/task/WindowOperatorTask.java   |  71 ---
 97 files changed, 4374 insertions(+), 4240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java 
b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index dede631..d18536b 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -16,173 +16,112 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.samza.operators;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.internal.Operators;
-import org.apache.samza.operators.internal.Operators.Operator;
-import org.apache.samza.operators.internal.WindowOutput;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.windows.Window;
+import org.apache.samza.operators.windows.WindowOutput;
+import org.apache.samza.operators.windows.WindowState;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Function;
 
 
 /**
- * This class defines either the input or output streams to/from the 
operators. Users use the API methods defined here to
- * directly program the stream processing stages that processes a stream and 
generate another one.
+ * Represents a stream of {@link MessageEnvelope}s.
+ * <p>
+ * A {@link MessageStream} can be transformed into another {@link 
MessageStream} by applying the transforms in this API.
  *
- * @param <M>  Type of message in this stream
+ * @param <M>  type of {@link MessageEnvelope}s in this stream
  */
 @InterfaceStability.Unstable
-public class MessageStream<M extends Message> {
-
-  private final Set<Operator> subscribers = new HashSet<>();
-
-  /**
-   * Helper method to get the corresponding list of subscribers to a specific 
{@link MessageStream}.
-   *
-   * NOTE: This is purely an internal API and should not be used directly by 
programmers.
-   *
-   * @return A unmodifiable set containing all {@link Operator}s that 
subscribe to this {@link MessageStream} object
-   */
-  public Collection<Operator> getSubscribers() {
-    return Collections.unmodifiableSet(this.subscribers);
-  }
-
-  /**
-   * Public API methods start here
-   */
+public interface MessageStream<M extends MessageEnvelope> {
 
   /**
-   * Defines a function API that takes three input parameters w/ types {@code 
A}, {@code B}, and {@code C} and w/o a return value
+   * Applies the provided 1:1 {@link MapFunction} to {@link MessageEnvelope}s 
in this {@link MessageStream} and returns the
+   * transformed {@link MessageStream}.
    *
-   * @param <A>  the type of input {@code a}
-   * @param <B>  the type of input {@code b}
-   * @param <C>  the type of input {@code c}
+   * @param mapFn  the function to transform a {@link MessageEnvelope} to 
another {@link MessageEnvelope}
+   * @param <TM>  the type of {@link MessageEnvelope}s in the transformed 
{@link MessageStream}
+   * @return the transformed {@link MessageStream}
    */
-  @FunctionalInterface
-  public interface VoidFunction3<A, B, C> {
-    public void apply(A a, B b, C c);
-  }
+  <TM extends MessageEnvelope> MessageStream<TM> map(MapFunction<M, TM> mapFn);
 
   /**
-   * Method to apply a map function (1:1) on a {@link MessageStream}
+   * Applies the provided 1:n {@link FlatMapFunction} to transform a {@link 
MessageEnvelope} in this {@link MessageStream}
+   * to n {@link MessageEnvelope}s in the transformed {@link MessageStream}
    *
-   * @param mapper  the mapper function to map one input {@link Message} to 
one output {@link Message}
-   * @param <OM>  the type of the output {@link Message} in the output {@link 
MessageStream}
-   * @return the output {@link MessageStream} by applying the map function on 
the input {@link MessageStream}
+   * @param flatMapFn  the function to transform a {@link MessageEnvelope} to 
zero or more {@link MessageEnvelope}s
+   * @param <TM>  the type of {@link MessageEnvelope}s in the transformed 
{@link MessageStream}
+   * @return the transformed {@link MessageStream}
    */
-  public <OM extends Message> MessageStream<OM> map(Function<M, OM> mapper) {
-    Operator<OM> op = Operators.<M, OM>getStreamOperator(m -> new 
ArrayList<OM>() { {
-        OM r = mapper.apply(m);
-        if (r != null) {
-          this.add(r);
-        }
-      } });
-    this.subscribers.add(op);
-    return op.getOutputStream();
-  }
+  <TM extends MessageEnvelope> MessageStream<TM> flatMap(FlatMapFunction<M, 
TM> flatMapFn);
 
   /**
-   * Method to apply a flatMap function (1:n) on a {@link MessageStream}
+   * Applies the provided {@link FilterFunction} to {@link MessageEnvelope}s 
in this {@link MessageStream} and returns the
+   * transformed {@link MessageStream}.
+   * <p>
+   * The {@link FilterFunction} is a predicate which determines whether a 
{@link MessageEnvelope} in this {@link MessageStream}
+   * should be retained in the transformed {@link MessageStream}.
    *
-   * @param flatMapper  the flat mapper function to map one input {@link 
Message} to zero or more output {@link Message}s
-   * @param <OM>  the type of the output {@link Message} in the output {@link 
MessageStream}
-   * @return the output {@link MessageStream} by applying the map function on 
the input {@link MessageStream}
+   * @param filterFn  the predicate to filter {@link MessageEnvelope}s from 
this {@link MessageStream}
+   * @return the transformed {@link MessageStream}
    */
-  public <OM extends Message> MessageStream<OM> flatMap(Function<M, 
Collection<OM>> flatMapper) {
-    Operator<OM> op = Operators.getStreamOperator(flatMapper);
-    this.subscribers.add(op);
-    return op.getOutputStream();
-  }
+  MessageStream<M> filter(FilterFunction<M> filterFn);
 
   /**
-   * Method to apply a filter function on a {@link MessageStream}
+   * Allows sending {@link MessageEnvelope}s in this {@link MessageStream} to 
an output
+   * {@link org.apache.samza.system.SystemStream} using the provided {@link 
SinkFunction}.
    *
-   * @param filter  the filter function to filter input {@link Message}s from 
the input {@link MessageStream}
-   * @return the output {@link MessageStream} after applying the filter 
function on the input {@link MessageStream}
+   * @param sinkFn  the function to send {@link MessageEnvelope}s in this 
stream to output systems
    */
-  public MessageStream<M> filter(Function<M, Boolean> filter) {
-    Operator<M> op = Operators.<M, M>getStreamOperator(t -> new ArrayList<M>() 
{ {
-        if (filter.apply(t)) {
-          this.add(t);
-        }
-      } });
-    this.subscribers.add(op);
-    return op.getOutputStream();
-  }
+  void sink(SinkFunction<M> sinkFn);
 
   /**
-   * Method to send an input {@link MessageStream} to an output {@link 
org.apache.samza.system.SystemStream}, and allows the output {@link 
MessageStream}
-   * to be consumed by downstream stream operators again.
+   * Groups the {@link MessageEnvelope}s in this {@link MessageStream} 
according to the provided {@link Window} semantics
+   * (e.g. tumbling, sliding or session windows) and returns the transformed 
{@link MessageStream} of
+   * {@link WindowOutput}s.
+   * <p>
+   * Use the {@link org.apache.samza.operators.windows.Windows} helper methods 
to create the appropriate windows.
    *
-   * @param sink  the user-defined sink function to send the input {@link 
Message}s to the external output systems
+   * @param window  the {@link Window} to group and process {@link 
MessageEnvelope}s from this {@link MessageStream}
+   * @param <WK>  the type of key in the {@link WindowOutput} from the {@link 
Window}
+   * @param <WV>  the type of value in the {@link WindowOutput} from the 
{@link Window}
+   * @param <WS>  the type of window state kept in the {@link Window}
+   * @param <WM>  the type of {@link WindowOutput} in the transformed {@link 
MessageStream}
+   * @return  the transformed {@link MessageStream}
    */
-  public void sink(VoidFunction3<M, MessageCollector, TaskCoordinator> sink) {
-    this.subscribers.add(Operators.getSinkOperator(sink));
-  }
+  <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> 
MessageStream<WM> window(
+      Window<M, WK, WV, WM> window);
 
   /**
-   * Method to perform a window function (i.e. a group-by, aggregate function) 
on a {@link MessageStream}
+   * Joins this {@link MessageStream} with another {@link MessageStream} using 
the provided pairwise {@link JoinFunction}.
+   * <p>
+   * We currently only support 2-way joins.
    *
-   * @param window  the window function to group and aggregate the input 
{@link Message}s from the input {@link MessageStream}
-   * @param <WK>  the type of key in the output {@link Message} from the 
{@link Windows.Window} function
-   * @param <WV>  the type of output value from
-   * @param <WS>  the type of window state kept in the {@link Windows.Window} 
function
-   * @param <WM>  the type of {@link 
org.apache.samza.operators.internal.WindowOutput} message from the {@link 
Windows.Window} function
-   * @return the output {@link MessageStream} after applying the window 
function on the input {@link MessageStream}
-   */
-  public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> 
MessageStream<WM> window(Windows.Window<M, WK, WV, WM> window) {
-    Operator<WM> wndOp = 
Operators.getWindowOperator(Windows.getInternalWindowFn(window));
-    this.subscribers.add(wndOp);
-    return wndOp.getOutputStream();
-  }
-
-  /**
-   * Method to add an input {@link MessageStream} to a join function. Note 
that we currently only support 2-way joins.
-   *
-   * @param other  the other stream to be joined w/
-   * @param merger  the common function to merge messages from this {@link 
MessageStream} and {@code other}
+   * @param otherStream  the other {@link MessageStream} to be joined with
+   * @param joinFn  the function to join {@link MessageEnvelope}s from this 
and the other {@link MessageStream}
    * @param <K>  the type of join key
-   * @param <JM>  the type of message in the {@link Message} from the other 
join stream
-   * @param <RM>  the type of message in the {@link Message} from the join 
function
-   * @return the output {@link MessageStream} from the join function {@code 
joiner}
+   * @param <OM>  the type of {@link MessageEnvelope}s in the other stream
+   * @param <RM>  the type of {@link MessageEnvelope}s resulting from the 
{@code joinFn}
+   * @return  the joined {@link MessageStream}
    */
-  public <K, JM extends Message<K, ?>, RM extends Message> MessageStream<RM> 
join(MessageStream<JM> other,
-      BiFunction<M, JM, RM> merger) {
-    MessageStream<RM> outputStream = new MessageStream<>();
-
-    BiFunction<M, JM, RM> parJoin1 = merger::apply;
-    BiFunction<JM, M, RM> parJoin2 = (m, t1) -> merger.apply(t1, m);
-
-    // TODO: need to add default store functions for the two partial join 
functions
-
-    other.subscribers.add(Operators.<JM, K, M, 
RM>getPartialJoinOperator(parJoin2, outputStream));
-    this.subscribers.add(Operators.<M, K, JM, 
RM>getPartialJoinOperator(parJoin1, outputStream));
-    return outputStream;
-  }
+  <K, OM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> 
MessageStream<RM> join(MessageStream<OM> otherStream,
+      JoinFunction<M, OM, RM> joinFn);
 
   /**
-   * Method to merge all {@code others} streams w/ this {@link MessageStream}. 
The merging streams must have the same type {@code M}
+   * Merge all {@code otherStreams} with this {@link MessageStream}.
+   * <p>
+   * The merging streams must have the same {@link MessageEnvelope} type 
{@code M}.
    *
-   * @param others  other streams to be merged w/ this one
-   * @return  the merged output stream
+   * @param otherStreams  other {@link MessageStream}s to be merged with this 
{@link MessageStream}
+   * @return  the merged {@link MessageStream}
    */
-  public MessageStream<M> merge(Collection<MessageStream<M>> others) {
-    MessageStream<M> outputStream = new MessageStream<>();
-
-    others.add(this);
-    others.forEach(other -> 
other.subscribers.add(Operators.getMergeOperator(outputStream)));
-    return outputStream;
-  }
-
+  MessageStream<M> merge(Collection<MessageStream<M>> otherStreams);
+  
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/MessageStreams.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/MessageStreams.java 
b/samza-api/src/main/java/org/apache/samza/operators/MessageStreams.java
deleted file mode 100644
index 51bf482..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStreams.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.IncomingSystemMessage;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * This class defines all methods to create a {@link MessageStream} object. 
Users can use this to create an {@link MessageStream}
- * from a specific input source.
- *
- */
-@InterfaceStability.Unstable
-public final class MessageStreams {
-
-  /**
-   * private constructor to prevent instantiation
-   */
-  private MessageStreams() {}
-
-  /**
-   * private class for system input/output {@link MessageStream}
-   */
-  public static final class SystemMessageStream extends 
MessageStream<IncomingSystemMessage> {
-    /**
-     * The corresponding {@link org.apache.samza.system.SystemStream}
-     */
-    private final SystemStreamPartition ssp;
-
-    /**
-     * Constructor for input system stream
-     *
-     * @param ssp  the input {@link SystemStreamPartition} for the input 
{@link SystemMessageStream}
-     */
-    private SystemMessageStream(SystemStreamPartition ssp) {
-      this.ssp = ssp;
-    }
-
-    /**
-     * Getter for the {@link SystemStreamPartition} of the input
-     *
-     * @return the input {@link SystemStreamPartition}
-     */
-    public SystemStreamPartition getSystemStreamPartition() {
-      return this.ssp;
-    }
-  }
-
-  /**
-   * Public static API methods start here
-   */
-
-  /**
-   * Static API method to create a {@link MessageStream} from a system input 
stream
-   *
-   * @param ssp  the input {@link SystemStreamPartition}
-   * @return the {@link MessageStream} object takes {@code ssp} as the input
-   */
-  public static SystemMessageStream input(SystemStreamPartition ssp) {
-    return new SystemMessageStream(ssp);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java 
b/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java
new file mode 100644
index 0000000..16cf27a
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Map;
+
+
+/**
+ * A {@link StreamOperatorTask} is the basic interface to implement for 
processing {@link MessageStream}s.
+ * Implementations can describe the transformation steps for each {@link 
MessageStream} in the
+ * {@link #transform} method using {@link MessageStream} APIs.
+ * <p>
+ * Implementations may be augmented by implementing {@link 
org.apache.samza.task.InitableTask},
+ * {@link org.apache.samza.task.WindowableTask} and {@link 
org.apache.samza.task.ClosableTask} interfaces,
+ * but should not implement {@link org.apache.samza.task.StreamTask} or {@link 
org.apache.samza.task.AsyncStreamTask}
+ * interfaces.
+ */
+@InterfaceStability.Unstable
+public interface StreamOperatorTask {
+
+  /**
+   * Describe the transformation steps for each {@link MessageStream}s for 
this task using the
+   * {@link MessageStream} APIs. Each {@link MessageStream} corresponds to one 
{@link SystemStreamPartition}
+   * in the input system.
+   *
+   * @param messageStreams the {@link MessageStream}s that receive {@link 
IncomingSystemMessageEnvelope}s
+   *                       from their corresponding {@link 
org.apache.samza.system.SystemStreamPartition}
+   */
+  void transform(Map<SystemStreamPartition, 
MessageStream<IncomingSystemMessageEnvelope>> messageStreams);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/TriggerBuilder.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/TriggerBuilder.java 
b/samza-api/src/main/java/org/apache/samza/operators/TriggerBuilder.java
deleted file mode 100644
index 5b3f4d0..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/TriggerBuilder.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.internal.Trigger;
-
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-
-/**
- * This class defines a builder of {@link 
org.apache.samza.operators.internal.Trigger} object for a {@link 
Windows.Window}. The triggers are categorized into
- * three types:
- *
- * <p>
- *   early trigger: defines the condition when the first output from the 
window function is sent.
- *   late trigger: defines the condition when the updated output after the 
first output is sent.
- *   timer trigger: defines a system timeout condition to trigger output if no 
more inputs are received to enable early/late triggers
- * </p>
- *
- * If multiple conditions are defined for a specific type of trigger, the 
aggregated trigger is the disjunction of the each individual trigger (i.e. OR).
- *
- * NOTE: Programmers should not use classes defined in {@link 
org.apache.samza.operators.internal} to create triggers
- *
- *
- * @param <M>  the type of input {@link Message} to the {@link Windows.Window}
- * @param <V>  the type of output value from the {@link Windows.Window}
- */
-@InterfaceStability.Unstable
-public final class TriggerBuilder<M extends Message, V> {
-
-  /**
-   * Predicate helper to OR multiple trigger conditions
-   */
-  static class PredicateHelper {
-    static <M, S> BiFunction<M, S, Boolean> or(BiFunction<M, S, Boolean> lhs, 
BiFunction<M, S, Boolean> rhs) {
-      return (m, s) -> lhs.apply(m, s) || rhs.apply(m, s);
-    }
-
-    static <S> Function<S, Boolean> or(Function<S, Boolean> lhs, Function<S, 
Boolean> rhs) {
-      return s -> lhs.apply(s) || rhs.apply(s);
-    }
-  }
-
-  /**
-   * The early trigger condition that determines the first output from the 
{@link Windows.Window}
-   */
-  private BiFunction<M, WindowState<V>, Boolean> earlyTrigger = null;
-
-  /**
-   * The late trigger condition that determines the late output(s) from the 
{@link Windows.Window}
-   */
-  private BiFunction<M, WindowState<V>, Boolean> lateTrigger = null;
-
-  /**
-   * The system timer based trigger conditions that guarantees the {@link 
Windows.Window} proceeds forward
-   */
-  private Function<WindowState<V>, Boolean> timerTrigger = null;
-
-  /**
-   * The state updater function to be applied after the first output is 
triggered
-   */
-  private Function<WindowState<V>, WindowState<V>> earlyTriggerUpdater = 
Function.identity();
-
-  /**
-   * The state updater function to be applied after the late output is 
triggered
-   */
-  private Function<WindowState<V>, WindowState<V>> lateTriggerUpdater = 
Function.identity();
-
-  /**
-   * Helper method to add a trigger condition
-   *
-   * @param currentTrigger  current trigger condition
-   * @param newTrigger  new trigger condition
-   * @return  combined trigger condition that is {@code currentTrigger} OR 
{@code newTrigger}
-   */
-  private BiFunction<M, WindowState<V>, Boolean> addTrigger(BiFunction<M, 
WindowState<V>, Boolean> currentTrigger,
-      BiFunction<M, WindowState<V>, Boolean> newTrigger) {
-    if (currentTrigger == null) {
-      return newTrigger;
-    }
-
-    return PredicateHelper.or(currentTrigger, newTrigger);
-  }
-
-  /**
-   * Helper method to add a system timer trigger
-   *
-   * @param currentTimer  current timer condition
-   * @param newTimer  new timer condition
-   * @return  combined timer condition that is {@code currentTimer} OR {@code 
newTimer}
-   */
-  private Function<WindowState<V>, Boolean> 
addTimerTrigger(Function<WindowState<V>, Boolean> currentTimer,
-      Function<WindowState<V>, Boolean> newTimer) {
-    if (currentTimer == null) {
-      return newTimer;
-    }
-
-    return PredicateHelper.or(currentTimer, newTimer);
-  }
-
-  /**
-   * default constructor to prevent instantiation
-   */
-  private TriggerBuilder() {}
-
-  /**
-   * Constructor that set the size limit as the early trigger for a window
-   *
-   * @param sizeLimit  the number of messages in a window that would trigger 
the first output
-   */
-  private TriggerBuilder(long sizeLimit) {
-    this.earlyTrigger = (m, s) -> s.getNumberMessages() > sizeLimit;
-  }
-
-  /**
-   * Constructor that set the event time length as the early trigger
-   *
-   * @param eventTimeFunction  the function that calculate the event time in 
nano-second from the input {@link Message}
-   * @param wndLenMs  the window length in event time in milli-second
-   */
-  private TriggerBuilder(Function<M, Long> eventTimeFunction, long wndLenMs) {
-    this.earlyTrigger = (m, s) ->
-        TimeUnit.NANOSECONDS.toMillis(Math.max(s.getLatestEventTimeNs() - 
s.getEarliestEventTimeNs(),
-            eventTimeFunction.apply(m) - s.getEarliestEventTimeNs())) > 
wndLenMs;
-  }
-
-  /**
-   * Constructor that set the special token message as the early trigger
-   *
-   * @param tokenFunc  the function that checks whether an input {@link 
Message} is a token message that triggers window output
-   */
-  private TriggerBuilder(Function<M, Boolean> tokenFunc) {
-    this.earlyTrigger = (m, s) -> tokenFunc.apply(m);
-  }
-
-  /**
-   * Build method that creates an {@link 
org.apache.samza.operators.internal.Trigger} object based on the trigger 
conditions set in {@link TriggerBuilder}
-   * This is kept package private and only used by {@link Windows} to convert 
the mutable {@link TriggerBuilder} object to an immutable {@link Trigger} object
-   *
-   * @return  the final {@link org.apache.samza.operators.internal.Trigger} 
object
-   */
-  Trigger<M, WindowState<V>> build() {
-    return Trigger.createTrigger(this.timerTrigger, this.earlyTrigger, 
this.lateTrigger, this.earlyTriggerUpdater, this.lateTriggerUpdater);
-  }
-
-  /**
-   * Public API methods start here
-   */
-
-
-  /**
-   * API method to allow users to set an update method to update the output 
value after the first window output is triggered
-   * by the early trigger condition
-   *
-   * @param onTriggerFunc  the method to update the output value after the 
early trigger
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> onEarlyTrigger(Function<V, V> onTriggerFunc) {
-    this.earlyTriggerUpdater = s -> {
-      s.setOutputValue(onTriggerFunc.apply(s.getOutputValue()));
-      return s;
-    };
-    return this;
-  }
-
-  /**
-   * API method to allow users to set an update method to update the output 
value after a late window output is triggered
-   * by the late trigger condition
-   *
-   * @param onTriggerFunc  the method to update the output value after the 
late trigger
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> onLateTrigger(Function<V, V> onTriggerFunc) {
-    this.lateTriggerUpdater = s -> {
-      s.setOutputValue(onTriggerFunc.apply(s.getOutputValue()));
-      return s;
-    };
-    return this;
-  }
-
-  /**
-   * API method to allow users to add a system timer trigger based on timeout 
after the last message received in the window
-   *
-   * @param timeoutMs  the timeout in ms after the last message received in 
the window
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> addTimeoutSinceLastMessage(long timeoutMs) {
-    this.timerTrigger = this.addTimerTrigger(this.timerTrigger,
-        s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 
timeoutMs < System.currentTimeMillis());
-    return this;
-  }
-
-  /**
-   * API method to allow users to add a system timer trigger based on the 
timeout after the first message received in the window
-   *
-   * @param timeoutMs  the timeout in ms after the first message received in 
the window
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> addTimeoutSinceFirstMessage(long timeoutMs) {
-    this.timerTrigger = this.addTimerTrigger(this.timerTrigger, s ->
-        TimeUnit.NANOSECONDS.toMillis(s.getFirstMessageTimeNs()) + timeoutMs < 
System.currentTimeMillis());
-    return this;
-  }
-
-  /**
-   * API method allow users to add a late trigger based on the window size 
limit
-   *
-   * @param sizeLimit  limit on the number of messages in window
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> addLateTriggerOnSizeLimit(long sizeLimit) {
-    this.lateTrigger = this.addTrigger(this.lateTrigger, (m, s) -> 
s.getNumberMessages() > sizeLimit);
-    return this;
-  }
-
-  /**
-   * API method to allow users to define a customized late trigger function 
based on input message and the window state
-   *
-   * @param lateTrigger  the late trigger condition based on input {@link 
Message} and the current {@link WindowState}
-   * @return  the {@link TriggerBuilder} object
-   */
-  public TriggerBuilder<M, V> addLateTrigger(BiFunction<M, WindowState<V>, 
Boolean> lateTrigger) {
-    this.lateTrigger = this.addTrigger(this.lateTrigger, lateTrigger);
-    return this;
-  }
-
-  /**
-   * Static API method to create a {@link TriggerBuilder} w/ early trigger 
condition based on window size limit
-   *
-   * @param sizeLimit  window size limit
-   * @param <M>  the type of input {@link Message}
-   * @param <V>  the type of {@link Windows.Window} output value
-   * @return  the {@link TriggerBuilder} object
-   */
-  public static <M extends Message, V> TriggerBuilder<M, V> 
earlyTriggerWhenExceedWndLen(long sizeLimit) {
-    return new TriggerBuilder<M, V>(sizeLimit);
-  }
-
-  /**
-   * Static API method to create a {@link TriggerBuilder} w/ early trigger 
condition based on event time window
-   *
-   *
-   * @param eventTimeFunc  the function to get the event time from the input 
message
-   * @param eventTimeWndSizeMs  the event time window size in Ms
-   * @param <M>  the type of input {@link Message}
-   * @param <V>  the type of {@link Windows.Window} output value
-   * @return  the {@link TriggerBuilder} object
-   */
-  public static <M extends Message, V> TriggerBuilder<M, V> 
earlyTriggerOnEventTime(Function<M, Long> eventTimeFunc, long 
eventTimeWndSizeMs) {
-    return new TriggerBuilder<M, V>(eventTimeFunc, eventTimeWndSizeMs);
-  }
-
-  /**
-   * Static API method to create a {@link TriggerBuilder} w/ early trigger 
condition based on token messages
-   *
-   * @param tokenFunc  the function to determine whether an input message is a 
window token or not
-   * @param <M>  the type of input {@link Message}
-   * @param <V>  the type of {@link Windows.Window} output value
-   * @return  the {@link TriggerBuilder} object
-   */
-  public static <M extends Message, V> TriggerBuilder<M, V> 
earlyTriggerOnTokenMsg(Function<M, Boolean> tokenFunc) {
-    return new TriggerBuilder<M, V>(tokenFunc);
-  }
-
-  /**
-   * Static API method to allow customized early trigger condition based on 
input {@link Message} and the corresponding {@link WindowState}
-   *
-   * @param earlyTrigger  the user defined early trigger condition
-   * @param <M>   the input message type
-   * @param <V>   the output value from the window
-   * @return   the {@link TriggerBuilder} object
-   */
-  public static <M extends Message, V> TriggerBuilder<M, V> 
earlyTrigger(BiFunction<M, WindowState<V>, Boolean> earlyTrigger) {
-    TriggerBuilder<M, V> newTriggers =  new TriggerBuilder<M, V>();
-    newTriggers.earlyTrigger = 
newTriggers.addTrigger(newTriggers.earlyTrigger, earlyTrigger);
-    return newTriggers;
-  }
-
-  /**
-   * Static API method to create a {@link TriggerBuilder} w/ system timeout 
after the last message received in the window
-   *
-   * @param timeoutMs  timeout in ms after the last message received
-   * @param <M>  the type of input {@link Message}
-   * @param <V>  the type of {@link Windows.Window} output value
-   * @return  the {@link TriggerBuilder} object
-   */
-  public static <M extends Message, V> TriggerBuilder<M, V> 
timeoutSinceLastMessage(long timeoutMs) {
-    return new TriggerBuilder<M, V>().addTimeoutSinceLastMessage(timeoutMs);
-  }
-
-  /**
-   * Static API method to create a {@link TriggerBuilder} w/ system timeout 
after the first message received in the window
-   *
-   * @param timeoutMs  timeout in ms after the first message received
-   * @param <M>  the type of input {@link Message}
-   * @param <V>  the type of {@link Windows.Window} output value
-   * @return  the {@link TriggerBuilder} object
-   */
-  public static <M extends Message, V> TriggerBuilder<M, V> 
timeoutSinceFirstMessage(long timeoutMs) {
-    return new TriggerBuilder<M, V>().addTimeoutSinceFirstMessage(timeoutMs);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/WindowState.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/WindowState.java 
b/samza-api/src/main/java/org/apache/samza/operators/WindowState.java
deleted file mode 100644
index 759f2d8..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/WindowState.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-
-/**
- * This interface defines the methods a window state class has to implement. 
The programmers are allowed to implement
- * customized window state to be stored in window state stores by implementing 
this interface class.
- *
- * @param <WV>  the type for window output value
- */
-@InterfaceStability.Unstable
-public interface WindowState<WV> {
-  /**
-   * Method to get the system time when the first message in the window is 
received
-   *
-   * @return  nano-second of system time for the first message received in the 
window
-   */
-  long getFirstMessageTimeNs();
-
-  /**
-   * Method to get the system time when the last message in the window is 
received
-   *
-   * @return  nano-second of system time for the last message received in the 
window
-   */
-  long getLastMessageTimeNs();
-
-  /**
-   * Method to get the earliest event time in the window
-   *
-   * @return  the earliest event time in nano-second in the window
-   */
-  long getEarliestEventTimeNs();
-
-  /**
-   * Method to get the latest event time in the window
-   *
-   * @return  the latest event time in nano-second in the window
-   */
-  long getLatestEventTimeNs();
-
-  /**
-   * Method to get the total number of messages received in the window
-   *
-   * @return  number of messages in the window
-   */
-  long getNumberMessages();
-
-  /**
-   * Method to get the corresponding window's output value
-   *
-   * @return  the corresponding window's output value
-   */
-  WV getOutputValue();
-
-  /**
-   * Method to set the corresponding window's output value
-   *
-   * @param value  the corresponding window's output value
-   */
-  void setOutputValue(WV value);
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/Windows.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/Windows.java 
b/samza-api/src/main/java/org/apache/samza/operators/Windows.java
deleted file mode 100644
index 6619f41..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/Windows.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.internal.Operators;
-import org.apache.samza.operators.internal.Trigger;
-import org.apache.samza.operators.internal.WindowFn;
-import org.apache.samza.operators.internal.WindowOutput;
-import org.apache.samza.storage.kv.Entry;
-
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-
-/**
- * This class defines a collection of {@link Window} functions. The public 
classes and methods here are intended to be
- * used by the user (i.e. programmers) to create {@link Window} function 
directly.
- *
- */
-@InterfaceStability.Unstable
-public final class Windows {
-
-  /**
-   * private constructor to prevent instantiation
-   */
-  private Windows() {}
-
-  /**
-   * This class defines a session window function class
-   *
-   * @param <M>  the type of input {@link Message}
-   * @param <WK>  the type of session key in the session window
-   * @param <WV>  the type of output value in each session window
-   */
-  static class SessionWindow<M extends Message, WK, WV> implements Window<M, 
WK, WV, WindowOutput<WK, WV>> {
-
-    /**
-     * Constructor. Made private s.t. it can only be instantiated via the 
static API methods in {@link Windows}
-     *
-     * @param sessionKeyFunction  function to get the session key from the 
input {@link Message}
-     * @param aggregator  function to calculate the output value based on the 
input {@link Message} and current output value
-     */
-    private SessionWindow(Function<M, WK> sessionKeyFunction, BiFunction<M, 
WV, WV> aggregator) {
-      this.wndKeyFunction = sessionKeyFunction;
-      this.aggregator = aggregator;
-    }
-
-    /**
-     * function to calculate the window key from input message
-     */
-    private final Function<M, WK> wndKeyFunction;
-
-    /**
-     * function to calculate the output value from the input message and the 
current output value
-     */
-    private final BiFunction<M, WV, WV> aggregator;
-
-    /**
-     * trigger condition that determines when to send out the output value in 
a {@link WindowOutput} message
-     */
-    private Trigger<M, WindowState<WV>> trigger = null;
-
-    //TODO: need to create a set of {@link StoreFunctions} that is default to 
input {@link Message} type for {@link Window}
-    private Operators.StoreFunctions<M, WK, WindowState<WV>> storeFunctions = 
null;
-
-    /**
-     * Public API methods start here
-     */
-
-    /**
-     * Public API method to define the watermark trigger for the window 
operator
-     *
-     * @param wndTrigger {@link Trigger} function defines the watermark 
trigger for this {@link SessionWindow}
-     * @return The window operator w/ the defined watermark trigger
-     */
-    @Override
-    public Window<M, WK, WV, WindowOutput<WK, WV>> 
setTriggers(TriggerBuilder<M, WV> wndTrigger) {
-      this.trigger = wndTrigger.build();
-      return this;
-    }
-
-    private BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> 
getTransformFunc() {
-      // TODO: actual implementation of the main session window logic, based 
on the wndKeyFunction, aggregator, and triggers;
-      return null;
-    }
-
-    private WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>> 
getInternalWindowFn() {
-      return new WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>>() {
-
-        @Override public BiFunction<M, Entry<WK, WindowState<WV>>, 
WindowOutput<WK, WV>> getTransformFunc() {
-          return SessionWindow.this.getTransformFunc();
-        }
-
-        @Override public Operators.StoreFunctions<M, WK, WindowState<WV>> 
getStoreFuncs() {
-          return SessionWindow.this.storeFunctions;
-        }
-
-        @Override public Trigger<M, WindowState<WV>> getTrigger() {
-          return SessionWindow.this.trigger;
-        }
-      };
-    }
-  }
-
-  static <M extends Message, WK, WV, WS extends WindowState<WV>, WM extends 
WindowOutput<WK, WV>> WindowFn<M, WK, WS, WM> getInternalWindowFn(
-      Window<M, WK, WV, WM> window) {
-    if (window instanceof SessionWindow) {
-      SessionWindow<M, WK, WV> sessionWindow = (SessionWindow<M, WK, WV>) 
window;
-      return (WindowFn<M, WK, WS, WM>) sessionWindow.getInternalWindowFn();
-    }
-    throw new IllegalArgumentException("Input window type not supported.");
-  }
-
-  /**
-   * Public static API methods start here
-   *
-   */
-
-  /**
-   * The public programming interface class for window function
-   *
-   * @param <M>  the type of input {@link Message}
-   * @param <WK>  the type of key to the {@link Window}
-   * @param <WV>  the type of output value in the {@link WindowOutput}
-   * @param <WM>  the type of message in the window output stream
-   */
-  @InterfaceStability.Unstable
-  public interface Window<M extends Message, WK, WV, WM extends 
WindowOutput<WK, WV>> {
-
-    /**
-     * Set the triggers for this {@link Window}
-     *
-     * @param wndTrigger  trigger conditions set by the programmers
-     * @return  the {@link Window} function w/ the trigger {@code wndTrigger}
-     */
-    Window<M, WK, WV, WM> setTriggers(TriggerBuilder<M, WV> wndTrigger);
-  }
-
-  /**
-   * Static API method to create a {@link SessionWindow} in which the output 
value is simply the collection of input messages
-   *
-   * @param sessionKeyFunction  function to calculate session window key
-   * @param <M>  type of input {@link Message}
-   * @param <WK>  type of the session window key
-   * @return  the {@link Window} function for the session
-   */
-  public static <M extends Message, WK> Window<M, WK, Collection<M>, 
WindowOutput<WK, Collection<M>>> intoSessions(Function<M, WK> 
sessionKeyFunction) {
-    return new SessionWindow<>(sessionKeyFunction, (m, c) -> {
-        c.add(m);
-        return c;
-      });
-  }
-
-  /**
-   * Static API method to create a {@link SessionWindow} in which the output 
value is a collection of {@code SI} from the input messages
-   *
-   * @param sessionKeyFunction  function to calculate session window key
-   * @param sessionInfoExtractor  function to retrieve session info of type 
{@code SI} from the input message of type {@code M}
-   * @param <M>  type of the input {@link Message}
-   * @param <WK>  type of the session window key
-   * @param <SI>  type of the session information retrieved from each input 
message of type {@code M}
-   * @return  the {@link Window} function for the session
-   */
-  public static <M extends Message, WK, SI> Window<M, WK, Collection<SI>, 
WindowOutput<WK, Collection<SI>>> intoSessions(Function<M, WK> 
sessionKeyFunction,
-      Function<M, SI> sessionInfoExtractor) {
-    return new SessionWindow<>(sessionKeyFunction, (m, c) -> {
-        c.add(sessionInfoExtractor.apply(m));
-        return c;
-      });
-  }
-
-  /**
-   * Static API method to create a {@link SessionWindow} as a counter of input 
messages
-   *
-   * @param sessionKeyFunction  function to calculate session window key
-   * @param <M>  type of the input {@link Message}
-   * @param <WK>  type of the session window key
-   * @return  the {@link Window} function for the session
-   */
-  public static <M extends Message, WK> Window<M, WK, Integer, 
WindowOutput<WK, Integer>> intoSessionCounter(Function<M, WK> 
sessionKeyFunction) {
-    return new SessionWindow<>(sessionKeyFunction, (m, c) -> c + 1);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessage.java
 
b/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessage.java
deleted file mode 100644
index 3c9874d..0000000
--- 
a/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessage.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * This class implements a {@link Message} that encapsulates an {@link 
IncomingMessageEnvelope} from the system
- *
- */
-public class IncomingSystemMessage implements Message<Object, Object>, 
InputSystemMessage<Offset> {
-  /**
-   * Incoming message envelope
-   */
-  private final IncomingMessageEnvelope imsg;
-
-  /**
-   * The receive time of this incoming message
-   */
-  private final long recvTimeNano;
-
-  /**
-   * Ctor to create a {@code IncomingSystemMessage} from {@link 
IncomingMessageEnvelope}
-   *
-   * @param imsg The incoming system message
-   */
-  public IncomingSystemMessage(IncomingMessageEnvelope imsg) {
-    this.imsg = imsg;
-    this.recvTimeNano = System.nanoTime();
-  }
-
-  @Override
-  public Object getMessage() {
-    return this.imsg.getMessage();
-  }
-
-  @Override
-  public Object getKey() {
-    return this.imsg.getKey();
-  }
-
-  @Override
-  public long getTimestamp() {
-    return this.recvTimeNano;
-  }
-
-  @Override
-  public Offset getOffset() {
-    // TODO: need to add offset factory to generate different types of offset. 
This is just a placeholder,
-    // assuming incoming message carries long value as offset (i.e. Kafka case)
-    return new LongOffset(this.imsg.getOffset());
-  }
-
-  @Override
-  public SystemStreamPartition getSystemStreamPartition() {
-    return imsg.getSystemStreamPartition();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java
 
b/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java
new file mode 100644
index 0000000..a65809c
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * A {@link MessageEnvelope} that provides additional information about its 
input {@link SystemStreamPartition}
+ * and its {@link Offset} within the {@link SystemStreamPartition}.
+ * <p>
+ * Note: the {@link Offset} is only unique and comparable within its {@link 
SystemStreamPartition}.
+ */
+public class IncomingSystemMessageEnvelope implements MessageEnvelope<Object, 
Object> {
+
+  private final IncomingMessageEnvelope ime;
+
+  /**
+   * Creates an {@code IncomingSystemMessageEnvelope} from the {@link 
IncomingMessageEnvelope}.
+   *
+   * @param ime  the {@link IncomingMessageEnvelope} from the input system.
+   */
+  public IncomingSystemMessageEnvelope(IncomingMessageEnvelope ime) {
+    this.ime = ime;
+  }
+
+  @Override
+  public Object getKey() {
+    return this.ime.getKey();
+  }
+
+  @Override
+  public Object getMessage() {
+    return this.ime.getMessage();
+  }
+
+  public Offset getOffset() {
+    // TODO: need to add offset factory to generate different types of offset. 
This is just a placeholder,
+    // assuming incoming message envelope carries long value as offset (i.e. 
Kafka case)
+    return new LongOffset(this.ime.getOffset());
+  }
+
+  public SystemStreamPartition getSystemStreamPartition() {
+    return this.ime.getSystemStreamPartition();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/data/InputSystemMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/data/InputSystemMessage.java
 
b/samza-api/src/main/java/org/apache/samza/operators/data/InputSystemMessage.java
deleted file mode 100644
index 5c23e74..0000000
--- 
a/samza-api/src/main/java/org/apache/samza/operators/data/InputSystemMessage.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * This interface defines additional methods a message from an system input 
should implement, including the methods to
- * get {@link SystemStreamPartition} and the {@link Offset} of the input 
system message.
- */
-@InterfaceStability.Unstable
-public interface InputSystemMessage<O extends Offset> {
-
-  /**
-   * Get the input message's {@link SystemStreamPartition}
-   *
-   * @return  the {@link SystemStreamPartition} this message is coming from
-   */
-  SystemStreamPartition getSystemStreamPartition();
-
-  /**
-   * Get the offset of the message in the input stream. This should be used to 
uniquely identify a message in an input stream.
-   *
-   * @return The offset of the message in the input stream.
-   */
-  O getOffset();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/data/Message.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/data/Message.java 
b/samza-api/src/main/java/org/apache/samza/operators/data/Message.java
deleted file mode 100644
index 8441682..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/data/Message.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.data;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-
-/**
- * This class defines the generic interface of {@link Message}, which is a 
entry in the input/output stream.
- *
- * <p>The {@link Message} models the basic operatible unit in streaming SQL 
processes in Samza.
- *
- */
-@InterfaceStability.Unstable
-public interface Message<K, M> {
-
-  /**
-   * Access method to get the corresponding message body in {@link Message}
-   *
-   * @return Message object in this {@link Message}
-   */
-  M getMessage();
-
-  /**
-   * Method to indicate whether this {@link Message} indicates deletion of a 
message w/ the message key
-   *
-   * @return A boolean value indicates whether the current message is a delete 
or insert message
-   */
-  default boolean isDelete() {
-    return false;
-  };
-
-  /**
-   * Access method to the key of the message
-   *
-   * @return The key of the message
-   */
-  K getKey();
-
-  /**
-   * Get the message creation timestamp of the message.
-   *
-   * @return The message's timestamp in nano seconds.
-   */
-  long getTimestamp();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java 
b/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
new file mode 100644
index 0000000..ad64231
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.data;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * An entry in the input/output {@link 
org.apache.samza.operators.MessageStream}s.
+ */
+@InterfaceStability.Unstable
+public interface MessageEnvelope<K, M> {
+
+  /**
+   * Get the key for this {@link MessageEnvelope}.
+   *
+   * @return  the key for this {@link MessageEnvelope}
+   */
+  K getKey();
+
+  /**
+   * Get the message in this {@link MessageEnvelope}.
+   *
+   * @return  the message in this {@link MessageEnvelope}
+   */
+  M getMessage();
+
+  /**
+   * Whether this {@link MessageEnvelope} indicates deletion of a previous 
message with this key.
+   *
+   * @return  true if the current {@link MessageEnvelope} indicates deletion 
of a previous message with this key
+   */
+  default boolean isDelete() {
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
 
b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
new file mode 100644
index 0000000..e611cd0
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.data.MessageEnvelope;
+
+
+/**
+ * A function that specifies whether a {@link MessageEnvelope} should be 
retained for further processing or filtered out.
+ * @param <M>  type of the input {@link MessageEnvelope}
+ */
+@InterfaceStability.Unstable
+@FunctionalInterface
+public interface FilterFunction<M extends MessageEnvelope> {
+
+  /**
+   * Returns a boolean indicating whether this {@link MessageEnvelope} should 
be retained or filtered out.
+   * @param message  the {@link MessageEnvelope} to be checked
+   * @return  true if {@link MessageEnvelope} should be retained
+   */
+  boolean apply(M message);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
 
b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
new file mode 100644
index 0000000..dbc0bd9
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.data.MessageEnvelope;
+
+import java.util.Collection;
+
+
+/**
+ * A function that transforms a {@link MessageEnvelope} into a collection of 0 
or more {@link MessageEnvelope}s,
+ * possibly of a different type.
+ * @param <M>  type of the input {@link MessageEnvelope}
+ * @param <OM>  type of the transformed {@link MessageEnvelope}s
+ */
+@InterfaceStability.Unstable
+@FunctionalInterface
+public interface FlatMapFunction<M extends MessageEnvelope, OM extends 
MessageEnvelope> {
+
+  /**
+   * Transforms the provided {@link MessageEnvelope} into a collection of 0 or 
more {@link MessageEnvelope}s.
+   * @param message  the {@link MessageEnvelope} to be transformed
+   * @return  a collection of 0 or more transformed {@link MessageEnvelope}s
+   */
+  Collection<OM> apply(M message);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
 
b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
new file mode 100644
index 0000000..8cb1fce
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.data.MessageEnvelope;
+
+
+/**
+ * A function that joins {@link MessageEnvelope}s from two {@link 
org.apache.samza.operators.MessageStream}s and produces
+ * a joined {@link MessageEnvelope}.
+ * @param <M>  type of the input {@link MessageEnvelope}
+ * @param <JM>  type of the {@link MessageEnvelope} to join with
+ * @param <RM>  type of the joined {@link MessageEnvelope}
+ */
+@InterfaceStability.Unstable
+@FunctionalInterface
+public interface JoinFunction<M extends MessageEnvelope, JM extends 
MessageEnvelope, RM extends MessageEnvelope> {
+
+  /**
+   * Join the provided {@link MessageEnvelope}s and produces the joined {@link 
MessageEnvelope}.
+   * @param message  the input {@link MessageEnvelope}
+   * @param otherMessage  the {@link MessageEnvelope} to join with
+   * @return  the joined {@link MessageEnvelope}
+   */
+  RM apply(M message, JM otherMessage);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java 
b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
new file mode 100644
index 0000000..04919a7
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.data.MessageEnvelope;
+
+
+/**
+ * A function that transforms a {@link MessageEnvelope} into another {@link 
MessageEnvelope}, possibly of a different type.
+ * @param <M>  type of the input {@link MessageEnvelope}
+ * @param <OM>  type of the transformed {@link MessageEnvelope}
+ */
+@InterfaceStability.Unstable
+@FunctionalInterface
+public interface MapFunction<M extends MessageEnvelope, OM extends 
MessageEnvelope> {
+
+  /**
+   * Transforms the provided {@link MessageEnvelope} into another {@link 
MessageEnvelope}
+   * @param message  the {@link MessageEnvelope} to be transformed
+   * @return  the transformed {@link MessageEnvelope}
+   */
+  OM apply(M message);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
 
b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
new file mode 100644
index 0000000..505da92
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * A function that allows sending a {@link MessageEnvelope} to an output 
system.
+ * @param <M>  type of the input {@link MessageEnvelope}
+ */
+@InterfaceStability.Unstable
+@FunctionalInterface
+public interface SinkFunction<M extends MessageEnvelope> {
+
+  /**
+   * Allows sending the provided {@link MessageEnvelope} to an output {@link 
org.apache.samza.system.SystemStream} using
+   * the provided {@link MessageCollector}. Also provides access to the {@link 
TaskCoordinator} to request commits
+   * or shut the container down.
+   *
+   * @param message  the {@link MessageEnvelope} to be sent to an output 
{@link org.apache.samza.system.SystemStream}
+   * @param messageCollector  the {@link MessageCollector} to use to send the 
{@link MessageEnvelope}
+   * @param taskCoordinator  the {@link TaskCoordinator} to request commits or 
shutdown
+   */
+  void apply(M message, MessageCollector messageCollector, TaskCoordinator 
taskCoordinator);
+
+}

Reply via email to