Repository: samza Updated Branches: refs/heads/master 7ad6631bb -> 7f1ec6492
SAMZA-1444: Removed circular dependency b/w samza-core test and samza-kv-rocksdb test Also added an implementation for KVSerde. vjagadish1989, thanks for the TestInMemoryStore implementation! The only usage of InternalInMemoryStore is now in WindowOperatorImpl, which will be removed as part of store wiring for the window operator. Author: Prateek Maheshwari <[email protected]> Reviewers: Jacob Maes <[email protected]>, Xinyu Liu <[email protected]> Closes #315 from prateekm/test-store Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7f1ec649 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7f1ec649 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7f1ec649 Branch: refs/heads/master Commit: 7f1ec6492a4c42e41e732f34cb4180bd154aecc5 Parents: 7ad6631 Author: Prateek Maheshwari <[email protected]> Authored: Thu Oct 5 10:59:39 2017 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Thu Oct 5 10:59:39 2017 -0700 ---------------------------------------------------------------------- build.gradle | 1 - .../org.apache.samza.serializers/KVSerde.scala | 42 ++++-- .../samza/operators/TestJoinOperator.java | 94 ++++++------ .../operators/impl/TestOperatorImplGraph.java | 12 +- .../operators/impl/store/TestInMemoryStore.java | 144 +++++++++++++++++++ .../impl/store/TestTimeSeriesStoreImpl.java | 42 ++---- 6 files changed, 239 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/7f1ec649/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index c710053..e0129f9 100644 --- a/build.gradle +++ b/build.gradle @@ -179,7 +179,6 @@ project(":samza-core_$scalaVersion") { compile "org.scala-lang:scala-library:$scalaLibVersion" compile "org.slf4j:slf4j-api:$slf4jVersion" testCompile project(":samza-api").sourceSets.test.output - testCompile project(":samza-kv-rocksdb_$scalaVersion") testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" testCompile "org.powermock:powermock-api-mockito:$powerMockVersion" http://git-wip-us.apache.org/repos/asf/samza/blob/7f1ec649/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala ---------------------------------------------------------------------- diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala index 5b0a6e3..f5cd5cf 100644 --- a/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala +++ b/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala @@ -19,7 +19,8 @@ package org.apache.samza.serializers -import org.apache.samza.SamzaException +import java.nio.ByteBuffer + import org.apache.samza.operators.KV object KVSerde { @@ -27,26 +28,39 @@ object KVSerde { } /** - * A marker serde class to indicate that messages are keyed and should be deserialized as K-V pairs. - * This class is intended for use cases where a single Serde parameter or configuration is required. + * A serde for [[KV]] key-value pairs. + * + * When this serde is used for streams in the High Level API, Samza wires up and uses the provided + * keySerde and valueSerde for the keys and values in the stream separately. I.e., the fromBytes and toBytes + * methods in this class aren't used directly for streams. * * @tparam K type of the key in the message * @tparam V type of the value in the message */ class KVSerde[K, V](keySerde: Serde[K], valueSerde: Serde[V]) extends Serde[KV[K, V]] { - /** - * Implementation Note: This serde must not be used by the framework for serialization/deserialization directly. - * Wire up and use the constituent keySerde and valueSerde instead. - */ - - override def fromBytes(bytes: Array[Byte]): Nothing = { - throw new NotImplementedError("This is a marker serde and must not be used directly. " + - "Samza must wire up and use the keySerde and valueSerde instead.") + override def fromBytes(bytes: Array[Byte]): KV[K, V] = { + val byteBuffer = ByteBuffer.wrap(bytes) + val keyLength = byteBuffer.getInt() + val keyBytes = new Array[Byte](keyLength) + byteBuffer.get(keyBytes) + val valueLength = byteBuffer.getInt() + val valueBytes = new Array[Byte](valueLength) + byteBuffer.get(valueBytes) + val key = keySerde.fromBytes(keyBytes) + val value = valueSerde.fromBytes(valueBytes) + KV.of(key, value) } - override def toBytes(`object`: KV[K, V]): Nothing = { - throw new SamzaException("This is a marker serde and must not be used directly. " + - "Samza must wire up and use the keySerde and valueSerde instead.") + override def toBytes(obj: KV[K, V]): Array[Byte] = { + val keyBytes = keySerde.toBytes(obj.key) + val valueBytes = valueSerde.toBytes(obj.value) + val bytes = new Array[Byte](8 + keyBytes.length + 8 + valueBytes.length) + val byteBuffer = ByteBuffer.wrap(bytes) + byteBuffer.putInt(keyBytes.length) + byteBuffer.put(keyBytes) + byteBuffer.putInt(valueBytes.length) + byteBuffer.put(valueBytes) + byteBuffer.array() } def getKeySerde: Serde[K] = keySerde http://git-wip-us.apache.org/repos/asf/samza/blob/7f1ec649/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java index 1120c25..09fb56a 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java @@ -25,10 +25,10 @@ import org.apache.samza.config.Config; import org.apache.samza.container.TaskContextImpl; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.operators.util.InternalInMemoryStore; +import org.apache.samza.operators.impl.store.TestInMemoryStore; +import org.apache.samza.operators.impl.store.TimestampedValueSerde; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.serializers.IntegerSerde; -import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; @@ -63,7 +63,8 @@ public class TestJoinOperator { @Test public void join() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new TestJoinStreamApplication(new TestJoinFunction())); + TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -79,7 +80,8 @@ public class TestJoinOperator { @Test public void joinFnInitAndClose() throws Exception { TestJoinFunction joinFn = new TestJoinFunction(); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new TestJoinStreamApplication(joinFn)); + TestJoinStreamApplication app = new TestJoinStreamApplication(joinFn); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); assertEquals(1, joinFn.getNumInitCalls()); MessageCollector messageCollector = mock(MessageCollector.class); @@ -96,7 +98,8 @@ public class TestJoinOperator { @Test public void joinReverse() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new TestJoinStreamApplication(new TestJoinFunction())); + TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -111,7 +114,8 @@ public class TestJoinOperator { @Test public void joinNoMatch() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new TestJoinStreamApplication(new TestJoinFunction())); + TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -125,7 +129,8 @@ public class TestJoinOperator { @Test public void joinNoMatchReverse() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new TestJoinStreamApplication(new TestJoinFunction())); + TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -139,7 +144,8 @@ public class TestJoinOperator { @Test public void joinRetainsLatestMessageForKey() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new TestJoinStreamApplication(new TestJoinFunction())); + TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -156,7 +162,8 @@ public class TestJoinOperator { @Test public void joinRetainsLatestMessageForKeyReverse() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new TestJoinStreamApplication(new TestJoinFunction())); + TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -173,7 +180,8 @@ public class TestJoinOperator { @Test public void joinRetainsMatchedMessages() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new TestJoinStreamApplication(new TestJoinFunction())); + TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -195,7 +203,8 @@ public class TestJoinOperator { @Test public void joinRetainsMatchedMessagesReverse() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new TestJoinStreamApplication(new TestJoinFunction())); + TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -218,7 +227,8 @@ public class TestJoinOperator { @Test public void joinRemovesExpiredMessages() throws Exception { TestClock testClock = new TestClock(); - StreamOperatorTask sot = createStreamOperatorTask(testClock, new TestJoinStreamApplication(new TestJoinFunction())); + TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(testClock, app); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -234,11 +244,11 @@ public class TestJoinOperator { assertTrue(output.isEmpty()); } - @Test public void joinRemovesExpiredMessagesReverse() throws Exception { TestClock testClock = new TestClock(); - StreamOperatorTask sot = createStreamOperatorTask(testClock, new TestJoinStreamApplication(new TestJoinFunction())); + TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(testClock, app); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -265,8 +275,10 @@ public class TestJoinOperator { new SystemStreamPartition("insystem2", "instream2", new Partition(0)))); when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); // need to return different stores for left and right side - when(taskContext.getStore(eq("join-4-L"))).thenReturn(new InternalInMemoryStore<>()); - when(taskContext.getStore(eq("join-4-R"))).thenReturn(new InternalInMemoryStore<>()); + IntegerSerde integerSerde = new IntegerSerde(); + TimestampedValueSerde timestampedValueSerde = new TimestampedValueSerde(new KVSerde(integerSerde, integerSerde)); + when(taskContext.getStore(eq("join-2-L"))).thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde)); + when(taskContext.getStore(eq("join-2-R"))).thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde)); Config config = mock(Config.class); @@ -285,24 +297,22 @@ public class TestJoinOperator { @Override public void init(StreamGraph graph, Config config) { - KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); - MessageStream<FirstStreamIME> inStream = - graph.getInputStream("instream", kvSerde).map(FirstStreamIME::new); - MessageStream<SecondStreamIME> inStream2 = - graph.getInputStream("instream2", kvSerde).map(SecondStreamIME::new); + IntegerSerde integerSerde = new IntegerSerde(); + KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde); + MessageStream<KV<Integer, Integer>> inStream = graph.getInputStream("instream", kvSerde); + MessageStream<KV<Integer, Integer>> inStream2 = graph.getInputStream("instream2", kvSerde); SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); inStream - .join(inStream2, joinFn, - new IntegerSerde(), new JsonSerdeV2<>(FirstStreamIME.class), new JsonSerdeV2<>(SecondStreamIME.class), - JOIN_TTL) + .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL) .sink((message, messageCollector, taskCoordinator) -> { messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); }); } } - private static class TestJoinFunction implements JoinFunction<Integer, FirstStreamIME, SecondStreamIME, Integer> { + private static class TestJoinFunction + implements JoinFunction<Integer, KV<Integer, Integer>, KV<Integer, Integer>, Integer> { private int numInitCalls = 0; private int numCloseCalls = 0; @@ -313,18 +323,18 @@ public class TestJoinOperator { } @Override - public Integer apply(FirstStreamIME message, SecondStreamIME otherMessage) { - return (Integer) message.getMessage() + (Integer) otherMessage.getMessage(); + public Integer apply(KV<Integer, Integer> message, KV<Integer, Integer> otherMessage) { + return message.value + otherMessage.value; } @Override - public Integer getFirstKey(FirstStreamIME message) { - return (Integer) message.getKey(); + public Integer getFirstKey(KV<Integer, Integer> message) { + return message.key; } @Override - public Integer getSecondKey(SecondStreamIME message) { - return (Integer) message.getKey(); + public Integer getSecondKey(KV<Integer, Integer> message) { + return message.key; } @Override @@ -332,34 +342,24 @@ public class TestJoinOperator { numCloseCalls++; } - public int getNumInitCalls() { + int getNumInitCalls() { return numInitCalls; } - public int getNumCloseCalls() { + int getNumCloseCalls() { return numCloseCalls; } } private static class FirstStreamIME extends IncomingMessageEnvelope { - FirstStreamIME(KV<Integer, Integer> message) { - super(new SystemStreamPartition( - "insystem", "instream", new Partition(0)), "1", message.getKey(), message.getValue()); - } - - FirstStreamIME(Integer key, Integer message) { - this(KV.of(key, message)); + FirstStreamIME(Integer key, Integer value) { + super(new SystemStreamPartition("insystem", "instream", new Partition(0)), "1", key, value); } } private static class SecondStreamIME extends IncomingMessageEnvelope { - SecondStreamIME(KV<Integer, Integer> message) { - super(new SystemStreamPartition( - "insystem2", "instream2", new Partition(0)), "1", message.getKey(), message.getValue()); - } - - SecondStreamIME(Integer key, Integer message) { - this(KV.of(key, message)); + SecondStreamIME(Integer key, Integer value) { + super(new SystemStreamPartition("insystem2", "instream2", new Partition(0)), "1", key, value); } } } http://git-wip-us.apache.org/repos/asf/samza/blob/7f1ec649/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java index a759e52..1c14fb4 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@ -43,14 +43,15 @@ import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.operators.impl.store.TimestampedValue; import org.apache.samza.operators.spec.OperatorSpec.OpCode; -import org.apache.samza.operators.util.InternalInMemoryStore; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; @@ -224,8 +225,10 @@ public class TestOperatorImplGraph { TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); - when(mockTaskContext.getStore(eq("join-2-L"))).thenReturn(new InternalInMemoryStore<>()); - when(mockTaskContext.getStore(eq("join-2-R"))).thenReturn(new InternalInMemoryStore<>()); + KeyValueStore mockLeftStore = mock(KeyValueStore.class); + when(mockTaskContext.getStore(eq("join-2-L"))).thenReturn(mockLeftStore); + KeyValueStore mockRightStore = mock(KeyValueStore.class); + when(mockTaskContext.getStore(eq("join-2-R"))).thenReturn(mockRightStore); OperatorImplGraph opImplGraph = new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class)); @@ -245,12 +248,15 @@ public class TestOperatorImplGraph { Object joinKey = new Object(); // verify that left partial join operator calls getFirstKey Object mockLeftMessage = mock(Object.class); + long currentTimeMillis = System.currentTimeMillis(); + when(mockLeftStore.get(eq(joinKey))).thenReturn(new TimestampedValue<>(mockLeftMessage, currentTimeMillis)); when(mockJoinFunction.getFirstKey(eq(mockLeftMessage))).thenReturn(joinKey); inputOpImpl1.onMessage(KV.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class)); verify(mockJoinFunction, times(1)).getFirstKey(mockLeftMessage); // verify that right partial join operator calls getSecondKey Object mockRightMessage = mock(Object.class); + when(mockRightStore.get(eq(joinKey))).thenReturn(new TimestampedValue<>(mockRightMessage, currentTimeMillis)); when(mockJoinFunction.getSecondKey(eq(mockRightMessage))).thenReturn(joinKey); inputOpImpl2.onMessage(KV.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class)); verify(mockJoinFunction, times(1)).getSecondKey(mockRightMessage); http://git-wip-us.apache.org/repos/asf/samza/blob/7f1ec649/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java new file mode 100644 index 0000000..d16954d --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl.store; + +import com.google.common.primitives.UnsignedBytes; +import org.apache.samza.serializers.Serde; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueIterator; +import org.apache.samza.storage.kv.KeyValueStore; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +/** + * An in-memory store that supports range queries. + * + * @param <K> the type of keys in the store + * @param <V> the type of values in the store + */ +public class TestInMemoryStore<K, V> implements KeyValueStore<K, V> { + private final ConcurrentSkipListMap<byte[], byte[]> map = + new ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator()); + private final Serde<K> keySerde; + private final Serde<V> valSerde; + + public TestInMemoryStore(Serde<K> keySerde, Serde<V> valSerde) { + this.keySerde = keySerde; + this.valSerde = valSerde; + } + + @Override + public V get(K key) { + byte[] keyBytes = keySerde.toBytes(key); + byte[] valBytes = map.get(keyBytes); + if (valBytes == null) return null; + return valSerde.fromBytes(valBytes); + } + + @Override + public Map<K, V> getAll(List<K> keys) { + Map<K, V> results = new HashMap<>(); + for (K key : keys) { + byte[] value = map.get(keySerde.toBytes(key)); + if (value != null) { + results.put(key, valSerde.fromBytes(value)); + } + } + return results; + } + + @Override + public void put(K key, V value) { + map.put(keySerde.toBytes(key), valSerde.toBytes(value)); + } + + @Override + public void putAll(List<Entry<K, V>> entries) { + for (Entry<K, V> entry : entries) { + put(entry.getKey(), entry.getValue()); + } + } + + @Override + public void delete(K key) { + map.remove(keySerde.toBytes(key)); + } + + @Override + public void deleteAll(List<K> keys) { + for (K k : keys) { + delete(k); + } + } + + @Override + public KeyValueIterator<K, V> range(K from, K to) { + ConcurrentNavigableMap<byte[], byte[]> values = map.subMap(keySerde.toBytes(from), keySerde.toBytes(to)); + return new InMemoryIterator(values.entrySet().iterator(), keySerde, valSerde); + } + + @Override + public KeyValueIterator<K, V> all() { + return new InMemoryIterator(map.entrySet().iterator(), keySerde, valSerde); + } + + @Override + public void close() { + + } + + @Override + public void flush() { + + } + + private static class InMemoryIterator<K, V> implements KeyValueIterator<K, V> { + + Iterator<Map.Entry<byte[], byte[]>> wrapped; + Serde<K> keySerializer; + Serde<V> valSerializer; + + public InMemoryIterator(Iterator<Map.Entry<byte[], byte[]>> wrapped, Serde<K> keySerde, Serde<V> valSerde) { + this.wrapped = wrapped; + this.keySerializer = keySerde; + this.valSerializer = valSerde; + } + + @Override + public void close() { + //no op + } + + @Override + public boolean hasNext() { + return wrapped.hasNext(); + } + + @Override + public Entry<K, V> next() { + Map.Entry<byte[], byte[]> next = wrapped.next(); + return new Entry<>(keySerializer.fromBytes(next.getKey()), valSerializer.fromBytes(next.getValue())); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7f1ec649/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java index 7677826..0315a20 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java @@ -19,25 +19,14 @@ */ package org.apache.samza.operators.impl.store; -import com.google.common.io.Files; -import org.apache.samza.config.MapConfig; -import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.serializers.ByteSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.storage.kv.ClosableIterator; -import org.apache.samza.storage.kv.KeyValueStoreMetrics; -import org.apache.samza.storage.kv.RocksDbKeyValueStore; -import org.apache.samza.storage.kv.SerializedKeyValueStore; -import org.apache.samza.storage.kv.SerializedKeyValueStoreMetrics; +import org.apache.samza.storage.kv.KeyValueStore; import org.junit.Assert; import org.junit.Test; -import org.rocksdb.CompressionType; -import org.rocksdb.FlushOptions; -import org.rocksdb.Options; -import org.rocksdb.WriteOptions; -import java.io.File; import java.util.ArrayList; import java.util.List; @@ -45,7 +34,7 @@ public class TestTimeSeriesStoreImpl { @Test public void testGetOnTimestampBoundaries() { - TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), true); + TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore(new StringSerde("UTF-8"), true); // insert an entry with key "hello" at timestamps "1" and "2" timeSeriesStore.put("hello", "world-1".getBytes(), 1L); @@ -82,7 +71,7 @@ public class TestTimeSeriesStoreImpl { @Test public void testGetWithNonExistentKeys() { - TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), true); + TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore(new StringSerde("UTF-8"), true); timeSeriesStore.put("hello", "world-1".getBytes(), 1L); // read from a non-existent key @@ -96,7 +85,7 @@ public class TestTimeSeriesStoreImpl { @Test public void testPutWithMultipleEntries() { - TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), true); + TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore(new StringSerde("UTF-8"), true); // insert 100 entries at timestamps "1" and "2" for (int i = 0; i < 100; i++) { @@ -126,7 +115,7 @@ public class TestTimeSeriesStoreImpl { @Test public void testGetOnTimestampBoundariesWithOverwriteMode() { // instantiate a store in overwrite mode - TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), false); + TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore(new StringSerde("UTF-8"), false); // insert an entry with key "hello" at timestamps "1" and "2" timeSeriesStore.put("hello", "world-1".getBytes(), 1L); @@ -164,7 +153,7 @@ public class TestTimeSeriesStoreImpl { @Test public void testDeletesInOverwriteMode() { // instantiate a store in overwrite mode - TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), false); + TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore(new StringSerde("UTF-8"), false); // insert an entry with key "hello" at timestamps "1" and "2" timeSeriesStore.put("hello", "world-1".getBytes(), 1L); @@ -179,7 +168,8 @@ public class TestTimeSeriesStoreImpl { Assert.assertEquals(0, values.size()); } - private static <K, V> List<TimestampedValue<V>> readStore(TimeSeriesStore<K, V> store, K key, long startTimestamp, long endTimestamp) { + private static <K, V> List<TimestampedValue<V>> readStore( + TimeSeriesStore<K, V> store, K key, long startTimestamp, long endTimestamp) { List<TimestampedValue<V>> list = new ArrayList<>(); ClosableIterator<TimestampedValue<V>> storeValuesIterator = store.get(key, startTimestamp, endTimestamp); @@ -192,19 +182,9 @@ public class TestTimeSeriesStoreImpl { return list; } - private static <K> TimeSeriesStore<K, byte[]> newTimeSeriesStore(String storeName, Serde<K> keySerde, boolean appendMode) { - RocksDbKeyValueStore rocksKVStore = newRocksDbStore("someStore"); - SerializedKeyValueStore<TimeSeriesKey<K>, byte[]> kvStore = new SerializedKeyValueStore<>(rocksKVStore, - new TimeSeriesKeySerde<>(keySerde), new ByteSerde(), - new SerializedKeyValueStoreMetrics("", new MetricsRegistryMap())); + private static <K> TimeSeriesStore<K, byte[]> newTimeSeriesStore(Serde<K> keySerde, boolean appendMode) { + KeyValueStore<TimeSeriesKey<K>, byte[]> kvStore = + new TestInMemoryStore(new TimeSeriesKeySerde<>(keySerde), new ByteSerde()); return new TimeSeriesStoreImpl<>(kvStore, appendMode); } - - private static RocksDbKeyValueStore newRocksDbStore(String storeName) { - File dir = Files.createTempDir(); - return new RocksDbKeyValueStore(dir, - new Options().setCreateIfMissing(true).setCompressionType(CompressionType.SNAPPY_COMPRESSION), new MapConfig(), - false, storeName, new WriteOptions(), new FlushOptions(), - new KeyValueStoreMetrics(storeName, new MetricsRegistryMap())); - } }
