Updated Branches: refs/heads/master 63cc71c5f -> 2cd18279c
SAMZA-57; fix npe in serialized key value store. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/2cd18279 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/2cd18279 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/2cd18279 Branch: refs/heads/master Commit: 2cd18279c61c92597ad6d2892b1f01b5106ffcb1 Parents: 63cc71c Author: Chris Riccomini <[email protected]> Authored: Tue Oct 15 15:01:07 2013 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Oct 15 15:01:07 2013 -0700 ---------------------------------------------------------------------- .../org/apache/samza/serializers/IntegerSerde.scala | 10 +++++----- .../samza/storage/kv/SerializedKeyValueStore.scala | 5 ++++- .../org/apache/samza/storage/kv/TestKeyValueStores.scala | 11 +++++++++++ 3 files changed, 20 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2cd18279/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala index 3f219c8..46509f7 100644 --- a/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala +++ b/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala @@ -25,19 +25,19 @@ import org.apache.samza.config.Config /** * A serializer for integers */ -class IntegerSerdeFactory extends SerdeFactory[Integer] { - def getSerde(name: String, config: Config): Serde[Integer] = new IntegerSerde +class IntegerSerdeFactory extends SerdeFactory[java.lang.Integer] { + def getSerde(name: String, config: Config): Serde[java.lang.Integer] = new IntegerSerde } -class IntegerSerde extends Serde[Integer] { - def toBytes(obj: Integer): Array[Byte] = if (obj != null) { +class IntegerSerde extends Serde[java.lang.Integer] { + def toBytes(obj: java.lang.Integer): Array[Byte] = if (obj != null) { ByteBuffer.allocate(4).putInt(obj.intValue).array } else { null } // big-endian by default - def fromBytes(bytes: Array[Byte]): Integer = if (bytes != null) { + def fromBytes(bytes: Array[Byte]): java.lang.Integer = if (bytes != null) { ByteBuffer.wrap(bytes).getInt } else { null http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2cd18279/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala index 7b4db82..53a5cbe 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala @@ -61,7 +61,10 @@ class SerializedKeyValueStore[K, V]( val curr = iter.next val keyBytes = keySerde.toBytes(curr.getKey) val valBytes = msgSerde.toBytes(curr.getValue) - bytesSerialized += keyBytes.size + valBytes.size + bytesSerialized += keyBytes.size + if (valBytes != null) { + bytesSerialized += valBytes.size + } list.add(new Entry(keyBytes, valBytes)) } store.putAll(list) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2cd18279/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala index 0be0722..03a189e 100644 --- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala +++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala @@ -25,6 +25,7 @@ import java.util.Random import scala.collection.JavaConversions._ +import org.apache.samza.serializers.IntegerSerde import org.iq80.leveldb.Options import org.junit.After import org.junit.Assert.assertEquals @@ -141,6 +142,16 @@ class TestKeyValueStores(cache: Boolean) { vals.foreach(v => assertNull(store.get(v))) } + @Test + def testSerializedValueIsNull { + val serializedStore = new SerializedKeyValueStore( + store, + new IntegerSerde, + new IntegerSerde) + + serializedStore.putAll(List(new Entry[java.lang.Integer, java.lang.Integer](0, null))) + } + def checkRange(vals: IndexedSeq[String], iter: KeyValueIterator[Array[Byte], Array[Byte]]) { for (v <- vals) { assertTrue(iter.hasNext)
