Chris Riccomini created SAMZA-173:
-------------------------------------

             Summary: Restoring a changelog with a null value triggers an NPE
                 Key: SAMZA-173
                 URL: https://issues.apache.org/jira/browse/SAMZA-173
             Project: Samza
          Issue Type: Bug
          Components: kv
    Affects Versions: 0.6.0
            Reporter: Chris Riccomini


If a changelog has a null value in it (a delete), an NPE is triggered when the 
SamzaContainer restores it:

{noformat}
java.lang.NullPointerException
        at kafka.utils.Utils$.readBytes(Utils.scala:122)
        at 
org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.addMessage(KafkaSystemConsumer.scala:173)
        at 
org.apache.samza.system.kafka.BrokerProxy$$anonfun$moveMessagesToTheirQueue$1.apply(BrokerProxy.scala:223)
        at 
org.apache.samza.system.kafka.BrokerProxy$$anonfun$moveMessagesToTheirQueue$1.apply(BrokerProxy.scala:222)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
        at 
org.apache.samza.system.kafka.BrokerProxy.moveMessagesToTheirQueue(BrokerProxy.scala:222)
        at 
org.apache.samza.system.kafka.BrokerProxy$$anonfun$org$apache$samza$system$kafka$BrokerProxy$$fetchMessages$1.apply(BrokerProxy.scala:156)
        at 
org.apache.samza.system.kafka.BrokerProxy$$anonfun$org$apache$samza$system$kafka$BrokerProxy$$fetchMessages$1.apply(BrokerProxy.scala:156)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at 
scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:549)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
        at 
scala.collection.JavaConversions$JSetWrapper.foreach(JavaConversions.scala:642)
        at 
org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:156)
        at 
org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:120)
        at java.lang.Thread.run(Thread.java:619)
{noformat}

I believe it's due to this code:

{code}
      val message = if (msg.message.buffer != null) {
        deserializer.fromBytes(Utils.readBytes(msg.message.payload))
      } else {
        null
      }
{code}

We are checking if buffer != null, but then passing in payload. I believe we 
need to check if payload is null instead.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to