Updated Branches: refs/heads/master a268b7e0b -> 04d00f5a4
SAMZA-104; Fix NPE in kv store when delete is used with cached store and serde store. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/04d00f5a Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/04d00f5a Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/04d00f5a Branch: refs/heads/master Commit: 04d00f5a4291f54b2e64d555d7ef7f9641011c19 Parents: a268b7e Author: Chris Riccomini <[email protected]> Authored: Tue Dec 10 15:09:33 2013 -0800 Committer: Chris Riccomini <[email protected]> Committed: Tue Dec 10 15:09:33 2013 -0800 ---------------------------------------------------------------------- .../main/scala/org/apache/samza/util/Util.scala | 16 +++-- .../kv/KeyValueStorageEngineFactory.scala | 2 +- .../storage/kv/NullSafeKeyValueStore.scala | 74 ++++++++++++++++++++ .../storage/kv/SerializedKeyValueStore.scala | 70 +++++++----------- .../samza/storage/kv/TestKeyValueStores.scala | 62 ++++++++++------ 5 files changed, 153 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/04d00f5a/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index 8386324..6b2ec49 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -102,21 +102,29 @@ object Util extends Logging { } /** - * Returns a SystemStream object based on the system stream name given. For + * Returns a SystemStream object based on the system stream name given. For * example, kafka.topic would return new SystemStream("kafka", "topic"). */ def getSystemStreamFromNames(systemStreamNames: String): SystemStream = { val idx = systemStreamNames.indexOf('.') - if(idx < 0) + if (idx < 0) throw new IllegalArgumentException("No '.' in stream name '" + systemStreamNames + "'. Stream names should be in the form 'system.stream'") new SystemStream(systemStreamNames.substring(0, idx), systemStreamNames.substring(idx + 1, systemStreamNames.length)) } - + /** - * Returns a SystemStream object based on the system stream name given. For + * Returns a SystemStream object based on the system stream name given. For * example, kafka.topic would return new SystemStream("kafka", "topic"). */ def getNameFromSystemStream(systemStream: SystemStream) = { systemStream.getSystem + "." + systemStream.getStream } + + /** + * Makes sure that an object is not null, and throws a NullPointerException + * if it is. + */ + def notNull[T](obj: T, msg: String) = if (obj == null) { + throw new NullPointerException(msg) + } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/04d00f5a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala index d42f46e..ea75fab 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala @@ -73,7 +73,7 @@ class KeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] { } else { serialized } - val db = maybeCachedStore + val db = new NullSafeKeyValueStore(maybeCachedStore) val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry) // Decide if we should use raw bytes when restoring http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/04d00f5a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala new file mode 100644 index 0000000..00f9af3 --- /dev/null +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.kv + +import org.apache.samza.util.Util.notNull +import scala.collection.JavaConversions._ + +object NullSafeKeyValueStore { + val KEY_ERROR_MSG = "Null is not a valid key." + val VAL_ERROR_MSG = "Null is not a valid value." +} + +class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueStore[K, V] { + import NullSafeKeyValueStore._ + + def get(key: K): V = { + notNull(key, KEY_ERROR_MSG) + store.get(key) + } + + def put(key: K, value: V) { + notNull(key, KEY_ERROR_MSG) + notNull(value, VAL_ERROR_MSG) + store.put(key, value) + } + + def putAll(entries: java.util.List[Entry[K, V]]) { + entries.foreach(entry => { + notNull(entry.getKey, KEY_ERROR_MSG) + notNull(entry.getValue, VAL_ERROR_MSG) + }) + store.putAll(entries) + } + + def delete(key: K) { + notNull(key, KEY_ERROR_MSG) + store.delete(key) + } + + def range(from: K, to: K): KeyValueIterator[K, V] = { + notNull(from, KEY_ERROR_MSG) + notNull(to, KEY_ERROR_MSG) + store.range(from, to) + } + + def all(): KeyValueIterator[K, V] = { + store.all + } + + def flush { + store.flush + } + + def close { + store.close + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/04d00f5a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala index c3bf4dc..2d3b6e5 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala @@ -33,57 +33,42 @@ class SerializedKeyValueStore[K, V]( metrics: SerializedKeyValueStoreMetrics = new SerializedKeyValueStoreMetrics) extends KeyValueStore[K, V] with Logging { def get(key: K): V = { - val keyBytes = bytesNotNull(key, keySerde) + val keyBytes = toBytesOrNull(key, keySerde) val found = store.get(keyBytes) metrics.gets.inc - metrics.bytesSerialized.inc(keyBytes.size) - if (found == null) { - null.asInstanceOf[V] - } else { - metrics.bytesDeserialized.inc(found.size) - msgSerde.fromBytes(found).asInstanceOf[V] - } + fromBytesOrNull(found, msgSerde) } def put(key: K, value: V) { metrics.puts.inc - val keyBytes = bytesNotNull(key, keySerde) - val valBytes = bytesNotNull(value, msgSerde) - metrics.bytesSerialized.inc(keyBytes.size + valBytes.size) + val keyBytes = toBytesOrNull(key, keySerde) + val valBytes = toBytesOrNull(value, msgSerde) store.put(keyBytes, valBytes) } def putAll(entries: java.util.List[Entry[K, V]]) { val list = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](entries.size()) val iter = entries.iterator - var bytesSerialized = 0L while (iter.hasNext) { val curr = iter.next - val keyBytes = bytesNotNull(curr.getKey, keySerde) - val valBytes = bytesNotNull(curr.getValue, msgSerde) - bytesSerialized += keyBytes.size - if (valBytes != null) { - bytesSerialized += valBytes.size - } + val keyBytes = toBytesOrNull(curr.getKey, keySerde) + val valBytes = toBytesOrNull(curr.getValue, msgSerde) list.add(new Entry(keyBytes, valBytes)) } store.putAll(list) metrics.puts.inc(list.size) - metrics.bytesSerialized.inc(bytesSerialized) } def delete(key: K) { metrics.deletes.inc - val keyBytes = bytesNotNull(key, keySerde) - metrics.bytesSerialized.inc(keyBytes.size) + val keyBytes = toBytesOrNull(key, keySerde) store.delete(keyBytes) } def range(from: K, to: K): KeyValueIterator[K, V] = { metrics.ranges.inc - val fromBytes = bytesNotNull(from, keySerde) - val toBytes = bytesNotNull(to, keySerde) - metrics.bytesSerialized.inc(fromBytes.size + toBytes.size) + val fromBytes = toBytesOrNull(from, keySerde) + val toBytes = toBytesOrNull(to, keySerde) new DeserializingIterator(store.range(fromBytes, toBytes)) } @@ -98,21 +83,8 @@ class SerializedKeyValueStore[K, V]( def close() = iter.close() def next(): Entry[K, V] = { val nxt = iter.next() - val keyBytes = nxt.getKey - val valBytes = nxt.getValue - val key = if (keyBytes != null) { - metrics.bytesDeserialized.inc(keyBytes.size) - keySerde.fromBytes(keyBytes).asInstanceOf[K] - } else { - warn("Got a null key while iterating over a store. This is highly unexpected, since null in key and value is disallowed for key value stores.") - null.asInstanceOf[K] - } - val value = if (valBytes != null) { - metrics.bytesDeserialized.inc(valBytes.size) - msgSerde.fromBytes(valBytes) - } else { - null.asInstanceOf[V] - } + val key = fromBytesOrNull(nxt.getKey, keySerde) + val value = fromBytesOrNull(nxt.getValue, msgSerde) new Entry(key, value) } } @@ -131,13 +103,19 @@ class SerializedKeyValueStore[K, V]( store.close } - /** - * Null is not allowed for keys and values because some change log systems - * (Kafka) model deletes as null. - */ - private def bytesNotNull[T](t: T, serde: Serde[T]): Array[Byte] = if (t != null) { - serde.toBytes(t) + def toBytesOrNull[T](t: T, serde: Serde[T]): Array[Byte] = if (t == null) { + null + } else { + val bytes = serde.toBytes(t) + metrics.bytesSerialized.inc(bytes.size) + bytes + } + + def fromBytesOrNull[T](bytes: Array[Byte], serde: Serde[T]): T = if (bytes == null) { + null.asInstanceOf[T] } else { - throw new NullPointerException("Null is not a valid key or value.") + val obj = serde.fromBytes(bytes) + metrics.bytesDeserialized.inc(bytes.size) + obj } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/04d00f5a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala index e8db3f2..e99d459 100644 --- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala +++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala @@ -34,24 +34,42 @@ import org.junit.runners.Parameterized import org.junit.runners.Parameterized.Parameters import org.apache.samza.serializers.StringSerde import org.apache.samza.util.TestUtil._ +import org.apache.samza.serializers.Serde @RunWith(value = classOf[Parameterized]) -class TestKeyValueStores(cache: Boolean) { - +class TestKeyValueStores(typeOfStore: String) { import TestKeyValueStores._ val letters = "abcdefghijklmnopqrstuvwxyz".map(_.toString) val dir = new File(System.getProperty("java.io.tmpdir"), "leveldb-test-" + new Random().nextInt(Int.MaxValue)) var store: KeyValueStore[Array[Byte], Array[Byte]] = null + var cache = false + var serde = false @Before def setup() { dir.mkdirs() val leveldb = new LevelDbKeyValueStore(dir, new Options) - if (cache) - store = new CachedStore(leveldb, CacheSize, BatchSize) - else - store = leveldb + val passThroughSerde = new Serde[Array[Byte]] { + def toBytes(obj: Array[Byte]) = obj + def fromBytes(bytes: Array[Byte]) = bytes + } + store = if ("cache".equals(typeOfStore)) { + cache = true + new CachedStore(leveldb, CacheSize, BatchSize) + } else if ("serde".equals(typeOfStore)) { + serde = true + new SerializedKeyValueStore(leveldb, passThroughSerde, passThroughSerde) + } else if ("cache-and-serde".equals(typeOfStore)) { + val serializedStore = new SerializedKeyValueStore(leveldb, passThroughSerde, passThroughSerde) + serde = true + cache = true + new CachedStore(serializedStore, CacheSize, BatchSize) + } else { + leveldb + } + + store = new NullSafeKeyValueStore(store) } @After @@ -82,19 +100,21 @@ class TestKeyValueStores(cache: Boolean) { } @Test - def testNulls() { - val stringSerde = new StringSerde("UTF-8") - val serializedStore = new SerializedKeyValueStore(store, stringSerde, stringSerde) - val expectedNPEMessage = Some("Null is not a valid key or value.") - - expect(classOf[NullPointerException], expectedNPEMessage) { serializedStore.get(null) } - expect(classOf[NullPointerException], expectedNPEMessage) { serializedStore.delete(null) } - expect(classOf[NullPointerException], expectedNPEMessage) { serializedStore.put(null, "") } - expect(classOf[NullPointerException], expectedNPEMessage) { serializedStore.put("", null) } - expect(classOf[NullPointerException], expectedNPEMessage) { serializedStore.putAll(List(new Entry("", ""), new Entry[String, String]("", null))) } - expect(classOf[NullPointerException], expectedNPEMessage) { serializedStore.putAll(List(new Entry[String, String](null, ""))) } - expect(classOf[NullPointerException], expectedNPEMessage) { serializedStore.range("", null) } - expect(classOf[NullPointerException], expectedNPEMessage) { serializedStore.range(null, "") } + def testNullsWithSerde() { + if (serde) { + val a = b("a") + val keyMsg = Some(NullSafeKeyValueStore.KEY_ERROR_MSG) + val valMsg = Some(NullSafeKeyValueStore.VAL_ERROR_MSG) + + expect(classOf[NullPointerException], keyMsg) { store.get(null) } + expect(classOf[NullPointerException], keyMsg) { store.delete(null) } + expect(classOf[NullPointerException], keyMsg) { store.put(null, a) } + expect(classOf[NullPointerException], valMsg) { store.put(a, null) } + expect(classOf[NullPointerException], valMsg) { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null))) } + expect(classOf[NullPointerException], keyMsg) { store.putAll(List(new Entry[Array[Byte], Array[Byte]](null, a))) } + expect(classOf[NullPointerException], keyMsg) { store.range(a, null) } + expect(classOf[NullPointerException], keyMsg) { store.range(null, a) } + } } @Test @@ -137,7 +157,9 @@ class TestKeyValueStores(cache: Boolean) { @Test def testDelete() { val a = b("a") + assertNull(store.get(a)) store.put(a, a) + assertTrue(Arrays.equals(a, store.get(a))) store.delete(a) assertNull(store.get(a)) } @@ -291,5 +313,5 @@ object TestKeyValueStores { val CacheSize = 10 val BatchSize = 5 @Parameters - def parameters: java.util.Collection[Array[java.lang.Boolean]] = Arrays.asList(Array(java.lang.Boolean.TRUE), Array(java.lang.Boolean.FALSE)) + def parameters: java.util.Collection[Array[String]] = Arrays.asList(Array("cache"), Array("serde"), Array("cache-and-serde"), Array("levledb")) }
