http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java 
b/samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java
deleted file mode 100644
index e557b34..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.WindowOutput;
-import org.apache.samza.operators.api.internal.Trigger;
-import org.apache.samza.operators.api.internal.Operators;
-import org.apache.samza.operators.api.internal.WindowFn;
-import org.apache.samza.storage.kv.Entry;
-
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-
-/**
- * This class defines a collection of {@link Window} functions. The public classes and methods here are intended to be
- * used directly by programmers to create {@link Window} functions.
- *
- */
-public final class Windows {
-
-  /**
-   * Not meant to be instantiated: this class only hosts static factory methods and nested types.
-   */
-  private Windows() {
-  }
-
-  /**
-   * Session window implementation of {@link Window}. It is configured with a function that derives the session key
-   * from each input {@link Message} and an aggregator that folds each input {@link Message} into the current output
-   * value of the session.
-   *
-   * @param <M>  the type of input {@link Message}
-   * @param <WK>  the type of session key in the session window
-   * @param <WV>  the type of output value in each session window
-   */
-  static class SessionWindow<M extends Message, WK, WV> implements Window<M, WK, WV, WindowOutput<WK, WV>> {
-
-    /**
-     * Derives the window (session) key from an input message
-     */
-    private final Function<M, WK> wndKeyFunction;
-
-    /**
-     * Computes the new output value from an input message and the current output value
-     */
-    private final BiFunction<M, WV, WV> aggregator;
-
-    /**
-     * Trigger condition that determines when to send out the output value in a {@link WindowOutput} message.
-     * Remains {@code null} until {@link #setTriggers(TriggerBuilder)} is called.
-     */
-    private Trigger<M, WindowState<WV>> trigger = null;
-
-    //TODO: need to create a set of {@link StoreFunctions} that is default to input {@link Message} type for {@link Window}
-    private Operators.StoreFunctions<M, WK, WindowState<WV>> storeFunctions = null;
-
-    /**
-     * Creates a session window. Kept private so that instances can only be obtained through the static API methods
-     * in {@link Windows}.
-     *
-     * @param sessionKeyFunction  function to get the session key from the input {@link Message}
-     * @param aggregator  function to calculate the output value based on the input {@link Message} and the current
-     *                    output value
-     */
-    private SessionWindow(Function<M, WK> sessionKeyFunction, BiFunction<M, WV, WV> aggregator) {
-      this.aggregator = aggregator;
-      this.wndKeyFunction = sessionKeyFunction;
-    }
-
-    /**
-     * Public API methods start here
-     */
-
-    /**
-     * Public API method to define the watermark trigger for the window operator. The supplied builder is built
-     * immediately and the resulting {@link Trigger} replaces any previously set one.
-     *
-     * @param wndTrigger  {@link TriggerBuilder} that defines the watermark trigger for this {@link SessionWindow}
-     * @return  this window operator, now carrying the defined watermark trigger
-     */
-    @Override
-    public Window<M, WK, WV, WindowOutput<WK, WV>> setTriggers(TriggerBuilder<M, WV> wndTrigger) {
-      this.trigger = wndTrigger.build();
-      return this;
-    }
-
-    /**
-     * Placeholder for the main session window transformation; currently always returns {@code null}.
-     */
-    private BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFunc() {
-      // TODO: actual implementation of the main session window logic, based on the wndKeyFunction, aggregator, and triggers;
-      return null;
-    }
-
-    /**
-     * Wraps this window in a {@link WindowFn} view. Each accessor of the returned object reads the current state of
-     * this {@link SessionWindow} at call time, so a trigger set later is still observed.
-     */
-    private WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>> getInternalWindowFn() {
-      return new WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>>() {
-
-        @Override
-        public BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFunc() {
-          // Must stay qualified: an unqualified call would resolve to this anonymous class's own method
-          return SessionWindow.this.getTransformFunc();
-        }
-
-        @Override
-        public Operators.StoreFunctions<M, WK, WindowState<WV>> getStoreFuncs() {
-          return SessionWindow.this.storeFunctions;
-        }
-
-        @Override
-        public Trigger<M, WindowState<WV>> getTrigger() {
-          return SessionWindow.this.trigger;
-        }
-      };
-    }
-  }
-
-  /**
-   * Resolves the internal {@link WindowFn} that backs a {@link Window} created by this class. Only
-   * {@link SessionWindow} is currently supported.
-   *
-   * @param window  the {@link Window} whose internal window function is requested
-   * @param <M>  the type of input {@link Message}
-   * @param <WK>  the type of key to the {@link Window}
-   * @param <WV>  the type of output value in the {@link WindowOutput}
-   * @param <WS>  the type of window state expected by the caller
-   * @param <WM>  the type of message in the window output stream
-   * @return  the internal {@link WindowFn} of {@code window}
-   * @throws IllegalArgumentException  if {@code window} is {@code null} or not a {@link SessionWindow}
-   */
-  @SuppressWarnings("unchecked")
-  static <M extends Message, WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> WindowFn<M, WK, WS, WM> getInternalWindowFn(
-      Window<M, WK, WV, WM> window) {
-    if (!(window instanceof SessionWindow)) {
-      // Name the offending type so that the failure can be diagnosed from the message alone
-      throw new IllegalArgumentException("Input window type not supported: "
-          + (window == null ? "null" : window.getClass().getName()));
-    }
-    // The compiler cannot verify these casts: WS and WM are chosen by the caller, while SessionWindow always
-    // produces a WindowFn over WindowState<WV> and WindowOutput<WK, WV>.
-    SessionWindow<M, WK, WV> sessionWindow = (SessionWindow<M, WK, WV>) window;
-    return (WindowFn<M, WK, WS, WM>) sessionWindow.getInternalWindowFn();
-  }
-
-  /**
-   * Public static API methods start here
-   *
-   */
-
-  /**
-   * The public programming interface for window functions.
-   *
-   * @param <M>  the type of input {@link Message}
-   * @param <WK>  the type of key to the {@link Window}
-   * @param <WV>  the type of output value in the {@link WindowOutput}
-   * @param <WM>  the type of message in the window output stream
-   */
-  public interface Window<M extends Message, WK, WV, WM extends WindowOutput<WK, WV>> {
-
-    /**
-     * Sets the triggers for this {@link Window}.
-     *
-     * @param wndTrigger  builder of the trigger conditions chosen by the programmer
-     * @return  the {@link Window} function configured with the trigger {@code wndTrigger}
-     */
-    Window<M, WK, WV, WM> setTriggers(TriggerBuilder<M, WV> wndTrigger);
-  }
-
-  /**
-   * Static API method to create a {@link SessionWindow} whose output value is simply the collection of input
-   * messages: every input message is added to the session's current collection.
-   *
-   * @param sessionKeyFunction  function to calculate session window key
-   * @param <M>  type of input {@link Message}
-   * @param <WK>  type of the session window key
-   * @return  the {@link Window} function for the session
-   */
-  public static <M extends Message, WK> Window<M, WK, Collection<M>, WindowOutput<WK, Collection<M>>> intoSessions(
-      Function<M, WK> sessionKeyFunction) {
-    BiFunction<M, Collection<M>, Collection<M>> appendMessage = (msg, collected) -> {
-      collected.add(msg);
-      return collected;
-    };
-    return new SessionWindow<>(sessionKeyFunction, appendMessage);
-  }
-
-  /**
-   * Static API method to create a {@link SessionWindow} whose output value is a collection of {@code SI} values:
-   * the session info extracted from every input message is added to the session's current collection.
-   *
-   * @param sessionKeyFunction  function to calculate session window key
-   * @param sessionInfoExtractor  function to retrieve session info of type {@code SI} from the input message of
-   *                              type {@code M}
-   * @param <M>  type of the input {@link Message}
-   * @param <WK>  type of the session window key
-   * @param <SI>  type of the session information retrieved from each input message of type {@code M}
-   * @return  the {@link Window} function for the session
-   */
-  public static <M extends Message, WK, SI> Window<M, WK, Collection<SI>, WindowOutput<WK, Collection<SI>>> intoSessions(
-      Function<M, WK> sessionKeyFunction, Function<M, SI> sessionInfoExtractor) {
-    BiFunction<M, Collection<SI>, Collection<SI>> appendSessionInfo = (msg, collected) -> {
-      collected.add(sessionInfoExtractor.apply(msg));
-      return collected;
-    };
-    return new SessionWindow<>(sessionKeyFunction, appendSessionInfo);
-  }
-
-  /**
-   * Static API method to create a {@link SessionWindow} that counts input messages: every input message increments
-   * the session's current count by one.
-   *
-   * @param sessionKeyFunction  function to calculate session window key
-   * @param <M>  type of the input {@link Message}
-   * @param <WK>  type of the session window key
-   * @return  the {@link Window} function for the session
-   */
-  public static <M extends Message, WK> Window<M, WK, Integer, WindowOutput<WK, Integer>> intoSessionCounter(
-      Function<M, WK> sessionKeyFunction) {
-    BiFunction<M, Integer, Integer> increment = (msg, count) -> count + 1;
-    return new SessionWindow<>(sessionKeyFunction, increment);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/data/IncomingSystemMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/api/data/IncomingSystemMessage.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/api/data/IncomingSystemMessage.java
deleted file mode 100644
index ba74618..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/api/data/IncomingSystemMessage.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.data;
-
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * A {@link Message} implementation that wraps an {@link IncomingMessageEnvelope} received from the system and
- * exposes its key, payload, offset and {@link SystemStreamPartition}.
- */
-public class IncomingSystemMessage implements Message<Object, Object>, InputSystemMessage<Offset> {
-  /**
-   * The wrapped incoming message envelope
-   */
-  private final IncomingMessageEnvelope imsg;
-
-  /**
-   * The {@link System#nanoTime()} reading taken when this object was constructed
-   */
-  private final long recvTimeNano;
-
-  /**
-   * Creates an {@code IncomingSystemMessage} from an {@link IncomingMessageEnvelope} and records the receive time.
-   *
-   * @param imsg The incoming system message
-   */
-  public IncomingSystemMessage(IncomingMessageEnvelope imsg) {
-    this.imsg = imsg;
-    this.recvTimeNano = System.nanoTime();
-  }
-
-  /**
-   * @return the payload of the wrapped envelope
-   */
-  @Override
-  public Object getMessage() {
-    return imsg.getMessage();
-  }
-
-  /**
-   * @return the key of the wrapped envelope
-   */
-  @Override
-  public Object getKey() {
-    return imsg.getKey();
-  }
-
-  /**
-   * @return the {@link System#nanoTime()} value captured at construction, i.e. the receive time of this message
-   */
-  @Override
-  public long getTimestamp() {
-    return recvTimeNano;
-  }
-
-  /**
-   * @return a new {@link LongOffset} built from the offset of the wrapped envelope
-   */
-  @Override
-  public Offset getOffset() {
-    // TODO: need to add offset factory to generate different types of offset. This is just a placeholder,
-    // assuming incoming message carries long value as offset (i.e. Kafka case)
-    return new LongOffset(imsg.getOffset());
-  }
-
-  /**
-   * @return the {@link SystemStreamPartition} of the wrapped envelope
-   */
-  @Override
-  public SystemStreamPartition getSystemStreamPartition() {
-    return imsg.getSystemStreamPartition();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/data/InputSystemMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/api/data/InputSystemMessage.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/api/data/InputSystemMessage.java
deleted file mode 100644
index c786025..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/api/data/InputSystemMessage.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.data;
-
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * Additional methods that a message coming from a system input should implement: access to the
- * {@link SystemStreamPartition} and to the {@link Offset} of the input system message.
- *
- * @param <O>  the type of {@link Offset} carried by the message
- */
-public interface InputSystemMessage<O extends Offset> {
-
-  /**
-   * Get the offset of the message in the input stream. This should be used to uniquely identify a message in an
-   * input stream.
-   *
-   * @return The offset of the message in the input stream.
-   */
-  O getOffset();
-
-  /**
-   * Get the input message's {@link SystemStreamPartition}
-   *
-   * @return  the {@link SystemStreamPartition} this message is coming from
-   */
-  SystemStreamPartition getSystemStreamPartition();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/data/LongOffset.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/api/data/LongOffset.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/api/data/LongOffset.java
deleted file mode 100644
index f059b33..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/api/data/LongOffset.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.api.data;
-
-/**
- * An implementation of {@link org.apache.samza.operators.api.data.Offset}, w/ {@code long} value as the offset.
- * Instances are immutable; equality, hashing and ordering are all based on the wrapped {@code long} value.
- */
-public class LongOffset implements Offset {
-
-  /**
-   * The offset value in {@code long}
-   */
-  private final Long offset;
-
-  /**
-   * Internal constructor used by the min/max helper methods.
-   *
-   * @param offset  the offset value
-   */
-  private LongOffset(long offset) {
-    this.offset = offset;
-  }
-
-  /**
-   * Creates an offset from its decimal string representation.
-   *
-   * @param offset  the offset value as a string
-   * @throws NumberFormatException  if {@code offset} cannot be parsed as a {@code long}
-   */
-  public LongOffset(String offset) {
-    this.offset = Long.valueOf(offset);
-  }
-
-  /**
-   * Compares this offset with another {@link LongOffset} by numeric value.
-   *
-   * @param o  the offset to compare to
-   * @return  a negative integer, zero, or a positive integer as this offset is less than, equal to, or greater
-   *          than {@code o}
-   * @throws IllegalArgumentException  if {@code o} is a non-null {@link Offset} that is not a {@link LongOffset}
-   * @throws NullPointerException  if {@code o} is {@code null}
-   */
-  @Override
-  public int compareTo(Offset o) {
-    if (!(o instanceof LongOffset)) {
-      // A null argument fails on o.getClass() below, giving the NullPointerException Comparable prescribes
-      throw new IllegalArgumentException("Not comparable offset classes. LongOffset vs " + o.getClass().getName());
-    }
-    LongOffset other = (LongOffset) o;
-    return this.offset.compareTo(other.offset);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof LongOffset)) {
-      return false;
-    }
-    LongOffset o = (LongOffset) other;
-    return this.offset.equals(o.offset);
-  }
-
-  /**
-   * Hash code derived from the offset value, so that offsets that are equal per {@link #equals(Object)} also
-   * hash identically and behave correctly in hash-based collections.
-   *
-   * @return  the hash code of the wrapped offset value
-   */
-  @Override
-  public int hashCode() {
-    return this.offset.hashCode();
-  }
-
-  /**
-   * Helper method to get the minimum offset
-   *
-   * @return The minimum offset
-   */
-  public static LongOffset getMinOffset() {
-    return new LongOffset(Long.MIN_VALUE);
-  }
-
-  /**
-   * Helper method to get the maximum offset
-   *
-   * @return The maximum offset
-   */
-  public static LongOffset getMaxOffset() {
-    return new LongOffset(Long.MAX_VALUE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java 
b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java
deleted file mode 100644
index 9b53b45..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.api.data;
-
-/**
- * The generic interface of a {@link Message}, which is an entry in the input/output stream.
- *
- * <p>The {@link Message} models the basic operable unit in streaming SQL processes in Samza.
- *
- * @param <K>  the type of the message key
- * @param <M>  the type of the message body
- */
-public interface Message<K, M> {
-
-  /**
-   * Access method to the key of the message
-   *
-   * @return The key of the message
-   */
-  K getKey();
-
-  /**
-   * Access method to get the corresponding message body in {@link Message}
-   *
-   * @return Message object in this {@link Message}
-   */
-  M getMessage();
-
-  /**
-   * Get the message creation timestamp of the message.
-   *
-   * @return The message's timestamp in nano seconds.
-   */
-  long getTimestamp();
-
-  /**
-   * Method to indicate whether this {@link Message} indicates deletion of a message w/ the message key. The
-   * default implementation reports {@code false}.
-   *
-   * @return A boolean value indicates whether the current message is a delete or insert message
-   */
-  default boolean isDelete() {
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/data/Offset.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Offset.java 
b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Offset.java
deleted file mode 100644
index 0fac2c0..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Offset.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.api.data;
-
-/**
- * Marker interface for the {@code Offset} of a message in a stream. It adds no methods of its own and only
- * requires implementations to be {@link java.lang.Comparable} to other {@code Offset}s.
- */
-public interface Offset extends Comparable<Offset> {
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
deleted file mode 100644
index e9bfe0b..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
+++ /dev/null
@@ -1,468 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.api.internal;
-
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-
-/**
- * This class defines all basic stream operator classes used by internal implementation only. All classes defined in
- * this file are immutable.
- *
- * NOTE: Programmers should not use the operators defined in this class directly. All {@link Operator} objects
- * should be instantiated via {@link MessageStream} API methods.
- */
-public class Operators {
-  /**
-   * Not meant to be instantiated: {@link Operators} only hosts nested operator types and static helpers.
-   */
-  private Operators() {
-  }
-
-  /**
-   * Generates an operator ID as the string form of a random {@link UUID}.
-   */
-  private static String getOperatorId() {
-    // TODO: need to change the IDs to be a consistent, durable IDs that can be recovered across container and job restarts
-    final UUID randomId = UUID.randomUUID();
-    return randomId.toString();
-  }
-
-  /**
-   * Common interface of the stream operator functions. It defines the output of the stream operator function.
-   *
-   * @param <OM>  the type of output {@link Message}
-   */
-  public interface Operator<OM extends Message> {
-
-    /**
-     * @return  the {@link MessageStream} that this operator writes its output to
-     */
-    MessageStream<OM> getOutputStream();
-  }
-
-  /**
-   * Linear stream operator function that takes 1 input {@link Message} and outputs a collection of output
-   * {@link Message}s.
-   *
-   * @param <M>  the type of input {@link Message}
-   * @param <OM>  the type of output {@link Message}
-   */
-  public static class StreamOperator<M extends Message, OM extends Message> implements Operator<OM> {
-    /**
-     * The transformation function
-     */
-    private final Function<M, Collection<OM>> txfmFunction;
-
-    /**
-     * The output {@link MessageStream}
-     */
-    private final MessageStream<OM> outputStream;
-
-    /**
-     * Creates a {@link StreamOperator} that writes to a newly created output {@link MessageStream}. Private s.t. it
-     * can only be created within {@link Operators}.
-     *
-     * @param transformFn  the transformation function to be applied that transforms 1 input {@link Message} into a
-     *                     collection of output {@link Message}s
-     */
-    private StreamOperator(Function<M, Collection<OM>> transformFn) {
-      this(transformFn, new MessageStream<OM>());
-    }
-
-    /**
-     * Creates a {@link StreamOperator} that writes to a caller-supplied output {@link MessageStream}.
-     *
-     * @param transformFn  the transformation function
-     * @param outputStream  the output {@link MessageStream}
-     */
-    private StreamOperator(Function<M, Collection<OM>> transformFn, MessageStream<OM> outputStream) {
-      this.txfmFunction = transformFn;
-      this.outputStream = outputStream;
-    }
-
-    @Override
-    public MessageStream<OM> getOutputStream() {
-      return outputStream;
-    }
-
-    /**
-     * Method to get the transformation function.
-     *
-     * @return the {@code txfmFunction}
-     */
-    public Function<M, Collection<OM>> getFunction() {
-      return txfmFunction;
-    }
-
-  }
-
-  /**
-   * A sink operator function that allows customized code to send the output to an external system. This is the
-   * terminal operator: it has no output {@link MessageStream}, so no further processing is possible in the same
-   * {@link org.apache.samza.task.StreamOperatorTask}.
-   *
-   * @param <M>  the type of input {@link Message}
-   */
-  public static class SinkOperator<M extends Message> implements Operator {
-
-    /**
-     * The user-defined sink function
-     */
-    private final MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sink;
-
-    /**
-     * Creates a {@link SinkOperator}. Private s.t. it can only be created within {@link Operators}.
-     *
-     * @param sink  the user-defined sink function
-     */
-    private SinkOperator(MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sink) {
-      this.sink = sink;
-    }
-
-    /**
-     * A sink has no output stream.
-     *
-     * @return  always {@code null}
-     */
-    @Override
-    public MessageStream getOutputStream() {
-      return null;
-    }
-
-    /**
-     * Method to get the user-defined function that implements the {@link SinkOperator}
-     *
-     * @return a {@link MessageStream.VoidFunction3} function that allows the caller to pass in an input message,
-     *         {@link MessageCollector} and {@link TaskCoordinator} to the sink function
-     */
-    public MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> getFunction() {
-      return sink;
-    }
-  }
-
-  /**
-   * The store functions that are used by {@link WindowOperator} and {@link PartialJoinOperator} to store and
-   * retrieve buffered messages and partial aggregation results
-   *
-   * @param <M>  the type of input {@link Message}
-   * @param <SK>  the type of key used to store the operator states
-   * @param <SS>  the type of operator state. e.g. could be the partial aggregation result for a window, or a
-   *             buffered input message from the join stream for a join
-   */
-  public static class StoreFunctions<M extends Message, SK, SS> {
-    /**
-     * Function to define the key to query in the operator state store, according to the incoming {@link Message}
-     * This method only supports finding the unique key for the incoming message, which supports use case of
-     * non-overlapping windows and unique-key-based join.
-     *
-     * TODO: for windows that overlaps (i.e. sliding windows and hopping windows) and non-unique-key-based join,
-     * the query to the state store is usually a range scan. We need to add a rangeKeyFinder function to map from
-     * a single input message to a range of keys in the store.
-     */
-    private final Function<M, SK> storeKeyFinder;
-
-    /**
-     * Function to update the store entry based on the current state and the incoming {@link Message}
-     *
-     * TODO: this is assuming a 1:1 mapping from the input message to the store entry. When implementing
-     * sliding/hopping windows and non-unique-key-based join, we may need to include the corresponding state key,
-     * in addition to the state value.
-     */
-    private final BiFunction<M, SS, SS> stateUpdater;
-
-    /**
-     * Constructor of state store functions.
-     *
-     * @param keyFinder  the function to calculate the store key from an input {@link Message}
-     * @param stateUpdater  the function to update the state according to an input {@link Message}
-     */
-    private StoreFunctions(Function<M, SK> keyFinder, BiFunction<M, SS, SS> stateUpdater) {
-      this.stateUpdater = stateUpdater;
-      this.storeKeyFinder = keyFinder;
-    }
-
-    /**
-     * Method to get the {@code storeKeyFinder} function
-     *
-     * @return  the function to calculate the key from an input {@link Message}
-     */
-    public Function<M, SK> getStoreKeyFinder() {
-      return storeKeyFinder;
-    }
-
-    /**
-     * Method to get the {@code stateUpdater} function
-     *
-     * @return  the function to update the corresponding state according to an input {@link Message}
-     */
-    public BiFunction<M, SS, SS> getStateUpdater() {
-      return this.stateUpdater;
-    }
-  }
-
-  /**
-   * Defines a window operator function that takes one {@link MessageStream} as an input, accumulates the window
-   * state, and generates an output {@link MessageStream} w/ output type {@code WM} which extends
-   * {@link WindowOutput}
-   *
-   * @param <M>  the type of input {@link Message}
-   * @param <WK>  the type of key in the output {@link Message} from the {@link WindowOperator} function
-   * @param <WS>  the type of window state in the {@link WindowOperator} function
-   * @param <WM>  the type of window output {@link Message}
-   */
-  public static class WindowOperator<M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> implements Operator<WM> {
-    /**
-     * The output {@link MessageStream}
-     */
-    private final MessageStream<WM> outputStream;
-
-    /**
-     * The main window transformation function that takes {@link Message}s from one input stream, aggregates w/ the
-     * window state(s) from the window state store, and generates output {@link Message}s to the output stream.
-     */
-    private final BiFunction<M, Entry<WK, WS>, WM> txfmFunction;
-
-    /**
-     * The state store functions for the {@link WindowOperator}
-     */
-    private final StoreFunctions<M, WK, WS> storeFunctions;
-
-    /**
-     * The window trigger function
-     */
-    private final Trigger<M, WS> trigger;
-
-    /**
-     * The unique ID of stateful operators
-     */
-    private final String opId;
-
-    /**
-     * Constructor for {@link WindowOperator}. Private s.t. it can only be created within {@link Operators}. The
-     * transformation function, store functions and trigger are read from {@code windowFn} once, at construction.
-     *
-     * @param windowFn  description of the window function
-     * @param operatorId  auto-generated unique ID of the operator
-     */
-    private WindowOperator(WindowFn<M, WK, WS, WM> windowFn, String operatorId) {
-      outputStream = new MessageStream<>();
-      txfmFunction = windowFn.getTransformFunc();
-      storeFunctions = windowFn.getStoreFuncs();
-      trigger = windowFn.getTrigger();
-      opId = operatorId;
-    }
-
-    /**
-     * @return  the unique ID of this operator
-     */
-    @Override
-    public String toString() {
-      return opId;
-    }
-
-    @Override
-    public MessageStream<WM> getOutputStream() {
-      return outputStream;
-    }
-
-    /**
-     * Method to get the window's {@link StoreFunctions}.
-     *
-     * @return  the window operator's {@code storeFunctions}
-     */
-    public StoreFunctions<M, WK, WS> getStoreFunctions() {
-      return storeFunctions;
-    }
-
-    /**
-     * Method to get the window operator's main function
-     *
-     * @return   the window operator's {@code txfmFunction}
-     */
-    public BiFunction<M, Entry<WK, WS>, WM> getFunction() {
-      return txfmFunction;
-    }
-
-    /**
-     * Method to get the trigger functions
-     *
-     * @return  the {@link Trigger} for this {@link WindowOperator}
-     */
-    public Trigger<M, WS> getTrigger() {
-      return trigger;
-    }
-
-    /**
-     * Method to generate the window operator's state store name from the string forms of the input stream and of
-     * this operator.
-     *
-     * @param inputStream the input {@link MessageStream} to this state store
-     * @return   the persistent store name of the window operator
-     */
-    public String getStoreName(MessageStream<M> inputStream) {
-      //TODO: need to get the persistent name of ds and the operator in a serialized form
-      final String inputName = inputStream.toString();
-      final String operatorName = this.toString();
-      return String.format("input-%s-wndop-%s", inputName, operatorName);
-    }
-  }
-
-  /**
-   * The partial join operator that takes {@link Message}s from one input 
stream and join w/ buffered {@link Message}s from
-   * another stream and generate join output to {@code output}
-   *
-   * @param <M>  the type of input {@link Message}
-   * @param <K>  the type of join key
-   * @param <JM>  the type of message of {@link Message} in the other join 
stream
-   * @param <RM>  the type of message of {@link Message} in the join output 
stream
-   */
-  public static class PartialJoinOperator<M extends Message<K, ?>, K, JM 
extends Message<K, ?>, RM extends Message> implements Operator<RM> {
-
-    private final MessageStream<RM> joinOutput;
-
-    /**
-     * The main transformation function of {@link PartialJoinOperator} that 
takes a type {@code M} input message,
-     * join w/ a stream of buffered {@link Message}s from another stream w/ 
type {@code JM}, and generate joined type {@code RM}.
-     */
-    private final BiFunction<M, JM, RM> txfmFunction;
-
-    /**
-     * The message store functions that read the buffered messages from the 
other stream in the join
-     */
-    private final StoreFunctions<JM, K, JM> joinStoreFunctions;
-
-    /**
-     * The message store functions that save the buffered messages of this 
{@link MessageStream} in the join
-     */
-    private final StoreFunctions<M, K, M> selfStoreFunctions;
-
-    /**
-     * The unique ID for the stateful operator
-     */
-    private final String opId;
-
-    /**
-     * Default constructor to create a {@link PartialJoinOperator} object
-     *
-     * @param partialJoin  partial join function that take type {@code M} of 
input {@link Message} and join w/ type
-     *                     {@code JM} of buffered {@link Message} from another 
stream
-     * @param joinOutput  the output {@link MessageStream} of the join results
-     */
-    private PartialJoinOperator(BiFunction<M, JM, RM> partialJoin, 
MessageStream<RM> joinOutput, String opId) {
-      this.joinOutput = joinOutput;
-      this.txfmFunction = partialJoin;
-      // Read-only join store, no creator/updater functions specified
-      this.joinStoreFunctions = new StoreFunctions<>(m -> m.getKey(), null);
-      // Buffered message store for this input stream
-      this.selfStoreFunctions = new StoreFunctions<>(m -> m.getKey(), (m, s1) 
-> m);
-      this.opId = opId;
-    }
-
-    @Override
-    public String toString() {
-      return this.opId;
-    }
-
-    @Override
-    public MessageStream<RM> getOutputStream() {
-      return this.joinOutput;
-    }
-
-    /**
-     * Method to get {@code joinStoreFunctions}
-     *
-     * @return  {@code joinStoreFunctions}
-     */
-    public StoreFunctions<JM, K, JM> getJoinStoreFunctions() {
-      return this.joinStoreFunctions;
-    }
-
-    /**
-     * Method to get {@code selfStoreFunctions}
-     *
-     * @return  {@code selfStoreFunctions}
-     */
-    public StoreFunctions<M, K, M> getSelfStoreFunctions() {
-      return this.selfStoreFunctions;
-    }
-
-    /**
-     * Method to get {@code txfmFunction}
-     *
-     * @return  {@code txfmFunction}
-     */
-    public BiFunction<M, JM, RM> getFunction() {
-      return this.txfmFunction;
-    }
-  }
-
-  /**
-   * The method only to be used internally in {@link MessageStream} to create 
{@link StreamOperator}
-   *
-   * @param transformFn  the corresponding transformation function
-   * @param <M>  type of input {@link Message}
-   * @param <OM>  type of output {@link Message}
-   * @return  the {@link StreamOperator}
-   */
-  public static <M extends Message, OM extends Message> StreamOperator<M, OM> 
getStreamOperator(Function<M, Collection<OM>> transformFn) {
-    return new StreamOperator<>(transformFn);
-  }
-
-  /**
-   * The method only to be used internally in {@link MessageStream} to create 
{@link SinkOperator}
-   *
-   * @param sinkFn  the sink function
-   * @param <M>  type of input {@link Message}
-   * @return   the {@link SinkOperator}
-   */
-  public static <M extends Message> SinkOperator<M> 
getSinkOperator(MessageStream.VoidFunction3<M, MessageCollector, 
TaskCoordinator> sinkFn) {
-    return new SinkOperator<>(sinkFn);
-  }
-
-  /**
-   * The method only to be used internally in {@link MessageStream} to create 
{@link WindowOperator}
-   *
-   * @param windowFn  the {@link WindowFn} function
-   * @param <M>  type of input {@link Message}
-   * @param <WK>  type of window key
-   * @param <WS>  type of {@link WindowState}
-   * @param <WM>  type of output {@link WindowOutput}
-   * @return  the {@link WindowOperator}
-   */
-  public static <M extends Message, WK, WS extends WindowState, WM extends 
WindowOutput<WK, ?>> WindowOperator<M, WK, WS, WM> getWindowOperator(
-    WindowFn<M, WK, WS, WM> windowFn) {
-    return new WindowOperator<>(windowFn, Operators.getOperatorId());
-  }
-
-  /**
-   * The method only to be used internally in {@link MessageStream} to create 
{@link WindowOperator}
-   *
-   * @param joiner  the {@link WindowFn} function
-   * @param joinOutput  the output {@link MessageStream}
-   * @param <M>  type of input {@link Message}
-   * @param <K>  type of join key
-   * @param <JM>  the type of message in the {@link Message} from the other 
join stream
-   * @param <RM>  the type of message in the {@link Message} from the join 
function
-   * @return  the {@link PartialJoinOperator}
-   */
-  public static <M extends Message<K, ?>, K, JM extends Message<K, ?>, RM 
extends Message> PartialJoinOperator<M, K, JM, RM> getPartialJoinOperator(
-    BiFunction<M, JM, RM> joiner, MessageStream<RM> joinOutput) {
-    return new PartialJoinOperator<>(joiner, joinOutput, 
Operators.getOperatorId());
-  }
-
-  /**
-   * The method only to be used internally in {@link MessageStream} to create 
{@link StreamOperator} as a merger function
-   *
-   * @param mergeOutput  the common output {@link MessageStream} from the 
merger
-   * @param <M>  the type of input {@link Message}
-   * @return  the {@link StreamOperator} for merge
-   */
-  public static <M extends Message> StreamOperator<M, M> 
getMergeOperator(MessageStream<M> mergeOutput) {
-    return new StreamOperator<M, M>(t ->
-      new ArrayList<M>() {{
-        this.add(t);
-      }},
-      mergeOutput);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java
deleted file mode 100644
index 33a0134..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.internal;
-
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.data.Message;
-
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-/**
- * Defines the trigger functions for {@link Operators.WindowOperator}. This 
class is immutable.
- *
- * @param <M>  the type of message from the input stream
- * @param <S>  the type of state variable in the window's state store
- */
-public class Trigger<M extends Message, S extends WindowState> {
-
-  /**
-   * System timer based trigger condition. This is the only guarantee that the 
{@link Operators.WindowOperator} will proceed forward
-   */
-  private final Function<S, Boolean> timerTrigger;
-
-  /**
-   * early trigger condition that determines when to send the first output 
from the {@link Operators.WindowOperator}
-   */
-  private final BiFunction<M, S, Boolean> earlyTrigger;
-
-  /**
-   * late trigger condition that determines when to send the updated output 
after the first one from a {@link Operators.WindowOperator}
-   */
-  private final BiFunction<M, S, Boolean> lateTrigger;
-
-  /**
-   * the function to updated the window state when the first output is 
triggered
-   */
-  private final Function<S, S> earlyTriggerUpdater;
-
-  /**
-   * the function to updated the window state when the late output is triggered
-   */
-  private final Function<S, S> lateTriggerUpdater;
-
-  /**
-   * Private constructor to prevent instantiation
-   *
-   * @param timerTrigger  system timer trigger condition
-   * @param earlyTrigger  early trigger condition
-   * @param lateTrigger   late trigger condition
-   * @param earlyTriggerUpdater  early trigger state updater
-   * @param lateTriggerUpdater   late trigger state updater
-   */
-  private Trigger(Function<S, Boolean> timerTrigger, BiFunction<M, S, Boolean> 
earlyTrigger, BiFunction<M, S, Boolean> lateTrigger,
-      Function<S, S> earlyTriggerUpdater, Function<S, S> lateTriggerUpdater) {
-    this.timerTrigger = timerTrigger;
-    this.earlyTrigger = earlyTrigger;
-    this.lateTrigger = lateTrigger;
-    this.earlyTriggerUpdater = earlyTriggerUpdater;
-    this.lateTriggerUpdater = lateTriggerUpdater;
-  }
-
-  /**
-   * Static method to create a {@link Trigger} object
-   *
-   * @param timerTrigger  system timer trigger condition
-   * @param earlyTrigger  early trigger condition
-   * @param lateTrigger  late trigger condition
-   * @param earlyTriggerUpdater  early trigger state updater
-   * @param lateTriggerUpdater  late trigger state updater
-   * @param <M>  the type of input {@link Message}
-   * @param <S>  the type of window state extends {@link WindowState}
-   * @return  the {@link Trigger} function
-   */
-  public static <M extends Message, S extends WindowState> Trigger<M, S> 
createTrigger(Function<S, Boolean> timerTrigger,
-      BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> 
lateTrigger, Function<S, S> earlyTriggerUpdater,
-      Function<S, S> lateTriggerUpdater) {
-    return new Trigger(timerTrigger, earlyTrigger, lateTrigger, 
earlyTriggerUpdater, lateTriggerUpdater);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowFn.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowFn.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowFn.java
deleted file mode 100644
index 1fd88e7..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowFn.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.internal;
-
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.storage.kv.Entry;
-
-import java.util.function.BiFunction;
-
-
-/**
- * Defines an internal representation of a window function. This class SHOULD 
NOT be used by the programmer directly. It is used
- * by the internal representation and implementation classes in operators.
- *
- * @param <M> type of input stream {@link Message} for window
- * @param <WK>  type of window key in the output {@link Message}
- * @param <WS>  type of {@link WindowState} variable in the state store
- * @param <WM>  type of the message in the output stream
- */
-public interface WindowFn<M extends Message, WK, WS extends WindowState, WM 
extends WindowOutput<WK, ?>> {
-
-  /**
-   * get the transformation function of the {@link WindowFn}
-   *
-   * @return  the transformation function takes type {@code M} message and the 
window state entry, then transform to an {@link WindowOutput}
-   */
-  BiFunction<M, Entry<WK, WS>, WM> getTransformFunc();
-
-  /**
-   * get the state store functions for this {@link WindowFn}
-   *
-   * @return  the collection of state store methods
-   */
-  Operators.StoreFunctions<M, WK, WS> getStoreFuncs();
-
-  /**
-   * get the trigger conditions for this {@link WindowFn}
-   *
-   * @return  the trigger condition for the {@link WindowFn} function
-   */
-  Trigger<M, WS> getTrigger();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java
deleted file mode 100644
index e202c20..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.internal;
-
-import org.apache.samza.operators.api.data.Message;
-
-
-/**
- * This class defines the specific type of output messages from a {@link 
Operators.WindowOperator} function
- *
- * @param <K>  the type of key in the output window result
- * @param <M>  the type of value in the output window result
- */
-public final class WindowOutput<K, M> implements Message<K, M> {
-  private final K key;
-  private final M value;
-
-  WindowOutput(K key, M aggregated) {
-    this.key = key;
-    this.value = aggregated;
-  }
-
-  @Override public M getMessage() {
-    return this.value;
-  }
-
-  @Override public K getKey() {
-    return this.key;
-  }
-
-  @Override public long getTimestamp() {
-    return 0;
-  }
-
-  static public <K, M> WindowOutput<K, M> of(K key, M result) {
-    return new WindowOutput<>(key, result);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
index 59de16b..82f3c28 100644
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
@@ -19,9 +19,9 @@
 
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Operators.Operator;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.Operator;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
index f16cbc6..d3d8f8b 100644
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
@@ -19,10 +19,10 @@
 package org.apache.samza.operators.impl;
 
 import org.apache.commons.collections.keyvalue.AbstractMapEntry;
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Operators.*;
-import org.apache.samza.operators.api.internal.WindowOutput;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.*;
+import org.apache.samza.operators.internal.WindowOutput;
 import org.apache.samza.operators.impl.join.PartialJoinOpImpl;
 import org.apache.samza.operators.impl.window.SessionWindowImpl;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 3ca8bde..f55c758 100644
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -18,8 +18,8 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.MessageStream;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
@@ -90,7 +90,7 @@ public abstract class OperatorImpl<M extends Message, RM 
extends Message>
   protected void init(MessageStream<M> source, TaskContext context) {};
 
   /**
-   * Method to trigger all downstream operators that consumes the output 
{@link org.apache.samza.operators.api.MessageStream}
+   * Method to trigger all downstream operators that consumes the output 
{@link MessageStream}
    * from this operator
    *
    * @param omsg  output {@link Message}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
index 5a375bc..cc7ef2b 100644
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
@@ -18,7 +18,7 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.data.Message;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
index b29d9c8..b0f4f27 100644
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
@@ -18,8 +18,8 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Operators.StreamOperator;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.StreamOperator;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
index 5d25cfa..a8a639e 100644
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -18,9 +18,9 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.internal.Operators.SinkOperator;
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.internal.Operators.SinkOperator;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.data.Message;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
index f573fd0..7840b5b 100644
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
@@ -18,8 +18,8 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Operators.StoreFunctions;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.StoreFunctions;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.TaskContext;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
index bbe08a4..f4a6a58 100644
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
@@ -18,19 +18,20 @@
  */
 package org.apache.samza.operators.impl.join;
 
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Operators.PartialJoinOperator;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators;
+import org.apache.samza.operators.internal.Operators.PartialJoinOperator;
 import org.apache.samza.operators.impl.OperatorImpl;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 
 
 /**
- * Implementation of a {@link 
org.apache.samza.operators.api.internal.Operators.PartialJoinOperator}. This 
class implements function
+ * Implementation of a {@link Operators.PartialJoinOperator}. This class 
implements function
  * that only takes in one input stream among all inputs to the join and 
generate the join output.
  *
- * @param <M>  Type of input stream {@link 
org.apache.samza.operators.api.data.Message}
- * @param <RM>  Type of join output stream {@link 
org.apache.samza.operators.api.data.Message}
+ * @param <M>  Type of input stream {@link Message}
+ * @param <RM>  Type of join output stream {@link Message}
  */
 public class PartialJoinOpImpl<M extends Message<K, ?>, K, JM extends 
Message<K, ?>, RM extends Message> extends OperatorImpl<M, RM> {
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
index 59e2dec..0d6141e 100644
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
@@ -18,11 +18,11 @@
  */
 package org.apache.samza.operators.impl.window;
 
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Operators.WindowOperator;
-import org.apache.samza.operators.api.internal.WindowOutput;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.WindowOperator;
+import org.apache.samza.operators.internal.WindowOutput;
 import org.apache.samza.operators.impl.OperatorImpl;
 import org.apache.samza.operators.impl.StateStoreImpl;
 import org.apache.samza.storage.kv.Entry;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
 
b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
index e340fe8..18b077b 100644
--- 
a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
+++ 
b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
@@ -19,11 +19,12 @@
 package org.apache.samza.task;
 
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.MessageStreams;
-import org.apache.samza.operators.api.MessageStreams.SystemMessageStream;
-import org.apache.samza.operators.api.data.IncomingSystemMessage;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreams;
+import org.apache.samza.operators.MessageStreams.SystemMessageStream;
+import org.apache.samza.operators.data.IncomingSystemMessage;
 import org.apache.samza.operators.impl.ChainedOperators;
+import org.apache.samza.operators.task.StreamOperatorTask;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
 
@@ -45,7 +46,7 @@ public final class StreamOperatorAdaptorTask implements 
StreamTask, InitableTask
   /**
    * Wrapped {@link StreamOperatorTask} class
    */
-  private final StreamOperatorTask  userTask;
+  private final StreamOperatorTask userTask;
 
   /**
    * Constructor that wraps the user-defined {@link StreamOperatorTask}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java 
b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
deleted file mode 100644
index cfdb694..0000000
--- a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.task;
-
-import org.apache.samza.operators.api.MessageStreams.SystemMessageStream;
-import java.util.Collection;
-
-/**
- * This interface defines the methods that user needs to implement via the 
operator programming APIs.
- */
-public interface StreamOperatorTask {
-
-  /**
-   * Defines the method for users to initialize the operator chains consuming 
from all {@link SystemMessageStream}s.
-   * Users have to implement this function to instantiate {@link 
org.apache.samza.operators.impl.ChainedOperators} that
-   * will process each incoming {@link SystemMessageStream}.
-   *
-   * Note that each {@link SystemMessageStream} corresponds to an input {@link 
org.apache.samza.system.SystemStreamPartition}
-   *
-   * @param sources  the collection of {@link SystemMessageStream}s that takes 
{@link org.apache.samza.operators.api.data.IncomingSystemMessage}
-   *                 from a {@link 
org.apache.samza.system.SystemStreamPartition}
-   */
-  void initOperators(Collection<SystemMessageStream> sources);
-
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java 
b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java
deleted file mode 100644
index 0f00fdb..0000000
--- 
a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.apache.samza.operators.api.data.Message;
-
-
-public class TestMessage implements Message<String, String> {
-
-  private final String key;
-  private final String value;
-  private final long timestamp;
-
-  TestMessage(String key, String value, long timestamp) {
-    this.key = key;
-    this.value = value;
-    this.timestamp = timestamp;
-  }
-
-  @Override public String getMessage() {
-    return this.value;
-  }
-
-  @Override public String getKey() {
-    return this.key;
-  }
-
-  @Override public long getTimestamp() {
-    return this.timestamp;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java
deleted file mode 100644
index 9f9ad6b..0000000
--- 
a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.apache.samza.operators.api.internal.Operators.*;
-import org.apache.samza.operators.api.internal.WindowOutput;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestMessageStream {
-
-  @Test public void testMap() {
-    MessageStream<TestMessage> inputStream = new MessageStream<>();
-    Function<TestMessage, TestOutputMessage> xMap = m -> new 
TestOutputMessage(m.getKey(), m.getMessage().length() + 1, m.getTimestamp() + 
2);
-    MessageStream<TestOutputMessage> outputStream = inputStream.map(xMap);
-    Collection<Operator> subs = inputStream.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestOutputMessage> mapOp = subs.iterator().next();
-    assertTrue(mapOp instanceof StreamOperator);
-    assertEquals(mapOp.getOutputStream(), outputStream);
-    // assert that the transformation function is what we defined above
-    TestMessage xTestMsg = mock(TestMessage.class);
-    when(xTestMsg.getKey()).thenReturn("test-msg-key");
-    when(xTestMsg.getMessage()).thenReturn("123456789");
-    when(xTestMsg.getTimestamp()).thenReturn(12345L);
-    Collection<TestOutputMessage> cOutputMsg = ((StreamOperator<TestMessage, 
TestOutputMessage>) mapOp).getFunction().apply(xTestMsg);
-    assertEquals(cOutputMsg.size(), 1);
-    TestOutputMessage outputMessage = cOutputMsg.iterator().next();
-    assertEquals(outputMessage.getKey(), xTestMsg.getKey());
-    assertEquals(outputMessage.getMessage(), 
Integer.valueOf(xTestMsg.getMessage().length() + 1));
-    assertEquals(outputMessage.getTimestamp(), xTestMsg.getTimestamp() + 2);
-  }
-
-  @Test public void testFlatMap() {
-    MessageStream<TestMessage> inputStream = new MessageStream<>();
-    Set<TestOutputMessage> flatOuts = new HashSet<TestOutputMessage>() {{
-      this.add(mock(TestOutputMessage.class));
-      this.add(mock(TestOutputMessage.class));
-      this.add(mock(TestOutputMessage.class));
-    }};
-    Function<TestMessage, Collection<TestOutputMessage>> xFlatMap = m -> 
flatOuts;
-    MessageStream<TestOutputMessage> outputStream = 
inputStream.flatMap(xFlatMap);
-    Collection<Operator> subs = inputStream.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestOutputMessage> flatMapOp = subs.iterator().next();
-    assertTrue(flatMapOp instanceof StreamOperator);
-    assertEquals(flatMapOp.getOutputStream(), outputStream);
-    // assert that the transformation function is what we defined above
-    assertEquals(((StreamOperator<TestMessage, TestOutputMessage>) 
flatMapOp).getFunction(), xFlatMap);
-  }
-
-  @Test public void testFilter() {
-    MessageStream<TestMessage> inputStream = new MessageStream<>();
-    Function<TestMessage, Boolean> xFilter = m -> m.getTimestamp() > 123456L;
-    MessageStream<TestMessage> outputStream = inputStream.filter(xFilter);
-    Collection<Operator> subs = inputStream.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> filterOp = subs.iterator().next();
-    assertTrue(filterOp instanceof StreamOperator);
-    assertEquals(filterOp.getOutputStream(), outputStream);
-    // assert that the transformation function is what we defined above
-    Function<TestMessage, Collection<TestMessage>> txfmFn = 
((StreamOperator<TestMessage, TestMessage>) filterOp).getFunction();
-    TestMessage mockMsg = mock(TestMessage.class);
-    when(mockMsg.getTimestamp()).thenReturn(11111L);
-    Collection<TestMessage> output = txfmFn.apply(mockMsg);
-    assertTrue(output.isEmpty());
-    when(mockMsg.getTimestamp()).thenReturn(999999L);
-    output = txfmFn.apply(mockMsg);
-    assertEquals(output.size(), 1);
-    assertEquals(output.iterator().next(), mockMsg);
-  }
-
-  @Test public void testSink() {
-    MessageStream<TestMessage> inputStream = new MessageStream<>();
-    MessageStream.VoidFunction3<TestMessage, MessageCollector, 
TaskCoordinator> xSink = (m, mc, tc) -> {
-      mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", 
"test-stream"), m.getMessage()));
-      tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
-    };
-    inputStream.sink(xSink);
-    Collection<Operator> subs = inputStream.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> sinkOp = subs.iterator().next();
-    assertTrue(sinkOp instanceof SinkOperator);
-    assertEquals(((SinkOperator) sinkOp).getFunction(), xSink);
-    assertNull(((SinkOperator) sinkOp).getOutputStream());
-  }
-
-  @Test public void testWindow() {
-    MessageStream<TestMessage> inputStream = new MessageStream<>();
-    Windows.SessionWindow<TestMessage, String, Integer> window = 
mock(Windows.SessionWindow.class);
-    MessageStream<WindowOutput<String, Integer>> outStream = 
inputStream.window(window);
-    Collection<Operator> subs = inputStream.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> wndOp = subs.iterator().next();
-    assertTrue(wndOp instanceof WindowOperator);
-    assertEquals(((WindowOperator) wndOp).getOutputStream(), outStream);
-  }
-
-  @Test public void testJoin() {
-    MessageStream<TestMessage> source1 = new MessageStream<>();
-    MessageStream<TestMessage> source2 = new MessageStream<>();
-    BiFunction<TestMessage, TestMessage, TestOutputMessage> joiner = (m1, m2) 
-> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + 
m2.getMessage().length(), m1.getTimestamp());
-    MessageStream<TestOutputMessage> joinOutput = source1.join(source2, 
joiner);
-    Collection<Operator> subs = source1.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> joinOp1 = subs.iterator().next();
-    assertTrue(joinOp1 instanceof PartialJoinOperator);
-    assertEquals(((PartialJoinOperator) joinOp1).getOutputStream(), 
joinOutput);
-    subs = source2.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> joinOp2 = subs.iterator().next();
-    assertTrue(joinOp2 instanceof PartialJoinOperator);
-    assertEquals(((PartialJoinOperator) joinOp2).getOutputStream(), 
joinOutput);
-    TestMessage joinMsg1 = new TestMessage("test-join-1", "join-msg-001", 
11111L);
-    TestMessage joinMsg2 = new TestMessage("test-join-2", "join-msg-002", 
22222L);
-    TestOutputMessage xOut = (TestOutputMessage) ((PartialJoinOperator) 
joinOp1).getFunction().apply(joinMsg1, joinMsg2);
-    assertEquals(xOut.getKey(), "test-join-1");
-    assertEquals(xOut.getMessage(), Integer.valueOf(24));
-    assertEquals(xOut.getTimestamp(), 11111L);
-    xOut = (TestOutputMessage) ((PartialJoinOperator) 
joinOp2).getFunction().apply(joinMsg2, joinMsg1);
-    assertEquals(xOut.getKey(), "test-join-1");
-    assertEquals(xOut.getMessage(), Integer.valueOf(24));
-    assertEquals(xOut.getTimestamp(), 11111L);
-  }
-
-  @Test public void testMerge() {
-    MessageStream<TestMessage> merge1 = new MessageStream<>();
-    Collection<MessageStream<TestMessage>> others = new 
ArrayList<MessageStream<TestMessage>>(){{
-      this.add(new MessageStream<>());
-      this.add(new MessageStream<>());
-    }};
-    MessageStream<TestMessage> mergeOutput = merge1.merge(others);
-    validateMergeOperator(merge1, mergeOutput);
-
-    others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
-  }
-
-  private void validateMergeOperator(MessageStream<TestMessage> mergeSource, 
MessageStream<TestMessage> mergeOutput) {
-    Collection<Operator> subs = mergeSource.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> mergeOp = subs.iterator().next();
-    assertTrue(mergeOp instanceof StreamOperator);
-    assertEquals(((StreamOperator) mergeOp).getOutputStream(), mergeOutput);
-    TestMessage mockMsg = mock(TestMessage.class);
-    Collection<TestMessage> outputs = ((StreamOperator<TestMessage, 
TestMessage>) mergeOp).getFunction().apply(mockMsg);
-    assertEquals(outputs.size(), 1);
-    assertEquals(outputs.iterator().next(), mockMsg);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java
deleted file mode 100644
index e6aa692..0000000
--- 
a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.apache.samza.Partition;
-import org.apache.samza.system.SystemStreamPartition;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class TestMessageStreams {
-
-  @Test public void testInput() {
-    SystemStreamPartition ssp = new SystemStreamPartition("my-system", 
"my-stream", new Partition(0));
-    MessageStreams.SystemMessageStream mSysStream = MessageStreams.input(ssp);
-    assertEquals(mSysStream.getSystemStreamPartition(), ssp);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java
 
b/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java
deleted file mode 100644
index 225e77f..0000000
--- 
a/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.apache.samza.operators.api.data.Message;
-
-
-public class TestOutputMessage implements Message<String, Integer> {
-  private final String key;
-  private final Integer value;
-  private final long timestamp;
-
-  public TestOutputMessage(String key, Integer value, long timestamp) {
-    this.key = key;
-    this.value = value;
-    this.timestamp = timestamp;
-  }
-
-  @Override public Integer getMessage() {
-    return this.value;
-  }
-
-  @Override public String getKey() {
-    return this.key;
-  }
-
-  @Override public long getTimestamp() {
-    return this.timestamp;
-  }
-}
-

Reply via email to