Hi Chris and others,

Yes, you are correct; I looked through KIP-298 to understand it better. I
agree with your idea to handle "errors.tolerance=none."

I see; you are in favor of standardizing which reporter is returned when
none is configured. I am on board with this proposal, especially if it is
in line with previous behavior, as you mentioned.

I will add both of these suggestions to the KIP.
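
To make the two suggestions concrete, here is a minimal sketch of how I read
them. It is not the KIP's final API: the `ErrantRecordReporter` interface
below is a simplified stand-in (records are plain Strings so the example has
no Kafka dependency), and `ReporterFactory` and the exception type are
hypothetical names of mine.

```java
import java.util.concurrent.Future;

// Simplified stand-in for the reporter interface discussed in this thread.
interface ErrantRecordReporter {
    Future<Void> report(String failedRecord, Throwable error);
}

// Hypothetical helper that picks the reporter handed to the sink task.
final class ReporterFactory {
    static ErrantRecordReporter create(String tolerance, ErrantRecordReporter configured) {
        if (configured == null) {
            // Suggestion 2: never hand out null; return a reporter that always fails.
            return (record, error) -> {
                throw new IllegalStateException(
                        "No error reporter configured; failing task on: " + record, error);
            };
        }
        if ("none".equals(tolerance)) {
            // Suggestion 1: still report the record, then throw so the task
            // stops writing to the sink and fails.
            return (record, error) -> {
                configured.report(record, error);
                throw new IllegalStateException(
                        "errors.tolerance=none; failing task on: " + record, error);
            };
        }
        // errors.tolerance=all: report and let the task keep running.
        return configured;
    }
}
```

The point of the wrapper is that a connector sees the same behavior whether
or not a reporter was configured, instead of having to null-check.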

Lastly, unless anyone has any issues with Chris's suggestions, I believe
the last point on which we need to reach consensus is whether to use a
Future as the return type. I am for giving extra guarantees to the user if
they wish; however, I am not very familiar with the potential issues with
the consumer heartbeat that Arjun pointed out. Does anyone have any thoughts
on this?
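
For discussion, here is a rough sketch of what waiting on the Future could
look like if a task bounds the wait rather than blocking indefinitely. The
class name, method name, and timeout are hypothetical; only
`Future.get(long, TimeUnit)` is standard Java. A bounded wait inside put()
would still delay the task's consumer, so this only limits the heartbeat
concern rather than removing it.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureUsageSketch {
    // Waits at most timeoutMs for a report to finish.
    // Returns true if it completed, false if the wait timed out or was interrupted.
    static boolean awaitReport(Future<Void> reportResult, long timeoutMs) {
        try {
            reportResult.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Not done yet; the caller can carry on and rely on the
            // guarantee that reports complete before preCommit().
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        } catch (ExecutionException e) {
            // The report itself failed; surface it so the task can fail.
            throw new IllegalStateException("Failed to report errant record", e.getCause());
        }
    }
}
```

A task that does not need the stronger guarantee would simply ignore the
returned Future and never call this.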

Thanks,
Aakash

On Tue, May 19, 2020 at 2:10 PM Chris Egerton <chr...@confluent.io> wrote:

> Hi Aakash,
>
> > If "errors.tolerance=none", should it not be the case that the error
> reporter does not even report any error; rather, the task just fails after
> throwing the error? I do understand the point you are saying about
> duplicates, though.
>
> I believe the "errors.tolerance" property dictates whether a task should
> fail after a record that causes problems during conversion or
> transformation is encountered and reported (for example, by writing to a
> DLQ). If it is set to "none", then the task will fail immediately; if it is
> set to "all", then the task will continue running normally. So if we want
> to preserve that behavior, we might want to immediately throw an exception
> when an errant record is reported by a "SinkTask" instance and the user has
> configured "errors.tolerance = none", which unless caught will cause the
> task to cease writing records to the sink. In addition to throwing that
> exception, we should also still fail the task; the exception is just a way
> to (hopefully) interrupt the task's processing of records in order to
> prevent duplicates if/when the task is restarted later on.
>
> > Lastly, why do you say we should always provide an errant record
> reporter?
> Doesn't that change the contract of what functionality it is providing?
>
> I'm just thinking that instead of returning "null" when no errant record
> reporter is configured, we could return one that always fails the task and
> throws an exception. This seems in line with the default behavior of the
> framework when no error handling configuration properties are specified and
> a record causes problems during conversion or transformation. We could
> leave the choice in the hands of developers but this might make things
> confusing for users who get different behavior from different connectors
> under the same circumstances.
>
> Hope this helps!
>
> Cheers,
>
> Chris
>
> On Tue, May 19, 2020 at 1:50 PM Aakash Shah <as...@confluent.io> wrote:
>
> > Hi Arjun,
> >
> > I am not very familiar with how the potential heartbeat failure would
> cause
> > more failures when consuming subsequent records. Can you elaborate on
> this?
> >
> > Thanks,
> > Aakash
> >
> > On Tue, May 19, 2020 at 10:03 AM Arjun Satish <arjun.sat...@gmail.com>
> > wrote:
> >
> > > One more concern with the connector blocking on the Future's get() is
> > that
> > > it may cause the task's consumer to fail to heartbeat (since there is
> no
> > > independent thread to do this). That would then cause failures when we
> > > eventually try to consume more records after returning from put(). The
> > > developer would need to be cognizant of these bits before waiting on
> the
> > > future, which adds a reasonable amount of complexity.
> > >
> > > Even with preCommit() returning incomplete offsets, I suppose the
> concern
> > > would be that the put() method keeps giving the task more records, and
> to
> > > truly pause the "firehose", the task needs to pause all partitions?
> > >
> > >
> > > On Tue, May 19, 2020 at 9:26 AM Arjun Satish <arjun.sat...@gmail.com>
> > > wrote:
> > >
> > > > Can we get a couple of examples that show the utility of waiting on
> > > > the Future<>? Also, in preCommit() we would report back on the
> > > > incomplete offsets, so that feedback mechanism will already exist for
> > > > developers who want to manually manage this.
> > > >
> > > > On Tue, May 19, 2020 at 8:03 AM Randall Hauch <rha...@gmail.com>
> > wrote:
> > > >
> > > >> Thanks, Aakash, for updating the KIP.
> > > >>
> > > >> On Tue, May 19, 2020 at 2:18 AM Arjun Satish <
> arjun.sat...@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Hi Randall,
> > > >> >
> > > >> > Thanks for the explanation! Excellent point about guaranteeing
> > offsets
> > > >> in
> > > >> > the async case.
> > > >> >
> > > > >> > If we can guarantee that the offsets will be advanced only after
> > > > >> > the bad records are reported, then is there any value in the
> > > > >> > Future<> return type?
> > > >> > I feel we can declare the function with a void return type:
> > > >> >
> > > >> > void report(SinkRecord failedRecord, Throwable error)
> > > >> >
> > > >> > that works asynchronously, and advances offsets only after the DLQ
> > > >> producer
> > > >> > (and other reporters) complete successfully (as you explained).
> > > >> >
> > > >> > This actually alleviates my concern of what this Future<> actually
> > > >> means.
> > > >> > Since a failure to report should kill the tasks, there is no
> reason
> > > for
> > > >> the
> > > >> > connector to ever wait on the get().
> > > >>
> > > >>
> > > > >> We should not say "there is no reason", because we don't know all of
> > > > >> the requirements that might exist for sending records to external
> > > > >> systems. The additional guarantee regarding error records being fully
> > > > >> recorded before `preCommit(...)` is called is a minimal guarantee that
> > > > >> Connect provides the sink task, and returning a Future allows a sink
> > > > >> task to have *stronger* guarantees than what Connect provides by
> > > > >> default.
> > > >>
> > > >> Once again:
> > > >> 1. we need an async API to allow the sink task to report problem
> > records
> > > >> and then immediately continue doing more work.
> > > >> 2. Connect should guarantee to the sink task that all reported
> records
> > > >> will
> > > >> actually be recorded before `preCommit(...)` is called
> > > >> 3. a sink task *might* need stronger guarantees, and may need to
> block
> > > on
> > > >> the reported records some time before `preCommit(...)`, and we
> should
> > > >> allow
> > > >> them to do this.
> > > >> 4. Future and callbacks are common techniques, but there are
> > significant
> > > >> runtime risks of using callbacks, whereas Future is a
> common/standard
> > > >> pattern that is straightforward to use.
> > > >>
> > > > >> This *exactly* matches the current KIP, which is why I plan to vote
> > > > >> for this valuable and well-thought-out KIP.
> > > >>
> > > >>
> > > >> > And if we are guaranteeing that the
> > > >> > offsets are only advanced when the errors are reported, then this
> > > >> becomes a
> > > >> > double win:
> > > >> >
> > > >> > 1. connector developers can literally fire and forget failed
> > records.
> > > >> > 2. offsets are correctly advanced on errors being reported.
> Failure
> > to
> > > >> > report error will kill the task, and the last committed offset
> will
> > be
> > > >> the
> > > >> > correct one.
> > > >>
> > > >>
> > > > >> > The main contract would simply be to call report() before
> > > > >> > preCommit() or before put() returns in the task, so the framework
> > > > >> > knows that there are error records reported, and those need to
> > > > >> > finish before the offsets can be advanced.
> > > >> >
> > > > >> > I think I'd be pretty excited about this API, and if we all agree,
> > > > >> > then let's go ahead with this?
> > > >>
> > > >>
> > > >> > Best,
> > > >> >
> > > >> >
> > > >> >
> > > >>
> > > >
> > >
> >
>
