[https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779715#comment-17779715]
A. Sophie Blee-Goldman commented on KAFKA-15463:
------------------------------------------------
[~yevsh] Well, the intended pattern is to get the StateStore from the context
during init and then save a reference to the store in your
processor/transformer. That's why we pass the context in to #init but not to
#process. (Also, while it's not a significant overhead, it's definitely more
efficient to do the state store lookup only once, during init, rather than on
every single record that's processed.)
That said, I think it's a fair question as to why this is causing an error, and
why the error doesn't happen every time. The fact that it generally only
appears when you have more than one partition/task per application instance
makes me think there is some state that is somehow being shared between the
different tasks. This would definitely be explained by returning the same
transformer instance each time, but based on your latest update, you are
definitely returning a new transformer each time and still seeing the issue,
right?
Assuming so, I'm inclined to believe the issue is with Spring. I vaguely recall
a similar problem being reported in the past, which was ultimately because a
Spring object was unexpectedly/unknowingly acting as a static/singleton class.
This was resulting in the context – of which there is supposed to be exactly
one per task – being shared between different instances of the task/processor
nodes.
I'm pretty sure that's what is going on here as well. I'm guessing that the
MyService class is a Spring bean? If so, then it's effectively a singleton and
will be shared by each of the individual Transformer instances in your
application, meaning the different tasks will be overwriting each other's
context when invoking #init on this transformer. So the context you retrieve
from the myServiceBean during #process may not be the same as the context you
saved to it in #init, causing it to throw this error since only the context
corresponding to the task that is currently being processed will have the
currentNode set to a non-null value.
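To make this failure mode concrete, here is a minimal, Kafka-free simulation under stated assumptions: SharedService is a hypothetical stand-in for the singleton Spring bean, TaskContext is a hypothetical stand-in for the per-task ProcessorContext, and the currentNode check only mimics the behavior described above rather than reproducing Kafka's internals. Two new transformer instances share one service, so the second #init overwrites the context saved by the first, and the first task then reads a context whose current node is null.

```java
// Hypothetical simulation (no Kafka dependency): SharedService stands in for a
// singleton Spring bean, TaskContext for the per-task ProcessorContext.
public class SharedBeanDemo {

    // Stand-in for a per-task context; only the task being processed has a current node.
    static final class TaskContext {
        final String taskId;
        String currentNode; // null unless this task is currently processing a record

        TaskContext(String taskId) { this.taskId = taskId; }
    }

    // Stand-in for a singleton bean that (wrongly) keeps the context in a field.
    static final class SharedService {
        private TaskContext context;

        void init(TaskContext ctx) { this.context = ctx; } // last caller wins
        TaskContext context() { return context; }
    }

    // Each transformer is a new instance, but all of them share one service.
    static final class MyTransformer {
        private final SharedService service;

        MyTransformer(SharedService service) { this.service = service; }

        void init(TaskContext ctx) { service.init(ctx); }

        // Returns the id of the context actually seen while processing.
        String process(TaskContext activeCtx) {
            activeCtx.currentNode = "transform-node"; // mark the task being processed
            TaskContext seen = service.context();
            try {
                if (seen.currentNode == null) {
                    // Analogous to the StreamsException in the stack trace
                    throw new IllegalStateException("Accessing from an unknown node");
                }
                return seen.taskId;
            } finally {
                activeCtx.currentNode = null;
            }
        }
    }

    static String run() {
        SharedService singleton = new SharedService();
        TaskContext ctx0 = new TaskContext("0_0");
        TaskContext ctx1 = new TaskContext("0_1");
        MyTransformer t0 = new MyTransformer(singleton);
        MyTransformer t1 = new MyTransformer(singleton);
        t0.init(ctx0);
        t1.init(ctx1); // overwrites the context saved for task 0_0
        try {
            return t0.process(ctx0);
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "Accessing from an unknown node"
    }
}
```

Whether the error shows up depends on which task called #init last, which is consistent with it appearing only when one instance hosts more than one task.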
Even if you made the change I originally suggested but saved the StateStore
reference by passing it to the MyService bean, it wouldn't work – it might not
throw this error but you would potentially be reading and writing to the wrong
copy of a state store for a given task, which is even worse. The only way to
solve this is by removing the Spring bean entirely or at least refactoring it
so that it doesn't hold any internal state and has to have the full application
state for that task passed in to it every time – in other words you just need
to make sure to keep all the objects used by a given transformer completely
local to that instance. Here is my recommendation for how to implement your
transformer class – hope this helps!
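{code:java}
private final MyService myServiceBean;
private StateStore myStore;
@Override
public void init(ProcessorContext context) {
    myStore = context.getStateStore(STORE_NAME); // look up this task's store once
}
@Override
public KeyValue<String, MyItem> transform(String key, MyItem myItem) {
    myServiceBean.process(myItem, myStore); // stateless bean, store passed in each call
    return KeyValue.pair(key, myItem); // assumption: forward the record unchanged
}
@Override
public void close() {}
{code}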
Basically modify the MyService bean to accept the StateStore to operate on as a
parameter to its #process method. And definitely keep the fix in which you
return a new Transformer instance each time instead of reusing the same one.
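The recommended shape can also be sketched without Kafka on the classpath. In this hypothetical sketch a Map stands in for each task's StateStore and a java.util.function.Supplier stands in for the TransformerSupplier; the service keeps no fields, each transformer keeps only the store reference it received in init, and every get() call returns a new transformer.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch: a Map stands in for each task's StateStore and a
// java.util.function.Supplier stands in for the TransformerSupplier.
public class StatelessServiceDemo {

    // Stateless "bean": no per-task fields, so one shared instance is safe.
    static final class MyService {
        void process(String item, Map<String, Integer> store) {
            store.merge(item, 1, Integer::sum); // touches only the store it was given
        }
    }

    static final class MyTransformer {
        private final MyService myServiceBean;
        private Map<String, Integer> myStore; // set once in init, local to this instance

        MyTransformer(MyService myServiceBean) { this.myServiceBean = myServiceBean; }

        void init(Map<String, Integer> taskStore) { myStore = taskStore; }

        void transform(String item) { myServiceBean.process(item, myStore); }
    }

    static final MyService SINGLETON = new MyService();
    // Every get() call builds a new transformer instead of reusing one.
    static final Supplier<MyTransformer> SUPPLIER = () -> new MyTransformer(SINGLETON);

    static List<Map<String, Integer>> run() {
        MyTransformer t0 = SUPPLIER.get();
        MyTransformer t1 = SUPPLIER.get();
        Map<String, Integer> store0 = new HashMap<>();
        Map<String, Integer> store1 = new HashMap<>();
        t0.init(store0); // first task's store
        t1.init(store1); // second task's store
        t0.transform("a");
        t0.transform("a");
        t1.transform("b");
        return List.of(store0, store1);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [{a=2}, {b=1}]
    }
}
```

Because MyService holds no per-task state, sharing one instance across tasks is harmless here: each task's writes land only in the store its own transformer received during init.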
Let me know if you have any questions! I'm going to close the ticket since I'm
fairly confident in this solution having seen the same problem before, but
definitely please do reopen it if you implement the suggested fix and still
encounter an error. Good luck!
> StreamsException: Accessing from an unknown node
> -------------------------------------------------
>
> Key: KAFKA-15463
> URL: https://issues.apache.org/jira/browse/KAFKA-15463
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.2.1
> Reporter: Yevgeny
> Priority: Major
>
> After the application had been working fine for some time, it started to get:
>
> This is a Spring Boot application running in Kubernetes as a stateful pod.
>
>
>
> {code:java}
> Exception in thread "xxxxxxxxxxxx-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown node
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162)
>     at myclass1.java:28)
>     at myclass2.java:48)
>     at java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
>     at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602)
>     at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
>     at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>     at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>     at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637)
>     at myclass3.java:48)
>     at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
>     at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
>     at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>     at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:780)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
>     at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:780)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:711)
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)
> {code}
>
> stream-thread [xxxxxxxxxxxx-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
>
>
> The Transformer is a prototype-scoped bean, and the supplier supplies a new
> instance of the Transformer:
>
>
> {code:java}
> @Override
> public Transformer<String, MyItem, KeyValue<String, MyItem>> get() {
>     return ctx.getBean(MyTransformer.class);
> }
> {code}
>
>
> The only way to recover is to delete all topics used by Kafka Streams; even if
> the application is restarted, the same exception is thrown.
> *If messages in the internal 'store-changelog' topics are deleted or their
> offsets manipulated, can that cause the issue?*
--
This message was sent by Atlassian Jira
(v8.20.10#820010)