Hi Arjun,

I am not very familiar with how the potential heartbeat failure would cause
more failures when consuming subsequent records. Can you elaborate on this?

Thanks,
Aakash

On Tue, May 19, 2020 at 10:03 AM Arjun Satish <arjun.sat...@gmail.com>
wrote:

> One more concern with the connector blocking on the Future's get() is that
> it may cause the task's consumer to fail to heartbeat (since there is no
> independent thread to do this). That would then cause failures when we
> eventually try to consume more records after returning from put(). The
> developer would need to be cognizant of these bits before waiting on the
> future, which adds a reasonable amount of complexity.
>
> Even with preCommit() returning incomplete offsets, I suppose the concern
> would be that the put() method keeps giving the task more records, and to
> truly pause the "firehose", the task needs to pause all partitions?
>
>
> On Tue, May 19, 2020 at 9:26 AM Arjun Satish <arjun.sat...@gmail.com>
> wrote:
>
> > Can we get a couple of examples that shows utility of waiting on the
> > Future<>? Also, in preCommit() we would report back on the incomplete
> > offsets. So that feedback mechanism will already exists for developers
> who
> > want to manually manage this.
> >
> > On Tue, May 19, 2020 at 8:03 AM Randall Hauch <rha...@gmail.com> wrote:
> >
> >> Thanks, Aakash, for updating the KIP.
> >>
> >> On Tue, May 19, 2020 at 2:18 AM Arjun Satish <arjun.sat...@gmail.com>
> >> wrote:
> >>
> >> > Hi Randall,
> >> >
> >> > Thanks for the explanation! Excellent point about guaranteeing offsets
> >> in
> >> > the async case.
> >> >
> >> > If we can guarantee that the offsets will be advanced only after the
> bad
> >> > records are reported, then is there any value is the Future<> return
> >> type?
> >> > I feel we can declare the function with a void return type:
> >> >
> >> > void report(SinkRecord failedRecord, Throwable error)
> >> >
> >> > that works asynchronously, and advances offsets only after the DLQ
> >> producer
> >> > (and other reporters) complete successfully (as you explained).
> >> >
> >> > This actually alleviates my concern of what this Future<> actually
> >> means.
> >> > Since a failure to report should kill the tasks, there is no reason
> for
> >> the
> >> > connector to ever wait on the get().
> >>
> >>
> >> We should not say "there is no reason", because we don't know all of the
> >> requirements that might exist for sending records to external systems.
> The
> >> additional guarantee regarding error records being fully recorded before
> >> `preCommit(...)` is called is a minimal guarantee that Connect provides
> >> the
> >> sink task, and returning a Future allow a sink task to have *stronger*
> >> guarantees than what Connect provides by default.
> >>
> >> Once again:
> >> 1. we need an async API to allow the sink task to report problem records
> >> and then immediately continue doing more work.
> >> 2. Connect should guarantee to the sink task that all reported records
> >> will
> >> actually be recorded before `preCommit(...)` is called
> >> 3. a sink task *might* need stronger guarantees, and may need to block
> on
> >> the reported records some time before `preCommit(...)`, and we should
> >> allow
> >> them to do this.
> >> 4. Future and callbacks are common techniques, but there are significant
> >> runtime risks of using callbacks, whereas Future is a common/standard
> >> pattern that is straightforward to use.
> >>
> >> This *exactly* matches the current KIP, which is why I plan to vote for
> >> this valuable and well-thought out KIP.
> >>
> >>
> >> > And if we are guaranteeing that the
> >> > offsets are only advanced when the errors are reported, then this
> >> becomes a
> >> > double win:
> >> >
> >> > 1. connector developers can literally fire and forget failed records.
> >> > 2. offsets are correctly advanced on errors being reported. Failure to
> >> > report error will kill the task, and the last committed offset will be
> >> the
> >> > correct one.
> >>
> >>
> >> > The main contract would simply be to call report() before preCommit()
> or
> >> > before put() returns in the task, so the framework knows that that
> there
> >> > are error records reported, and those need to finish before the
> offsets
> >> can
> >> > be advanced.
> >> >
> >> > I think I'd be pretty excited about this API. and if we all agree,
> then
> >> > let's go ahead with this?
> >>
> >>
> >> > Best,
> >> >
> >> >
> >> >
> >>
> >
>

Reply via email to