As part of the PR review we decided to add a metric to keep track of the number of records skipped due to deserialization errors. I have updated the KIP to reflect that.
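(For illustration only: once such a metric exists it could be read through the existing KafkaStreams#metrics() API, along the lines of the sketch below; "skipped-records" is a placeholder for whatever name the KIP settles on.)

    // Illustrative only: "skipped-records" stands in for the eventual metric name.
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    import java.util.Map;

    public class SkippedRecordsMetricExample {
        public static void printSkippedRecordMetrics(final KafkaStreams streams) {
            final Map<MetricName, ? extends Metric> metrics = streams.metrics();
            for (final Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
                if (entry.getKey().name().contains("skipped-records")) {
                    // Metric.value() is the plain double value of the sensor.
                    System.out.println(entry.getKey() + " = " + entry.getValue().value());
                }
            }
        }
    }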
Thanks,
Eno

On Jun 23, 2017, at 10:59 AM, Eno Thereska <eno.there...@gmail.com> wrote:

Done, thanks. I'll open a vote thread now.

Eno

On 23 Jun 2017, at 02:15, Matthias J. Sax <matth...@confluent.io> wrote:

I also think that one config is better, with two default implementations: failing and log-and-continue.

However, I think we should fail by default. Similar to the timestamp extractor, "silent" data loss is no good default behavior IMHO.

-Matthias

On 6/22/17 11:00 AM, Eno Thereska wrote:

Answers inline:

> On 22 Jun 2017, at 03:26, Guozhang Wang <wangg...@gmail.com> wrote:
>
> Thanks for the updated KIP, some more comments:
>
> 1. The config name is "default.deserialization.exception.handler" while the interface class name is "RecordExceptionHandler", which is more general than the intended purpose. Could we rename the class name accordingly?

Sure.

> 2. Could you describe the full implementation of "DefaultExceptionHandler"? Currently it is not clear to me how it is implemented with the configured value.
>
> In addition, I think we do not need to include an additional "DEFAULT_DESERIALIZATION_EXCEPTION_RESPONSE_CONFIG", as the configure() function is mainly used for users to pass any customized parameters that are outside the Streams library; plus, adding such an additional config sounds over-complicated for a default exception handler. Instead I'd suggest we just provide two handlers (or three if people feel strongly about the LogAndThresholdExceptionHandler), one FailOnExceptionHandler and one LogAndContinueOnExceptionHandler. And we can set LogAndContinueOnExceptionHandler by default.

That's what I had originally. Jay mentioned he preferred one default class, with config options. So with that approach you'd have two config options, one for failing and one for continuing, and the one exception handler would take those options during its configure() call.

After checking the other exception handlers in the code, I might revert back to what I originally had (two default handlers), as Guozhang also re-suggests, but still have the interface extend Configurable. Guozhang, are you OK with that? In that case there is no need for the response config option.

Thanks
Eno
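(To make the shape of that proposal concrete, here is a rough sketch of the two-default-handlers variant discussed above. The interface name, the Response enum, the handle() signature and the class names are illustrative only, taken from the names floated in this thread rather than from the final KIP.)

    // Illustrative sketch only -- not the final KIP-161 API.
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.Configurable;

    import java.util.Map;

    // Handler invoked when a record cannot be deserialized.
    public interface DeserializationExceptionHandler extends Configurable {
        enum Response { CONTINUE, FAIL }
        Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception);
    }

    // Default #1: log the bad record's coordinates and keep processing.
    class LogAndContinueExceptionHandler implements DeserializationExceptionHandler {
        @Override
        public Response handle(final ConsumerRecord<byte[], byte[]> record, final Exception exception) {
            System.err.println("Skipping record at " + record.topic() + "-" + record.partition()
                    + "@" + record.offset() + ": " + exception);
            return Response.CONTINUE;
        }

        @Override
        public void configure(final Map<String, ?> configs) { /* no handler-specific options */ }
    }

    // Default #2: fail fast on the first bad record.
    class LogAndFailExceptionHandler implements DeserializationExceptionHandler {
        @Override
        public Response handle(final ConsumerRecord<byte[], byte[]> record, final Exception exception) {
            System.err.println("Failing on corrupted record at " + record.topic() + "-"
                    + record.partition() + "@" + record.offset());
            return Response.FAIL;
        }

        @Override
        public void configure(final Map<String, ?> configs) { }
    }

Wiring one of these in via the "default.deserialization.exception.handler" config mentioned above would then be a one-liner, e.g. props.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class), or the class name as a string, matching the default partitioner / default timestamp extractor behaviour Guozhang mentions below.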
On Wed, Jun 21, 2017 at 1:39 AM, Eno Thereska <eno.there...@gmail.com> wrote:

Thanks Guozhang,

I've updated the KIP and hopefully addressed all the comments so far. In the process I also changed the name of the KIP to reflect its scope better:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers

Any other feedback appreciated, otherwise I'll start the vote soon.

Thanks
Eno

On Jun 12, 2017, at 6:28 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Eno, thanks for bringing this proposal up, and sorry for getting to it late. Here are my two cents:

1. First, some meta comments regarding "fail fast" vs. "making progress". I agree that in general we should "enforce users to do the right thing" in system design, but we also need to keep in mind that Kafka is a multi-tenant system, i.e. from a Streams app's point of view you probably would not control the whole stream processing pipeline end-to-end. E.g. your input data may not be controlled by yourself; it could be written by another app, or another team in your company, or even a different organization, and if an error happens maybe you cannot fix it "to do the right thing" by yourself in time. In such an environment I think it is important to leave the door open to let users be more resilient. So I find the current proposal, which does leave the door open for either fail-fast or make-progress, quite reasonable.

2. On the other hand, if the question is whether we should provide a built-in "send to bad queue" handler from the library, I think that might be overkill: with some tweaks (see my detailed comments below) on the API we can allow users to implement such handlers pretty easily. In fact, I feel even "LogAndThresholdExceptionHandler" is not necessary as a built-in handler, as it would then require users to specify the threshold via configs, etc. I think letting people provide such "eco-libraries" may be better.

3. Regarding the CRC error: today we validate the CRC on both the broker end upon receiving produce requests and on the consumer end upon receiving fetch responses; if the CRC validation fails in the former case the data is not appended to the broker logs. So if we do see a CRC failure on the consumer side it has to be that we have a flipped bit either on the broker disks or over the wire. For the first case it is fatal, while for the second it is retriable. Unfortunately we cannot tell which case it is when seeing a CRC validation failure. But in either case, just skipping and making progress seems not a good choice here, and hence I would personally exclude these errors from the general serde errors, to NOT leave the door open to making progress.
Currently such errors are thrown as a KafkaException that wraps an InvalidRecordException, which may be too general; we could consider just throwing the InvalidRecordException directly. But that can be an orthogonal discussion if we agree that CRC failures should not be considered in this KIP.

----------------

Now some detailed comments:

4. Could we consider adding the processor context to the handle() function as well? This context will be wrapping the source node that is about to process the record. It could expose more info, like which task / source node saw this error and the timestamp of the message, and would also allow users to implement their handlers by exposing some metrics, or by calling context.forward() to implement the "send to bad queue" behavior, etc.

5. Could you add the string name of StreamsConfig.DEFAULT_RECORD_EXCEPTION_HANDLER to the KIP as well? Personally I find the "default" prefix a bit misleading since we do not allow users to override it per node yet, but I'm okay either way, as I can see we may extend it in the future and would probably prefer not to rename the config again. Also, from the experience of `default partitioner` and `default timestamp extractor`, we may want to make sure that the passed-in object can be either a string "class name" or a class object?

Guozhang
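(As a rough illustration of the kind of user-provided "send to bad queue" handler that point 4 would enable, here is a sketch building on the illustrative interface above. The three-argument handle() with a ProcessorContext is just the suggestion from point 4; the topic name, the config keys and the use of a separate producer rather than context.forward() are assumptions made to keep the sketch self-contained.)

    // Sketch of a user-implemented dead-letter-queue handler; not part of the KIP itself.
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.streams.processor.ProcessorContext;

    import java.util.HashMap;
    import java.util.Map;

    public class DeadLetterQueueExceptionHandler {
        private Producer<byte[], byte[]> dlqProducer;
        private String dlqTopic;

        // Called once with the Streams configs plus any user-supplied handler configs.
        public void configure(final Map<String, ?> configs) {
            final Object topic = configs.get("dead.letter.topic");                    // hypothetical key
            dlqTopic = topic == null ? "dead-letter-queue" : topic.toString();
            final Map<String, Object> producerConfig = new HashMap<>();
            producerConfig.put("bootstrap.servers", configs.get("bootstrap.servers")); // assumed present
            dlqProducer = new KafkaProducer<>(producerConfig,
                    new ByteArraySerializer(), new ByteArraySerializer());
        }

        // Variant of handle() that also receives the processor context, per point 4.
        public DeserializationExceptionHandler.Response handle(final ProcessorContext context,
                                                               final ConsumerRecord<byte[], byte[]> record,
                                                               final Exception exception) {
            // Send the raw bytes to the quarantine topic so the bad record can be inspected later;
            // the task id from the context tells us which part of the topology saw it.
            System.err.println("Task " + context.taskId() + " skipping record "
                    + record.topic() + "-" + record.partition() + "@" + record.offset());
            dlqProducer.send(new ProducerRecord<>(dlqTopic, record.key(), record.value()));
            return DeserializationExceptionHandler.Response.CONTINUE;
        }
    }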
On Wed, Jun 7, 2017 at 2:16 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi Eno,

On 07.06.2017 22:49, Eno Thereska wrote:
> Comments inline:
>
> On 5 Jun 2017, at 18:19, Jan Filipiak <jan.filip...@trivago.com> wrote:
>> Hi,
>>
>> just my few thoughts.
>>
>> On 05.06.2017 11:44, Eno Thereska wrote:
>>> Hi there,
>>>
>>> Sorry for the late reply, I was out this past week. Looks like good progress was made with the discussions either way. Let me recap a couple of points I saw into one big reply:
>>>
>>> 1. Jan mentioned CRC errors. I think this is a good point. As these happen in Kafka, before Kafka Streams gets a chance to inspect anything, I'd like to hear the opinion of more Kafka folks like Ismael or Jason on this one. Currently the documentation is not great about what to do once a CRC check has failed. From looking at the code, it looks like the client gets a KafkaException (bubbled up from the fetcher) and currently we in Streams catch this as part of poll() and fail. It might be advantageous to treat CRC handling in a similar way to serialisation handling (e.g., have the option to fail/skip). Let's see what the other folks say. Worst case we can do a separate KIP for that if it proves too hard to do in one go.
>>
>> There is no reasonable way to "skip" a CRC error. How can you know that the length you read was anything reasonable? You might be completely lost inside your response.
>
> On the client side, every record received is checked for validity. As it happens, if the CRC check fails the exception is wrapped in a KafkaException that is thrown all the way to poll(). Assuming we change that and poll() throws a CRC exception, I was thinking we could treat it similarly to a deserialization exception and pass it to the exception handler to decide what to do. Default would be to fail. This might need a Kafka KIP btw and can be done separately from this KIP, but Jan, would you find this useful?

I don't think so. IMO you cannot reasonably continue parsing when the checksum of a message is not correct. If you are not sure you got the correct length, how can you be sure to find the next record? I would always fail straight away in all cases. It's too hard for me to understand why one would try to continue. I mentioned CRCs because those are the only bad pills I ever saw so far. But I am happy that it just stopped and I could check what was going on. This will also be invasive in the client code then.

If you ask me, I am always going to vote for "grind to a halt": let the developers see what happened and let them fix it. It helps build good Kafka experiences and better software and architectures. For me this is "force the user to do the right thing" (https://youtu.be/aAb7hSCtvGw?t=374), e.g. not letting unexpected input slip by. Letting unexpected input slip by is what bought us 15+ years of war with all sorts of ingestion attacks. I don't even dare to estimate how many missing-records search teams are going to be formed, maybe some HackerOne for stream apps :D

Best Jan

>>> At a minimum, handling this type of exception will need to involve the exactly-once (EoS) logic. We'd still allow the option of failing or skipping, but EoS would need to clean up by rolling back all the side effects from the processing so far. Matthias, how does this sound?
>>
>> EoS will not help -- the record might be 5 or 6 repartitions down into the topology. I haven't followed, but I pray you made EoS optional! We don't need this and we don't want this and we will turn it off if it comes. So I wouldn't recommend relying on it. The option to turn it off is better than forcing it and still being unable to roll back bad pills (as explained before).
>
> Yeah, as Matthias mentioned, EoS is optional.
>
> Thanks,
> Eno

>>> 6. Will add an end-to-end example as Michael suggested.
>>>
>>> Thanks
>>> Eno
On 4 Jun 2017, at 02:35, Matthias J. Sax <matth...@confluent.io> wrote:

What I don't understand is this:

> From there on it's the easiest way forward: fix, redeploy, start => done

If you have many producers that work fine and a new "bad" producer starts up and writes bad data into your input topic, your Streams app dies, but all your producers, including the bad one, keep writing.

Thus, how would you fix this, as you cannot "remove" the corrupted data from the topic? It might take some time to identify the root cause and stop the bad producer. Up to this point you get good and bad data into your Streams input topic. If the Streams app is not able to skip over those bad records, how would you get all the good data from the topic? Not saying it's not possible, but it's extra work copying the data with a new non-Streams consumer-producer app into a new topic and then feeding your Streams app from this new topic -- and you also need to update all your upstream producers to write to the new topic.

Thus, if you want to fail fast, you can still do this. And after you have detected and fixed the bad producer you might just reconfigure your app to skip bad records until it reaches the good part of the data. Afterwards, you could redeploy with fail-fast again.

Thus, for this pattern, I actually don't see any reason to stop the Streams app at all. If you have a callback, and use the callback to raise an alert (and maybe get the bad data into a bad-record queue), it will not take longer to identify and stop the "bad" producer. But in this case, you have zero downtime for your Streams app.

This seems to be much simpler. Or do I miss anything?

Having said this, I agree that the "threshold based callback" might be questionable. But as you argue for strict "fail-fast", I want to argue that this must not always be the best pattern to apply, and that the overall KIP idea is super useful from my point of view.

-Matthias

On 6/3/17 11:57 AM, Jan Filipiak wrote:

Could not agree more!

But then I think the easiest is still: print exception and die. From there on it's the easiest way forward: fix, redeploy, start => done.

All the other ways to recover a pipeline that was processing partially all the time and suddenly went over an "I can't take it anymore" threshold are not straightforward IMO.

How do you find the offset at which it became too bad, when it is not the latest committed one? How do you reset there, with some reasonable stuff in your RocksDB stores?

If one would do the following: the continuing handler would measure against a threshold and would terminate after a certain threshold has passed (per task). Then one can use offset commit / flush intervals to make reasonable assumptions about how much is slipping by, plus you get an easy recovery when it gets too bad, plus you could also account for "in processing" records.

Setting this threshold to zero would cover all cases with one implementation. It is still beneficial to have it pluggable.

Again, CRC errors are the only bad pills we saw in production so far.

Best Jan
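(For concreteness, the threshold idea described above could look roughly like the sketch below, written against the illustrative DeserializationExceptionHandler interface sketched earlier in this thread. The class name and config key are placeholders, and for simplicity it counts per handler instance rather than per task.)

    // Sketch only: a "continue until a threshold is crossed" handler. Threshold 0 behaves like fail-fast.
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import java.util.Map;
    import java.util.concurrent.atomic.AtomicLong;

    class LogAndThresholdExceptionHandler implements DeserializationExceptionHandler {
        private long threshold = 0;                          // 0 == fail on the first bad record
        private final AtomicLong skippedSoFar = new AtomicLong();

        @Override
        public void configure(final Map<String, ?> configs) {
            final Object value = configs.get("deserialization.exception.threshold"); // hypothetical key
            if (value != null) {
                threshold = Long.parseLong(value.toString());
            }
        }

        @Override
        public Response handle(final ConsumerRecord<byte[], byte[]> record, final Exception exception) {
            final long skipped = skippedSoFar.incrementAndGet();
            System.err.println("Corrupted record " + record.topic() + "-" + record.partition()
                    + "@" + record.offset() + " (" + skipped + " seen so far): " + exception);
            return skipped > threshold ? Response.FAIL : Response.CONTINUE;
        }
    }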
On 02.06.2017 17:37, Jay Kreps wrote:

Jan, I agree with you philosophically. I think one practical challenge has to do with data formats. Many people use untyped events, so there is simply no guarantee on the form of the input. E.g. many companies use JSON without any kind of schema, so it becomes very hard to assert anything about the input, which makes these programs very fragile to the "one accidental message publication that creates an unsolvable problem".

For that reason I do wonder if limiting to just serialization actually gets you a useful solution. For JSON it will help with the problem of non-parseable JSON, but it sounds like it won't help in the case where the JSON is well-formed but does not have any of the fields you expect and depend on for your processing. I expect the reason for limiting the scope is that it is pretty hard to reason about correctness for anything that stops in the middle of processing an operator DAG?

-Jay
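(A tiny illustration of that distinction, using Jackson purely as an example: the payload below is perfectly valid JSON, so a deserialization handler would never fire, yet the field the application depends on is missing and the failure surfaces later, inside user code.)

    // Well-formed JSON, wrong shape: this is a user-code error, not a deserialization error.
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class WellFormedButWrongJson {
        public static void main(final String[] args) throws Exception {
            final ObjectMapper mapper = new ObjectMapper();
            final JsonNode event = mapper.readTree("{\"user\":\"alice\"}"); // parses fine
            final long amount = event.get("amount").asLong();               // NullPointerException: no such field
            System.out.println(amount);
        }
    }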
On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

IMHO you're doing it wrong then. Plus, building too much support into the Kafka ecosystem is very counterproductive in fostering a happy user base.

On 02.06.2017 13:15, Damian Guy wrote:

Jan, you have a choice to fail fast if you want. This is about giving people options, and there are times when you don't want to fail fast.

On Fri, 2 Jun 2017 at 11:00 Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi,

1. That greatly complicates monitoring. Fail fast gives you this: when you monitor only the lag of all your apps, you are completely covered. With that sort of new application, monitoring is much more complicated, as you now need to monitor the fail percentage of some special apps as well. In my opinion that is a huge downside already.

2. Using a schema registry like the Avro stuff, it might not even be the record that is broken; it might be just your app being unable to fetch a schema it now needs to know. Maybe you got partitioned away from that registry.

3. When you get alerted because of a too-high fail percentage, what are the steps you are going to take? Shut it down to buy time. Fix the problem. Spend way too much time finding a good reprocess offset. Your time windows are in bad shape anyway, and you have pretty much lost. This routine is nonsense.

Dead letter queues would be the worst possible addition to the Kafka toolkit that I can think of. It just doesn't fit an architecture in which clients falling behind is a valid option.

Further, I mentioned already that the only bad pills I've seen so far are CRC errors. Any plans for those?

Best Jan

On 02.06.2017 11:34, Damian Guy wrote:

I agree with what Matthias has said w.r.t. failing fast. There are plenty of times when you don't want to fail fast and must attempt to make progress.

The dead-letter queue is exactly for these circumstances. Of course, if every record is failing, then you probably do want to give up.

On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <matth...@confluent.io> wrote:

First a meta comment: KIP discussion should take place on the dev list -- if the user list is cc'ed please make sure to reply to both lists. Thanks.

Thanks for making the scope of the KIP clear. Makes a lot of sense to focus on deserialization exceptions for now.

With regard to corrupted state stores, would it make sense to fail a task and wipe out the store to repair it via recreation from the changelog? That's of course a quite advanced pattern, but I want to bring it up to design the first step in a way such that we can get there (if we think it's a reasonable idea).

I also want to comment about fail fast vs. making progress. I think that fail-fast must not always be the best option. The scenario I have in mind is like this: you have a bunch of producers that feed the Streams input topic. Most producers work fine, but maybe one producer misbehaves and the data it writes is corrupted. You might not even be able to recover this lost data at any point -- thus, there is no reason to stop processing; you just skip over those records.
Of course, you need to fix the root cause, and thus you need to alert (either via logs or via the exception handler directly), and you need to start investigating to find the bad producer, shut it down and fix it.

Here the dead letter queue comes into place. From my understanding, the purpose of this feature is solely to enable post-mortem debugging. I don't think those records would be fed back at any point in time (so I don't see any ordering issue -- a skipped record, in this regard, is just "fully processed"). Thus, the dead letter queue should actually encode the original record's metadata (topic, partition, offset etc.) to enable such debugging. I guess this might also be possible if you just log the bad records, but it would be harder to access (you first must find the Streams instance that wrote the log and extract the information from there). Reading it from a topic is much simpler.

I also want to mention the following. Assume you have such a topic with some bad records and some good records. If we always fail fast, it's going to be super hard to process the good data. You would need to write an extra app that copies the data into a new topic, filtering out the bad records (or apply the map() workaround within Streams). So I don't think it's necessarily true that failing fast is the best option in production.

Or do you think there are scenarios for which you can recover the corrupted records successfully? And even if this is possible, might it be a case for reprocessing instead of failing the whole application? Also, if you think you can "repair" a corrupted record, should the handler allow returning a "fixed" record? This would solve the ordering problem.

-Matthias
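(For reference, the "map() workaround" mentioned above usually looks something like the sketch below: consume the topic with the byte-array serde and do the real deserialization inside the topology, dropping anything that fails. The topic name and the Integer payload are placeholders; a real application would call its own deserializer where parseInt is used here.)

    // Sketch of skipping poison pills without handler support, assuming the pre-KIP DSL.
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;

    public class SkipBadRecordsWorkaround {
        static KStream<byte[], Integer> goodRecordsOnly(final KStreamBuilder builder) {
            final KStream<byte[], byte[]> raw =
                    builder.stream(Serdes.ByteArray(), Serdes.ByteArray(), "input-topic");
            return raw.flatMapValues(value -> {
                try {
                    // Stand-in for the real deserializer: anything that throws on a bad payload.
                    return Collections.singletonList(
                            Integer.parseInt(new String(value, StandardCharsets.UTF_8)));
                } catch (final RuntimeException e) {
                    return Collections.emptyList();   // skip the poison pill
                }
            });
        }
    }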
On 5/30/17 1:47 AM, Michael Noll wrote:

Thanks for your work on this KIP, Eno -- much appreciated!

- I think it would help to improve the KIP by adding an end-to-end code example that demonstrates, with the DSL and with the Processor API, how the user would write a simple application that would then be augmented with the proposed KIP changes to handle exceptions. It should also become much clearer then that, e.g., the KIP would lead to different code paths for the happy case and for any failure scenarios.

- Do we have sufficient information available to make informed decisions on what to do next? For example, do we know in which part of the topology the record failed? `ConsumerRecord` gives us access to topic, partition, offset, timestamp, etc., but what about topology-related information (e.g. what is the associated state store, if any)?

- Only partly on-topic for the scope of this KIP, but this is about the bigger picture: this KIP would give users the option to send corrupted records to a dead letter queue (quarantine topic). But what pattern would we advocate to process such a dead letter queue then, e.g. how to allow for retries with backoff ("if the first record in the dead letter queue fails again, then try the second record for the time being and go back to the first record at a later time")? Jay and Jan already alluded to ordering problems that will be caused by dead letter queues. As I said, retries might be out of scope, but perhaps the implications should be considered if possible?

Also, I wrote the text below before reaching the point in the conversation where the KIP's scope was limited to exceptions in the category of poison pills / deserialization errors.
But since Jay brought up user code errors again, I decided to include it anyway.

----------------------------snip----------------------------

A meta comment: I am not sure about this split between the code for the happy path (e.g. map/filter/... in the DSL) and the failure path (using exception handlers). In Scala, for example, we can do:

    scala> val computation = scala.util.Try(1 / 0)
    computation: scala.util.Try[Int] = Failure(java.lang.ArithmeticException: / by zero)

    scala> computation.getOrElse(42)
    res2: Int = 42

Another example with Scala's pattern matching, which is similar to `KStream#branch()`:

    computation match {
      case scala.util.Success(x) => x * 5
      case scala.util.Failure(_) => 42
    }

(The above isn't the most idiomatic way to handle this in Scala, but that's not the point I'm trying to make here.)

Hence the question I'm raising here is: do we want to have an API where you code "the happy path" and then have a different code path for failures (using exceptions and handlers), or should we treat both Success and Failure the same way?

I think the failure/exception handling approach (as proposed in this KIP) is well-suited for errors in the category of deserialization problems aka poison pills, partly because the (default) serdes are defined through configuration (explicit serdes, however, are defined through API calls).

However, I'm not yet convinced that the failure/exception handling approach is the best idea for user code exceptions, e.g. if you fail to guard against NPE in your lambdas or divide a number by zero.
    scala> val stream = Seq(1, 2, 3, 4, 5)
    stream: Seq[Int] = List(1, 2, 3, 4, 5)

    // Here: fall back to a sane default when encountering failed records
    scala> stream.map(x => Try(1/(3 - x))).flatMap(t => Seq(t.getOrElse(42)))
    res19: Seq[Int] = List(0, 1, 42, -1, 0)

    // Here: skip over failed records
    scala> stream.map(x => Try(1/(3 - x))).collect{ case Success(s) => s }
    res20: Seq[Int] = List(0, 1, -1, 0)

The above is more natural to me than using error handlers to define how to deal with failed records (here, the value `3` causes an arithmetic exception). Again, it might help the KIP if we added an end-to-end example for such user code errors.

----------------------------snip----------------------------

On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi Jay,

Eno mentioned that he will narrow down the scope to only ConsumerRecord deserialisation.

I am working with database changelogs only. I would really not like to see a dead letter queue or something similar. How am I expected to get these back in order? Just grind to a halt and call me on the weekend; I'll fix it then in a few minutes, rather than spending two weeks ordering dead letters (where reprocessing might even be the faster fix).

Best Jan

On 29.05.2017 20:23, Jay Kreps wrote:

- I think we should hold off on retries unless we have worked out the full usage pattern; people can always implement their own.
I >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> think >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> the idea >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> is that you send the message to some kind of dead >>>>>>>>>>>>>>>>>>>>> letter queue >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> then >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> replay these later. This obviously destroys all >>>>> semantic >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> guarantees >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> we are >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> working hard to provide right now, which may be okay. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> -- Guozhang >>>>> >>>>> >>>> >>>> >>>> -- >>>> -- Guozhang >