Abhishek Agarwal created KAFKA-7213:
---------------------------------------

             Summary: NullPointerException during state restoration in kafka streams
                 Key: KAFKA-7213
                 URL: https://issues.apache.org/jira/browse/KAFKA-7213
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 1.0.0
            Reporter: Abhishek Agarwal
            Assignee: Abhishek Agarwal


I have written a custom state store that registers a batch restoration
callback. When multiple consumer instances are restarted, the application keeps
failing with a NullPointerException. The stack trace is:
{noformat}
java.lang.NullPointerException: null
        at org.apache.kafka.streams.state.internals.RocksDBStore.putAll(RocksDBStore.java:351) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBSlotKeyValueBytesStore.putAll(RocksDBSlotKeyValueBytesStore.java:100) ~[streams-core-1.0.0.297.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBSlotKeyValueBytesStore$SlotKeyValueBatchRestoreCallback.restoreAll(RocksDBSlotKeyValueBytesStore.java:303) ~[streams-core-1.0.0.297.jar:?]
        at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreAll(CompositeRestoreListener.java:89) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:75) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:277) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:238) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) ~[kafka-streams-1.0.0.jar:?]
{noformat}

The faulty line in question is 
{noformat}
db.write(wOptions, batch);
{noformat}

in RocksDBStore.java, which means the db field is null. Most likely the store
has already been closed while restoration is still running against it. After
going through the code, I think the problem occurs when the state transitions
from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED while restoration is still in
progress. In that transition the active tasks themselves are closed, but the
changelog reader is not reset. The reader then keeps trying to restore tasks
that have already been closed, finds db null, and throws the NPE.
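The suspected sequence can be reduced to a toy model. This is a minimal sketch in plain Java with no Kafka dependency: ToyStore, ToyChangelogReader, reset() and RevocationScenario are hypothetical names, not Kafka Streams classes, and clearing the reader on revocation is only an assumption drawn from the diagnosis, not the actual patch for this issue.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a RocksDB-backed store: close() nulls the handle,
// matching the report that RocksDBStore's db field is null after close.
class ToyStore {
    private Map<String, String> db = new LinkedHashMap<>();

    void close() {
        db = null;
    }

    // Mirrors db.write(wOptions, batch): throws NullPointerException once closed.
    void putAll(List<String[]> records) {
        for (String[] kv : records) {
            db.put(kv[0], kv[1]);
        }
    }
}

// Hypothetical stand-in for StoreChangelogReader: tracks stores still restoring.
class ToyChangelogReader {
    private final List<ToyStore> restoring = new ArrayList<>();

    void register(ToyStore store) {
        restoring.add(store);
    }

    // One restore pass: hands a batch of changelog records to every registered store.
    void restore(List<String[]> batch) {
        for (ToyStore store : restoring) {
            store.putAll(batch);
        }
    }

    // Assumed fix: on PARTITIONS_REVOKED, forget stores whose tasks were closed.
    void reset() {
        restoring.clear();
    }
}

class RevocationScenario {
    // Simulates a rebalance that revokes partitions mid-restoration.
    // Returns true if the restore pass after revocation threw an NPE.
    static boolean revokeDuringRestore(boolean resetReaderOnRevoke) {
        ToyChangelogReader reader = new ToyChangelogReader();
        ToyStore store = new ToyStore();
        reader.register(store);

        List<String[]> batch = new ArrayList<>();
        batch.add(new String[] {"k1", "v1"});
        reader.restore(batch); // first batch restores normally

        // PARTITIONS_ASSIGNED -> PARTITIONS_REVOKED: the active task closes its store.
        store.close();
        if (resetReaderOnRevoke) {
            reader.reset();
        }

        try {
            reader.restore(batch); // next loop iteration keeps restoring
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }
}
```

With resetReaderOnRevoke set to false, the second restore pass reaches the closed store and throws a NullPointerException, analogous to the trace; with it set to true, the reader no longer holds the closed store and the pass does nothing.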

I will put in a fix and check whether it resolves the issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
