Hi Yui Yoi

Preface: I am not familiar with the Spring framework.

"Earliest" when it comes to consuming from Kafka means, "Start reading from
the first message in the topic, *if there is no offset stored for that
consumer group*". It sounds like you are expecting it to re-read each
message whenever a new message comes in. This is not going to happen, as
there will be a committed offset and "earliest" will no longer be used. If
you were to use "latest" instead, a consumer that starts without a valid
committed offset begins at the end of the topic and only sees messages
produced after it subscribes.
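To make the rule concrete, here is a toy sketch of the decision (my own illustrative names, not Kafka's actual implementation): the auto.offset.reset policy is consulted ONLY when the group has no committed offset for the partition.

```java
import java.util.OptionalLong;

public class OffsetReset {
    // Decide where a consumer starts reading. "committed" is the group's
    // stored offset (if any); "policy" is auto.offset.reset; "logEndOffset"
    // is the offset one past the last message in the partition.
    static long startingOffset(OptionalLong committed, String policy, long logEndOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();         // resume here; policy is ignored
        }
        switch (policy) {
            case "earliest": return 0L;           // first message in the log
            case "latest":   return logEndOffset; // only messages produced from now on
            default: throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        // No committed offset yet: "earliest" starts at 0, "latest" at the end.
        System.out.println(startingOffset(OptionalLong.empty(), "earliest", 5)); // 0
        System.out.println(startingOffset(OptionalLong.empty(), "latest", 5));   // 5
        // Once offset 2 is committed, the policy no longer matters.
        System.out.println(startingOffset(OptionalLong.of(2), "earliest", 5));   // 2
    }
}
```

The point of the sketch: once your group has committed anything, changing "earliest" to "latest" (or back) has no visible effect.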

Now, if you are using the same consumer group each time you run the
application (which it seems is true, as you have "test-group" hardwired in
your application.yml), but you do not tear down your local cluster and
clear out its state, you will indeed see the behaviour you describe.
Remember that Kafka is durable, and maintains the offsets when the
individual applications go away. So you are probably seeing this:

1) Start application instance 1. It finds no committed offset when it
subscribes to the input topic, so it applies the "earliest" policy and
begins from the first message for your consumer group.
2) send message "asd"
3) application instance 1 receives "asd", processes it, and updates the
offset (offset head = 1)
4) Terminate instance 1
5) Start application instance 2. It detects correctly that consumer group
"test-group" is available and reads that offset as its starting point.
6) send message "{}"
7) application instance 2 receives "{}", processes it, and updates the
offset (offset head = 2)
*NOTE:* App instance 2 NEVER received "asd", nor should it, as it is
telling the Kafka cluster that it belongs to the same consumer group as
application 1.
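If you actually want a fresh run to re-read the whole topic, one option is to start with a consumer group that has no committed offsets. A minimal sketch using plain java.util.Properties (the class and method names here are my own; the config keys "group.id" and "auto.offset.reset" are the real Kafka consumer keys):

```java
import java.util.Properties;
import java.util.UUID;

public class FreshGroup {
    // Build consumer properties that force a re-read from the beginning:
    // a group id with no stored offsets, plus auto.offset.reset=earliest.
    static Properties freshGroupProps(String bootstrapServers) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("group.id", "test-group-" + UUID.randomUUID()); // no committed offset exists
        p.put("auto.offset.reset", "earliest");               // so "earliest" actually applies
        return p;
    }

    public static void main(String[] args) {
        Properties p = freshGroupProps("localhost:9092");
        System.out.println(p.getProperty("group.id"));
    }
}
```

You can also inspect (and, in newer broker versions, reset) the committed offsets of an existing group with the kafka-consumer-groups.sh tool that ships with the Kafka CLI you are already using.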

Hope this helps,

Adam

On Wed, Sep 12, 2018 at 6:57 AM, Yui Yoi <shalosh...@gmail.com> wrote:

> TL;DR:
> my streams application skips uncommitted messages
>
> Hello,
> I'm using streams API via spring framework and experiencing a weird
> behavior which I would like to get an explanation to:
> First of all: The attached zip is my test project, I used kafka cli to run
> a localhost broker and zookeeper
>
> what is happening is as follows:
> 1. I send an invalid message, such as "asd", and my consumer has a lag and
> error message as expected
> 2. I send a valid message such as "{}", but instead of re-reading the first
> message, as expected from an "earliest"-configured application, my
> application reads the latest message, commits it, and ignores the one in
> error, so I have no lag!
> 3. When I run my application while there are uncommitted messages, my
> application reads the FIRST uncommitted message, as if it IS an
> "earliest"-configured application!
>
> In your documentation you assure "at least once" behavior, but according
> to section 2 my application does not receive those messages even once (as
> I said, those messages are uncommitted)
>
> My guess is that it has something to do with the stream's cache... I would
> very like to have an explanation or even a solution
>
> I'm turning to you as a last resort, after long weeks of research and
> experiments
>
> Thanks a lot
>
