Repository: samza
Updated Branches:
  refs/heads/master 7ad6631bb -> 7f1ec6492


SAMZA-1444: Removed circular dependency between samza-core test and 
samza-kv-rocksdb test

Also added an implementation for KVSerde.
vjagadish1989, thanks for the TestInMemoryStore implementation!

The only usage of InternalInMemoryStore is now in WindowOperatorImpl, which 
will be removed as part of store wiring for the window operator.

Author: Prateek Maheshwari <[email protected]>

Reviewers: Jacob Maes <[email protected]>, Xinyu Liu <[email protected]>

Closes #315 from prateekm/test-store


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7f1ec649
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7f1ec649
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7f1ec649

Branch: refs/heads/master
Commit: 7f1ec6492a4c42e41e732f34cb4180bd154aecc5
Parents: 7ad6631
Author: Prateek Maheshwari <[email protected]>
Authored: Thu Oct 5 10:59:39 2017 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Thu Oct 5 10:59:39 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 -
 .../org.apache.samza.serializers/KVSerde.scala  |  42 ++++--
 .../samza/operators/TestJoinOperator.java       |  94 ++++++------
 .../operators/impl/TestOperatorImplGraph.java   |  12 +-
 .../operators/impl/store/TestInMemoryStore.java | 144 +++++++++++++++++++
 .../impl/store/TestTimeSeriesStoreImpl.java     |  42 ++----
 6 files changed, 239 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7f1ec649/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index c710053..e0129f9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -179,7 +179,6 @@ project(":samza-core_$scalaVersion") {
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     compile "org.slf4j:slf4j-api:$slf4jVersion"
     testCompile project(":samza-api").sourceSets.test.output
-    testCompile project(":samza-kv-rocksdb_$scalaVersion")
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
     testCompile "org.powermock:powermock-api-mockito:$powerMockVersion"

http://git-wip-us.apache.org/repos/asf/samza/blob/7f1ec649/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala 
b/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala
index 5b0a6e3..f5cd5cf 100644
--- a/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala
@@ -19,7 +19,8 @@
 
 package org.apache.samza.serializers
 
-import org.apache.samza.SamzaException
+import java.nio.ByteBuffer
+
 import org.apache.samza.operators.KV
 
 object KVSerde {
@@ -27,26 +28,39 @@ object KVSerde {
 }
 
 /**
-  * A marker serde class to indicate that messages are keyed and should be 
deserialized as K-V pairs.
-  * This class is intended for use cases where a single Serde parameter or 
configuration is required.
+  * A serde for [[KV]] key-value pairs.
+  *
+  * When this serde is used for streams in the High Level API, Samza wires up 
and uses the provided
+  * keySerde and valueSerde for the keys and values in the stream separately. 
I.e., the fromBytes and toBytes
+  * methods in this class aren't used directly for streams.
   *
   * @tparam K type of the key in the message
   * @tparam V type of the value in the message
   */
 class KVSerde[K, V](keySerde: Serde[K], valueSerde: Serde[V]) extends 
Serde[KV[K, V]] {
-  /**
-    * Implementation Note: This serde must not be used by the framework for 
serialization/deserialization directly.
-    * Wire up and use the constituent keySerde and valueSerde instead.
-    */
-
-  override def fromBytes(bytes: Array[Byte]): Nothing = {
-    throw new NotImplementedError("This is a marker serde and must not be used 
directly. " +
-      "Samza must wire up and use the keySerde and valueSerde instead.")
+  override def fromBytes(bytes: Array[Byte]): KV[K, V] = {
+    val byteBuffer = ByteBuffer.wrap(bytes)
+    val keyLength = byteBuffer.getInt()
+    val keyBytes = new Array[Byte](keyLength)
+    byteBuffer.get(keyBytes)
+    val valueLength = byteBuffer.getInt()
+    val valueBytes = new Array[Byte](valueLength)
+    byteBuffer.get(valueBytes)
+    val key = keySerde.fromBytes(keyBytes)
+    val value = valueSerde.fromBytes(valueBytes)
+    KV.of(key, value)
   }
 
-  override def toBytes(`object`: KV[K, V]): Nothing = {
-    throw new SamzaException("This is a marker serde and must not be used 
directly. " +
-      "Samza must wire up and use the keySerde and valueSerde instead.")
+  override def toBytes(obj: KV[K, V]): Array[Byte] = {
+    val keyBytes = keySerde.toBytes(obj.key)
+    val valueBytes = valueSerde.toBytes(obj.value)
+    val bytes = new Array[Byte](8 + keyBytes.length + 8 + valueBytes.length)
+    val byteBuffer = ByteBuffer.wrap(bytes)
+    byteBuffer.putInt(keyBytes.length)
+    byteBuffer.put(keyBytes)
+    byteBuffer.putInt(valueBytes.length)
+    byteBuffer.put(valueBytes)
+    byteBuffer.array()
   }
 
   def getKeySerde: Serde[K] = keySerde

http://git-wip-us.apache.org/repos/asf/samza/blob/7f1ec649/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 1120c25..09fb56a 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -25,10 +25,10 @@ import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.util.InternalInMemoryStore;
+import org.apache.samza.operators.impl.store.TestInMemoryStore;
+import org.apache.samza.operators.impl.store.TimestampedValueSerde;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -63,7 +63,8 @@ public class TestJoinOperator {
 
   @Test
   public void join() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new 
TestJoinStreamApplication(new TestJoinFunction()));
+    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -79,7 +80,8 @@ public class TestJoinOperator {
   @Test
   public void joinFnInitAndClose() throws Exception {
     TestJoinFunction joinFn = new TestJoinFunction();
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new 
TestJoinStreamApplication(joinFn));
+    TestJoinStreamApplication app = new TestJoinStreamApplication(joinFn);
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
     assertEquals(1, joinFn.getNumInitCalls());
     MessageCollector messageCollector = mock(MessageCollector.class);
 
@@ -96,7 +98,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinReverse() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new 
TestJoinStreamApplication(new TestJoinFunction()));
+    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -111,7 +114,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinNoMatch() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new 
TestJoinStreamApplication(new TestJoinFunction()));
+    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -125,7 +129,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinNoMatchReverse() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new 
TestJoinStreamApplication(new TestJoinFunction()));
+    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -139,7 +144,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsLatestMessageForKey() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new 
TestJoinStreamApplication(new TestJoinFunction()));
+    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -156,7 +162,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsLatestMessageForKeyReverse() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new 
TestJoinStreamApplication(new TestJoinFunction()));
+    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -173,7 +180,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsMatchedMessages() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new 
TestJoinStreamApplication(new TestJoinFunction()));
+    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -195,7 +203,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsMatchedMessagesReverse() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new 
TestJoinStreamApplication(new TestJoinFunction()));
+    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -218,7 +227,8 @@ public class TestJoinOperator {
   @Test
   public void joinRemovesExpiredMessages() throws Exception {
     TestClock testClock = new TestClock();
-    StreamOperatorTask sot = createStreamOperatorTask(testClock, new 
TestJoinStreamApplication(new TestJoinFunction()));
+    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(testClock, app);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -234,11 +244,11 @@ public class TestJoinOperator {
     assertTrue(output.isEmpty());
   }
 
-
   @Test
   public void joinRemovesExpiredMessagesReverse() throws Exception {
     TestClock testClock = new TestClock();
-    StreamOperatorTask sot = createStreamOperatorTask(testClock, new 
TestJoinStreamApplication(new TestJoinFunction()));
+    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(testClock, app);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -265,8 +275,10 @@ public class TestJoinOperator {
             new SystemStreamPartition("insystem2", "instream2", new 
Partition(0))));
     when(taskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
     // need to return different stores for left and right side
-    when(taskContext.getStore(eq("join-4-L"))).thenReturn(new 
InternalInMemoryStore<>());
-    when(taskContext.getStore(eq("join-4-R"))).thenReturn(new 
InternalInMemoryStore<>());
+    IntegerSerde integerSerde = new IntegerSerde();
+    TimestampedValueSerde timestampedValueSerde = new 
TimestampedValueSerde(new KVSerde(integerSerde, integerSerde));
+    when(taskContext.getStore(eq("join-2-L"))).thenReturn(new 
TestInMemoryStore(integerSerde, timestampedValueSerde));
+    when(taskContext.getStore(eq("join-2-R"))).thenReturn(new 
TestInMemoryStore(integerSerde, timestampedValueSerde));
 
     Config config = mock(Config.class);
 
@@ -285,24 +297,22 @@ public class TestJoinOperator {
 
     @Override
     public void init(StreamGraph graph, Config config) {
-      KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new 
IntegerSerde());
-      MessageStream<FirstStreamIME> inStream =
-          graph.getInputStream("instream", kvSerde).map(FirstStreamIME::new);
-      MessageStream<SecondStreamIME> inStream2 =
-          graph.getInputStream("instream2", kvSerde).map(SecondStreamIME::new);
+      IntegerSerde integerSerde = new IntegerSerde();
+      KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, 
integerSerde);
+      MessageStream<KV<Integer, Integer>> inStream = 
graph.getInputStream("instream", kvSerde);
+      MessageStream<KV<Integer, Integer>> inStream2 = 
graph.getInputStream("instream2", kvSerde);
 
       SystemStream outputSystemStream = new SystemStream("outputSystem", 
"outputStream");
       inStream
-          .join(inStream2, joinFn,
-              new IntegerSerde(), new JsonSerdeV2<>(FirstStreamIME.class), new 
JsonSerdeV2<>(SecondStreamIME.class),
-              JOIN_TTL)
+          .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL)
           .sink((message, messageCollector, taskCoordinator) -> {
               messageCollector.send(new 
OutgoingMessageEnvelope(outputSystemStream, message));
             });
     }
   }
 
-  private static class TestJoinFunction implements JoinFunction<Integer, 
FirstStreamIME, SecondStreamIME, Integer> {
+  private static class TestJoinFunction
+      implements JoinFunction<Integer, KV<Integer, Integer>, KV<Integer, 
Integer>, Integer> {
 
     private int numInitCalls = 0;
     private int numCloseCalls = 0;
@@ -313,18 +323,18 @@ public class TestJoinOperator {
     }
 
     @Override
-    public Integer apply(FirstStreamIME message, SecondStreamIME otherMessage) 
{
-      return (Integer) message.getMessage() + (Integer) 
otherMessage.getMessage();
+    public Integer apply(KV<Integer, Integer> message, KV<Integer, Integer> 
otherMessage) {
+      return message.value + otherMessage.value;
     }
 
     @Override
-    public Integer getFirstKey(FirstStreamIME message) {
-      return (Integer) message.getKey();
+    public Integer getFirstKey(KV<Integer, Integer> message) {
+      return message.key;
     }
 
     @Override
-    public Integer getSecondKey(SecondStreamIME message) {
-      return (Integer) message.getKey();
+    public Integer getSecondKey(KV<Integer, Integer> message) {
+      return message.key;
     }
 
     @Override
@@ -332,34 +342,24 @@ public class TestJoinOperator {
       numCloseCalls++;
     }
 
-    public int getNumInitCalls() {
+    int getNumInitCalls() {
       return numInitCalls;
     }
 
-    public int getNumCloseCalls() {
+    int getNumCloseCalls() {
       return numCloseCalls;
     }
   }
 
   private static class FirstStreamIME extends IncomingMessageEnvelope {
-    FirstStreamIME(KV<Integer, Integer> message) {
-      super(new SystemStreamPartition(
-          "insystem", "instream", new Partition(0)), "1", message.getKey(), 
message.getValue());
-    }
-
-    FirstStreamIME(Integer key, Integer message) {
-      this(KV.of(key, message));
+    FirstStreamIME(Integer key, Integer value) {
+      super(new SystemStreamPartition("insystem", "instream", new 
Partition(0)), "1", key, value);
     }
   }
 
   private static class SecondStreamIME extends IncomingMessageEnvelope {
-    SecondStreamIME(KV<Integer, Integer> message) {
-      super(new SystemStreamPartition(
-          "insystem2", "instream2", new Partition(0)), "1", message.getKey(), 
message.getValue());
-    }
-
-    SecondStreamIME(Integer key, Integer message) {
-      this(KV.of(key, message));
+    SecondStreamIME(Integer key, Integer value) {
+      super(new SystemStreamPartition("insystem2", "instream2", new 
Partition(0)), "1", key, value);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/7f1ec649/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index a759e52..1c14fb4 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -43,14 +43,15 @@ import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.impl.store.TimestampedValue;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
-import org.apache.samza.operators.util.InternalInMemoryStore;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
@@ -224,8 +225,10 @@ public class TestOperatorImplGraph {
 
     TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
-    when(mockTaskContext.getStore(eq("join-2-L"))).thenReturn(new 
InternalInMemoryStore<>());
-    when(mockTaskContext.getStore(eq("join-2-R"))).thenReturn(new 
InternalInMemoryStore<>());
+    KeyValueStore mockLeftStore = mock(KeyValueStore.class);
+    when(mockTaskContext.getStore(eq("join-2-L"))).thenReturn(mockLeftStore);
+    KeyValueStore mockRightStore = mock(KeyValueStore.class);
+    when(mockTaskContext.getStore(eq("join-2-R"))).thenReturn(mockRightStore);
     OperatorImplGraph opImplGraph =
         new OperatorImplGraph(streamGraph, mock(Config.class), 
mockTaskContext, mock(Clock.class));
 
@@ -245,12 +248,15 @@ public class TestOperatorImplGraph {
     Object joinKey = new Object();
     // verify that left partial join operator calls getFirstKey
     Object mockLeftMessage = mock(Object.class);
+    long currentTimeMillis = System.currentTimeMillis();
+    when(mockLeftStore.get(eq(joinKey))).thenReturn(new 
TimestampedValue<>(mockLeftMessage, currentTimeMillis));
     
when(mockJoinFunction.getFirstKey(eq(mockLeftMessage))).thenReturn(joinKey);
     inputOpImpl1.onMessage(KV.of("", mockLeftMessage), 
mock(MessageCollector.class), mock(TaskCoordinator.class));
     verify(mockJoinFunction, times(1)).getFirstKey(mockLeftMessage);
 
     // verify that right partial join operator calls getSecondKey
     Object mockRightMessage = mock(Object.class);
+    when(mockRightStore.get(eq(joinKey))).thenReturn(new 
TimestampedValue<>(mockRightMessage, currentTimeMillis));
     
when(mockJoinFunction.getSecondKey(eq(mockRightMessage))).thenReturn(joinKey);
     inputOpImpl2.onMessage(KV.of("", mockRightMessage), 
mock(MessageCollector.class), mock(TaskCoordinator.class));
     verify(mockJoinFunction, times(1)).getSecondKey(mockRightMessage);

http://git-wip-us.apache.org/repos/asf/samza/blob/7f1ec649/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
new file mode 100644
index 0000000..d16954d
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl.store;
+
+import com.google.common.primitives.UnsignedBytes;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterator;
+import org.apache.samza.storage.kv.KeyValueStore;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * An in-memory store that supports range queries.
+ *
+ * @param <K> the type of keys in the store
+ * @param <V> the type of values in the store
+ */
+public class TestInMemoryStore<K, V> implements KeyValueStore<K, V> {
+  private final ConcurrentSkipListMap<byte[], byte[]> map =
+      new ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator());
+  private final Serde<K> keySerde;
+  private final Serde<V> valSerde;
+
+  public TestInMemoryStore(Serde<K> keySerde, Serde<V> valSerde) {
+    this.keySerde = keySerde;
+    this.valSerde = valSerde;
+  }
+
+  @Override
+  public V get(K key) {
+    byte[] keyBytes = keySerde.toBytes(key);
+    byte[] valBytes = map.get(keyBytes);
+    if (valBytes == null) return null;
+    return valSerde.fromBytes(valBytes);
+  }
+
+  @Override
+  public Map<K, V> getAll(List<K> keys) {
+    Map<K, V> results = new HashMap<>();
+    for (K key : keys) {
+      byte[] value = map.get(keySerde.toBytes(key));
+      if (value != null) {
+        results.put(key, valSerde.fromBytes(value));
+      }
+    }
+    return results;
+  }
+
+  @Override
+  public void put(K key, V value) {
+    map.put(keySerde.toBytes(key), valSerde.toBytes(value));
+  }
+
+  @Override
+  public void putAll(List<Entry<K, V>> entries) {
+    for (Entry<K, V> entry : entries) {
+      put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public void delete(K key) {
+    map.remove(keySerde.toBytes(key));
+  }
+
+  @Override
+  public void deleteAll(List<K> keys) {
+    for (K k : keys) {
+      delete(k);
+    }
+  }
+
+  @Override
+  public KeyValueIterator<K, V> range(K from, K to) {
+    ConcurrentNavigableMap<byte[], byte[]> values = 
map.subMap(keySerde.toBytes(from), keySerde.toBytes(to));
+    return new InMemoryIterator(values.entrySet().iterator(), keySerde, 
valSerde);
+  }
+
+  @Override
+  public KeyValueIterator<K, V> all() {
+    return new InMemoryIterator(map.entrySet().iterator(), keySerde, valSerde);
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public void flush() {
+
+  }
+
+  private static class InMemoryIterator<K, V> implements KeyValueIterator<K, 
V> {
+
+    Iterator<Map.Entry<byte[], byte[]>> wrapped;
+    Serde<K> keySerializer;
+    Serde<V> valSerializer;
+
+    public InMemoryIterator(Iterator<Map.Entry<byte[], byte[]>> wrapped, 
Serde<K> keySerde, Serde<V> valSerde) {
+      this.wrapped = wrapped;
+      this.keySerializer = keySerde;
+      this.valSerializer = valSerde;
+    }
+
+    @Override
+    public void close() {
+      //no op
+    }
+
+    @Override
+    public boolean hasNext() {
+      return wrapped.hasNext();
+    }
+
+    @Override
+    public Entry<K, V> next() {
+      Map.Entry<byte[], byte[]> next = wrapped.next();
+      return new Entry<>(keySerializer.fromBytes(next.getKey()), 
valSerializer.fromBytes(next.getValue()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7f1ec649/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
index 7677826..0315a20 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
@@ -19,25 +19,14 @@
  */
 package org.apache.samza.operators.impl.store;
 
-import com.google.common.io.Files;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.ByteSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.kv.ClosableIterator;
-import org.apache.samza.storage.kv.KeyValueStoreMetrics;
-import org.apache.samza.storage.kv.RocksDbKeyValueStore;
-import org.apache.samza.storage.kv.SerializedKeyValueStore;
-import org.apache.samza.storage.kv.SerializedKeyValueStoreMetrics;
+import org.apache.samza.storage.kv.KeyValueStore;
 import org.junit.Assert;
 import org.junit.Test;
-import org.rocksdb.CompressionType;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.Options;
-import org.rocksdb.WriteOptions;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -45,7 +34,7 @@ public class TestTimeSeriesStoreImpl {
 
   @Test
   public void testGetOnTimestampBoundaries() {
-    TimeSeriesStore<String, byte[]> timeSeriesStore = 
newTimeSeriesStore("my-store", new StringSerde("UTF-8"), true);
+    TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore(new 
StringSerde("UTF-8"), true);
 
     // insert an entry with key "hello" at timestamps "1" and "2"
     timeSeriesStore.put("hello", "world-1".getBytes(), 1L);
@@ -82,7 +71,7 @@ public class TestTimeSeriesStoreImpl {
 
   @Test
   public void testGetWithNonExistentKeys() {
-    TimeSeriesStore<String, byte[]> timeSeriesStore = 
newTimeSeriesStore("my-store", new StringSerde("UTF-8"), true);
+    TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore(new 
StringSerde("UTF-8"), true);
     timeSeriesStore.put("hello", "world-1".getBytes(), 1L);
 
     // read from a non-existent key
@@ -96,7 +85,7 @@ public class TestTimeSeriesStoreImpl {
 
   @Test
   public void testPutWithMultipleEntries() {
-    TimeSeriesStore<String, byte[]> timeSeriesStore = 
newTimeSeriesStore("my-store", new StringSerde("UTF-8"), true);
+    TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore(new 
StringSerde("UTF-8"), true);
 
     // insert 100 entries at timestamps "1" and "2"
     for (int i = 0; i < 100; i++) {
@@ -126,7 +115,7 @@ public class TestTimeSeriesStoreImpl {
   @Test
   public void testGetOnTimestampBoundariesWithOverwriteMode() {
     // instantiate a store in overwrite mode
-    TimeSeriesStore<String, byte[]> timeSeriesStore = 
newTimeSeriesStore("my-store", new StringSerde("UTF-8"), false);
+    TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore(new 
StringSerde("UTF-8"), false);
 
     // insert an entry with key "hello" at timestamps "1" and "2"
     timeSeriesStore.put("hello", "world-1".getBytes(), 1L);
@@ -164,7 +153,7 @@ public class TestTimeSeriesStoreImpl {
   @Test
   public void testDeletesInOverwriteMode() {
     // instantiate a store in overwrite mode
-    TimeSeriesStore<String, byte[]> timeSeriesStore = 
newTimeSeriesStore("my-store", new StringSerde("UTF-8"), false);
+    TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore(new 
StringSerde("UTF-8"), false);
 
     // insert an entry with key "hello" at timestamps "1" and "2"
     timeSeriesStore.put("hello", "world-1".getBytes(), 1L);
@@ -179,7 +168,8 @@ public class TestTimeSeriesStoreImpl {
     Assert.assertEquals(0, values.size());
   }
 
-  private static <K, V> List<TimestampedValue<V>> readStore(TimeSeriesStore<K, 
V> store, K key, long startTimestamp, long endTimestamp) {
+  private static <K, V> List<TimestampedValue<V>> readStore(
+      TimeSeriesStore<K, V> store, K key, long startTimestamp, long 
endTimestamp) {
     List<TimestampedValue<V>> list = new ArrayList<>();
     ClosableIterator<TimestampedValue<V>> storeValuesIterator = store.get(key, 
startTimestamp, endTimestamp);
 
@@ -192,19 +182,9 @@ public class TestTimeSeriesStoreImpl {
     return list;
   }
 
-  private static <K> TimeSeriesStore<K, byte[]> newTimeSeriesStore(String 
storeName, Serde<K> keySerde, boolean appendMode) {
-    RocksDbKeyValueStore rocksKVStore = newRocksDbStore("someStore");
-    SerializedKeyValueStore<TimeSeriesKey<K>, byte[]> kvStore = new 
SerializedKeyValueStore<>(rocksKVStore,
-            new TimeSeriesKeySerde<>(keySerde), new ByteSerde(),
-            new SerializedKeyValueStoreMetrics("", new MetricsRegistryMap()));
+  private static <K> TimeSeriesStore<K, byte[]> newTimeSeriesStore(Serde<K> 
keySerde, boolean appendMode) {
+    KeyValueStore<TimeSeriesKey<K>, byte[]> kvStore =
+        new TestInMemoryStore(new TimeSeriesKeySerde<>(keySerde), new 
ByteSerde());
     return new TimeSeriesStoreImpl<>(kvStore, appendMode);
   }
-
-  private static RocksDbKeyValueStore newRocksDbStore(String storeName) {
-    File dir = Files.createTempDir();
-    return new RocksDbKeyValueStore(dir,
-            new 
Options().setCreateIfMissing(true).setCompressionType(CompressionType.SNAPPY_COMPRESSION),
 new MapConfig(),
-            false, storeName, new WriteOptions(), new FlushOptions(),
-            new KeyValueStoreMetrics(storeName, new MetricsRegistryMap()));
-  }
 }

Reply via email to