Repository: incubator-samza Updated Branches: refs/heads/master 55929095f -> 12594fb71
SAMZA-173; fix NPE when changelog restore includes a message with a null value Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/12594fb7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/12594fb7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/12594fb7 Branch: refs/heads/master Commit: 12594fb710260b62f9a21a8be785bd8dd5dcdd01 Parents: 5592909 Author: Chris Riccomini <[email protected]> Authored: Wed Mar 5 15:55:16 2014 -0800 Committer: Chris Riccomini <[email protected]> Committed: Wed Mar 5 15:55:16 2014 -0800 ---------------------------------------------------------------------- .../system/kafka/KafkaSystemConsumer.scala | 2 +- .../storage/kv/KeyValueStorageEngine.scala | 6 ++- .../test/integration/TestStatefulTask.scala | 43 ++++++++++++++------ 3 files changed, 37 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/12594fb7/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index eb48aa3..afbd7cd 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -169,7 +169,7 @@ private[kafka] class KafkaSystemConsumer( } else { null } - val message = if (msg.message.buffer != null) { + val message = if (!msg.message.isNull) { deserializer.fromBytes(Utils.readBytes(msg.message.payload)) } else { null http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/12594fb7/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala index fc22383..f42ea02 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala @@ -91,7 +91,11 @@ class KeyValueStorageEngine[K, V]( batch.clear() } - metrics.restoredBytes.inc(keyBytes.size + valBytes.size) + if (valBytes != null) { + metrics.restoredBytes.inc(valBytes.size) + } + + metrics.restoredBytes.inc(keyBytes.size) metrics.restoredMessages.inc count += 1 http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/12594fb7/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala index 493c984..7e81387 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala @@ -250,15 +250,19 @@ class TestStatefulTask { send(task, "2") send(task, "3") send(task, "2") + send(task, "99") + send(task, "-99") // Validate that messages appear in store stream. - val messages = readAll(STATE_TOPIC, 3, "testShouldStartTaskForFirstTime") + val messages = readAll(STATE_TOPIC, 5, "testShouldStartTaskForFirstTime") - assertEquals(4, messages.length) + assertEquals(6, messages.length) assertEquals("1", messages(0)) assertEquals("2", messages(1)) assertEquals("3", messages(2)) assertEquals("2", messages(3)) + assertEquals("99", messages(4)) + assertEquals(null, messages(5)) stopJob(job) } @@ -292,23 +296,27 @@ class TestStatefulTask { send(task, "5") // Validate that messages appear in store stream. - val messages = readAll(STATE_TOPIC, 10, "testShouldRestoreStore") + val messages = readAll(STATE_TOPIC, 14, "testShouldRestoreStore") - assertEquals(11, messages.length) + assertEquals(15, messages.length) // From initial start. assertEquals("1", messages(0)) assertEquals("2", messages(1)) assertEquals("3", messages(2)) assertEquals("2", messages(3)) + assertEquals("99", messages(4)) + assertEquals(null, messages(5)) // From second startup. - assertEquals("1", messages(4)) - assertEquals("2", messages(5)) - assertEquals("3", messages(6)) + assertEquals("1", messages(6)) assertEquals("2", messages(7)) + assertEquals("3", messages(8)) + assertEquals("2", messages(9)) + assertEquals("99", messages(10)) + assertEquals(null, messages(11)) // From sending in this method. - assertEquals("4", messages(8)) - assertEquals("5", messages(9)) - assertEquals("5", messages(10)) + assertEquals("4", messages(12)) + assertEquals("5", messages(13)) + assertEquals("5", messages(14)) stopJob(job) } @@ -376,7 +384,11 @@ class TestStatefulTask { while (message == null || message.offset < maxOffsetInclusive) { message = stream.next - messages += new String(message.message, "UTF-8") + if (message.message == null) { + messages += null + } else { + messages += new String(message.message, "UTF-8") + } System.err.println("TestStatefulTask.readAll(): offset=%s, message=%s" format (message.offset, messages.last)) } @@ -436,7 +448,14 @@ class TestTask extends StreamTask with InitableTask { System.err.println("TestTask.process(): %s" format msg) received += msg - store.put(msg, msg) + + // A negative string means delete + if (msg.startsWith("-")) { + store.delete(msg.substring(1)) + } else { + store.put(msg, msg) + } + coordinator.commit // Notify sender that we got a message.
