Hi all,

I've updated the KIP to reflect all the new agreed-upon suggestions.

Please let me know if you have any more suggestions.
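For quick reference, the reporter shape discussed below could be sketched roughly like this in plain Java. This is only a sketch: the `SinkRecord` stand-in and the class names are placeholders, not the actual Connect API classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class ReporterSketch {

    // Stand-in for org.apache.kafka.connect.sink.SinkRecord; the real
    // class carries topic, partition, key, value, etc.
    static class SinkRecord { }

    // Sketch of the new interface in the Connect API, per the discussion
    // below. report(...) is asynchronous in shape: the Future completes
    // once the errant record has been durably written (e.g. to the DLQ).
    interface ErrantRecordReporter {
        Future<Void> report(SinkRecord record, Throwable error);
    }

    public static void main(String[] args) throws Exception {
        // Trivial in-memory implementation, just to show the call shape.
        ErrantRecordReporter reporter =
                (record, error) -> CompletableFuture.completedFuture(null);

        Future<Void> result =
                reporter.report(new SinkRecord(), new RuntimeException("bad record"));
        result.get(); // a connector wanting synchronous semantics simply blocks here
    }
}
```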

Thanks,
Aakash

On Sun, May 17, 2020 at 12:06 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Hi all,
>
> I'm on board with adding an interface in the Connect API as Arjun
> suggested. It's a slightly higher commitment in terms of maintenance, but
> it also gives us an easier path to future extensions in this scope (error
> handling). The usage is equivalent to adding just a new method with known
> types to `SinkTaskContext` (a catch for `NoClassDefFoundError` can be
> added in the connector code for completeness, but in both suggestions
> this would fail with `NoSuchMethodError` on older workers).
>
> With respect to the method signature, I also agree with Randall's latest
> suggestion of a two-argument method such as:
>
> Future<Void> report(SinkRecord, Throwable)
>
> Returning `Future<RecordMetadata>` can also be OK, but since this refers
> to the DLQ I'd slightly prefer to avoid exposing information that might
> confuse users regarding which topic, partition, and offset this return
> value corresponds to. But both return types should be fine and will give
> plenty of flexibility to connector developers, making the sync use case
> straightforward. In any case, given the interface we can extend this in a
> compatible way in the future if we think we need to.
>
> Minor comment:
> The version will be 2.6 and not 2.9 (the latter was added by accident in
> a few places).
>
> Best,
> Konstantine
>
>
> On Sun, May 17, 2020 at 11:25 AM Magesh kumar Nandakumar <
> mage...@confluent.io> wrote:
>
> > If that's the case, I think the framework should not commit if there
> > are any outstanding records in the reporter. That would prevent the
> > scenario where we could potentially lose records from being sent either
> > to the sink or the reporter. WDYT about the KIP including that as part
> > of the design?
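One way to realize that guarantee could be a worker-side helper that collects the futures the reporter returns and drains them all before offsets are committed. This is purely a hypothetical sketch, not actual WorkerSinkTask code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper: track every future the reporter hands back and
// drain them all before the framework commits offsets, so a crash after
// a commit can never lose an errant record that was still in flight.
public class CommitBarrier {
    private final List<Future<Void>> inFlight = new ArrayList<>();

    // Called once per report(...) invocation.
    public void track(Future<Void> reportResult) {
        inFlight.add(reportResult);
    }

    // Called by the framework immediately before committing offsets.
    public void awaitAllReported() throws ExecutionException, InterruptedException {
        for (Future<Void> f : inFlight) {
            f.get(); // block until this errant record is durably written
        }
        inFlight.clear();
    }

    public int pending() {
        return inFlight.size();
    }
}
```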
> >
> > On Sun, May 17, 2020 at 11:13 AM Randall Hauch <rha...@gmail.com> wrote:
> >
> > > On Sun, May 17, 2020 at 12:42 PM Magesh kumar Nandakumar <
> > > mage...@confluent.io> wrote:
> > >
> > > > Randall
> > > >
> > > > Thanks a lot for your thoughts. I was wondering: if we ever had to
> > > > make the API asynchronous, we could expose it as a new method,
> > > > right? If that's a possibility, would it be better for the API to
> > > > explicitly have synchronous semantics, given that the implementation
> > > > is indeed going to be synchronous?
> > > >
> > >
> > > Thanks, Magesh.
> > >
> > > I think it's likely that the implementation may need to be
> > > synchronous to some degree. For example, just to keep the
> > > implementation simple we might block the WorkerSinkTask after
> > > `put(Collection<SinkRecord>)` returns, latching until the reporter
> > > has received all acks, especially if that simplifies the offset
> > > management and commit logic.
> > >
> > > Even if that's the case, having each `report(...)` call be
> > > asynchronous means that the sink task doesn't *have* to wait until
> > > each failed record has been recorded to continue sending valid
> > > records to the external system. Consider an example with 1000 records
> > > in a batch, where only the first record has an error. If `report(...)`
> > > were synchronous, the `put(...)` method would block reporting the
> > > first record and would then only send the 999 after that's happened.
> > > With an asynchronous `report(...)` method, the `put(...)` method
> > > could report the first record, send the 999 records, and then wait
> > > for the futures returned by the report method.
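The batch flow described here can be sketched as a toy model. None of these names are Connect API classes: records are plain strings, a "bad" prefix marks an errant record, and reporting is simulated with an already-completed future.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class AsyncPutSketch {

    // Stand-in for what a put(Collection<SinkRecord>) implementation
    // could do with an asynchronous reporter: report errant records
    // without blocking, keep sending valid ones, then wait at the end.
    public static int process(List<String> batch) throws Exception {
        List<Future<Void>> pending = new ArrayList<>();
        int sent = 0;
        for (String record : batch) {
            if (record.startsWith("bad")) {
                // simulated report(...) call; does not block the batch
                pending.add(CompletableFuture.completedFuture(null));
            } else {
                sent++; // deliver the valid record to the external system
            }
        }
        for (Future<Void> f : pending) {
            f.get(); // wait for all errant records before returning
        }
        return sent;
    }

    public static void main(String[] args) throws Exception {
        List<String> batch = new ArrayList<>(Arrays.asList("bad-0"));
        for (int i = 1; i < 1000; i++) batch.add("ok-" + i);
        System.out.println(process(batch)); // 999 valid records delivered
    }
}
```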
> > >
> > >
> > > >
> > > > On Sun, May 17, 2020, 9:27 AM Randall Hauch <rha...@gmail.com>
> wrote:
> > > >
> > > > > On Sun, May 17, 2020 at 11:01 AM Magesh kumar Nandakumar <
> > > > > mage...@confluent.io> wrote:
> > > > >
> > > > > > Thanks Randall. The suggestion I made also has a problem when
> > > > > > the reporter isn't enabled: the task could potentially write
> > > > > > records that come after the error records to the sink before
> > > > > > failing.
> > > > > >
> > > > > > The other concern I had is with the reporter being
> > > > > > asynchronous. If for some reason the reporter is taking longer,
> > > > > > say because of a specific broker issue, the connector might
> > > > > > still move forward and commit if it's not waiting for the
> > > > > > reporter. If the worker crashes during this window, we will
> > > > > > lose the bad record. I don't think this is desirable behavior.
> > > > > > I think the synchronous reporter provides better guarantees for
> > > > > > all connectors.
> > > > > >
> > > > > >
> > > > > Thanks, Magesh.
> > > > >
> > > > > That's a valid concern, and maybe that will affect how the
> > > > > feature is actually implemented. I expect it to be a bit tricky
> > > > > to ensure that errant records are fully written to Kafka before
> > > > > the offsets are committed, so it might be simplest to start out
> > > > > with a synchronous implementation. But the API can still be an
> > > > > asynchronous design whether or not the implementation is
> > > > > synchronous. That gives us the ability in the future to change
> > > > > the implementation if we determine a way to handle all concerns.
> > > > > For example, the WorkerSinkTask may need to back off if waiting
> > > > > to commit due to too many incomplete/unacknowledged reporter
> > > > > requests. OTOH, if we make the `report` method(s) synchronous
> > > > > from the beginning, it will be very challenging to change them in
> > > > > the future to be asynchronous.
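To illustrate that point, here is a toy reporter (purely illustrative, not Connect code) whose implementation is synchronous even though its API is asynchronous in shape: the write finishes before `report(...)` returns, yet callers code against a `Future`, so a truly asynchronous implementation could be swapped in later with no API change.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class SyncImplSketch {
    // In-memory stand-in for the DLQ; a real reporter would produce to Kafka.
    private final StringBuilder dlq = new StringBuilder();

    public Future<Void> report(String record, Throwable error) {
        dlq.append(record).append('\n');                // synchronous write
        return CompletableFuture.completedFuture(null); // already complete
    }

    public String dlqContents() {
        return dlq.toString();
    }
}
```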
> > > > >
> > > > > I guess it boils down to this question: do we know today that we
> > > > > will *never* want the reporter to write asynchronously?
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Randall
> > > > >
> > > >
> > >
> >
> >
> > --
> > Thanks
> > Magesh
> >
> > *Magesh Nandakumar*
> > Software Engineer
> > mage...@confluent.io
> >
>