I agree with what Matthias has said w.r.t. failing fast. There are plenty of
times when you don't want to fail fast and must attempt to make progress.
The dead-letter queue is exactly for these circumstances. Of course, if
every record is failing, then you probably do want to give up.

On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <matth...@confluent.io> wrote:

> First a meta comment. KIP discussion should take place on the dev list
> -- if user list is cc'ed please make sure to reply to both lists. Thanks.
>
> Thanks for making the scope of the KIP clear. Makes a lot of sense to
> focus on deserialization exceptions for now.
>
> With regard to corrupted state stores, would it make sense to fail a
> task and wipe out the store to repair it via recreation from the
> changelog? That's of course quite an advanced pattern, but I want to bring
> it up so that we design the first step in a way that lets us get there (if
> we think it's a reasonable idea).
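> As a rough sketch of that idea at whole-instance granularity (per-task
> wiping is not exposed today; `freshInstance` is assumed to be a newly
> created, not-yet-started KafkaStreams for the same topology and app id):
>
>     import org.apache.kafka.streams.KafkaStreams
>
>     // Stop the instance whose local stores are corrupted, then let a fresh
>     // instance rebuild its stores from the changelog topics.
>     def wipeAndRestart(failed: KafkaStreams, freshInstance: KafkaStreams): Unit = {
>       failed.close()            // stop processing on the broken instance
>       freshInstance.cleanUp()   // delete the local state directory for this application id
>       freshInstance.start()     // state stores are recreated from their changelogs
>     }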
>
> I also want to comment on fail fast vs. making progress. I think that
> fail-fast is not always the best option. The scenario I have in mind is
> like this: you have a bunch of producers feeding the Streams input
> topic. Most producers work fine, but maybe one producer misbehaves and
> the data it writes is corrupted. You might not even be able to recover
> this lost data at any point -- thus, there is no reason to stop
> processing; you just skip over those records. Of course, you still need
> to fix the root cause, so you need to alert (either via logs or via the
> exception handler directly) and start investigating to find the bad
> producer, shut it down, and fix it.
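> To make the "skip but alert" idea concrete, here is a rough sketch of
> such a handler, assuming an interface along the lines of what the KIP
> proposes (class, method, and enum names are illustrative and still under
> discussion):
>
>     import org.apache.kafka.clients.consumer.ConsumerRecord
>     import org.apache.kafka.streams.errors.DeserializationExceptionHandler
>     import org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse
>     import org.apache.kafka.streams.processor.ProcessorContext
>     import org.slf4j.LoggerFactory
>
>     // Skip corrupted records, but log enough context to find the bad producer later.
>     class LogAndContinueHandler extends DeserializationExceptionHandler {
>       private val log = LoggerFactory.getLogger(classOf[LogAndContinueHandler])
>
>       override def handle(context: ProcessorContext,
>                           record: ConsumerRecord[Array[Byte], Array[Byte]],
>                           exception: Exception): DeserializationHandlerResponse = {
>         log.warn(s"Skipping corrupted record at ${record.topic()}-${record.partition()}@${record.offset()}",
>                  exception)
>         DeserializationHandlerResponse.CONTINUE
>       }
>
>       override def configure(configs: java.util.Map[String, _]): Unit = ()
>     }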
>
> This is where the dead letter queue comes into play. From my
> understanding, the purpose of this feature is solely to enable
> post-mortem debugging. I don't think those records would be fed back at
> any point in time (so I don't see any ordering issue -- a skipped
> record, in this regard, is just "fully processed"). Thus, the dead
> letter queue should actually encode the original record's metadata
> (topic, partition, offset, etc.) to enable such debugging. I guess this
> might also be possible if you just log the bad records, but it would be
> harder to access (you would first have to find the Streams instance that
> wrote the log and extract the information from there). Reading it from a
> topic is much simpler.
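> A rough sketch of what writing such a dead letter record could look like
> (topic and class names here are just illustrative; this could also live
> inside the exception handler itself):
>
>     import java.util.Properties
>     import org.apache.kafka.clients.consumer.ConsumerRecord
>     import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
>     import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
>
>     // Forward a skipped record to a dead letter topic, keeping the metadata
>     // needed for post-mortem debugging (source topic, partition, offset) in the key.
>     class DeadLetterSender(bootstrapServers: String, deadLetterTopic: String) {
>       private val props = new Properties()
>       props.put("bootstrap.servers", bootstrapServers)
>       private val producer =
>         new KafkaProducer[String, Array[Byte]](props, new StringSerializer, new ByteArraySerializer)
>
>       def send(record: ConsumerRecord[Array[Byte], Array[Byte]]): Unit = {
>         val origin = s"${record.topic()}/${record.partition()}/${record.offset()}"
>         producer.send(new ProducerRecord(deadLetterTopic, origin, record.value()))
>       }
>     }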
>
> I also want to mention the following. Assume you have such a topic with
> some bad records and some good records. If we always fail fast, it's
> going to be super hard to process the good data. You would need to write
> an extra app that copies the data into a new topic, filtering out the
> bad records (or apply the map() workaround within the stream). So I
> don't think that "failing fast is most likely the best option in
> production" is necessarily true.
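> For reference, the map() workaround boils down to consuming the topic as
> raw bytes and doing the real deserialization inside the topology, so a
> record that fails to deserialize simply produces no output instead of
> killing the task. A minimal sketch of the helper one could use, e.g.
> inside a flatMapValues():
>
>     import scala.util.Try
>     import org.apache.kafka.common.serialization.Deserializer
>
>     // Returns one element on success and none on failure, so corrupted
>     // records are silently dropped by flatMapValues().
>     def tryDeserialize[V](deserializer: Deserializer[V],
>                           topic: String,
>                           bytes: Array[Byte]): Iterable[V] =
>       Try(deserializer.deserialize(topic, bytes)).toOption.toList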
>
> Or do you think there are scenarios in which you can recover the
> corrupted records successfully? And even if this is possible, wouldn't
> it be a case for reprocessing rather than failing the whole application?
> Also, if you think you can "repair" a corrupted record, should the
> handler be allowed to return a "fixed" record? This would solve the
> ordering problem.
>
>
>
> -Matthias
>
>
>
>
> On 5/30/17 1:47 AM, Michael Noll wrote:
> > Thanks for your work on this KIP, Eno -- much appreciated!
> >
> > - I think it would help to improve the KIP by adding an end-to-end code
> > example that demonstrates, with the DSL and with the Processor API, how
> > the user would write a simple application that would then be augmented
> > with the proposed KIP changes to handle exceptions.  It should also
> > become much clearer then that e.g. the KIP would lead to different code
> > paths for the happy case and any failure scenarios.
> >
> > - Do we have sufficient information available to make informed decisions
> > on what to do next?  For example, do we know in which part of the
> > topology the record failed? `ConsumerRecord` gives us access to topic,
> > partition, offset, timestamp, etc., but what about topology-related
> > information (e.g. what is the associated state store, if any)?
> >
> > - Only partly on-topic for the scope of this KIP, but this is about the
> > bigger picture: This KIP would give users the option to send corrupted
> > records to a dead letter queue (quarantine topic).  But what pattern
> > would we advocate to process such a dead letter queue then, e.g. how to
> > allow for retries with backoff ("If the first record in the dead letter
> > queue fails again, then try the second record for the time being and go
> > back to the first record at a later time")?  Jay and Jan already alluded
> > to ordering problems that will be caused by dead letter queues. As I
> > said, retries might be out of scope, but perhaps the implications should
> > be considered if possible?  (A rough sketch of one retry pass follows
> > below.)
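> > To illustrate the retry-with-backoff idea, here is a minimal,
> > Streams-agnostic sketch of one pass over a dead letter queue (the names
> > and the in-memory queue are purely illustrative; a real implementation
> > would read from and write back to the quarantine topic):
> >
> >     import scala.util.Try
> >
> >     case class DeadLetter[R](record: R, attempts: Int)
> >
> >     // One retry pass: successfully processed records are dropped, records that
> >     // exceeded maxAttempts are given up on (or parked elsewhere), and the rest
> >     // are kept for a later pass -- which is what causes the ordering problems
> >     // mentioned above.
> >     def retryPass[R](queue: Vector[DeadLetter[R]],
> >                      process: R => Try[Unit],
> >                      maxAttempts: Int): Vector[DeadLetter[R]] =
> >       queue.flatMap { dl =>
> >         if (process(dl.record).isSuccess) Vector.empty
> >         else if (dl.attempts + 1 >= maxAttempts) Vector.empty
> >         else Vector(dl.copy(attempts = dl.attempts + 1))
> >       }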
> >
> > Also, I wrote the text below before reaching the point in the
> > conversation where this KIP's scope was limited to exceptions in the
> > category of poison pills / deserialization errors.  But since Jay
> > brought up user code errors again, I decided to include it anyway.
> >
> > ----------------------------snip----------------------------
> > A meta comment: I am not sure about this split between the code for the
> > happy path (e.g. map/filter/... in the DSL) and the failure path (using
> > exception handlers).  In Scala, for example, we can do:
> >
> >     scala> val computation = scala.util.Try(1 / 0)
> >     computation: scala.util.Try[Int] =
> > Failure(java.lang.ArithmeticException: / by zero)
> >
> >     scala> computation.getOrElse(42)
> >     res2: Int = 42
> >
> > Another example with Scala's pattern matching, which is similar to
> > `KStream#branch()`:
> >
> >     computation match {
> >       case scala.util.Success(x) => x * 5
> >       case scala.util.Failure(_) => 42
> >     }
> >
> > (The above isn't the most idiomatic way to handle this in Scala, but
> > that's not the point I'm trying to make here.)
> >
> > Hence the question I'm raising here is: Do we want to have an API where
> > you code "the happy path" and then have a different code path for
> > failures (using exceptions and handlers), or should we treat both
> > Success and Failure in the same way?
> >
> > I think the failure/exception handling approach (as proposed in this KIP)
> > is well suited for errors in the category of deserialization problems,
> > a.k.a. poison pills, partly because the (default) serdes are defined
> > through configuration (explicit serdes, however, are defined through API
> > calls).
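> > To make that concrete, a sketch of such a configuration (the handler
> > config key and the com.example class are illustrative placeholders for
> > whatever the KIP ends up defining, and the serde config key names vary
> > slightly across versions):
> >
> >     import java.util.Properties
> >     import org.apache.kafka.common.serialization.Serdes
> >     import org.apache.kafka.streams.StreamsConfig
> >
> >     val props = new Properties()
> >     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app")
> >     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> >     // default serdes come from configuration ...
> >     props.put("default.key.serde", Serdes.String().getClass.getName)
> >     props.put("default.value.serde", Serdes.String().getClass.getName)
> >     // ... so the deserialization error policy can live right next to them
> >     props.put("default.deserialization.exception.handler",
> >               "com.example.LogAndContinueHandler")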
> >
> > However, I'm not yet convinced that the failure/exception handling
> > approach is the best idea for user code exceptions, e.g. if you fail to
> > guard against an NPE in your lambdas or divide a number by zero.
> >
> >     scala> val stream = Seq(1, 2, 3, 4, 5)
> >     stream: Seq[Int] = List(1, 2, 3, 4, 5)
> >
> >     scala> import scala.util.{Try, Success}
> >     import scala.util.{Try, Success}
> >
> >     // Here: Fall back to a sane default when encountering failed records
> >     scala> stream.map(x => Try(1/(3 - x))).flatMap(t => Seq(t.getOrElse(42)))
> >     res19: Seq[Int] = List(0, 1, 42, -1, 0)
> >
> >     // Here: Skip over failed records
> >     scala> stream.map(x => Try(1/(3 - x))).collect { case Success(s) => s }
> >     res20: Seq[Int] = List(0, 1, -1, 0)
> >
> > The above is more natural to me than using error handlers to define how
> > to deal with failed records (here, the value `3` causes an arithmetic
> > exception).  Again, it might help the KIP if we added an end-to-end
> > example for such user code errors.
> > ----------------------------snip----------------------------
> >
> >
> >
> >
> > On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak <jan.filip...@trivago.com>
> > wrote:
> >
> >> Hi Jay,
> >>
> >> Eno mentioned that he will narrow down the scope to only ConsumerRecord
> >> deserialisation.
> >>
> >> I am working with database changelogs only. I would really not like to
> >> see a dead letter queue or something similar. How am I expected to get
> >> these back in order? Just grind to a halt and call me on the weekend;
> >> I'll fix it then in a few minutes rather than spend 2 weeks ordering
> >> dead letters. (Reprocessing might even be the faster fix.)
> >>
> >> Best Jan
> >>
> >>
> >>
> >>
> >> On 29.05.2017 20:23, Jay Kreps wrote:
> >>
> >>>     - I think we should hold off on retries unless we have worked out
> >>>     the full usage pattern; people can always implement their own. I
> >>>     think the idea is that you send the message to some kind of dead
> >>>     letter queue and then replay these later. This obviously destroys
> >>>     all semantic guarantees we are working hard to provide right now,
> >>>     which may be okay.
> >>>
> >>
> >>
> >
>
>
