Hi Randall,

First off, thank you for the incredibly detailed example. I don't mind
walls of text, and I found it very helpful. I especially liked the idea
of modifying how the framework invokes "SinkTask::preCommit" to take most
of the work out of developers' hands in the common case of a
"fire-and-forget" connector, while still providing the flexibility to
accommodate connectors with, for example, exactly-once delivery guarantees
that involve committing offsets to the sink atomically with the actual
records that they've received from Kafka.

I have one point I'd like to raise about the stated advantage of an
asynchronous API: that tasks can continue processing records and sending
them to the sink destination without having to block on the completion of
the error report.

Wouldn't this actually be a disadvantage in the case that the user has
configured the connector with "errors.tolerance = none"? In that case, the
expectation is that the task should fail as soon as it hits a bad record;
allowing it to possibly continue writing records to the sink (which would
likely end up as duplicates in the sink if/when the task is restarted)
doesn't seem optimal.

I don't think that this makes an asynchronous API completely unusable; I
just think that we'd want to synchronously throw some kind of exception
when the error reporter is invoked and the connector is configured with
"errors.tolerance = none", instead of causing one to be thrown wrapped in
an ExecutionException if/when "Future::get" is called on the returned
future.
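
To make the distinction concrete, here is a rough sketch of the behavior
I have in mind. Everything in it is hypothetical: "SketchReporter" and
"Tolerance" are illustrative names rather than anything from the KIP or
from Connect, IllegalStateException stands in for whatever exception type
we'd actually choose, and the asynchronous path is an empty placeholder
for the real reporting work.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the errant record reporter discussed in this
// thread; it is not the KIP-610 interface.
final class SketchReporter {
    enum Tolerance { NONE, ALL }

    private final Tolerance tolerance;

    SketchReporter(Tolerance tolerance) {
        this.tolerance = tolerance;
    }

    // With Tolerance.NONE the failure surfaces on the caller's thread,
    // before any future is returned, so the task cannot keep writing
    // records while the failure sits unobserved inside a future.
    CompletableFuture<Void> report(Object record, Throwable error) {
        if (tolerance == Tolerance.NONE) {
            throw new IllegalStateException(
                "Error tolerance exceeded for record " + record, error);
        }
        // Placeholder for the asynchronous reporting work (for example, a
        // write to a dead letter queue); here it does nothing.
        return CompletableFuture.runAsync(() -> { });
    }
}
```

With "Tolerance.NONE" the caller never receives a future at all, so
there is nothing to forget to call "get" on; with "Tolerance.ALL" the
caller gets a future it may wait on or ignore.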

I'd also like to suggest a slight change to the logic for invoking
"SinkTask::preCommit". The interval at which offsets are committed for sink
tasks is configurable via the worker-level "offset.flush.interval.ms"
property; I think it'd be nice to respect that property if we could. What
would you think about calling "SinkTask::preCommit" at the normally
scheduled times, but altering the offsets that are passed in to that call
to not go beyond any offsets for errant records that have been reported but
not fully processed yet?

For example, imagine a task has been given records with offsets 0-10 on a
single topic partition and reports records with offsets 2 and 7 to the
framework. Then, the framework is able to process the record with offset 2
but not the record with offset 7. When it comes time for an offset commit,
the framework will call "SinkTask::preCommit" with an offset of 6 for that
topic partition, since the record for offset 7 has not been completely
taken care of yet.
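
The calculation in that example could be sketched roughly as follows.
The class and method names are made up for illustration, and the sketch
uses the same numbering as the example (the offset of the last record
that is safe to commit). Kafka consumers conventionally commit the offset
of the next record to read, so a real implementation would presumably
need to adjust for that.

```java
import java.util.Collection;

// Hypothetical helper; not part of Connect.
final class SafeCommitOffsets {

    // Returns the highest offset that can be handed to preCommit for one
    // topic partition: the last delivered offset, capped just below the
    // earliest errant record whose report has not completed yet.
    static long lastSafeOffset(long lastDeliveredOffset,
                               Collection<Long> pendingErrantOffsets) {
        long safe = lastDeliveredOffset;
        for (long pending : pendingErrantOffsets) {
            safe = Math.min(safe, pending - 1);
        }
        return safe;
    }
}
```

For the example above, the last delivered offset is 10 and only offset 7
is still pending, so "lastSafeOffset(10, [7])" evaluates to 6. Once the
report for offset 7 completes, the pending set is empty and the result
is 10.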

One more small suggestion: we may want to always provide an errant record
reporter to connectors, even if one has not been configured. This reporter
would simply fail the task and throw an exception as soon as it's invoked.
This would provide a more uniform experience for users across different
connectors and would establish expectations that, if a connector uses the
features added by KIP-610 at all, it will fail by default on any invalid
records (instead of doing something implementation-dependent).
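
As a sketch of what I mean by always providing a reporter: the framework
could fall back to a fail-fast implementation whenever nothing has been
configured. The names below are invented for illustration, a plain
BiConsumer stands in for the reporter interface, and
IllegalStateException stands in for the real exception type.

```java
import java.util.Optional;
import java.util.function.BiConsumer;

// Hypothetical helper; not part of Connect or the KIP.
final class ReporterProvider {

    // Fallback used when the user has not configured any error reporting:
    // it fails immediately on the first record handed to it.
    static final BiConsumer<Object, Throwable> FAIL_FAST = (record, error) -> {
        throw new IllegalStateException(
            "No error reporter configured; failing on record " + record, error);
    };

    // The task always receives a non-null reporter, whether or not one
    // was configured.
    static BiConsumer<Object, Throwable> reporterOrFailFast(
            Optional<BiConsumer<Object, Throwable>> configured) {
        return configured.orElse(FAIL_FAST);
    }
}
```

Connector code then never needs a null check or its own fallback
behavior, which is where the implementation-dependent differences would
otherwise creep in.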

Cheers,

Chris

On Tue, May 19, 2020 at 10:03 AM Arjun Satish <arjun.sat...@gmail.com>
wrote:

> One more concern with the connector blocking on the Future's get() is that
> it may cause the task's consumer to fail to heartbeat (since there is no
> independent thread to do this). That would then cause failures when we
> eventually try to consume more records after returning from put(). The
> developer would need to be cognizant of these bits before waiting on the
> future, which adds a reasonable amount of complexity.
>
> Even with preCommit() returning incomplete offsets, I suppose the concern
> would be that the put() method keeps giving the task more records, and to
> truly pause the "firehose", the task needs to pause all partitions?
>
>
> On Tue, May 19, 2020 at 9:26 AM Arjun Satish <arjun.sat...@gmail.com>
> wrote:
>
> > Can we get a couple of examples that show the utility of waiting on
> > the Future<>? Also, in preCommit() we would report back on the
> > incomplete offsets, so that feedback mechanism will already exist for
> > developers who want to manually manage this.
> >
> > On Tue, May 19, 2020 at 8:03 AM Randall Hauch <rha...@gmail.com> wrote:
> >
> >> Thanks, Aakash, for updating the KIP.
> >>
> >> On Tue, May 19, 2020 at 2:18 AM Arjun Satish <arjun.sat...@gmail.com>
> >> wrote:
> >>
> >> > Hi Randall,
> >> >
> >> > Thanks for the explanation! Excellent point about guaranteeing
> >> > offsets in the async case.
> >> >
> >> > If we can guarantee that the offsets will be advanced only after the
> >> > bad records are reported, then is there any value in the Future<>
> >> > return type? I feel we can declare the function with a void return
> >> > type:
> >> >
> >> > void report(SinkRecord failedRecord, Throwable error)
> >> >
> >> > that works asynchronously, and advances offsets only after the DLQ
> >> > producer (and other reporters) complete successfully (as you
> >> > explained).
> >> >
> >> > This actually alleviates my concern about what this Future<> actually
> >> > means. Since a failure to report should kill the task, there is no
> >> > reason for the connector to ever wait on the get().
> >>
> >>
> >> We should not say "there is no reason", because we don't know all of
> >> the requirements that might exist for sending records to external
> >> systems. The additional guarantee regarding error records being fully
> >> recorded before `preCommit(...)` is called is a minimal guarantee that
> >> Connect provides the sink task, and returning a Future allows a sink
> >> task to have *stronger* guarantees than what Connect provides by
> >> default.
> >>
> >> Once again:
> >> 1. We need an async API to allow the sink task to report problem
> >> records and then immediately continue doing more work.
> >> 2. Connect should guarantee to the sink task that all reported records
> >> will actually be recorded before `preCommit(...)` is called.
> >> 3. A sink task *might* need stronger guarantees, and may need to block
> >> on the reported records some time before `preCommit(...)`, and we
> >> should allow it to do this.
> >> 4. Futures and callbacks are common techniques, but there are
> >> significant runtime risks of using callbacks, whereas Future is a
> >> common/standard pattern that is straightforward to use.
> >>
> >> This *exactly* matches the current KIP, which is why I plan to vote for
> >> this valuable and well-thought-out KIP.
> >>
> >>
> >> > And if we are guaranteeing that the offsets are only advanced when
> >> > the errors are reported, then this becomes a double win:
> >> >
> >> > 1. connector developers can literally fire and forget failed records.
> >> > 2. offsets are correctly advanced on errors being reported. Failure to
> >> > report an error will kill the task, and the last committed offset
> >> > will be the correct one.
> >>
> >>
> >> > The main contract would simply be to call report() before preCommit()
> >> > or before put() returns in the task, so the framework knows that
> >> > there are error records reported, and those need to finish before the
> >> > offsets can be advanced.
> >> >
> >> > I think I'd be pretty excited about this API, and if we all agree,
> >> > then let's go ahead with this?
> >>
> >>
> >> > Best,
> >> >
> >> >
> >> >
> >>
> >
>
