Hi Arjun,

Thanks for the KIP. Just a few comments/questions:

1. The proposal allows users to configure the number of retries. I usually find it easier as a user to work with timeouts, since it's difficult to know how long a retry might take. Have you considered adding a timeout option which would retry until the timeout expires?

2. The configs are named very generically (e.g. errors.retries.limit). Do you think it will be clear to users which operations these configs apply to?

3. I wasn't entirely clear on what messages are stored in the dead letter queue. It sounds like it includes both configs and messages, since we have errors.dlq.include.configs? Is there a specific schema you have in mind?

4. I didn't see it mentioned explicitly in the KIP, but I assume the tolerance metrics are reset after every task rebalance?

5. I wonder if we can do without errors.tolerance.limit. You can get a similar effect using errors.tolerance.rate.limit if you allow longer durations. I'm not sure how useful an absolute counter is in practice.

Thanks,
Jason

On Fri, May 18, 2018 at 2:55 PM, Arjun Satish <arjun.sat...@gmail.com> wrote:

> Super! Thanks, Magesh!
>
> On Fri, May 18, 2018 at 2:53 PM, Magesh Nandakumar <mage...@confluent.io> wrote:
>
> > Arjun,
> >
> > Thanks for all the updates. I think it looks great and I don't have any other concerns.
> >
> > Thanks
> > Magesh
> >
> > On Fri, May 18, 2018 at 2:36 PM, Konstantine Karantasis <konstant...@confluent.io> wrote:
> >
> > > The updated version of the KIP, which uses the dead-letter-queue only for sink records and only to store the raw record data, looks better and easier to understand. I think it's moving in the right direction.
> > >
> > > No further comments from my side.
> > >
> > > Thanks Arjun!
> > >
> > > - Konstantine
> > >
> > > On Fri, May 18, 2018 at 1:07 AM, Arjun Satish <arjun.sat...@gmail.com> wrote:
> > >
> > > > Ewen,
> > > >
> > > > Thanks a lot for your comments!
> > > >
> > > > 1. For errors.retry.delay.max.ms, yes, we plan to use exponential backoffs with a fixed initial value. Updated the KIP to say this.
> > > >
> > > > 2. A failed operation will be retried (potentially multiple times). If all the retries fail, we declare this to be an error. This is where tolerance kicks in. Hence, you can have 0 retries but infinite tolerance (errors.tolerance.limit = -1), where we will never retry any failure, but all bad records will be skipped. Updated the KIP. Hopefully this is clear now.
> > > >
> > > > 3. Yes, for error messages we have some base information (what operation failed, and with what exception and stacktrace, for example). Hence the three configs. The main reason for having properties for disabling messages and configs is to avoid logging sensitive information to unsecured locations (for example, the file logs). Updated the KIP to describe this.
> > > >
> > > > I think the topic name should be mandatory: if we have a default topic, then all the connectors in a cluster will produce messages into it, making it confusing to read from. We could have a default pattern for constructing topic names, for example a format like ${connector-name}-errors.
> > > >
> > > > 4. The reason for multiple clusters is to allow users with sensitive data to log errors into secure clusters. There are defaults for these properties, but if you think this is making the config too complex, we can drop the errors.deadletterqueue.producer.* properties from this implementation.
> > > >
> > > > 5. I had mentioned that the format is JSON in the proposed changes section. Updated the public interface section to say this again. We could provide overrides for the Converter used here, and use an AvroConverter instead, which should preserve the structure and schema of the data. The Avro binary would be base64-encoded in the logged records. But yes, this brings in configurable converters and their configurations, which introduces a new level of complexity (AvroConverters and their dependency on Schema Registry, for instance). Hence, they were not included in this proposal.
> > > >
> > > > Another option is to add a StructSerializer and StructDeserializer, which can retain the schema and structure of the Structs in the schema. If we do this, non-Java clients which need to read these error records would need to port the deserialization logic. Ultimately, we need to indicate what the record looks like, and
> > > >
> > > > Could you point out what is unclear w.r.t. reprocessing?
> > > >
> > > > Let me know what you think.
> > > >
> > > > On Thu, May 17, 2018 at 11:02 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
> > > >
> > > > > A few more thoughts -- might not change things enough to affect a vote, but still some things to consider:
> > > > >
> > > > > * errors.retry.delay.max.ms -- this defines the max, but I'm not seeing where we define the actual behavior. Is this intentional, or should we just say that it is something like exponential, based on a starting delay value?
> > > > > * I'm not sure I understand tolerance vs retries? They sound generally the same -- tolerance sounds like # of retries since it is defined in terms of failures.
> > > > > * errors.log.enable -- it's unclear why this shouldn't just be errors.log.include.configs || errors.log.include.messages (and include clauses for any other flags). If there's just some base info, that's fine, but the explanation of the config should make that clear.
> > > > > * errors.deadletterqueue.enable -- similar question here about just enabling based on other relevant configs. seems like additional config complexity for users when the topic name is absolutely going to be a basic requirement anyway.
> > > > > * more generally related to dlq, it seems we're trying to support multiple clusters here -- is there a reason for this? it's not that costly, but one thing supporting this requires is an entirely separate set of configs, ACLs, etc. in contrast, assuming an additional topic on the same cluster we're already working with keeps things quite simple. do we think this complexity is worth it? elsewhere, we've seen the complexity of multiple clusters result in a lot of config confusion.
> > > > > * It's not obvious throughout that the format is JSON, and I assume in many cases it uses JsonConverter. This should be clear at the highest level, not just in the case of things like SchemaAndValue fields. This also seems to introduce possible complications for DLQs -- instead of delivering the raw data, we potentially lose raw data & schema info because we're rendering it as JSON. Not sure that's a good idea...
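Arjun's answer above (exponential backoff from a fixed initial value, capped by errors.retry.delay.max.ms) could look roughly like the following sketch. The 300 ms starting delay is an assumed value for illustration, not something specified in the KIP:

```python
def retry_delays(initial_delay_ms, max_delay_ms, retries):
    """Yield the delay before each retry: exponential backoff that doubles
    from a fixed initial value, capped at the configured maximum
    (errors.retry.delay.max.ms in the proposal)."""
    delay = initial_delay_ms
    for _ in range(retries):
        yield min(delay, max_delay_ms)
        delay *= 2

# With an assumed 300 ms initial delay and a 5 second cap:
print(list(retry_delays(300, 5000, 6)))  # [300, 600, 1200, 2400, 4800, 5000]
```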
> > > > >
> > > > > I think that last item might be the biggest concern to me -- DLQ formats and control over content & reprocessing seem a bit unclear to me here, so I'd assume users could also end up confused.
> > > > >
> > > > > -Ewen
> > > > >
> > > > > On Thu, May 17, 2018 at 8:53 PM Arjun Satish <arjun.sat...@gmail.com> wrote:
> > > > >
> > > > > > Konstantine,
> > > > > >
> > > > > > Thanks for pointing out the typos. Fixed them.
> > > > > >
> > > > > > I had added the JSON schema, which should now include key and header configs in there too. This should have been in the public interfaces section.
> > > > > >
> > > > > > Thanks very much,
> > > > > >
> > > > > > On Thu, May 17, 2018 at 9:13 AM, Konstantine Karantasis <konstant...@confluent.io> wrote:
> > > > > >
> > > > > > > Thanks Arjun for your quick response.
> > > > > > >
> > > > > > > Adding an example for the failure log improves things, but I think it'd be better to also add the schema definition of these JSON entries. And I'll agree with Magesh that this format should be public API.
> > > > > > >
> > > > > > > Also, does the current example have a copy/paste typo? Seems that the TRANSFORMATION stage in the end has the config of a converter. Similar to the above, are fields for 'key' and 'headers' (and their conversion stages) skipped when they are not defined? Or should they be present and empty? A schema definition would help to know what a consumer of such logs should expect.
> > > > > > >
> > > > > > > Also, thanks for adding some info for errors on the source side. However, I feel the current description might be a little bit ambiguous. I read: "For errors in a source connector, the process is similar, but care needs to be taken while writing back to the source." It sounds like it's suggesting that Connect will write records back to the source, which can't be correct.
> > > > > > >
> > > > > > > Finally, a nit: "adds store the row information"... typo?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > - Konstantine
> > > > > > >
> > > > > > > On Thu, May 17, 2018 at 12:48 AM, Arjun Satish <arjun.sat...@gmail.com> wrote:
> > > > > > >
> > > > > > > > On Wed, May 16, 2018 at 7:13 PM, Matt Farmer <m...@frmr.me> wrote:
> > > > > > > >
> > > > > > > > > Hey Arjun,
> > > > > > > > >
> > > > > > > > > I like deadletterqueue all lower case, so I'm +1 on that.
> > > > > > > >
> > > > > > > > Super! Updated the KIP.
> > > > > > > >
> > > > > > > > > Yes, in the case we were seeing there were external system failures. We had issues connecting to S3. While the connector does include some retry functionality, setting these values sufficiently high seemed to cause us to hit timeouts and cause the entire task to fail anyway. (I think I was using something like 100 retries during the brief test of this behavior?)
> > > > > > > >
> > > > > > > > I am guessing these issues come up when trying to write to S3. Do you think the S3 connector can detect the safe situations where it can throw RetriableExceptions instead of ConnectExceptions (when the connector thinks it is safe to do so)?
> > > > > > > >
> > > > > > > > > Yeah, totally understand that there could be unintended consequences from this. I guess the use case I'm trying to optimize for is to give folks some bubblegum to keep a high-volume system limping along until the software engineers get time to address it. So I'm imagining the situation that I'm paged on a Saturday night because of an intermittent network issue. With a config flag like this I could push a config change to cause Connect to treat that as retriable and allow me to wait until the following Monday to make changes to the code. That may not be a sensible concern for Kafka writ large, but Connect is a bit weird when compared with Streams or the Clients. It's almost more of a piece of infrastructure than a library, and I generally like infrastructure to have escape hatches like that. Just my 0.02 though. :)
> > > > > > > >
> > > > > > > > haha yes, it would be good to avoid those Saturday night pagers. Again, I am hesitant to imply retries on ConnectExceptions. We could definitely define new Exceptions in the Connector, which can be thrown to retry if the connector thinks it is safe to do so. We need to know that a retry can be super dangerous in a Task.put(List<SinkRecord>). Duplicate records can easily creep in, and can be notoriously hard to detect and clean up.
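A toy illustration of the duplicate-record hazard described above: if put() writes part of a batch before failing, blindly retrying the whole batch re-writes records that already landed. This is a hypothetical sketch, not connector code; a real sink would need idempotent writes or offset tracking to avoid this:

```python
sink = []  # stands in for the external system

def put(records, fail_after):
    """Write records one by one, failing partway through the batch
    when fail_after is reached (simulating a transient error)."""
    for i, record in enumerate(records):
        if i == fail_after:
            raise RuntimeError("transient failure mid-batch")
        sink.append(record)

batch = ["a", "b", "c"]
try:
    put(batch, fail_after=2)   # writes "a" and "b", then fails
except RuntimeError:
    put(batch, fail_after=-1)  # blind retry rewrites the whole batch

print(sink)  # ['a', 'b', 'a', 'b', 'c'] -- "a" and "b" are duplicated
```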
> > > > > > > > > Thanks,
> > > > > > > > > Matt
> > > > > > > > >
> > > > > > > > > On Tue, May 15, 2018 at 8:46 PM, Arjun Satish <arjun.sat...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Matt,
> > > > > > > > > >
> > > > > > > > > > Thanks so much for your comments. Really appreciate it!
> > > > > > > > > >
> > > > > > > > > > 1. Good point about the acronym. I can use deadletterqueue instead of dlq (using all lowercase to be consistent with the other configs in Kafka). What do you think?
> > > > > > > > > >
> > > > > > > > > > 2. Could you please tell us what errors caused these tasks to fail? Were they because of external system failures? And if so, could retries be implemented in the Connector itself? Or using retries with backoffs?
> > > > > > > > > >
> > > > > > > > > > 3. I like this idea. But I did not include it here, since it might be a stretch. One thing to note is that ConnectExceptions can be thrown from a variety of places in a connector. I think it should be OK for the Connector to throw RetriableException, or something that extends it, for the operation to be retried. By changing this behavior, a lot of existing connectors would have to be updated so that they don't rewrite messages into this sink. For example, a sink connector might write some data into the external system partially, and then fail with a ConnectException. Since the framework has no way of knowing what was written and what was not, a retry here might cause the same data to be written again into the sink.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > >
> > > > > > > > > > On Mon, May 14, 2018 at 12:46 PM, Matt Farmer <m...@frmr.me> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Arjun,
> > > > > > > > > > >
> > > > > > > > > > > I'm following this very closely, as better error handling in Connect is a high priority for MailChimp's Data Systems team.
> > > > > > > > > > >
> > > > > > > > > > > A few thoughts (in no particular order):
> > > > > > > > > > >
> > > > > > > > > > > For the dead letter queue configuration, could we use deadLetterQueue instead of dlq? Acronyms are notoriously hard to keep straight in everyone's head, and unless there's a compelling reason it would be nice to use the characters and be explicit.
> > > > > > > > > > >
> > > > > > > > > > > Have you considered any behavior that would periodically attempt to restart failed tasks after a certain amount of time? To get around our issues internally, we've deployed a tool that monitors for failed tasks and restarts the task by hitting the REST API after the failure. Such a config would allow us to get rid of this tool.
> > > > > > > > > > >
> > > > > > > > > > > Have you considered a config setting to allow-list additional classes as retryable? In the situation we ran into, we were getting ConnectExceptions that were intermittent due to an unrelated service. With such a setting we could have deployed a config that temporarily whitelisted that Exception as retry-worthy and continued attempting to make progress while the other team worked on mitigating the problem.
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > >
> > > > > > > > > > > On Wed, May 9, 2018 at 2:59 AM, Arjun Satish <arjun.sat...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > All,
> > > > > > > > > > > >
> > > > > > > > > > > > I'd like to start a discussion on adding ways to handle and report record processing errors in Connect. Please find a KIP here:
> > > > > > > > > > > >
> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
> > > > > > > > > > > >
> > > > > > > > > > > > Any feedback will be highly appreciated.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks very much,
> > > > > > > > > > > > Arjun