Have you tried this? IIUC the problem is with the new type, and any class
that uses ‘ErrantRecordReporter’ with an import would fail to be loaded by
the classloader if the type does not exist (i.e., pre-2.9 Connect
runtimes). Catching that ClassNotFoundException and dynamically importing
the type is actually much harder.
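A class that merely references a missing type can fail when it is loaded or linked, so the only safe way to check for the type is to probe it by name. The following is a minimal sketch of that probe. It assumes the check runs reflectively from a class that never references the new type directly. The probed class name below is hypothetical. The sketch does not cover the hard part the message describes: routing every real use through code that is loaded only when the probe succeeds.

```java
class ClassProbe {
    /**
     * Returns true if the named class can be resolved by the given loader.
     * Passing initialize=false avoids running the class's static initializers.
     */
    static boolean isClassAvailable(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Missing class, or a class that exists but cannot be linked.
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = ClassProbe.class.getClassLoader();
        // A JDK type that is always present.
        System.out.println(isClassAvailable("java.util.function.BiFunction", loader));
        // Hypothetical name standing in for a type absent from older runtimes.
        System.out.println(isClassAvailable("org.example.connect.ErrantRecordReporter", loader));
    }
}
```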

On Sat, May 16, 2020 at 5:07 PM Aakash Shah <as...@confluent.io> wrote:

> Hi Arjun,
>
> Thanks for this suggestion. I actually like this a lot because a defined
> interface looks more appealing and is clearer in its intention. Since we
> are still using NoSuchMethodError to account for backwards
> compatibility, this works for me. I can't see any drawbacks besides having
> to call the getter method for the processing of every errant record.
>
> I would like to hear others' thoughts on whether this drawback outweighs the
> added benefit of clarity.
>
> Thanks,
> Aakash
>
> On Sat, May 16, 2020 at 2:54 PM Arjun Satish <arjun.sat...@gmail.com>
> wrote:
>
> > Thanks Konstantine, happy to write something up in a KIP. But I think it
> > would be redundant if we add this KIP. What do you think?
> >
> > Also, Randall, yes, that API would work. But if we expect developers to
> > catch NoSuchMethodErrors, then should we also go ahead and make an
> > interface that would have a report method (similar to ErrorReporter
> > <https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java>,
> > but maybe with different arguments on the report method)? Developers would
> > also have to catch NoClassDefFoundError.
> >
> > That would modify your change in SinkTaskContext to:
> >
> > public interface SinkTaskContext {
> >     /**
> >      * Get the reporter to which the sink task can report problematic or
> >      * failed {@link SinkRecord} passed to the
> >      * {@link SinkTask#put(Collection)} method.
> >      *
> >      * @return an errant record reporter
> >      */
> >     ErrantRecordReporter failedRecordReporter();
> > }
> >
> > where ErrantRecordReporter is:
> >
> > public interface ErrantRecordReporter {
> >
> >     /**
> >      * Serialize and produce the errant record to the error topic.
> >      *
> >      * @param record the errant record
> >      * @param error the error that caused the record to be reported
> >      */
> >     void report(SinkRecord record, Throwable error);
> >
> > }
> >
> > Usage in put would be the same though if the class is not explicitly
> > named:
> >
> >         try {
> >             context.failedRecordReporter().report(record, error);
> >         } catch (NoSuchMethodError e) {
> >             log.info("Boooooooooo!");
> >         }
> >
> > Thoughts?
> >
> > On Sat, May 16, 2020 at 12:46 PM Konstantine Karantasis <
> > konstant...@confluent.io> wrote:
> >
> > > Thanks for following up Randall.
> > >
> > > I agree with your latest suggestion. It was good that we explored several
> > > options, but accessing the context to obtain the reporter in Kafka Connect
> > > versions that support this feature makes the most sense. The burden for
> > > connector developers that want to use this reporter _and_ make connectors
> > > compatible with old and new workers is minimal.
> > >
> > > We'll have to live with additions like this, and their appearance in both
> > > KIP-131 and here in KIP-610 does create a reasonable precedent.
> > >
> > > Konstantine
> > >
> > >
> > > On Sat, May 16, 2020 at 12:34 PM Randall Hauch <rha...@gmail.com>
> wrote:
> > >
> > > > Thanks again for the active discussion!
> > > >
> > > > Regarding the future-vs-callback discussion: I did like where Chris was
> > > > going with the Callback, but he raises a good point that it's unclear
> > > > what to use for the reporter type, since we'd need three parameters.
> > > > Introducing a new interface makes it much harder for a sink task to be
> > > > backward compatible, so sticking with BiFunction is a good compromise.
> > > > Plus, another significant disadvantage of a callback approach is that a
> > > > sink task's callback is called from the producer thread, and this risks a
> > > > poorly written sink task callback killing the reporter's producer without
> > > > necessarily failing the task. Using a future avoids this risk altogether
> > > > and still gives the sink task the ability to report synchronously by
> > > > waiting on the Future, which is a standard and conventional design
> > > > pattern. So we do seem to have converged on using
> > > > `BiFunction<SinkRecord, Throwable, Future<Void>>` for the reporter type.
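To make the sync/async distinction concrete, here is a minimal sketch of a task consuming a `BiFunction<..., Future<Void>>` reporter either way. It assumes `String` stands in for `SinkRecord`. Instead of Connect's DLQ producer, it uses a hypothetical in-memory reporter. Only JDK types are used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

class ReporterUsage {
    // Hypothetical stand-in for the runtime's reporter: appends to an in-memory
    // "DLQ" and returns an already-completed future. A real reporter would
    // complete the future from the producer's send callback.
    static BiFunction<String, Throwable, Future<Void>> inMemoryReporter(List<String> dlq) {
        return (record, error) -> {
            dlq.add(record + ": " + error.getMessage());
            return CompletableFuture.<Void>completedFuture(null);
        };
    }

    // Wait for one report, surfacing failures as unchecked exceptions.
    static void await(Future<Void> pending) {
        try {
            pending.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for error report", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Error report failed", e.getCause());
        }
    }

    // Synchronous use: block on each report before moving on.
    static void reportSync(BiFunction<String, Throwable, Future<Void>> reporter,
                           String record, Throwable error) {
        await(reporter.apply(record, error));
    }

    // Asynchronous use: issue all reports first, then wait once (for example,
    // before offsets for these records are committed).
    static int reportAllThenWait(BiFunction<String, Throwable, Future<Void>> reporter,
                                 List<String> failed, Throwable error) {
        List<Future<Void>> pending = new ArrayList<>();
        for (String record : failed) {
            pending.add(reporter.apply(record, error));
        }
        for (Future<Void> f : pending) {
            await(f);
        }
        return pending.size();
    }

    public static void main(String[] args) {
        List<String> dlq = new ArrayList<>();
        BiFunction<String, Throwable, Future<Void>> reporter = inMemoryReporter(dlq);
        reportSync(reporter, "record-1", new IllegalArgumentException("bad schema"));
        reportAllThenWait(reporter, Arrays.asList("record-2", "record-3"),
                new RuntimeException("bad value"));
        System.out.println(dlq);
    }
}
```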
> > > >
> > > > Now, we still don't seem to have converged on how to pass the reporter
> > > > to the sink task. I agree with Konstantine that the deprecation affects
> > > > only newer versions of Connect, and that a sink task needs to deal with
> > > > both put methods only when it wants to support older runtimes. I also
> > > > think that this is a viable approach, but I do concede that this
> > > > evolution of the sink task API is more complicated than it should be.
> > > >
> > > > In the interest of quickly coming to consensus on how we pass the
> > > > reporter to the sink task, I'd like to go back to Andrew's original
> > > > suggestion, which I think we disregarded too quickly: add a getter on the
> > > > SinkTaskContext interface. We already have precedent for adding methods
> > > > to one of the context classes with the newly adopted KIP-131, which adds
> > > > a getter for the OffsetStorageReader on the (new) SourceConnectorContext.
> > > > That KIP accepts the fact that a source connector wanting to use this
> > > > feature while also keeping the ability to be installed into older Connect
> > > > runtimes must guard its use of the context's getter method.
> > > >
> > > > I think we can use the same pattern for this KIP, and add a getter to the
> > > > existing SinkTaskContext that is defined something like:
> > > >
> > > > public interface SinkTaskContext {
> > > >     ...
> > > >     /**
> > > >      * Get the reporter to which the sink task can report problematic or
> > > >      * failed {@link SinkRecord} passed to the {@link SinkTask#put(Collection)}
> > > >      * method. When reporting a failed record, the sink task will receive a
> > > >      * {@link Future} that the task can optionally use to wait until the
> > > >      * failed record and exception have been written to Kafka via Connect's
> > > >      * DLQ. Note that the result of this method may be null if this
> > > >      * connector has not been configured with a DLQ.
> > > >      *
> > > >      * <p>This method was added in Apache Kafka 2.9. Sink tasks that use this
> > > >      * method but want to maintain backward compatibility so they can also be
> > > >      * deployed to older Connect runtimes should guard the call to this method
> > > >      * with a try-catch block, since calling this method will result in a
> > > >      * {@link NoSuchMethodError} when the sink connector is deployed to
> > > >      * Connect runtimes older than Kafka 2.9. For example:
> > > >      * <pre>
> > > >      *     BiFunction&lt;SinkRecord, Throwable, Future&lt;Void&gt;&gt; reporter;
> > > >      *     try {
> > > >      *         reporter = context.failedRecordReporter();
> > > >      *     } catch (NoSuchMethodError e) {
> > > >      *         reporter = null;
> > > >      *     }
> > > >      * </pre>
> > > >      *
> > > >      * @return the reporter function; null if no error reporter has been
> > > >      *         configured for the connector
> > > >      * @since 2.9
> > > >      */
> > > >     BiFunction<SinkRecord, Throwable, Future<Void>> failedRecordReporter();
> > > > }
> > > >
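The guard described in the JavaDoc above can be exercised without a Connect runtime. This is a minimal sketch under a few assumptions:

- `TaskContext` is a hypothetical interface shaped like the proposed getter. It is not Kafka's `SinkTaskContext`.
- `String` stands in for `SinkRecord`.
- The "old runtime" is simulated by throwing `NoSuchMethodError` by hand. On a real old runtime, the JVM raises this error when the method cannot be resolved at the call site.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

// Hypothetical stand-in for the proposed getter on the task context.
interface TaskContext {
    BiFunction<String, Throwable, Future<Void>> failedRecordReporter();
}

class GuardedReporterLookup {
    // Returns the reporter, or null if the runtime predates the getter
    // (or, per the proposed contract, if no DLQ is configured).
    static BiFunction<String, Throwable, Future<Void>> lookupReporter(TaskContext context) {
        try {
            return context.failedRecordReporter();
        } catch (NoSuchMethodError e) {
            // Older runtime: the getter does not exist on the context.
            return null;
        }
    }

    public static void main(String[] args) {
        // Simulated old runtime: the call fails the way a missing method would.
        TaskContext oldRuntime = () -> {
            throw new NoSuchMethodError("failedRecordReporter");
        };
        // Simulated new runtime with a DLQ configured.
        TaskContext newRuntime =
                () -> (record, error) -> CompletableFuture.<Void>completedFuture(null);

        System.out.println(lookupReporter(oldRuntime) == null);
        System.out.println(lookupReporter(newRuntime) != null);
    }
}
```

Because `BiFunction` ships with the JDK, only the missing method has to be handled here. A dedicated reporter interface would also raise the missing-type problem discussed at the top of the thread.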
> > > > The main advantage is that the KIP no longer has to make *any other*
> > > > changes to the Sink Connector or Task API. The above is really the only
> > > > change, and it's merely an addition to the API: no deprecation and no
> > > > overloaded methods. The KIP does need to explain how the reporter is
> > > > configured and used (which it already does), but IMO the KIP doesn't need
> > > > to describe when this reporter can/should be used. After all, this method
> > > > is on the existing SinkTaskContext, so it is really no different from any
> > > > other existing method. I think my JavaDoc (which is just a suggestion for
> > > > a starting point that Aakash can improve as needed) shows how easy it is
> > > > for a sink to maintain backward compatibility. (The use of `BiFunction`
> > > > helps tremendously.) Another not insignificant advantage is that a sink
> > > > task can use this reporter reference throughout the task's lifetime
> > > > (after it's started and before it is stopped), making it less invasive
> > > > for existing sink task implementations that want to use it.
> > > >
> > > > I hope we can all get behind this compromise, which IMO is actually super
> > > > clean and makes a lot of sense. Thanks, Andrew, for originally suggesting
> > > > it. I know we're all trying to improve the Connect API in a way that
> > > > makes sense, and deliberate and constructive discussion is a healthy
> > > > thing. Thanks again to everyone for participating!
> > > >
> > > > BTW, we've agreed upon a number of other changes, but I don't see any of
> > > > those changes in the KIP. Aakash, can you please update the KIP quickly
> > > > so we can make sure the other parts of the KIP are acceptable?
> > > >
> > > > Best regards,
> > > >
> > > > Randall
> > > >
> > > > On Sat, May 16, 2020 at 12:24 PM Konstantine Karantasis <
> > > > konstant...@confluent.io> wrote:
> > > >
> > > > > Thanks for the quick response Aakash.
> > > > >
> > > > > With respect to deprecation, this refers to deprecating this method in
> > > > > newer versions of Kafka Connect (and eventually removing it).
> > > > >
> > > > > As a connector developer, if you want your connector to run across a
> > > > > wide spectrum of Connect versions, you'll have to take this into
> > > > > consideration and retain both methods in a functional state. The good
> > > > > news is that both methods can share a lot of code, so in reality both
> > > > > the old and the new put will be thin shims over a `putRecord` method (or
> > > > > `process` method as you call it in the KIP).
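The suggestion above is to make both `put` variants thin shims over one shared method. It might look roughly like this minimal sketch, which rests on the following assumptions:

- The names `ShimmedTask` and `putRecord` are hypothetical.
- `String` stands in for `SinkRecord`.
- The failure condition is deliberately trivial.
- When no reporter is available, this sketch just remembers the record. A real task might instead throw to fail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

// Hypothetical connector task; not Kafka's SinkTask.
class ShimmedTask {
    final List<String> written = new ArrayList<>();
    final List<String> failedWithoutReporter = new ArrayList<>();

    // Old-style entry point: older runtimes only ever call this one.
    public void put(Collection<String> records) {
        put(records, null);
    }

    // New-style entry point: the reporter may be null if no DLQ is configured.
    public void put(Collection<String> records,
                    BiFunction<String, Throwable, Future<Void>> reporter) {
        for (String record : records) {
            putRecord(record, reporter);
        }
    }

    // Shared logic that both overloads delegate to.
    private void putRecord(String record,
                           BiFunction<String, Throwable, Future<Void>> reporter) {
        try {
            if (record.isEmpty()) {
                throw new IllegalArgumentException("empty record");
            }
            written.add(record); // stand-in for writing to the external system
        } catch (IllegalArgumentException e) {
            if (reporter != null) {
                reporter.apply(record, e); // fire-and-forget; the Future is ignored here
            } else {
                failedWithoutReporter.add(record);
            }
        }
    }

    public static void main(String[] args) {
        ShimmedTask oldStyle = new ShimmedTask();
        oldStyle.put(Arrays.asList("a", "", "b"));
        System.out.println(oldStyle.written + " " + oldStyle.failedWithoutReporter.size());

        List<String> dlq = new ArrayList<>();
        ShimmedTask newStyle = new ShimmedTask();
        newStyle.put(Arrays.asList("", "c"), (record, error) -> {
            dlq.add(error.getMessage());
            return CompletableFuture.<Void>completedFuture(null);
        });
        System.out.println(newStyle.written + " " + dlq);
    }
}
```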
> > > > >
> > > > > Given the above, there's no requirement to conditionally call one method
> > > > > or the other in the framework based on configuration. Once you implement
> > > > > the new `put` with something other than its default implementation, as a
> > > > > connector developer, you'll know to adapt to the above.
> > > > >
> > > > > I definitely suggest extending our docs in a meaningful way in order to
> > > > > make the upgrade path easy to follow. Maybe you'd like to add a note to
> > > > > your compatibility section in this KIP as well.
> > > > >
> > > > > Regards,
> > > > > Konstantine
> > > > >
> > > > > On Sat, May 16, 2020 at 10:13 AM Aakash Shah <as...@confluent.io>
> > > wrote:
> > > > >
> > > > > > +1
> > > > > >
> > > > > > On Sat, May 16, 2020 at 9:55 AM Konstantine Karantasis <
> > > > > > konstant...@confluent.io> wrote:
> > > > > >
> > > > > > > Hi Arjun,
> > > > > > >
> > > > > > > I think I agree with you that the subject is interesting. Yet I feel it
> > > > > > > belongs in a separate future KIP. Reading the proposal in the KIP
> > > > > > > format will help me, at least, understand it better.
> > > > > > >
> > > > > > > Having said that, for the purpose of simplifying error handling for sink
> > > > > > > tasks, the discussion on KIP-610 has made some good progress on the
> > > > > > > mailing list. If the few open items are reflected in the proposal, maybe
> > > > > > > it'd even be worthwhile to consider it for inclusion in the upcoming
> > > > > > > release with its current scope.
> > > > > > >
> > > > > > > Konstantine
> > > > > > >
> > > > > > >
> > > > > > > On Fri, May 15, 2020 at 7:44 PM Arjun Satish <
> > > arjun.sat...@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > I'm kinda hoping that we get to an approach on how to extend the
> > > > > > > > Connect framework. Adding parameters in the put method is nice, and
> > > > > > > > maybe works for now, but I'm not sure how scalable it is. It'd be great
> > > > > > > > to be able to add more functionality in the future. A couple of
> > > > > > > > examples:
> > > > > > > >
> > > > > > > > - make the metrics registry available to a task, so it can report
> > > > > > > >   task-level metrics, or
> > > > > > > > - be able to pass in a RestExtension handle to the task, so the task can
> > > > > > > >   provide a REST endpoint that users can hit to get some task-level
> > > > > > > >   information (about its status or health, for example)
> > > > > > > >
> > > > > > > > In such scenarios, adding new parameters to existing methods may not
> > > > > > > > be immediately acceptable.
> > > > > > > >
> > > > > > > > Since we are very close to a deadline, I wanted to check whether one
> > > > > > > > more possibility is acceptable :-)
> > > > > > > >
> > > > > > > > What if we created a library that connector developers could
> > > > > > > > independently integrate into their connectors? The library would be
> > > > > > > > packaged and shipped with their connector like any other library on
> > > > > > > > Maven (and other similar repositories). The new module would be in the
> > > > > > > > AK project, but its jars would *not* be added to the Connect worker's
> > > > > > > > classpath.
> > > > > > > >
> > > > > > > > The library would provide a public interface for an error reporter,
> > > > > > > > which provides both synchronous and asynchronous functionality (as was
> > > > > > > > brought up above).
> > > > > > > >
> > > > > > > > Because this would be an independent library, it could easily be
> > > > > > > > bundled and loaded with the other connectors. The Connect framework
> > > > > > > > would be decoupled from this utility.
> > > > > > > >
> > > > > > > > I understand that a similar option is in the rejected alternatives,
> > > > > > > > mostly because of configuration overhead, but the configuration required
> > > > > > > > here can come directly from the worker properties (and just be
> > > > > > > > copy-pasted from there, maybe with a prefix). I also wonder whether
> > > > > > > > (maybe as part of a future KIP) we can evaluate a strategy where certain
> > > > > > > > worker configs are passed to a connector (for example, the
> > > > > > > > producer/consumer/admin ones), so end users do not have to.
> > > > > > > >
> > > > > > > > Overall, we would get clean APIs and contracts, and developers would get
> > > > > > > > the freedom to use these libraries and functionality however they want.
> > > > > > > > The only drawback is how this is configured (end users will have to add
> > > > > > > > more lines to the JSON/properties files). But since all configs can
> > > > > > > > simply come from the worker, I believe this is a relatively minor
> > > > > > > > issue. We should be able to work out compatibility issues in the
> > > > > > > > implementations, so that the library can safely run (and degrade
> > > > > > > > functionality if needed) with old workers.
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, May 15, 2020 at 7:04 PM Aakash Shah <
> > as...@confluent.io>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Just wanted to clarify that I am on board with adding the overloaded
> > > > > > > > > put(...) method.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Aakash
> > > > > > > > >
> > > > > > > > > On Fri, May 15, 2020 at 7:00 PM Aakash Shah <
> > > as...@confluent.io>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Randall and Konstantine,
> > > > > > > > > >
> > > > > > > > > > As Chris and Arjun mentioned, I think the main concern is the potential
> > > > > > > > > > gap in which developers don't implement the deprecated method due to a
> > > > > > > > > > misunderstanding of use cases. Using the setter method approach ensures
> > > > > > > > > > that the developer won't break backwards compatibility by mistake when
> > > > > > > > > > using the new method. That being said, I think the added clarity of the
> > > > > > > > > > contract for when the error reporter will be invoked, and the better
> > > > > > > > > > overall aesthetics while maintaining backwards compatibility, outweigh
> > > > > > > > > > the risk of a developer mistakenly not implementing the original
> > > > > > > > > > put(...) method.
> > > > > > > > > >
> > > > > > > > > > With respect to synchrony, I agree with Konstantine's point that making
> > > > > > > > > > the reporter synchronous should be an opt-in feature. At the same time,
> > > > > > > > > > I believe it is important to relieve the developer of as much of the
> > > > > > > > > > implementation burden as possible in this case, and thus I think using a
> > > > > > > > > > Callback rather than a Future would be easier on the developer, while
> > > > > > > > > > providing asynchronous functionality with the ability to opt in to
> > > > > > > > > > synchronous functionality. I also believe making synchronous behavior
> > > > > > > > > > opt-in, rather than the other way around, simplifies implementation for
> > > > > > > > > > the developer (blocking vs. creating a new thread).
> > > > > > > > > >
> > > > > > > > > > Please let me know your thoughts. I would like to come to a consensus
> > > > > > > > > > soon due to the AK 2.6 deadlines; I will then shortly update the KIP and
> > > > > > > > > > start a vote.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Aakash
> > > > > > > > > >
> > > > > > > > > > On Fri, May 15, 2020 at 2:24 PM Randall Hauch <
> > > > rha...@gmail.com>
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> On Fri, May 15, 2020 at 3:13 PM Arjun Satish <
> > > > > > > arjun.sat...@gmail.com>
> > > > > > > > > >> wrote:
> > > > > > > > > >>
> > > > > > > > > >> > Couple of thoughts:
> > > > > > > > > >> >
> > > > > > > > > > >> > 1. If we add new parameters to put(..), and new connectors implement
> > > > > > > > > > >> > only this method, it makes them backward incompatible with older
> > > > > > > > > > >> > workers. I think newer connectors may choose to implement only the
> > > > > > > > > > >> > latest method, and we are passing the compatibility problems back to
> > > > > > > > > > >> > the connector developers.
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > > >> New connectors would have to implement both if they want to run in
> > > > > > > > > > >> older runtimes.
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > > >> > 2. If we deprecate the older put() method and eventually remove it,
> > > > > > > > > > >> > then old connectors are forward incompatible. If we are not going to
> > > > > > > > > > >> > remove it, then maybe we should not deprecate it?
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > > >> I don't think we'll ever remove deprecated methods -- there's no
> > > > > > > > > > >> reason to cut off older connectors.
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > > >> > 3. If a record is found to be erroneous outside put() (say, in flush
> > > > > > > > > > >> > or preCommit), how will it be reported?
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > > >> This is a concern no matter how the reporter is passed to the task.
> > > > > > > > > > >> Actually, I think it's clearer that the reporter passed through
> > > > > > > > > > >> `put(...)` should be used to record errors on the SinkRecords passed
> > > > > > > > > > >> in the same method call.
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> >
> > > > > > > > > > >> > I do think the concern over aesthetics is an important one, but the
> > > > > > > > > > >> > trade-off here is excluding many connectors that are out there from
> > > > > > > > > > >> > running on some worker versions. There may be production deployments
> > > > > > > > > > >> > that need one old and one new connector, which then could not both
> > > > > > > > > > >> > run on any single worker version. Building connectors is complex, and
> > > > > > > > > > >> > it's kinda unfair to expect folks to make changes for aesthetic
> > > > > > > > > > >> > reasons alone. This is probably the reason why popular framework APIs
> > > > > > > > > > >> > change very rarely (if ever).
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > > >> I don't see how passing the reporter through an overloaded `put(...)`
> > > > > > > > > > >> is less backward compatible. Because the runtime provides the SinkTask
> > > > > > > > > > >> base class, the runtime has control over what the methods do by
> > > > > > > > > > >> default.
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> >
> > > > > > > > > > >> > Overall, yes, the "public void
> > > > > > > > > > >> > errantRecordReporter(BiConsumer<SinkRecord, Throwable> reporter) {}"
> > > > > > > > > > >> > proposal in the original KIP is somewhat of a mouthful, but are there
> > > > > > > > > > >> > any simpler alternatives that do not exclude existing connectors or
> > > > > > > > > > >> > add operational burden, and yet provide a clean contract?
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > > >> IMO, overloading `put(...)` is cleaner and easier to understand --
> > > > > > > > > > >> plus the other benefits in my earlier email.
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> >
> > > > > > > > > >> > Best,
> > > > > > > > > >> >
> > > > > > > > > > >> > PS: Apologies if the language is incorrect or some points are unclear.
> > > > > > > > > >> >
> > > > > > > > > >> > On Fri, May 15, 2020 at 12:02 PM Randall Hauch <
> > > > > > rha...@gmail.com>
> > > > > > > > > >> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > > On Fri, May 15, 2020 at 1:45 PM Konstantine
> > Karantasis <
> > > > > > > > > >> > > konstant...@confluent.io> wrote:
> > > > > > > > > >> > >
> > > > > > > > > >> > > > Thanks for the quick response Aakash.
> > > > > > > > > >> > > >
> > > > > > > > > > >> > > > To your last point, modern APIs like this tend to be asynchronous
> > > > > > > > > > >> > > > (see the admin client and producer in Kafka), and such a definition
> > > > > > > > > > >> > > > results in more expressive and well-defined APIs.
> > > > > > > > > >> > > >
> > > > > > > > > >> > >
> > > > > > > > > >> > > +1
> > > > > > > > > >> > >
> > > > > > > > > >> > >
> > > > > > > > > > >> > > > What you describe is easily an opt-in feature for the connector
> > > > > > > > > > >> > > > developer. At the same time, the latest description above gives us
> > > > > > > > > > >> > > > a better chance for this API to remain like this for longer, because
> > > > > > > > > > >> > > > it covers both the sync and async per-`put` use cases.
> > > > > > > > > >> > >
> > > > > > > > > >> > >
> > > > > > > > > >> > > +1
> > > > > > > > > >> > >
> > > > > > > > > >> > >
> > > > > > > > > > >> > > > Given how simple the sync implementation is, just by complying with
> > > > > > > > > > >> > > > the return type of the method, I still think the BiFunction
> > > > > > > > > > >> > > > definition that returns a Future makes sense.
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > Konstantine
> > > > > > > > > >> > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > On Fri, May 15, 2020 at 11:27 AM Aakash Shah <
> > > > > > > > as...@confluent.io>
> > > > > > > > > >> > wrote:
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > > Thanks for the additional feedback.
> > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > I see the benefits of adding an overloaded put(...) over the
> > > > > > > > > > >> > > > > alternatives, and I am on board with going forward with this
> > > > > > > > > > >> > > > > approach. It will definitely set forth a contract of where the
> > > > > > > > > > >> > > > > reporter will be used, with better aesthetics.
> > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > The original idea of going with a synchronous approach for the
> > > > > > > > > > >> > > > > error reporter was to ease the connector developer's job of
> > > > > > > > > > >> > > > > interacting with and handling the error reporter. The tradeoff for
> > > > > > > > > > >> > > > > having a synchronous-only reporter would be lower throughput on the
> > > > > > > > > > >> > > > > reporter; this was thought to be fine since arguably most
> > > > > > > > > > >> > > > > circumstances would not involve consistently large numbers of
> > > > > > > > > > >> > > > > records being sent to the error reporter. Even if this were the
> > > > > > > > > > >> > > > > case, an argument can be made that the lower throughput would help
> > > > > > > > > > >> > > > > here, as it would give the user more time to realize that the
> > > > > > > > > > >> > > > > connector is sending records to the error reporter before many are
> > > > > > > > > > >> > > > > sent. However, if we are strongly in favor of having the option of
> > > > > > > > > > >> > > > > asynchronous functionality available to the developer, then I am
> > > > > > > > > > >> > > > > fine with that as well.
> > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > Lastly, I am on board with changing the name to failedRecordReporter.
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > Please let me know your thoughts.
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > Thanks,
> > > > > > > > > >> > > > > Aakash
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > On Fri, May 15, 2020 at 9:10 AM Randall Hauch <
> > > > > > > > rha...@gmail.com
> > > > > > > > > >
> > > > > > > > > >> > > wrote:
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > > Konstantine said:
> > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > > > I notice Randall also used BiFunction in his example; I wonder if
> > > > > > > > > > >> > > > > > > it's for similar reasons.
> > > > > > > > > >> > > > > > >
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > Nope. Just a typo on my part.
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > There appear to be three outstanding
> questions.
> > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > > First, Konstantine suggested calling this "failedRecordReporter".
> > > > > > > > > > >> > > > > > I think this is minor, but using this new name may be a bit more
> > > > > > > > > > >> > > > > > precise and I'd be fine with this.
> > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > > Second, should the reporter method be synchronous? I think the two
> > > > > > > > > > >> > > > > > options are:
> > > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > > 2a. Use `BiConsumer<SinkRecord, Throwable>` that returns nothing and
> > > > > > > > > > >> > > > > > blocks (at this time).
> > > > > > > > > > >> > > > > > 2b. Use `BiFunction<SinkRecord, Throwable, Future<Void>>` that
> > > > > > > > > > >> > > > > > returns a future that the user can optionally use to be synchronous.
> > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > > I do agree with Konstantine that option 2b gives us more room for future semantic changes, and since the producer write is already asynchronous, this should be straightforward to implement. I think the concern here is that if the sink task does not *use* the future to make this synchronous, it is very possible that the error records could be written out of order (due to retries). But this won't be an issue if the implementation uses `max.in.flight.requests.per.connection=1` for writing the error records. It's a little less clear, but honestly IMO passing the reporter in the `put(...)` method helps make this lambda easier to understand, for some strange reason. So unless there are good reasons to avoid this, I'd be in favor of 2b and returning a Future.
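For reference, the producer setting mentioned above expressed as plain `java.util.Properties`, so the sketch runs without the Kafka client jar. `bootstrap.servers` and `max.in.flight.requests.per.connection` are real producer configuration keys; the `DlqProducerConfig` helper is hypothetical and is not how the Connect runtime actually builds its DLQ producer.

```java
import java.util.Properties;

final class DlqProducerConfig {
    // Hypothetical helper: producer settings for writing errant records.
    static Properties dlqProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // With at most one unacknowledged request per connection, a retried
        // send cannot be overtaken by a later one, so errant records keep
        // their relative order (within a partition) even if the task never
        // waits on the returned Future.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }
}
```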
> > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > > Third, how do we pass the reporter lambda / method reference to the task? My proposal to pass the reporter via an overloaded `put(...)` is still the most attractive to me, for several reasons:
> > > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > > 3a. There's no need to pass the reporter separately *and* to describe the changes in method call ordering.
> > > > > > > > > > >> > > > > > 3b. As mentioned above, for some reason passing it via `put(...)` makes the intent clearer: it should be used when processing the SinkRecord, and it shouldn't be used in `start(...)`, `preCommit(...)`, `onPartitionsAssigned(...)`, or any of the other task methods. As Andrew pointed out earlier, *describing* this in the KIP and in JavaDoc will be tough to do in a way that is exact yet succinct.
> > > > > > > > > > >> > > > > > 3c. There is already precedent for evolving `SourceTask.commitRecord(...)`, and the pattern is identical.
> > > > > > > > > > >> > > > > > 3d. Backward compatibility is easy to understand, and at the same time it's pretty easy to describe what implementations that want to take advantage of this feature should do.
> > > > > > > > > > >> > > > > > 3e. Minimal changes to the interface: we're just *adding* one default method that calls the existing method and deprecating the existing `put(...)`.
> > > > > > > > > > >> > > > > > 3f. Deprecating the existing `put(...)` makes it clearer in a programmatic sense that new sink implementations should use the reporter, and that we recommend old sinks evolve to use it.
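Points 3e and 3f roughly amount to the following shape. This is a sketch against a hypothetical `SketchSinkTask` base class rather than the real `org.apache.kafka.connect.sink.SinkTask`, with `String` standing in for `SinkRecord`: the new overload is a default that delegates to the existing (now deprecated) abstract method, so a task written before the change keeps working unchanged.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical base class mirroring the proposed evolution of SinkTask.
abstract class SketchSinkTask {
    /** Existing method; deprecated so new tasks are nudged toward the overload. */
    @Deprecated
    public abstract void put(Collection<String> records);

    /** New overload: by default it ignores the reporter and calls the old method. */
    public void put(Collection<String> records, BiConsumer<String, Throwable> reporter) {
        put(records);
    }
}

// A task written before the change, which only implements put(Collection).
final class LegacyTask extends SketchSinkTask {
    final List<String> written = new ArrayList<>();

    @Override
    @SuppressWarnings("deprecation")
    public void put(Collection<String> records) {
        written.addAll(records);
    }
}
```

A newer runtime would call the two-argument method; for `LegacyTask` that still ends up in its original `put(Collection)`.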
> > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > > Some of these benefits apply to some of the other suggestions, but I think none of the other suggestions has all of these benefits. For example, overloading `initialize(...)` is more difficult since most sink connectors don't override it and therefore would be less subject to deprecation warnings. Overloading `start(...)` is less attractive. Adding a method IMO shares the fewest of these benefits.
> > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > > The one disadvantage of this approach is that sink task implementations can't rely on the reporter at startup. IMO that's an acceptable tradeoff to get the cleaner and more explicit API, especially if the API contract is that Connect will pass the same reporter instance to each call to `put(...)` on a single task instance.
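A sketch of the runtime side of the contract in the last paragraph, assuming a hypothetical `SketchWorker` and `ReportingTask` (not the real `WorkerSinkTask` or `SinkTask`) and `String` records: the worker creates one reporter per task instance and passes that same instance to every `put(...)` call, here appending errant records to an in-memory list.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical task interface exposing only the proposed two-argument put(...).
interface ReportingTask {
    void put(Collection<String> records, BiConsumer<String, Throwable> reporter);
}

// Hypothetical worker loop: one reporter per task instance, created once
// and passed unchanged to every put(...) call.
final class SketchWorker {
    private final ReportingTask task;
    private final BiConsumer<String, Throwable> reporter;
    final List<String> dlq = new ArrayList<>();

    SketchWorker(ReportingTask task) {
        this.task = task;
        this.reporter = (record, error) -> dlq.add(record);
    }

    void deliver(List<List<String>> batches) {
        for (List<String> batch : batches) {
            task.put(batch, reporter);
        }
    }
}
```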
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > Best regards,
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > Randall
> > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > > On Fri, May 15, 2020 at 6:59 AM Andrew Schofield <andrew_schofi...@live.com> wrote:
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > > Hi,
> > > > > > > > > > >> > > > > > > Randall's suggestion is really good. I think it gives the flexibility required and also keeps the interface the right way round.
> > > > > > > > > >> > > > > > >
> > > > > > > > > >> > > > > > > Thanks,
> > > > > > > > > >> > > > > > > Andrew Schofield
> > > > > > > > > >> > > > > > >
> > > > > > > > > > >> > > > > > > On 15/05/2020, 02:07, "Aakash Shah" <as...@confluent.io> wrote:
> > > > > > > > > >> > > > > > >
> > > > > > > > > >> > > > > > > > Hi Randall,
> > > > > > > > > >> > > > > > > >
> > > > > > > > > >> > > > > > > > Thanks for the feedback.
> > > > > > > > > >> > > > > > > >
> > > > > > > > > > >> > > > > > > > 1. This is a great suggestion, but I find that adding an overloaded put(...), which essentially deprecates the old put(...) so that it is only used when a connector is deployed on older versions of Connect, adds enough of a complication that it could cause connectors to break if the old put(...) doesn't correctly invoke the overloaded put(...); either that, or it will add duplication of functionality across the two put(...) methods. I think the older method simplifies things with the idea that a DLQ/error reporter will or will not be passed into the method depending on the version of AK. However, I also understand the aesthetic advantage of this method vs the setter method, so I am okay with going in this direction if others agree with adding the overloaded put(...).
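To make the risk in point 1 concrete, here is a sketch using a hypothetical `EvolvingSinkTask` base class with `String` records (not the real Connect API). `ForgetfulTask` moved its logic into the new overload but left the old `put(Collection)` as a no-op; an older runtime only ever calls the one-argument method, so that task silently writes nothing. `DelegatingTask` avoids this by delegating with a `null` reporter.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical base class with both methods, as in the put(...) overload proposal.
abstract class EvolvingSinkTask {
    final List<String> written = new ArrayList<>();

    public abstract void put(Collection<String> records);

    public void put(Collection<String> records, BiConsumer<String, Throwable> reporter) {
        put(records);
    }
}

// Buggy: the logic lives only in the new overload.
final class ForgetfulTask extends EvolvingSinkTask {
    @Override
    public void put(Collection<String> records) {
        // Bug: on an older runtime, records are silently dropped here.
    }

    @Override
    public void put(Collection<String> records, BiConsumer<String, Throwable> reporter) {
        written.addAll(records);
    }
}

// Correct: the old method delegates, passing null because no reporter exists.
final class DelegatingTask extends EvolvingSinkTask {
    @Override
    public void put(Collection<String> records) {
        put(records, null);
    }

    @Override
    public void put(Collection<String> records, BiConsumer<String, Throwable> reporter) {
        written.addAll(records);
    }
}
```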
> > > > > > > > > >> > > > > > > >
> > > > > > > > > > >> > > > > > > > 2. Yes, your assumption is correct. Yes, we can remove the "Order of Operations" section if we go with the overloaded put(...) direction.
> > > > > > > > > >> > > > > > > >
> > > > > > > > > > >> > > > > > > > 3. Great point, I will remove them from the KIP.
> > > > > > > > > >> > > > > > > >
> > > > > > > > > > >> > > > > > > > 4. Yeah, accept(...) will be synchronous. I will change it to be clearer, thanks.
> > > > > > > > > >> > > > > > > >
> > > > > > > > > > >> > > > > > > > 5. This KIP will use existing metrics as well as introduce new metrics. I will update this section to fully specify the metrics.
> > > > > > > > > >> > > > > > > >
> > > > > > > > > >> > > > > > > > Please let me know what you think.
> > > > > > > > > >> > > > > > > >
> > > > > > > > > >> > > > > > > > Thanks,
> > > > > > > > > >> > > > > > > > Aakash
> > > > > > > > > >> > > > > > > >
> > > > > > > > > > >> > > > > > > > On Thu, May 14, 2020 at 3:52 PM Randall Hauch <rha...@gmail.com> wrote:
> > > > > > > > > >> > > > > > > >
> > > > > > > > > >> > > > > > > > > Hi, Aakash.
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > Thanks for the KIP. Connect does need an improved ability for sink connectors to report individual records as being problematic, and this integrates nicely with the existing DLQ feature.
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > I also appreciate the desire to maintain compatibility so that connectors can take advantage of this feature when deployed in a runtime that supports this feature, but can safely and easily do without the feature when deployed to an older runtime. But I do understand Andrew's concern about the aesthetics. Have you considered overloading the `put(...)` method and adding the `reporter` as a second parameter? Essentially it would add the one method (with proper JavaDoc) to `SinkTask` only:
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > ```
> > > > > > > > > > >> > > > > > > > >     public void put(Collection<SinkRecord> records, BiConsumer<SinkRecord, Throwable> reporter) {
> > > > > > > > > > >> > > > > > > > >         // default implementation ignores the reporter and calls the existing method
> > > > > > > > > > >> > > > > > > > >         put(records);
> > > > > > > > > > >> > > > > > > > >     }
> > > > > > > > > > >> > > > > > > > > ```
> > > > > > > > > > >> > > > > > > > > and the WorkerSinkTask would be changed to call `put(Collection, BiConsumer)` instead.
> > > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > Sink connector implementations that don't do anything different can still override `put(Collection)`, and it still works as before. Developers that want to change their sink connector implementations to support this new feature would do the following, which would work in older and newer Connect runtimes:
> > > > > > > > > > >> > > > > > > > > ```
> > > > > > > > > > >> > > > > > > > >     public void put(Collection<SinkRecord> records) {
> > > > > > > > > > >> > > > > > > > >         // older runtimes only call this method, so there is no reporter
> > > > > > > > > > >> > > > > > > > >         put(records, null);
> > > > > > > > > > >> > > > > > > > >     }
> > > > > > > > > > >> > > > > > > > >     public void put(Collection<SinkRecord> records, BiConsumer<SinkRecord, Throwable> reporter) {
> > > > > > > > > > >> > > > > > > > >         // the normal `put(Collection)` logic goes here, but can optionally use `reporter` if non-null
> > > > > > > > > > >> > > > > > > > >     }
> > > > > > > > > > >> > > > > > > > > ```
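A fuller version of the compatible pattern above, assuming the reporter is a `BiConsumer<SinkRecord, Throwable>` (the shape later called option 2a in this thread) and using `String` in place of `SinkRecord`. The `write` method and its rule that records starting with `bad` are rejected are made up for illustration. With a reporter, a bad record is handed off and the batch continues; without one (an older runtime), the task fails as it would have before this feature.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sink task; String stands in for SinkRecord.
final class CompatSinkTask {
    final List<String> written = new ArrayList<>();

    // Called by older runtimes, which know nothing about the reporter.
    public void put(Collection<String> records) {
        put(records, null);
    }

    // Called by newer runtimes with a non-null reporter.
    public void put(Collection<String> records, BiConsumer<String, Throwable> reporter) {
        for (String record : records) {
            try {
                write(record);
            } catch (IllegalArgumentException e) {
                if (reporter == null) {
                    // Older runtime: no reporter, so fail the batch as before.
                    throw e;
                }
                // Newer runtime: hand off the bad record and keep going.
                reporter.accept(record, e);
            }
        }
    }

    // Hypothetical write to the external system; rejects "bad" records.
    private void write(String record) {
        if (record.startsWith("bad")) {
            throw new IllegalArgumentException("cannot write " + record);
        }
        written.add(record);
    }
}
```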
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > I think this has all the same benefits as the current KIP, but it's noticeably simpler and hopefully more aesthetically pleasing.
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > As for Andrew's second concern, that "the task can send errant records to it within put(...)" is too restrictive: my guess is that this was more an attempt at describing the basic behavior, and less about requiring that the reporter be called only within the `put(...)` method and not by methods to which `put(...)` synchronously or asynchronously delegates. Can you confirm whether my assumption is correct? If so, then perhaps my suggestion helps work around this issue as well, since there would be no restriction on when the reporter is called, and the whole "Order of Operations" section could potentially be removed.
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > Third, it's not clear to me why the "Error Reporter Object" subsection in the "Proposal" section lists the worker configuration properties that were previously introduced with KIP-298 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect). Maybe it's worth mentioning that the error reporter functionality will reuse or build upon KIP-298, including reusing the configuration properties defined in KIP-298. But IIUC, the KIP does not propose changing any technical or semantic aspect of these configuration properties, and therefore the KIP would be more clear and succinct without them. *That* the error reporter will use these properties is part of the UX and therefore necessary to mention, but *how* it uses those properties is really up to the implementation.
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > Fourth, the "Synchrony" section has a sentence that is confusing, or not as clear as it could be.
> > > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > >     "If a record is sent to the error reporter, processing of the next errant record in accept(...) will not begin until the producer successfully sends the errant record to Kafka."
> > > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > This sentence is a bit difficult to understand, but IIUC this really just means that "accept(...)" will be synchronous and will block until the errant record has been successfully written to Kafka. If so, let's say that. The rest of the paragraph is fine.
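A sketch of what the clarified sentence would mean for an implementation. It assumes a hypothetical `BlockingErrantReporter` that wraps an asynchronous send (for example, a producer send adapted to a `CompletableFuture`) and uses `String` in place of `SinkRecord`. `accept(...)` waits on the send, so it returns only after the write succeeds, and a failed write is rethrown to the calling task rather than lost.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

// Hypothetical reporter (String stands in for SinkRecord) whose accept(...)
// does not return until the errant record has been written.
final class BlockingErrantReporter implements BiConsumer<String, Throwable> {
    // Hypothetical asynchronous write, e.g. a thin wrapper around a producer send.
    private final BiFunction<String, Throwable, CompletableFuture<Void>> send;

    BlockingErrantReporter(BiFunction<String, Throwable, CompletableFuture<Void>> send) {
        this.send = send;
    }

    @Override
    public void accept(String record, Throwable error) {
        try {
            // Block until the write is acknowledged (or fails).
            send.apply(record, error).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while reporting " + record, e);
        } catch (ExecutionException e) {
            // Surface the failed write to the calling task instead of dropping it.
            throw new IllegalStateException("failed to report " + record, e.getCause());
        }
    }
}
```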
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > Finally, is this KIP proposing new metrics, or that existing metrics would be used to track the error reporter usage? If the former, then please fully specify what these metrics will be, similarly to how metrics are specified in KIP-196 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework).
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > >> > > > > > > > > Thoughts?
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > >> > > > > > > > > Best regards,
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > >> > > > > > > > > Randall
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > > >> > > > > > > > > On Mon, May 11, 2020 at 4:49 PM Andrew Schofield <andrew_schofi...@live.com> wrote:
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > >> > > > > > > > > > Hi Aakash,
> > > > > > > > > > >> > > > > > > > > > Thanks for sorting out the replies to the mailing list.
> > > > > > > > > >> > > > > > > > > >
> > > > > > > > > > >> > > > > > > > > > First, I do like the idea of improving error reporting in sink connectors. I'd like a simple way to put bad records onto the DLQ.
> > > > > > > > > >> > > > > > > > > >
> > > > > > > > > > >> > > > > > > > > > I think this KIP is considerably more complicated than it
