Andres Gomez Ferrer created KAFKA-5961:
------------------------------------------
Summary: NullPointerException when restoring messages with a null key.
Key: KAFKA-5961
URL: https://issues.apache.org/jira/browse/KAFKA-5961
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.11.0.0, 0.10.2.1
Reporter: Andres Gomez Ferrer
If you have a Kafka Streams application that uses:
{code:java}
builder.table("topicA")
{code}
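For reference, a minimal, self-contained topology of that shape could look like the sketch below. It uses the pre-1.0 KStreamBuilder API matching the affected versions; the application id is taken from the log further down, while the store name and broker address are just placeholders:
{code:java}
// Minimal sketch of a topology that materializes "topicA" as a table.
// Names, serdes and the broker address are illustrative.
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class TableRestoreExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-cep-app_enricher");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KStreamBuilder builder = new KStreamBuilder();
        // Materializes "topicA" into a local RocksDB store backed by the topic itself;
        // it is this store that the restore consumer rebuilds from the topic on restart.
        KTable<String, String> table =
                builder.table(Serdes.String(), Serdes.String(), "topicA", "topicA-store");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}
{code}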
While the application is running, sending a message with a null key works fine. However, if you later restart the application, the restore consumer starts reading topicA from the beginning and crashes, because the restore path does not filter out the null key.
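For example, such a null-key record can be produced to the table's source topic with a plain producer along these lines (topic name from the snippet above, broker address is a placeholder):
{code:java}
// Sends a record with a null key to the topic that backs the table.
// The running application tolerates it, but the restore consumer later trips over it.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class NullKeyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Null key: accepted by the broker, but replayed by the restore
            // consumer when the store is rebuilt on restart.
            producer.send(new ProducerRecord<>("topicA", null, "some-value"));
            producer.flush();
        }
    }
}
{code}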
I know it isn't normal to send a null key to a topic that backs a table, but it can happen occasionally, and I think Kafka Streams should protect itself against it.
This is the stack trace:
{code}
ConsumerCoordinator [ERROR] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group my-cep-app_enricher failed on partition assignment
java.lang.NullPointerException
	at org.rocksdb.RocksDB.put(RocksDB.java:488)
	at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)
	at org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)
	at org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
	at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
	at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
	at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
{code}
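Just to illustrate where such a guard could live (a sketch of the idea only, not a patch): the restore callback that the store registers could skip null-key records before they ever reach RocksDB.put(...), mirroring the observation above that the running application already tolerates such records.
{code:java}
// Illustration only: a wrapper around a store's StateRestoreCallback that drops
// null-key records during restoration so RocksDB.put(...) is never called with a
// null key. This is a sketch of the idea, not the actual fix in Kafka Streams.
import org.apache.kafka.streams.processor.StateRestoreCallback;

public class NullKeySkippingRestoreCallback implements StateRestoreCallback {

    private final StateRestoreCallback delegate;

    public NullKeySkippingRestoreCallback(final StateRestoreCallback delegate) {
        this.delegate = delegate;
    }

    @Override
    public void restore(final byte[] key, final byte[] value) {
        if (key == null) {
            // A null key cannot be a valid table entry; skip it instead of letting
            // the store throw a NullPointerException while rebuilding its state.
            return;
        }
        delegate.restore(key, value);
    }
}
{code}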