Done, thanks. I'll open a vote thread now.
Eno
> On 23 Jun 2017, at 02:15, Matthias J. Sax <matth...@confluent.io> wrote:
>
> I also think, that one config is better, with two default
> implementations: failing and log-and-continue
>
> However, I think we should fail by default. Similar to timestamp
> extractor as "silent" data loss is no good default behavior IMHO.
>
>
> -Matthias
>
> On 6/22/17 11:00 AM, Eno Thereska wrote:
>> Answers inline:
>>
>>> On 22 Jun 2017, at 03:26, Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>> Thanks for the updated KIP, some more comments:
>>>
>>> 1.The config name is "default.deserialization.exception.handler" while the
>>> interface class name is "RecordExceptionHandler", which is more general
>>> than the intended purpose. Could we rename the class name accordingly?
>>
>> Sure.
>>
>>
>>>
>>> 2. Could you describe the full implementation of "DefaultExceptionHandler",
>>> currently it is not clear to me how it is implemented with the configured
>>> value.
>>>
>>> In addition, I think we do not need to include an additional
>>> "DEFAULT_DESERIALIZATION_EXCEPTION_RESPONSE_CONFIG" as the configure()
>>> function is mainly used for users to pass any customized parameters that is
>>> out of the Streams library; plus adding such additional config sounds
>>> over-complicated for a default exception handler. Instead I'd suggest we
>>> just provide two handlers (or three if people feel strong about the
>>> LogAndThresholdExceptionHandler), one for FailOnExceptionHandler and one
>>> for LogAndContinueOnExceptionHandler. And we can set
>>> LogAndContinueOnExceptionHandler
>>> by default.
>>>
>>
>> That's what I had originally. Jay mentioned he preferred one default class,
>> with config options.
>> So with that approach, you'd have 2 config options, one for failing, one for
>> continuing, and the one
>> exception handler would take those options during it's configure() call.
>>
>> After checking the other exception handlers in the code, I might revert back
>> to what I originally had (2 default handlers)
>> as Guozhang also re-suggests, but still have the interface extend
>> Configurable. Guozhang, you ok with that? In that case
>> there is no need for the response config option.
>>
>> Thanks
>> Eno
>>
>>
>>>
>>> Guozhang
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Jun 21, 2017 at 1:39 AM, Eno Thereska <eno.there...@gmail.com
>>> <mailto:eno.there...@gmail.com> <mailto:eno.there...@gmail.com
>>> <mailto:eno.there...@gmail.com>>>
>>> wrote:
>>>
>>>> Thanks Guozhang,
>>>>
>>>> I’ve updated the KIP and hopefully addressed all the comments so far. In
>>>> the process also changed the name of the KIP to reflect its scope better:
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+
>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+><https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+
>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+>>
>>>> deserialization+exception+handlers <https://cwiki.apache.org/
>>>> <https://cwiki.apache.org/> <https://cwiki.apache.org/
>>>> <https://cwiki.apache.org/>>
>>>> confluence/display/KAFKA/KIP-161:+streams+deserialization+
>>>> exception+handlers>
>>>>
>>>> Any other feedback appreciated, otherwise I’ll start the vote soon.
>>>>
>>>> Thanks
>>>> Eno
>>>>
>>>>> On Jun 12, 2017, at 6:28 AM, Guozhang Wang <wangg...@gmail.com
>>>>> <mailto:wangg...@gmail.com>> wrote:
>>>>>
>>>>> Eno, Thanks for bringing this proposal up and sorry for getting late on
>>>>> this. Here are my two cents:
>>>>>
>>>>> 1. First some meta comments regarding "fail fast" v.s. "making
>>>> progress". I
>>>>> agree that in general we should better "enforce user to do the right
>>>> thing"
>>>>> in system design, but we also need to keep in mind that Kafka is a
>>>>> multi-tenant system, i.e. from a Streams app's pov you probably would not
>>>>> control the whole streaming processing pipeline end-to-end. E.g. Your
>>>> input
>>>>> data may not be controlled by yourself; it could be written by another
>>>> app,
>>>>> or another team in your company, or even a different organization, and if
>>>>> an error happens maybe you cannot fix "to do the right thing" just by
>>>>> yourself in time. In such an environment I think it is important to leave
>>>>> the door open to let users be more resilient. So I find the current
>>>>> proposal which does leave the door open for either fail-fast or make
>>>>> progress quite reasonable.
>>>>>
>>>>> 2. On the other hand, if the question is whether we should provide a
>>>>> built-in "send to bad queue" handler from the library, I think that might
>>>>> be an overkill: with some tweaks (see my detailed comments below) on the
>>>>> API we can allow users to implement such handlers pretty easily. In
>>>> fact, I
>>>>> feel even "LogAndThresholdExceptionHandler" is not necessary as a
>>>> built-in
>>>>> handler, as it would then require users to specify the threshold via
>>>>> configs, etc. I think letting people provide such "eco-libraries" may be
>>>>> better.
>>>>>
>>>>> 3. Regarding the CRC error: today we validate CRC on both the broker end
>>>>> upon receiving produce requests and on consumer end upon receiving fetch
>>>>> responses; and if the CRC validation fails in the former case it would
>>>> not
>>>>> be appended to the broker logs. So if we do see a CRC failure on the
>>>>> consumer side it has to be that either we have a flipped bit on the
>>>> broker
>>>>> disks or over the wire. For the first case it is fatal while for the
>>>> second
>>>>> it is retriable. Unfortunately we cannot tell which case it is when
>>>> seeing
>>>>> CRC validation failures. But in either case, just skipping and making
>>>>> progress seems not a good choice here, and hence I would personally
>>>> exclude
>>>>> these errors from the general serde errors to NOT leave the door open of
>>>>> making progress.
>>>>>
>>>>> Currently such errors are thrown as KafkaException that wraps an
>>>>> InvalidRecordException, which may be too general and we could consider
>>>> just
>>>>> throwing the InvalidRecordException directly. But that could be an
>>>>> orthogonal discussion if we agrees that CRC failures should not be
>>>>> considered in this KIP.
>>>>>
>>>>> ----------------
>>>>>
>>>>> Now some detailed comments:
>>>>>
>>>>> 4. Could we consider adding the processor context in the handle()
>>>> function
>>>>> as well? This context will be wrapping as the source node that is about
>>>> to
>>>>> process the record. This could expose more info like which task / source
>>>>> node sees this error, which timestamp of the message, etc, and also can
>>>>> allow users to implement their handlers by exposing some metrics, by
>>>>> calling context.forward() to implement the "send to bad queue" behavior
>>>> etc.
>>>>>
>>>>> 5. Could you add the string name of
>>>>> StreamsConfig.DEFAULT_RECORD_EXCEPTION_HANDLER as well in the KIP?
>>>>> Personally I find "default" prefix a bit misleading since we do not allow
>>>>> users to override it per-node yet. But I'm okay either way as I can see
>>>> we
>>>>> may extend it in the future and probably would like to not rename the
>>>>> config again. Also from the experience of `default partitioner` and
>>>>> `default timestamp extractor` we may also make sure that the passed in
>>>>> object can be either a string "class name" or a class object?
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Wed, Jun 7, 2017 at 2:16 PM, Jan Filipiak <jan.filip...@trivago.com
>>>>> <mailto:jan.filip...@trivago.com>>
>>>>> wrote:
>>>>>
>>>>>> Hi Eno,
>>>>>>
>>>>>> On 07.06.2017 22:49, Eno Thereska wrote:
>>>>>>
>>>>>>> Comments inline:
>>>>>>>
>>>>>>> On 5 Jun 2017, at 18:19, Jan Filipiak <jan.filip...@trivago.com
>>>>>>> <mailto:jan.filip...@trivago.com>>
>>>> wrote:
>>>>>>>>
>>>>>>>> Hi
>>>>>>>>
>>>>>>>> just my few thoughts
>>>>>>>>
>>>>>>>> On 05.06.2017 11:44, Eno Thereska wrote:
>>>>>>>>
>>>>>>>>> Hi there,
>>>>>>>>>
>>>>>>>>> Sorry for the late reply, I was out this past week. Looks like good
>>>>>>>>> progress was made with the discussions either way. Let me recap a
>>>> couple of
>>>>>>>>> points I saw into one big reply:
>>>>>>>>>
>>>>>>>>> 1. Jan mentioned CRC errors. I think this is a good point. As these
>>>>>>>>> happen in Kafka, before Kafka Streams gets a chance to inspect
>>>> anything,
>>>>>>>>> I'd like to hear the opinion of more Kafka folks like Ismael or
>>>> Jason on
>>>>>>>>> this one. Currently the documentation is not great with what to do
>>>> once a
>>>>>>>>> CRC check has failed. From looking at the code, it looks like the
>>>> client
>>>>>>>>> gets a KafkaException (bubbled up from the fetcher) and currently we
>>>> in
>>>>>>>>> streams catch this as part of poll() and fail. It might be
>>>> advantageous to
>>>>>>>>> treat CRC handling in a similar way to serialisation handling (e.g.,
>>>> have
>>>>>>>>> the option to fail/skip). Let's see what the other folks say.
>>>> Worst-case we
>>>>>>>>> can do a separate KIP for that if it proved too hard to do in one go.
>>>>>>>>>
>>>>>>>> there is no reasonable way to "skip" a crc error. How can you know the
>>>>>>>> length you read was anything reasonable? you might be completely lost
>>>>>>>> inside your response.
>>>>>>>>
>>>>>>> On the client side, every record received is checked for validity. As
>>>> it
>>>>>>> happens, if the CRC check fails the exception is wrapped with a
>>>>>>> KafkaException that is thrown all the way to poll(). Assuming we change
>>>>>>> that and poll() throws a CRC exception, I was thinking we could treat
>>>> it
>>>>>>> similarly to a deserialize exception and pass it to the exception
>>>> handler
>>>>>>> to decide what to do. Default would be to fail. This might need a
>>>> Kafka KIP
>>>>>>> btw and can be done separately from this KIP, but Jan, would you find
>>>> this
>>>>>>> useful?
>>>>>>>
>>>>>> I don't think so. IMO you can not reasonably continue parsing when the
>>>>>> checksum of a message is not correct. If you are not sure you got the
>>>>>> correct length, how can you be sure to find the next record? I would
>>>> always
>>>>>> straight fail in all cases. Its to hard for me to understand why one
>>>> would
>>>>>> try to continue. I mentioned CRC's because thats the only bad pills I
>>>> ever
>>>>>> saw so far. But I am happy that it just stopped and I could check what
>>>> was
>>>>>> going on. This will also be invasive in the client code then.
>>>>>>
>>>>>> If you ask me, I am always going to vote for "grind to halt" let the
>>>>>> developers see what happened and let them fix it. It helps building good
>>>>>> kafka experiences and better software and architectures. For me this is:
>>>>>> "force the user todo the right thing". https://youtu.be/aAb7hSCtvGw?
>>>>>> <https://youtu.be/aAb7hSCtvGw?>
>>>> t=374
>>>>>> eg. not letting unexpected input slip by. Letting unexpected input
>>>> slip by
>>>>>> is what bought us 15+years of war of all sorts of ingestion attacks. I
>>>>>> don't even dare to estimate how many missingrecords-search-teams going
>>>> be
>>>>>> formed, maybe some hackerone for stream apps :D
>>>>>>
>>>>>> Best Jan
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>>> At a minimum, handling this type of exception will need to involve
>>>> the
>>>>>>>>> exactly-once (EoS) logic. We'd still allow the option of failing or
>>>>>>>>> skipping, but EoS would need to clean up by rolling back all the side
>>>>>>>>> effects from the processing so far. Matthias, how does this sound?
>>>>>>>>>
>>>>>>>> Eos will not help the record might be 5,6 repartitions down into the
>>>>>>>> topology. I haven't followed but I pray you made EoS optional! We
>>>> don't
>>>>>>>> need this and we don't want this and we will turn it off if it comes.
>>>> So I
>>>>>>>> wouldn't recommend relying on it. The option to turn it off is better
>>>> than
>>>>>>>> forcing it and still beeing unable to rollback badpills (as explained
>>>>>>>> before)
>>>>>>>>
>>>>>>> Yeah as Matthias mentioned EoS is optional.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Eno
>>>>>>>
>>>>>>>
>>>>>>> 6. Will add an end-to-end example as Michael suggested.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Eno
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 4 Jun 2017, at 02:35, Matthias J. Sax <matth...@confluent.io
>>>>>>>>> <mailto:matth...@confluent.io>>
>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> What I don't understand is this:
>>>>>>>>>>
>>>>>>>>>> From there on its the easiest way forward: fix, redeploy, start =>
>>>>>>>>>>> done
>>>>>>>>>>>
>>>>>>>>>> If you have many producers that work fine and a new "bad" producer
>>>>>>>>>> starts up and writes bad data into your input topic, your Streams
>>>> app
>>>>>>>>>> dies but all your producers, including the bad one, keep writing.
>>>>>>>>>>
>>>>>>>>>> Thus, how would you fix this, as you cannot "remove" the corrupted
>>>> date
>>>>>>>>>> from the topic? It might take some time to identify the root cause
>>>> and
>>>>>>>>>> stop the bad producer. Up to this point you get good and bad data
>>>> into
>>>>>>>>>> your Streams input topic. If Streams app in not able to skip over
>>>> those
>>>>>>>>>> bad records, how would you get all the good data from the topic? Not
>>>>>>>>>> saying it's not possible, but it's extra work copying the data with
>>>> a
>>>>>>>>>> new non-Streams consumer-producer-app into a new topic and than feed
>>>>>>>>>> your Streams app from this new topic -- you also need to update all
>>>>>>>>>> your
>>>>>>>>>> upstream producers to write to the new topic.
>>>>>>>>>>
>>>>>>>>>> Thus, if you want to fail fast, you can still do this. And after you
>>>>>>>>>> detected and fixed the bad producer you might just reconfigure your
>>>> app
>>>>>>>>>> to skip bad records until it reaches the good part of the data.
>>>>>>>>>> Afterwards, you could redeploy with fail-fast again.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thus, for this pattern, I actually don't see any reason why to stop
>>>> the
>>>>>>>>>> Streams app at all. If you have a callback, and use the callback to
>>>>>>>>>> raise an alert (and maybe get the bad data into a bad record
>>>> queue), it
>>>>>>>>>> will not take longer to identify and stop the "bad" producer. But
>>>> for
>>>>>>>>>> this case, you have zero downtime for your Streams app.
>>>>>>>>>>
>>>>>>>>>> This seems to be much simpler. Or do I miss anything?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Having said this, I agree that the "threshold based callback" might
>>>> be
>>>>>>>>>> questionable. But as you argue for strict "fail-fast", I want to
>>>> argue
>>>>>>>>>> that this must not always be the best pattern to apply and that the
>>>>>>>>>> overall KIP idea is super useful from my point of view.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 6/3/17 11:57 AM, Jan Filipiak wrote:
>>>>>>>>>>
>>>>>>>>>>> Could not agree more!
>>>>>>>>>>>
>>>>>>>>>>> But then I think the easiest is still: print exception and die.
>>>>>>>>>>> From there on its the easiest way forward: fix, redeploy, start =>
>>>>>>>>>>> done
>>>>>>>>>>>
>>>>>>>>>>> All the other ways to recover a pipeline that was processing
>>>> partially
>>>>>>>>>>> all the time
>>>>>>>>>>> and suddenly went over a "I cant take it anymore" threshold is not
>>>>>>>>>>> straight forward IMO.
>>>>>>>>>>>
>>>>>>>>>>> How to find the offset, when it became to bad when it is not the
>>>>>>>>>>> latest
>>>>>>>>>>> commited one?
>>>>>>>>>>> How to reset there? with some reasonable stuff in your rockses?
>>>>>>>>>>>
>>>>>>>>>>> If one would do the following. The continuing Handler would measure
>>>>>>>>>>> for
>>>>>>>>>>> a threshold and
>>>>>>>>>>> would terminate after a certain threshold has passed (per task).
>>>> Then
>>>>>>>>>>> one can use offset commit/ flush intervals
>>>>>>>>>>> to make reasonable assumption of how much is slipping by + you get
>>>> an
>>>>>>>>>>> easy recovery when it gets to bad
>>>>>>>>>>> + you could also account for "in processing" records.
>>>>>>>>>>>
>>>>>>>>>>> Setting this threshold to zero would cover all cases with 1
>>>>>>>>>>> implementation. It is still beneficial to have it pluggable
>>>>>>>>>>>
>>>>>>>>>>> Again CRC-Errors are the only bad pills we saw in production for
>>>> now.
>>>>>>>>>>>
>>>>>>>>>>> Best Jan
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 02.06.2017 17:37, Jay Kreps wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Jan, I agree with you philosophically. I think one practical
>>>>>>>>>>>> challenge
>>>>>>>>>>>> has
>>>>>>>>>>>> to do with data formats. Many people use untyped events, so there
>>>> is
>>>>>>>>>>>> simply
>>>>>>>>>>>> no guarantee on the form of the input. E.g. many companies use
>>>> JSON
>>>>>>>>>>>> without
>>>>>>>>>>>> any kind of schema so it becomes very hard to assert anything
>>>> about
>>>>>>>>>>>> the
>>>>>>>>>>>> input which makes these programs very fragile to the "one
>>>> accidental
>>>>>>>>>>>> message publication that creates an unsolvable problem.
>>>>>>>>>>>>
>>>>>>>>>>>> For that reason I do wonder if limiting to just serialization
>>>>>>>>>>>> actually
>>>>>>>>>>>> gets
>>>>>>>>>>>> you a useful solution. For JSON it will help with the problem of
>>>>>>>>>>>> non-parseable JSON, but sounds like it won't help in the case
>>>> where
>>>>>>>>>>>> the
>>>>>>>>>>>> JSON is well-formed but does not have any of the fields you expect
>>>>>>>>>>>> and
>>>>>>>>>>>> depend on for your processing. I expect the reason for limiting
>>>> the
>>>>>>>>>>>> scope
>>>>>>>>>>>> is it is pretty hard to reason about correctness for anything that
>>>>>>>>>>>> stops in
>>>>>>>>>>>> the middle of processing an operator DAG?
>>>>>>>>>>>>
>>>>>>>>>>>> -Jay
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak <
>>>>>>>>>>>> jan.filip...@trivago.com <mailto:jan.filip...@trivago.com>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> IMHO your doing it wrong then. + building to much support into the
>>>>>>>>>>>>> kafka
>>>>>>>>>>>>> eco system is very counterproductive in fostering a happy
>>>> userbase
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 02.06.2017 13:15, Damian Guy wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jan, you have a choice to Fail fast if you want. This is about
>>>>>>>>>>>>>> giving
>>>>>>>>>>>>>> people options and there are times when you don't want to fail
>>>>>>>>>>>>>> fast.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak <
>>>> jan.filip...@trivago.com <mailto:jan.filip...@trivago.com>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1.
>>>>>>>>>>>>>>> That greatly complicates monitoring. Fail Fast gives you that
>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>> monitor only the lag of all your apps
>>>>>>>>>>>>>>> you are completely covered. With that sort of new application
>>>>>>>>>>>>>>> Monitoring
>>>>>>>>>>>>>>> is very much more complicated as
>>>>>>>>>>>>>>> you know need to monitor fail % of some special apps aswell.
>>>> In my
>>>>>>>>>>>>>>> opinion that is a huge downside already.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2.
>>>>>>>>>>>>>>> using a schema regerstry like Avrostuff it might not even be
>>>> the
>>>>>>>>>>>>>>> record
>>>>>>>>>>>>>>> that is broken, it might be just your app
>>>>>>>>>>>>>>> unable to fetch a schema it needs now know. Maybe you got
>>>>>>>>>>>>>>> partitioned
>>>>>>>>>>>>>>> away from that registry.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 3. When you get alerted because of to high fail percentage.
>>>> what
>>>>>>>>>>>>>>> are the
>>>>>>>>>>>>>>> steps you gonna do?
>>>>>>>>>>>>>>> shut it down to buy time. fix the problem. spend way to much
>>>> time
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> find a good reprocess offset.
>>>>>>>>>>>>>>> Your timewindows are in bad shape anyways, and you pretty much
>>>>>>>>>>>>>>> lost.
>>>>>>>>>>>>>>> This routine is nonsense.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Dead letter queues would be the worst possible addition to the
>>>>>>>>>>>>>>> kafka
>>>>>>>>>>>>>>> toolkit that I can think of. It just doesn't fit the
>>>> architecture
>>>>>>>>>>>>>>> of having clients falling behind is a valid option.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Further. I mentioned already the only bad pill ive seen so far
>>>> is
>>>>>>>>>>>>>>> crc
>>>>>>>>>>>>>>> errors. any plans for those?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 02.06.2017 11:34, Damian Guy wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I agree with what Matthias has said w.r.t failing fast. There
>>>> are
>>>>>>>>>>>>>>>> plenty
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> times when you don't want to fail-fast and must attempt to
>>>> make
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> progress.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The dead-letter queue is exactly for these circumstances. Of
>>>>>>>>>>>>>>>> course if
>>>>>>>>>>>>>>>> every record is failing, then you probably do want to give up.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <
>>>>>>>>>>>>>>>> matth...@confluent.io <mailto:matth...@confluent.io>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> First a meta comment. KIP discussion should take place on the
>>>> dev
>>>>>>>>>>>>>>>> list
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -- if user list is cc'ed please make sure to reply to both
>>>>>>>>>>>>>>>>> lists.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>> Thanks for making the scope of the KIP clear. Makes a lot of
>>>>>>>>>>>>>>>> sense to
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> focus on deserialization exceptions for now.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> With regard to corrupted state stores, would it make sense to
>>>>>>>>>>>>>>>>> fail a
>>>>>>>>>>>>>>>>> task and wipe out the store to repair it via recreation from
>>>> the
>>>>>>>>>>>>>>>>> changelog? That's of course a quite advance pattern, but I
>>>> want
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> bring
>>>>>>>>>>>>>>>>> it up to design the first step in a way such that we can get
>>>>>>>>>>>>>>>>> there (if
>>>>>>>>>>>>>>>>> we think it's a reasonable idea).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I also want to comment about fail fast vs making progress. I
>>>>>>>>>>>>>>>>> think that
>>>>>>>>>>>>>>>>> fail-fast must not always be the best option. The scenario I
>>>>>>>>>>>>>>>>> have in
>>>>>>>>>>>>>>>>> mind is like this: you got a bunch of producers that feed the
>>>>>>>>>>>>>>>>> Streams
>>>>>>>>>>>>>>>>> input topic. Most producers work find, but maybe one producer
>>>>>>>>>>>>>>>>> miss
>>>>>>>>>>>>>>>>> behaves and the data it writes is corrupted. You might not
>>>> even
>>>>>>>>>>>>>>>>> be able
>>>>>>>>>>>>>>>>> to recover this lost data at any point -- thus, there is no
>>>>>>>>>>>>>>>>> reason to
>>>>>>>>>>>>>>>>> stop processing but you just skip over those records. Of
>>>>>>>>>>>>>>>>> course, you
>>>>>>>>>>>>>>>>> need to fix the root cause, and thus you need to alert
>>>> (either
>>>>>>>>>>>>>>>>> via logs
>>>>>>>>>>>>>>>>> of the exception handler directly) and you need to start to
>>>>>>>>>>>>>>>>> investigate
>>>>>>>>>>>>>>>>> to find the bad producer, shut it down and fix it.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Here the dead letter queue comes into place. From my
>>>>>>>>>>>>>>>>> understanding, the
>>>>>>>>>>>>>>>>> purpose of this feature is solely enable post debugging. I
>>>> don't
>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>> those record would be fed back at any point in time (so I
>>>> don't
>>>>>>>>>>>>>>>>> see any
>>>>>>>>>>>>>>>>> ordering issue -- a skipped record, with this regard, is just
>>>>>>>>>>>>>>>>> "fully
>>>>>>>>>>>>>>>>> processed"). Thus, the dead letter queue should actually
>>>> encode
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> original records metadata (topic, partition offset etc) to
>>>>>>>>>>>>>>>>> enable
>>>>>>>>>>>>>>>>> such
>>>>>>>>>>>>>>>>> debugging. I guess, this might also be possible if you just
>>>> log
>>>>>>>>>>>>>>>>> the bad
>>>>>>>>>>>>>>>>> records, but it would be harder to access (you first must
>>>> find
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> Streams instance that did write the log and extract the
>>>>>>>>>>>>>>>>> information
>>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>> there). Reading it from topic is much simpler.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I also want to mention the following. Assume you have such a
>>>>>>>>>>>>>>>>> topic with
>>>>>>>>>>>>>>>>> some bad records and some good records. If we always
>>>> fail-fast,
>>>>>>>>>>>>>>>>> it's
>>>>>>>>>>>>>>>>> going to be super hard to process the good data. You would
>>>> need
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> write
>>>>>>>>>>>>>>>>> an extra app that copied the data into a new topic filtering
>>>>>>>>>>>>>>>>> out the
>>>>>>>>>>>>>>>>> bad
>>>>>>>>>>>>>>>>> records (or apply the map() workaround withing stream). So I
>>>>>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>> that failing fast is most likely the best option in
>>>> production
>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>> necessarily, true.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Or do you think there are scenarios, for which you can
>>>> recover
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> corrupted records successfully? And even if this is
>>>> possible, it
>>>>>>>>>>>>>>>>> might
>>>>>>>>>>>>>>>>> be a case for reprocessing instead of failing the whole
>>>>>>>>>>>>>>>>> application?
>>>>>>>>>>>>>>>>> Also, if you think you can "repair" a corrupted record,
>>>> should
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> handler allow to return a "fixed" record? This would solve
>>>> the
>>>>>>>>>>>>>>>>> ordering
>>>>>>>>>>>>>>>>> problem.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 5/30/17 1:47 AM, Michael Noll wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for your work on this KIP, Eno -- much appreciated!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - I think it would help to improve the KIP by adding an
>>>>>>>>>>>>>>>>>> end-to-end
>>>>>>>>>>>>>>>>>> code
>>>>>>>>>>>>>>>>>> example that demonstrates, with the DSL and with the
>>>> Processor
>>>>>>>>>>>>>>>>>> API,
>>>>>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> user would write a simple application that would then be
>>>>>>>>>>>>>>>>>> augmented
>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> proposed KIP changes to handle exceptions. It should also
>>>>>>>>>>>>>>>>>> become much
>>>>>>>>>>>>>>>>>> clearer then that e.g. the KIP would lead to different code
>>>>>>>>>>>>>>>>>> paths for
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> happy case and any failure scenarios.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> - Do we have sufficient information available to make
>>>> informed
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> decisions
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> what to do next? For example, do we know in which part of
>>>> the
>>>>>>>>>>>>>>>>>> topology
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> record failed? `ConsumerRecord` gives us access to topic,
>>>>>>>>>>>>>>>>>> partition,
>>>>>>>>>>>>>>>>>> offset, timestamp, etc., but what about topology-related
>>>>>>>>>>>>>>>>>> information
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> (e.g.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> what is the associated state store, if any)?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - Only partly on-topic for the scope of this KIP, but this
>>>> is
>>>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> bigger picture: This KIP would give users the option to send
>>>>>>>>>>>>>>>>>> corrupted
>>>>>>>>>>>>>>>>>> records to dead letter queue (quarantine topic). But, what
>>>>>>>>>>>>>>>>>> pattern
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> we advocate to process such a dead letter queue then, e.g.
>>>> how to
>>>>>>>>>>>>>>>> allow
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> retries with backoff ("If the first record in the dead letter
>>>>>>>>>>>>>>>>>> queue
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> fails
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> again, then try the second record for the time being and go
>>>> back
>>>>>>>>>>>>>>>> to the
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> first record at a later time"). Jay and Jan already alluded
>>>> to
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> ordering
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> problems that will be caused by dead letter queues. As I said,
>>>>>>>>>>>>>>>> retries
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> might be out of scope but perhaps the implications should be
>>>>>>>>>>>>>>>>>> considered
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> possible?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Also, I wrote the text below before reaching the point in
>>>> the
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> conversation
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> that this KIP's scope will be limited to exceptions in the
>>>>>>>>>>>>>>>>>> category of
>>>>>>>>>>>>>>>>>> poison pills / deserialization errors. But since Jay
>>>> brought
>>>>>>>>>>>>>>>>>> up
>>>>>>>>>>>>>>>>>> user
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> code
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> errors again, I decided to include it again.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> ----------------------------snip--------------------------
>>>> --
>>>>>>>>>>>>>>>>>> A meta comment: I am not sure about this split between the
>>>>>>>>>>>>>>>>>> code for
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> happy path (e.g. map/filter/... in the DSL) from the failure
>>>>>>>>>>>>>>>>>> path
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> (using
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> exception handlers). In Scala, for example, we can do:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> scala> val computation = scala.util.Try(1 / 0)
>>>>>>>>>>>>>>>>>> computation: scala.util.Try[Int] =
>>>>>>>>>>>>>>>>>> Failure(java.lang.ArithmeticException: / by zero)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> scala> computation.getOrElse(42)
>>>>>>>>>>>>>>>>>> res2: Int = 42
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Another example with Scala's pattern matching, which is
>>>>>>>>>>>>>>>>>> similar to
>>>>>>>>>>>>>>>>>> `KStream#branch()`:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> computation match {
>>>>>>>>>>>>>>>>>> case scala.util.Success(x) => x * 5
>>>>>>>>>>>>>>>>>> case scala.util.Failure(_) => 42
>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> (The above isn't the most idiomatic way to handle this in
>>>>>>>>>>>>>>>>>> Scala,
>>>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> that's
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> not the point I'm trying to make here.)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hence the question I'm raising here is: Do we want to have
>>>> an
>>>>>>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>>>> where
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> code "the happy path", and then have a different code path
>>>> for
>>>>>>>>>>>>>>>>>> failures
>>>>>>>>>>>>>>>>>> (using exceptions and handlers); or should we treat both
>>>>>>>>>>>>>>>>>> Success and
>>>>>>>>>>>>>>>>>> Failure in the same way?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I think the failure/exception handling approach (as
>>>> proposed in
>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> KIP)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> is well-suited for errors in the category of deserialization
>>>>>>>>>>>>>>>> problems
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> aka
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> poison pills, partly because the (default) serdes are defined
>>>>>>>>>>>>>>>> through
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> configuration (explicit serdes however are defined through
>>>> API
>>>>>>>>>>>>>>>>>> calls).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> However, I'm not yet convinced that the failure/exception
>>>>>>>>>>>>>>>>>> handling
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> approach
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> is the best idea for user code exceptions, e.g. if you fail
>>>> to
>>>>>>>>>>>>>>>>>> guard
>>>>>>>>>>>>>>>>>> against NPE in your lambdas or divide a number by zero.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> scala> val stream = Seq(1, 2, 3, 4, 5)
>>>>>>>>>>>>>>>>>> stream: Seq[Int] = List(1, 2, 3, 4, 5)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> // Here: Fallback to a sane default when encountering
>>>>>>>>>>>>>>>>>> failed
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> records
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> scala> stream.map(x => Try(1/(3 - x))).flatMap(t =>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Seq(t.getOrElse(42)))
>>>>>>>>>>>>>>>>>> res19: Seq[Int] = List(0, 1, 42, -1, 0)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> // Here: Skip over failed records
>>>>>>>>>>>>>>>>>> scala> stream.map(x => Try(1/(3 - x))).collect{ case
>>>>>>>>>>>>>>>>>> Success(s)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> => s
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> res20: Seq[Int] = List(0, 1, -1, 0)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The above is more natural to me than using error handlers to
>>>>>>>>>>>>>>>>>> define
>>>>>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> deal with failed records (here, the value `3` causes an
>>>>>>>>>>>>>>>>>> arithmetic
>>>>>>>>>>>>>>>>>> exception). Again, it might help the KIP if we added an
>>>>>>>>>>>>>>>>>> end-to-end
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> example
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> for such user code errors.
>>>>>>>>>>>>>>>>>> ----------------------------snip--------------------------
>>>> --
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak <
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> jan.filip...@trivago.com>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Jay,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Eno mentioned that he will narrow down the scope to only
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> ConsumerRecord
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> deserialisation.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I am working with Database Changelogs only. I would really
>>>> not
>>>>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> see
>>>>>>>>>>>>>>>>>> a dead letter queue or something
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> similliar. how am I expected to get these back in order.
>>>> Just
>>>>>>>>>>>>>>>>>>> grind
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> hold an call me on the weekend. I'll fix it
>>>>>>>>>>>>>>>>>>> then in a few minutes rather spend 2 weeks ordering dead
>>>>>>>>>>>>>>>>>>> letters.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> (where
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> reprocessing might be even the faster fix)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 29.05.2017 20:23, Jay Kreps wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - I think we should hold off on retries unless we
>>>> have
>>>>>>>>>>>>>>>>>>> worked
>>>>>>>>>>>>>>>>>>> out
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> full usage pattern, people can always implement their
>>>>>>>>>>>>>>>>>> own. I
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> the idea
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> is that you send the message to some kind of dead
>>>>>>>>>>>>>>>>>>>> letter queue
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> replay these later. This obviously destroys all
>>>> semantic
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> guarantees
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> we are
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> working hard to provide right now, which may be okay.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>
>>>>
>>>
>>>
>>> --
>>> -- Guozhang