Updated Branches:
  refs/heads/master 63cc71c5f -> 2cd18279c

SAMZA-57; fix npe in serialized key value store.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/2cd18279
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/2cd18279
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/2cd18279

Branch: refs/heads/master
Commit: 2cd18279c61c92597ad6d2892b1f01b5106ffcb1
Parents: 63cc71c
Author: Chris Riccomini <[email protected]>
Authored: Tue Oct 15 15:01:07 2013 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Tue Oct 15 15:01:07 2013 -0700

----------------------------------------------------------------------
 .../org/apache/samza/serializers/IntegerSerde.scala      | 10 +++++-----
 .../samza/storage/kv/SerializedKeyValueStore.scala       |  5 ++++-
 .../org/apache/samza/storage/kv/TestKeyValueStores.scala | 11 +++++++++++
 3 files changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2cd18279/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala 
b/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala
index 3f219c8..46509f7 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala
@@ -25,19 +25,19 @@ import org.apache.samza.config.Config
 /**
  * A serializer for integers
  */
-class IntegerSerdeFactory extends SerdeFactory[Integer] {
-  def getSerde(name: String, config: Config): Serde[Integer] = new IntegerSerde
+class IntegerSerdeFactory extends SerdeFactory[java.lang.Integer] {
+  def getSerde(name: String, config: Config): Serde[java.lang.Integer] = new 
IntegerSerde
 }
 
-class IntegerSerde extends Serde[Integer] {
-  def toBytes(obj: Integer): Array[Byte] = if (obj != null) {
+class IntegerSerde extends Serde[java.lang.Integer] {
+  def toBytes(obj: java.lang.Integer): Array[Byte] = if (obj != null) {
     ByteBuffer.allocate(4).putInt(obj.intValue).array
   } else {
     null
   }
 
   // big-endian by default
-  def fromBytes(bytes: Array[Byte]): Integer = if (bytes != null) {
+  def fromBytes(bytes: Array[Byte]): java.lang.Integer = if (bytes != null) {
     ByteBuffer.wrap(bytes).getInt
   } else {
     null

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2cd18279/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 7b4db82..53a5cbe 100644
--- 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -61,7 +61,10 @@ class SerializedKeyValueStore[K, V](
       val curr = iter.next
       val keyBytes = keySerde.toBytes(curr.getKey)
       val valBytes = msgSerde.toBytes(curr.getValue)
-      bytesSerialized += keyBytes.size + valBytes.size
+      bytesSerialized += keyBytes.size
+      if (valBytes != null) {
+        bytesSerialized += valBytes.size
+      }
       list.add(new Entry(keyBytes, valBytes))
     }
     store.putAll(list)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2cd18279/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala 
b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
index 0be0722..03a189e 100644
--- 
a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ 
b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -25,6 +25,7 @@ import java.util.Random
 
 import scala.collection.JavaConversions._
 
+import org.apache.samza.serializers.IntegerSerde
 import org.iq80.leveldb.Options
 import org.junit.After
 import org.junit.Assert.assertEquals
@@ -141,6 +142,16 @@ class TestKeyValueStores(cache: Boolean) {
     vals.foreach(v => assertNull(store.get(v)))
   }
 
+  @Test
+  def testSerializedValueIsNull {
+    val serializedStore = new SerializedKeyValueStore(
+      store,
+      new IntegerSerde,
+      new IntegerSerde)
+
+    serializedStore.putAll(List(new Entry[java.lang.Integer, 
java.lang.Integer](0, null)))
+  }
+
   def checkRange(vals: IndexedSeq[String], iter: KeyValueIterator[Array[Byte], 
Array[Byte]]) {
     for (v <- vals) {
       assertTrue(iter.hasNext)

Reply via email to