[ https://issues.apache.org/jira/browse/KAFKA-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585912#comment-15585912 ]
Frank Lyaruu commented on KAFKA-4311:
-------------------------------------

I see another stacktrace:

java.lang.ClassCastException: java.util.Collections$UnmodifiableList cannot be cast to com.dexels.replication.api.ReplicationMessage
	at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:80)
	at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
	at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
	at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
	at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
	at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:199)
	at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
	at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
	at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableAggregateValueGetter.get(KTableReduce.java:121)
	at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
	at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
	at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
	at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
	at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83)
	at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
	at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
	at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:199)
	at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
	at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:187)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:182)
	at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:92)
	at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:52)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

I see one or the other, but not both as far as I can recall.
It seems to lose track of the right serde, I use both types during processing (ReplicationMessage and List<ReplicationMessage>) but somehow it ends up using the wrong one.
I found that I can 'solve' this issue by setting StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0.
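
For anyone wanting to try the same workaround, a minimal sketch of the configuration might look like the following. It uses the literal key "cache.max.bytes.buffering" (the value of StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) so that it compiles without the kafka-streams dependency. The class name, application id, and broker address are hypothetical placeholders, not taken from the reporter's setup.

```java
import java.util.Properties;

// Sketch only: builds the properties one would hand to Kafka Streams with
// record caching disabled. Names below marked "hypothetical" are assumptions.
class CacheWorkaround {
    // String value of StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
    // written as a literal to keep this example free of external dependencies.
    static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";

    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put("application.id", "replication-app");    // hypothetical
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical
        // A cache size of 0 bytes turns the record cache off, so the
        // NamedCache flush/evict path seen in the stack trace is not exercised.
        props.put(CACHE_MAX_BYTES_BUFFERING, "0");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProperties().getProperty(CACHE_MAX_BYTES_BUFFERING));
    }
}
```

The resulting Properties would then be passed to the KafkaStreams constructor together with the topology. With caching off, every update is forwarded downstream instead of being deduplicated in the cache, so this should be treated as a workaround that trades throughput for avoiding the exception, not as a fix.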

> Exception in NamedCache.flush - Key found in dirty key set, but entry is null
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-4311
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4311
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: Damian Guy
>            Assignee: Damian Guy
>             Fix For: 0.10.1.1
>
>
> Reported on the mailing list. Needs looking into how it could get in this state.
> [StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Failed to close state manager for StreamTask 0_0:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to close state store addr-organization
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
> Caused by: java.lang.IllegalStateException: Key found in dirty key set, but entry is null
> 	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
> 	at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
> 	... 7 more

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)