Hi Adam and John, thank you for your effort!
We are implementing full idem-potency in our projects so that's nothing to
worry about.
As to what John said - we only have one partition, I personally assured
that.
So as i wrote in section 2. of my first message in this conversation - my
stream should have processed the "asd" message again because it is not
committed yet.
That's why i suspect it has something to do with the stream's cache; maybe
something like:
1. "asd" got processed and restored in cache
2. "{}" got processed and cached too.
3. commit interval makes the stream commit the offset of "{}"


B.t.w
If you want to run my application you should:
1. open it in some java editor as maven project
2. run it as a normal java application
3. setup kafka server & zookeeper on localhost
4. then you can send the above messages via cli

John - even if you send "asd1", "asd2", "asd3" you will see in the logs
that my app takes the latest each time

Of course that's far beyond what i can ask from you guys to do, thanks a
lot for your help.

On Wed, Sep 12, 2018 at 8:14 PM John Roesler <j...@confluent.io> wrote:

> Hi!
>
> As Adam said, if you throw an exception during processing, it should cause
> Streams to shut itself down and *not* commit that message. Therefore, when
> you start up again, it should again attempt to process that same message
> (and shut down again).
>
> Within a single partition, messages are processed in order, so a bad
> message will block the queue, and you should not see subsequent messages
> get processed.
>
> However, if your later message "{}" goes to a different partition than the
> bad message, then there's no relationship between them, and the later,
> good, message might get processed.
>
> Does that help?
> -John
>
> On Wed, Sep 12, 2018 at 8:38 AM Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
>
> > Hi Yui Yoi
> >
> >
> > Keep in mind that Kafka Consumers don't traditionally request only a
> single
> > message at a time, but instead requests them in batches. This allows for
> > much higher throughput, but does result in the scenario of
> "at-least-once"
> > processing. Generally what will happen in this scenario is the following:
> >
> > 1) Client requests the next set of messages from offset (t). For example,
> > assume it gets 10 messages and message 6 is "bad".
> > 2) The client's processor will then process the messages one at a time.
> > Note that the offsets are not committed after the message is processed,
> but
> > only at the end of the batch.
> > 3) The bad message it hit by the processor. At this point you can decide
> to
> > skip the message, throw an exception, etc.
> > 4a) If you decide to skip the message, processing will continue. Once all
> > 10 messages are processed, the new offset (t+10) offset is committed back
> > to Kafka.
> > 4b) If you decide to throw an exception and terminate your app, you will
> > have still processed the messages that came before the bad message.
> Because
> > the offset (t+10) is not committed, the next time you start the app it
> will
> > consume from offset t, and those messages will be processed again. This
> is
> > "at-least-once" processing.
> >
> >
> > Now, if you need exactly-once processing, you have two choices -
> > 1) Use Kafka Streams with exactly-once semantics (though, as I am not
> > familiar with your framework, it may support it as well).
> > 2) Use idempotent practices (ie: it doesn't matter if the same messages
> get
> > processed more than once).
> >
> >
> > Hope this helps -
> >
> > Adam
> >
> >
> > On Wed, Sep 12, 2018 at 7:59 AM, Yui Yoi <shalosh...@gmail.com> wrote:
> >
> > > Hi Adam,
> > > Thanks a lot for the rapid response, it did helped!
> > >
> > > Let me though ask one more simple question: Can I make a stream
> > application
> > > stuck on an invalid message? and not consuming any further messages?
> > >
> > > Thanks again
> > >
> > > On Wed, Sep 12, 2018 at 2:35 PM Adam Bellemare <
> adam.bellem...@gmail.com
> > >
> > > wrote:
> > >
> > > > Hi Yui Yoi
> > > >
> > > > Preface: I am not familiar with the spring framework.
> > > >
> > > > "Earliest" when it comes to consuming from Kafka means, "Start
> reading
> > > from
> > > > the first message in the topic, *if there is no offset stored for
> that
> > > > consumer group*". It sounds like you are expecting it to re-read each
> > > > message whenever a new message comes in. This is not going to happen,
> > as
> > > > there will be a committed offset and "earliest" will no longer be
> used.
> > > If
> > > > you were to use "latest" instead, if a consumer is started that does
> > not
> > > > have a valid offset, it would use the very latest message in the
> topic
> > as
> > > > the starting offset for message consumption.
> > > >
> > > > Now, if you are using the same consumer group each time you run the
> > > > application (which it seems is true, as you have "test-group"
> hardwired
> > > in
> > > > your application.yml), but you do not tear down your local cluster
> and
> > > > clear out its state, you will indeed see the behaviour you describe.
> > > > Remember that Kafka is durable, and maintains the offsets when the
> > > > individual applications go away. So you are probably seeing this:
> > > >
> > > > 1) start application instance 1. It realizes it has no offset when it
> > > tries
> > > > to register as a consumer on the input topic, so it creates a new
> > > consumer
> > > > entry for "earliest" for your consumer group.
> > > > 2) send message "asd"
> > > > 3) application instance 1 receives "asd", processes it, and updates
> the
> > > > offset (offset head = 1)
> > > > 4) Terminate instance 1
> > > > 5) Start application instance 2. It detects correctly that consumer
> > group
> > > > "test-group" is available and reads that offset as its starting
> point.
> > > > 6) send message "{}"
> > > > 7) application instance 2 receives "{}", processes it, and updates
> the
> > > > offset (offset head = 2)
> > > > *NOTE:* App instance 2 NEVER received "asd", nor should it, as it is
> > > > telling the Kafka cluster that it belongs to the same consumer group
> as
> > > > application 1.
> > > >
> > > > Hope this helps,
> > > >
> > > > Adam
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Sep 12, 2018 at 6:57 AM, Yui Yoi <shalosh...@gmail.com>
> wrote:
> > > >
> > > > > TL;DR:
> > > > > my streams application skips uncommitted messages
> > > > >
> > > > > Hello,
> > > > > I'm using streams API via spring framework and experiencing a weird
> > > > > behavior which I would like to get an explanation to:
> > > > > First of all: The attached zip is my test project, I used kafka cli
> > to
> > > > run
> > > > > a localhost broker and zookeeper
> > > > >
> > > > > what is happening is as follows:
> > > > > 1. I send an invalid message, such as "asd", and my consumer has a
> > lag
> > > > and
> > > > > error message as expected
> > > > > 2. I send a valid message such as "{}", but instead of rereading
> the
> > > > first
> > > > > message as expected from an "earliest" configured application - my
> > > > > application reads the latest message, commits it and ignoring the
> one
> > > in
> > > > > error, thus i have no lag!
> > > > > 3. When I'm running my application when there are uncommitted
> > messages
> > > -
> > > > > my application reads the FIRST not committed message, as if it IS
> an
> > > > > "earliest" configured application!
> > > > >
> > > > > In your documentation you assure "at least once" behavior, but
> > > according
> > > > > to section 2. it happens so my application does not receive those
> > > > messages
> > > > > not even once (as i said, those messages are uncommitted)
> > > > >
> > > > > My guess is that it has something to do with the stream's cache...
> I
> > > > would
> > > > > very like to have an explanation or even a solution
> > > > >
> > > > > I'm turning to you as a last resort, after long weeks of research
> and
> > > > > experiments
> > > > >
> > > > > Thanks alot
> > > > >
> > > >
> > >
> >
>

Reply via email to