[
https://issues.apache.org/jira/browse/KAFKA-5961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andres Gomez Ferrer resolved KAFKA-5961.
----------------------------------------
Resolution: Fixed
> NullPointerException when consumer restore read messages with null key.
> -----------------------------------------------------------------------
>
> Key: KAFKA-5961
> URL: https://issues.apache.org/jira/browse/KAFKA-5961
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.1
> Reporter: Andres Gomez Ferrer
> Fix For: 0.11.0.1, 0.11.0.0
>
>
> If you have a Kafka Streams application that uses:
> {code:java}
> stream.table("topicA")
> {code}
> If you send a message with a null key while the application is running, it
> works fine. Later, if you restart the application, the restore consumer
> reads topicA from the beginning and crashes, because the restore path does
> not filter out records with a null key.
> I know it isn't normal to send a null key to a topic that backs a table,
> but it can occasionally happen, and I think Kafka Streams could
> protect itself against it.
> This is the stack trace:
> {code}
> ConsumerCoordinator [ERROR] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group my-cep-app_enricher failed on partition assignment
> java.lang.NullPointerException
> at org.rocksdb.RocksDB.put(RocksDB.java:488)
> at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)
> at org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)
> at org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
> at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
> at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
> at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
> at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
> at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
> at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}
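
For illustration only, here is a minimal sketch of the kind of guard the report above asks for: skip records with a null key while replaying a changelog into a store that rejects null keys. This is not the actual Kafka Streams fix. The class NullKeySafeRestore and its methods are hypothetical, keys and values are plain strings, and a ConcurrentHashMap stands in for the RocksDB-backed store because it also throws NullPointerException on a null key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not Kafka Streams code: replays changelog records
// into a store that rejects null keys, skipping null-keyed records.
class NullKeySafeRestore {

    // Stand-in for the state store. Like RocksDB.put in the trace above,
    // ConcurrentHashMap.put throws NullPointerException on a null key.
    private final Map<String, String> store = new ConcurrentHashMap<>();
    private int skipped = 0;

    // Restore one record. A record with a null key is counted and dropped
    // instead of being passed to the store.
    public void restore(String key, String value) {
        if (key == null) {
            skipped++;
            return;
        }
        if (value == null) {
            // Assumption: a null value is a tombstone and deletes the key.
            store.remove(key);
        } else {
            store.put(key, value);
        }
    }

    public String get(String key) {
        return store.get(key);
    }

    public int size() {
        return store.size();
    }

    public int skippedCount() {
        return skipped;
    }

    public static void main(String[] args) {
        NullKeySafeRestore restorer = new NullKeySafeRestore();
        restorer.restore("a", "1");
        restorer.restore(null, "orphan"); // would crash the store without the guard
        restorer.restore("b", "2");
        restorer.restore("a", null);      // tombstone for "a"
        System.out.println("entries=" + restorer.size()
                + " skipped=" + restorer.skippedCount());
    }
}
```

Counting the skipped records rather than dropping them silently is a design choice of this sketch; it leaves a trace that the topic contained records a table cannot represent.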