I agree with what Matthias has said w.r.t. failing fast. There are plenty of
times when you don't want to fail fast and must attempt to make progress.
The dead-letter queue is exactly for those circumstances. Of course, if
every record is failing, then you probably do want to give up.
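
To make that concrete, here is a rough sketch of what forwarding a bad
record to a dead-letter queue could look like. To be clear, this is not
the API proposed in the KIP -- the class and method names below are made
up for illustration -- but it shows the point Matthias makes further down:
the dead-letter record should carry the original topic/partition/offset
metadata so it can be debugged later.

    import java.util.Properties
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    // Hypothetical helper (not part of the KIP): forwards the raw bytes
    // of a record that failed deserialization to a dead-letter topic,
    // preserving the original topic/partition/offset so the bad record
    // can be traced back without grepping application logs.
    class DeadLetterForwarder(bootstrapServers: String, deadLetterTopic: String) {

      private val producer = {
        val props = new Properties()
        props.put("bootstrap.servers", bootstrapServers)
        props.put("key.serializer",
          "org.apache.kafka.common.serialization.ByteArraySerializer")
        props.put("value.serializer",
          "org.apache.kafka.common.serialization.ByteArraySerializer")
        new KafkaProducer[Array[Byte], Array[Byte]](props)
      }

      def forward(failed: ConsumerRecord[Array[Byte], Array[Byte]]): Unit = {
        // Encode the source coordinates in the key; the value is the raw,
        // still-corrupted payload, untouched for later inspection.
        val key = s"${failed.topic}/${failed.partition}/${failed.offset}"
          .getBytes("UTF-8")
        producer.send(new ProducerRecord(deadLetterTopic, key, failed.value))
      }
    }

A consumer of the dead-letter topic then has everything it needs to locate
and inspect the source record, without blocking or reordering the main
processing path.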
On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <matth...@confluent.io> wrote:
> First a meta comment. KIP discussion should take place on the dev list
> -- if the user list is cc'ed, please make sure to reply to both lists.
> Thanks.
>
> Thanks for making the scope of the KIP clear. It makes a lot of sense
> to focus on deserialization exceptions for now.
>
> With regard to corrupted state stores, would it make sense to fail a
> task and wipe out the store to repair it via recreation from the
> changelog? That's of course quite an advanced pattern, but I want to
> bring it up so that we can design the first step in a way that lets us
> get there (if we think it's a reasonable idea).
>
> I also want to comment on fail-fast vs. making progress. I think that
> fail-fast is not always the best option. The scenario I have in mind
> is this: you have a bunch of producers that feed the Streams input
> topic. Most producers work fine, but maybe one producer misbehaves and
> the data it writes is corrupted. You might not even be able to recover
> this lost data at any point -- thus, there is no reason to stop
> processing; you just skip over those records. Of course, you need to
> fix the root cause, so you need to alert (either via logs or via the
> exception handler directly) and start investigating to find the bad
> producer, shut it down, and fix it.
>
> This is where the dead-letter queue comes into play. From my
> understanding, the purpose of this feature is solely to enable
> after-the-fact debugging. I don't think those records would be fed
> back at any point in time (so I don't see any ordering issue -- a
> skipped record, in this regard, is just "fully processed"). Thus, the
> dead-letter queue should actually encode the original record's
> metadata (topic, partition, offset, etc.) to enable such debugging. I
> guess this might also be possible if you just log the bad records, but
> it would be harder to access (you first must find the Streams instance
> that wrote the log and extract the information from there). Reading it
> from a topic is much simpler.
>
> I also want to mention the following. Assume you have such a topic
> with some bad records and some good records. If we always fail fast,
> it's going to be super hard to process the good data. You would need
> to write an extra app that copies the data into a new topic, filtering
> out the bad records (or apply the map() workaround within Streams). So
> I don't think that "failing fast is the best option in production" is
> necessarily true.
>
> Or do you think there are scenarios for which you can recover the
> corrupted records successfully? And even if this is possible, might it
> be a case for reprocessing instead of failing the whole application?
> Also, if you think you can "repair" a corrupted record, should the
> handler allow returning a "fixed" record? This would solve the
> ordering problem.
>
>
> -Matthias
>
>
> On 5/30/17 1:47 AM, Michael Noll wrote:
> > Thanks for your work on this KIP, Eno -- much appreciated!
> >
> > - I think it would help to improve the KIP by adding an end-to-end
> > code example that demonstrates, with the DSL and with the Processor
> > API, how the user would write a simple application that would then be
> > augmented with the proposed KIP changes to handle exceptions. It
> > should also become much clearer then that, e.g., the KIP would lead
> > to different code paths for the happy case and any failure scenarios.
> >
> > - Do we have sufficient information available to make informed
> > decisions on what to do next? For example, do we know in which part
> > of the topology the record failed? `ConsumerRecord` gives us access
> > to topic, partition, offset, timestamp, etc., but what about
> > topology-related information (e.g. what is the associated state
> > store, if any)?
> >
> > - Only partly on-topic for the scope of this KIP, but this is about
> > the bigger picture: this KIP would give users the option to send
> > corrupted records to a dead-letter queue (quarantine topic). But what
> > pattern would we advocate to process such a dead-letter queue then,
> > e.g. how to allow for retries with backoff ("if the first record in
> > the dead-letter queue fails again, then try the second record for the
> > time being and go back to the first record at a later time")? Jay and
> > Jan already alluded to ordering problems that will be caused by
> > dead-letter queues. As I said, retries might be out of scope, but
> > perhaps the implications should be considered if possible?
> >
> > Also, I wrote the text below before reaching the point in the
> > conversation where this KIP's scope was limited to exceptions in the
> > category of poison pills / deserialization errors. But since Jay
> > brought up user code errors again, I decided to include it again.
> >
> > ----------------------------snip----------------------------
> > A meta comment: I am not sure about this split between the code for
> > the happy path (e.g. map/filter/... in the DSL) and the failure path
> > (using exception handlers). In Scala, for example, we can do:
> >
> >     scala> val computation = scala.util.Try(1 / 0)
> >     computation: scala.util.Try[Int] =
> >       Failure(java.lang.ArithmeticException: / by zero)
> >
> >     scala> computation.getOrElse(42)
> >     res2: Int = 42
> >
> > Another example with Scala's pattern matching, which is similar to
> > `KStream#branch()`:
> >
> >     computation match {
> >       case scala.util.Success(x) => x * 5
> >       case scala.util.Failure(_) => 42
> >     }
> >
> > (The above isn't the most idiomatic way to handle this in Scala, but
> > that's not the point I'm trying to make here.)
> >
> > Hence the question I'm raising here is: do we want to have an API
> > where you code "the happy path" and then have a different code path
> > for failures (using exceptions and handlers), or should we treat both
> > Success and Failure in the same way?
> >
> > I think the failure/exception handling approach (as proposed in this
> > KIP) is well suited for errors in the category of deserialization
> > problems aka poison pills, partly because the (default) serdes are
> > defined through configuration (explicit serdes, however, are defined
> > through API calls).
> >
> > However, I'm not yet convinced that the failure/exception handling
> > approach is the best idea for user code exceptions, e.g. if you fail
> > to guard against NPEs in your lambdas or divide a number by zero.
> >
> >     scala> val stream = Seq(1, 2, 3, 4, 5)
> >     stream: Seq[Int] = List(1, 2, 3, 4, 5)
> >
> >     // Here: fall back to a sane default when encountering failed records
> >     scala> stream.map(x => Try(1 / (3 - x))).flatMap(t => Seq(t.getOrElse(42)))
> >     res19: Seq[Int] = List(0, 1, 42, -1, 0)
> >
> >     // Here: skip over failed records
> >     scala> stream.map(x => Try(1 / (3 - x))).collect { case Success(s) => s }
> >     res20: Seq[Int] = List(0, 1, -1, 0)
> >
> > The above is more natural to me than using error handlers to define
> > how to deal with failed records (here, the value `3` causes an
> > arithmetic exception). Again, it might help the KIP if we added an
> > end-to-end example for such user code errors.
> > ----------------------------snip----------------------------
> >
> >
> > On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak
> > <jan.filip...@trivago.com> wrote:
> >
> >> Hi Jay,
> >>
> >> Eno mentioned that he will narrow down the scope to only
> >> ConsumerRecord deserialisation.
> >>
> >> I am working with database changelogs only. I would really not like
> >> to see a dead-letter queue or something similar. How am I expected
> >> to get these records back in order? Just grind to a halt and call me
> >> on the weekend; I'll fix it then in a few minutes, rather than spend
> >> two weeks ordering dead letters (where reprocessing might even be
> >> the faster fix).
> >>
> >> Best,
> >> Jan
> >>
> >>
> >> On 29.05.2017 20:23, Jay Kreps wrote:
> >>
> >>> - I think we should hold off on retries unless we have worked out
> >>> the full usage pattern; people can always implement their own. I
> >>> think the idea is that you send the message to some kind of
> >>> dead-letter queue and then replay these later. This obviously
> >>> destroys all the semantic guarantees we are working hard to provide
> >>> right now, which may be okay.
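
To tie Michael's Scala framing above back to the dead-letter idea: the
same Try-based wrapping also yields a split into a happy-path branch and
a dead-letter branch, similar to what `KStream#branch()` offers in the
DSL. A minimal sketch on plain Scala collections (the names are mine for
illustration, not part of the KIP):

    import scala.util.{Failure, Success, Try}

    val input = Seq(1, 2, 3, 4, 5)

    // Keep each original record alongside its processing result, then
    // split the stream, analogous to KStream#branch(): one branch for
    // successes, one destined for the dead-letter queue.
    val (good, bad) = input
      .map(x => (x, Try(1 / (3 - x))))
      .partition { case (_, result) => result.isSuccess }

    val happyPath = good.collect { case (_, Success(v)) => v }
    // happyPath: List(0, 1, -1, 0)

    val deadLetters = bad.collect { case (x, Failure(e)) => (x, e.getMessage) }
    // deadLetters: List((3, "/ by zero"))

The good branch flows on through the normal topology, while the bad
branch -- together with whatever metadata we decide to encode -- is what
would land in the dead-letter topic.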