SAMZA-1056: Added wiring for High Level API state stores, their serdes and changelogs.

Provided join operator access to durable state stores.

Author: Prateek Maheshwari <[email protected]>

Reviewers: Jagadish Venkatraman <[email protected]>

Closes #309 from prateekm/operator-store-wiring


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a671288e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a671288e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a671288e

Branch: refs/heads/master
Commit: a671288e18dea3336fe1c625233bb7427ed8b5b2
Parents: ad80cf9
Author: Prateek Maheshwari <[email protected]>
Authored: Wed Oct 4 15:37:50 2017 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Wed Oct 4 15:37:50 2017 -0700

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 .../apache/samza/operators/MessageStream.java   |  13 ++-
 .../operators/functions/WatermarkFunction.java  |  31 +++---
 .../apache/samza/container/TaskContextImpl.java |   6 +-
 .../coordinator/DistributedLockWithState.java   |   4 +-
 .../org/apache/samza/execution/JobNode.java     |  63 ++++++++---
 .../samza/operators/MessageStreamImpl.java      |  16 +--
 .../functions/PartialJoinFunction.java          |  26 +----
 .../samza/operators/impl/OperatorImplGraph.java |  38 +++++--
 .../operators/impl/PartialJoinOperatorImpl.java |  72 ++++--------
 .../operators/impl/store/TimeSeriesStore.java   |   1 +
 .../operators/impl/store/TimestampedValue.java  |  16 +--
 .../impl/store/TimestampedValueSerde.java       |  55 +++++++++
 .../samza/operators/spec/JoinOperatorSpec.java  |  70 +++++++++---
 .../samza/operators/spec/OperatorSpecs.java     |  17 ++-
 .../operators/spec/StatefulOperatorSpec.java    |  37 +++++++
 .../samza/operators/spec/StoreDescriptor.java   |  81 ++++++++++++++
 .../samza/runtime/LocalApplicationRunner.java   |   2 +-
 .../org/apache/samza/zk/ZkJobCoordinator.java   |   5 +-
 .../apache/samza/container/SamzaContainer.scala |   8 +-
 .../samza/example/OrderShipmentJoinExample.java |   4 +-
 .../samza/execution/TestExecutionPlanner.java   |  42 ++++---
 .../execution/TestJobGraphJsonGenerator.java    |  19 +++-
 .../org/apache/samza/execution/TestJobNode.java | 111 ++++++++++++++-----
 .../samza/operators/TestJoinOperator.java       |  12 +-
 .../samza/operators/TestMessageStreamImpl.java  |   3 +-
 .../operators/impl/TestOperatorImplGraph.java   |  31 ++++--
 .../impl/store/TestTimeSeriesStoreImpl.java     |  54 ++++-----
 .../impl/store/TestTimestampedValueSerde.java   |  52 +++++++++
 .../apache/samza/test/operator/PageView.java    |  50 ---------
 .../test/operator/RepartitionJoinWindowApp.java |  92 +++++++++++++++
 .../test/operator/RepartitionWindowApp.java     |  55 ---------
 .../samza/test/operator/SessionWindowApp.java   |   1 +
 ...StreamApplicationIntegrationTestHarness.java |  12 +-
 .../operator/TestRepartitionJoinWindowApp.java  |  81 ++++++++++++++
 .../test/operator/TestRepartitionWindowApp.java |  70 ------------
 .../samza/test/operator/TumblingWindowApp.java  |   1 +
 .../samza/test/operator/data/AdClick.java       |  41 +++++++
 .../samza/test/operator/data/PageView.java      |  50 +++++++++
 .../test/operator/data/UserPageAdClick.java     |  62 +++++++++++
 40 files changed, 974 insertions(+), 431 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 7cbffe7..c2e9180 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,6 +22,7 @@ docs/_site
 build
 **/bin
 samza-test/state
+state/
 docs/learn/documentation/*/api/javadocs
 docs/learn/documentation/*/rest/javadocs
 .DS_Store

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 2a1045d..c36fe1f 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -27,6 +27,7 @@ import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -128,13 +129,17 @@ public interface MessageStream<M> {
    * @param otherStream the other {@link MessageStream} to be joined with
    * @param joinFn the function to join messages from this and the other {@link MessageStream}
    * @param ttl the ttl for messages in each stream
+   * @param keySerde the serde for the join key
+   * @param messageSerde the serde for messages in this stream
+   * @param otherMessageSerde the serde for messages in the other stream
    * @param <K> the type of join key
-   * @param <JM> the type of messages in the other stream
-   * @param <OM> the type of messages resulting from the {@code joinFn}
+   * @param <OM> the type of messages in the other stream
+   * @param <JM> the type of messages resulting from the {@code joinFn}
    * @return the joined {@link MessageStream}
    */
-  <K, JM, OM> MessageStream<OM> join(MessageStream<JM> otherStream,
-      JoinFunction<? extends K, ? super M, ? super JM, ? extends OM> joinFn, Duration ttl);
+  <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
+      JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn,
+      Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl);
 
   /**
    * Merges all {@code otherStreams} with this {@link MessageStream}.

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
index 9c4b9bf..3be293e 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
@@ -20,14 +20,14 @@
 package org.apache.samza.operators.functions;
 
 /**
- * Allows user-specific handling of Watermark
+ * Allows handling of watermarks.
  */
 public interface WatermarkFunction {
 
   /**
    * Processes the input watermark coming from upstream operators.
-   * This allows user-defined watermark handling, such as trigger events
-   * or propagate it to downstream.
+   * This allows custom watermark handling, such as triggering events or propagating it downstream.
+   *
    * @param watermark input watermark
    */
   void processWatermark(long watermark);
@@ -35,24 +35,19 @@ public interface WatermarkFunction {
   /**
    * Returns the output watermark. This function will be invoked immediately after either
    * of the following events:
-   *
    * <ol>
-   *
-   * <li> Return of the transform function, e.g. {@link FlatMapFunction}.
-   *
-   * <li> Return of the processWatermark function.
-   *
+   *  <li> Return from the transform function, e.g. {@link FlatMapFunction}.
+   *  <li> Return from the {@link #processWatermark} function.
    * </ol>
+
+   * Note: If the transform function returns a collection of messages, the output watermark
+   * will be emitted after the output collection has been propagated to downstream operators.
+   * This might delay the watermark propagation, which will cause more buffering and might
+   * have a performance impact.
    *
-   *
-   *
-   * Note: If the transform function returns a collection of output, the output watermark
-   * will be emitted after the output collection is propagated to downstream operators. So
-   * it might delay the watermark propagation. The delay will cause more buffering and might
-   * have performance impact.
-   *
-   * @return output watermark, or null if the output watermark should not be updated. Samza
-   * guarantees that the same watermark value will be only emitted once.
+   * @return output watermark, or null if the output watermark should not be updated.
+   *         Samza guarantees that the same watermark value will only be emitted once.
    */
   Long getOutputWatermark();
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
index 7990d2b..aa622a3 100644
--- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
@@ -23,8 +23,8 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.samza.checkpoint.OffsetManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
-import org.apache.samza.storage.StorageEngine;
 import org.apache.samza.storage.TaskStorageManager;
+import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.TaskContext;
@@ -79,9 +79,9 @@ public class TaskContextImpl implements TaskContext {
   }
 
   @Override
-  public StorageEngine getStore(String storeName) {
+  public KeyValueStore getStore(String storeName) {
     if (storageManager != null) {
-      return storageManager.apply(storeName);
+      return (KeyValueStore) storageManager.apply(storeName);
     } else {
       LOG.warn("No store found for name: {}", storeName);
       return null;

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
index 0de7813..c8e9033 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
@@ -26,12 +26,12 @@ import java.util.concurrent.TimeoutException;
 public interface DistributedLockWithState {
 
   /**
-   * Trie to acquire the lock, but first check if the state flag is set. If it is set, return false.
+   * Try to acquire the lock, but first check if the state flag is set. If it is set, return false.
    * If the flag is not set, and lock is acquired - return true.
-   * Throw TimeOutException if could not acquire the lock.
    * @param timeout Duration of lock acquiring timeout.
    * @param unit Time Unit of the timeout defined above.
    * @return true if lock is acquired successfully, false if state is already set.
+   * @throws TimeoutException if could not acquire the lock.
    */
   boolean lockIfNotSet(long timeout, TimeUnit unit) throws TimeoutException;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java 
b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 0368829..2e89292 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -33,6 +33,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.operators.StreamGraphImpl;
@@ -40,6 +41,7 @@ import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.StatefulOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.util.MathUtils;
 import org.apache.samza.serializers.Serde;
@@ -129,6 +131,14 @@ public class JobNode {
       }
     }
 
+    streamGraph.getAllOperatorSpecs().forEach(opSpec -> {
+        if (opSpec instanceof StatefulOperatorSpec) {
+          ((StatefulOperatorSpec) opSpec).getStoreDescriptors()
+              .forEach(sd -> configs.putAll(sd.getStorageConfigs()));
+          // store key and message serdes are configured separately in 
#addSerdeConfigs
+        }
+      });
+
     configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
 
     // write input/output streams to configs
@@ -168,32 +178,42 @@ public class JobNode {
    *
    * @param configs the configs to add serialized serde instances and stream 
serde configs to
    */
-  protected void addSerdeConfigs(Map<String, String> configs) {
+  void addSerdeConfigs(Map<String, String> configs) {
     // collect all key and msg serde instances for streams
-    Map<String, Serde> keySerdes = new HashMap<>();
-    Map<String, Serde> msgSerdes = new HashMap<>();
+    Map<String, Serde> streamKeySerdes = new HashMap<>();
+    Map<String, Serde> streamMsgSerdes = new HashMap<>();
     Map<StreamSpec, InputOperatorSpec> inputOperators = 
streamGraph.getInputOperators();
     inEdges.forEach(edge -> {
         String streamId = edge.getStreamSpec().getId();
         InputOperatorSpec inputOperatorSpec = 
inputOperators.get(edge.getStreamSpec());
-        Serde keySerde = inputOperatorSpec.getKeySerde();
-        Serde valueSerde = inputOperatorSpec.getValueSerde();
-        keySerdes.put(streamId, keySerde);
-        msgSerdes.put(streamId, valueSerde);
+        streamKeySerdes.put(streamId, inputOperatorSpec.getKeySerde());
+        streamMsgSerdes.put(streamId, inputOperatorSpec.getValueSerde());
       });
     Map<StreamSpec, OutputStreamImpl> outputStreams = 
streamGraph.getOutputStreams();
     outEdges.forEach(edge -> {
         String streamId = edge.getStreamSpec().getId();
         OutputStreamImpl outputStream = 
outputStreams.get(edge.getStreamSpec());
-        Serde keySerde = outputStream.getKeySerde();
-        Serde valueSerde = outputStream.getValueSerde();
-        keySerdes.put(streamId, keySerde);
-        msgSerdes.put(streamId, valueSerde);
+        streamKeySerdes.put(streamId, outputStream.getKeySerde());
+        streamMsgSerdes.put(streamId, outputStream.getValueSerde());
+      });
+
+    // collect all key and msg serde instances for stores
+    Map<String, Serde> storeKeySerdes = new HashMap<>();
+    Map<String, Serde> storeMsgSerdes = new HashMap<>();
+    streamGraph.getAllOperatorSpecs().forEach(opSpec -> {
+        if (opSpec instanceof StatefulOperatorSpec) {
+          ((StatefulOperatorSpec) 
opSpec).getStoreDescriptors().forEach(storeDescriptor -> {
+              storeKeySerdes.put(storeDescriptor.getStoreName(), 
storeDescriptor.getKeySerde());
+              storeMsgSerdes.put(storeDescriptor.getStoreName(), 
storeDescriptor.getMsgSerde());
+            });
+        }
       });
 
-    // for each unique serde instance, generate a unique name and serialize to 
config
-    HashSet<Serde> serdes = new HashSet<>(keySerdes.values());
-    serdes.addAll(msgSerdes.values());
+    // for each unique stream or store serde instance, generate a unique name 
and serialize to config
+    HashSet<Serde> serdes = new HashSet<>(streamKeySerdes.values());
+    serdes.addAll(streamMsgSerdes.values());
+    serdes.addAll(storeKeySerdes.values());
+    serdes.addAll(storeMsgSerdes.values());
     SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
     Base64.Encoder base64Encoder = Base64.getEncoder();
     Map<Serde, String> serdeUUIDs = new HashMap<>();
@@ -205,17 +225,28 @@ public class JobNode {
       });
 
     // set key and msg serdes for streams to the serde names generated above
-    keySerdes.forEach((streamId, serde) -> {
+    streamKeySerdes.forEach((streamId, serde) -> {
         String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), 
streamId);
         String keySerdeConfigKey = streamIdPrefix + StreamConfig.KEY_SERDE();
         configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
       });
 
-    msgSerdes.forEach((streamId, serde) -> {
+    streamMsgSerdes.forEach((streamId, serde) -> {
         String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), 
streamId);
         String valueSerdeConfigKey = streamIdPrefix + StreamConfig.MSG_SERDE();
         configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
       });
+
+    // set key and msg serdes for stores to the serde names generated above
+    storeKeySerdes.forEach((storeName, serde) -> {
+        String keySerdeConfigKey = String.format(StorageConfig.KEY_SERDE(), 
storeName);
+        configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
+      });
+
+    storeMsgSerdes.forEach((storeName, serde) -> {
+        String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE(), 
storeName);
+        configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde));
+      });
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 7b93a9e..8460ada 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -37,6 +37,7 @@ import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
 
 import java.time.Duration;
 import java.util.Collection;
@@ -112,15 +113,16 @@ public class MessageStreamImpl<M> implements 
MessageStream<M> {
   }
 
   @Override
-  public <K, JM, TM> MessageStream<TM> join(MessageStream<JM> otherStream,
-      JoinFunction<? extends K, ? super M, ? super JM, ? extends TM> joinFn, 
Duration ttl) {
-    OperatorSpec<?, JM> otherOpSpec = ((MessageStreamImpl<JM>) 
otherStream).getOperatorSpec();
-    JoinOperatorSpec<K, M, JM, TM> joinOpSpec =
-        OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec,
-            (JoinFunction<K, M, JM, TM>) joinFn, ttl.toMillis(), 
this.graph.getNextOpId());
+  public <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
+      JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn,
+      Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, 
Duration ttl) {
+    OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) 
otherStream).getOperatorSpec();
+    JoinOperatorSpec<K, M, OM, JM> joinOpSpec =
+        OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, 
(JoinFunction<K, M, OM, JM>) joinFn,
+            keySerde, messageSerde, otherMessageSerde, ttl.toMillis(), 
this.graph.getNextOpId());
 
     this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
-    otherOpSpec.registerNextOperatorSpec((OperatorSpec<JM, ?>) joinOpSpec);
+    otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) joinOpSpec);
 
     return new MessageStreamImpl<>(this.graph, joinOpSpec);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
 
b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
index 9b7956a..5ede5e8 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
@@ -18,21 +18,22 @@
  */
 package org.apache.samza.operators.functions;
 
+import org.apache.samza.operators.impl.store.TimestampedValue;
 import org.apache.samza.storage.kv.KeyValueStore;
 
 /**
  * An internal function that maintains state and join logic for one side of a 
two-way join.
  */
-public interface PartialJoinFunction<K, M, JM, RM> extends InitableFunction, 
ClosableFunction {
+public interface PartialJoinFunction<K, M, OM, JM> extends InitableFunction, 
ClosableFunction {
 
   /**
    * Joins a message in this stream with a message from another stream.
    *
    * @param m  message from this input stream
-   * @param jm  message from the other input stream
+   * @param om  message from the other input stream
    * @return  the joined message in the output stream
    */
-  RM apply(M m, JM jm);
+  JM apply(M m, OM om);
 
   /**
    * Gets the key for the input message.
@@ -47,23 +48,6 @@ public interface PartialJoinFunction<K, M, JM, RM> extends 
InitableFunction, Clo
    *
    * @return the key value store containing the state for this stream
    */
-  KeyValueStore<K, PartialJoinMessage<M>> getState();
+  KeyValueStore<K, TimestampedValue<M>> getState();
 
-  class PartialJoinMessage<M> {
-    private final M message;
-    private final long receivedTimeMs;
-
-    public PartialJoinMessage(M message, long receivedTimeMs) {
-      this.message = message;
-      this.receivedTimeMs = receivedTimeMs;
-    }
-
-    public M getMessage() {
-      return message;
-    }
-
-    public long getReceivedTimeMs() {
-      return receivedTimeMs;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 1f86975..808ddbf 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -37,7 +37,7 @@ import 
org.apache.samza.operators.spec.PartitionByOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.util.InternalInMemoryStore;
+import org.apache.samza.operators.impl.store.TimestampedValue;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.TaskContext;
@@ -219,16 +219,17 @@ public class OperatorImplGraph {
 
   private KV<PartialJoinFunction, PartialJoinFunction> 
getOrCreatePartialJoinFunctions(JoinOperatorSpec joinOpSpec) {
     return joinFunctions.computeIfAbsent(joinOpSpec.getOpId(),
-        joinOpId -> KV.of(createLeftJoinFn(joinOpSpec.getJoinFn()), 
createRightJoinFn(joinOpSpec.getJoinFn())));
+        joinOpId -> KV.of(createLeftJoinFn(joinOpSpec), 
createRightJoinFn(joinOpSpec)));
   }
 
-  private PartialJoinFunction<Object, Object, Object, Object> 
createLeftJoinFn(JoinFunction joinFn) {
+  private PartialJoinFunction<Object, Object, Object, Object> 
createLeftJoinFn(JoinOperatorSpec joinOpSpec) {
     return new PartialJoinFunction<Object, Object, Object, Object>() {
-      private KeyValueStore<Object, PartialJoinMessage<Object>> 
leftStreamState = new InternalInMemoryStore<>();
+      private final JoinFunction joinFn = joinOpSpec.getJoinFn();
+      private KeyValueStore<Object, TimestampedValue<Object>> leftStreamState;
 
       @Override
-      public Object apply(Object m, Object jm) {
-        return joinFn.apply(m, jm);
+      public Object apply(Object m, Object om) {
+        return joinFn.apply(m, om);
       }
 
       @Override
@@ -237,12 +238,15 @@ public class OperatorImplGraph {
       }
 
       @Override
-      public KeyValueStore<Object, PartialJoinMessage<Object>> getState() {
+      public KeyValueStore<Object, TimestampedValue<Object>> getState() {
         return leftStreamState;
       }
 
       @Override
       public void init(Config config, TaskContext context) {
+        String leftStoreName = joinOpSpec.getLeftOpName();
+        leftStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) 
context.getStore(leftStoreName);
+
         // user-defined joinFn should only be initialized once, so we do it 
only in left partial join function.
         joinFn.init(config, context);
       }
@@ -255,13 +259,14 @@ public class OperatorImplGraph {
     };
   }
 
-  private PartialJoinFunction<Object, Object, Object, Object> 
createRightJoinFn(JoinFunction joinFn) {
+  private PartialJoinFunction<Object, Object, Object, Object> 
createRightJoinFn(JoinOperatorSpec joinOpSpec) {
     return new PartialJoinFunction<Object, Object, Object, Object>() {
-      private KeyValueStore<Object, PartialJoinMessage<Object>> 
rightStreamState = new InternalInMemoryStore<>();
+      private final JoinFunction joinFn = joinOpSpec.getJoinFn();
+      private KeyValueStore<Object, TimestampedValue<Object>> rightStreamState;
 
       @Override
-      public Object apply(Object m, Object jm) {
-        return joinFn.apply(jm, m);
+      public Object apply(Object m, Object om) {
+        return joinFn.apply(om, m);
       }
 
       @Override
@@ -270,7 +275,16 @@ public class OperatorImplGraph {
       }
 
       @Override
-      public KeyValueStore<Object, PartialJoinMessage<Object>> getState() {
+      public void init(Config config, TaskContext context) {
+        String rightStoreName = joinOpSpec.getRightOpName();
+        rightStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) 
context.getStore(rightStoreName);
+
+        // user-defined joinFn should only be initialized once,
+        // so we do it only in left partial join function and not here again.
+      }
+
+      @Override
+      public KeyValueStore<Object, TimestampedValue<Object>> getState() {
         return rightStreamState;
       }
     };

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
index ad66962..e976a43 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -19,47 +19,39 @@
 package org.apache.samza.operators.impl;
 
 import org.apache.samza.config.Config;
-import org.apache.samza.metrics.Counter;
 import org.apache.samza.operators.functions.PartialJoinFunction;
-import 
org.apache.samza.operators.functions.PartialJoinFunction.PartialJoinMessage;
+import org.apache.samza.operators.impl.store.TimestampedValue;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueIterator;
-import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.util.Clock;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 
 /**
  * Implementation of one side of a {@link JoinOperatorSpec} that buffers and 
joins its input messages of
- * type {@code M} with buffered input messages of type {@code JM} in the 
paired {@link PartialJoinOperatorImpl}.
+ * type {@code M} with buffered input messages of type {@code OM} in the 
paired {@link PartialJoinOperatorImpl}.
  *
  * @param <K> the type of join key
  * @param <M> the type of input messages on this side of the join
- * @param <JM> the type of input message on the other side of the join
- * @param <RM> the type of join result
+ * @param <OM> the type of input message on the other side of the join
+ * @param <JM> the type of join result
  */
-class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
+class PartialJoinOperatorImpl<K, M, OM, JM> extends OperatorImpl<M, JM> {
 
-  private final JoinOperatorSpec<K, M, JM, RM> joinOpSpec;
+  private final JoinOperatorSpec<K, M, OM, JM> joinOpSpec;
   private final boolean isLeftSide; // whether this operator impl is for the 
left side of the join
-  private final PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn;
-  private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn;
+  private final PartialJoinFunction<K, M, OM, JM> thisPartialJoinFn;
+  private final PartialJoinFunction<K, OM, M, JM> otherPartialJoinFn;
   private final long ttlMs;
   private final Clock clock;
 
-  private Counter keysRemoved;
-
-  PartialJoinOperatorImpl(JoinOperatorSpec<K, M, JM, RM> joinOpSpec, boolean 
isLeftSide,
-      PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn,
-      PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn,
+  PartialJoinOperatorImpl(JoinOperatorSpec<K, M, OM, JM> joinOpSpec, boolean 
isLeftSide,
+      PartialJoinFunction<K, M, OM, JM> thisPartialJoinFn,
+      PartialJoinFunction<K, OM, M, JM> otherPartialJoinFn,
       Config config, TaskContext context, Clock clock) {
     this.joinOpSpec = joinOpSpec;
     this.isLeftSide = isLeftSide;
@@ -71,54 +63,29 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends 
OperatorImpl<M, RM> {
 
   @Override
   protected void handleInit(Config config, TaskContext context) {
-    keysRemoved = context.getMetricsRegistry()
-        .newCounter(OperatorImpl.class.getName(), getOperatorName() + 
"-keys-removed");
     this.thisPartialJoinFn.init(config, context);
   }
 
   @Override
-  public Collection<RM> handleMessage(M message, MessageCollector collector, 
TaskCoordinator coordinator) {
+  public Collection<JM> handleMessage(M message, MessageCollector collector, 
TaskCoordinator coordinator) {
     K key = thisPartialJoinFn.getKey(message);
-    thisPartialJoinFn.getState().put(key, new PartialJoinMessage<>(message, 
clock.currentTimeMillis()));
-    PartialJoinMessage<JM> otherMessage = 
otherPartialJoinFn.getState().get(key);
+    thisPartialJoinFn.getState().put(key, new TimestampedValue<>(message, 
clock.currentTimeMillis()));
+    TimestampedValue<OM> otherMessage = otherPartialJoinFn.getState().get(key);
     long now = clock.currentTimeMillis();
-    if (otherMessage != null && otherMessage.getReceivedTimeMs() > now - 
ttlMs) {
-      RM joinResult = thisPartialJoinFn.apply(message, 
otherMessage.getMessage());
+    if (otherMessage != null && otherMessage.getTimestamp() > now - ttlMs) {
+      JM joinResult = thisPartialJoinFn.apply(message, 
otherMessage.getValue());
       return Collections.singletonList(joinResult);
     }
     return Collections.emptyList();
   }
 
   @Override
-  public Collection<RM> handleTimer(MessageCollector collector, 
TaskCoordinator coordinator) {
-    long now = clock.currentTimeMillis();
-
-    KeyValueStore<K, PartialJoinMessage<M>> thisState = 
thisPartialJoinFn.getState();
-    KeyValueIterator<K, PartialJoinMessage<M>> iterator = thisState.all();
-    List<K> keysToRemove = new ArrayList<>();
-
-    while (iterator.hasNext()) {
-      Entry<K, PartialJoinMessage<M>> entry = iterator.next();
-      if (entry.getValue().getReceivedTimeMs() < now - ttlMs) {
-        keysToRemove.add(entry.getKey());
-      } else {
-        break; // InternalInMemoryStore uses a LinkedHashMap and will return 
entries in insertion order
-      }
-    }
-
-    iterator.close();
-    thisState.deleteAll(keysToRemove);
-    keysRemoved.inc(keysToRemove.size());
-    return Collections.emptyList();
-  }
-
-  @Override
   protected void handleClose() {
     this.thisPartialJoinFn.close();
   }
 
-  protected OperatorSpec<M, RM> getOperatorSpec() {
-    return (OperatorSpec<M, RM>) joinOpSpec;
+  protected OperatorSpec<M, JM> getOperatorSpec() {
+    return (OperatorSpec<M, JM>) joinOpSpec;
   }
 
   /**
@@ -129,7 +96,6 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends 
OperatorImpl<M, RM> {
    */
   @Override
   protected String getOperatorName() {
-    String side = isLeftSide ? "L" : "R";
-    return this.joinOpSpec.getOpName() + "-" + side;
+    return isLeftSide ? joinOpSpec.getLeftOpName() : 
joinOpSpec.getRightOpName();
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
index e544e2e..56d839e 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
@@ -52,6 +52,7 @@ public interface TimeSeriesStore<K, V> {
    * @param key the key to look up in the store
    * @param startTimestamp the start timestamp of the range, inclusive
    * @param endTimestamp the end timestamp of the range, exclusive
+   * @return an iterator over the values for the given key in the provided 
time-range that must be closed after use
    * @throws IllegalArgumentException when startTimeStamp &gt; endTimestamp, 
or when either of them is negative
    */
   ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, long 
endTimestamp);

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
index ad5e844..5e45148 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
@@ -26,18 +26,18 @@ package org.apache.samza.operators.impl.store;
  */
 public class TimestampedValue<V> {
   private final V value;
-  private final Long timestamp;
+  private final long timestamp;
 
-  public TimestampedValue(V v, Long time) {
-    value = v;
-    timestamp = time;
+  public TimestampedValue(V v, long timestamp) {
+    this.value = v;
+    this.timestamp = timestamp;
   }
 
   public V getValue() {
     return value;
   }
 
-  public Long getTimestamp() {
+  public long getTimestamp() {
     return timestamp;
   }
 
@@ -48,14 +48,14 @@ public class TimestampedValue<V> {
 
     TimestampedValue<?> that = (TimestampedValue<?>) o;
 
-    if (value != null ? !value.equals(that.value) : that.value != null) return 
false;
-    return timestamp.equals(that.timestamp);
+    if (timestamp != that.timestamp) return false;
+    return value != null ? value.equals(that.value) : (that.value == null);
   }
 
   @Override
   public int hashCode() {
     int result = value != null ? value.hashCode() : 0;
-    result = 31 * result + timestamp.hashCode();
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
     return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java
new file mode 100644
index 0000000..b14f8a4
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl.store;
+
+import org.apache.samza.serializers.Serde;
+
+import java.nio.ByteBuffer;
+
+
+public class TimestampedValueSerde<V> implements Serde<TimestampedValue<V>> {
+  private static final int TIMESTAMP_BYTES = 8;
+  private final Serde<V> vSerde;
+
+  public TimestampedValueSerde(Serde<V> vSerde) {
+    this.vSerde = vSerde;
+  }
+
+  @Override
+  public TimestampedValue<V> fromBytes(byte[] bytes) {
+    ByteBuffer bb = ByteBuffer.wrap(bytes);
+    byte[] vBytes = new byte[bytes.length - TIMESTAMP_BYTES];
+    bb.get(vBytes, 0, vBytes.length);
+    V v = vSerde.fromBytes(vBytes);
+    long ts = bb.getLong();
+    return new TimestampedValue<>(v, ts);
+  }
+
+  @Override
+  public byte[] toBytes(TimestampedValue<V> tv) {
+    byte[] vBytes = vSerde.toBytes(tv.getValue());
+    int vBytesLength = vBytes != null ? vBytes.length : 0;
+    ByteBuffer bb = ByteBuffer.allocate(vBytesLength + TIMESTAMP_BYTES);
+    if (vBytes != null) {
+      bb.put(vBytes);
+    }
+    bb.putLong(tv.getTimestamp());
+    return bb.array();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
index f4fe0fd..3f99280 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
@@ -18,8 +18,16 @@
  */
 package org.apache.samza.operators.spec;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.operators.impl.store.TimestampedValueSerde;
+import org.apache.samza.operators.impl.store.TimestampedValue;
+import org.apache.samza.serializers.Serde;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
 
 
 /**
@@ -28,14 +36,17 @@ import 
org.apache.samza.operators.functions.WatermarkFunction;
  *
  * @param <K>  the type of join key
  * @param <M>  the type of message in this stream
- * @param <JM>  the type of message in the other stream
- * @param <RM>  the type of join result
+ * @param <OM>  the type of message in the other stream
+ * @param <JM>  the type of join result
  */
-public class JoinOperatorSpec<K, M, JM, RM> extends OperatorSpec<Object, RM> { 
// Object == M | JM
+public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> 
implements StatefulOperatorSpec { // Object == M | OM
 
   private final OperatorSpec<?, M> leftInputOpSpec;
-  private final OperatorSpec<?, JM> rightInputOpSpec;
-  private final JoinFunction<K, M, JM, RM> joinFn;
+  private final OperatorSpec<?, OM> rightInputOpSpec;
+  private final JoinFunction<K, M, OM, JM> joinFn;
+  private final Serde<K> keySerde;
+  private final Serde<TimestampedValue<M>> messageSerde;
+  private final Serde<TimestampedValue<OM>> otherMessageSerde;
   private final long ttlMs;
 
   /**
@@ -47,15 +58,45 @@ public class JoinOperatorSpec<K, M, JM, RM> extends 
OperatorSpec<Object, RM> { /
    * @param ttlMs  the ttl in ms for retaining messages in each stream
    * @param opId  the unique ID for this operator
    */
-  JoinOperatorSpec(OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, JM> 
rightInputOpSpec,
-      JoinFunction<K, M, JM, RM> joinFn, long ttlMs, int opId) {
+  JoinOperatorSpec(OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, OM> 
rightInputOpSpec,
+      JoinFunction<K, M, OM, JM> joinFn, Serde<K> keySerde, Serde<M> 
messageSerde, Serde<OM> otherMessageSerde,
+      long ttlMs, int opId) {
     super(OpCode.JOIN, opId);
     this.leftInputOpSpec = leftInputOpSpec;
     this.rightInputOpSpec = rightInputOpSpec;
     this.joinFn = joinFn;
+    this.keySerde = keySerde;
+    this.messageSerde = new TimestampedValueSerde<>(messageSerde);
+    this.otherMessageSerde = new TimestampedValueSerde<>(otherMessageSerde);
     this.ttlMs = ttlMs;
   }
 
+  @Override
+  public Collection<StoreDescriptor> getStoreDescriptors() {
+    String rocksDBStoreFactory = 
"org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory";
+    String leftStoreName = getLeftOpName();
+    String rightStoreName = getRightOpName();
+    Map<String, String> leftStoreCustomProps = ImmutableMap.of(
+        String.format("stores.%s.rocksdb.ttl.ms", leftStoreName), 
Long.toString(ttlMs),
+        String.format("stores.%s.changelog.kafka.cleanup.policy", 
leftStoreName), "delete",
+        String.format("stores.%s.changelog.kafka.retention.ms", 
leftStoreName), Long.toString(ttlMs));
+    Map<String, String> rightStoreCustomProps = ImmutableMap.of(
+        String.format("stores.%s.rocksdb.ttl.ms", rightStoreName), 
Long.toString(ttlMs),
+        String.format("stores.%s.changelog.kafka.cleanup.policy", 
rightStoreName), "delete",
+        String.format("stores.%s.changelog.kafka.retention.ms", 
rightStoreName), Long.toString(ttlMs));
+
+    return Arrays.asList(
+        new StoreDescriptor(leftStoreName, rocksDBStoreFactory, this.keySerde, 
this.messageSerde,
+            leftStoreName, leftStoreCustomProps),
+        new StoreDescriptor(rightStoreName, rocksDBStoreFactory, 
this.keySerde, this.otherMessageSerde,
+            rightStoreName, rightStoreCustomProps));
+  }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    return joinFn instanceof WatermarkFunction ? (WatermarkFunction) joinFn : 
null;
+  }
+
   public OperatorSpec getLeftInputOpSpec() {
     return leftInputOpSpec;
   }
@@ -64,16 +105,19 @@ public class JoinOperatorSpec<K, M, JM, RM> extends 
OperatorSpec<Object, RM> { /
     return rightInputOpSpec;
   }
 
-  public JoinFunction<K, M, JM, RM> getJoinFn() {
+  public String getLeftOpName() {
+    return this.getOpName() + "-L";
+  }
+
+  public String getRightOpName() {
+    return this.getOpName() + "-R";
+  }
+
+  public JoinFunction<K, M, OM, JM> getJoinFn() {
     return this.joinFn;
   }
 
   public long getTtlMs() {
     return ttlMs;
   }
-
-  @Override
-  public WatermarkFunction getWatermarkFn() {
-    return joinFn instanceof WatermarkFunction ? (WatermarkFunction) joinFn : 
null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index e67179e..8b2b177 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -27,6 +27,7 @@ import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.task.TaskContext;
 
 import java.util.ArrayList;
@@ -189,18 +190,22 @@ public class OperatorSpecs {
    * @param leftInputOpSpec  the operator spec for the stream on the left side 
of the join
    * @param rightInputOpSpec  the operator spec for the stream on the right 
side of the join
    * @param joinFn  the user-defined join function to get join keys and results
+   * @param keySerde  the serde for the join key
+   * @param messageSerde  the serde for messages in the stream on the left 
side of the join
+   * @param otherMessageSerde  the serde for messages in the stream on the 
right side of the join
    * @param ttlMs  the ttl in ms for retaining messages in each stream
    * @param opId  the unique ID of the operator
    * @param <K>  the type of join key
    * @param <M>  the type of input message
-   * @param <JM>  the type of message in the other join stream
-   * @param <RM>  the type of join result
+   * @param <OM>  the type of message in the other stream
+   * @param <JM>  the type of join result
    * @return  the {@link JoinOperatorSpec}
    */
-  public static <K, M, JM, RM> JoinOperatorSpec<K, M, JM, RM> 
createJoinOperatorSpec(
-      OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, JM> rightInputOpSpec,
-      JoinFunction<K, M, JM, RM> joinFn, long ttlMs, int opId) {
-    return new JoinOperatorSpec<>(leftInputOpSpec, rightInputOpSpec, joinFn, 
ttlMs, opId);
+  public static <K, M, OM, JM> JoinOperatorSpec<K, M, OM, JM> 
createJoinOperatorSpec(
+      OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, OM> 
rightInputOpSpec, JoinFunction<K, M, OM, JM> joinFn,
+      Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, 
long ttlMs, int opId) {
+    return new JoinOperatorSpec<>(leftInputOpSpec, rightInputOpSpec, joinFn,
+        keySerde, messageSerde, otherMessageSerde, ttlMs, opId);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/spec/StatefulOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/StatefulOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/StatefulOperatorSpec.java
new file mode 100644
index 0000000..90dfe59
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/StatefulOperatorSpec.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.spec;
+
+import java.util.Collection;
+
+
+/**
+ * Spec for stateful operators.
+ */
+public interface StatefulOperatorSpec {
+
+  /**
+   * Get the store descriptors for stores required by this operator.
+   *
+   * @return store descriptors for this operator's stores
+   */
+  Collection<StoreDescriptor> getStoreDescriptors();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java 
b/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java
new file mode 100644
index 0000000..8aa2dd9
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.config.JavaStorageConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.serializers.Serde;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A descriptor for a store.
+ */
+public class StoreDescriptor {
+  private final String storeName;
+  private final String storeFactory;
+  private final Serde keySerde;
+  private final Serde msgSerde;
+  private final String changelogStream;
+  private final Map<String, String> otherProperties;
+
+  StoreDescriptor(String storeName, String storeFactory, Serde keySerde, Serde 
msgSerde,
+      String changelogStream, Map<String, String> otherProperties) {
+    this.storeName = storeName;
+    this.storeFactory = storeFactory;
+    this.keySerde = keySerde;
+    this.msgSerde = msgSerde;
+    this.changelogStream = changelogStream;
+    this.otherProperties = otherProperties;
+  }
+
+  public String getStoreName() {
+    return storeName;
+  }
+
+  public Serde getKeySerde() {
+    return keySerde;
+  }
+
+  public Serde getMsgSerde() {
+    return msgSerde;
+  }
+
+  public JavaStorageConfig getStorageConfigs() {
+    HashMap<String, String> configs = new HashMap<>();
+    configs.put(String.format(StorageConfig.FACTORY(), this.getStoreName()), 
this.getStoreFactory());
+    configs.put(String.format(StorageConfig.CHANGELOG_STREAM(), 
this.getStoreName()), this.getChangelogStream());
+    configs.putAll(this.getOtherProperties());
+    return new JavaStorageConfig(new MapConfig(configs));
+  }
+
+  private String getStoreFactory() {
+    return storeFactory;
+  }
+
+  private String getChangelogStream() {
+    return changelogStream;
+  }
+
+  private Map<String, String> getOtherProperties() {
+    return otherProperties;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 1caca26..ff0299d 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -154,7 +154,7 @@ public class LocalApplicationRunner extends 
AbstractApplicationRunner {
       writePlanJsonFile(executionPlanJson);
 
       // 2. create the necessary streams
-      // TODO: System generated intermediate streams should have robust naming 
scheme. Refer JIRA-1391
+      // TODO: System generated intermediate streams should have robust naming 
scheme. See SAMZA-1391
       String planId = String.valueOf(executionPlanJson.hashCode());
       createStreams(planId, plan.getIntermediateStreams());
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 4c4a645..9d44ec1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -216,13 +216,14 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
   public void onNewJobModelAvailable(final String version) {
     debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () ->
       {
-        LOG.info("pid=" + processorId + "new JobModel available");
+        LOG.info("pid=" + processorId + ": new JobModel available");
         // get the new job model from ZK
         newJobModel = zkUtils.getJobModel(version);
         LOG.info("pid=" + processorId + ": new JobModel available. ver=" + 
version + "; jm = " + newJobModel);
 
         if (!newJobModel.getContainers().containsKey(processorId)) {
-          LOG.info("JobModel: {} does not contain the processorId: {}. 
Stopping the processor.", newJobModel, processorId);
+          LOG.info("New JobModel does not contain pid={}. Stopping this 
processor. New JobModel: {}",
+              processorId, newJobModel);
           stop();
         } else {
           // stop current work

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index c3e1be2..6071c1f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -805,7 +805,7 @@ class SamzaContainer(
    */
   def shutdown(): Unit = {
     if (status == SamzaContainerStatus.STOPPED || status == 
SamzaContainerStatus.FAILED) {
-      throw new IllegalContainerStateException("Cannot shutdown a container 
with status - " + status)
+      throw new IllegalContainerStateException("Cannot shutdown a container 
with status " + status)
     }
     shutdownRunLoop()
   }
@@ -936,16 +936,18 @@ class SamzaContainer(
     val runLoopThread = Thread.currentThread()
     shutdownHookThread = new Thread("CONTAINER-SHUTDOWN-HOOK") {
       override def run() = {
-        info("Shutting down, will wait up to %s ms" format shutdownMs)
+        info("Shutting down, will wait up to %s ms." format shutdownMs)
         shutdownRunLoop()  //TODO: Pull out shutdown hook to 
LocalContainerRunner or SP
         try {
           runLoopThread.join(shutdownMs)
         } catch {
           case e: Throwable => // Ignore to avoid deadlock with 
uncaughtExceptionHandler. See SAMZA-1220
-            error("Did not shut down within %s ms, exiting" format shutdownMs, 
e)
+            error("Did not shut down within %s ms, exiting." format 
shutdownMs, e)
         }
         if (!runLoopThread.isAlive) {
           info("Shutdown complete")
+        } else {
+          error("Did not shut down within %s ms, exiting." format shutdownMs)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
 
b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
index 95939c4..df393b0 100644
--- 
a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ 
b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -49,7 +49,9 @@ public class OrderShipmentJoinExample implements 
StreamApplication {
             KVSerde.of(new StringSerde(), new 
JsonSerdeV2<>(FulfilledOrderRecord.class)));
 
     orders
-        .join(shipments, new MyJoinFunction(), Duration.ofMinutes(1))
+        .join(shipments, new MyJoinFunction(),
+            new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new 
JsonSerdeV2<>(ShipmentRecord.class),
+            Duration.ofMinutes(1))
         .map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder))
         .sendTo(fulfilledOrders);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 50b0a13..f6441dc 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -31,6 +31,7 @@ import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
@@ -130,14 +131,14 @@ public class TestExecutionPlanner {
      */
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    MessageStream<KV<Object, Object>> m1 =
+    MessageStream<KV<Object, Object>> messageStream1 =
         streamGraph.<KV<Object, Object>>getInputStream("input1")
             .map(m -> m);
-    MessageStream<KV<Object, Object>> m2 =
+    MessageStream<KV<Object, Object>> messageStream2 =
         streamGraph.<KV<Object, Object>>getInputStream("input2")
             .partitionBy(m -> m.key, m -> m.value)
             .filter(m -> true);
-    MessageStream<KV<Object, Object>> m3 =
+    MessageStream<KV<Object, Object>> messageStream3 =
         streamGraph.<KV<Object, Object>>getInputStream("input3")
             .filter(m -> true)
             .partitionBy(m -> m.key, m -> m.value)
@@ -145,8 +146,14 @@ public class TestExecutionPlanner {
     OutputStream<KV<Object, Object>> output1 = 
streamGraph.getOutputStream("output1");
     OutputStream<KV<Object, Object>> output2 = 
streamGraph.getOutputStream("output2");
 
-    m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(output1);
-    m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(output2);
+    messageStream1
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(2))
+        .sendTo(output1);
+    messageStream3
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(1))
+        .sendTo(output2);
 
     return streamGraph;
   }
@@ -154,14 +161,14 @@ public class TestExecutionPlanner {
   private StreamGraphImpl createStreamGraphWithJoinAndWindow() {
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    MessageStream<KV<Object, Object>> m1 =
+    MessageStream<KV<Object, Object>> messageStream1 =
         streamGraph.<KV<Object, Object>>getInputStream("input1")
             .map(m -> m);
-    MessageStream<KV<Object, Object>> m2 =
+    MessageStream<KV<Object, Object>> messageStream2 =
         streamGraph.<KV<Object, Object>>getInputStream("input2")
             .partitionBy(m -> m.key, m -> m.value)
             .filter(m -> true);
-    MessageStream<KV<Object, Object>> m3 =
+    MessageStream<KV<Object, Object>> messageStream3 =
         streamGraph.<KV<Object, Object>>getInputStream("input3")
             .filter(m -> true)
             .partitionBy(m -> m.key, m -> m.value)
@@ -169,17 +176,26 @@ public class TestExecutionPlanner {
     OutputStream<KV<Object, Object>> output1 = 
streamGraph.getOutputStream("output1");
     OutputStream<KV<Object, Object>> output2 = 
streamGraph.getOutputStream("output2");
 
-    m1.map(m -> m)
+    messageStream1.map(m -> m)
         .filter(m->true)
         .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8)));
 
-    m2.map(m -> m)
+    messageStream2.map(m -> m)
         .filter(m->true)
         .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16)));
 
-    m1.join(m2, mock(JoinFunction.class), 
Duration.ofMillis(1600)).sendTo(output1);
-    m3.join(m2, mock(JoinFunction.class), 
Duration.ofMillis(100)).sendTo(output2);
-    m3.join(m2, mock(JoinFunction.class), 
Duration.ofMillis(252)).sendTo(output2);
+    messageStream1
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofMillis(1600))
+        .sendTo(output1);
+    messageStream3
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofMillis(100))
+        .sendTo(output2);
+    messageStream3
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofMillis(252))
+        .sendTo(output2);
 
     return streamGraph;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index 095e407..10c4aa3 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -30,6 +30,7 @@ import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -107,14 +108,14 @@ public class TestJobGraphJsonGenerator {
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
     streamGraph.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new 
NoOpSerde<>()));
-    MessageStream<KV<Object, Object>> m1 =
+    MessageStream<KV<Object, Object>> messageStream1 =
         streamGraph.<KV<Object, Object>>getInputStream("input1")
             .map(m -> m);
-    MessageStream<KV<Object, Object>> m2 =
+    MessageStream<KV<Object, Object>> messageStream2 =
         streamGraph.<KV<Object, Object>>getInputStream("input2")
             .partitionBy(m -> m.key, m -> m.value)
             .filter(m -> true);
-    MessageStream<KV<Object, Object>> m3 =
+    MessageStream<KV<Object, Object>> messageStream3 =
         streamGraph.<KV<Object, Object>>getInputStream("input3")
             .filter(m -> true)
             .partitionBy(m -> m.key, m -> m.value)
@@ -122,9 +123,15 @@ public class TestJobGraphJsonGenerator {
     OutputStream<KV<Object, Object>> outputStream1 = 
streamGraph.getOutputStream("output1");
     OutputStream<KV<Object, Object>> outputStream2 = 
streamGraph.getOutputStream("output2");
 
-    m1.join(m2, mock(JoinFunction.class), 
Duration.ofHours(2)).sendTo(outputStream1);
-    m2.sink((message, collector, coordinator) -> { });
-    m3.join(m2, mock(JoinFunction.class), 
Duration.ofHours(1)).sendTo(outputStream2);
+    messageStream1
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(2))
+        .sendTo(outputStream1);
+    messageStream2.sink((message, collector, coordinator) -> { });
+    messageStream3
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(1))
+        .sendTo(outputStream2);
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     ExecutionPlan plan = planner.plan(streamGraph);

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
index 918da26..f6ebaf9 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
@@ -26,6 +26,8 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.impl.store.TimestampedValueSerde;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
@@ -35,6 +37,7 @@ import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.StreamSpec;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.Map;
@@ -50,25 +53,35 @@ public class TestJobNode {
   @Test
   public void testAddSerdeConfigs() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec inputSpec = new StreamSpec("input", "input", "input-system");
+    StreamSpec input1Spec = new StreamSpec("input1", "input1", "input-system");
+    StreamSpec input2Spec = new StreamSpec("input2", "input2", "input-system");
     StreamSpec outputSpec = new StreamSpec("output", "output", 
"output-system");
-    StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-1", 
"partition_by-1", "intermediate-system");
-    doReturn(inputSpec).when(mockRunner).getStreamSpec("input");
+    StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-2", 
"partition_by-2", "intermediate-system");
+    doReturn(input1Spec).when(mockRunner).getStreamSpec("input1");
+    doReturn(input2Spec).when(mockRunner).getStreamSpec("input2");
     doReturn(outputSpec).when(mockRunner).getStreamSpec("output");
-    
doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-1");
+    
doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-2");
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
     streamGraph.setDefaultSerde(KVSerde.of(new StringSerde(), new 
JsonSerdeV2<>()));
-    MessageStream<KV<String, Object>> input = 
streamGraph.getInputStream("input");
+    MessageStream<KV<String, Object>> input1 = 
streamGraph.getInputStream("input1");
+    MessageStream<KV<String, Object>> input2 = 
streamGraph.getInputStream("input2");
     OutputStream<KV<String, Object>> output = 
streamGraph.getOutputStream("output");
-    input.partitionBy(KV::getKey, KV::getValue).sendTo(output);
+    JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = 
mock(JoinFunction.class);
+    input1
+        .partitionBy(KV::getKey, KV::getValue).map(kv -> kv.value)
+        .join(input2.map(kv -> kv.value), mockJoinFn,
+            new StringSerde(), new JsonSerdeV2<>(Object.class), new 
JsonSerdeV2<>(Object.class), Duration.ofHours(1))
+        .sendTo(output);
 
     JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, 
mock(Config.class));
     Config config = new MapConfig();
-    StreamEdge inputEdge = new StreamEdge(inputSpec, config);
+    StreamEdge input1Edge = new StreamEdge(input1Spec, config);
+    StreamEdge input2Edge = new StreamEdge(input2Spec, config);
     StreamEdge outputEdge = new StreamEdge(outputSpec, config);
     StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true, config);
-    jobNode.addInEdge(inputEdge);
+    jobNode.addInEdge(input1Edge);
+    jobNode.addInEdge(input2Edge);
     jobNode.addOutEdge(outputEdge);
     jobNode.addInEdge(repartitionEdge);
     jobNode.addOutEdge(repartitionEdge);
@@ -85,28 +98,74 @@ public class TestJobNode {
         e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), 
""),
         e -> 
serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes()))
     ));
-    assertEquals(2, serializers.size());
+    assertEquals(5, serializers.size()); // 2 default + 3 specific for join
 
-    String inputKeySerde = mapConfig.get("streams.input.samza.key.serde");
-    String inputMsgSerde = mapConfig.get("streams.input.samza.msg.serde");
-    assertTrue(deserializedSerdes.containsKey(inputKeySerde));
-    assertTrue(inputKeySerde.startsWith(StringSerde.class.getSimpleName()));
-    assertTrue(deserializedSerdes.containsKey(inputMsgSerde));
-    assertTrue(inputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+    String input1KeySerde = mapConfig.get("streams.input1.samza.key.serde");
+    String input1MsgSerde = mapConfig.get("streams.input1.samza.msg.serde");
+    assertTrue("Serialized serdes should contain input1 key serde",
+        deserializedSerdes.containsKey(input1KeySerde));
+    assertTrue("Serialized input1 key serde should be a StringSerde",
+        input1KeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue("Serialized serdes should contain input1 msg serde",
+        deserializedSerdes.containsKey(input1MsgSerde));
+    assertTrue("Serialized input1 msg serde should be a JsonSerdeV2",
+        input1MsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+
+    String input2KeySerde = mapConfig.get("streams.input2.samza.key.serde");
+    String input2MsgSerde = mapConfig.get("streams.input2.samza.msg.serde");
+    assertTrue("Serialized serdes should contain input2 key serde",
+        deserializedSerdes.containsKey(input2KeySerde));
+    assertTrue("Serialized input2 key serde should be a StringSerde",
+        input2KeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue("Serialized serdes should contain input2 msg serde",
+        deserializedSerdes.containsKey(input2MsgSerde));
+    assertTrue("Serialized input2 msg serde should be a JsonSerdeV2",
+        input2MsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
 
     String outputKeySerde = mapConfig.get("streams.output.samza.key.serde");
     String outputMsgSerde = mapConfig.get("streams.output.samza.msg.serde");
-    assertTrue(deserializedSerdes.containsKey(outputKeySerde));
-    assertTrue(outputKeySerde.startsWith(StringSerde.class.getSimpleName()));
-    assertTrue(deserializedSerdes.containsKey(outputMsgSerde));
-    assertTrue(outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
-
-    String partitionByKeySerde = 
mapConfig.get("streams.null-null-partition_by-1.samza.key.serde");
-    String partitionByMsgSerde = 
mapConfig.get("streams.null-null-partition_by-1.samza.msg.serde");
-    assertTrue(deserializedSerdes.containsKey(partitionByKeySerde));
-    
assertTrue(partitionByKeySerde.startsWith(StringSerde.class.getSimpleName()));
-    assertTrue(deserializedSerdes.containsKey(partitionByMsgSerde));
-    
assertTrue(partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+    assertTrue("Serialized serdes should contain output key serde",
+        deserializedSerdes.containsKey(outputKeySerde));
+    assertTrue("Serialized output key serde should be a StringSerde",
+        outputKeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue("Serialized serdes should contain output msg serde",
+        deserializedSerdes.containsKey(outputMsgSerde));
+    assertTrue("Serialized output msg serde should be a JsonSerdeV2",
+        outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+
+    String partitionByKeySerde = 
mapConfig.get("streams.null-null-partition_by-2.samza.key.serde");
+    String partitionByMsgSerde = 
mapConfig.get("streams.null-null-partition_by-2.samza.msg.serde");
+    assertTrue("Serialized serdes should contain intermediate stream key 
serde",
+        deserializedSerdes.containsKey(partitionByKeySerde));
+    assertTrue("Serialized intermediate stream key serde should be a 
StringSerde",
+        partitionByKeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue("Serialized serdes should contain intermediate stream msg 
serde",
+        deserializedSerdes.containsKey(partitionByMsgSerde));
+    assertTrue(
+        "Serialized intermediate stream msg serde should be a JsonSerdeV2",
+        partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+
+    String leftJoinStoreKeySerde = mapConfig.get("stores.join-6-L.key.serde");
+    String leftJoinStoreMsgSerde = mapConfig.get("stores.join-6-L.msg.serde");
+    assertTrue("Serialized serdes should contain left join store key serde",
+        deserializedSerdes.containsKey(leftJoinStoreKeySerde));
+    assertTrue("Serialized left join store key serde should be a StringSerde",
+        leftJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue("Serialized serdes should contain left join store msg serde",
+        deserializedSerdes.containsKey(leftJoinStoreMsgSerde));
+    assertTrue("Serialized left join store msg serde should be a 
TimestampedValueSerde",
+        
leftJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName()));
+
+    String rightJoinStoreKeySerde = mapConfig.get("stores.join-6-R.key.serde");
+    String rightJoinStoreMsgSerde = mapConfig.get("stores.join-6-R.msg.serde");
+    assertTrue("Serialized serdes should contain right join store key serde",
+        deserializedSerdes.containsKey(rightJoinStoreKeySerde));
+    assertTrue("Serialized right join store key serde should be a StringSerde",
+        rightJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue("Serialized serdes should contain right join store msg serde",
+        deserializedSerdes.containsKey(rightJoinStoreMsgSerde));
+    assertTrue("Serialized right join store msg serde should be a 
TimestampedValueSerde",
+        
rightJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName()));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 0df6721..1120c25 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -22,10 +22,13 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.samza.Partition;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.util.InternalInMemoryStore;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -34,7 +37,6 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamOperatorTask;
-import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.testUtils.TestClock;
@@ -49,6 +51,7 @@ import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -261,6 +264,9 @@ public class TestJoinOperator {
         .of(new SystemStreamPartition("insystem", "instream", new 
Partition(0)),
             new SystemStreamPartition("insystem2", "instream2", new 
Partition(0))));
     when(taskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
+    // need to return different stores for left and right side
+    when(taskContext.getStore(eq("join-4-L"))).thenReturn(new 
InternalInMemoryStore<>());
+    when(taskContext.getStore(eq("join-4-R"))).thenReturn(new 
InternalInMemoryStore<>());
 
     Config config = mock(Config.class);
 
@@ -287,7 +293,9 @@ public class TestJoinOperator {
 
       SystemStream outputSystemStream = new SystemStream("outputSystem", 
"outputStream");
       inStream
-          .join(inStream2, joinFn, JOIN_TTL)
+          .join(inStream2, joinFn,
+              new IntegerSerde(), new JsonSerdeV2<>(FirstStreamIME.class), new 
JsonSerdeV2<>(SecondStreamIME.class),
+              JOIN_TTL)
           .sink((message, messageCollector, taskCoordinator) -> {
               messageCollector.send(new 
OutgoingMessageEnvelope(outputSystemStream, message));
             });

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index c6554bc..f23bb14 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -41,6 +41,7 @@ import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -288,7 +289,7 @@ public class TestMessageStreamImpl {
         mock(JoinFunction.class);
 
     Duration joinTtl = Duration.ofMinutes(1);
-    source1.join(source2, mockJoinFn, joinTtl);
+    source1.join(source2, mockJoinFn, mock(Serde.class), mock(Serde.class), 
mock(Serde.class), joinTtl);
 
     ArgumentCaptor<OperatorSpec> leftRegisteredOpCaptor = 
ArgumentCaptor.forClass(OperatorSpec.class);
     
verify(leftInputOpSpec).registerNextOperatorSpec(leftRegisteredOpCaptor.capture());

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index d73c545..a759e52 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -44,6 +44,7 @@ import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.util.InternalInMemoryStore;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
@@ -218,10 +219,13 @@ public class TestOperatorImplGraph {
     JoinFunction mockJoinFunction = mock(JoinFunction.class);
     MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", 
new NoOpSerde<>());
     MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", 
new NoOpSerde<>());
-    inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1));
+    inputStream1.join(inputStream2, mockJoinFunction,
+        mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(1));
 
     TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
+    when(mockTaskContext.getStore(eq("join-2-L"))).thenReturn(new 
InternalInMemoryStore<>());
+    when(mockTaskContext.getStore(eq("join-2-R"))).thenReturn(new 
InternalInMemoryStore<>());
     OperatorImplGraph opImplGraph =
         new OperatorImplGraph(streamGraph, mock(Config.class), 
mockTaskContext, mock(Clock.class));
 
@@ -386,15 +390,22 @@ public class TestOperatorImplGraph {
         .thenReturn(int2);
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    Serde inputSerde = new NoOpSerde<>();
-    MessageStream m1 = streamGraph.getInputStream("input1", inputSerde).map(m 
-> m);
-    MessageStream m2 = streamGraph.getInputStream("input2", 
inputSerde).filter(m -> true);
-    MessageStream m3 = streamGraph.getInputStream("input3", 
inputSerde).filter(m -> true).partitionBy(m -> "hehe", m -> m).map(m -> m);
-    OutputStream<Object> om1 = streamGraph.getOutputStream("output1");
-    OutputStream<Object> om2 = streamGraph.getOutputStream("output2");
-
-    m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).partitionBy(m 
-> "haha", m -> m).sendTo(om1);
-    m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(om2);
+    MessageStream messageStream1 = streamGraph.getInputStream("input1").map(m 
-> m);
+    MessageStream messageStream2 = 
streamGraph.getInputStream("input2").filter(m -> true);
+    MessageStream messageStream3 =
+        streamGraph.getInputStream("input3").filter(m -> true).partitionBy(m 
-> "hehe", m -> m).map(m -> m);
+    OutputStream<Object> outputStream1 = 
streamGraph.getOutputStream("output1");
+    OutputStream<Object> outputStream2 = 
streamGraph.getOutputStream("output2");
+
+    messageStream1
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(2))
+        .partitionBy(m -> "haha", m -> m)
+        .sendTo(outputStream1);
+    messageStream3
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(1))
+        .sendTo(outputStream2);
 
     Multimap<SystemStream, SystemStream> outputToInput = 
OperatorImplGraph.getIntermediateToInputStreamsMap(streamGraph);
     Collection<SystemStream> inputs = outputToInput.get(int1.toSystemStream());

Reply via email to