Updated Branches:
  refs/heads/master a268b7e0b -> 04d00f5a4

SAMZA-104; Fix NPE in kv store when delete is used with cached store and serde 
store.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/04d00f5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/04d00f5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/04d00f5a

Branch: refs/heads/master
Commit: 04d00f5a4291f54b2e64d555d7ef7f9641011c19
Parents: a268b7e
Author: Chris Riccomini <[email protected]>
Authored: Tue Dec 10 15:09:33 2013 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Tue Dec 10 15:09:33 2013 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/util/Util.scala | 16 +++--
 .../kv/KeyValueStorageEngineFactory.scala       |  2 +-
 .../storage/kv/NullSafeKeyValueStore.scala      | 74 ++++++++++++++++++++
 .../storage/kv/SerializedKeyValueStore.scala    | 70 +++++++-----------
 .../samza/storage/kv/TestKeyValueStores.scala   | 62 ++++++++++------
 5 files changed, 153 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/04d00f5a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 8386324..6b2ec49 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -102,21 +102,29 @@ object Util extends Logging {
   }
 
   /**
-   * Returns a SystemStream object based on the system stream name given. For 
+   * Returns a SystemStream object based on the system stream name given. For
    * example, kafka.topic would return new SystemStream("kafka", "topic").
    */
   def getSystemStreamFromNames(systemStreamNames: String): SystemStream = {
     val idx = systemStreamNames.indexOf('.')
-    if(idx < 0)
+    if (idx < 0)
       throw new IllegalArgumentException("No '.' in stream name '" + systemStreamNames + "'. Stream names should be in the form 'system.stream'")
     new SystemStream(systemStreamNames.substring(0, idx), systemStreamNames.substring(idx + 1, systemStreamNames.length))
   }
-  
+
   /**
-   * Returns a SystemStream object based on the system stream name given. For 
+   * Returns a SystemStream object based on the system stream name given. For
    * example, kafka.topic would return new SystemStream("kafka", "topic").
    */
   def getNameFromSystemStream(systemStream: SystemStream) = {
     systemStream.getSystem + "." + systemStream.getStream
   }
+
+  /**
+   * Makes sure that an object is not null, and throws a NullPointerException
+   * if it is.
+   */
+  def notNull[T](obj: T, msg: String) = if (obj == null) {
+    throw new NullPointerException(msg)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/04d00f5a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
index d42f46e..ea75fab 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
@@ -73,7 +73,7 @@ class KeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] {
     } else {
       serialized
     }
-    val db = maybeCachedStore
+    val db = new NullSafeKeyValueStore(maybeCachedStore)
     val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry)
 
     // Decide if we should use raw bytes when restoring

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/04d00f5a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
new file mode 100644
index 0000000..00f9af3
--- /dev/null
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import org.apache.samza.util.Util.notNull
+import scala.collection.JavaConversions._
+
+object NullSafeKeyValueStore {
+  val KEY_ERROR_MSG = "Null is not a valid key."
+  val VAL_ERROR_MSG = "Null is not a valid value."
+}
+
+class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueStore[K, V] {
+  import NullSafeKeyValueStore._
+
+  def get(key: K): V = {
+    notNull(key, KEY_ERROR_MSG)
+    store.get(key)
+  }
+
+  def put(key: K, value: V) {
+    notNull(key, KEY_ERROR_MSG)
+    notNull(value, VAL_ERROR_MSG)
+    store.put(key, value)
+  }
+
+  def putAll(entries: java.util.List[Entry[K, V]]) {
+    entries.foreach(entry => {
+      notNull(entry.getKey, KEY_ERROR_MSG)
+      notNull(entry.getValue, VAL_ERROR_MSG)
+    })
+    store.putAll(entries)
+  }
+
+  def delete(key: K) {
+    notNull(key, KEY_ERROR_MSG)
+    store.delete(key)
+  }
+
+  def range(from: K, to: K): KeyValueIterator[K, V] = {
+    notNull(from, KEY_ERROR_MSG)
+    notNull(to, KEY_ERROR_MSG)
+    store.range(from, to)
+  }
+
+  def all(): KeyValueIterator[K, V] = {
+    store.all
+  }
+
+  def flush {
+    store.flush
+  }
+
+  def close {
+    store.close
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/04d00f5a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index c3bf4dc..2d3b6e5 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -33,57 +33,42 @@ class SerializedKeyValueStore[K, V](
   metrics: SerializedKeyValueStoreMetrics = new SerializedKeyValueStoreMetrics) extends KeyValueStore[K, V] with Logging {
 
   def get(key: K): V = {
-    val keyBytes = bytesNotNull(key, keySerde)
+    val keyBytes = toBytesOrNull(key, keySerde)
     val found = store.get(keyBytes)
     metrics.gets.inc
-    metrics.bytesSerialized.inc(keyBytes.size)
-    if (found == null) {
-      null.asInstanceOf[V]
-    } else {
-      metrics.bytesDeserialized.inc(found.size)
-      msgSerde.fromBytes(found).asInstanceOf[V]
-    }
+    fromBytesOrNull(found, msgSerde)
   }
 
   def put(key: K, value: V) {
     metrics.puts.inc
-    val keyBytes = bytesNotNull(key, keySerde)
-    val valBytes = bytesNotNull(value, msgSerde)
-    metrics.bytesSerialized.inc(keyBytes.size + valBytes.size)
+    val keyBytes = toBytesOrNull(key, keySerde)
+    val valBytes = toBytesOrNull(value, msgSerde)
     store.put(keyBytes, valBytes)
   }
 
   def putAll(entries: java.util.List[Entry[K, V]]) {
     val list = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](entries.size())
     val iter = entries.iterator
-    var bytesSerialized = 0L
     while (iter.hasNext) {
       val curr = iter.next
-      val keyBytes = bytesNotNull(curr.getKey, keySerde)
-      val valBytes = bytesNotNull(curr.getValue, msgSerde)
-      bytesSerialized += keyBytes.size
-      if (valBytes != null) {
-        bytesSerialized += valBytes.size
-      }
+      val keyBytes = toBytesOrNull(curr.getKey, keySerde)
+      val valBytes = toBytesOrNull(curr.getValue, msgSerde)
       list.add(new Entry(keyBytes, valBytes))
     }
     store.putAll(list)
     metrics.puts.inc(list.size)
-    metrics.bytesSerialized.inc(bytesSerialized)
   }
 
   def delete(key: K) {
     metrics.deletes.inc
-    val keyBytes = bytesNotNull(key, keySerde)
-    metrics.bytesSerialized.inc(keyBytes.size)
+    val keyBytes = toBytesOrNull(key, keySerde)
     store.delete(keyBytes)
   }
 
   def range(from: K, to: K): KeyValueIterator[K, V] = {
     metrics.ranges.inc
-    val fromBytes = bytesNotNull(from, keySerde)
-    val toBytes = bytesNotNull(to, keySerde)
-    metrics.bytesSerialized.inc(fromBytes.size + toBytes.size)
+    val fromBytes = toBytesOrNull(from, keySerde)
+    val toBytes = toBytesOrNull(to, keySerde)
     new DeserializingIterator(store.range(fromBytes, toBytes))
   }
 
@@ -98,21 +83,8 @@ class SerializedKeyValueStore[K, V](
     def close() = iter.close()
     def next(): Entry[K, V] = {
       val nxt = iter.next()
-      val keyBytes = nxt.getKey
-      val valBytes = nxt.getValue
-      val key = if (keyBytes != null) {
-        metrics.bytesDeserialized.inc(keyBytes.size)
-        keySerde.fromBytes(keyBytes).asInstanceOf[K]
-      } else {
-        warn("Got a null key while iterating over a store. This is highly unexpected, since null in key and value is disallowed for key value stores.")
-        null.asInstanceOf[K]
-      }
-      val value = if (valBytes != null) {
-        metrics.bytesDeserialized.inc(valBytes.size)
-        msgSerde.fromBytes(valBytes)
-      } else {
-        null.asInstanceOf[V]
-      }
+      val key = fromBytesOrNull(nxt.getKey, keySerde)
+      val value = fromBytesOrNull(nxt.getValue, msgSerde)
       new Entry(key, value)
     }
   }
@@ -131,13 +103,19 @@ class SerializedKeyValueStore[K, V](
     store.close
   }
 
-  /**
-   * Null is not allowed for keys and values because some change log systems
-   * (Kafka) model deletes as null.
-   */
-  private def bytesNotNull[T](t: T, serde: Serde[T]): Array[Byte] = if (t != null) {
-    serde.toBytes(t)
+  def toBytesOrNull[T](t: T, serde: Serde[T]): Array[Byte] = if (t == null) {
+    null
+  } else {
+    val bytes = serde.toBytes(t)
+    metrics.bytesSerialized.inc(bytes.size)
+    bytes
+  }
+
+  def fromBytesOrNull[T](bytes: Array[Byte], serde: Serde[T]): T = if (bytes == null) {
+    null.asInstanceOf[T]
   } else {
-    throw new NullPointerException("Null is not a valid key or value.")
+    val obj = serde.fromBytes(bytes)
+    metrics.bytesDeserialized.inc(bytes.size)
+    obj
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/04d00f5a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
index e8db3f2..e99d459 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -34,24 +34,42 @@ import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
 import org.apache.samza.serializers.StringSerde
 import org.apache.samza.util.TestUtil._
+import org.apache.samza.serializers.Serde
 
 @RunWith(value = classOf[Parameterized])
-class TestKeyValueStores(cache: Boolean) {
-
+class TestKeyValueStores(typeOfStore: String) {
   import TestKeyValueStores._
 
   val letters = "abcdefghijklmnopqrstuvwxyz".map(_.toString)
   val dir = new File(System.getProperty("java.io.tmpdir"), "leveldb-test-" + new Random().nextInt(Int.MaxValue))
   var store: KeyValueStore[Array[Byte], Array[Byte]] = null
+  var cache = false
+  var serde = false
 
   @Before
   def setup() {
     dir.mkdirs()
     val leveldb = new LevelDbKeyValueStore(dir, new Options)
-    if (cache)
-      store = new CachedStore(leveldb, CacheSize, BatchSize)
-    else
-      store = leveldb
+    val passThroughSerde = new Serde[Array[Byte]] {
+      def toBytes(obj: Array[Byte]) = obj
+      def fromBytes(bytes: Array[Byte]) = bytes
+    }
+    store = if ("cache".equals(typeOfStore)) {
+      cache = true
+      new CachedStore(leveldb, CacheSize, BatchSize)
+    } else if ("serde".equals(typeOfStore)) {
+      serde = true
+      new SerializedKeyValueStore(leveldb, passThroughSerde, passThroughSerde)
+    } else if ("cache-and-serde".equals(typeOfStore)) {
+      val serializedStore = new SerializedKeyValueStore(leveldb, passThroughSerde, passThroughSerde)
+      serde = true
+      cache = true
+      new CachedStore(serializedStore, CacheSize, BatchSize)
+    } else {
+      leveldb
+    }
+
+    store = new NullSafeKeyValueStore(store)
   }
 
   @After
@@ -82,19 +100,21 @@ class TestKeyValueStores(cache: Boolean) {
   }
 
   @Test
-  def testNulls() {
-    val stringSerde = new StringSerde("UTF-8")
-    val serializedStore = new SerializedKeyValueStore(store, stringSerde, stringSerde)
-    val expectedNPEMessage = Some("Null is not a valid key or value.")
-
-    expect(classOf[NullPointerException], expectedNPEMessage) { serializedStore.get(null) }
-    expect(classOf[NullPointerException], expectedNPEMessage) { serializedStore.delete(null) }
-    expect(classOf[NullPointerException], expectedNPEMessage) { serializedStore.put(null, "") }
-    expect(classOf[NullPointerException], expectedNPEMessage) { serializedStore.put("", null) }
-    expect(classOf[NullPointerException], expectedNPEMessage) { serializedStore.putAll(List(new Entry("", ""), new Entry[String, String]("", null))) }
-    expect(classOf[NullPointerException], expectedNPEMessage) { serializedStore.putAll(List(new Entry[String, String](null, ""))) }
-    expect(classOf[NullPointerException], expectedNPEMessage) { serializedStore.range("", null) }
-    expect(classOf[NullPointerException], expectedNPEMessage) { serializedStore.range(null, "") }
+  def testNullsWithSerde() {
+    if (serde) {
+      val a = b("a")
+      val keyMsg = Some(NullSafeKeyValueStore.KEY_ERROR_MSG)
+      val valMsg = Some(NullSafeKeyValueStore.VAL_ERROR_MSG)
+
+      expect(classOf[NullPointerException], keyMsg) { store.get(null) }
+      expect(classOf[NullPointerException], keyMsg) { store.delete(null) }
+      expect(classOf[NullPointerException], keyMsg) { store.put(null, a) }
+      expect(classOf[NullPointerException], valMsg) { store.put(a, null) }
+      expect(classOf[NullPointerException], valMsg) { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null))) }
+      expect(classOf[NullPointerException], keyMsg) { store.putAll(List(new Entry[Array[Byte], Array[Byte]](null, a))) }
+      expect(classOf[NullPointerException], keyMsg) { store.range(a, null) }
+      expect(classOf[NullPointerException], keyMsg) { store.range(null, a) }
+    }
   }
 
   @Test
@@ -137,7 +157,9 @@ class TestKeyValueStores(cache: Boolean) {
   @Test
   def testDelete() {
     val a = b("a")
+    assertNull(store.get(a))
     store.put(a, a)
+    assertTrue(Arrays.equals(a, store.get(a)))
     store.delete(a)
     assertNull(store.get(a))
   }
@@ -291,5 +313,5 @@ object TestKeyValueStores {
   val CacheSize = 10
   val BatchSize = 5
   @Parameters
-  def parameters: java.util.Collection[Array[java.lang.Boolean]] = Arrays.asList(Array(java.lang.Boolean.TRUE), Array(java.lang.Boolean.FALSE))
+  def parameters: java.util.Collection[Array[String]] = Arrays.asList(Array("cache"), Array("serde"), Array("cache-and-serde"), Array("levledb"))
 }

Reply via email to