Hi Aakash,

I asked earlier whether futures were the right way to go, if we want to
enable asynchronous behavior at all:

> I'm still unclear on how futures are going to provide any benefit to
> developers, though. Blocking on the return of such a future slightly later
> on in the process of handling records is still blocking, and to be done
> truly asynchronously without blocking processing of non-errant records,
> would have to be done on a separate thread. It's technically possible for
> users to cache all of these futures and instead of invoking "get" on them,
> simply check whether they're complete or not via "isDone", but this seems
> like an anti-pattern.

> What is the benefit of wrapping this in a future?
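
To make the two patterns from that quote concrete, here is a rough sketch.
The ErrantRecordReporter interface below is a hypothetical stand-in I made
up for illustration (it is not the interface proposed in the KIP), and
records are plain strings rather than SinkRecord instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureReportingSketch {

    // Hypothetical reporter, for illustration only; not the KIP's interface.
    interface ErrantRecordReporter {
        Future<Void> report(String errantRecord, Throwable error);
    }

    // Pattern 1: call get() on every future. The task thread still waits
    // for each report, so this behaves like a synchronous API.
    static int reportAndBlock(ErrantRecordReporter reporter, List<String> errantRecords)
            throws InterruptedException, ExecutionException {
        int awaited = 0;
        for (String errantRecord : errantRecords) {
            reporter.report(errantRecord, new RuntimeException("bad record")).get();
            awaited++;
        }
        return awaited;
    }

    // Pattern 2: cache the futures and poll isDone() later. Nothing blocks,
    // but the task now has to own the bookkeeping for pending reports.
    static int countCompleted(List<Future<Void>> pending) {
        int done = 0;
        for (Future<Void> future : pending) {
            if (future.isDone()) {
                done++;
            }
        }
        return done;
    }

    public static void main(String[] args) throws Exception {
        // Stand-in reporter whose reports complete immediately.
        ErrantRecordReporter reporter =
                (errantRecord, error) -> CompletableFuture.completedFuture(null);

        int awaited = reportAndBlock(reporter, Arrays.asList("r1", "r2"));

        List<Future<Void>> pending = new ArrayList<>();
        pending.add(reporter.report("r3", new RuntimeException("bad record")));
        pending.add(new CompletableFuture<>()); // a report that has not finished yet

        System.out.println(awaited + " awaited, " + countCompleted(pending)
                + " of " + pending.size() + " cached futures done");
    }
}
```

Neither pattern lets the task do useful work while a report is in flight
unless the waiting happens on another thread.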

As far as I can tell, there hasn't been a practical example yet where the
flexibility provided by a future would actually be beneficial in writing a
connector. It'd be great if we could find one. One possible use case might
be processing records received in "SinkTask::put" without having to block
for each errant record report before sending non-errant records to the
sink. However, this could also be addressed by allowing for batch reporting
of errant records instead of accepting a single record at a time; the task
would track errant records as it processes them in "put" and report them
all en masse after all non-errant records have been processed.
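
A minimal sketch of what that batch alternative could look like, under
stated assumptions: BatchErrantRecordReporter and SketchSinkTask are
hypothetical names invented for this example, records are plain strings
instead of SinkRecord instances, and a "bad:" prefix stands in for whatever
makes a record fail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class BatchReportingSketch {

    // Hypothetical batch-style reporter, for illustration only.
    interface BatchErrantRecordReporter {
        void reportAll(Collection<String> errantRecords);
    }

    // Stand-in for a sink task; a real one would extend SinkTask.
    static class SketchSinkTask {
        private final BatchErrantRecordReporter reporter;
        final List<String> delivered = new ArrayList<>();

        SketchSinkTask(BatchErrantRecordReporter reporter) {
            this.reporter = reporter;
        }

        void put(Collection<String> records) {
            List<String> errant = new ArrayList<>();
            for (String record : records) {
                if (record.startsWith("bad:")) {
                    // Track the errant record; do not stop to report it yet.
                    errant.add(record);
                } else {
                    // Stand-in for writing the record to the sink.
                    delivered.add(record);
                }
            }
            // One report for the whole batch, after the good records are out.
            if (!errant.isEmpty()) {
                reporter.reportAll(errant);
            }
        }
    }

    public static void main(String[] args) {
        List<String> reported = new ArrayList<>();
        SketchSinkTask task = new SketchSinkTask(reported::addAll);
        task.put(Arrays.asList("a", "bad:b", "c"));
        System.out.println("delivered=" + task.delivered + " reported=" + reported);
    }
}
```

With this shape, non-errant records never wait on an individual error
report, and no future has to be exposed to the connector developer.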

With regard to the precedent of using futures for asynchronous APIs, I
think we should make sure that whatever API we decide on is actually useful
for the cases it serves. There's plenty of precedent for callback-based
asynchronous APIs in Kafka with both "Producer::send" and
"Consumer::commitAsync"; the question here shouldn't be about what's done
in different APIs, but what would work for this one in particular.

Finally, it's also been brought up that if we're going to introduce a new
error reporter interface, we can always modify that interface later on to
go from asynchronous to synchronous behavior, or vice-versa, or even to add
a callback- or future-based variant that didn't exist before. We have
plenty of room to maneuver in the future here, so there is less pressure
to get everything right the first time and provide maximum flexibility,
and it is easier to keep the API surface we have to support in future
versions small by avoiding unnecessary additions.

Cheers,

Chris



On Mon, May 18, 2020 at 12:20 PM Aakash Shah <as...@confluent.io> wrote:

> Hi Arjun,
>
> Thanks for your feedback.
>
> I agree with moving to Future<Void>, those are good points.
>
> I believe an earlier point made for asynchronous functionality was that
> modern APIs tend to be asynchronous as they result in more expressive and
> better defined APIs.
> Additionally, because a lot of Kafka Connect functionality is already
> asynchronous, I am inclined to believe that customers will want an
> asynchronous solution for this as well. And if it is relatively simple to
> block with future.get() to make it synchronous, would you not say that
> having an opt-in synchronous functionality rather than synchronous only
> functionality allows for customer control while maintaining that not too
> much burden of implementation is placed on the customer?
> WDYT?
>
> Thanks,
> Aakash
>
> On Sun, May 17, 2020 at 2:51 PM Arjun Satish <arjun.sat...@gmail.com>
> wrote:
>
> > Thanks for all the feedback, folks.
> >
> > re: having a callback as a parameter, I agree that at this point, it might
> > not add much value to the proposal.
> >
> > re: synchronous vs asynchronous, is the motivation performance/higher
> > throughput? Taking a step back, calling report(..) in the new interface
> > does a couple of things:
> >
> > 1. at a fundamental level, it is a signal to the framework that a failure
> > occurred when processing records, specifically due to the given record.
> > 2. depending on whether errors.log and errors.deadletterqueue have been
> > set, some messages are written to zero or more destinations.
> > 3. depending on the value of errors.tolerance (none or all), the task is
> > failed after reporters have completed.
> >
> > for kip-610, the asynchronous method has the advantage of working with the
> > internal dead letter queue (which has been transparent to the developer so
> > far). but, how does the async method help if the DLQ is not enabled? in
> > this case RecordMetadata is not very useful, AFAICT? also, if we add more
> > error reporters in the future (say, for example, a new reporter that
> > writes to an RDBMS), would the async version return success on all or
> > nothing, and what about partial successes?
> >
> > overall, if we really need async behavior, I'd prefer to just use
> > Future<Void>. but if we can keep it simple, then let's go with a
> > synchronous function with the parameters Randall proposed above (with
> > return type as void, and if any of the reporters fail, the task is failed
> > if errors.tolerance is none, and kept alive if tolerance is all), and maybe
> > add asynchronous methods in a future KIP?
> >
> > Best,
> >
>
