[ https://issues.apache.org/jira/browse/KAFKA-5961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16177246#comment-16177246 ]
Matthias J. Sax commented on KAFKA-5961: ---------------------------------------- [~agomez] Thanks for reporting this. Looking into this, the stack trace you share if for {{0.10.2.1}}. Can you also share the stack trace for {{0.11.0.0}}? It seems to be fixed there already -- but maybe we look at the wrong code (as we don't have the corresponding stack trace) (cf. https://github.com/apache/kafka/blob/0.11.0.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L215) -- thanks to [~bbejeck] for pointing this out! > NullPointerException when consumer restore read messages with null key. > ----------------------------------------------------------------------- > > Key: KAFKA-5961 > URL: https://issues.apache.org/jira/browse/KAFKA-5961 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.2.1, 0.11.0.0 > Reporter: Andres Gomez Ferrer > > If you have a kafka streams that use: > {code:java} > stream.table("topicA") > {code} > When the application is running if you send a message with a null key, it > works fine. Later, if you restart the application when the restore consumer > starts to read the topicA from the beginning, it crashes because doesn't > filter the null key. > I know that isn't normal send a null key to a topic that is a table topic, > but maybe sometimes can happen .. and I think that kafka streams could > protect it self. > This is the stack trace: > {code} > ConsumerCoordinator [ERROR] User provided listener > org.apache.kafka.streams.processor.internals.StreamThread$1 for group > my-cep-app_enricher failed on partition assignment > java.lang.NullPointerException > at org.rocksdb.RocksDB.put(RocksDB.java:488) > at > org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254) > at > org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67) > at > org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164) > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242) > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201) > at > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99) > at > org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63) > at > org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86) > at > org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141) > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)