Hi Adam,

Thanks a lot for the rapid response, it did help! Let me ask one more simple question, though: can I make a streams application get stuck on an invalid message and not consume any further messages?
Thanks again

On Wed, Sep 12, 2018 at 2:35 PM Adam Bellemare <[email protected]> wrote:

> Hi Yui Yoi
>
> Preface: I am not familiar with the Spring framework.
>
> "Earliest", when consuming from Kafka, means "start reading from the
> first message in the topic *if there is no offset stored for that
> consumer group*". It sounds like you are expecting it to re-read each
> message whenever a new message comes in. That will not happen: once an
> offset has been committed for the group, "earliest" is no longer used.
> With "latest", a consumer started without a valid offset would instead
> begin consuming from the very latest message in the topic.
>
> Now, if you use the same consumer group each time you run the
> application (which seems to be the case, since "test-group" is
> hard-coded in your application.yml) but do not tear down your local
> cluster and clear out its state, you will indeed see the behaviour you
> describe. Remember that Kafka is durable and retains committed offsets
> after the individual applications go away. So you are probably seeing
> this:
>
> 1) Start application instance 1. It finds no offset when it registers
> as a consumer on the input topic, so it creates a new consumer-group
> entry starting at "earliest".
> 2) Send message "asd".
> 3) Application instance 1 receives "asd", processes it, and commits
> the offset (offset head = 1).
> 4) Terminate instance 1.
> 5) Start application instance 2. It detects that consumer group
> "test-group" exists and uses its committed offset as the starting
> point.
> 6) Send message "{}".
> 7) Application instance 2 receives "{}", processes it, and commits
> the offset (offset head = 2).
> *NOTE:* Instance 2 NEVER received "asd", nor should it, because it
> tells the Kafka cluster that it belongs to the same consumer group as
> instance 1.
>
> Hope this helps,
>
> Adam
>
> On Wed, Sep 12, 2018 at 6:57 AM, Yui Yoi <[email protected]> wrote:
>
> > TL;DR:
> > My Streams application skips uncommitted messages.
> >
> > Hello,
> > I'm using the Streams API via the Spring framework and am seeing odd
> > behaviour that I would like explained.
> > First of all: the attached zip is my test project. I used the Kafka
> > CLI to run a localhost broker and ZooKeeper.
> >
> > What happens is as follows:
> > 1. I send an invalid message, such as "asd". My consumer shows a lag
> > and an error message, as expected.
> > 2. I send a valid message such as "{}", but instead of re-reading
> > the first message, as expected from an "earliest"-configured
> > application, my application reads the latest message and commits it,
> > ignoring the one in error. Thus I have no lag!
> > 3. When I start my application while there are uncommitted messages,
> > it reads the FIRST uncommitted message, as if it WERE an
> > "earliest"-configured application!
> >
> > Your documentation promises "at-least-once" behaviour, but according
> > to point 2 my application does not receive those messages even once
> > (as I said, those messages are uncommitted).
> >
> > My guess is that it has something to do with the stream's cache. I
> > would very much like an explanation, or even a solution.
> >
> > I'm turning to you as a last resort, after long weeks of research
> > and experiments.
> >
> > Thanks a lot
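To make the offset behaviour Adam describes concrete, here is a minimal sketch using the plain Java KafkaConsumer API rather than Spring. The broker address and the topic name "input-topic" are assumptions; "test-group" is the group id from the application.yml mentioned in the thread.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class OffsetResetDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Same group id on every run, as in the application.yml above.
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
            // "earliest" only applies when the group has NO committed offset;
            // once an offset is committed, this setting is ignored.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // Commit explicitly below rather than via the auto-commit timer.
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("input-topic"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
                // Committing advances the group's offset; a later run with the
                // same group id resumes here and never re-reads earlier messages.
                consumer.commitSync();
            }
        }
    }

Running this once after sending "asd" and "{}" commits offset 2 for "test-group"; a second run with the same group id resumes from there and never sees "asd" again, which is exactly steps 5-7 above. auto.offset.reset only decides where to start when the group has no committed offset at all.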

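As for the follow-up question at the top of the thread: Kafka Streams exposes a default.deserialization.exception.handler setting, and its LogAndFailExceptionHandler (the framework's default) makes the application fail on a record it cannot deserialize rather than skip it. A minimal sketch, with the broker address and topic names assumed, and "test-group" reused as the application id (which Kafka Streams also uses as the consumer group id):

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;

    public class FailOnBadMessageDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-group");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                      Serdes.String().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                      Serdes.String().getClass().getName());
            // Fail (and stop processing) on a record that cannot be
            // deserialized, instead of logging and skipping it.
            props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                      LogAndFailExceptionHandler.class.getName());

            // Trivial pass-through topology; topic names are assumptions.
            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic");

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
            streams.start();
        }
    }

Note that this handler only fires when deserialization itself fails. If the topology reads the record as a plain String (as above) and JSON parsing fails later in user code, the handler is never invoked, and stopping at that point is up to the application.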