Hi Arjun,

Thanks for this suggestion. I actually like this a lot because a defined
interface looks more appealing and is clearer in its intention. Since we
are still using NoSuchMethodError to account for backwards
compatibility, this works for me. I can't see any drawbacks besides having
to call the getter method for the processing of every errant record.

I would like to hear others' thoughts on whether this drawback outweighs
the added benefit of clarity.
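
For what it's worth, I think the per-record getter call could probably be
avoided by resolving the reporter once (for example when the task starts)
and keeping the reference. Below is a rough, self-contained sketch of what
I mean. The two interfaces are simplified stand-ins for the ones in Arjun's
proposal (String takes the place of SinkRecord), ReporterLookup and
resolve are hypothetical names, and an older runtime is simulated by
throwing NoSuchMethodError by hand:

```java
import java.util.ArrayList;
import java.util.List;

class CachedReporterDemo {
    public static void main(String[] args) {
        List<String> reported = new ArrayList<>();
        // A "new" runtime hands back a working reporter.
        SinkTaskContext newRuntime = () -> (record, error) -> reported.add(record);
        // An "old" runtime is simulated: the getter does not exist there.
        SinkTaskContext oldRuntime = () -> {
            throw new NoSuchMethodError("failedRecordReporter");
        };

        // Resolve once, then reuse the cached reference for every errant record.
        ErrantRecordReporter cached = ReporterLookup.resolve(newRuntime);
        for (String record : new String[] {"r1", "r2"}) {
            if (cached != null) {
                cached.report(record, new RuntimeException("bad record"));
            }
        }
        System.out.println("reported=" + reported);
        System.out.println("old runtime reporter=" + ReporterLookup.resolve(oldRuntime));
    }
}

// Simplified stand-ins for the proposed interfaces; not the real Connect API.
interface ErrantRecordReporter {
    void report(String record, Throwable error);
}

interface SinkTaskContext {
    ErrantRecordReporter failedRecordReporter();
}

// Hypothetical helper that guards the getter call for older runtimes.
final class ReporterLookup {
    private ReporterLookup() { }

    static ErrantRecordReporter resolve(SinkTaskContext context) {
        try {
            return context.failedRecordReporter();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            // Older runtime: the getter or the reporter class is missing.
            return null;
        }
    }
}
```

With something like this, the getter and the try-catch are hit once per
task rather than once per errant record, so the drawback may be smaller
than it first looks.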

Thanks,
Aakash

On Sat, May 16, 2020 at 2:54 PM Arjun Satish <arjun.sat...@gmail.com> wrote:

> Thanks Konstantine, happy to write something up in a KIP. But I think it
> would be redundant if we add this KIP. What do you think?
>
> Also, Randall, yes that API would work. But, if we expect the developers to
> catch NoSuchMethodErrors, then should we also go ahead and make a class
> that would have a report method (similar to ErrorReporter
> <https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java>
> but maybe with different arguments on the report method)? They would also
> have to catch NoClassDefFoundError.
>
> That would modify your change in SinkTaskContext to:
>
> public interface SinkTaskContext {
>     /**
>      * Get the reporter to which the sink task can report problematic or
>      * failed {@link SinkRecord} passed to the
>      * {@link SinkTask#put(Collection)} method.
>      *
>      * @return an errant record reporter
>      */
>     ErrantRecordReporter failedRecordReporter();
> }
>
> where ErrantRecordReporter is:
>
> public interface ErrantRecordReporter {
>
>     /**
>      * Serialize and produce records to the error topic
>      *
>      * @param record the errant record
>      * @param error the error that caused the record to fail
>      */
>     void report(SinkRecord record, Throwable error);
>
> }
>
> Usage in put would be the same though if the class is not explicitly named:
>
>         try {
>             context.failedRecordReporter().report(record, error);
>         } catch (NoSuchMethodError e) {
>             log.info("Boooooooooo!");
>         }
>
> Thoughts?
>
> On Sat, May 16, 2020 at 12:46 PM Konstantine Karantasis <
> konstant...@confluent.io> wrote:
>
> > Thanks for following up Randall.
> >
> > I agree with your latest suggestion. It was good that we explored several
> > options but accessing the context to obtain the reporter in Kafka Connect
> > versions that support this feature makes the most sense. The burden for
> > connector developers that want to use this reporter _and_ make connectors
> > compatible with old and new workers is minimal.
> >
> > We'll have to live with additions like this and the appearance in both
> > KIP-131 and here in KIP-610 indeed creates a reasonable precedent.
> >
> > Konstantine
> >
> >
> > On Sat, May 16, 2020 at 12:34 PM Randall Hauch <rha...@gmail.com> wrote:
> >
> > > Thanks again for the active discussion!
> > >
> > > Regarding the future-vs-callback discussion: I did like where Chris was
> > > going with the Callback, but he raises a good point that it's unclear what
> > > to use for the reporter type, since we'd need three parameters. Introducing
> > > a new interface makes it much harder for a sink task to be backward
> > > compatible, so sticking with BiFunction is a good compromise. Plus, another
> > > significant disadvantage of a callback approach is that a sink task's
> > > callback is called from the producer thread, and this risks a
> > > poorly written sink task callback killing the reporter's producer without
> > > necessarily failing the task. Using a future avoids this risk altogether,
> > > and still provides the sink task with the ability to do synchronous
> > > reporting using Future, which is a standard and conventional design
> > > pattern. So we do seem to have converged on using
> > > `BiFunction<SinkRecord, Throwable, Future<Void>>` for the reporter type.
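
To make the sync/async point concrete, here is a rough sketch of how a task
could use that reporter type either way. The reporter below is only a
stand-in that completes on a background thread (the real one would be
backed by Connect's producer), String takes the place of SinkRecord, and
all class, method, and variable names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

class FutureReporterDemo {
    // Stand-in reporter: "writes" on a background thread and returns a future.
    static BiFunction<String, Throwable, Future<Void>> reporterWritingTo(List<String> sink) {
        return (record, error) ->
                CompletableFuture.runAsync(() -> sink.add(record + ": " + error.getMessage()));
    }

    public static void main(String[] args) throws Exception {
        List<String> written = new CopyOnWriteArrayList<>();
        BiFunction<String, Throwable, Future<Void>> reporter = reporterWritingTo(written);

        // Synchronous use: block until this one record has been written.
        reporter.apply("r1", new RuntimeException("bad schema")).get();

        // Asynchronous use: keep the futures and wait once for the whole batch.
        List<Future<Void>> pending = new ArrayList<>();
        pending.add(reporter.apply("r2", new RuntimeException("bad value")));
        pending.add(reporter.apply("r3", new RuntimeException("bad key")));
        for (Future<Void> future : pending) {
            future.get();
        }
        System.out.println("written=" + written.size());
    }
}
```

The task decides where to call get(), so a blocking reporter is just the
special case of calling it immediately.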
> > >
> > > Now, we still seem to not have converged upon how to pass the reporter to
> > > the sink task. I agree with Konstantine that the deprecation affects only
> > > newer versions of Connect, and that a sink task should deal with both put
> > > methods only when it wants to support older runtimes. I also think that
> > > this is a viable approach, but I do concede that this evolution of the sink
> > > task API is more complicated than it should be.
> > >
> > > In the interest of quickly coming to consensus on how we pass the reporter
> > > to the sink task, I'd like to go back to Andrew's original suggestion,
> > > which I think we disregarded too quickly: add a getter on the
> > > SinkTaskContext interface. We already have precedent for adding methods to
> > > one of the context classes with the newly-adopted KIP-131, which adds a
> > > getter for the OffsetStorageReader on the (new) SourceConnectorContext.
> > > That KIP accepts the fact that a source connector wanting to use this
> > > feature while also keeping the ability to be installed into older Connect
> > > runtimes must guard its use of the context's getter method.
> > >
> > > I think we can use the same pattern for this KIP, and add a getter to the
> > > existing SinkTaskContext that is defined something like:
> > >
> > > public interface SinkTaskContext {
> > >     ...
> > >     /**
> > >      * Get the reporter to which the sink task can report problematic or
> > >      * failed {@link SinkRecord} passed to the
> > >      * {@link SinkTask#put(Collection)} method. When reporting a failed
> > >      * record, the sink task will receive a {@link Future} that the task
> > >      * can optionally use to wait until the failed record and exception
> > >      * have been written to Kafka via Connect's DLQ. Note that the result
> > >      * of this method may be null if this connector has not been
> > >      * configured with a DLQ.
> > >      *
> > >      * <p>This method was added in Apache Kafka 2.9. Sink tasks that use
> > >      * this method but want to maintain backward compatibility so they can
> > >      * also be deployed to older Connect runtimes should guard the call to
> > >      * this method with a try-catch block, since calling this method will
> > >      * result in a {@link NoSuchMethodError} when the sink connector is
> > >      * deployed to Connect runtimes older than Kafka 2.9. For example:
> > >      * <pre>
> > >      *     BiFunction&lt;SinkRecord, Throwable, Future&lt;Void&gt;&gt; reporter;
> > >      *     try {
> > >      *         reporter = context.failedRecordReporter();
> > >      *     } catch (NoSuchMethodError e) {
> > >      *         reporter = null;
> > >      *     }
> > >      * </pre>
> > >      *
> > >      * @return the reporter function; null if no error reporter has been
> > >      *         configured for the connector
> > >      * @since 2.9
> > >      */
> > >     BiFunction<SinkRecord, Throwable, Future<Void>> failedRecordReporter();
> > > }
> > >
> > > The main advantage is that the KIP no longer has to make *any other*
> > > changes to the Sink Connector or Task API. The above is really the only
> > > change, and it's merely an addition to the API. No deprecation and no
> > > overloading methods. The KIP does need to explain how the reporter is
> > > configured and used (which it already does), but IMO the KIP doesn't need
> > > to describe when this reporter can/should be used. After all, this method
> > > is on the existing SinkTaskContext, so this method is really no different
> > > than any other existing method. I think my JavaDoc (which is just a
> > > suggestion for a starting point that Aakash can improve as needed)
> > > describes how easy it is for a sink to maintain backward compatibility.
> > > (The use of `BiFunction` helps tremendously.) Another not insignificant
> > > advantage is that a sink task can use this reporter reference throughout
> > > the task's lifetime (after it's started and before it is stopped), making
> > > it less invasive for existing sink task implementations that want to use
> > > it.
> > >
> > > I hope we can all get behind this compromise, which IMO is actually super
> > > clean and makes a lot of sense. Thanks, Andrew, for originally suggesting
> > > it. I know we're all trying to improve the Connect API in a way that makes
> > > sense, and deliberate and constructive discussion is a healthy thing.
> > > Thanks again to everyone for participating!
> > >
> > > BTW, we've agreed upon a number of other changes, but I don't see any of
> > > those changes on the KIP. Aakash, can you please update the KIP quickly so
> > > we can make sure the other parts of the KIP are acceptable?
> > >
> > > Best regards,
> > >
> > > Randall
> > >
> > > On Sat, May 16, 2020 at 12:24 PM Konstantine Karantasis <
> > > konstant...@confluent.io> wrote:
> > >
> > > > Thanks for the quick response Aakash.
> > > >
> > > > With respect to deprecation, this refers to deprecating this method
> in
> > > > newer versions of Kafka Connect (and eventually removing it).
> > > >
> > > > As a connector developer, if you want your connector to run across a
> > wide
> > > > spectrum of Connect versions, you'll have to take this into
> > consideration
> > > > and retain both methods in a functional state. The good news is that
> > both
> > > > methods can share a lot of code, so in reality both the old and the
> new
> > > put
> > > > will be thin shims over a `putRecord` method (or `process` method as
> > you
> > > > call it in the KIP).
> > > >
> > > > Given the above, there's no requirement to conditionally call one
> > method
> > > or
> > > > the other in the framework based on configuration. Once you implement
> > the
> > > > new `put` with something other than its default implementation, as a
> > > > connector developer, you'll know to adapt to the above.
> > > >
> > > > I definitely suggest extending our docs in a meaningful way in order
> to
> > > > make the upgrade path easy to follow. Maybe you'd like to add a note
> to
> > > > your compatibility section in this KIP as well.
> > > >
> > > > Regards,
> > > > Konstantine
> > > >
> > > > On Sat, May 16, 2020 at 10:13 AM Aakash Shah <as...@confluent.io>
> > wrote:
> > > >
> > > > > +1
> > > > >
> > > > > On Sat, May 16, 2020 at 9:55 AM Konstantine Karantasis <
> > > > > konstant...@confluent.io> wrote:
> > > > >
> > > > > > Hi Arjun,
> > > > > >
> > > > > > I think I agree with you that subject is interesting. Yet, I feel
> > it
> > > > > > belongs to a separate future KIP. Reading the proposal in the KIP
> > > > format
> > > > > > will help, at least myself, to understand it better.
> > > > > >
> > > > > > Having said that, for the purpose of simplifying error handling
> for
> > > > sink
> > > > > > tasks, the discussion on KIP-610 has made some good progress on
> the
> > > > > mailing
> > > > > > list. If the few open items are reflected on the proposal, maybe
> > it'd
> > > > be
> > > > > > even worthwhile to consider it for inclusion in the upcoming
> > release
> > > > with
> > > > > > its current scope.
> > > > > >
> > > > > > Konstantine
> > > > > >
> > > > > >
> > > > > > On Fri, May 15, 2020 at 7:44 PM Arjun Satish <
> > arjun.sat...@gmail.com
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > I'm kinda hoping that we get to an approach on how to extend the
> > > > > > > Connect framework. Adding parameters in the put method is nice, and
> > > > > > > maybe works for now, but I'm not sure how scalable it is. It'd be
> > > > > > > great to be able to add more functionality in the future. Couple of
> > > > > > > examples:
> > > > > > >
> > > > > > > - make the metrics registry available to a task, so they can
> > report
> > > > > task
> > > > > > > level metrics or
> > > > > > > - be able to pass in a RestExtension handle to the task, so the
> > > task
> > > > > can
> > > > > > > provide a rest endpoint which users can hit to get some task
> > level
> > > > > > > information (about its status, health, for example)
> > > > > > >
> > > > > > > In such scenarios, maybe adding new parameters to existing
> > methods
> > > > may
> > > > > > not
> > > > > > > be immediately acceptable.
> > > > > > >
> > > > > > > Since we are very close to a deadline, I wanted to check if the
> > one
> > > > > more
> > > > > > > possibility is acceptable :-)
> > > > > > >
> > > > > > > What if we could create a library that could be independently
> > > > > > > integrated by connector developers in their connectors? The library
> > > > > > > would be packaged and shipped with their connector like any other
> > > > > > > library on maven (and other similar repositories). The new module
> > > > > > > would be in the AK project, but its jars will *not* be added to
> > > > > > > classpath for Connect worker.
> > > > > > >
> > > > > > > The library would provide a public interface for an error
> > reporter,
> > > > > which
> > > > > > > provides both synchronous and asynchronous functionalities (as
> > was
> > > > > > brought
> > > > > > > up above).
> > > > > > >
> > > > > > > This would be an independent library, they can be easily
> bundled
> > > and
> > > > > > loaded
> > > > > > > with the other connectors. The connect framework will be
> > decoupled
> > > > from
> > > > > > > this utility.
> > > > > > >
> > > > > > > I understand that a similar option is in the rejected alternatives,
> > > > > > > mostly because of configuration overhead, but the configuration
> > > > > > > required here can come directly from the worker properties (and just
> > > > > > > be copy pasted from there, maybe with a prefix). And I wonder if
> > > > > > > (maybe as part of a future KIP) we can evaluate a strategy where
> > > > > > > certain worker configs can be passed to a connector (for example, the
> > > > > > > producer/consumer/admin ones), so end users do not have to.
> > > > > > >
> > > > > > > Overall, we would get clean APIs, contracts and developers get
> > > > freedom
> > > > > to
> > > > > > > use these libraries and functionalities however they want. The
> > only
> > > > > > > drawback is how this is configured (end-users will have to add
> > more
> > > > > lines
> > > > > > > in the json/properties files). But all configs can simply come
> > from
> > > > > > worker,
> > > > > > > I believe this is relatively minor issue. We should be able to
> > work
> > > > out
> > > > > > > compatibility issues in the implementations, so that the
> library
> > > can
> > > > > > safely
> > > > > > > run (and degrade functionality if needed) with old workers.
> > > > > > >
> > > > > > >
> > > > > > > On Fri, May 15, 2020 at 7:04 PM Aakash Shah <
> as...@confluent.io>
> > > > > wrote:
> > > > > > >
> > > > > > > > Just wanted to clarify that I am on board with adding the
> > > > overloaded
> > > > > > > > put(...) method.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Aakash
> > > > > > > >
> > > > > > > > On Fri, May 15, 2020 at 7:00 PM Aakash Shah <
> > as...@confluent.io>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Randall and Konstantine,
> > > > > > > > >
> > > > > > > > > As Chris and Arjun mentioned, I think the main concern is
> the
> > > > > > potential
> > > > > > > > > gap in which developers don't implement the deprecated
> method
> > > due
> > > > > to
> > > > > > a
> > > > > > > > > misunderstanding of use cases. Using the setter method
> > approach
> > > > > > ensures
> > > > > > > > > that the developer won't break backwards compatibility when
> > > using
> > > > > the
> > > > > > > new
> > > > > > > > > method due to a mistake. That being said, I think the value
> > > added
> > > > > in
> > > > > > > > > clarity of contract of when the error reporter will be
> > invoked
> > > > and
> > > > > > > > overall
> > > > > > > > > aesthetic while maintaining backwards compatibility
> outweighs
> > > the
> > > > > > > > potential
> > > > > > > > > mistake of a developer in not implementing the original
> > > put(...)
> > > > > > > method.
> > > > > > > > >
> > > > > > > > > With respect to synchrony, I agree with Konstantine's
> point,
> > > that
> > > > > we
> > > > > > > > > should make it an opt-in feature of making the reporter
> only
> > > > > > > synchronous.
> > > > > > > > > At the same time, I believe it is important to relieve as
> > much
> > > of
> > > > > the
> > > > > > > > > burden of implementation as possible from the developer in
> > this
> > > > > case,
> > > > > > > and
> > > > > > > > > thus I think using a Callback rather than a Future would be
> > > > easier
> > > > > on
> > > > > > > the
> > > > > > > > > developer, while adding asynchronous functionality with the
> > > > ability
> > > > > > to
> > > > > > > > > opt-in synchronous functionality. I also believe making it
> > > opt-in
> > > > > > > > > synchronous vs. the other way simplifies implementation for
> > the
> > > > > > > developer
> > > > > > > > > (blocking vs creating a new thread).
> > > > > > > > >
> > > > > > > > > Please let me know your thoughts. I would like to come to a
> > > > > consensus
> > > > > > > > soon
> > > > > > > > > due to the AK 2.6 deadlines; I will then shortly update the
> > KIP
> > > > and
> > > > > > > > start a
> > > > > > > > > vote.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Aakash
> > > > > > > > >
> > > > > > > > > On Fri, May 15, 2020 at 2:24 PM Randall Hauch <
> > > rha...@gmail.com>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> On Fri, May 15, 2020 at 3:13 PM Arjun Satish <
> > > > > > arjun.sat...@gmail.com>
> > > > > > > > >> wrote:
> > > > > > > > >>
> > > > > > > > >> > Couple of thoughts:
> > > > > > > > >> >
> > > > > > > > > >> > 1. If we add new parameters to put(..), and new connectors
> > > > > > > > > >> > implement only this method, it makes them backward incompatible
> > > > > > > > > >> > with older workers. I think newer connectors may choose to only
> > > > > > > > > >> > implement the latest method, and we are passing the compatibility
> > > > > > > > > >> > problems back to the connector developers.
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >> New connectors would have to implement both if they want
> to
> > > run
> > > > in
> > > > > > > older
> > > > > > > > >> runtimes.
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> > 2. if we deprecate the older put() method and eventually
> > > > remove
> > > > > > it,
> > > > > > > > then
> > > > > > > > >> > old connectors are forward incompatible. If we are not
> > going
> > > > to
> > > > > > > remove
> > > > > > > > >> it,
> > > > > > > > >> > then maybe we should not deprecate it?
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >> I don't think we'll ever remove deprecated methods --
> > there's
> > > no
> > > > > > > reason
> > > > > > > > to
> > > > > > > > >> cut off older connectors.
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> > 3. if a record is realized to be erroneous outside put()
> > > (say,
> > > > > in
> > > > > > > > flush
> > > > > > > > >> or
> > > > > > > > >> > preCommit), how will it be reported?
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >> This is a concern no matter how the reporter is passed to
> > the
> > > > > task.
> > > > > > > > >> Actually, I think it's more clear that the reporter passed
> > > > through
> > > > > > > > >> `put(...)` should be used to record errors on the
> > SinkRecords
> > > > > passed
> > > > > > > in
> > > > > > > > >> the
> > > > > > > > >> same method call.
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> >
> > > > > > > > >> > I do think the concern over aesthetics is an important
> > one,
> > > > but
> > > > > > the
> > > > > > > > >> > trade-off here is to exclude many connectors that are
> out
> > > > there
> > > > > > from
> > > > > > > > >> > running on worker versions. there may be production
> > > > deployments
> > > > > > that
> > > > > > > > >> need
> > > > > > > > >> > one old and one new connector that now cannot work on
> any
> > > > > version
> > > > > > > of a
> > > > > > > > >> > single worker. Building connectors is complex, and it's
> > > kinda
> > > > > > unfair
> > > > > > > > to
> > > > > > > > >> > expect folks to make changes over aesthetic reasons
> alone.
> > > > This
> > > > > is
> > > > > > > > >> probably
> > > > > > > > >> > the reason why popular framework APIs very rarely (and
> > > > probably
> > > > > > > never)
> > > > > > > > >> > change.
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >> I don't see how passing the reporter through an overloaded
> > > > > > `put(...)`
> > > > > > > is
> > > > > > > > >> less backward compatible. Because the runtime provides the
> > > > > SinkTask
> > > > > > > base
> > > > > > > > >> class, the runtime has control over what the methods do by
> > > > > default.
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> >
> > > > > > > > > >> > Overall, yes, the "public void
> > > > > > > > > >> > errantRecordReporter(BiConsumer<SinkRecord, Throwable> reporter) {}"
> > > > > > > > > >> > proposal in the original KIP is somewhat of a mouthful, but are
> > > > > > > > > >> > there any simpler alternatives that do not exclude existing
> > > > > > > > > >> > connectors, adding operational burdens and yet provide a clean
> > > > > > > > > >> > contract?
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >> IMO, overloading `put(...)` is cleaner and easier to
> > > understand
> > > > --
> > > > > > > plus
> > > > > > > > >> the
> > > > > > > > >> other benefits in my earlier email.
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> >
> > > > > > > > >> > Best,
> > > > > > > > >> >
> > > > > > > > >> > PS: Apologies if the language is incorrect or some
> points
> > > are
> > > > > > > unclear.
> > > > > > > > >> >
> > > > > > > > >> > On Fri, May 15, 2020 at 12:02 PM Randall Hauch <
> > > > > rha...@gmail.com>
> > > > > > > > >> wrote:
> > > > > > > > >> >
> > > > > > > > >> > > On Fri, May 15, 2020 at 1:45 PM Konstantine
> Karantasis <
> > > > > > > > >> > > konstant...@confluent.io> wrote:
> > > > > > > > >> > >
> > > > > > > > >> > > > Thanks for the quick response Aakash.
> > > > > > > > >> > > >
> > > > > > > > >> > > > To your last point, modern APIs like this tend to be
> > > > > > > asynchronous
> > > > > > > > >> (see
> > > > > > > > >> > > > admin, producer in Kafka) and such definition
> results
> > in
> > > > > more
> > > > > > > > >> > expressive
> > > > > > > > >> > > > and well defined APIs.
> > > > > > > > >> > > >
> > > > > > > > >> > >
> > > > > > > > >> > > +1
> > > > > > > > >> > >
> > > > > > > > >> > >
> > > > > > > > > >> > > > What you describe is easily an opt-in feature for the connector
> > > > > > > > > >> > > > developer. At the same time, the latest description above gives
> > > > > > > > > >> > > > us better chances for this API to remain like this for longer,
> > > > > > > > > >> > > > because it covers both the sync and async per `put` use cases.
> > > > > > > > >> > >
> > > > > > > > >> > >
> > > > > > > > >> > > +1
> > > > > > > > >> > >
> > > > > > > > >> > >
> > > > > > > > >> > > > Given how simple the sync implementation
> > > > > > > > >> > > > is, just by complying with the return type of the
> > > method,
> > > > I
> > > > > > > still
> > > > > > > > >> think
> > > > > > > > >> > > the
> > > > > > > > >> > > > BiFunction definition that returns a Future makes
> > sense.
> > > > > > > > >> > > >
> > > > > > > > >> > > > Konstantine
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > On Fri, May 15, 2020 at 11:27 AM Aakash Shah <
> > > > > > > as...@confluent.io>
> > > > > > > > >> > wrote:
> > > > > > > > >> > > >
> > > > > > > > >> > > > > Thanks for the additional feedback.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > I see the benefits of adding an overloaded
> put(...)
> > > over
> > > > > > > > >> alternatives
> > > > > > > > >> > > > and I
> > > > > > > > >> > > > > am on board going forward with this approach. It
> > will
> > > > > > > definitely
> > > > > > > > >> set
> > > > > > > > >> > > > forth
> > > > > > > > >> > > > > a contract of where the reporter will be used with
> > > > better
> > > > > > > > >> aesthetics.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > The original idea of going with a synchronous
> > approach
> > > > for
> > > > > > the
> > > > > > > > >> error
> > > > > > > > >> > > > > reporter was to ease the connector developer's job
> > > > > > interacting
> > > > > > > > >> with
> > > > > > > > >> > and
> > > > > > > > >> > > > > handling the error reporter. The tradeoff for
> > having a
> > > > > > > > >> > synchronous-only
> > > > > > > > >> > > > > reporter would be lower throughput on the
> reporter;
> > > this
> > > > > was
> > > > > > > > >> thought
> > > > > > > > >> > to
> > > > > > > > >> > > > be
> > > > > > > > >> > > > > fine since arguably most circumstances would not
> > > include
> > > > > > > > >> consistently
> > > > > > > > >> > > > large
> > > > > > > > >> > > > > amounts of records being sent to the error
> reporter.
> > > > Even
> > > > > if
> > > > > > > > this
> > > > > > > > >> was
> > > > > > > > >> > > the
> > > > > > > > >> > > > > case, an argument can be made that the lower
> > > throughput
> > > > > > would
> > > > > > > be
> > > > > > > > >> of
> > > > > > > > >> > > > > assistance in this case, as it would allow more
> time
> > > for
> > > > > the
> > > > > > > > user
> > > > > > > > >> to
> > > > > > > > >> > > > > realize the connector is having records sent to
> the
> > > > error
> > > > > > > > reporter
> > > > > > > > >> > > before
> > > > > > > > >> > > > > many are sent. However, if we are strongly in
> favor
> > of
> > > > > > having
> > > > > > > > the
> > > > > > > > >> > > option
> > > > > > > > >> > > > of
> > > > > > > > >> > > > > asynchronous functionality available for the
> > > developer,
> > > > > > then I
> > > > > > > > am
> > > > > > > > >> > fine
> > > > > > > > >> > > > with
> > > > > > > > >> > > > > that as well.
> > > > > > > > >> > > > >
> > > > > > > > > >> > > > > Lastly, I am on board with changing the name to
> > > > > > > > > >> > > > > failedRecordReporter.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > Please let me know your thoughts.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > Thanks,
> > > > > > > > >> > > > > Aakash
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > On Fri, May 15, 2020 at 9:10 AM Randall Hauch <
> > > > > > > rha...@gmail.com
> > > > > > > > >
> > > > > > > > >> > > wrote:
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > > Konstantine said:
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > > I notice Randall also used BiFunction in his
> > > > example,
> > > > > I
> > > > > > > > >> wonder if
> > > > > > > > >> > > > it's
> > > > > > > > >> > > > > > for
> > > > > > > > >> > > > > > > similar reasons.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Nope. Just a typo on my part.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > There appear to be three outstanding questions.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > First, Konstantine suggested calling this
> > > > > > > > >> "failedRecordReporter". I
> > > > > > > > >> > > > think
> > > > > > > > >> > > > > > this is minor, but using this new name may be a
> > bit
> > > > more
> > > > > > > > precise
> > > > > > > > >> > and
> > > > > > > > >> > > > I'd
> > > > > > > > >> > > > > be
> > > > > > > > >> > > > > > fine with this.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Second, should the reporter method be
> > synchronous? I
> > > > > think
> > > > > > > the
> > > > > > > > >> two
> > > > > > > > >> > > > > options
> > > > > > > > >> > > > > > are:
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > 2a. Use `BiConsumer<SinkRecord, Throwable>` that
> > > > returns
> > > > > > > > nothing
> > > > > > > > >> > and
> > > > > > > > >> > > > > blocks
> > > > > > > > >> > > > > > (at this time).
> > > > > > > > >> > > > > > 2b. Use `BiFunction<SinkRecord, Throwable,
> > > > > Future<Void>>`
> > > > > > > that
> > > > > > > > >> > > returns
> > > > > > > > >> > > > a
> > > > > > > > >> > > > > > future that the user can optionally use to be
> > > > > synchronous.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > I do agree with Konstantine that option 2b gives
> > us
> > > > more
> > > > > > > room
> > > > > > > > >> for
> > > > > > > > >> > > > future
> > > > > > > > >> > > > > > semantic changes, and since the producer write
> is
> > > > > already
> > > > > > > > >> > > asynchronous
> > > > > > > > >> > > > > this
> > > > > > > > >> > > > > > should be straightforward to implement. I think
> > the
> > > > > > concern
> > > > > > > > >> here is
> > > > > > > > >> > > > that
> > > > > > > > >> > > > > if
> > > > > > > > >> > > > > > the sink task does not *use* the future to make
> > this
> > > > > > > > >> synchronous,
> > > > > > > > >> > it
> > > > > > > > >> > > is
> > > > > > > > >> > > > > > very possible that the error records could be
> > > written
> > > > > out
> > > > > > of
> > > > > > > > >> order
> > > > > > > > >> > > (due
> > > > > > > > >> > > > > to
> > > > > > > > >> > > > > > retries). But this won't be an issue if the
> > > > > implementation
> > > > > > > > uses
> > > > > > > > >> > > > > > `max.in.flight.requests.per.connection=1` for
> > > writing
> > > > > the
> > > > > > > > error
> > > > > > > > >> > > > records.
> > > > > > > > >> > > > > > It's a little less clear, but honestly IMO
> passing
> > > the
> > > > > > > > reporter
> > > > > > > > >> in
> > > > > > > > >> > > the
> > > > > > > > >> > > > > > `put(...)` method helps make this lambda easier
> to
> > > > > > > understand,
> > > > > > > > >> for
> > > > > > > > >> > > some
> > > > > > > > >> > > > > > strange reason. So unless there are good reasons
> > to
> > > > > avoid
> > > > > > > > this,
> > > > > > > > >> I'd
> > > > > > > > >> > > be
> > > > > > > > >> > > > in
> > > > > > > > >> > > > > > favor of 2b and returning a Future.
> > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > Third, how do we pass the reporter lambda / method reference to
> > > > > > > > > >> > > > > > the task? My proposal to pass the reporter via an overloaded
> > > > > > > > > >> > > > > > `put(...)` still is the most attractive to me, for several
> > > > > > > > > >> > > > > > reasons:
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > 3a. There's no need to pass the reporter
> > separately
> > > > > *and*
> > > > > > to
> > > > > > > > >> > describe
> > > > > > > > >> > > > the
> > > > > > > > >> > > > > > changes in method call ordering.
> > > > > > > > >> > > > > > 3b. As mentioned above, for some reason passing
> it
> > > via
> > > > > > > > >> `put(...)`
> > > > > > > > >> > > makes
> > > > > > > > >> > > > > the
> > > > > > > > >> > > > > > intent more clear that it be used when
> processing
> > > the
> > > > > > > > >> SinkRecord,
> > > > > > > > >> > and
> > > > > > > > >> > > > > that
> > > > > > > > >> > > > > > it shouldn't be used in `start(...)`,
> > > > `preCommit(...)`,
> > > > > > > > >> > > > > > `onPartitionsAssigned(...)`, or any of the other
> > > task
> > > > > > > methods.
> > > > > > > > >> As
> > > > > > > > >> > > > Andrew
> > > > > > > > >> > > > > > pointed out earlier, *describing* this in the
> KIP
> > > and
> > > > in
> > > > > > > > JavaDoc
> > > > > > > > >> > will
> > > > > > > > >> > > > be
> > > > > > > > >> > > > > > tough to be exact yet succinct.
> > > > > > > > > >> > > > > > 3c. There is already precedent for evolving
> > > > > > > > >> > > > > > `SourceTask.commitRecord(...)`, and the pattern
> is
> > > > > > > identical.
> > > > > > > > >> > > > > > 3d. Backward compatibility is easy to
> understand,
> > > and
> > > > at
> > > > > > the
> > > > > > > > >> same
> > > > > > > > >> > > time
> > > > > > > > >> > > > > it's
> > > > > > > > >> > > > > > pretty easy to describe what implementations
> that
> > > want
> > > > > to
> > > > > > > take
> > > > > > > > >> > > > advantage
> > > > > > > > >> > > > > of
> > > > > > > > >> > > > > > this feature should do.
> > > > > > > > >> > > > > > 3e. Minimal changes to the interface: we're just
> > > > > *adding*
> > > > > > > one
> > > > > > > > >> > default
> > > > > > > > >> > > > > > method that calls the existing method and
> > > deprecating
> > > > > the
> > > > > > > > >> existing
> > > > > > > > >> > > > > > `put(...)`.
> > > > > > > > >> > > > > > 3f. Deprecating the existing `put(...)` makes it
> > > more
> > > > > > clear
> > > > > > > > in a
> > > > > > > > >> > > > > > programmatic sense that new sink implementations
> > > > should
> > > > > > use
> > > > > > > > the
> > > > > > > > >> > > > reporter,
> > > > > > > > >> > > > > > and that we recommend old sinks evolve to use
> it.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Some of these benefits apply to some of the
> other
> > > > > > > suggestions,
> > > > > > > > >> but
> > > > > > > > >> > I
> > > > > > > > >> > > > > think
> > > > > > > > >> > > > > > none of the other suggestions have all of these
> > > > > benefits.
> > > > > > > For
> > > > > > > > >> > > example,
> > > > > > > > >> > > > > > overloading `initialize(...)` is more difficult
> > > since
> > > > > most
> > > > > > > > sink
> > > > > > > > >> > > > > connectors
> > > > > > > > >> > > > > > don't override it and therefore would be less
> > > subject
> > > > to
> > > > > > > > > >> > deprecation
> > > > > > > > >> > > > > > warnings. Overloading `start(...)` is less
> > > attractive.
> > > > > > > Adding
> > > > > > > > a
> > > > > > > > >> > > method
> > > > > > > > >> > > > > IMO
> > > > > > > > >> > > > > > shares the fewest of these benefits.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > The one disadvantage of this approach is that
> sink
> > > > task
> > > > > > > > >> > > implementations
> > > > > > > > >> > > > > > can't rely upon the reporter upon startup. IMO
> > > that's
> > > > an
> > > > > > > > >> acceptable
> > > > > > > > >> > > > > > tradeoff to get the cleaner and more explicit
> API,
> > > > > > > especially
> > > > > > > > if
> > > > > > > > >> > the
> > > > > > > > >> > > > API
> > > > > > > > >> > > > > > contract is that Connect will pass the same
> > reporter
> > > > > > > instance
> > > > > > > > to
> > > > > > > > >> > each
> > > > > > > > >> > > > > call
> > > > > > > > >> > > > > > to `put(...)` on a single task instance.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Best regards,
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Randall
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > On Fri, May 15, 2020 at 6:59 AM Andrew
> Schofield <
> > > > > > > > >> > > > > > andrew_schofi...@live.com>
> > > > > > > > >> > > > > > wrote:
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > > Hi,
> > > > > > > > >> > > > > > > Randall's suggestion is really good. I think
> it
> > > > gives
> > > > > > the
> > > > > > > > >> > > flexibility
> > > > > > > > >> > > > > > > required and also
> > > > > > > > >> > > > > > > keeps the interface the right way round.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Thanks,
> > > > > > > > >> > > > > > > Andrew Schofield
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > On 15/05/2020, 02:07, "Aakash Shah" <
> > > > > > as...@confluent.io>
> > > > > > > > >> wrote:
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > > Hi Randall,
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > Thanks for the feedback.
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > 1. This is a great suggestion, but I find
> that
> > > > > adding
> > > > > > an
> > > > > > > > >> > > overloaded
> > > > > > > > >> > > > > > > > put(...) which essentially deprecates the
> old
> > > > > put(...)
> > > > > > > to
> > > > > > > > >> only
> > > > > > > > >> > be
> > > > > > > > >> > > > > used
> > > > > > > > >> > > > > > > when
> > > > > > > > >> > > > > > > > a connector is deployed on older versions of
> > > > Connect
> > > > > > > adds
> > > > > > > > >> > enough
> > > > > > > > >> > > > of a
> > > > > > > > >> > > > > > > > complication that could cause connectors to
> > > break
> > > > if
> > > > > > the
> > > > > > > > old
> > > > > > > > >> > > > put(...)
> > > > > > > > >> > > > > > > > doesn't correctly invoke the overloaded
> > > put(...);
> > > > > > either
> > > > > > > > >> that,
> > > > > > > > >> > or
> > > > > > > > >> > > > it
> > > > > > > > >> > > > > > will
> > > > > > > > >> > > > > > > > add duplication of functionality across the
> > two
> > > > > > put(...)
> > > > > > > > >> > > methods. I
> > > > > > > > >> > > > > > think
> > > > > > > > >> > > > > > > > the older method simplifies things with the
> > idea
> > > > > that
> > > > > > a
> > > > > > > > >> > DLQ/error
> > > > > > > > >> > > > > > > reporter
> > > > > > > > >> > > > > > > > will or will not be passed into the method
> > > > depending
> > > > > > on
> > > > > > > > the
> > > > > > > > >> > > version
> > > > > > > > >> > > > > of
> > > > > > > > >> > > > > > > AK.
> > > > > > > > >> > > > > > > > However, I also understand the aesthetic
> > > advantage
> > > > > of
> > > > > > > this
> > > > > > > > >> > method
> > > > > > > > >> > > > vs
> > > > > > > > >> > > > > > the
> > > > > > > > >> > > > > > > > setter method, so I am okay with going in
> this
> > > > > > direction
> > > > > > > > if
> > > > > > > > >> > > others
> > > > > > > > >> > > > > > agree
> > > > > > > > >> > > > > > > > with adding the overloaded put(...).
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > 2. Yes, your assumption is correct. Yes, we
> > can
> > > > > remove
> > > > > > > the
> > > > > > > > >> > "Order
> > > > > > > > >> > > > of
> > > > > > > > >> > > > > > > > Operations" if we go with the overloaded
> > > put(...)
> > > > > > > > direction.
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > 3. Great point, I will remove them from the
> > KIP.
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > 4. Yeah, accept(...) will be synchronous. I
> > will
> > > > > > change
> > > > > > > it
> > > > > > > > >> to
> > > > > > > > >> > be
> > > > > > > > >> > > > > > clearer,
> > > > > > > > >> > > > > > > > thanks.
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > 5. This KIP will use existing metrics as
> well as
> > > > > > introduce
> > > > > > > > new
> > > > > > > > >> > > > metrics.
> > > > > > > > >> > > > > I
> > > > > > > > >> > > > > > > will
> > > > > > > > >> > > > > > > > update this section to fully specify the
> > > metrics.
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > Please let me know what you think.
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > Thanks,
> > > > > > > > >> > > > > > > > Aakash
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > On Thu, May 14, 2020 at 3:52 PM Randall
> Hauch
> > <
> > > > > > > > >> > rha...@gmail.com>
> > > > > > > > >> > > > > > wrote:
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > > Hi, Aakash.
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > Thanks for the KIP. Connect does need an
> > > > improved
> > > > > > > > ability
> > > > > > > > >> for
> > > > > > > > >> > > > sink
> > > > > > > > >> > > > > > > > > connectors to report individual records as
> > > being
> > > > > > > > >> problematic,
> > > > > > > > >> > > and
> > > > > > > > >> > > > > > this
> > > > > > > > >> > > > > > > > > integrates nicely with the existing DLQ
> > > feature.
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > I also appreciate the desire to maintain
> > > > > > compatibility
> > > > > > > > so
> > > > > > > > >> > that
> > > > > > > > >> > > > > > > connectors
> > > > > > > > >> > > > > > > > > can take advantage of this feature when
> > > deployed
> > > > > in
> > > > > > a
> > > > > > > > >> runtime
> > > > > > > > >> > > > that
> > > > > > > > >> > > > > > > supports
> > > > > > > > >> > > > > > > > > this feature, but can safely and easily do
> > > > without
> > > > > > the
> > > > > > > > >> > feature
> > > > > > > > >> > > > when
> > > > > > > > >> > > > > > > > > deployed to an older runtime. But I do
> > > > understand
> > > > > > > > Andrew's
> > > > > > > > >> > > > concern
> > > > > > > > >> > > > > > > about
> > > > > > > > >> > > > > > > > > the aesthetics. Have you considered
> > > overloading
> > > > > the
> > > > > > > > >> > `put(...)`
> > > > > > > > >> > > > > method
> > > > > > > > >> > > > > > > and
> > > > > > > > >> > > > > > > > > adding the `reporter` as a second
> parameter?
> > > > > > > Essentially
> > > > > > > > >> it
> > > > > > > > >> > > would
> > > > > > > > >> > > > > add
> > > > > > > > >> > > > > > > the
> > > > > > > > >> > > > > > > > > one method (with proper JavaDoc) to
> > `SinkTask`
> > > > > only:
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > ```
> > > > > > > > > >> > > > > > > > > >     public void put(Collection<SinkRecord> records, BiFunction<SinkRecord, Throwable> reporter) {
> > > > > > > > >> > > > > > > > >         put(records);
> > > > > > > > >> > > > > > > > >     }
> > > > > > > > >> > > > > > > > > ```
> > > > > > > > >> > > > > > > > > and the WorkerSinkTask would be changed to
> > > call
> > > > > > > > >> > > `put(Collection,
> > > > > > > > >> > > > > > > > > BiFunction)` instead.
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > Sink connector implementations that don't
> do
> > > > > > anything
> > > > > > > > >> > different
> > > > > > > > >> > > > can
> > > > > > > > >> > > > > > > still
> > > > > > > > >> > > > > > > > > override `put(Collection)`, and it still
> > works
> > > > as
> > > > > > > > before.
> > > > > > > > >> > > > > Developers
> > > > > > > > >> > > > > > > that
> > > > > > > > >> > > > > > > > > want to change their sink connector
> > > > > implementations
> > > > > > to
> > > > > > > > >> > support
> > > > > > > > >> > > > this
> > > > > > > > >> > > > > > new
> > > > > > > > >> > > > > > > > > feature would do the following, which
> would
> > > work
> > > > > in
> > > > > > > > older
> > > > > > > > >> and
> > > > > > > > >> > > > newer
> > > > > > > > >> > > > > > > Connect
> > > > > > > > >> > > > > > > > > runtimes:
> > > > > > > > >> > > > > > > > > ```
> > > > > > > > > >> > > > > > > > > >     public void put(Collection<SinkRecord> records) {
> > > > > > > > > >> > > > > > > > > >         put(records, null);
> > > > > > > > > >> > > > > > > > > >     }
> > > > > > > > > >> > > > > > > > > >     public void put(Collection<SinkRecord> records, BiFunction<SinkRecord, Throwable> reporter) {
> > > > > > > > > >> > > > > > > > > >         // the normal `put(Collection)` logic goes here, but can optionally use `reporter` if non-null
> > > > > > > > > >> > > > > > > > > >     }
> > > > > > > > >> > > > > > > > > ```
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > I think this has all the same benefits of
> > the
> > > > > > current
> > > > > > > > KIP,
> > > > > > > > >> > but
> > > > > > > > >> > > > > > > > > it's noticeably simpler and hopefully more
> > > > > > > aesthetically
> > > > > > > > >> > > > pleasing.
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > As for Andrew's second concern about "the
> > task
> > > > can
> > > > > > > send
> > > > > > > > >> > errant
> > > > > > > > >> > > > > > records
> > > > > > > > >> > > > > > > to
> > > > > > > > >> > > > > > > > > it within put(...)" being too restrictive.
> > My
> > > > > guess
> > > > > > is
> > > > > > > > >> that
> > > > > > > > >> > > this
> > > > > > > > >> > > > > was
> > > > > > > > >> > > > > > > more
> > > > > > > > >> > > > > > > > > an attempt at describing the basic
> behavior,
> > > and
> > > > > > less
> > > > > > > > >> about
> > > > > > > > >> > > > > requiring
> > > > > > > > >> > > > > > > the
> > > > > > > > >> > > > > > > > > reporter only being called within the
> > > `put(...)`
> > > > > > > method
> > > > > > > > >> and
> > > > > > > > >> > not
> > > > > > > > >> > > > by
> > > > > > > > >> > > > > > > methods
> > > > > > > > >> > > > > > > > > to which `put(...)` synchronously or
> > > > > asynchronously
> > > > > > > > >> > delegates.
> > > > > > > > >> > > > Can
> > > > > > > > >> > > > > > you
> > > > > > > > >> > > > > > > > > confirm whether my assumption is correct?
> If
> > > so,
> > > > > > then
> > > > > > > > >> perhaps
> > > > > > > > >> > > my
> > > > > > > > >> > > > > > > suggestion
> > > > > > > > >> > > > > > > > > helps work around this issue as well,
> since
> > > > there
> > > > > > > would
> > > > > > > > >> be no
> > > > > > > > >> > > > > > > restriction
> > > > > > > > >> > > > > > > > > on when the reporter is called, and the
> > whole
> > > > > "Order
> > > > > > > of
> > > > > > > > >> > > > Operations"
> > > > > > > > >> > > > > > > section
> > > > > > > > >> > > > > > > > > could potentially be removed.
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > Third, it's not clear to me why the "Error
> > > > > Reporter
> > > > > > > > >> Object"
> > > > > > > > >> > > > > > subsection
> > > > > > > > >> > > > > > > in
> > > > > > > > >> > > > > > > > > the "Proposal" section lists the worker
> > > > > > configuration
> > > > > > > > >> > > properties
> > > > > > > > >> > > > > that
> > > > > > > > >> > > > > > > were
> > > > > > > > >> > > > > > > > > previously introduced with
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
> > > > > > > > >> > > > > > > > > .
> > > > > > > > >> > > > > > > > > Maybe it's worth mentioning that the error
> > > > > reporter
> > > > > > > > >> > > functionality
> > > > > > > > >> > > > > > will
> > > > > > > > >> > > > > > > > > reuse or build upon KIP-298, including
> > reusing
> > > > the
> > > > > > > > >> > > configuration
> > > > > > > > >> > > > > > > properties
> > > > > > > > >> > > > > > > > > defined in KIP-298. But IIUC, the KIP does
> > not
> > > > > > propose
> > > > > > > > >> > changing
> > > > > > > > >> > > > any
> > > > > > > > >> > > > > > > > > technical or semantic aspect of these
> > > > > configuration
> > > > > > > > >> > properties,
> > > > > > > > >> > > > and
> > > > > > > > >> > > > > > > > > therefore the KIP would be more clear and
> > > > succinct
> > > > > > > > without
> > > > > > > > >> > > them.
> > > > > > > > >> > > > > > > *That* the
> > > > > > > > >> > > > > > > > > error reporter will use these properties
> is
> > > part
> > > > > of
> > > > > > > the
> > > > > > > > UX
> > > > > > > > >> > and
> > > > > > > > >> > > > > > > therefore
> > > > > > > > >> > > > > > > > > necessary to mention, but *how* it uses
> > those
> > > > > > > properties
> > > > > > > > >> is
> > > > > > > > >> > > > really
> > > > > > > > >> > > > > up
> > > > > > > > >> > > > > > > to
> > > > > > > > >> > > > > > > > > the implementation.
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > Fourth, the "Synchrony" section has a
> > sentence
> > > > > that
> > > > > > is
> > > > > > > > >> > > confusing,
> > > > > > > > >> > > > > or
> > > > > > > > >> > > > > > > not as
> > > > > > > > >> > > > > > > > > clear as it could be.
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > >     "If a record is sent to the error
> > > reporter,
> > > > > > > > >> processing of
> > > > > > > > >> > > the
> > > > > > > > >> > > > > > next
> > > > > > > > >> > > > > > > > > errant record in accept(...) will not
> begin
> > > > until
> > > > > > the
> > > > > > > > >> > producer
> > > > > > > > >> > > > > > > successfully
> > > > > > > > >> > > > > > > > > sends the errant record to Kafka."
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > This sentence is a bit difficult to
> > > understand,
> > > > > but
> > > > > > > IIUC
> > > > > > > > >> this
> > > > > > > > >> > > > > really
> > > > > > > > >> > > > > > > just
> > > > > > > > >> > > > > > > > > means that "accept(...)" will be
> synchronous
> > > and
> > > > > > will
> > > > > > > > >> block
> > > > > > > > >> > > until
> > > > > > > > >> > > > > the
> > > > > > > > >> > > > > > > > > errant record has been successfully
> written
> > to
> > > > > > Kafka.
> > > > > > > If
> > > > > > > > >> so,
> > > > > > > > >> > > > let's
> > > > > > > > >> > > > > > say
> > > > > > > > >> > > > > > > > > that. The rest of the paragraph is fine.
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > Finally, is this KIP proposing new
> metrics,
> > or
> > > > > that
> > > > > > > > >> existing
> > > > > > > > >> > > > > metrics
> > > > > > > > >> > > > > > > would
> > > > > > > > >> > > > > > > > > be used to track the error reporter usage?
> > If
> > > > the
> > > > > > > > former,
> > > > > > > > >> > then
> > > > > > > > >> > > > > please
> > > > > > > > >> > > > > > > > > fully-specify what these metrics will be,
> > > > > similarly
> > > > > > to
> > > > > > > > how
> > > > > > > > >> > > > metrics
> > > > > > > > >> > > > > > are
> > > > > > > > >> > > > > > > > > specified in
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework
> > > > > > > > >> > > > > > > > > .
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > Thoughts?
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > Best regards,
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > Randall
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > On Mon, May 11, 2020 at 4:49 PM Andrew
> > > > Schofield <
> > > > > > > > >> > > > > > > > > andrew_schofi...@live.com>
> > > > > > > > >> > > > > > > > > wrote:
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > > Hi Aakash,
> > > > > > > > >> > > > > > > > > > Thanks for sorting out the replies to
> the
> > > > > mailing
> > > > > > > > list.
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > > First, I do like the idea of improving
> > error
> > > > > > > reporting
> > > > > > > > >> in
> > > > > > > > >> > > sink
> > > > > > > > >> > > > > > > > > connectors.
> > > > > > > > >> > > > > > > > > > I'd like a simple
> > > > > > > > >> > > > > > > > > > way to put bad records onto the DLQ.
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > > I think this KIP is considerably more
> > > > > complicated
> > > > > > > than
> > > > > > > > >> it
> > > > > > > > >> > > > seems.
> > > > > > > > >> > > > > > The
> > > > > > > > >> > > > > > > > > > guidance on the
> > > > > > > > >> > > > > > > > > > SinkTask.put() method is that it should
> > send
> > > > the
> > > > > > > > records
> > > > > > > > >> > > > > > > asynchronously
> > > > > > > > >> > > > > > > > > > and immediately
> > > > > > > > >> > > > > > > > > > return, so the task is likely to want to
> > > > report
> > > > > > > errors
> > > > > > > > >> > > > > > asynchronously
> > > > > > > > >> > > > > > > > > > too.  Currently the KIP
> > > > > > > > >> > > > > > > > > > states that "the task can send errant
> > > records
> > > > to
> > > > > > it
> > > > > > > > >> within
> > > > > > > > >> > > > > > put(...)"
> > > > > > > > >> > > > > > > and
> > > > > > > > >> > > > > > > > > > that's too restrictive.
> > > > > > > > >> > > > > > > > > > The task ought to be able to report any
> > > > > unflushed
> > > > > > > > >> records,
> > > > > > > > >> > > but
> > > > > > > > >> > > > > the
> > > > > > > > >> > > > > > > > > > synchronisation of this is going
> > > > > > > > >> > > > > > > > > > to be tricky. I suppose the connector
> > author
> > > > > needs
> > > > > > > to
> > > > > > > > >> make
> > > > > > > > >> > > sure
> > > > > > > > >> > > > > > that
> > > > > > > > >> > > > > > > all
> > > > > > > > >> > > > > > > > > > errant records have
> > > > > > > > >> > > > > > > > > > been reported before returning control
> > from
> > > > > > > > >> > > SinkTask.flush(...)
> > > > > > > > >> > > > > or
> > > > > > > > >> > > > > > > > > perhaps
> > > > > > > > >> > > > > > > > > > SinkTask.preCommit(...).
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > > I think the interface is a little
> strange
> > > > too. I
> > > > > > can
> > > > > > > > see
> > > > > > > > >> > that
> > > > > > > > >> > > > > this
> > > > > > > > >> > > > > > > was
> > > > > > > > >> > > > > > > > > > done so it's possible to deliver a
> > connector
> > > > > > > > >> > > > > > > > > > that supports error reporting but it can
> > > also
> > > > > work
> > > > > > > in
> > > > > > > > >> > earlier
> > > > > > > > >> > > > > > > versions of
> > > > > > > > >> > > > > > > > > > the KC runtime. But, the
> > > > > > > > >> > > > > > > > > > pattern so far is that the task uses the
> > > > methods
> > > > > > of
> > > > > > > > >> > > > > SinkTaskContext
> > > > > > > > >> > > > > > > to
> > > > > > > > >> > > > > > > > > > access utilities in the Kafka
> > > > > > > > >> > > > > > > > > > Connect runtime, and I suggest that
> > > reporting
> > > > a
> > > > > > bad
> > > > > > > > >> record
> > > > > > > > >> > is
> > > > > > > > >> > > > > such
> > > > > > > > >> > > > > > a
> > > > > > > > >> > > > > > > > > > utility. SinkTaskContext has
> > > > > > > > >> > > > > > > > > > changed before when the configs()
> method
> > > was
> > > > > > added,
> > > > > > > > so
> > > > > > > > >> I
> > > > > > > > >> > > think
> > > > > > > > >> > > > > > > there is
> > > > > > > > >> > > > > > > > > > precedent for adding a method.
> > > > > > > > >> > > > > > > > > > The way the KIP adds a method to
> SinkTask
> > > that
> > > > > the
> > > > > > > KC
> > > > > > > > >> > runtime
> > > > > > > > >> > > > > calls
> > > > > > > > >> > > > > > > to
> > > > > > > > >> > > > > > > > > > provide the error reporting utility
> > > > > > > > >> > > > > > > > > > seems not to match what has gone before.
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > > Thanks,
> > > > > > > > >> > > > > > > > > > Andrew
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > > On 11/05/2020, 19:05, "Aakash Shah" <
> > > > > > > > as...@confluent.io
> > > > > > > > >> >
> > > > > > > > >> > > > wrote:
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > >     I wasn't previously added to the dev
> > > > mailing
> > > > > > > list,
> > > > > > > > >> so
> > > > > > > > >> > I'd
> > > > > > > > >> > > > > like
> > > > > > > > >> > > > > > to
> > > > > > > > >> > > > > > > > > post
> > > > > > > > >> > > > > > > > > > my
> > > > > > > > >> > > > > > > > > >     discussion with Andrew Schofield
> below
> > > for
> > > > > > > > >> visibility
> > > > > > > > >> > and
> > > > > > > > >> > > > > > further
> > > > > > > > >> > > > > > > > > >     discussion:
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > >     Hi Andrew,
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > >     Thanks for the reply. The main
> concern
> > > > with
> > > > > > this
> > > > > > > > >> > approach
> > > > > > > > >> > > > > would
> > > > > > > > >> > > > > > > be
> > > > > > > > >> > > > > > > > > its
> > > > > > > > >> > > > > > > > > >     backward compatibility. I’ve
> > highlighted
> > > > the
> > > > > > > > >> thoughts
> > > > > > > > >> > > > around
> > > > > > > > >> > > > > > the
> > > > > > > > >> > > > > > > > > > backwards
> > > > > > > > >> > > > > > > > > >     compatibility of the initial
> approach,
> > > > > please
> > > > > > > let
> > > > > > > > me
> > > > > > > > >> > know
> > > > > > > > >> > > > > what
> > > > > > > > >> > > > > > > you
> > > > > > > > >> > > > > > > > > > think.
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > >     Thanks,
> > > > > > > > >> > > > > > > > > >     Aakash
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> ____________________________________________________________________________________________________________________________
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > >     Hi,
> > > > > > > > >> > > > > > > > > >     By adding a new method to the
> > > SinkTaskContext
> > > > > > > > interface
> > > > > > > > >> in
> > > > > > > > >> > > say
> > > > > > > > >> > > > > > Kafka
> > > > > > > > >> > > > > > > > > 2.6, a
> > > > > > > > >> > > > > > > > > >     connector that calls it would
> require
> > a
> > > > > Kafka
> > > > > > > 2.6
> > > > > > > > >> > connect
> > > > > > > > >> > > > > > > runtime. I
> > > > > > > > >> > > > > > > > > > don't
> > > > > > > > >> > > > > > > > > >     quite see how that's a backward
> > > > > compatibility
> > > > > > > > >> problem.
> > > > > > > > >> > > It's
> > > > > > > > >> > > > > > just
> > > > > > > > >> > > > > > > that
> > > > > > > > >> > > > > > > > > > new
> > > > > > > > >> > > > > > > > > >     connectors need the latest
> interface.
> > I
> > > > > might
> > > > > > > not
> > > > > > > > >> quite
> > > > > > > > >> > > be
> > > > > > > > >> > > > > > > > > > understanding,
> > > > > > > > >> > > > > > > > > >     but I think it would be fine.
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > >     Thanks,
> > > > > > > > >> > > > > > > > > >     Andrew
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> ____________________________________________________________________________________________________________________________
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > >     Hi Andrew,
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > >     I apologize for the way the reply
> was
> > > > sent.
> > > > > I
> > > > > > > just
> > > > > > > > >> > > > subscribed
> > > > > > > > >> > > > > > to
> > > > > > > > >> > > > > > > the
> > > > > > > > >> > > > > > > > > > dev
> > > > > > > > >> > > > > > > > > >     mailing list so it should be
> resolved
> > > now.
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > >     You are correct, new connectors
> would
> > > > simply
> > > > > > > > require
> > > > > > > > >> > the
> > > > > > > > >> > > > > latest
> > > > > > > > >> > > > > > > > > > interface.
> > > > > > > > >> > > > > > > > > >     However, we want to remove that
> > > > requirement
> > > > > -
> > > > > > in
> > > > > > > > >> other
> > > > > > > > >> > > > words,
> > > > > > > > >> > > > > > we
> > > > > > > > >> > > > > > > want
> > > > > > > > >> > > > > > > > > > to
> > > > > > > > >> > > > > > > > > >     allow the possibility that someone
> > wants
> > > > the
> > > > > > > > latest
> > > > > > > > >> > > > > > connector/to
> > > > > > > > >> > > > > > > > > > upgrade to
> > > > > > > > >> > > > > > > > > >     the latest version, but deploys it
> on
> > an
> > > > > older
> > > > > > > > >> version
> > > > > > > > >> > of
> > > > > > > > >> > > > AK.
> > > > > > > > >> > > > > > > > > > Basically, we
> > > > > > > > >> > > > > > > > > >     don't want to enforce the necessity
> of
> > > > > > upgrading
> > > > > > > > AK
> > > > > > > > >> to
> > > > > > > > >> > > get
> > > > > > > > >> > > > > the
> > > > > > > > >> > > > > > > latest
> > > > > > > > >> > > > > > > > > >     interface. In the current approach,
> > > there
> > > > > > would
> > > > > > > be
> > > > > > > > >> no
> > > > > > > > >> > > issue
> > > > > > > > >> > > > > of
> > > > > > > > >> > > > > > > > > > deploying a
> > > > > > > > >> > > > > > > > > >     new connector on an older version of
> > AK,
> > > > as
> > > > > > the
> > > > > > > > >> Connect
> > > > > > > > >> > > > > > framework
> > > > > > > > >> > > > > > > > > would
> > > > > > > > >> > > > > > > > > >     simply not invoke the new method.
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > >     Please let me know what you think
> and
> > > if I
> > > > > > need
> > > > > > > to
> > > > > > > > >> > > clarify
> > > > > > > > >> > > > > > > anything.
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > >     Thanks,
> > > > > > > > >> > > > > > > > > >     Aakash
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to