Repository: samza Updated Branches: refs/heads/master f16ba2692 -> 56d564c60
SAMZA-1423; Implement time series storage for joins and windows Notable changes: * New interface for storing and retrieving time series data. * New store and serde implementation for use in windows and joins Pending: * Documentation, and minor clean-ups * Wire-up of stores from ExecutionPlanner * Usage of the store to implement various windows and joins Author: Jagadish <[email protected]> Author: Jagadish <[email protected]> Reviewers: Prateek Maheshwari <[email protected]>,Xinyu Liu<[email protected]> Closes #303 from vjagadish1989/window-store Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/56d564c6 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/56d564c6 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/56d564c6 Branch: refs/heads/master Commit: 56d564c604c810ce7ea549e7dd4581588f8c47b5 Parents: f16ba26 Author: Jagadish <[email protected]> Authored: Mon Oct 2 12:23:24 2017 -0700 Committer: Jagadish <[email protected]> Committed: Mon Oct 2 12:23:24 2017 -0700 ---------------------------------------------------------------------- build.gradle | 1 + .../samza/storage/kv/ClosableIterator.java | 40 ++++ .../operators/impl/store/TimeSeriesKey.java | 80 +++++++ .../impl/store/TimeSeriesKeySerde.java | 96 +++++++++ .../operators/impl/store/TimeSeriesStore.java | 80 +++++++ .../impl/store/TimeSeriesStoreImpl.java | 195 +++++++++++++++++ .../operators/impl/store/TimestampedValue.java | 61 ++++++ .../impl/store/TestTimeSeriesKeySerde.java | 71 +++++++ .../impl/store/TestTimeSeriesStoreImpl.java | 210 +++++++++++++++++++ 9 files changed, 834 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 16091ae..b10241a 100644 --- a/build.gradle +++ b/build.gradle @@ -178,6 +178,7 
@@ project(":samza-core_$scalaVersion") { compile "org.scala-lang:scala-library:$scalaLibVersion" compile "org.slf4j:slf4j-api:$slf4jVersion" testCompile project(":samza-api").sourceSets.test.output + testCompile project(":samza-kv-rocksdb_$scalaVersion") testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" testCompile "org.powermock:powermock-api-mockito:$powerMockVersion" http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-api/src/main/java/org/apache/samza/storage/kv/ClosableIterator.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/ClosableIterator.java b/samza-api/src/main/java/org/apache/samza/storage/kv/ClosableIterator.java new file mode 100644 index 0000000..cc215e6 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/storage/kv/ClosableIterator.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.kv; + +import java.util.Iterator; + +/** + * An iterator that must be closed. + * + * <p> + * Implement close to free resources assigned to the iterator such as open file handles, persistent state etc. 
+ * + * @param <V> the type of value returned by this iterator + */ +public interface ClosableIterator<V> extends Iterator<V> { + + /** + * Closes this iterator and frees resources assigned to it. + * + * It is illegal to invoke {@link #next()} and {@link #hasNext()} after an iterator has been closed. + */ + public void close(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java new file mode 100644 index 0000000..4ed73aa --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.samza.operators.impl.store; + +/** + * The store key used in the {@link TimeSeriesStore} to uniquely identify a row. 
+ */ +public class TimeSeriesKey<K> { + + // version for backwards compatibility + private static final byte VERSION = 0x00; + private final K key; + private final long timestamp; + + private final long seqNum; + + /** + * Creates a key from the wrapped key, its timestamp and a sequence number. + * + * @param k the wrapped key; may be null + * @param time the timestamp associated with the key + * @param seq the sequence number associated with the key + */ + public TimeSeriesKey(K k, long time, long seq) { + key = k; + timestamp = time; + seqNum = seq; + } + + /** + * @return the wrapped key, possibly null + */ + public K getKey() { + return key; + } + + /** + * @return the timestamp associated with this key + */ + public long getTimestamp() { + return timestamp; + } + + /** + * @return the version of this key format, currently always {@code 0x00} + */ + public byte getVersion() { + return VERSION; + } + + /** + * Two keys are equal when they are of the same class and their wrapped keys, timestamps and sequence + * numbers are all equal. + */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TimeSeriesKey<?> that = (TimeSeriesKey<?>) o; + + if (timestamp != that.timestamp) return false; + if (seqNum != that.seqNum) return false; + return key != null ? key.equals(that.key) : that.key == null; + } + + /** + * Combines the hash of the wrapped key (0 when null) with the timestamp and the sequence number. + */ + @Override + public int hashCode() { + int result = key != null ? key.hashCode() : 0; + result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); + result = 31 * result + (int) (seqNum ^ (seqNum >>> 32)); + return result; + } + + /** + * @return the sequence number associated with this key + */ + public long getSeqNum() { + return seqNum; + } + + /** + * @return a human readable description containing the wrapped key, timestamp and sequence number + */ + @Override + public String toString() { + return String.format("TimeSeriesKey {key: %s timestamp: %s seqNum: %s}", key, timestamp, seqNum); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java new file mode 100644 index 0000000..273c40a --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.samza.operators.impl.store; + +import org.apache.samza.SamzaException; +import org.apache.samza.serializers.Serde; + +import java.nio.ByteBuffer; + +/** + * A {@link Serde} for {@link TimeSeriesKey}s. + * + * <p> + * This wraps the actual key's serde with serializers for timestamp, version number and sequence number. 
+ * + * A {@link TimeSeriesKeySerde} serializes a key as follows: + * +-------------------------+------------------+----------------+------------------+ + * | serialized-key bytes | timestamp | version (0) | seqNum | + * |(serialized by keySerde) | | | | + * +-------------------------+------------------+----------------+------------------+ + * +---serialized key len----+-------8 bytes----+---1 byte-------+---7 bytes---------+ + * + * @param <K> the type of the wrapped key + */ +public class TimeSeriesKeySerde<K> implements Serde<TimeSeriesKey<K>> { + + private static final long SEQUENCE_NUM_MASK = 0x00ffffffffffffffL; + private static final int TIMESTAMP_SIZE = 8; + private static final int SEQNUM_SIZE = 8; + + private final Serde<K> keySerde; + + public TimeSeriesKeySerde(Serde<K> keySerde) { + this.keySerde = keySerde; + } + + @Override + public byte[] toBytes(TimeSeriesKey<K> timeSeriesKey) { + K key = timeSeriesKey.getKey(); + long timestamp = timeSeriesKey.getTimestamp(); + long seqNum = timeSeriesKey.getSeqNum(); + + byte[] serializedKey = keySerde.toBytes(key); + int keySize = serializedKey == null ? 0 : serializedKey.length; + + // append the timestamp and sequence number to the serialized key bytes + ByteBuffer buf = ByteBuffer.allocate(keySize + TIMESTAMP_SIZE + SEQNUM_SIZE); + if (serializedKey != null) { + buf.put(serializedKey); + } + buf.putLong(timestamp); + buf.putLong(seqNum & SEQUENCE_NUM_MASK); + + return buf.array(); + } + + @Override + public TimeSeriesKey<K> fromBytes(byte[] timeSeriesKeyBytes) { + // First obtain the key bytes, and deserialize them. 
Later de-serialize the timestamp and sequence number + ByteBuffer buf = ByteBuffer.wrap(timeSeriesKeyBytes); + int keySize = timeSeriesKeyBytes.length - TIMESTAMP_SIZE - SEQNUM_SIZE; + K key = null; + + if (keySize != 0) { + byte[] keyBytes = new byte[keySize]; + buf.get(keyBytes); + key = keySerde.fromBytes(keyBytes); + } + + long timeStamp = buf.getLong(); + long seqNum = buf.getLong(); + long version = seqNum & ~SEQUENCE_NUM_MASK; + + if (version != 0) { + throw new SamzaException("Version is not zero. Sequence number: " + seqNum); + } + return new TimeSeriesKey(key, timeStamp, seqNum); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java new file mode 100644 index 0000000..e544e2e --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.samza.operators.impl.store; + +import org.apache.samza.storage.kv.ClosableIterator; + +/** + * A key-value store that allows entries to be queried and stored based on time ranges. + * + * Operations on the store can be invoked from multiple threads. Hence, implementations are expected to be thread-safe. + * + * @param <K> the type of key in the store + * @param <V> the type of value in the store + */ +public interface TimeSeriesStore<K, V> { + + /** + * Inserts a key and its value in the store with the provided timestamp. + * + * @param key the key to insert + * @param val the value to insert + * @param timestamp the timestamp in milliseconds + */ + void put(K key, V val, long timestamp); + + /** + * Returns an iterator over values for the given key in the provided time-range - [{@code startTimestamp}, {@code endTimestamp}) + * + * Values returned by the iterator are ordered by their timestamp. Values with the same timestamp are + * returned in their order of insertion. + * + * <p> The iterator <b>must</b> be closed after use by calling {@link ClosableIterator#close()}. Not doing so will result in memory leaks. + * + * @param key the key to look up in the store + * @param startTimestamp the start timestamp of the range, inclusive + * @param endTimestamp the end timestamp of the range, exclusive + * @return an iterator over the timestamped values stored for the key in the given time-range + * @throws IllegalArgumentException when {@code startTimestamp} is greater than {@code endTimestamp}, or when either of them is negative + */ + ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, long endTimestamp); + + /** + * Removes all values for this key in the given time-range.
+ * + * @param key the key to look up in the store + * @param startTimestamp the start timestamp of the range, inclusive + * @param endTimeStamp the end timestamp of the range, exclusive + * @throws IllegalArgumentException when {@code startTimestamp} is greater than {@code endTimeStamp}, or when either of them is negative + */ + void remove(K key, long startTimestamp, long endTimeStamp); + + /** + * Flushes this time series store, if applicable. + */ + void flush(); + + /** + * Closes this store. + * + * Use this to perform final clean-ups, release acquired resources etc. + */ + void close(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java new file mode 100644 index 0000000..5e35219 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ * + */ +package org.apache.samza.operators.impl.store; + +import org.apache.samza.storage.kv.ClosableIterator; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueIterator; +import org.apache.samza.storage.kv.KeyValueStore; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Provides a view on top of a {@link KeyValueStore} that allows retrieval of entries by time ranges. + * + * <p> + * A {@link TimeSeriesStoreImpl} can be backed by persistent stores like rocksDB, in-memory stores, change-logged + * stores, cached stores (or any combination of these). + * + * <p> + * Range iterators in the store return values in the order of their timestamp. Within the same key and timestamp, + * values are returned in their order of insertion. + * + * <p> + * This store has two modes of operation depending on how duplicates are handled: + * <ol> + * <li> + * Overwrite Mode: In this mode, the store only retains the most recent value for a given key and timestamp. I.e.,Calling + * {@link #put} on an existing key and timestamp will overwrite the previously stored value for that key. + * </li> + * <li> + * Append Mode: In this mode, the store retains all previous values for a given key and timestamp. I.e., Calling {@link #put} + * with an existing key and timestamp will append the value to the list. 
+ * </li> + * </ol> + * <p> + * Implementation Notes: + * + * Data is serialized and organized into K-V pairs as follows: + * <pre> + * +-----------------------+------------------+------------+------------------------+-----------------------+ + * | serialized-key bytes | timestamp | version | sequence number | serialized value | + * | | | | | | + * +-----------------------+------------------+------------+------------------------+-----------------------+ + * +----------------------+--------8 bytes----+----1 bytes-+---------7 bytes--------+----value size---------- + * +----------------------------------STORE KEY-------------------------------------+---STORE VAL-----------+ + * </pre> + * An 8 byte timestamp, a one byte version and a 7 byte sequence number are appended to the provided key and this + * combination is used as the key in the k-v store. The provided value is stored as is. + * + * <p> This class is thread-safe and concurrent reads/writes are expected. + * + * @param <K> the type of key in the store + * @param <V> the type of value in the store + */ +public class TimeSeriesStoreImpl<K, V> implements TimeSeriesStore<K, V> { + + private final KeyValueStore<TimeSeriesKey<K>, V> kvStore; + + /** + * Since timestamps are at the granularity of milliseconds, multiple entries added in the same + * millisecond are distinguished by a monotonically increasing sequence number. + */ + private final AtomicLong seqNum = new AtomicLong(); + private final boolean appendMode; + + /** + * Creates a {@link TimeSeriesStoreImpl} + * + * @param kvStore the backing kv store to use + * @param appendMode should the store be used in appendMode + */ + public TimeSeriesStoreImpl(KeyValueStore<TimeSeriesKey<K>, V> kvStore, boolean appendMode) { + this.kvStore = kvStore; + this.appendMode = appendMode; + } + + /** + * Creates a {@link TimeSeriesStoreImpl} in append mode. 
+ * + * @param kvStore the backing kv store to use + */ + public TimeSeriesStoreImpl(KeyValueStore<TimeSeriesKey<K>, V> kvStore) { + this(kvStore, true); + } + + @Override + public void put(K key, V val, long timestamp) { + // For append mode, values are differentiated by an unique sequence number. For overwrite mode, the sequence + // number is always zero. This ensures that only the most recent value is retained. + // The sequence number is obtained with a single atomic increment so that concurrent puts can never + // observe the same value and overwrite each other's entries. + long seq = appendMode ? seqNum.incrementAndGet() : 0L; + TimeSeriesKey<K> timeSeriesKey = new TimeSeriesKey<>(key, timestamp, seq); + kvStore.put(timeSeriesKey, val); + } + + @Override + public ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, long endTimestamp) { + validateRange(startTimestamp, endTimestamp); + // Sequence number zero is used for both range bounds. + TimeSeriesKey<K> fromKey = new TimeSeriesKey<>(key, startTimestamp, 0); + TimeSeriesKey<K> toKey = new TimeSeriesKey<>(key, endTimestamp, 0); + + KeyValueIterator<TimeSeriesKey<K>, V> range = kvStore.range(fromKey, toKey); + return new TimeSeriesStoreIterator<>(range); + } + + @Override + public void remove(K key, long startTimestamp, long endTimeStamp) { + validateRange(startTimestamp, endTimeStamp); + TimeSeriesKey<K> fromKey = new TimeSeriesKey<>(key, startTimestamp, 0); + TimeSeriesKey<K> toKey = new TimeSeriesKey<>(key, endTimeStamp, 0); + + List<TimeSeriesKey<K>> keysToDelete = new LinkedList<>(); + + // Collect the keys first and always close the range iterator, even if iteration fails, so that the + // resources it holds are released. + KeyValueIterator<TimeSeriesKey<K>, V> range = kvStore.range(fromKey, toKey); + try { + while (range.hasNext()) { + keysToDelete.add(range.next().getKey()); + } + } finally { + range.close(); + } + + kvStore.deleteAll(keysToDelete); + } + + @Override + public void flush() { + kvStore.flush(); + } + + @Override + public void close() { + kvStore.close(); + } + + private void validateRange(long startTimestamp, long endTimestamp) throws IllegalArgumentException { + if (startTimestamp < 0) { + throw new IllegalArgumentException(String.format("Start timestamp :%d is less than zero", startTimestamp)); + } + + if (endTimestamp < 0) { + throw new
IllegalArgumentException(String.format("End timestamp :%d is less than zero", endTimestamp)); + } + + if (endTimestamp < startTimestamp) { + throw new IllegalArgumentException(String.format("End timestamp :%d is less than start timestamp: %d", endTimestamp, startTimestamp)); + } + } + + private static class TimeSeriesStoreIterator<K, V> implements ClosableIterator<TimestampedValue<V>> { + + private final KeyValueIterator<TimeSeriesKey<K>, V> wrappedIterator; + + public TimeSeriesStoreIterator(KeyValueIterator<TimeSeriesKey<K>, V> wrappedIterator) { + this.wrappedIterator = wrappedIterator; + } + + @Override + public void close() { + wrappedIterator.close(); + } + + @Override + public boolean hasNext() { + return wrappedIterator.hasNext(); + } + + @Override + public TimestampedValue<V> next() { + Entry<TimeSeriesKey<K>, V> next = wrappedIterator.next(); + return new TimestampedValue<>(next.getValue(), next.getKey().getTimestamp()); + } + + @Override + public void remove() { + wrappedIterator.remove(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java new file mode 100644 index 0000000..ad5e844 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.samza.operators.impl.store; + +/** + * An immutable pair of a value, and its corresponding timestamp. + * + * @param <V> the type of the value + */ +public class TimestampedValue<V> { + private final V value; + private final Long timestamp; + + public TimestampedValue(V v, Long time) { + value = v; + timestamp = time; + } + + public V getValue() { + return value; + } + + public Long getTimestamp() { + return timestamp; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || !getClass().equals(o.getClass())) return false; + + TimestampedValue<?> that = (TimestampedValue<?>) o; + + if (value != null ? !value.equals(that.value) : that.value != null) return false; + return timestamp.equals(that.timestamp); + } + + @Override + public int hashCode() { + int result = value != null ? 
value.hashCode() : 0; + result = 31 * result + timestamp.hashCode(); + return result; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesKeySerde.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesKeySerde.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesKeySerde.java new file mode 100644 index 0000000..3c1df9c --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesKeySerde.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.samza.operators.impl.store; + +import org.apache.samza.serializers.LongSerde; +import org.apache.samza.serializers.StringSerde; +import org.junit.Test; + +import static junit.framework.Assert.assertEquals; + +public class TestTimeSeriesKeySerde { + + @Test + public void testStringTimeSeriesKey() { + TimeSeriesKey<String> storeKey = new TimeSeriesKey<>("test", 1, 23); + TimeSeriesKeySerde<String> serde = new TimeSeriesKeySerde<>(new StringSerde("UTF-8")); + + byte[] serializedBytes = serde.toBytes(storeKey); + TimeSeriesKey<String> deserializedTimeSeriesKey = serde.fromBytes(serializedBytes); + + assertEquals(storeKey.getKey(), deserializedTimeSeriesKey.getKey()); + assertEquals(storeKey.getSeqNum(), deserializedTimeSeriesKey.getSeqNum()); + assertEquals(storeKey.getTimestamp(), deserializedTimeSeriesKey.getTimestamp()); + assertEquals(storeKey, deserializedTimeSeriesKey); + } + + @Test + public void testNullTimeSeriesKey() { + TimeSeriesKey<String> storeKey = new TimeSeriesKey<>(null, 1, 23); + TimeSeriesKeySerde<String> serde = new TimeSeriesKeySerde<>(new StringSerde("UTF-8")); + byte[] serializedBytes = serde.toBytes(storeKey); + TimeSeriesKey<String> deserializedTimeSeriesKey = serde.fromBytes(serializedBytes); + + assertEquals(storeKey.getKey(), deserializedTimeSeriesKey.getKey()); + assertEquals(storeKey.getSeqNum(), deserializedTimeSeriesKey.getSeqNum()); + assertEquals(storeKey.getTimestamp(), deserializedTimeSeriesKey.getTimestamp()); + + assertEquals(storeKey, deserializedTimeSeriesKey); + } + + @Test + public void testLongTimeSeriesKey() { + TimeSeriesKey<Long> storeKey = new TimeSeriesKey<>(30L, 1, 23); + TimeSeriesKeySerde<Long> serde = new TimeSeriesKeySerde<>(new LongSerde()); + byte[] serializedBytes = serde.toBytes(storeKey); + TimeSeriesKey<Long> deserializedTimeSeriesKey = serde.fromBytes(serializedBytes); + + assertEquals(storeKey.getKey(), deserializedTimeSeriesKey.getKey()); + assertEquals(storeKey.getSeqNum(), 
deserializedTimeSeriesKey.getSeqNum()); + assertEquals(storeKey.getTimestamp(), deserializedTimeSeriesKey.getTimestamp()); + + assertEquals(storeKey, deserializedTimeSeriesKey); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java new file mode 100644 index 0000000..62304f3 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.samza.operators.impl.store; + +import com.google.common.io.Files; +import org.apache.samza.config.MapConfig; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.serializers.ByteSerde; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.storage.kv.ClosableIterator; +import org.apache.samza.storage.kv.KeyValueStoreMetrics; +import org.apache.samza.storage.kv.RocksDbKeyValueStore; +import org.apache.samza.storage.kv.SerializedKeyValueStore; +import org.apache.samza.storage.kv.SerializedKeyValueStoreMetrics; +import org.junit.Assert; +import org.junit.Test; +import org.rocksdb.CompressionType; +import org.rocksdb.FlushOptions; +import org.rocksdb.Options; +import org.rocksdb.WriteOptions; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +public class TestTimeSeriesStoreImpl { + + @Test + public void testGetOnTimestampBoundaries() { + TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), true); + + // insert an entry with key "hello" at timestamps "1" and "2" + timeSeriesStore.put("hello", "world-1".getBytes(), 1L); + timeSeriesStore.put("hello", "world-1".getBytes(), 2L); + timeSeriesStore.put("hello", "world-2".getBytes(), 2L); + + // read from time-range + List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 0L, 1L); + Assert.assertEquals(values.size(), 0); + + // read from time-range [1,2) should return one entry + values = readStore(timeSeriesStore, "hello", 1L, 2L); + Assert.assertEquals(values.size(), 1); + Assert.assertEquals(new String(values.get(0).getValue()), "world-1"); + + // read from time-range [2,3) should return two entries + values = readStore(timeSeriesStore, "hello", 2L, 3L); + Assert.assertEquals(values.size(), 2); + Assert.assertEquals(new String(values.get(0).getValue()), "world-1"); + 
Assert.assertEquals(values.get(0).getTimestamp(), new Long(2)); + + // read from time-range [0,3) should return three entries + values = readStore(timeSeriesStore, "hello", 0L, 3L); + Assert.assertEquals(values.size(), 3); + + // read from time-range [2,999999) should return two entries + values = readStore(timeSeriesStore, "hello", 2L, 999999L); + Assert.assertEquals(values.size(), 2); + + // read from time-range [3,4) should return no entries + values = readStore(timeSeriesStore, "hello", 3L, 4L); + Assert.assertEquals(values.size(), 0); + } + + @Test + public void testGetWithNonExistentKeys() { + TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), true); + timeSeriesStore.put("hello", "world-1".getBytes(), 1L); + + // read from a non-existent key + List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "non-existent-key", 0, Integer.MAX_VALUE); + Assert.assertEquals(values.size(), 0); + + // read from an existing key but out of range timestamp + values = readStore(timeSeriesStore, "hello", 2, Integer.MAX_VALUE); + Assert.assertEquals(values.size(), 0); + } + + @Test + public void testPutWithMultipleEntries() { + TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), true); + + // insert 100 entries at timestamps "1" and "2" + for (int i = 0; i < 100; i++) { + timeSeriesStore.put("hello", "world-1".getBytes(), 1L); + timeSeriesStore.put("hello", "world-2".getBytes(), 2L); + } + + // read from time-range [0,2) should return 100 entries + List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 0L, 2L); + Assert.assertEquals(values.size(), 100); + values.forEach(timeSeriesValue -> { + Assert.assertEquals(new String(timeSeriesValue.getValue()), "world-1"); + }); + + // read from time-range [2,4) should return 100 entries + values = readStore(timeSeriesStore, "hello", 2L, 4L); + Assert.assertEquals(values.size(), 100); + 
values.forEach(timeSeriesValue -> { + Assert.assertEquals(new String(timeSeriesValue.getValue()), "world-2"); + }); + + // read all entries in the store + values = readStore(timeSeriesStore, "hello", 0L, Integer.MAX_VALUE); + Assert.assertEquals(values.size(), 200); + } + + @Test + public void testGetOnTimestampBoundariesWithOverwriteMode() { + // instantiate a store in overwrite mode + TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), false); + + // insert an entry with key "hello" at timestamps "1" and "2" + timeSeriesStore.put("hello", "world-1".getBytes(), 1L); + timeSeriesStore.put("hello", "world-1".getBytes(), 2L); + timeSeriesStore.put("hello", "world-2".getBytes(), 2L); + + // read from time-range + List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 0L, 1L); + Assert.assertEquals(values.size(), 0); + + // read from time-range [1,2) should return one entry + values = readStore(timeSeriesStore, "hello", 1L, 2L); + Assert.assertEquals(values.size(), 1); + Assert.assertEquals(new String(values.get(0).getValue()), "world-1"); + + // read from time-range [2,3) should return the most recent entry + values = readStore(timeSeriesStore, "hello", 2L, 3L); + Assert.assertEquals(values.size(), 1); + Assert.assertEquals(new String(values.get(0).getValue()), "world-2"); + Assert.assertEquals(values.get(0).getTimestamp(), new Long(2)); + + // read from time-range [0,3) should return two entries + values = readStore(timeSeriesStore, "hello", 0L, 3L); + Assert.assertEquals(values.size(), 2); + + // read from time-range [2,999999) should return one entry + values = readStore(timeSeriesStore, "hello", 2L, 999999L); + Assert.assertEquals(values.size(), 1); + + // read from time-range [3,4) should return no entries + values = readStore(timeSeriesStore, "hello", 3L, 4L); + Assert.assertEquals(values.size(), 0); + } + + @Test + public void testDeletesInOverwriteMode() { + // instantiate a store in 
overwrite mode + TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), false); + + // insert an entry with key "hello" at timestamps "1" and "2" + timeSeriesStore.put("hello", "world-1".getBytes(), 1L); + timeSeriesStore.put("hello", "world-1".getBytes(), 2L); + timeSeriesStore.put("hello", "world-2".getBytes(), 2L); + + List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 1L, 3L); + Assert.assertEquals(values.size(), 2); + + timeSeriesStore.remove("hello", 0L, 3L); + values = readStore(timeSeriesStore, "hello", 1L, 3L); + Assert.assertEquals(values.size(), 0); + } + + private static <K, V> List<TimestampedValue<V>> readStore(TimeSeriesStore<K, V> store, K key, long startTimestamp, long endTimestamp) { + List<TimestampedValue<V>> list = new ArrayList<>(); + ClosableIterator<TimestampedValue<V>> storeValuesIterator = store.get(key, startTimestamp, endTimestamp); + + while (storeValuesIterator.hasNext()) { + TimestampedValue<V> next = storeValuesIterator.next(); + list.add(next); + } + + storeValuesIterator.close(); + return list; + } + + private static <K> TimeSeriesStore<K, byte[]> newTimeSeriesStore(String storeName, Serde<K> keySerde, boolean appendMode) { + RocksDbKeyValueStore rocksKVStore = newRocksDbStore("someStore"); + SerializedKeyValueStore<TimeSeriesKey<K>, byte[]> kvStore = new SerializedKeyValueStore<>(rocksKVStore, + new TimeSeriesKeySerde<>(keySerde), new ByteSerde(), + new SerializedKeyValueStoreMetrics("", new MetricsRegistryMap())); + return new TimeSeriesStoreImpl<>(kvStore, appendMode); + } + + private static RocksDbKeyValueStore newRocksDbStore(String storeName) { + File dir = Files.createTempDir(); + return new RocksDbKeyValueStore(dir, + new Options().setCreateIfMissing(true).setCompressionType(CompressionType.SNAPPY_COMPRESSION), new MapConfig(), + false, storeName, new WriteOptions(), new FlushOptions(), + new KeyValueStoreMetrics(storeName, new 
MetricsRegistryMap())); + } +}
