[ https://issues.apache.org/jira/browse/KAFKA-4300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Damian Guy updated KAFKA-4300:
------------------------------
    Description:
evict can be called on a NamedCache even though it is empty. This is due to the shared nature of the outer ThreadCache. Currently if evict is called on an empty NamedCache it will throw a NullPointerException.

From the original email:

I'm joining a bunch of Kafka Topics using Kafka Streams, with the Kafka 0.10.1 release candidate.

It runs ok for a few thousand of messages, and then it dies with the following exception:

Exception in thread "StreamThread-1" java.lang.NullPointerException
        at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:194)
        at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
        at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
        at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableAggregateValueGetter.get(KTableReduce.java:121)
        at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
        at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
        at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
        at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
        at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83)
        at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
        at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:196)
        at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
        at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:187)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:182)
        at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:92)
        at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:52)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:177)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:427)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:235)

  was:
evict can be called on a NamedCache even though it is empty. This is due to the shared nature of the outer ThreadCache. Currently if evict is called on an empty NamedCache it will throw a NullPointerException.

> NamedCache throws an NPE when evict is called and the cache is empty
> --------------------------------------------------------------------
>
>                 Key: KAFKA-4300
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4300
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: Damian Guy
>            Assignee: Damian Guy
>             Fix For: 0.10.1.0
>
>
> evict can be called on a NamedCache even though it is empty.
> This is due to the shared nature of the outer ThreadCache. Currently if evict is called on
> an empty NamedCache it will throw a NullPointerException.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
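
For readers following the description, here is a minimal sketch of the failure mode and of one way to guard against it. It is an illustration under stated assumptions, not the actual Kafka code or the committed fix: NamedCacheSketch and EvictSketch are hypothetical names, and eviction here is plain insertion order rather than whatever ordering the real NamedCache uses. The point is only that a cache taking part in a shared size budget can be asked to evict while it holds nothing, so evict has to check for the empty case before dereferencing the eldest entry.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for a per-store cache; NOT the Kafka NamedCache class. */
final class NamedCacheSketch {
    // Oldest entry sits at the head of the deque (insertion order, for simplicity).
    private final Deque<byte[]> entries = new ArrayDeque<>();
    private long sizeBytes;

    void put(byte[] value) {
        entries.addLast(value);
        sizeBytes += value.length;
    }

    long sizeBytes() {
        return sizeBytes;
    }

    /** Evicts the oldest entry and returns the bytes freed, or 0 if the cache is empty. */
    long evict() {
        byte[] eldest = entries.pollFirst(); // pollFirst() returns null on an empty deque
        if (eldest == null) {
            // Without this guard, eldest.length below would throw a NullPointerException.
            return 0L;
        }
        sizeBytes -= eldest.length;
        return eldest.length;
    }
}

/** Hypothetical driver showing eviction requested on an empty cache. */
final class EvictSketch {
    public static void main(String[] args) {
        NamedCacheSketch full = new NamedCacheSketch();
        NamedCacheSketch empty = new NamedCacheSketch();
        full.put(new byte[64]);

        // The shared budget is exceeded because of `full`,
        // but eviction happens to be requested on `empty`.
        System.out.println(empty.evict()); // prints 0
        System.out.println(full.evict());  // prints 64
    }
}
```

Returning 0 from an empty cache lets the caller treat the request as a no-op; whether the real fix does this or avoids calling evict on an empty cache in the first place is not stated in this message.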