Hi Konstantine,

Thanks a lot for your feedback.

These are all good points, especially that we already have the threads we
need and that we'd rather not spin up additional ones. It is also true that
we should weigh the level of control we want to give the developer rather
than overstate the burden. Given that, I like the idea of using Futures
to provide asynchronous functionality.

One last proposal/thought, please let me know whether it is feasible:
because some of us have concerns about deprecating the current put(...),
what if we added the overloaded put(Collection<SinkRecord>, BiFunction)
mentioned before, but had the framework invoke it *only* if the error
reporter config is toggled? In other words, connector developers would
implement both methods, and the error reporter configuration would
determine which put(...) is invoked. This would remove the confusion about
which method to implement, since both remain in use, just for different
use cases. If the connector is deployed on an older version of AK, the
original put(...) would be implemented anyway, so there would be no
compatibility issues. The main drawback would be some duplication of code.
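
To make this concrete, here is a rough, self-contained sketch of the
dispatch I have in mind. Everything in it is hypothetical and only for
illustration: Rec stands in for SinkRecord, SketchTask for SinkTask,
deliver(...) for the framework-side call, and the reporterEnabled flag for
whatever the error reporter configuration ends up being. None of these are
existing Connect APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

public class PutDispatchSketch {

    // Hypothetical stand-in for SinkRecord.
    static final class Rec {
        final String value;
        Rec(String value) { this.value = value; }
    }

    // Hypothetical stand-in for SinkTask: developers implement both variants.
    abstract static class SketchTask {
        abstract void put(Collection<Rec> records);
        abstract void put(Collection<Rec> records,
                          BiFunction<Rec, Throwable, Future<Void>> reporter);
    }

    // Hypothetical framework-side dispatch: the flag (assumed to come from the
    // error reporter configuration) decides which put(...) is invoked.
    static void deliver(SketchTask task, Collection<Rec> records,
                        boolean reporterEnabled,
                        BiFunction<Rec, Throwable, Future<Void>> reporter) {
        if (reporterEnabled) {
            task.put(records, reporter);
        } else {
            task.put(records);
        }
    }

    // Shared per-record logic; rejects the value "bad" to simulate a failure.
    static void process(Rec record) {
        if ("bad".equals(record.value)) {
            throw new IllegalArgumentException("cannot write " + record.value);
        }
    }

    // Runs one delivery and returns the sequence of events that occurred.
    static List<String> demo(boolean reporterEnabled) {
        List<String> events = new ArrayList<>();
        SketchTask task = new SketchTask() {
            @Override
            void put(Collection<Rec> records) {
                events.add("put(records)");
                for (Rec r : records) {
                    try {
                        process(r);
                    } catch (RuntimeException e) {
                        // No reporter available on this path.
                        events.add("unreported:" + r.value);
                    }
                }
            }

            @Override
            void put(Collection<Rec> records,
                     BiFunction<Rec, Throwable, Future<Void>> reporter) {
                events.add("put(records, reporter)");
                for (Rec r : records) {
                    try {
                        process(r);
                    } catch (RuntimeException e) {
                        reporter.apply(r, e); // async; call get() to block
                    }
                }
            }
        };
        // Stand-in reporter that completes immediately.
        BiFunction<Rec, Throwable, Future<Void>> reporter = (rec, err) -> {
            events.add("reported:" + rec.value);
            return CompletableFuture.<Void>completedFuture(null);
        };
        deliver(task, Arrays.asList(new Rec("ok"), new Rec("bad")),
                reporterEnabled, reporter);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo(true));
        System.out.println(demo(false));
    }
}
```

The duplication I mentioned shows up in the two put(...) bodies, which
differ only in how a failed record is handled.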

Please let me know what you think.

Thanks,
Aakash

On Sat, May 16, 2020 at 9:42 AM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> > I believe it is important to relieve as much of the burden of
> > implementation as possible from the developer in this case, and thus I
> > think using a Callback rather than a Future would be easier on the
> > developer, while adding asynchronous functionality with the ability to
> > opt-in synchronous functionality. I also believe making it opt-in
> > synchronous vs. the other way simplifies implementation for the developer
> > (blocking vs creating a new thread).
>
>
> What's probably important to highlight here is that, sync or async,
> control needs to return to the connector developer at some point, because
> she'll decide what to do once the failed sink record has been reported
> (e.g. produced to the DLQ). Before we overstate the burden on the
> connector developer as opposed to the level of control we want to offer
> her, let's see what that burden would look like in a couple of examples:
>
> First, the exact equivalent of your initial proposal (this corresponds
> precisely to what a blocking BiConsumer would do in your example).
>
> public void put(Collection<SinkRecord> sinkRecords, BiFunction<SinkRecord,
> Throwable, Future<Void>> failedRecordsReporter) {
>   for (SinkRecord record : sinkRecords) {
>     try {
>       process(record);
>     } catch (Throwable t) {
>       try {
>         failedRecordsReporter.apply(record, t).get();
>       } catch (InterruptedException | ExecutionException e) {
>         throw new ConnectException(e);
>       }
>     }
>   }
> }
>
> Essentially the same lines as your initial example, plus the exception
> handling, which I added for the sake of completeness (and of course it'd be
> factored out into separate methods in production code). Basically the
> burden is:
> failedRecordsReporter.apply(record, t).get(); vs
> failedRecordsReporter.apply(record, t);
>
>
> Additionally, the async equivalent does not differ much:
>
> public void put(Collection<SinkRecord> sinkRecords, BiFunction<SinkRecord,
> Throwable, Future<Void>> failedRecordsReporter) {
>   Map<SinkRecord, Future<Void>> futures = new HashMap<>();
>   for (SinkRecord record : sinkRecords) {
>     try {
>       process(record);
>     } catch (Throwable t) {
>       futures.put(record, failedRecordsReporter.apply(record, t));
>     }
>   }
>
>   futures.forEach((record, future) -> {
>     try {
>       future.get();
>     } catch (InterruptedException | ExecutionException e) {
>       throw new ConnectException(e);
>     }
>   });
> }
>
> And of course the Map doesn't even have to be processed in `put`. It can
> also be processed in `preCommit` or another place in the connector code.
> But let's not divert from the main use case too much.
>
> I don't easily see how an API definition based on callbacks would simplify
> things here. Keep in mind that we already have the threads we need and we'd
> rather not spin up additional ones: the Worker thread that runs the sink
> task and the Kafka producer thread. Also, as I said, we need to return
> control to the developer, so the above leads to a more intuitive
> implementation.
>
> I hope this helps.
>
> Konstantine
>
>
> On Fri, May 15, 2020 at 7:01 PM Aakash Shah <as...@confluent.io> wrote:
>
> > Hi Randall and Konstantine,
> >
> > As Chris and Arjun mentioned, I think the main concern is the potential
> > gap in which developers don't implement the deprecated method due to a
> > misunderstanding of use cases. Using the setter method approach ensures
> > that the developer won't break backwards compatibility when using the new
> > method due to a mistake. That being said, I think the value added in
> > clarity of contract of when the error reporter will be invoked and
> > overall aesthetic while maintaining backwards compatibility outweighs the
> > potential mistake of a developer in not implementing the original
> > put(...) method.
> >
> > With respect to synchrony, I agree with Konstantine's point that we
> > should make a synchronous-only reporter an opt-in feature. At the
> > same time, I believe it is important to relieve as much of the burden of
> > implementation as possible from the developer in this case, and thus I
> > think using a Callback rather than a Future would be easier on the
> > developer, while adding asynchronous functionality with the ability to
> > opt-in synchronous functionality. I also believe making it opt-in
> > synchronous vs. the other way simplifies implementation for the developer
> > (blocking vs creating a new thread).
> >
> > Please let me know your thoughts. I would like to come to a consensus
> > soon due to the AK 2.6 deadlines; I will then shortly update the KIP and
> > start a vote.
> >
> > Thanks,
> > Aakash
> >
> > On Fri, May 15, 2020 at 2:24 PM Randall Hauch <rha...@gmail.com> wrote:
> >
> > > On Fri, May 15, 2020 at 3:13 PM Arjun Satish <arjun.sat...@gmail.com>
> > > wrote:
> > >
> > > > Couple of thoughts:
> > > >
> > > > > 1. If we add new parameters to put(..), and new connectors implement
> > > > > only this method, it makes them backward incompatible with older
> > > > > workers. I think newer connectors may choose to implement only the
> > > > > latest method, and we are passing the compatibility problems back to
> > > > > the connector developers.
> > > >
> > >
> > > > New connectors would have to implement both if they want to run in
> > > > older runtimes.
> > >
> > >
> > > > > 2. If we deprecate the older put() method and eventually remove it,
> > > > > then old connectors are forward incompatible. If we are not going to
> > > > > remove it, then maybe we should not deprecate it?
> > > >
> > >
> > > > I don't think we'll ever remove deprecated methods -- there's no reason
> > > > to cut off older connectors.
> > >
> > >
> > > > > 3. If a record is realized to be erroneous outside put() (say, in
> > > > > flush or preCommit), how will it be reported?
> > > >
> > >
> > > > This is a concern no matter how the reporter is passed to the task.
> > > > Actually, I think it's more clear that the reporter passed through
> > > > `put(...)` should be used to record errors on the SinkRecords passed in
> > > > the same method call.
> > >
> > >
> > > >
> > > > > I do think the concern over aesthetics is an important one, but the
> > > > > trade-off here is to exclude many connectors that are out there from
> > > > > running on worker versions. There may be production deployments that
> > > > > need one old and one new connector that now cannot work on any
> > > > > version of a single worker. Building connectors is complex, and it's
> > > > > kinda unfair to expect folks to make changes over aesthetic reasons
> > > > > alone. This is probably the reason why popular framework APIs very
> > > > > rarely (and probably never) change.
> > > >
> > >
> > > > I don't see how passing the reporter through an overloaded `put(...)`
> > > > is less backward compatible. Because the runtime provides the SinkTask
> > > > base class, the runtime has control over what the methods do by
> > > > default.
> > >
> > >
> > > >
> > > > > Overall, yes, the "public void
> > > > > errantRecordReporter(BiConsumer<SinkRecord, Throwable> reporter) {}"
> > > > > proposal in the original KIP is somewhat of a mouthful, but are there
> > > > > any simpler alternatives that do not exclude existing connectors or
> > > > > add operational burdens, and yet provide a clean contract?
> > > >
> > >
> > > > IMO, overloading `put(...)` is cleaner and easier to understand -- plus
> > > > the other benefits in my earlier email.
> > >
> > >
> > > >
> > > > Best,
> > > >
> > > > > PS: Apologies if the language is incorrect or some points are
> > > > > unclear.
> > > > >
> > > > > On Fri, May 15, 2020 at 12:02 PM Randall Hauch <rha...@gmail.com>
> > > > > wrote:
> > > >
> > > > > On Fri, May 15, 2020 at 1:45 PM Konstantine Karantasis <
> > > > > konstant...@confluent.io> wrote:
> > > > >
> > > > > > Thanks for the quick response Aakash.
> > > > > >
> > > > > > > To your last point, modern APIs like this tend to be asynchronous
> > > > > > > (see admin, producer in Kafka) and such definition results in
> > > > > > > more expressive and well defined APIs.
> > > > > >
> > > > >
> > > > > +1
> > > > >
> > > > >
> > > > > > > What you describe is easily an opt-in feature for the connector
> > > > > > > developer. At the same time, the latest description above gives
> > > > > > > us better chances for this API to remain like this for longer,
> > > > > > > because it covers both the sync and async per `put` use cases.
> > > > >
> > > > >
> > > > > +1
> > > > >
> > > > >
> > > > > > > Given how simple the sync implementation is, just by complying
> > > > > > > with the return type of the method, I still think the BiFunction
> > > > > > > definition that returns a Future makes sense.
> > > > > >
> > > > > > Konstantine
> > > > > >
> > > > > >
> > > > > >
> > > > > > > On Fri, May 15, 2020 at 11:27 AM Aakash Shah <as...@confluent.io>
> > > > > > > wrote:
> > > > > >
> > > > > > > Thanks for the additional feedback.
> > > > > > >
> > > > > > > > I see the benefits of adding an overloaded put(...) over
> > > > > > > > alternatives and I am on board going forward with this
> > > > > > > > approach. It will definitely set forth a contract of where the
> > > > > > > > reporter will be used with better aesthetics.
> > > > > > >
> > > > > > > > The original idea of going with a synchronous approach for the
> > > > > > > > error reporter was to ease the connector developer's job
> > > > > > > > interacting with and handling the error reporter. The tradeoff
> > > > > > > > for having a synchronous-only reporter would be lower
> > > > > > > > throughput on the reporter; this was thought to be fine since
> > > > > > > > arguably most circumstances would not include consistently
> > > > > > > > large amounts of records being sent to the error reporter.
> > > > > > > > Even if this was the case, an argument can be made that the
> > > > > > > > lower throughput would be of assistance in this case, as it
> > > > > > > > would allow more time for the user to realize the connector is
> > > > > > > > having records sent to the error reporter before many are
> > > > > > > > sent. However, if we are strongly in favor of having the
> > > > > > > > option of asynchronous functionality available for the
> > > > > > > > developer, then I am fine with that as well.
> > > > > > >
> > > > > > > > Lastly, I am on board with changing the name to
> > > > > > > > failedRecordReporter.
> > > > > > >
> > > > > > > Please let me know your thoughts.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Aakash
> > > > > > >
> > > > > > > > On Fri, May 15, 2020 at 9:10 AM Randall Hauch <rha...@gmail.com>
> > > > > > > > wrote:
> > > > > > >
> > > > > > > > Konstantine said:
> > > > > > > >
> > > > > > > > > > I notice Randall also used BiFunction in his example, I
> > > > > > > > > > wonder if it's for similar reasons.
> > > > > > > > >
> > > > > > > >
> > > > > > > > Nope. Just a typo on my part.
> > > > > > > >
> > > > > > > > There appear to be three outstanding questions.
> > > > > > > >
> > > > > > > > > First, Konstantine suggested calling this
> > > > > > > > > "failedRecordReporter". I think this is minor, but using this
> > > > > > > > > new name may be a bit more precise and I'd be fine with this.
> > > > > > > >
> > > > > > > > > Second, should the reporter method be synchronous? I think the
> > > > > > > > > two options are:
> > > > > > > > >
> > > > > > > > > 2a. Use `BiConsumer<SinkRecord, Throwable>` that returns
> > > > > > > > > nothing and blocks (at this time).
> > > > > > > > > 2b. Use `BiFunction<SinkRecord, Throwable, Future<Void>>` that
> > > > > > > > > returns a future that the user can optionally use to be
> > > > > > > > > synchronous.
> > > > > > > >
> > > > > > > > > I do agree with Konstantine that option 2b gives us more room
> > > > > > > > > for future semantic changes, and since the producer write is
> > > > > > > > > already asynchronous this should be straightforward to
> > > > > > > > > implement. I think the concern here is that if the sink task
> > > > > > > > > does not *use* the future to make this synchronous, it is very
> > > > > > > > > possible that the error records could be written out of order
> > > > > > > > > (due to retries). But this won't be an issue if the
> > > > > > > > > implementation uses `max.in.flight.requests.per.connection=1`
> > > > > > > > > for writing the error records. It's a little less clear, but
> > > > > > > > > honestly IMO passing the reporter in the `put(...)` method
> > > > > > > > > helps make this lambda easier to understand, for some strange
> > > > > > > > > reason. So unless there are good reasons to avoid this, I'd be
> > > > > > > > > in favor of 2b and returning a Future.
> > > > > > > >
> > > > > > > > > Third, how do we pass the reporter lambda / method reference
> > > > > > > > > to the task? My proposal to pass the reporter via an overload
> > > > > > > > > `put(...)` still is the most attractive to me, for several
> > > > > > > > > reasons:
> > > > > > > >
> > > > > > > > > 3a. There's no need to pass the reporter separately *and* to
> > > > > > > > > describe the changes in method call ordering.
> > > > > > > > > 3b. As mentioned above, for some reason passing it via
> > > > > > > > > `put(...)` makes the intent more clear that it be used when
> > > > > > > > > processing the SinkRecord, and that it shouldn't be used in
> > > > > > > > > `start(...)`, `preCommit(...)`, `onPartitionsAssigned(...)`,
> > > > > > > > > or any of the other task methods. As Andrew pointed out
> > > > > > > > > earlier, *describing* this in the KIP and in JavaDoc will be
> > > > > > > > > tough to be exact yet succinct.
> > > > > > > > > 3c. There is already precedent for evolving
> > > > > > > > > `SourceTask.commitRecord(...)`, and the pattern is identical.
> > > > > > > > > 3d. Backward compatibility is easy to understand, and at the
> > > > > > > > > same time it's pretty easy to describe what implementations
> > > > > > > > > that want to take advantage of this feature should do.
> > > > > > > > > 3e. Minimal changes to the interface: we're just *adding* one
> > > > > > > > > default method that calls the existing method and deprecating
> > > > > > > > > the existing `put(...)`.
> > > > > > > > > 3f. Deprecating the existing `put(...)` makes it more clear in
> > > > > > > > > a programmatic sense that new sink implementations should use
> > > > > > > > > the reporter, and that we recommend old sinks evolve to use it.
> > > > > > > >
> > > > > > > > > Some of these benefits apply to some of the other suggestions,
> > > > > > > > > but I think none of the other suggestions have all of these
> > > > > > > > > benefits. For example, overloading `initialize(...)` is more
> > > > > > > > > difficult since most sink connectors don't override it and
> > > > > > > > > therefore would be less subject to deprecation warnings.
> > > > > > > > > Overloading `start(...)` is less attractive. Adding a method
> > > > > > > > > IMO shares the fewest of these benefits.
> > > > > > > >
> > > > > > > > > The one disadvantage of this approach is that sink task
> > > > > > > > > implementations can't rely upon the reporter upon startup. IMO
> > > > > > > > > that's an acceptable tradeoff to get the cleaner and more
> > > > > > > > > explicit API, especially if the API contract is that Connect
> > > > > > > > > will pass the same reporter instance to each call to
> > > > > > > > > `put(...)` on a single task instance.
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > >
> > > > > > > > Randall
> > > > > > > >
> > > > > > > > > On Fri, May 15, 2020 at 6:59 AM Andrew Schofield
> > > > > > > > > <andrew_schofi...@live.com> wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > > > Randall's suggestion is really good. I think it gives the
> > > > > > > > > > flexibility required and also keeps the interface the right
> > > > > > > > > > way round.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Andrew Schofield
> > > > > > > > >
> > > > > > > > > > On 15/05/2020, 02:07, "Aakash Shah" <as...@confluent.io> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Randall,
> > > > > > > > > >
> > > > > > > > > > Thanks for the feedback.
> > > > > > > > > >
> > > > > > > > > > > 1. This is a great suggestion, but I find that adding an
> > > > > > > > > > > overloaded put(...) which essentially deprecates the old
> > > > > > > > > > > put(...) to only be used when a connector is deployed on
> > > > > > > > > > > older versions of Connect adds enough of a complication
> > > > > > > > > > > that could cause connectors to break if the old put(...)
> > > > > > > > > > > doesn't correctly invoke the overloaded put(...); either
> > > > > > > > > > > that, or it will add duplication of functionality across
> > > > > > > > > > > the two put(...) methods. I think the older method
> > > > > > > > > > > simplifies things with the idea that a DLQ/error reporter
> > > > > > > > > > > will or will not be passed into the method depending on
> > > > > > > > > > > the version of AK. However, I also understand the
> > > > > > > > > > > aesthetic advantage of this method vs the setter method,
> > > > > > > > > > > so I am okay with going in this direction if others agree
> > > > > > > > > > > with adding the overloaded put(...).
> > > > > > > > > >
> > > > > > > > > > > 2. Yes, your assumption is correct. Yes, we can remove the
> > > > > > > > > > > "Order of Operations" if we go with the overloaded
> > > > > > > > > > > put(...) direction.
> > > > > > > > > >
> > > > > > > > > > 3. Great point, I will remove them from the KIP.
> > > > > > > > > >
> > > > > > > > > > > 4. Yeah, accept(...) will be synchronous. I will change it
> > > > > > > > > > > to be clearer, thanks.
> > > > > > > > > >
> > > > > > > > > > > 5. This KIP will use existing metrics as well as introduce
> > > > > > > > > > > new metrics. I will update this section to fully specify
> > > > > > > > > > > the metrics.
> > > > > > > > > >
> > > > > > > > > > Please let me know what you think.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Aakash
> > > > > > > > > >
> > > > > > > > > > > On Thu, May 14, 2020 at 3:52 PM Randall Hauch <rha...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi, Aakash.
> > > > > > > > > > >
> > > > > > > > > > > > Thanks for the KIP. Connect does need an improved ability
> > > > > > > > > > > > for sink connectors to report individual records as being
> > > > > > > > > > > > problematic, and this integrates nicely with the existing
> > > > > > > > > > > > DLQ feature.
> > > > > > > > > > >
> > > > > > > > > > > > I also appreciate the desire to maintain compatibility so
> > > > > > > > > > > > that connectors can take advantage of this feature when
> > > > > > > > > > > > deployed in a runtime that supports this feature, but can
> > > > > > > > > > > > safely and easily do without the feature when deployed to
> > > > > > > > > > > > an older runtime. But I do understand Andrew's concern
> > > > > > > > > > > > about the aesthetics. Have you considered overloading the
> > > > > > > > > > > > `put(...)` method and adding the `reporter` as a second
> > > > > > > > > > > > parameter? Essentially it would add the one method (with
> > > > > > > > > > > > proper JavaDoc) to `SinkTask` only:
> > > > > > > > > > >
> > > > > > > > > > > ```
> > > > > > > > > > >     public void put(Collection<SinkRecord> records,
> > > > > > > > > BiFunction<SinkRecord,
> > > > > > > > > > > Throwable> reporter) {
> > > > > > > > > > >         put(records);
> > > > > > > > > > >     }
> > > > > > > > > > > ```
> > > > > > > > > > > > and the WorkerSinkTask would be changed to call
> > > > > > > > > > > > `put(Collection, BiFunction)` instead.
> > > > > > > > > > >
> > > > > > > > > > > > Sink connector implementations that don't do anything
> > > > > > > > > > > > different can still override `put(Collection)`, and it
> > > > > > > > > > > > still works as before. Developers that want to change
> > > > > > > > > > > > their sink connector implementations to support this new
> > > > > > > > > > > > feature would do the following, which would work in older
> > > > > > > > > > > > and newer Connect runtimes:
> > > > > > > > > > > ```
> > > > > > > > > > >     public void put(Collection<SinkRecord> records) {
> > > > > > > > > > >         put(records, null);
> > > > > > > > > > >     }
> > > > > > > > > > >     public void put(Collection<SinkRecord> records,
> > > > > > > > > BiFunction<SinkRecord,
> > > > > > > > > > > Throwable> reporter) {
> > > > > > > > > > >         // the normal `put(Collection)` logic goes
> here,
> > > but
> > > > > can
> > > > > > > > > optionally
> > > > > > > > > > > use `reporter` if non-null
> > > > > > > > > > >     }
> > > > > > > > > > > ```
> > > > > > > > > > >
> > > > > > > > > > > > I think this has all the same benefits of the current KIP,
> > > > > > > > > > > > but it's noticeably simpler and hopefully more
> > > > > > > > > > > > aesthetically pleasing.
> > > > > > > > > > >
> > > > > > > > > > > > As for Andrew's second concern about "the task can send
> > > > > > > > > > > > errant records to it within put(...)" being too
> > > > > > > > > > > > restrictive: my guess is that this was more an attempt at
> > > > > > > > > > > > describing the basic behavior, and less about requiring
> > > > > > > > > > > > the reporter only being called within the `put(...)`
> > > > > > > > > > > > method and not by methods to which `put(...)`
> > > > > > > > > > > > synchronously or asynchronously delegates. Can you confirm
> > > > > > > > > > > > whether my assumption is correct? If so, then perhaps my
> > > > > > > > > > > > suggestion helps work around this issue as well, since
> > > > > > > > > > > > there would be no restriction on when the reporter is
> > > > > > > > > > > > called, and the whole "Order of Operations" section could
> > > > > > > > > > > > potentially be removed.
> > > > > > > > > > >
> > > > > > > > > > > > Third, it's not clear to me why the "Error Reporter
> > > > > > > > > > > > Object" subsection in the "Proposal" section lists the
> > > > > > > > > > > > worker configuration properties that were previously
> > > > > > > > > > > > introduced with
> > > > > > > > > > > >
> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
> > > > > > > > > > > >
> > > > > > > > > > > > Maybe it's worth mentioning that the error reporter
> > > > > > > > > > > > functionality will reuse or build upon KIP-298, including
> > > > > > > > > > > > reusing the configuration properties defined in KIP-298.
> > > > > > > > > > > > But IIUC, the KIP does not propose changing any technical
> > > > > > > > > > > > or semantic aspect of these configuration properties, and
> > > > > > > > > > > > therefore the KIP would be more clear and succinct without
> > > > > > > > > > > > them. *That* the error reporter will use these properties
> > > > > > > > > > > > is part of the UX and therefore necessary to mention, but
> > > > > > > > > > > > *how* it uses those properties is really up to the
> > > > > > > > > > > > implementation.
> > > > > > > > > > >
> > > > > > > > > > > > Fourth, the "Synchrony" section has a sentence that is
> > > > > > > > > > > > confusing, or not as clear as it could be.
> > > > > > > > > > >
> > > > > > > > > > >     "If a record is sent to the error reporter,
> > processing
> > > of
> > > > > the
> > > > > > > > next
> > > > > > > > > > > errant record in accept(...) will not begin until the
> > > > producer
> > > > > > > > > successfully
> > > > > > > > > > > sends the errant record to Kafka."
> > > > > > > > > > >
> > > > > > > > > > > > This sentence is a bit difficult to understand, but IIUC
> > > > > > > > > > > > this really just means that "accept(...)" will be
> > > > > > > > > > > > synchronous and will block until the errant record has
> > > > > > > > > > > > been successfully written to Kafka. If so, let's say that.
> > > > > > > > > > > > The rest of the paragraph is fine.
> > > > > > > > > > >
> > > > > > > > > > > > Finally, is this KIP proposing new metrics, or that
> > > > > > > > > > > > existing metrics would be used to track the error reporter
> > > > > > > > > > > > usage? If the former, then please fully-specify what these
> > > > > > > > > > > > metrics will be, similarly to how metrics are specified in
> > > > > > > > > > > >
> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework
> > > > > > > > > > >
> > > > > > > > > > > Thoughts?
> > > > > > > > > > >
> > > > > > > > > > > Best regards,
> > > > > > > > > > >
> > > > > > > > > > > Randall
> > > > > > > > > > >
> > > > > > > > > > > > On Mon, May 11, 2020 at 4:49 PM Andrew Schofield
> > > > > > > > > > > > <andrew_schofi...@live.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Aakash,
> > > > > > > > > > > > > Thanks for sorting out the replies to the mailing list.
> > > > > > > > > > > >
> > > > > > > > > > > > > First, I do like the idea of improving error reporting in
> > > > > > > > > > > > > sink connectors. I'd like a simple way to put bad records
> > > > > > > > > > > > > onto the DLQ.
> > > > > > > > > > > >
> > > > > > > > > > > > > I think this KIP is considerably more complicated than it
> > > > > > > > > > > > > seems. The guidance on the SinkTask.put() method is that
> > > > > > > > > > > > > it should send the records asynchronously and immediately
> > > > > > > > > > > > > return, so the task is likely to want to report errors
> > > > > > > > > > > > > asynchronously too. Currently the KIP states that "the
> > > > > > > > > > > > > task can send errant records to it within put(...)" and
> > > > > > > > > > > > > that's too restrictive. The task ought to be able to
> > > > > > > > > > > > > report any unflushed records, but the synchronisation of
> > > > > > > > > > > > > this is going to be tricky. I suppose the connector author
> > > > > > > > > > > > > needs to make sure that all errant records have been
> > > > > > > > > > > > > reported before returning control from
> > > > > > > > > > > > > SinkTask.flush(...) or perhaps SinkTask.preCommit(...).
> > > > > > > > > > > >
> > > > > > > > > > > > > I think the interface is a little strange too. I can see
> > > > > > > > > > > > > that this was done so it's possible to deliver a connector
> > > > > > > > > > > > > that supports error reporting but it can also work in
> > > > > > > > > > > > > earlier versions of the KC runtime. But, the pattern so
> > > > > > > > > > > > > far is that the task uses the methods of SinkTaskContext
> > > > > > > > > > > > > to access utilities in the Kafka Connect runtime, and I
> > > > > > > > > > > > > suggest that reporting a bad record is such a utility.
> > > > > > > > > > > > > SinkTaskContext has changed before when the configs()
> > > > > > > > > > > > > method was added, so I think there is precedent for adding
> > > > > > > > > > > > > a method. The way the KIP adds a method to SinkTask that
> > > > > > > > > > > > > the KC runtime calls to provide the error reporting
> > > > > > > > > > > > > utility seems not to match what has gone before.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Andrew
> > > > > > > > > > > >
> > > > > > > > > > > > > On 11/05/2020, 19:05, "Aakash Shah" <as...@confluent.io> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >     I wasn't previously added to the dev mailing
> list,
> > so
> > > > I'd
> > > > > > > like
> > > > > > > > to
> > > > > > > > > > > post
> > > > > > > > > > > > my
> > > > > > > > > > > >     discussion with Andrew Schofield below for
> > visibility
> > > > and
> > > > > > > > further
> > > > > > > > > > > >     discussion:
> > > > > > > > > > > >
> > > > > > > > > > > >     Hi Andrew,
> > > > > > > > > > > >
> > > > > > > > > > > > >     Thanks for the reply. The main concern with this approach would be its
> > > > > > > > > > > > >     backward compatibility. I’ve highlighted the thoughts around the
> > > > > > > > > > > > >     backward compatibility of the initial approach; please let me know
> > > > > > > > > > > > >     what you think.
> > > > > > > > > > > >
> > > > > > > > > > > >     Thanks,
> > > > > > > > > > > >     Aakash
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > >     ____________________________________________________________
> > > > > > > > > > > >
> > > > > > > > > > > >     Hi,
> > > > > > > > > > > > >     By adding a new method to the SinkContext interface in say Kafka 2.6, a
> > > > > > > > > > > > >     connector that calls it would require a Kafka 2.6 connect runtime. I
> > > > > > > > > > > > >     don't quite see how that's a backward compatibility problem. It's just
> > > > > > > > > > > > >     that new connectors need the latest interface. I might not quite be
> > > > > > > > > > > > >     understanding, but I think it would be fine.
> > > > > > > > > > > >
> > > > > > > > > > > >     Thanks,
> > > > > > > > > > > >     Andrew
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > >     ____________________________________________________________
> > > > > > > > > > > >
> > > > > > > > > > > >     Hi Andrew,
> > > > > > > > > > > >
> > > > > > > > > > > > >     I apologize for the way the reply was sent. I just subscribed to the
> > > > > > > > > > > > >     dev mailing list so it should be resolved now.
> > > > > > > > > > > >
> > > > > > > > > > > > >     You are correct, new connectors would simply require the latest
> > > > > > > > > > > > >     interface. However, we want to remove that requirement - in other
> > > > > > > > > > > > >     words, we want to allow the possibility that someone wants the latest
> > > > > > > > > > > > >     connector, or wants to upgrade to the latest version, but deploys it on
> > > > > > > > > > > > >     an older version of AK. Basically, we don't want to enforce the
> > > > > > > > > > > > >     necessity of upgrading AK to get the latest interface. In the current
> > > > > > > > > > > > >     approach, there would be no issue with deploying a new connector on an
> > > > > > > > > > > > >     older version of AK, as the Connect framework would simply not invoke
> > > > > > > > > > > > >     the new method.
> > > > > > > > > > > >
> > > > > > > > > > > > >     Please let me know what you think and if I need to clarify anything.
> > > > > > > > > > > >
> > > > > > > > > > > >     Thanks,
> > > > > > > > > > > >     Aakash
> > > > > > > > > > > >
> > > > > > > > > > > >
>
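
To make the compatibility question in the quoted exchange concrete, here is a
minimal sketch of how a task could use a context-provided reporter when the
runtime offers one and fall back to its old behaviour when it does not. None
of these names come from the KIP or from Kafka Connect: OldContext,
NewContext, errorReporter() and handleBadRecord() are hypothetical stand-ins,
and the reflective lookup is only one possible way to probe for the method.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class ReporterCompatSketch {

    // Hypothetical stand-in for the context of an older runtime: no reporter accessor.
    public static class OldContext { }

    // Hypothetical stand-in for the context of a newer runtime that exposes a reporter.
    public static class NewContext {
        public final List<String> reported = new ArrayList<>();

        public BiConsumer<String, Throwable> errorReporter() {
            return (record, error) -> reported.add(record + ": " + error.getMessage());
        }
    }

    // Probe for the accessor reflectively so the same task code runs against either context.
    @SuppressWarnings("unchecked")
    public static BiConsumer<String, Throwable> findReporter(Object context) {
        try {
            Method accessor = context.getClass().getMethod("errorReporter");
            return (BiConsumer<String, Throwable>) accessor.invoke(context);
        } catch (ReflectiveOperationException e) {
            return null; // the runtime does not provide a reporter
        }
    }

    // Report the bad record if possible; otherwise signal the legacy failure path.
    public static String handleBadRecord(Object context, String record) {
        BiConsumer<String, Throwable> reporter = findReporter(context);
        if (reporter == null) {
            return "failed";
        }
        reporter.accept(record, new IllegalArgumentException("bad record"));
        return "reported";
    }

    public static void main(String[] args) {
        System.out.println(handleBadRecord(new OldContext(), "r1"));
        System.out.println(handleBadRecord(new NewContext(), "r2"));
    }
}
```

With a shape like this the task, not the framework, decides what happens when
no reporter is available, which is the trade-off between the two proposals
discussed above.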
