Peter Pringle created KAFKA-10246:
-------------------------------------

             Summary: AbstractProcessorContext topic() throws 
NullPointerException when modifying a state store within the DSL from a 
punctuator
                 Key: KAFKA-10246
                 URL: https://issues.apache.org/jira/browse/KAFKA-10246
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.5.0
         Environment: linux, windows, java 11
            Reporter: Peter Pringle


It seems valid for AbstractProcessorContext.topic() to return null; however, the 
check below throws a NullPointerException before a null can be returned.
{quote}if (topic.equals(NONEXIST_TOPIC)) {
{quote}
I made a local fix that reverses the order of the comparison (i.e. avoids 
dereferencing the null). This appears to fix the issue, and the change is then 
sent to the state store's changelog topic.
{quote}if (NONEXIST_TOPIC.equals(topic)) {
{quote}
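
To illustrate why the ordering matters, here is a minimal standalone sketch of 
the two variants. It is not the Kafka source: the class name, the method names 
topicUnsafe/topicSafe and the sentinel value are assumptions made up for this 
example. Only the two comparisons are taken from the snippets above.

```java
// Simplified stand-in for the check described above; NOT the Kafka implementation.
// Class name, method names, and the sentinel value are illustrative assumptions.
final class NullSafeTopicCheck {
    // Placeholder sentinel; the real constant's value is not shown in this report.
    static final String NONEXIST_TOPIC = "__placeholder_nonexistent_topic__";

    // Original ordering: dereferences topic, so a null topic throws NullPointerException.
    static String topicUnsafe(final String topic) {
        if (topic.equals(NONEXIST_TOPIC)) {
            return null;
        }
        return topic;
    }

    // Reversed ordering: the constant is never null, and String.equals(null) returns false,
    // so a null topic falls through and is returned as null.
    static String topicSafe(final String topic) {
        if (NONEXIST_TOPIC.equals(topic)) {
            return null;
        }
        return topic;
    }

    public static void main(final String[] args) {
        System.out.println(topicSafe(null));           // null input is passed through
        System.out.println(topicSafe(NONEXIST_TOPIC)); // sentinel is mapped to null
        System.out.println(topicSafe("orders"));       // an ordinary name is returned unchanged
        try {
            topicUnsafe(null);
        } catch (final NullPointerException e) {
            System.out.println("topicUnsafe(null) threw NullPointerException");
        }
    }
}
```

Both variants behave identically for non-null input; they differ only when the 
topic is null, which is the case hit from the punctuator.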
The stack trace below is seen when deleting from a previously declared KTable 
materialized state store. The delete is called from a punctuator that is 
registered within the init method of a processor added to the topology using 
either process/valueTransform.

{{2020-07-02 07:29:46,829 
[ABC_aggregator-551a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] ERROR 
[o.a.k.s.p.i.StreamThread]: stream-thread 
[ABC_aggregator-551a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] 
Encountered the following error during processing:}}
{{java.lang.NullPointerException: null}}
{{ at 
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)}}
{{ at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:141)}}
{{ at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)}}
{{ at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)}}
{{ at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)}}
{{ at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)}}
{{ at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)}}
{{ at 
org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:118)}}
{{ at 
org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:97)}}
{{ at 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)}}
{{ at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)}}
{{ at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}}
{{ at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}}
{{ at 
org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:118)}}
{{ at 
org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:65)}}
{{ at 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)}}
{{ at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)}}
{{ at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}}
{{ at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}}
{{ at 
org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)}}
{{ at 
org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)}}
{{ at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119)}}
{{ at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)}}
{{ at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)}}
{{ at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)}}
{{ at 
org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)}}
{{ at 
org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)}}
{{ at 
org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)}}
{{ at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:131)}}
{{ at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)}}
{{ at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)}}
{{ at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)}}
{{ at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)}}
{{ at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)}}
{{ at 
com.pjp1981.streambuilder.StreamsBuilderHelper$1.lambda$init$0(StreamsBuilderHelper.java:55)
 // punctuated lambda - user code}}
{{ at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133) 
//iterates over the state store and cleans up old items}}
{{ at 
com.pjp1981.streambuilder.StreamsBuilderHelper$1.lambda$init$1(StreamsBuilderHelper.java:47)}}
{{ at 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$punctuate$3(ProcessorNode.java:161)}}
{{ at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at 
org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:161)}}
{{ at 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$punctuate$4(StreamTask.java:445)}}
{{ at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at 
org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:445)}}
{{ at 
org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54)}}
{{ at 
org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:868)}}
{{ at 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.punctuate(AssignedStreamsTasks.java:502)}}
{{ at 
org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:557)}}
{{ at 
org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:951)}}
{{ at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)}}
{{ at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)}}
{{ at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
