[ https://issues.apache.org/jira/browse/SAMZA-173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13921628#comment-13921628 ]
Jakob Homan commented on SAMZA-173:
-----------------------------------
The Kafka message class has an isNull method to check this:
{noformat}
/**
 * Is the payload of this message null
 */
def isNull(): Boolean = payloadSize < 0
{noformat}
Would it be better to use that, in a probably vain attempt at information
hiding and encapsulation?
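For illustration, a minimal sketch of what that check could look like. StubMessage, NullPayloadSketch, readBytes and decode are hypothetical stand-ins written only for this example; they are not the Kafka or Samza classes. The only part taken from Kafka is the isNull definition quoted above.

```scala
import java.nio.ByteBuffer

// Hypothetical stand-in for the Kafka message class, reduced to what the check needs.
// A negative payload size marks a null payload (a delete), mirroring the quoted isNull.
final class StubMessage(bytes: Array[Byte]) {
  val payloadSize: Int = if (bytes == null) -1 else bytes.length
  def isNull(): Boolean = payloadSize < 0
  def payload: ByteBuffer = if (isNull()) null else ByteBuffer.wrap(bytes)
}

object NullPayloadSketch {
  // Copy the remaining bytes out of the buffer without moving its position.
  // Assumed here to play the role that Utils.readBytes plays in the real code.
  def readBytes(buffer: ByteBuffer): Array[Byte] = {
    val out = new Array[Byte](buffer.remaining())
    buffer.duplicate().get(out)
    out
  }

  // Deserialize only when the message carries a payload; otherwise pass the null through.
  def decode(msg: StubMessage, fromBytes: Array[Byte] => String): String =
    if (!msg.isNull()) fromBytes(readBytes(msg.payload)) else null
}
```

With this shape the caller never inspects buffer or payload directly; it asks the message whether it is null and only then reads the payload.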
> Restoring a changelog with a null value triggers an NPE
> -------------------------------------------------------
>
> Key: SAMZA-173
> URL: https://issues.apache.org/jira/browse/SAMZA-173
> Project: Samza
> Issue Type: Bug
> Components: kv
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Attachments: SAMZA-173.0.patch
>
>
> If a changelog has a null value in it (a delete), an NPE is triggered when
> the SamzaContainer restores it:
> {noformat}
> java.lang.NullPointerException
> at kafka.utils.Utils$.readBytes(Utils.scala:122)
> at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.addMessage(KafkaSystemConsumer.scala:173)
> at org.apache.samza.system.kafka.BrokerProxy$$anonfun$moveMessagesToTheirQueue$1.apply(BrokerProxy.scala:223)
> at org.apache.samza.system.kafka.BrokerProxy$$anonfun$moveMessagesToTheirQueue$1.apply(BrokerProxy.scala:222)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
> at org.apache.samza.system.kafka.BrokerProxy.moveMessagesToTheirQueue(BrokerProxy.scala:222)
> at org.apache.samza.system.kafka.BrokerProxy$$anonfun$org$apache$samza$system$kafka$BrokerProxy$$fetchMessages$1.apply(BrokerProxy.scala:156)
> at org.apache.samza.system.kafka.BrokerProxy$$anonfun$org$apache$samza$system$kafka$BrokerProxy$$fetchMessages$1.apply(BrokerProxy.scala:156)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:549)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> at scala.collection.JavaConversions$JSetWrapper.foreach(JavaConversions.scala:642)
> at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:156)
> at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:120)
> at java.lang.Thread.run(Thread.java:619)
> {noformat}
> I believe it's due to this code:
> {code}
> val message = if (msg.message.buffer != null) {
>   deserializer.fromBytes(Utils.readBytes(msg.message.payload))
> } else {
>   null
> }
> {code}
> We are checking if buffer != null, but then passing in payload. I believe we
> need to check if payload is null instead.
--
This message was sent by Atlassian JIRA
(v6.2#6252)