[ 
https://issues.apache.org/jira/browse/SAMZA-1878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Hu updated SAMZA-1878:
----------------------------
    Environment: 
`samza-kafka_2.10:0.10.1`

`samza-yarn_2.10:0.10.1`

`samza-kv-rocksdb_2.10:0.10.1`

`kafka-clients:0.8.2.2`

kafka cluster version: 1.1.10, the changelog topic turns on log compaction

  was:
`samza-kafka_2.10:0.10.1`

`samza-yarn_2.10:0.10.1`

`samza-kv-rocksdb_2.10:0.10.1`

`kafka-clients:0.8.2.2`

kafka cluster version: 1.1.10

       Priority: Blocker  (was: Critical)
    Description: 
The job failed to start due to NPE when restoring RocksDB from one Kafka 
changelog topic. I mark it as a Blocker as it blocks the confidence to use a 
Kafka changelog as the restoration source for state management

>From the log, `keyBytes` is NULL when reading envelope from the changelog 
>topic 
>[https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala#L127]

Even though the fix can be as easy as adding a null check, the root cause of 
null keyBytes may worths a deeper investigation as RocksDB put doesn't allow 
NULL key and Kafka topic log compaction only set value to NULL.

 

I test using a kafka-consumer to read from the topic directly. However no NULL 
key messages r found.

 

Detail logs:
java.lang.NullPointerException: null
at 
scala.collection.mutable.ArrayOps$ofByte$.length$extension(ArrayOps.scala:126)
at scala.collection.mutable.ArrayOps$ofByte.length(ArrayOps.scala:126)
at scala.collection.SeqLike$class.size(SeqLike.scala:106)
at scala.collection.mutable.ArrayOps$ofByte.size(ArrayOps.scala:120)
at 
org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:105)
at 
org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:90)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValueStorageEngine.scala:90)
at 
org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:187)
at 
org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:181)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at 
org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStorageManager.scala:181)
at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:76)
at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:99)
at 
org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:677)
at 
org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:675)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at 
org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:675)
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:586)
at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:82)
at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:56)
at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

  was:
The job failed to start due to NPE when restoring RocksDB from the changelog 
topic.

>From the log, `keyBytes` is NULL when reading envelope from the changelog 
>topic 
>[https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala#L127]

 

Detail logs:
java.lang.NullPointerException: null
    at 
scala.collection.mutable.ArrayOps$ofByte$.length$extension(ArrayOps.scala:126)
    at scala.collection.mutable.ArrayOps$ofByte.length(ArrayOps.scala:126)
    at scala.collection.SeqLike$class.size(SeqLike.scala:106)
    at scala.collection.mutable.ArrayOps$ofByte.size(ArrayOps.scala:120)
    at 
org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:105)
    at 
org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:90)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at 
org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValueStorageEngine.scala:90)
    at 
org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:187)
    at 
org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:181)
    at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at 
org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStorageManager.scala:181)
    at 
org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:76)
    at 
org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:99)
    at 
org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:677)
    at 
org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:675)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at 
org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:675)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:586)
    at 
org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:82)
    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:56)
    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)


> NPE when RocksDB restores from changelog topic
> ----------------------------------------------
>
>                 Key: SAMZA-1878
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1878
>             Project: Samza
>          Issue Type: Bug
>          Components: kv-store
>    Affects Versions: 0.10.1
>         Environment: `samza-kafka_2.10:0.10.1`
> `samza-yarn_2.10:0.10.1`
> `samza-kv-rocksdb_2.10:0.10.1`
> `kafka-clients:0.8.2.2`
> kafka cluster version: 1.1.10, the changelog topic turns on log compaction
>            Reporter: Frank Hu
>            Priority: Blocker
>
> The job failed to start due to NPE when restoring RocksDB from one Kafka 
> changelog topic. I mark it as a Blocker as it blocks the confidence to use a 
> Kafka changelog as the restoration source for state management
> From the log, `keyBytes` is NULL when reading envelope from the changelog 
> topic 
> [https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala#L127]
> Even though the fix can be as easy as adding a null check, the root cause of 
> null keyBytes may worths a deeper investigation as RocksDB put doesn't allow 
> NULL key and Kafka topic log compaction only set value to NULL.
>  
> I test using a kafka-consumer to read from the topic directly. However no 
> NULL key messages r found.
>  
> Detail logs:
> java.lang.NullPointerException: null
> at 
> scala.collection.mutable.ArrayOps$ofByte$.length$extension(ArrayOps.scala:126)
> at scala.collection.mutable.ArrayOps$ofByte.length(ArrayOps.scala:126)
> at scala.collection.SeqLike$class.size(SeqLike.scala:106)
> at scala.collection.mutable.ArrayOps$ofByte.size(ArrayOps.scala:120)
> at 
> org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:105)
> at 
> org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:90)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at 
> org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValueStorageEngine.scala:90)
> at 
> org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:187)
> at 
> org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:181)
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at 
> org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStorageManager.scala:181)
> at 
> org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:76)
> at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:99)
> at 
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:677)
> at 
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:675)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> at 
> org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:675)
> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:586)
> at 
> org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:82)
> at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:56)
> at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to