[ 
https://issues.apache.org/jira/browse/SAMZA-173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini updated SAMZA-173:
----------------------------------

    Attachment: SAMZA-173.0.patch

Attaching patch.

Changes:

1. Update TestStatefulTask to trigger the NPE.
2. Fix KafkaSystemConsumer to use payload instead of buffer when doing null 
checks.
3. Fix KeyValueStorageEngine to only count value bytes when value is non-null.

Test passes.

> Restoring a changelog with a null value triggers an NPE
> -------------------------------------------------------
>
>                 Key: SAMZA-173
>                 URL: https://issues.apache.org/jira/browse/SAMZA-173
>             Project: Samza
>          Issue Type: Bug
>          Components: kv
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>         Attachments: SAMZA-173.0.patch
>
>
> If a changelog has a null value in it (a delete), an NPE is triggered when 
> the SamzaContainer restores it:
> {noformat}
> java.lang.NullPointerException
>       at kafka.utils.Utils$.readBytes(Utils.scala:122)
>       at 
> org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.addMessage(KafkaSystemConsumer.scala:173)
>       at 
> org.apache.samza.system.kafka.BrokerProxy$$anonfun$moveMessagesToTheirQueue$1.apply(BrokerProxy.scala:223)
>       at 
> org.apache.samza.system.kafka.BrokerProxy$$anonfun$moveMessagesToTheirQueue$1.apply(BrokerProxy.scala:222)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>       at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>       at 
> org.apache.samza.system.kafka.BrokerProxy.moveMessagesToTheirQueue(BrokerProxy.scala:222)
>       at 
> org.apache.samza.system.kafka.BrokerProxy$$anonfun$org$apache$samza$system$kafka$BrokerProxy$$fetchMessages$1.apply(BrokerProxy.scala:156)
>       at 
> org.apache.samza.system.kafka.BrokerProxy$$anonfun$org$apache$samza$system$kafka$BrokerProxy$$fetchMessages$1.apply(BrokerProxy.scala:156)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>       at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:549)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>       at 
> scala.collection.JavaConversions$JSetWrapper.foreach(JavaConversions.scala:642)
>       at 
> org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:156)
>       at 
> org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:120)
>       at java.lang.Thread.run(Thread.java:619)
> {noformat}
> I believe it's due to this code:
> {code}
>       val message = if (msg.message.buffer != null) {
>         deserializer.fromBytes(Utils.readBytes(msg.message.payload))
>       } else {
>         null
>       }
> {code}
> We are checking if buffer != null, but then passing in payload. I believe we 
> need to check if payload is null instead.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to