Hi Arjun,

This is great news. Given that we're already willing to ask developers to
catch a method-not-found exception, and it sounds like a new interface can
be handled in a similar try/catch block in the same place, I like the idea
of a new fleshed-out interface instead of a BiConsumer or a BiFunction.

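For what it's worth, here is a rough sketch of the kind of guard I have in
mind. It is only an illustration under my own assumptions: `ReporterGuard`
and `lookupOrEmpty` are hypothetical names, not part of Connect, and the
lookup passed in stands in for a call such as
context.failedRecordReporter() made from start().

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper, not part of Kafka Connect.
final class ReporterGuard {

    private ReporterGuard() {
    }

    /**
     * Runs a lookup that may touch API added in a newer runtime.
     * Returns Optional.empty() when the runtime lacks the method
     * (NoSuchMethodError) or the type (NoClassDefFoundError).
     * Any other exception is left to propagate to the caller.
     */
    static <T> Optional<T> lookupOrEmpty(Supplier<T> lookup) {
        try {
            // A null result (e.g. no reporter configured) also maps to empty.
            return Optional.ofNullable(lookup.get());
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            // Older runtime: the getter or its return type is not there.
            return Optional.empty();
        }
    }
}
```

A task could call something like this once in start() and keep the
resulting Optional, so that put() only has to check whether a reporter is
present.
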
Cheers,

Chris

On Sat, May 16, 2020, 21:29 Arjun Satish <arjun.sat...@gmail.com> wrote:

> OK folks, this is my POC PR:
> https://github.com/wicknicks/kafka/tree/kip-610-2.4.1. Connectors built
> from this were copied into a fresh installation of Kafka Connect (v2.5, the
> latest version) and run. Without proper try-catch, the tasks would fail.
> But when the appropriate exceptions were handled, the task proceeded
> without any issues. I've added some comments here
>
> https://github.com/apache/kafka/commit/295e6a46925993060a60b79e646e06432480ed0d
> that document these errors. Tested this with both Java 8 and 11.
>
> To check if it is safe to add methods and new classes, I dug around the JVM
> spec <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html>. The
> following points are being made here:
>
> 1. During the loading phase, a number of structural details of the class
> are added to the run-time constant pool (class descriptor, fields,
> methods). What these methods themselves do is not considered.
> 2. During the linking phase, resolution of symbolic references is optional.
> But the important bit is this
> <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4>:
> Whichever strategy (lazy or eager) is followed, any error detected during
> resolution must be thrown at a point in the program that (directly or
> indirectly) uses a symbolic reference to the class or interface.
>
> So this means that any new classes the connector originally relied upon
> that are now missing will be in some internal "not found" state, but no
> errors will be thrown until the resolution stage.
>
> 3. Section 5.4.3
> <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4.3>
> says execution of any of the (anewarray, checkcast etc) instructions
> requires resolution of its symbolic reference, which is good since the
> instructions are executed in order, and the connector developer has time to
> execute some code before the program has to look for an unknown object.
>
> We are also safe in that the new interface will be used only during or
> after start() is called, so there should not be any fields that do
> something like:
>
> private final ErrantRecordReporter reporter =
> sinkTaskContext.failedRecordReporter();
>
> The initialization will only be valid in start(), where we can safely wrap
> it in a try-catch. The errors being thrown are very precise:
> NoSuchMethodError
> <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4.3.3>
> for a method that is not found, and NoClassDefFoundError for any classes
> loaded during resolution
> <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.3>.
>
> BTW, one more example that comes to mind is the JDBC spec (the java.sql
> package has been changing with every major version of Java). Newer classes
> are added (look at java.sql.DriverAction (in Java 8) and
> java.sql.ShardingKey (in Java 9)). New methods are being added to existing
> classes as well (for example,
> java.sql.DriverManager#registerDriver(java.sql.Driver, java.sql.DriverAction)
> <https://docs.oracle.com/javase/8/docs/api/java/sql/DriverManager.html#registerDriver-java.sql.Driver-java.sql.DriverAction->),
> and the expectation is that the driver should handle any runtime
> inconsistencies.
>
> Overall, I think it should be safe to have new interfaces, and have them
> loaded safely, with correct checks.
>
> BTW, one more opportunity we have is that the connector can check if these
> new methods are present or not. For example:
>
>
> Class.forName("org.apache.kafka.connect.sink.SinkTaskContext").getMethod("failedRecordReporter");
>
> And based on this, choose to return a different (older) task class from
> SinkConnector#taskClass() if they want to be hyper-safe.
>
> Apologies for throwing in such a wrench at the last minute! But IMHO it'd
> be good to take this opportunity if we can.
>
> Best,
>
> On Sat, May 16, 2020 at 6:05 PM Randall Hauch <rha...@gmail.com> wrote:
>
> > Thanks for updating the KIP, Aakash. A few comments on the updated content
> > there:
> >
> > In order to avoid error records being written out of order (for example,
> > > due to retries), the developer can use
> > > `max.in.flight.requests.per.connection=1` in their implementation for
> > > writing error records.
> > >
> >
> > IMO, the DLQ should always use this setting to prevent retries, and the
> > developer can always set this property for the DLQ to something larger than
> > 1 if order is not important and they need the extreme performance.
> >
> > Along with the original errant sink record, the exception thrown will be
> > > sent to the error reporter to provide additional context.
> > >
> >
> > This is also not really clear. What thrown exception is this referring to?
> > Isn't the exception *passed* to the reporter method?
> >
> > I do think we need to better explain, from the sink task's perspective, what
> > behavior it should expect. Is there any case when the reporter will be null
> > or be a no-op, and if so what should the sink task do? Should it simply
> > wrap and throw a ConnectException? And if there is a reporter, won't
> > Connect treat this sink record as "processed" with respect to the existing
> > behavior of passing "processed" offsets back to the sink task's
> > `preCommit(...)` method?
> >
> > Thanks,
> >
> > Randall
> >
> > On Sat, May 16, 2020 at 6:38 PM Arjun Satish <arjun.sat...@gmail.com>
> > wrote:
> >
> > > Yeah I had tried this locally on Java 8 and 11, and it had seemed to work.
> > > Let me clean up and publish my code in a branch somewhere so we can take a
> > > look at it.
> > >
> > > Thanks,
> > >
> > > On Sat, May 16, 2020 at 3:39 PM Randall Hauch <rha...@gmail.com> wrote:
> > >
> > > > Have you tried this? IIUC the problem is with the new type, and any class
> > > > that uses ‘ErrantRecordReporter’ with an import would fail to be loaded by
> > > > the classloader if the type does not exist (i.e., pre-2.9 Connect
> > > > runtimes). Catching that ClassNotFoundException and dynamically importing
> > > > the type is actually much harder.
> > > >
> > > > On Sat, May 16, 2020 at 5:07 PM Aakash Shah <as...@confluent.io> wrote:
> > > >
> > > > > Hi Arjun,
> > > > >
> > > > > Thanks for this suggestion. I actually like this a lot because a defined
> > > > > interface looks more appealing and is clearer in its intention. Since we
> > > > > are still using NoSuchMethodError to account for backwards
> > > > > compatibility, this works for me. I can't see any drawbacks besides having
> > > > > to call the getter method for the processing of every errant record.
> > > > >
> > > > > I would like to hear others' thoughts on if this drawback outweighs the
> > > > > added benefit of clarity.
> > > > >
> > > > > Thanks,
> > > > > Aakash
> > > > >
> > > > > On Sat, May 16, 2020 at 2:54 PM Arjun Satish <arjun.sat...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thanks Konstantine, happy to write something up in a KIP. But I think it
> > > > > > would be redundant if we add this KIP. What do you think?
> > > > > >
> > > > > > Also, Randall, yes that API would work. But if we expect the developers to
> > > > > > catch NoSuchMethodErrors, then should we also go ahead and make a class
> > > > > > that would have a report method (similar to ErrorReporter
> > > > > > <https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java>
> > > > > > but maybe with different arguments on the report method)? They would also
> > > > > > have to catch NoClassDefFoundError.
> > > > > >
> > > > > > That would modify your change in SinkTaskContext to:
> > > > > >
> > > > > > public interface SinkTaskContext {
> > > > > >     /**
> > > > > >      * Get the reporter to which the sink task can report problematic or
> > > > > >      * failed {@link SinkRecord} passed to the {@link SinkTask#put(Collection)} method.
> > > > > >      *
> > > > > >      * @return an errant record reporter
> > > > > >      */
> > > > > >     ErrantRecordReporter failedRecordReporter();
> > > > > > }
> > > > > >
> > > > > > where ErrantRecordReporter is:
> > > > > >
> > > > > > public interface ErrantRecordReporter {
> > > > > >
> > > > > >     /**
> > > > > >      * Serialize and produce records to the error topic
> > > > > >      *
> > > > > >      * @param record the errant record
> > > > > >      * @param error the error that caused the record to fail
> > > > > >      */
> > > > > >     void report(SinkRecord record, Throwable error);
> > > > > >
> > > > > > }
> > > > > >
> > > > > > Usage in put would be the same though if the class is not explicitly named:
> > > > > >
> > > > > >         try {
> > > > > >             context.failedRecordReporter().report(record, error);
> > > > > >         } catch (NoSuchMethodError e) {
> > > > > >             log.info("Boooooooooo!");
> > > > > >         }
> > > > > >
> > > > > > Thoughts?
> > > > > >
> > > > > > On Sat, May 16, 2020 at 12:46 PM Konstantine Karantasis <
> > > > > > konstant...@confluent.io> wrote:
> > > > > >
> > > > > > > Thanks for following up Randall.
> > > > > > >
> > > > > > > I agree with your latest suggestion. It was good that we explored several
> > > > > > > options, but accessing the context to obtain the reporter in Kafka Connect
> > > > > > > versions that support this feature makes the most sense. The burden for
> > > > > > > connector developers that want to use this reporter _and_ make connectors
> > > > > > > compatible with old and new workers is minimal.
> > > > > > >
> > > > > > > We'll have to live with additions like this, and the appearance in both
> > > > > > > KIP-131 and here in KIP-610 indeed creates a reasonable precedent.
> > > > > > >
> > > > > > > Konstantine
> > > > > > >
> > > > > > >
> > > > > > > On Sat, May 16, 2020 at 12:34 PM Randall Hauch <rha...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks again for the active discussion!
> > > > > > > >
> > > > > > > > Regarding the future-vs-callback discussion: I did like where Chris was
> > > > > > > > going with the Callback, but he raises a good point that it's unclear what
> > > > > > > > to use for the reporter type, since we'd need three parameters.
> > > > > > > > Introducing a new interface makes it much harder for a sink task to be
> > > > > > > > backward compatible, so sticking with BiFunction is a good compromise.
> > > > > > > > Plus, another significant disadvantage of a callback approach is that a
> > > > > > > > sink task's callback is called from the producer thread, and this risks a
> > > > > > > > poorly written sink task callback killing the reporter's producer without
> > > > > > > > necessarily failing the task. Using a future avoids this risk altogether
> > > > > > > > and still provides the sink task with the ability to do synchronous
> > > > > > > > reporting using Future, which is a standard and conventional design
> > > > > > > > pattern. So we do seem to have converged on using
> > > > > > > > `BiFunction<SinkRecord, Throwable, Future<Void>>` for the reporter type.
> > > > > > > >
> > > > > > > > Now, we still seem to not have converged upon how to pass the reporter to
> > > > > > > > the sink task. I agree with Konstantine that the deprecation affects only
> > > > > > > > newer versions of Connect, and that a sink task should deal with both put
> > > > > > > > methods only when it wants to support older runtimes. I also think that
> > > > > > > > this is a viable approach, but I do concede that this evolution of the
> > > > > > > > sink task API is more complicated than it should be.
> > > > > > > >
> > > > > > > > In the interest of quickly coming to consensus on how we pass the reporter
> > > > > > > > to the sink task, I'd like to go back to Andrew's original suggestion,
> > > > > > > > which I think we disregarded too quickly: add a getter on the
> > > > > > > > SinkTaskContext interface. We already have precedent for adding methods to
> > > > > > > > one of the context classes with the newly-adopted KIP-131, which adds a
> > > > > > > > getter for the OffsetStorageReader on the (new) SourceConnectorContext.
> > > > > > > > That KIP accepts the fact that a source connector wanting to use this
> > > > > > > > feature while also keeping the ability to be installed into older Connect
> > > > > > > > runtimes must guard its use of the context's getter method.
> > > > > > > >
> > > > > > > > I think we can use the same pattern for this KIP, and add a getter to the
> > > > > > > > existing SinkTaskContext that is defined something like:
> > > > > > > >
> > > > > > > > public interface SinkTaskContext {
> > > > > > > >     ...
> > > > > > > >     /**
> > > > > > > >      * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord}
> > > > > > > >      * passed to the {@link SinkTask#put(Collection)} method. When reporting a failed record,
> > > > > > > >      * the sink task will receive a {@link Future} that the task can optionally use to wait until
> > > > > > > >      * the failed record and exception have been written to Kafka via Connect's DLQ. Note that
> > > > > > > >      * the result of this method may be null if this connector has not been configured with a DLQ.
> > > > > > > >      *
> > > > > > > >      * <p>This method was added in Apache Kafka 2.9. Sink tasks that use this method but want to
> > > > > > > >      * maintain backward compatibility so they can also be deployed to older Connect runtimes
> > > > > > > >      * should guard the call to this method with a try-catch block, since calling this method
> > > > > > > >      * will result in a {@link NoSuchMethodError} when the sink connector is deployed to
> > > > > > > >      * Connect runtimes older than Kafka 2.9. For example:
> > > > > > > >      * <pre>
> > > > > > > >      *     BiFunction&lt;SinkRecord, Throwable, Future&lt;Void&gt;&gt; reporter;
> > > > > > > >      *     try {
> > > > > > > >      *         reporter = context.failedRecordReporter();
> > > > > > > >      *     } catch (NoSuchMethodError e) {
> > > > > > > >      *         reporter = null;
> > > > > > > >      *     }
> > > > > > > >      * </pre>
> > > > > > > >      *
> > > > > > > >      * @return the reporter function; null if no error reporter has been configured for the connector
> > > > > > > >      * @since 2.9
> > > > > > > >      */
> > > > > > > >     BiFunction<SinkRecord, Throwable, Future<Void>> failedRecordReporter();
> > > > > > > > }
> > > > > > > >
> > > > > > > > The main advantage is that the KIP no longer has to make *any other*
> > > > > > > > changes to the Sink Connector or Task API. The above is really the only
> > > > > > > > change, and it's merely an addition to the API. No deprecation and no
> > > > > > > > overloading methods. The KIP does need to explain how the reporter is
> > > > > > > > configured and used (which it already does), but IMO the KIP doesn't need
> > > > > > > > to describe when this reporter can/should be used. After all, this method
> > > > > > > > is on the existing SinkTaskContext, so this method is really no different
> > > > > > > > than any other existing method. I think my JavaDoc (which is just a
> > > > > > > > suggestion for a starting point that Aakash can improve as needed)
> > > > > > > > describes how easy it is for a sink to maintain backward compatibility.
> > > > > > > > (The use of `BiFunction` helps tremendously.) Another not insignificant
> > > > > > > > advantage is that a sink task can use this reporter reference throughout
> > > > > > > > the task's lifetime (after it's started and before it is stopped), making
> > > > > > > > it less invasive for existing sink task implementations that want to use
> > > > > > > > it.
> > > > > > > >
> > > > > > > > I hope we can all get behind this compromise, which IMO is actually super
> > > > > > > > clean and makes a lot of sense. Thanks, Andrew, for originally suggesting
> > > > > > > > it. I know we're all trying to improve the Connect API in a way that makes
> > > > > > > > sense, and deliberate and constructive discussion is a healthy thing.
> > > > > > > > Thanks again to everyone for participating!
> > > > > > > >
> > > > > > > > BTW, we've agreed upon a number of other changes, but I don't see any of
> > > > > > > > those changes on the KIP. Aakash, can you please update the KIP quickly so
> > > > > > > > we can make sure the other parts of the KIP are acceptable?
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > >
> > > > > > > > Randall
> > > > > > > >
> > > > > > > > On Sat, May 16, 2020 at 12:24 PM Konstantine Karantasis <
> > > > > > > > konstant...@confluent.io> wrote:
> > > > > > > >
> > > > > > > > > Thanks for the quick response Aakash.
> > > > > > > > >
> > > > > > > > > With respect to deprecation, this refers to deprecating this method in
> > > > > > > > > newer versions of Kafka Connect (and eventually removing it).
> > > > > > > > >
> > > > > > > > > As a connector developer, if you want your connector to run across a wide
> > > > > > > > > spectrum of Connect versions, you'll have to take this into consideration
> > > > > > > > > and retain both methods in a functional state. The good news is that both
> > > > > > > > > methods can share a lot of code, so in reality both the old and the new
> > > > > > > > > put will be thin shims over a `putRecord` method (or `process` method as
> > > > > > > > > you call it in the KIP).
> > > > > > > > >
> > > > > > > > > Given the above, there's no requirement to conditionally call one method
> > > > > > > > > or the other in the framework based on configuration. Once you implement
> > > > > > > > > the new `put` with something other than its default implementation, as a
> > > > > > > > > connector developer, you'll know to adapt to the above.
> > > > > > > > >
> > > > > > > > > I definitely suggest extending our docs in a meaningful way in order to
> > > > > > > > > make the upgrade path easy to follow. Maybe you'd like to add a note to
> > > > > > > > > your compatibility section in this KIP as well.
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Konstantine
> > > > > > > > >
> > > > > > > > > On Sat, May 16, 2020 at 10:13 AM Aakash Shah <as...@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > +1
> > > > > > > > > >
> > > > > > > > > > On Sat, May 16, 2020 at 9:55 AM Konstantine Karantasis <
> > > > > > > > > > konstant...@confluent.io> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Arjun,
> > > > > > > > > > >
> > > > > > > > > > > I think I agree with you that the subject is interesting. Yet, I feel
> > > > > > > > > > > it belongs to a separate future KIP. Reading the proposal in the KIP
> > > > > > > > > > > format will help, at least myself, to understand it better.
> > > > > > > > > > >
> > > > > > > > > > > Having said that, for the purpose of simplifying error handling for
> > > > > > > > > > > sink tasks, the discussion on KIP-610 has made some good progress on
> > > > > > > > > > > the mailing list. If the few open items are reflected on the proposal,
> > > > > > > > > > > maybe it'd be even worthwhile to consider it for inclusion in the
> > > > > > > > > > > upcoming release with its current scope.
> > > > > > > > > > >
> > > > > > > > > > > Konstantine
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, May 15, 2020 at 7:44 PM Arjun Satish <arjun.sat...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > I'm kinda hoping that we get to an approach on how to extend the
> > > > > > > > > > > > Connect framework. Adding parameters in the put method is nice, and
> > > > > > > > > > > > maybe works for now, but I'm not sure how scalable it is. It'd be
> > > > > > > > > > > > great to be able to add more functionality in the future. Couple of
> > > > > > > > > > > > examples:
> > > > > > > > > > > >
> > > > > > > > > > > > - make the metrics registry available to a task, so they can report
> > > > > > > > > > > > task level metrics or
> > > > > > > > > > > > - be able to pass in a RestExtension handle to the task, so the task
> > > > > > > > > > > > can provide a rest endpoint which users can hit to get some task level
> > > > > > > > > > > > information (about its status, health, for example)
> > > > > > > > > > > >
> > > > > > > > > > > > In such scenarios, maybe adding new parameters to existing methods may
> > > > > > > > > > > > not be immediately acceptable.
> > > > > > > > > > > >
> > > > > > > > > > > > Since we are very close to a deadline, I wanted to check if one more
> > > > > > > > > > > > possibility is acceptable :-)
> > > > > > > > > > > >
> > > > > > > > > > > > What if we could create a library that could be independently
> > > > > > > > > > > > integrated by connector developers in their connectors? The library
> > > > > > > > > > > > would be packaged and shipped with their connector like any other
> > > > > > > > > > > > library on maven (and other similar repositories). The new module would
> > > > > > > > > > > > be in the AK project, but its jars will *not* be added to the classpath
> > > > > > > > > > > > for the Connect worker.
> > > > > > > > > > > >
> > > > > > > > > > > > The library would provide a public interface for an error reporter,
> > > > > > > > > > > > which provides both synchronous and asynchronous functionalities (as
> > > > > > > > > > > > was brought up above).
> > > > > > > > > > > >
> > > > > > > > > > > > This would be an independent library that can be easily bundled and
> > > > > > > > > > > > loaded with the other connectors. The connect framework will be
> > > > > > > > > > > > decoupled from this utility.
> > > > > > > > > > > >
> > > > > > > > > > > > I understand that a similar option is in the rejected alternatives,
> > > > > > > > > > > > mostly because of configuration overhead, but the configuration
> > > > > > > > > > > > required here can come directly from the worker properties (and just be
> > > > > > > > > > > > copy pasted from there, maybe with a prefix). And I wonder if (maybe as
> > > > > > > > > > > > part of a future KIP) we can evaluate a strategy where certain worker
> > > > > > > > > > > > configs can be passed to a connector (for example, the
> > > > > > > > > > > > producer/consumer/admin ones), so end users do not have to.
> > > > > > > > > > > >
> > > > > > > > > > > > Overall, we would get clean APIs and contracts, and developers get the
> > > > > > > > > > > > freedom to use these libraries and functionalities however they want.
> > > > > > > > > > > > The only drawback is how this is configured (end-users will have to add
> > > > > > > > > > > > more lines in the json/properties files). But since all configs can
> > > > > > > > > > > > simply come from the worker, I believe this is a relatively minor
> > > > > > > > > > > > issue. We should be able to work out compatibility issues in the
> > > > > > > > > > > > implementations, so that the library can safely run (and degrade
> > > > > > > > > > > > functionality if needed) with old workers.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, May 15, 2020 at 7:04 PM Aakash Shah <as...@confluent.io>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Just wanted to clarify that I am on board with adding the overloaded
> > > > > > > > > > > > > put(...) method.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Aakash
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, May 15, 2020 at 7:00 PM Aakash Shah <as...@confluent.io>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Randall and Konstantine,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > As Chris and Arjun mentioned, I think the main concern is the
> > > > > > > > > > > > > > potential gap in which developers don't implement the deprecated
> > > > > > > > > > > > > > method due to a misunderstanding of use cases. Using the setter
> > > > > > > > > > > > > > method approach ensures that the developer won't break backwards
> > > > > > > > > > > > > > compatibility when using the new method due to a mistake. That being
> > > > > > > > > > > > > > said, I think the value added in clarity of contract of when the
> > > > > > > > > > > > > > error reporter will be invoked and overall aesthetic while
> > > > > > > > > > > > > > maintaining backwards compatibility outweighs the potential mistake
> > > > > > > > > > > > > > of a developer in not implementing the original put(...) method.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > With respect to synchrony, I agree with Konstantine's point, that we
> > > > > > > > > > > > > > should make it an opt-in feature of making the reporter only
> > > > > > > > > > > > > > synchronous. At the same time, I believe it is important to relieve
> > > > > > > > > > > > > > as much of the burden of implementation as possible from the
> > > > > > > > > > > > > > developer in this case, and thus I think using a Callback rather
> > > > > > > > > > > > > > than a Future would be easier on the developer, while adding
> > > > > > > > > > > > > > asynchronous functionality with the ability to opt-in synchronous
> > > > > > > > > > > > > > functionality. I also believe making it opt-in synchronous vs. the
> > > > > > > > > > > > > > other way simplifies implementation for the developer (blocking vs
> > > > > > > > > > > > > > creating a new thread).
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Please let me know your thoughts. I would like to come to a
> > > > > > > > > > > > > > consensus soon due to the AK 2.6 deadlines; I will then shortly
> > > > > > > > > > > > > > update the KIP and start a vote.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > Aakash
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, May 15, 2020 at 2:24 PM Randall Hauch <rha...@gmail.com>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > >> On Fri, May 15, 2020 at 3:13 PM Arjun Satish <arjun.sat...@gmail.com>
> > > > > > > > > > > > > > >> wrote:
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> > Couple of thoughts:
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > 1. If we add new parameters to put(..), and new connectors implement
> > > > > > > > > > > > > > >> > only this method, it makes them backward incompatible with older
> > > > > > > > > > > > > > >> > workers. I think newer connectors may choose to implement only the
> > > > > > > > > > > > > > >> > latest method, and we are passing the compatibility problems back to
> > > > > > > > > > > > > > >> > the connector developers.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> New connectors would have to implement both if they want to run in
> > > > > > > > > > > > > > >> older runtimes.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> > 2. if we deprecate the older put() method and
> > > > > eventually
> > > > > > > > > remove
> > > > > > > > > > > it,
> > > > > > > > > > > > > then
> > > > > > > > > > > > > >> > old connectors are forward incompatible. If we
> > are
> > > > not
> > > > > > > going
> > > > > > > > > to
> > > > > > > > > > > > remove
> > > > > > > > > > > > > >> it,
> > > > > > > > > > > > > >> > then maybe we should not deprecate it?
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> I don't think we'll ever remove deprecated
> methods
> > > --
> > > > > > > there's
> > > > > > > > no
> > > > > > > > > > > > reason
> > > > > > > > > > > > > to
> > > > > > > > > > > > > >> cut off older connectors.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> > 3. if a record is realized to be erroneous
> > outside
> > > > > put()
> > > > > > > > (say,
> > > > > > > > > > in
> > > > > > > > > > > > > flush
> > > > > > > > > > > > > >> or
> > > > > > > > > > > > > >> > preCommit), how will it be reported?
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> This is a concern no matter how the reporter is
> > > passed
> > > > > to
> > > > > > > the
> > > > > > > > > > task.
> > > > > > > > > > > > > >> Actually, I think it's more clear that the
> > reporter
> > > > > passed
> > > > > > > > > through
> > > > > > > > > > > > > >> `put(...)` should be used to record errors on
> the
> > > > > > > SinkRecords
> > > > > > > > > > passed
> > > > > > > > > > > > in
> > > > > > > > > > > > > >> the
> > > > > > > > > > > > > >> same method call.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > I do think the concern over aesthetics is an
> > > > important
> > > > > > > one,
> > > > > > > > > but
> > > > > > > > > > > the
> > > > > > > > > > > > > >> > trade-off here is to exclude many connectors
> > that
> > > > are
> > > > > > out
> > > > > > > > > there
> > > > > > > > > > > from
> > > > > > > > > > > > > >> > running on worker versions. there may be
> > > production
> > > > > > > > > deployments
> > > > > > > > > > > that
> > > > > > > > > > > > > >> need
> > > > > > > > > > > > > >> > one old and one new connector that now cannot
> > work
> > > > on
> > > > > > any
> > > > > > > > > > version
> > > > > > > > > > > > of a
> > > > > > > > > > > > > >> > single worker. Building connectors is complex,
> > and
> > > > > it's
> > > > > > > > kinda
> > > > > > > > > > > unfair
> > > > > > > > > > > > > to
> > > > > > > > > > > > > >> > expect folks to make changes over aesthetic
> > > reasons
> > > > > > alone.
> > > > > > > > > This
> > > > > > > > > > is
> > > > > > > > > > > > > >> probably
> > > > > > > > > > > > > >> > the reason why popular framework APIs very
> > rarely
> > > > (and
> > > > > > > > > probably
> > > > > > > > > > > > never)
> > > > > > > > > > > > > >> > change.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> I don't see how passing the reporter through an
> > > > > overloaded
> > > > > > > > > > > `put(...)`
> > > > > > > > > > > > is
> > > > > > > > > > > > > >> less backward compatible. Because the runtime
> > > provides
> > > > > the
> > > > > > > > > > SinkTask
> > > > > > > > > > > > base
> > > > > > > > > > > > > >> class, the runtime has control over what the
> > methods
> > > > do
> > > > > by
> > > > > > > > > > default.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > Overall, yes, the "public void
> > > > > > > > > > > > > >> errantRecordReporter(BiConsumer<SinkRecord,
> > > > > > > > > > > > > >> > Throwable> reporter) {}" proposal in the
> > original
> > > > KIP
> > > > > is
> > > > > > > > > > somewhat
> > > > > > > > > > > > of a
> > > > > > > > > > > > > > >> > mouthful, but are there any simpler
> > > alternatives
> > > > > > that
> > > > > > > do
> > > > > > > > > not
> > > > > > > > > > > > > exclude
> > > > > > > > > > > > > >> > existing connectors, adding operational
> burdens
> > > and
> > > > > yet
> > > > > > > > > provide
> > > > > > > > > > a
> > > > > > > > > > > > > clean
> > > > > > > > > > > > > >> > contract?
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> IMO, overloading `put(...)` is cleaner and
> easier
> > to
> > > > > > > > understand
> > > > > > > > > --
> > > > > > > > > > > > plus
> > > > > > > > > > > > > >> the
> > > > > > > > > > > > > >> other benefits in my earlier email.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > Best,
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > PS: Apologies if the language is incorrect or
> > some
> > > > > > points
> > > > > > > > are
> > > > > > > > > > > > unclear.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > On Fri, May 15, 2020 at 12:02 PM Randall
> Hauch <
> > > > > > > > > > rha...@gmail.com>
> > > > > > > > > > > > > >> wrote:
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > > On Fri, May 15, 2020 at 1:45 PM Konstantine
> > > > > > Karantasis <
> > > > > > > > > > > > > >> > > konstant...@confluent.io> wrote:
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > > Thanks for the quick response Aakash.
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > > To your last point, modern APIs like this
> > tend
> > > > to
> > > > > be
> > > > > > > > > > > > asynchronous
> > > > > > > > > > > > > >> (see
> > > > > > > > > > > > > >> > > > admin, producer in Kafka) and such
> > definition
> > > > > > results
> > > > > > > in
> > > > > > > > > > more
> > > > > > > > > > > > > >> > expressive
> > > > > > > > > > > > > >> > > > and well defined APIs.
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > +1
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > > What you describe is easily an opt-in
> > feature
> > > > for
> > > > > > the
> > > > > > > > > > > connector
> > > > > > > > > > > > > >> > > developer.
> > > > > > > > > > > > > >> > > > At the same time, the latest description
> > > above,
> > > > > > gives
> > > > > > > us
> > > > > > > > > > > better
> > > > > > > > > > > > > >> chances
> > > > > > > > > > > > > >> > > for
> > > > > > > > > > > > > >> > > > this API to remain like this for longer,
> > > because
> > > > > it
> > > > > > > > covers
> > > > > > > > > > > both
> > > > > > > > > > > > > the
> > > > > > > > > > > > > >> > sync
> > > > > > > > > > > > > > >> > > > and async per `put` use cases.
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > +1
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > > Given how simple the sync implementation
> > > > > > > > > > > > > >> > > > is, just by complying with the return type
> > of
> > > > the
> > > > > > > > method,
> > > > > > > > > I
> > > > > > > > > > > > still
> > > > > > > > > > > > > >> think
> > > > > > > > > > > > > >> > > the
> > > > > > > > > > > > > >> > > > BiFunction definition that returns a
> Future
> > > > makes
> > > > > > > sense.
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > > Konstantine
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > > On Fri, May 15, 2020 at 11:27 AM Aakash
> > Shah <
> > > > > > > > > > > > as...@confluent.io>
> > > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > > > Thanks for the additional feedback.
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > I see the benefits of adding an
> overloaded
> > > > > > put(...)
> > > > > > > > over
> > > > > > > > > > > > > >> alternatives
> > > > > > > > > > > > > >> > > > and I
> > > > > > > > > > > > > >> > > > > am on board going forward with this
> > > approach.
> > > > It
> > > > > > > will
> > > > > > > > > > > > definitely
> > > > > > > > > > > > > >> set
> > > > > > > > > > > > > >> > > > forth
> > > > > > > > > > > > > >> > > > > a contract of where the reporter will be
> > > used
> > > > > with
> > > > > > > > > better
> > > > > > > > > > > > > >> aesthetics.
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > The original idea of going with a
> > > synchronous
> > > > > > > approach
> > > > > > > > > for
> > > > > > > > > > > the
> > > > > > > > > > > > > >> error
> > > > > > > > > > > > > >> > > > > reporter was to ease the connector
> > > developer's
> > > > > job
> > > > > > > > > > > interacting
> > > > > > > > > > > > > >> with
> > > > > > > > > > > > > >> > and
> > > > > > > > > > > > > >> > > > > handling the error reporter. The
> tradeoff
> > > for
> > > > > > > having a
> > > > > > > > > > > > > >> > synchronous-only
> > > > > > > > > > > > > >> > > > > reporter would be lower throughput on
> the
> > > > > > reporter;
> > > > > > > > this
> > > > > > > > > > was
> > > > > > > > > > > > > >> thought
> > > > > > > > > > > > > >> > to
> > > > > > > > > > > > > >> > > > be
> > > > > > > > > > > > > >> > > > > fine since arguably most circumstances
> > would
> > > > not
> > > > > > > > include
> > > > > > > > > > > > > >> consistently
> > > > > > > > > > > > > >> > > > large
> > > > > > > > > > > > > >> > > > > amounts of records being sent to the
> error
> > > > > > reporter.
> > > > > > > > > Even
> > > > > > > > > > if
> > > > > > > > > > > > > this
> > > > > > > > > > > > > >> was
> > > > > > > > > > > > > >> > > the
> > > > > > > > > > > > > >> > > > > case, an argument can be made that the
> > lower
> > > > > > > > throughput
> > > > > > > > > > > would
> > > > > > > > > > > > be
> > > > > > > > > > > > > >> of
> > > > > > > > > > > > > >> > > > > assistance in this case, as it would
> allow
> > > > more
> > > > > > time
> > > > > > > > for
> > > > > > > > > > the
> > > > > > > > > > > > > user
> > > > > > > > > > > > > >> to
> > > > > > > > > > > > > >> > > > > realize the connector is having records
> > sent
> > > > to
> > > > > > the
> > > > > > > > > error
> > > > > > > > > > > > > reporter
> > > > > > > > > > > > > >> > > before
> > > > > > > > > > > > > >> > > > > many are sent. However, if we are
> strongly
> > > in
> > > > > > favor
> > > > > > > of
> > > > > > > > > > > having
> > > > > > > > > > > > > the
> > > > > > > > > > > > > >> > > option
> > > > > > > > > > > > > >> > > > of
> > > > > > > > > > > > > >> > > > > asynchronous functionality available for
> > the
> > > > > > > > developer,
> > > > > > > > > > > then I
> > > > > > > > > > > > > am
> > > > > > > > > > > > > >> > fine
> > > > > > > > > > > > > >> > > > with
> > > > > > > > > > > > > >> > > > > that as well.
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > Lastly, I am on board with changing the
> > name
> > > > to
> > > > > > > > > > > > > > >> failedRecordReporter.
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > Please let me know your thoughts.
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > Thanks,
> > > > > > > > > > > > > >> > > > > Aakash
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > On Fri, May 15, 2020 at 9:10 AM Randall
> > > Hauch
> > > > <
> > > > > > > > > > > > rha...@gmail.com
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >> > > wrote:
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > > Konstantine said:
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > > I notice Randall also used
> BiFunction
> > in
> > > > his
> > > > > > > > > example,
> > > > > > > > > > I
> > > > > > > > > > > > > >> wonder if
> > > > > > > > > > > > > >> > > > it's
> > > > > > > > > > > > > >> > > > > > for
> > > > > > > > > > > > > >> > > > > > > similar reasons.
> > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > Nope. Just a typo on my part.
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > There appear to be three outstanding
> > > > > questions.
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > First, Konstantine suggested calling
> > this
> > > > > > > > > > > > > >> "failedRecordReporter". I
> > > > > > > > > > > > > >> > > > think
> > > > > > > > > > > > > >> > > > > > this is minor, but using this new name
> > may
> > > > be
> > > > > a
> > > > > > > bit
> > > > > > > > > more
> > > > > > > > > > > > > precise
> > > > > > > > > > > > > >> > and
> > > > > > > > > > > > > >> > > > I'd
> > > > > > > > > > > > > >> > > > > be
> > > > > > > > > > > > > >> > > > > > fine with this.
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > Second, should the reporter method be
> > > > > > > synchronous? I
> > > > > > > > > > think
> > > > > > > > > > > > the
> > > > > > > > > > > > > >> two
> > > > > > > > > > > > > >> > > > > options
> > > > > > > > > > > > > >> > > > > > are:
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > 2a. Use `BiConsumer<SinkRecord,
> > > Throwable>`
> > > > > that
> > > > > > > > > returns
> > > > > > > > > > > > > nothing
> > > > > > > > > > > > > >> > and
> > > > > > > > > > > > > >> > > > > blocks
> > > > > > > > > > > > > >> > > > > > (at this time).
> > > > > > > > > > > > > >> > > > > > 2b. Use `BiFunction<SinkRecord,
> > Throwable,
> > > > > > > > > > Future<Void>>`
> > > > > > > > > > > > that
> > > > > > > > > > > > > >> > > returns
> > > > > > > > > > > > > >> > > > a
> > > > > > > > > > > > > >> > > > > > future that the user can optionally
> use
> > to
> > > > be
> > > > > > > > > > synchronous.
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > I do agree with Konstantine that
> option
> > 2b
> > > > > gives
> > > > > > > us
> > > > > > > > > more
> > > > > > > > > > > > room
> > > > > > > > > > > > > >> for
> > > > > > > > > > > > > >> > > > future
> > > > > > > > > > > > > >> > > > > > semantic changes, and since the
> producer
> > > > write
> > > > > > is
> > > > > > > > > > already
> > > > > > > > > > > > > >> > > asynchronous
> > > > > > > > > > > > > >> > > > > this
> > > > > > > > > > > > > >> > > > > > should be straightforward to
> implement.
> > I
> > > > > think
> > > > > > > the
> > > > > > > > > > > concern
> > > > > > > > > > > > > >> here is
> > > > > > > > > > > > > >> > > > that
> > > > > > > > > > > > > >> > > > > if
> > > > > > > > > > > > > >> > > > > > the sink task does not *use* the
> future
> > to
> > > > > make
> > > > > > > this
> > > > > > > > > > > > > >> synchronous,
> > > > > > > > > > > > > >> > it
> > > > > > > > > > > > > >> > > is
> > > > > > > > > > > > > >> > > > > > very possible that the error records
> > could
> > > > be
> > > > > > > > written
> > > > > > > > > > out
> > > > > > > > > > > of
> > > > > > > > > > > > > >> order
> > > > > > > > > > > > > >> > > (due
> > > > > > > > > > > > > >> > > > > to
> > > > > > > > > > > > > >> > > > > > retries). But this won't be an issue
> if
> > > the
> > > > > > > > > > implementation
> > > > > > > > > > > > > uses
> > > > > > > > > > > > > >> > > > > >
> > `max.in.flight.requests.per.connection=1`
> > > > for
> > > > > > > > writing
> > > > > > > > > > the
> > > > > > > > > > > > > error
> > > > > > > > > > > > > >> > > > records.
> > > > > > > > > > > > > >> > > > > > It's a little less clear, but honestly
> > IMO
> > > > > > passing
> > > > > > > > the
> > > > > > > > > > > > > reporter
> > > > > > > > > > > > > >> in
> > > > > > > > > > > > > >> > > the
> > > > > > > > > > > > > >> > > > > > `put(...)` method helps make this
> lambda
> > > > > easier
> > > > > > to
> > > > > > > > > > > > understand,
> > > > > > > > > > > > > >> for
> > > > > > > > > > > > > >> > > some
> > > > > > > > > > > > > >> > > > > > strange reason. So unless there are
> good
> > > > > reasons
> > > > > > > to
> > > > > > > > > > avoid
> > > > > > > > > > > > > this,
> > > > > > > > > > > > > >> I'd
> > > > > > > > > > > > > >> > > be
> > > > > > > > > > > > > >> > > > in
> > > > > > > > > > > > > >> > > > > > favor of 2b and returning a Future.
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > Third, how do we pass the reporter
> > lambda
> > > /
> > > > > > method
> > > > > > > > > > > reference
> > > > > > > > > > > > > to
> > > > > > > > > > > > > >> the
> > > > > > > > > > > > > >> > > > task?
> > > > > > > > > > > > > >> > > > > > My proposal to pass the reporter via
> an
> > > > > overloaded
> > > > > > > > > > > `put(...)`
> > > > > > > > > > > > > >> still
> > > > > > > > > > > > > >> > is
> > > > > > > > > > > > > >> > > > the
> > > > > > > > > > > > > >> > > > > > most attractive to me, for several
> > > reasons:
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > 3a. There's no need to pass the
> reporter
> > > > > > > separately
> > > > > > > > > > *and*
> > > > > > > > > > > to
> > > > > > > > > > > > > >> > describe
> > > > > > > > > > > > > >> > > > the
> > > > > > > > > > > > > >> > > > > > changes in method call ordering.
> > > > > > > > > > > > > >> > > > > > 3b. As mentioned above, for some
> reason
> > > > > passing
> > > > > > it
> > > > > > > > via
> > > > > > > > > > > > > >> `put(...)`
> > > > > > > > > > > > > >> > > makes
> > > > > > > > > > > > > >> > > > > the
> > > > > > > > > > > > > >> > > > > > intent more clear that it be used when
> > > > > > processing
> > > > > > > > the
> > > > > > > > > > > > > >> SinkRecord,
> > > > > > > > > > > > > >> > and
> > > > > > > > > > > > > >> > > > > that
> > > > > > > > > > > > > >> > > > > > it shouldn't be used in `start(...)`,
> > > > > > > > > `preCommit(...)`,
> > > > > > > > > > > > > >> > > > > > `onPartitionsAssigned(...)`, or any of
> > the
> > > > > other
> > > > > > > > task
> > > > > > > > > > > > methods.
> > > > > > > > > > > > > >> As
> > > > > > > > > > > > > >> > > > Andrew
> > > > > > > > > > > > > >> > > > > > pointed out earlier, *describing* this
> > in
> > > > the
> > > > > > KIP
> > > > > > > > and
> > > > > > > > > in
> > > > > > > > > > > > > JavaDoc
> > > > > > > > > > > > > >> > will
> > > > > > > > > > > > > >> > > > be
> > > > > > > > > > > > > >> > > > > > tough to be exact yet succinct.
> > > > > > > > > > > > > > >> > > > > > 3c. There is already precedent for
> > > evolving
> > > > > > > > > > > > > >> > > > > > `SourceTask.commitRecord(...)`, and
> the
> > > > > pattern
> > > > > > is
> > > > > > > > > > > > identical.
> > > > > > > > > > > > > >> > > > > > 3d. Backward compatibility is easy to
> > > > > > understand,
> > > > > > > > and
> > > > > > > > > at
> > > > > > > > > > > the
> > > > > > > > > > > > > >> same
> > > > > > > > > > > > > >> > > time
> > > > > > > > > > > > > >> > > > > it's
> > > > > > > > > > > > > >> > > > > > pretty easy to describe what
> > > implementations
> > > > > > that
> > > > > > > > want
> > > > > > > > > > to
> > > > > > > > > > > > take
> > > > > > > > > > > > > >> > > > advantage
> > > > > > > > > > > > > >> > > > > of
> > > > > > > > > > > > > >> > > > > > this feature should do.
> > > > > > > > > > > > > >> > > > > > 3e. Minimal changes to the interface:
> > > we're
> > > > > just
> > > > > > > > > > *adding*
> > > > > > > > > > > > one
> > > > > > > > > > > > > >> > default
> > > > > > > > > > > > > >> > > > > > method that calls the existing method
> > and
> > > > > > > > deprecating
> > > > > > > > > > the
> > > > > > > > > > > > > >> existing
> > > > > > > > > > > > > >> > > > > > `put(...)`.
> > > > > > > > > > > > > >> > > > > > 3f. Deprecating the existing
> `put(...)`
> > > > makes
> > > > > it
> > > > > > > > more
> > > > > > > > > > > clear
> > > > > > > > > > > > > in a
> > > > > > > > > > > > > >> > > > > > programmatic sense that new sink
> > > > > implementations
> > > > > > > > > should
> > > > > > > > > > > use
> > > > > > > > > > > > > the
> > > > > > > > > > > > > >> > > > reporter,
> > > > > > > > > > > > > >> > > > > > and that we recommend old sinks evolve
> > to
> > > > use
> > > > > > it.
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > Some of these benefits apply to some
> of
> > > the
> > > > > > other
> > > > > > > > > > > > suggestions,
> > > > > > > > > > > > > >> but
> > > > > > > > > > > > > >> > I
> > > > > > > > > > > > > >> > > > > think
> > > > > > > > > > > > > >> > > > > > none of the other suggestions have all
> > of
> > > > > these
> > > > > > > > > > benefits.
> > > > > > > > > > > > For
> > > > > > > > > > > > > >> > > example,
> > > > > > > > > > > > > >> > > > > > overloading `initialize(...)` is more
> > > > > difficult
> > > > > > > > since
> > > > > > > > > > most
> > > > > > > > > > > > > sink
> > > > > > > > > > > > > >> > > > > connectors
> > > > > > > > > > > > > >> > > > > > don't override it and therefore would
> be
> > > > less
> > > > > > > > subject
> > > > > > > > > to
> > > > > > > > > > > > > > >> > deprecation
> > > > > > > > > > > > > >> > > > > > warnings. Overloading `start(...)` is
> > less
> > > > > > > > attractive.
> > > > > > > > > > > > Adding
> > > > > > > > > > > > > a
> > > > > > > > > > > > > >> > > method
> > > > > > > > > > > > > >> > > > > IMO
> > > > > > > > > > > > > >> > > > > > shares the fewest of these benefits.
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > The one disadvantage of this approach
> is
> > > > that
> > > > > > sink
> > > > > > > > > task
> > > > > > > > > > > > > >> > > implementations
> > > > > > > > > > > > > >> > > > > > can't rely upon the reporter upon
> > startup.
> > > > IMO
> > > > > > > > that's
> > > > > > > > > an
> > > > > > > > > > > > > >> acceptable
> > > > > > > > > > > > > >> > > > > > tradeoff to get the cleaner and more
> > > > explicit
> > > > > > API,
> > > > > > > > > > > > especially
> > > > > > > > > > > > > if
> > > > > > > > > > > > > >> > the
> > > > > > > > > > > > > >> > > > API
> > > > > > > > > > > > > >> > > > > > contract is that Connect will pass the
> > > same
> > > > > > > reporter
> > > > > > > > > > > > instance
> > > > > > > > > > > > > to
> > > > > > > > > > > > > >> > each
> > > > > > > > > > > > > >> > > > > call
> > > > > > > > > > > > > >> > > > > > to `put(...)` on a single task
> instance.
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > Best regards,
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > Randall
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > On Fri, May 15, 2020 at 6:59 AM Andrew
> > > > > > Schofield <
> > > > > > > > > > > > > >> > > > > > andrew_schofi...@live.com>
> > > > > > > > > > > > > >> > > > > > wrote:
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > > Hi,
> > > > > > > > > > > > > >> > > > > > > Randall's suggestion is really
> good. I
> > > > think
> > > > > > it
> > > > > > > > > gives
> > > > > > > > > > > the
> > > > > > > > > > > > > >> > > flexibility
> > > > > > > > > > > > > >> > > > > > > required and also
> > > > > > > > > > > > > >> > > > > > > keeps the interface the right way
> > round.
> > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > >> > > > > > > Thanks,
> > > > > > > > > > > > > >> > > > > > > Andrew Schofield
> > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > >> > > > > > > On 15/05/2020, 02:07, "Aakash
> Shah" <
> > > > > > > > > > > as...@confluent.io>
> > > > > > > > > > > > > >> wrote:
> > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > >> > > > > > > > Hi Randall,
> > > > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > > > >> > > > > > > > Thanks for the feedback.
> > > > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > > > >> > > > > > > > 1. This is a great suggestion,
> but I
> > > > find
> > > > > > that
> > > > > > > > > > adding
> > > > >
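
For readers skimming the quoted thread, here is a minimal sketch of the overloaded `put(...)` idea (options 2b and 3e in Randall's message), written against stand-in types rather than the real Connect API. `SinkRecord` and `SinkTask` below are simplified, hypothetical placeholders so the example compiles on its own, and `LegacyTask`, `ReportingTask`, and the parameter name `failedRecordReporter` are assumptions for illustration, not what the KIP finally adopted.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

// Hypothetical stand-in for Connect's SinkRecord.
final class SinkRecord {
    final String value;
    SinkRecord(String value) { this.value = value; }
}

// Hypothetical stand-in for the SinkTask base class supplied by the runtime.
abstract class SinkTask {
    // Existing method that older connectors already implement.
    public abstract void put(Collection<SinkRecord> records);

    // Sketch of the proposed overload: by default it ignores the reporter and
    // delegates to the existing method, so old connectors run unchanged.
    public void put(Collection<SinkRecord> records,
                    BiFunction<SinkRecord, Throwable, Future<Void>> failedRecordReporter) {
        put(records);
    }
}

// An older connector that only knows about put(records).
class LegacyTask extends SinkTask {
    final List<String> written = new ArrayList<>();

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord r : records) written.add(r.value);
    }
}

// A newer connector that implements both methods, as suggested in the thread
// for connectors that also want to run on older runtimes.
class ReportingTask extends SinkTask {
    final List<String> written = new ArrayList<>();

    @Override
    public void put(Collection<SinkRecord> records) {
        // Path taken when no reporter is available: a bad record fails the call.
        for (SinkRecord r : records) write(r);
    }

    @Override
    public void put(Collection<SinkRecord> records,
                    BiFunction<SinkRecord, Throwable, Future<Void>> failedRecordReporter) {
        for (SinkRecord r : records) {
            try {
                write(r);
            } catch (RuntimeException e) {
                // Blocking on the returned future makes the report synchronous;
                // a task could instead collect the futures and wait later.
                Future<Void> pending = failedRecordReporter.apply(r, e);
                try {
                    pending.get();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ie);
                } catch (ExecutionException ee) {
                    throw new RuntimeException(ee.getCause());
                }
            }
        }
    }

    // Toy "write": an empty value stands in for a record the sink rejects.
    private void write(SinkRecord r) {
        if (r.value.isEmpty()) throw new IllegalArgumentException("empty value");
        written.add(r.value);
    }
}
```

Under these assumptions, a runtime that knows about the reporter always calls the two-argument method: `LegacyTask` falls through to its old behaviour via the default delegation, while `ReportingTask` reports the bad record and keeps going.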
