Hi all, I've updated the KIP to reflect all the new agreed-upon suggestions.
Please let me know if you have any more suggestions.

Thanks,
Aakash

On Sun, May 17, 2020 at 12:06 PM Konstantine Karantasis <konstant...@confluent.io> wrote:

> Hi all,
>
> I'm on board with adding an interface in the Connect API as Arjun
> suggested. It's a slightly higher commitment and maintenance burden, but it
> also gives us an easier path to future extensions in this scope (error
> handling). The usage is equivalent to adding just a new method with known
> types to `SinkTaskContext` (the `NoClassDefFoundError` handling can be
> added for completeness in the connector code, but in both suggestions this
> would fail with `NoSuchMethodError` on older workers).
>
> With respect to the method signature, I also agree with Randall's latest
> suggestion of a two-argument method such as:
>
>     Future<Void> report(SinkRecord, Throwable)
>
> Returning `Future<RecordMetadata>` can also be OK, but since this refers to
> the DLQ, I'd slightly prefer to avoid exposing information that might
> confuse users about which topic, partition, and offset this return value
> corresponds to. Both return types should be fine, though, and will give
> plenty of flexibility to connector developers, making the sync use case
> straightforward. In any case, given the interface, we can extend this in a
> compatible way in the future if we think we need to.
>
> Minor comments:
> The version will be 2.6 and not 2.9 (the latter was added by accident in a
> few places).
>
> Best,
> Konstantine
>
>
> On Sun, May 17, 2020 at 11:25 AM Magesh kumar Nandakumar
> <mage...@confluent.io> wrote:
>
> > If that's the case, I think the framework should not commit if there are
> > any outstanding records in the reporter. That would prevent the scenario
> > where we could potentially lose records from being sent either to the
> > sink or the reporter. WDYT about the KIP including that as part of the
> > design?
>
> On Sun, May 17, 2020 at 11:13 AM Randall Hauch <rha...@gmail.com> wrote:
>
> > On Sun, May 17, 2020 at 12:42 PM Magesh kumar Nandakumar
> > <mage...@confluent.io> wrote:
> >
> > > Randall,
> > >
> > > Thanks a lot for your thoughts. I was wondering: if we ever had to make
> > > the API asynchronous, we could expose it as a new method, right? If
> > > that's a possibility, would it be better for the API to explicitly have
> > > the semantics of a synchronous API, if the implementation is indeed
> > > going to be synchronous?
> >
> > Thanks, Magesh.
> >
> > I think it's likely that the implementation may need to be synchronous to
> > some degree. For example, just to keep the implementation simple, we
> > might block the WorkerSinkTask after `put(Collection<SinkRecord>)`
> > returns and latch until the reporter has received all acks, especially if
> > that simplifies the offset management and commit logic.
> >
> > Even if that's the case, having each `report(...)` call be asynchronous
> > means that the sink task doesn't *have* to wait until each failed record
> > has been recorded before continuing to send valid records to the external
> > system. Consider an example with 1000 records in a batch, where only the
> > first record has an error. If `report(...)` were synchronous, the
> > `put(...)` method would block while reporting the first record and would
> > only send the other 999 after that had happened. With an asynchronous
> > `report(...)` method, the `put(...)` method could report the first
> > record, send the 999 valid records, and then wait for the futures
> > returned by the report method.
> >
> > > > On Sun, May 17, 2020, 9:27 AM Randall Hauch <rha...@gmail.com> wrote:
> > > >
> > > > > On Sun, May 17, 2020 at 11:01 AM Magesh kumar Nandakumar
> > > > > <mage...@confluent.io> wrote:
> > > > >
> > > > > > Thanks Randall.
> > > > > > The suggestion I made also has a problem when the reporter isn't
> > > > > > enabled: it could potentially write records that come after the
> > > > > > error records to the sink before failing.
> > > > > >
> > > > > > The other concern I had was with the reporter being asynchronous.
> > > > > > If for some reason the reporter is taking longer, say because of
> > > > > > a specific broker issue, the connector might still move forward
> > > > > > and commit if it isn't waiting for the reporter. If the worker
> > > > > > crashes during this window, we will lose the bad record. I don't
> > > > > > think this is desirable behavior. I think the synchronous
> > > > > > reporter provides better guarantees for all connectors.
> > > > >
> > > > > Thanks, Magesh.
> > > > >
> > > > > That's a valid concern, and maybe it will affect how the feature is
> > > > > actually implemented. I expect it to be a bit tricky to ensure that
> > > > > errant records are fully written to Kafka before the offsets are
> > > > > committed, so it might be simplest to start out with a synchronous
> > > > > implementation. But the API can still have an asynchronous design
> > > > > whether or not the implementation is synchronous. That gives us the
> > > > > ability to change the implementation in the future if we determine
> > > > > a way to handle all the concerns. For example, the WorkerSinkTask
> > > > > may need to back off when waiting to commit due to too many
> > > > > incomplete/unacknowledged reporter requests. OTOH, if we make the
> > > > > `report` method(s) synchronous from the beginning, it will be very
> > > > > challenging to change them to be asynchronous in the future.
> > > > >
> > > > > I guess it boils down to this question: do we know today that we
> > > > > will *never* want the reporter to write asynchronously?
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Randall
> >
> > --
> > Thanks
> > Magesh
> >
> > *Magesh Nandakumar*
> > Software Engineer
> > mage...@confluent.io
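[Editor's note: to make the signature discussion in the thread concrete, here is a minimal, self-contained sketch of the proposed two-argument reporter method. The interface name `ErrantRecordReporter`, the stripped-down `SinkRecord` stand-in, and the toy reporter implementation are illustrative assumptions, not the actual Connect API.]

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class ReporterSignatureSketch {

    // Stand-in for the real org.apache.kafka.connect.sink.SinkRecord.
    static class SinkRecord {
        final Object value;
        SinkRecord(Object value) { this.value = value; }
    }

    // The shape under discussion: report an errant record together with the
    // error that caused it. Returning a Future lets callers choose between
    // fire-and-forget (async) and blocking (sync) usage.
    interface ErrantRecordReporter {
        Future<Void> report(SinkRecord record, Throwable error);
    }

    public static void main(String[] args) throws Exception {
        // Toy reporter standing in for the framework's DLQ-backed one:
        // it completes immediately instead of producing to a Kafka topic.
        ErrantRecordReporter reporter =
                (record, error) -> CompletableFuture.completedFuture(null);

        // Synchronous use: block until the errant record is recorded.
        reporter.report(new SinkRecord("unparseable"),
                        new RuntimeException("conversion failed")).get();
        System.out.println("errant record reported");
    }
}
```

Because the method returns `Future<Void>`, a connector can call `.get()` immediately for synchronous semantics or defer it, which is what keeps both usage styles open without changing the API.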
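[Editor's note: Randall's 1000-record example can be sketched as below. The `Reporter` interface and `put(...)` shape are hypothetical stand-ins for the eventual Connect API, with `String` used in place of `SinkRecord`.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class AsyncReportSketch {

    // Hypothetical reporter mirroring the Future-returning method discussed.
    interface Reporter {
        Future<Void> report(String record, Throwable error);
    }

    static int sentToSink = 0;

    // Report the one bad record without blocking, keep writing the valid
    // records to the external system, and only wait on the reporter's
    // futures at the end of put().
    static void put(List<String> batch, Reporter reporter) throws Exception {
        List<Future<Void>> pending = new ArrayList<>();
        for (String record : batch) {
            if (record.startsWith("bad")) {
                pending.add(reporter.report(record,
                        new RuntimeException("bad record")));
            } else {
                sentToSink++; // placeholder for the actual write to the sink
            }
        }
        // Block here (not per record) so a slow reporter doesn't serialize
        // the whole batch behind the first failure.
        for (Future<Void> f : pending) {
            f.get();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> batch = new ArrayList<>();
        batch.add("bad-0"); // only the first record has an error
        for (int i = 1; i < 1000; i++) {
            batch.add("record-" + i);
        }
        put(batch, (record, error) -> CompletableFuture.completedFuture(null));
        System.out.println(sentToSink); // the valid records sent
    }
}
```

With a synchronous `report(...)`, the loop above would instead stall on the first record before any of the 999 valid records could be sent.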
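[Editor's note: the commit guarantee Magesh asks for (don't commit offsets while the reporter has outstanding records) might be tracked with a simple in-flight counter, as sketched below. This is an illustrative sketch, not the actual WorkerSinkTask logic.]

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CommitGateSketch {

    // Count of errant records handed to the reporter but not yet
    // acknowledged by the DLQ producer.
    private final AtomicInteger inFlight = new AtomicInteger();

    // Called when report(...) hands a record to the reporter.
    void onReportSubmitted() { inFlight.incrementAndGet(); }

    // Called when the reporter's producer acks the errant record.
    void onReportAcked() { inFlight.decrementAndGet(); }

    // The framework would check this before committing consumer offsets, so
    // a worker crash can't lose a bad record whose offset was committed.
    boolean safeToCommit() { return inFlight.get() == 0; }

    public static void main(String[] args) {
        CommitGateSketch gate = new CommitGateSketch();
        gate.onReportSubmitted();
        System.out.println(gate.safeToCommit()); // outstanding report
        gate.onReportAcked();
        System.out.println(gate.safeToCommit()); // all acked
    }
}
```

This keeps `report(...)` asynchronous for connectors while still gating offset commits, which is one way to reconcile the two positions in the thread.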