[
https://issues.apache.org/jira/browse/KAFKA-4344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15949949#comment-15949949
]
saiprasad mishra commented on KAFKA-4344:
-----------------------------------------
Hi All
Lot of people reached out to me on how to use existing spring beans.
If anybody is looking for example on how to initialize kafka streams in spring
boot app and invoke your existing spring beans from kafka streams processor
class pls. find the code sample in below gist (Sorry for lack of a sample
project for now coz of lack of time, but it will come soon)
https://gist.github.com/saiprasadmishra/8362134f87ae84e8183eca3b1afcf23f
> Exception when accessing partition, offset and timestamp in processor class
> ---------------------------------------------------------------------------
>
> Key: KAFKA-4344
> URL: https://issues.apache.org/jira/browse/KAFKA-4344
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.1.0
> Reporter: saiprasad mishra
> Assignee: Guozhang Wang
> Priority: Minor
>
> I have a kafka stream pipeline like below
> source topic stream -> filter for null value ->map to make it keyed by id
> ->custom processor to mystore ->to another topic -> ktable
> I am hitting the below type of exception in a custom processor class if I try
> to access offset() or partition() or timestamp() from the ProcessorContext in
> the process() method. I was hoping it would return the partition and offset
> for the enclosing topic(in this case source topic) where its consuming from
> or -1 based on the api docs.
> java.lang.IllegalStateException: This should not happen as offset() should
> only be called while a record is processed
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
> at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> [kafka-streams-0.10.1.0.jar!/:?]
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)