Thanks, Aakash.

After thinking about my previous proposal, I would like to retract the
suggestion of adding the `report(SinkRecord, Throwable, Callback)` method
to the new interface for the following reasons:

   1. The `Callback` interface requires the sink task developer to handle
   an error being passed to the callback, but in this case it's very unlikely
   the Connect runtime will ever call the callback with an error; it will
   instead handle the error itself (e.g., retry forever or fail the task). IOW,
   the `Callback` interface is appropriate for `Producer`, but it's far broader
   than is needed here.
   2. Without the callback, the Connect runtime will never run task code
   within the DLQ producer thread, which is very safe. But when a callback
   is provided, that callback *will* be called on the DLQ producer's thread,
   and any mistakes in the sink task's callback may block the DLQ. IOW, having
   a callback is too risky.
   3. We actually don't know that a callback is even necessary.
   4. Just having the one `report(SinkRecord, Throwable)` method is simpler
   and, given the looming deadline, much closer to what we've already discussed.
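A minimal JDK-only sketch of the single-method shape argued for above. `SimpleErrantRecordReporter`, `ExecutorBackedReporter`, and the `String` record type are hypothetical stand-ins, not Connect's API, and the future is simplified to `Future<Void>`; a single-thread executor plays the role of the DLQ producer's thread. Because there is no callback, no task code runs on that thread, and a task that wants synchronous behavior simply calls `get()` on the returned future.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical single-method reporter; R stands in for SinkRecord.
interface SimpleErrantRecordReporter<R> {
    // No callback parameter: task code never runs on the reporter's thread.
    Future<Void> report(R record, Throwable error);
}

// Hypothetical implementation; the single-thread executor simulates the DLQ producer thread.
final class ExecutorBackedReporter<R> implements SimpleErrantRecordReporter<R>, AutoCloseable {
    private final ExecutorService dlqThread = Executors.newSingleThreadExecutor();
    private final List<String> dlq = new CopyOnWriteArrayList<>();

    @Override
    public Future<Void> report(R record, Throwable error) {
        // Only runtime-owned code runs here, on the simulated DLQ thread.
        return dlqThread.submit(() -> {
            dlq.add(record + ": " + error.getMessage());
            return null;
        });
    }

    // What has been "written" to the simulated DLQ so far.
    List<String> written() {
        return dlq;
    }

    @Override
    public void close() {
        dlqThread.shutdown();
    }
}
```

A task that must not continue until the DLQ write succeeds would call `reporter.report(record, e).get()`; one that does not care can ignore the future.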

My previous proposal also used "NoSuchClassError" in the JavaDoc and
example, but that was a typo. Per Arjun's message, the actual exception
class should be "NoClassDefFoundError".

I made a few other suggestions yesterday about other parts of the
KIP, and I hope those will be considered as well.

Best regards,

Randall

On Sun, May 17, 2020 at 12:34 PM Aakash Shah <as...@confluent.io> wrote:

> Hi Randall,
>
> Thanks for the suggestions. Now that we are adding an interface, I think it
> is a good idea to overload the report method to support both cases.
>
> > I guess it boils down to this question: do we know today that we will
> > *never* want the reporter to write asynchronously?
>
> Originally, I believe the idea was to implement a synchronous design first
> in order to ensure the guarantees mentioned by Magesh, and then introduce a
> new method in a subsequent KIP for an asynchronous error reporter if there
> were requests for it. But I agree with what you're saying: if we can design
> it asynchronously, then it opens up the possibility of implementing it in
> multiple ways in the future.
>
> Thanks,
> Aakash
>
> On Sun, May 17, 2020 at 9:04 AM Randall Hauch <rha...@gmail.com> wrote:
>
> > Thanks, Arjun! This has been very helpful.
> >
> > Looking at your POC and thinking in terms of the current KIP, it sounds
> > like the suggestion is to keep the same method signature for reporting
> > errors, but to change from the `BiFunction<SinkRecord, Throwable,
> > Future<Void>>` to a new `ErrantRecordReporter` interface. More concretely,
> > I'd suggest the KIP propose adding the following new interface:
> >
> > /**
> >  * Reporter of problematic records and the corresponding problems.
> >  *
> >  * @since 2.9
> >  */
> > public interface ErrantRecordReporter {
> >
> >     /**
> >      * Report a problematic record and the corresponding error to be written to the sink
> >      * connector's dead letter queue (DLQ).
> >      *
> >      * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future} for the
> >      * {@link RecordMetadata} that will be assigned to the record in the DLQ topic. Invoking
> >      * {@link java.util.concurrent.Future#get() get()} on this future will block until the
> >      * record has been written and then return the metadata for the record
> >      * or throw any exception that occurred while sending the record.
> >      * If you want to simulate a simple blocking call you can call the <code>get()</code> method
> >      * immediately.
> >      *
> >      * @param record the problematic record; may not be null
> >      * @param error  the error capturing the problem with the record; may not be null
> >      * @return a future that can be used to block until the record and error are written
> >      *         to the DLQ
> >      * @since 2.9
> >      */
> >     default Future<RecordMetadata> report(SinkRecord record, Throwable error) {
> >         return report(record, error, null);
> >     }
> >
> >     /**
> >      * Report a problematic record and the corresponding error to be written to the sink
> >      * connector's dead letter queue (DLQ).
> >      *
> >      * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future} for the
> >      * {@link RecordMetadata} that will be assigned to the record in the DLQ topic. Invoking
> >      * {@link java.util.concurrent.Future#get() get()} on this future will block until the
> >      * record has been written and then return the metadata for the record
> >      * or throw any exception that occurred while sending the record.
> >      * If you want to simulate a simple blocking call you can call the <code>get()</code> method
> >      * immediately.
> >      *
> >      * <p>Fully non-blocking usage can make use of the {@link Callback} parameter to provide a
> >      * callback that will be invoked when the request is complete. Callbacks for records being
> >      * sent to the same partition are guaranteed to execute in order.
> >      *
> >      * @param record   the problematic record; may not be null
> >      * @param error    the error capturing the problem with the record; may not be null
> >      * @param callback a user-supplied callback to execute when the record has been acknowledged
> >      *                 by the server; may be null for no callback
> >      * @return a future that can be used to block until the record and error are written
> >      *         to the DLQ
> >      * @since 2.9
> >      */
> >     Future<RecordMetadata> report(SinkRecord record, Throwable error, Callback callback);
> > }
> >
> > and then modify the proposed change to the SinkTaskContext to be:
> >
> > public interface SinkTaskContext {
> >     ...
> >     /**
> >      * Get the reporter to which the sink task can report problematic {@link SinkRecord} objects
> >      * passed to the {@link SinkTask#put(Collection)} method.
> >      *
> >      * <p>This method was added in Apache Kafka 2.9. Sink tasks that use this method and want to
> >      * maintain backward compatibility so they can also be installed in older Connect runtimes
> >      * should guard its use with a try-catch block, since calling this method will result in a
> >      * {@link NoClassDefFoundError} or {@link NoSuchMethodError} when the sink connector is
> >      * used in Connect runtimes older than Kafka 2.9. For example:
> >      * <pre>
> >      *     ErrantRecordReporter reporter;
> >      *     try {
> >      *         reporter = context.failedRecordReporter();
> >      *     } catch (NoClassDefFoundError | NoSuchMethodError e) {
> >      *         reporter = null;
> >      *     }
> >      * </pre>
> >      *
> >      * @return the reporter; null if no error reporter has been configured for the connector
> >      * @since 2.9
> >      */
> >     ErrantRecordReporter failedRecordReporter();
> > }
> >
> > The example usage then becomes:
> >
> > private ErrantRecordReporter reporter;
> >
> > @Override
> > public void start(Map<String, String> props) {
> >   ...
> >   try {
> >     reporter = context.failedRecordReporter(); // may be null if DLQ not enabled
> >   } catch (NoClassDefFoundError | NoSuchMethodError e) {
> >     // Will occur in Connect runtimes earlier than 2.9
> >     reporter = null;
> >   }
> > }
> >
> > @Override
> > public void put(Collection<SinkRecord> records) {
> >   for (SinkRecord record : records) {
> >     try {
> >       // attempt to send record to data sink
> >       process(record);
> >     } catch (Exception e) {
> >       if (reporter != null) {
> >         // Send errant record to error reporter
> >         reporter.report(record, e, (metadata, error) -> {
> >           // do something
> >         });
> >       } else {
> >         // There's no error reporter, so fail
> >         throw new ConnectException("Failed on record", e);
> >       }
> >     }
> >   }
> > }
> >
> > My suggestion is for the `report(...)` method to return a `Future`, as
> > discussed above, to enable synchronous usage, but to change the
> > parameterized type of the future to `Future<RecordMetadata>` to mirror the
> > `Producer.send(...)` methods. This gives the sink task the ability to use
> > the record metadata information, and the implementation is much more
> > straightforward since the future is identical to the one returned by
> > `Producer.send(...)`. I also suggest we overload the `report(...)` method
> > to take a `Callback`, again just like the `Producer.send(...)` methods, to
> > enable non-blocking usage.
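A sketch of why returning a future still supports efficient usage without a callback: the task submits every failed record without waiting, then blocks once per batch before `put(...)` returns. `DlqMetadata` is a hypothetical stand-in for `RecordMetadata`, `Reporter` for the proposed interface, and `process(...)` for the sink write; the DLQ offsets are simulated.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

final class BatchReportingSketch {
    // Hypothetical stand-in for RecordMetadata: only the offset assigned in the DLQ topic.
    static final class DlqMetadata {
        final long offset;

        DlqMetadata(long offset) {
            this.offset = offset;
        }
    }

    // Hypothetical stand-in for the proposed reporter; String plays the role of SinkRecord.
    interface Reporter {
        Future<DlqMetadata> report(String record, Throwable error);
    }

    // In-memory reporter whose futures complete asynchronously, like Producer.send(...).
    static Reporter inMemoryReporter() {
        AtomicLong nextOffset = new AtomicLong();
        return (record, error) -> CompletableFuture.supplyAsync(
                () -> new DlqMetadata(nextOffset.getAndIncrement()));
    }

    // Report every failed record without blocking, then wait once for the whole batch.
    static List<DlqMetadata> putBatch(Collection<String> records, Reporter reporter) throws Exception {
        List<Future<DlqMetadata>> pending = new ArrayList<>();
        for (String record : records) {
            try {
                process(record);
            } catch (RuntimeException e) {
                pending.add(reporter.report(record, e)); // returns immediately
            }
        }
        List<DlqMetadata> written = new ArrayList<>();
        for (Future<DlqMetadata> future : pending) {
            written.add(future.get()); // blocks until this record is in the DLQ
        }
        return written;
    }

    // Hypothetical sink write that rejects any record containing "bad".
    private static void process(String record) {
        if (record.contains("bad")) {
            throw new IllegalArgumentException("cannot write " + record);
        }
    }
}
```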
> >
> > In light of Arjun's prototype, I think the KIP should also propose updating
> > the FileSinkTask example to use the new API, with proper error handling
> > that would allow the newer file sink connector to be deployed in older
> > Connect runtimes.
> >
> > Finally, the KIP's wording about backward compatibility should be updated
> > to something more like:
> >
> > "This proposal is backward compatible such that existing sink connector
> > implementations will continue to work as before. Developers can optionally
> > modify sink connector implementations to use the new error reporting
> > feature, yet still easily support installing and running those connectors
> > in older Connect runtimes where the feature does not exist."
> >
> > WDYT?
> >
> > Best regards,
> >
> > Randall
> >
> > On Sat, May 16, 2020 at 11:43 PM Chris Egerton <chr...@confluent.io>
> > wrote:
> >
> > > Hi Arjun,
> > >
> > > This is great news. Given that we're already willing to ask developers to
> > > catch a method-not-found exception, and it sounds like a new interface can
> > > be handled in a similar try/catch block in the same place, I like the idea
> > > of a new fleshed-out interface instead of a BiConsumer or a BiFunction.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Sat, May 16, 2020, 21:29 Arjun Satish <arjun.sat...@gmail.com>
> wrote:
> > >
> > > > ok folks, this is my POC PR:
> > > > https://github.com/wicknicks/kafka/tree/kip-610-2.4.1. Connectors built
> > > > from this were copied into a fresh installation of Kafka Connect (v2.5,
> > > > the latest version), and ran. Without proper try-catch, the tasks would
> > > > fail. But when the appropriate exceptions were handled, the task proceeded
> > > > without any issues. I've added some comments here
> > > > https://github.com/apache/kafka/commit/295e6a46925993060a60b79e646e06432480ed0d
> > > > that document these errors. Tested this with both Java 8 and 11.
> > > >
> > > > To check if it is safe to add methods and new classes, I dug around the
> > > > JVM spec <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html>.
> > > > It makes the following points:
> > > >
> > > > 1. During the loading phase, a number of structural details of the class
> > > > are added to the run-time constant pool (class descriptor, fields,
> > > > methods). What these methods themselves do is not considered.
> > > > 2. During the linking phase, the JVM implementation decides when symbolic
> > > > references are resolved (lazily or eagerly). But the important bit is this
> > > > <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4>:
> > > > Whichever strategy (lazy or eager) is followed, any error detected during
> > > > resolution must be thrown at a point in the program that (directly or
> > > > indirectly) uses a symbolic reference to the class or interface.
> > > >
> > > > So any new classes that the connector relies upon but that are missing
> > > > at runtime will be in some internal "not found" state, but no errors will
> > > > be thrown until the resolution stage.
> > > >
> > > > 3. Section 5.4.3
> > > > <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4.3>
> > > > says that executing any of the instructions that take a symbolic reference
> > > > (anewarray, checkcast, etc.) requires resolving that reference. This is
> > > > good: instructions are executed in order, so the connector developer has a
> > > > chance to execute some code before the program has to look up an unknown
> > > > class.
> > > >
> > > > We are also safe in that the new interface will be used only during or
> > > > after start() is called, so there should not be any fields that do
> > > > something like:
> > > >
> > > > private final ErrantRecordReporter reporter =
> > > > sinkTaskContext.failedRecordReporter();
> > > >
> > > > The initialization is only valid in start(), where we can safely wrap it
> > > > in a try-catch block. The errors being thrown are very precise:
> > > > NoSuchMethodError
> > > > <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4.3.3>
> > > > when a method is not found, and NoClassDefFoundError for any class that
> > > > cannot be loaded during resolution
> > > > <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.3>.
> > > >
> > > > BTW, one more example that comes to mind is the JDBC spec (the java.sql
> > > > package has been changing with every major version of Java). Newer
> > > > classes are added (look at java.sql.DriverAction in Java 8 and
> > > > java.sql.ShardingKey in Java 9). New methods are being added to existing
> > > > classes as well (for example,
> > > > java.sql.DriverManager#registerDriver(java.sql.Driver, java.sql.DriverAction)
> > > > <https://docs.oracle.com/javase/8/docs/api/java/sql/DriverManager.html#registerDriver-java.sql.Driver-java.sql.DriverAction->),
> > > > and the expectation is that the driver should handle any runtime
> > > > inconsistencies.
> > > >
> > > > Overall, I think it should be safe to have new interfaces, and have them
> > > > loaded safely, with correct checks.
> > > >
> > > > BTW, one more option we have is that the connector can check whether
> > > > these new methods are present. For example:
> > > >
> > > > Class.forName("org.apache.kafka.connect.sink.SinkTaskContext").getMethod("failedRecordReporter");
> > > >
> > > > Based on this, the connector could choose to return a different (older)
> > > > task class from SinkConnector#taskClass() if it wants to be hyper-safe.
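A small sketch of that reflective check, using only the standard `Class.forName` and `Class#getMethod`. The helper name `hasMethod` is hypothetical. Because the method is looked up reflectively rather than called directly, a missing class or method surfaces here as a checked exception instead of as a `NoSuchMethodError` at a call site.

```java
final class RuntimeFeatureCheck {
    // Hypothetical helper: true if the class can be loaded and has a public method
    // with this name and parameter types. Missing classes or methods are reported
    // as checked exceptions by the reflection API, which we translate to false.
    static boolean hasMethod(String className, String methodName, Class<?>... parameterTypes) {
        try {
            Class.forName(className).getMethod(methodName, parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }
}
```

A connector could call `hasMethod("org.apache.kafka.connect.sink.SinkTaskContext", "failedRecordReporter")` and pick its task class from the result.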
> > > >
> > > > Apologies for throwing in such a wrench at the last minute! But IMHO it'd
> > > > be good to take this opportunity if we can.
> > > >
> > > > Best,
> > > >
> > > > On Sat, May 16, 2020 at 6:05 PM Randall Hauch <rha...@gmail.com>
> > wrote:
> > > >
> > > > > Thanks for updating the KIP, Aakash. A few comments on the updated
> > > > > content there:
> > > > >
> > > > > > In order to avoid error records being written out of order (for
> > > > > > example, due to retries), the developer can use
> > > > > > `max.in.flight.requests.per.connection=1` in their implementation for
> > > > > > writing error records.
> > > > > >
> > > > >
> > > > > IMO, the DLQ should always use this setting so that retries cannot
> > > > > reorder error records, and the developer can always set this property
> > > > > for the DLQ to something larger than 1 if order is not important and they
> > > > > need the extra performance.
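A sketch of the default-with-override policy suggested above. `max.in.flight.requests.per.connection` is a standard Kafka producer setting; the helper `dlqProducerProps` and the way user overrides reach it are assumptions for illustration, not how Connect actually builds its DLQ producer.

```java
import java.util.Map;
import java.util.Properties;

final class DlqProducerConfigSketch {
    static final String MAX_IN_FLIGHT = "max.in.flight.requests.per.connection";

    // Hypothetical helper: default to one in-flight request so that retries cannot
    // reorder error records, but let an explicit user setting take precedence.
    static Properties dlqProducerProps(Map<String, String> userOverrides) {
        Properties props = new Properties();
        props.setProperty(MAX_IN_FLIGHT, "1");
        userOverrides.forEach(props::setProperty); // user values overwrite the default
        return props;
    }
}
```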
> > > > >
> > > > > > Along with the original errant sink record, the exception thrown will
> > > > > > be sent to the error reporter to provide additional context.
> > > > > >
> > > > >
> > > > > This is also not really clear. What thrown exception is this referring
> > > > > to? Isn't the exception *passed* to the reporter method?
> > > > >
> > > > > I do think we need to better explain what behavior the sink task should
> > > > > expect. Is there any case when the reporter will be null or be a no-op,
> > > > > and if so, what should the sink task do? Should it simply wrap and throw
> > > > > a ConnectException? And if there is a reporter, won't Connect treat this
> > > > > sink record as "processed" with respect to the existing behavior of
> > > > > passing "processed" offsets back to the sink task's `preCommit(...)`
> > > > > method?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Randall
> > > > >
> > > > > On Sat, May 16, 2020 at 6:38 PM Arjun Satish <
> arjun.sat...@gmail.com
> > >
> > > > > wrote:
> > > > >
> > > > > > Yeah, I had tried this locally on Java 8 and 11, and it seemed to work.
> > > > > > Let me clean up and publish my code in a branch somewhere so we can
> > > > > > take a look at it.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > On Sat, May 16, 2020 at 3:39 PM Randall Hauch <rha...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Have you tried this? IIUC the problem is with the new type: any class
> > > > > > > that uses ‘ErrantRecordReporter’ with an import would fail to be
> > > > > > > loaded by the classloader if the type does not exist (i.e., pre-2.9
> > > > > > > Connect runtimes). Catching that ClassNotFoundException and
> > > > > > > dynamically importing the type is actually much harder.
> > > > > > >
> > > > > > > On Sat, May 16, 2020 at 5:07 PM Aakash Shah <
> as...@confluent.io>
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi Arjun,
> > > > > > > >
> > > > > > > > Thanks for this suggestion. I actually like this a lot because a
> > > > > > > > defined interface looks more appealing and is clearer in its
> > > > > > > > intention. Since we are still using NoSuchMethodError to account for
> > > > > > > > backwards compatibility, this works for me. I can't see any drawbacks
> > > > > > > > besides having to call the getter method for the processing of every
> > > > > > > > errant record.
> > > > > > > >
> > > > > > > > I would like to hear others' thoughts on whether this drawback
> > > > > > > > outweighs the added benefit of clarity.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Aakash
> > > > > > > >
> > > > > > > > On Sat, May 16, 2020 at 2:54 PM Arjun Satish <
> > > > arjun.sat...@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks Konstantine, happy to write something up in a KIP. But I
> > > > > > > > > think it would be redundant if we add this KIP. What do you think?
> > > > > > > > >
> > > > > > > > > Also, Randall, yes that API would work. But if we expect the
> > > > > > > > > developers to catch NoSuchMethodErrors, then should we also go ahead
> > > > > > > > > and make a class that would have a report method (similar to
> > > > > > > > > ErrorReporter
> > > > > > > > > <https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java>,
> > > > > > > > > but maybe with different arguments on the report method)? They
> > > > > > > > > would also have to catch NoClassDefFoundError.
> > > > > > > > >
> > > > > > > > > That would modify your change in SinkTaskContext to:
> > > > > > > > >
> > > > > > > > > public interface SinkTaskContext {
> > > > > > > > >     /**
> > > > > > > > >      * Get the reporter to which the sink task can report problematic or
> > > > > > > > >      * failed {@link SinkRecord} passed to the {@link SinkTask#put(Collection)} method.
> > > > > > > > >      *
> > > > > > > > >      * @return an errant record reporter
> > > > > > > > >      */
> > > > > > > > >     ErrantRecordReporter failedRecordReporter();
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > where ErrantRecordReporter is:
> > > > > > > > >
> > > > > > > > > public interface ErrantRecordReporter {
> > > > > > > > >
> > > > > > > > >     /**
> > > > > > > > >      * Serialize and produce records to the error topic
> > > > > > > > >      *
> > > > > > > > >      * @param record the errant record
> > > > > > > > >      * @param error  the error capturing the problem with the record
> > > > > > > > >      */
> > > > > > > > >     void report(SinkRecord record, Throwable error);
> > > > > > > > >
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > Usage in put would be the same, though, if the class is not
> > > > > > > > > explicitly named:
> > > > > > > > >
> > > > > > > > >         try {
> > > > > > > > >             context.failedRecordReporter().report(record, error);
> > > > > > > > >         } catch (NoSuchMethodError e) {
> > > > > > > > >             log.info("Boooooooooo!");
> > > > > > > > >         }
> > > > > > > > >
> > > > > > > > > Thoughts?
> > > > > > > > >
> > > > > > > > > On Sat, May 16, 2020 at 12:46 PM Konstantine Karantasis <
> > > > > > > > > konstant...@confluent.io> wrote:
> > > > > > > > >
> > > > > > > > > > Thanks for following up Randall.
> > > > > > > > > >
> > > > > > > > > > I agree with your latest suggestion. It was good that we explored
> > > > > > > > > > several options, but accessing the context to obtain the reporter in
> > > > > > > > > > Kafka Connect versions that support this feature makes the most
> > > > > > > > > > sense. The burden for connector developers that want to use this
> > > > > > > > > > reporter _and_ make connectors compatible with old and new workers is
> > > > > > > > > > minimal.
> > > > > > > > > >
> > > > > > > > > > We'll have to live with additions like this, and their appearance in
> > > > > > > > > > both KIP-131 and here in KIP-610 indeed creates a reasonable
> > > > > > > > > > precedent.
> > > > > > > > > >
> > > > > > > > > > Konstantine
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Sat, May 16, 2020 at 12:34 PM Randall Hauch <
> > > > rha...@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks again for the active discussion!
> > > > > > > > > > >
> > > > > > > > > > > Regarding the future-vs-callback discussion: I did like where Chris
> > > > > > > > > > > was going with the Callback, but he raises a good point that it's
> > > > > > > > > > > unclear what to use for the reporter type, since we'd need three
> > > > > > > > > > > parameters. Introducing a new interface makes it much harder for a
> > > > > > > > > > > sink task to be backward compatible, so sticking with BiFunction is a
> > > > > > > > > > > good compromise. Plus, another significant disadvantage of a callback
> > > > > > > > > > > approach is that a sink task's callback is called from the producer
> > > > > > > > > > > thread, and this risks a poorly written sink task callback killing the
> > > > > > > > > > > reporter's producer without necessarily failing the task. Using a
> > > > > > > > > > > future avoids this risk altogether and still provides the sink task
> > > > > > > > > > > with the ability to do synchronous reporting using Future, which is a
> > > > > > > > > > > standard and conventional design pattern. So we do seem to have
> > > > > > > > > > > converged on using `BiFunction<SinkRecord, Throwable, Future<Void>>`
> > > > > > > > > > > for the reporter type.
> > > > > > > > > > >
> > > > > > > > > > > Now, we still seem not to have converged on how to pass the reporter
> > > > > > > > > > > to the sink task. I agree with Konstantine that the deprecation
> > > > > > > > > > > affects only newer versions of Connect, and that a sink task should
> > > > > > > > > > > deal with both put methods only when it wants to support older
> > > > > > > > > > > runtimes. I also think that this is a viable approach, but I do
> > > > > > > > > > > concede that this evolution of the sink task API is more complicated
> > > > > > > > > > > than it should be.
> > > > > > > > > > >
> > > > > > > > > > > In the interest of quickly coming to consensus on how we pass the
> > > > > > > > > > > reporter to the sink task, I'd like to go back to Andrew's original
> > > > > > > > > > > suggestion, which I think we disregarded too quickly: add a getter on
> > > > > > > > > > > the SinkTaskContext interface. We already have precedent for adding
> > > > > > > > > > > methods to one of the context classes with the newly-adopted KIP-131,
> > > > > > > > > > > which adds a getter for the OffsetStorageReader on the (new)
> > > > > > > > > > > SourceConnectorContext. That KIP accepts the fact that a source
> > > > > > > > > > > connector wanting to use this feature while also keeping the ability
> > > > > > > > > > > to be installed into older Connect runtimes must guard its use of the
> > > > > > > > > > > context's getter method.
> > > > > > > > > > >
> > > > > > > > > > > I think we can use the same pattern for this KIP, and add a getter to
> > > > > > > > > > > the existing SinkTaskContext that is defined something like:
> > > > > > > > > > >
> > > > > > > > > > > public interface SinkTaskContext {
> > > > > > > > > > >     ...
> > > > > > > > > > >     /**
> > > > > > > > > > >      * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord}
> > > > > > > > > > >      * passed to the {@link SinkTask#put(Collection)} method. When reporting a failed record,
> > > > > > > > > > >      * the sink task will receive a {@link Future} that the task can optionally use to wait until
> > > > > > > > > > >      * the failed record and exception have been written to Kafka via Connect's DLQ. Note that
> > > > > > > > > > >      * the result of this method may be null if this connector has not been configured with a DLQ.
> > > > > > > > > > >      *
> > > > > > > > > > >      * <p>This method was added in Apache Kafka 2.9. Sink tasks that use this method but want to
> > > > > > > > > > >      * maintain backward compatibility so they can also be deployed to older Connect runtimes
> > > > > > > > > > >      * should guard the call to this method with a try-catch block, since calling this method will
> > > > > > > > > > >      * result in a {@link NoSuchMethodError} when the sink connector is deployed to Connect runtimes
> > > > > > > > > > >      * older than Kafka 2.9. For example:
> > > > > > > > > > >      * <pre>
> > > > > > > > > > >      *     BiFunction&lt;SinkRecord, Throwable, Future&lt;Void&gt;&gt; reporter;
> > > > > > > > > > >      *     try {
> > > > > > > > > > >      *         reporter = context.failedRecordReporter();
> > > > > > > > > > >      *     } catch (NoSuchMethodError e) {
> > > > > > > > > > >      *         reporter = null;
> > > > > > > > > > >      *     }
> > > > > > > > > > >      * </pre>
> > > > > > > > > > >      *
> > > > > > > > > > >      * @return the reporter function; null if no error reporter has been configured for the connector
> > > > > > > > > > >      * @since 2.9
> > > > > > > > > > >      */
> > > > > > > > > > >     BiFunction<SinkRecord, Throwable, Future<Void>> failedRecordReporter();
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > The main advantage is that the KIP no longer has to make *any other*
> > > > > > > > > > > changes to the Sink Connector or Task API. The above is really the
> > > > > > > > > > > only change, and it's merely an addition to the API. No deprecation
> > > > > > > > > > > and no overloading methods. The KIP does need to explain how the
> > > > > > > > > > > reporter is configured and used (which it already does), but IMO the
> > > > > > > > > > > KIP doesn't need to describe when this reporter can/should be used.
> > > > > > > > > > > After all, this method is on the existing SinkTaskContext, so this
> > > > > > > > > > > method is really no different than any other existing method. I think
> > > > > > > > > > > my JavaDoc (which is just a suggestion for a starting point that
> > > > > > > > > > > Aakash can improve as needed) describes how easy it is for a sink to
> > > > > > > > > > > maintain backward compatibility. (The use of `BiFunction` helps
> > > > > > > > > > > tremendously.) Another not insignificant advantage is that a sink task
> > > > > > > > > > > can use this reporter reference throughout the task's lifetime (after
> > > > > > > > > > > it's started and before it is stopped), making it less invasive for
> > > > > > > > > > > existing sink task implementations that want to use it.
> > > > > > > > > > >
> > > > > > > > > > > I hope we can all get behind this compromise, which IMO is actually
> > > > > > > > > > > super clean and makes a lot of sense. Thanks, Andrew, for originally
> > > > > > > > > > > suggesting it. I know we're all trying to improve the Connect API in a
> > > > > > > > > > > way that makes sense, and deliberate and constructive discussion is a
> > > > > > > > > > > healthy thing. Thanks again to everyone for participating!
> > > > > > > > > > >
> > > > > > > > > > > BTW, we've agreed upon a number of other changes, but I don't see any
> > > > > > > > > > > of those changes on the KIP. Aakash, can you please update the KIP
> > > > > > > > > > > quickly so we can make sure the other parts of the KIP are acceptable?
> > > > > > > > > > >
> > > > > > > > > > > Best regards,
> > > > > > > > > > >
> > > > > > > > > > > Randall
> > > > > > > > > > >
> > > > > > > > > > > On Sat, May 16, 2020 at 12:24 PM Konstantine
> Karantasis <
> > > > > > > > > > > konstant...@confluent.io> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thanks for the quick response Aakash.
> > > > > > > > > > > >
> > > > > > > > > > > > With respect to deprecation, this refers to deprecating this method
> > > > > > > > > > > > in newer versions of Kafka Connect (and eventually removing it).
> > > > > > > > > > > >
> > > > > > > > > > > > As a connector developer, if you want your connector
> to
> > > run
> > > > > > > across
> > > > > > > > a
> > > > > > > > > > wide
> > > > > > > > > > > > spectrum of Connect versions, you'll have to take
> this
> > > into
> > > > > > > > > > consideration
> > > > > > > > > > > > and retain both methods in a functional state. The
> good
> > > > news
> > > > > is
> > > > > > > > that
> > > > > > > > > > both
> > > > > > > > > > > > methods can share a lot of code, so in reality both
> the
> > > old
> > > > > and
> > > > > > > the
> > > > > > > > > new
> > > > > > > > > > > put
> > > > > > > > > > > > will be thin shims over a `putRecord` method (or
> > > `process`
> > > > > > method
> > > > > > > > as
> > > > > > > > > > you
> > > > > > > > > > > > call it in the KIP).
> > > > > > > > > > > >
> > > > > > > > > > > > Given the above, there's no requirement to
> > conditionally
> > > > call
> > > > > > one
> > > > > > > > > > method
> > > > > > > > > > > or
> > > > > > > > > > > > the other in the framework based on configuration.
> Once
> > > you
> > > > > > > > implement
> > > > > > > > > > the
> > > > > > > > > > > > new `put` with something other than its default
> > > > > implementation,
> > > > > > > as
> > > > > > > > a
> > > > > > > > > > > > connector developer, you'll know to adapt to the
> above.
> > > > > > > > > > > >
> > > > > > > > > > > > I definitely suggest extending our docs in a
> meaningful
> > > way
> > > > > in
> > > > > > > > order
> > > > > > > > > to
> > > > > > > > > > > > make the upgrade path easy to follow. Maybe you'd
> like
> > to
> > > > > add a
> > > > > > > > note
> > > > > > > > > to
> > > > > > > > > > > > your compatibility section in this KIP as well.
> > > > > > > > > > > >
> > > > > > > > > > > > Regards,
> > > > > > > > > > > > Konstantine
> > > > > > > > > > > >
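Konstantine's "thin shims" point can be sketched as follows. The sketch assumes a hypothetical `Rec` record type and a `BiFunction<Rec, Throwable, Future<Void>>` reporter; neither is the real Connect API. A connector that wants to run on both older and newer workers keeps both `put` methods and routes them through one shared `process` method. Without a reporter, a bad record fails the batch as before.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

// Hypothetical record type standing in for SinkRecord.
final class Rec {
    final String value;

    Rec(String value) {
        this.value = value;
    }
}

// Keeps both put methods functional: older workers call put(records),
// newer workers call the overload that also supplies a reporter.
class DualPutTask {
    final List<String> written = new ArrayList<>();
    final List<String> reported = new ArrayList<>();

    // Old entry point: no reporter, so a bad record fails the batch as before.
    public void put(Collection<Rec> records) {
        process(records, null);
    }

    // New entry point: bad records go to the reporter instead.
    public void put(Collection<Rec> records,
                    BiFunction<Rec, Throwable, Future<Void>> reporter) {
        process(records, reporter);
    }

    // Shared logic; both put methods are thin shims over it.
    private void process(Collection<Rec> records,
                         BiFunction<Rec, Throwable, Future<Void>> reporter) {
        for (Rec rec : records) {
            if (rec.value.isEmpty()) {
                IllegalArgumentException error = new IllegalArgumentException("empty value");
                if (reporter == null) {
                    throw error;
                }
                reporter.apply(rec, error);
                continue;
            }
            written.add(rec.value);
        }
    }
}

class DualPutDemo {
    public static void main(String[] args) {
        DualPutTask task = new DualPutTask();
        task.put(List.of(new Rec("a"), new Rec(""), new Rec("b")),
                 (rec, error) -> {
                     task.reported.add(error.getMessage());
                     return CompletableFuture.completedFuture(null);
                 });
        System.out.println(task.written + " " + task.reported); // prints [a, b] [empty value]
    }
}
```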
> > > > > > > > > > > > On Sat, May 16, 2020 at 10:13 AM Aakash Shah <as...@confluent.io> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > +1
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Sat, May 16, 2020 at 9:55 AM Konstantine Karantasis <konstant...@confluent.io> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Arjun,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I think I agree with you that the subject is interesting. Yet, I feel
> > > > > > > > > > > > > > it belongs to a separate future KIP. Reading the proposal in the KIP
> > > > > > > > > > > > > > format will help me, at least, to understand it better.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Having said that, for the purpose of simplifying error handling for
> > > > > > > > > > > > > > sink tasks, the discussion on KIP-610 has made some good progress on
> > > > > > > > > > > > > > the mailing list. If the few open items are reflected in the proposal,
> > > > > > > > > > > > > > it may even be worthwhile to consider it for inclusion in the upcoming
> > > > > > > > > > > > > > release with its current scope.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Konstantine
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, May 15, 2020 at 7:44 PM Arjun Satish <arjun.sat...@gmail.com> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I'm kinda hoping that we get to an approach on how to extend the
> > > > > > > > > > > > > > > Connect framework. Adding parameters in the put method is nice, and
> > > > > > > > > > > > > > > maybe works for now, but I'm not sure how scalable it is. It'd be
> > > > > > > > > > > > > > > great to be able to add more functionality in the future. A couple of
> > > > > > > > > > > > > > > examples:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > - make the metrics registry available to a task, so it can report
> > > > > > > > > > > > > > > task-level metrics, or
> > > > > > > > > > > > > > > - be able to pass in a RestExtension handle to the task, so the task
> > > > > > > > > > > > > > > can provide a REST endpoint which users can hit to get some
> > > > > > > > > > > > > > > task-level information (about its status or health, for example)
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > In such scenarios, adding new parameters to existing methods may not
> > > > > > > > > > > > > > > be immediately acceptable.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Since we are very close to a deadline, I wanted to check if one more
> > > > > > > > > > > > > > > possibility is acceptable :-)
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > What if we created a library that connector developers could
> > > > > > > > > > > > > > > independently integrate into their connectors? The library would be
> > > > > > > > > > > > > > > packaged and shipped with their connector like any other library on
> > > > > > > > > > > > > > > Maven (and other similar repositories). The new module would be in the
> > > > > > > > > > > > > > > AK project, but its jars will *not* be added to the classpath for the
> > > > > > > > > > > > > > > Connect worker.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The library would provide a public interface for an error reporter,
> > > > > > > > > > > > > > > which provides both synchronous and asynchronous functionality (as was
> > > > > > > > > > > > > > > brought up above).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Because this would be an independent library, it can be easily bundled
> > > > > > > > > > > > > > > and loaded with connectors. The Connect framework would be decoupled
> > > > > > > > > > > > > > > from this utility.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I understand that a similar option is in the rejected alternatives,
> > > > > > > > > > > > > > > mostly because of configuration overhead, but the configuration
> > > > > > > > > > > > > > > required here can come directly from the worker properties (and just
> > > > > > > > > > > > > > > be copy-pasted from there, maybe with a prefix). And I wonder if
> > > > > > > > > > > > > > > (maybe as part of a future KIP) we can evaluate a strategy where
> > > > > > > > > > > > > > > certain worker configs can be passed to a connector (for example, the
> > > > > > > > > > > > > > > producer/consumer/admin ones), so end users do not have to.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Overall, we would get clean APIs and contracts, and developers get the
> > > > > > > > > > > > > > > freedom to use these libraries and functionalities however they want.
> > > > > > > > > > > > > > > The only drawback is how this is configured (end users will have to
> > > > > > > > > > > > > > > add more lines in the JSON/properties files). But since all configs can
> > > > > > > > > > > > > > > simply come from the worker, I believe this is a relatively minor
> > > > > > > > > > > > > > > issue. We should be able to work out compatibility issues in the
> > > > > > > > > > > > > > > implementations, so that the library can safely run (and degrade
> > > > > > > > > > > > > > > functionality if needed) with old workers.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
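Below is a rough sketch of the kind of public interface such a standalone library might expose. Asynchronous reporting is the primitive, and the blocking variant is derived from it. `StandaloneErrorReporter` and `InMemoryErrorReporter` are hypothetical names. The in-memory list only stands in for whatever the library would actually write to; presumably that would be a Kafka topic, via a producer configured from worker properties.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical public interface of a connector-side error-reporting library.
interface StandaloneErrorReporter<R> extends AutoCloseable {
    // Primitive operation: start reporting and return immediately.
    CompletableFuture<Void> reportAsync(R rec, Throwable error);

    // Blocking convenience built on top of the asynchronous call.
    default void reportSync(R rec, Throwable error) {
        reportAsync(rec, error).join();
    }

    @Override
    void close(); // narrowed: no checked exception
}

// Toy implementation that "writes" to an in-memory list on a background thread.
class InMemoryErrorReporter<R> implements StandaloneErrorReporter<R> {
    final List<String> sink = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    @Override
    public CompletableFuture<Void> reportAsync(R rec, Throwable error) {
        return CompletableFuture.runAsync(() -> sink.add(rec + ": " + error.getMessage()), executor);
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}

class StandaloneReporterDemo {
    public static void main(String[] args) {
        try (InMemoryErrorReporter<String> reporter = new InMemoryErrorReporter<>()) {
            reporter.reportSync("rec-1", new RuntimeException("bad schema"));
            System.out.println(reporter.sink); // prints [rec-1: bad schema]
        }
    }
}
```

Making the asynchronous call the primitive means a synchronous caller pays only a `join()`. The reverse design would force every asynchronous caller to manage its own threads.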
> > > > > > > > > > > > > > > On Fri, May 15, 2020 at 7:04 PM Aakash Shah <as...@confluent.io> wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Just wanted to clarify that I am on board with adding the overloaded
> > > > > > > > > > > > > > > > put(...) method.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > Aakash
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Fri, May 15, 2020 at 7:00 PM Aakash Shah <as...@confluent.io> wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi Randall and Konstantine,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > As Chris and Arjun mentioned, I think the main concern is the
> > > > > > > > > > > > > > > > > potential gap in which developers don't implement the deprecated
> > > > > > > > > > > > > > > > > method due to a misunderstanding of use cases. Using the setter
> > > > > > > > > > > > > > > > > method approach ensures that the developer won't break backwards
> > > > > > > > > > > > > > > > > compatibility by mistake when using the new method. That being said,
> > > > > > > > > > > > > > > > > I think the added clarity about when the error reporter will be
> > > > > > > > > > > > > > > > > invoked, and the overall aesthetics, while still maintaining
> > > > > > > > > > > > > > > > > backwards compatibility, outweigh the risk of a developer mistakenly
> > > > > > > > > > > > > > > > > not implementing the original put(...) method.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > With respect to synchrony, I agree with Konstantine's point that
> > > > > > > > > > > > > > > > > making the reporter synchronous should be an opt-in feature. At the
> > > > > > > > > > > > > > > > > same time, I believe it is important to relieve the developer of as
> > > > > > > > > > > > > > > > > much of the implementation burden as possible in this case, and thus
> > > > > > > > > > > > > > > > > I think using a Callback rather than a Future would be easier on the
> > > > > > > > > > > > > > > > > developer, while providing asynchronous functionality with the
> > > > > > > > > > > > > > > > > ability to opt in to synchronous behavior. I also believe making
> > > > > > > > > > > > > > > > > synchronous behavior opt-in, rather than the other way around,
> > > > > > > > > > > > > > > > > simplifies implementation for the developer (blocking vs. creating a
> > > > > > > > > > > > > > > > > new thread).
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Please let me know your thoughts. I would like to come to a consensus
> > > > > > > > > > > > > > > > > soon due to the AK 2.6 deadlines; I will then shortly update the KIP
> > > > > > > > > > > > > > > > > and start a vote.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > Aakash
> > > > > > > > > > > > > > > > >
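Here is a minimal sketch of the callback-based design described above. Reporting is asynchronous by default, and a task opts in to synchronous behavior by blocking until the callback fires. `ReportCallback` and `CallbackReporter` are hypothetical. The single-thread executor stands in for the producer that would actually write to the dead letter queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical callback, loosely modelled on the producer's Callback: invoked
// once the write finishes, with null on success or the failure otherwise.
@FunctionalInterface
interface ReportCallback {
    void onCompletion(Throwable writeError);
}

// Hypothetical reporter that is asynchronous by default.
class CallbackReporter implements AutoCloseable {
    // Stands in for the producer's I/O thread.
    private final ExecutorService io = Executors.newSingleThreadExecutor();

    void report(String rec, Throwable error, ReportCallback callback) {
        io.submit(() -> {
            // A real reporter would send rec and error to a dead letter queue here;
            // this sketch always succeeds. The callback runs on the io thread.
            callback.onCompletion(null);
        });
    }

    // Opt-in synchronous use: block the caller until the callback has fired.
    void reportAndWait(String rec, Throwable error, long timeoutMs) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<>();
        report(rec, error, writeError -> {
            failure.set(writeError);
            done.countDown();
        });
        if (!done.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            throw new IllegalStateException("timed out reporting " + rec);
        }
        if (failure.get() != null) {
            throw new IllegalStateException("failed to report " + rec, failure.get());
        }
    }

    @Override
    public void close() {
        io.shutdown();
    }
}

class CallbackReporterDemo {
    public static void main(String[] args) throws InterruptedException {
        try (CallbackReporter reporter = new CallbackReporter()) {
            // Fire-and-forget: the task continues while the write happens.
            reporter.report("rec-1", new RuntimeException("bad"), writeError -> { });
            // Opt-in synchronous: returns only after the write has completed.
            reporter.reportAndWait("rec-2", new RuntimeException("bad"), 1_000);
            System.out.println("rec-2 reported");
        }
    }
}
```

One consequence of this shape is that the callback runs on the reporter's I/O thread. A slow or blocking callback would therefore delay every later report.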
> > > > > > > > > > > > > > > > > On Fri, May 15, 2020 at 2:24 PM Randall Hauch <rha...@gmail.com> wrote:
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> On Fri, May 15, 2020 at 3:13 PM Arjun Satish <arjun.sat...@gmail.com> wrote:
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> > Couple of thoughts:
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> > 1. If we add new parameters to put(..), and new connectors implement
> > > > > > > > > > > > > > > > >> > only this method, it makes them backward incompatible with older
> > > > > > > > > > > > > > > > >> > workers. I think newer connectors may choose to implement only the
> > > > > > > > > > > > > > > > >> > latest method, and we are passing the compatibility problems back to
> > > > > > > > > > > > > > > > >> > the connector developers.
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> New connectors would have to implement both if they want to run in
> > > > > > > > > > > > > > > > >> older runtimes.
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> > 2. If we deprecate the older put() method and eventually remove it,
> > > > > > > > > > > > > > > > >> > then old connectors are forward incompatible. If we are not going to
> > > > > > > > > > > > > > > > >> > remove it, then maybe we should not deprecate it?
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> I don't think we'll ever remove deprecated methods -- there's no
> > > > > > > > > > > > > > > > >> reason to cut off older connectors.
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> > 3. If a record is found to be erroneous outside put() (say, in flush
> > > > > > > > > > > > > > > > >> > or preCommit), how will it be reported?
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> This is a concern no matter how the reporter is passed to the task.
> > > > > > > > > > > > > > > > >> Actually, I think it's clearer that the reporter passed through
> > > > > > > > > > > > > > > > >> `put(...)` should be used to record errors on the SinkRecords passed
> > > > > > > > > > > > > > > > >> in the same method call.
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> > I do think the concern over aesthetics is an important one, but the
> > > > > > > > > > > > > > > > >> > trade-off here is to exclude many connectors that are out there from
> > > > > > > > > > > > > > > > >> > running on newer worker versions. There may be production deployments
> > > > > > > > > > > > > > > > >> > that need one old and one new connector that then cannot both run on
> > > > > > > > > > > > > > > > >> > any single worker version. Building connectors is complex, and it's
> > > > > > > > > > > > > > > > >> > kinda unfair to expect folks to make changes for aesthetic reasons
> > > > > > > > > > > > > > > > >> > alone. This is probably the reason why popular framework APIs very
> > > > > > > > > > > > > > > > >> > rarely (if ever) change.
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> I don't see how passing the reporter through an overloaded
> > > > > > > > > > > > > > > > >> `put(...)` is less backward compatible. Because the runtime provides
> > > > > > > > > > > > > > > > >> the SinkTask base class, the runtime has control over what the
> > > > > > > > > > > > > > > > >> methods do by default.
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> > Overall, yes, the
> > > > > > > > > > > > > > > > >> > `public void errantRecordReporter(BiConsumer<SinkRecord, Throwable> reporter) {}`
> > > > > > > > > > > > > > > > >> > proposal in the original KIP is somewhat of a mouthful, but are there
> > > > > > > > > > > > > > > > >> > any simpler alternatives that do not exclude existing connectors or add
> > > > > > > > > > > > > > > > >> > operational burdens, and yet provide a clean contract?
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> IMO, overloading `put(...)` is cleaner and easier to understand -- plus
> > > > > > > > > > > > > > > > >> the other benefits in my earlier email.
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >>
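For comparison, here is a minimal sketch of the setter-based alternative from the original KIP, using `String` in place of `SinkRecord`. `SetterStyleSinkTask` and `SetterStyleTask` are hypothetical names. In this style the task must keep the reporter in a field and cope with a runtime that never calls the setter. With the overloaded `put(...)`, by contrast, the reporter arrives alongside the records it applies to.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical base class with the setter-style hook: a no-op that a runtime
// supporting error reporting would call; an older runtime never calls it.
abstract class SetterStyleSinkTask {
    public void errantRecordReporter(BiConsumer<String, Throwable> reporter) {
    }

    public abstract void put(Collection<String> records);
}

class SetterStyleTask extends SetterStyleSinkTask {
    final List<String> written = new ArrayList<>();
    // Stays null on a runtime that never calls the setter.
    private BiConsumer<String, Throwable> reporter;

    @Override
    public void errantRecordReporter(BiConsumer<String, Throwable> reporter) {
        this.reporter = reporter;
    }

    @Override
    public void put(Collection<String> records) {
        for (String rec : records) {
            if (rec.isEmpty()) {
                IllegalArgumentException error = new IllegalArgumentException("empty record");
                if (reporter == null) {
                    throw error; // no reporter: fail as before
                }
                reporter.accept(rec, error); // returns nothing
                continue;
            }
            written.add(rec);
        }
    }
}

class SetterStyleDemo {
    public static void main(String[] args) {
        SetterStyleTask task = new SetterStyleTask();
        List<String> reported = new ArrayList<>();
        task.errantRecordReporter((rec, err) -> reported.add(err.getMessage()));
        task.put(List.of("a", "", "b"));
        System.out.println(task.written + " " + reported); // prints [a, b] [empty record]
    }
}
```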
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> > Best,
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> > PS: Apologies if the language is incorrect or some points are unclear.
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> > On Fri, May 15, 2020 at 12:02 PM Randall Hauch <rha...@gmail.com> wrote:
> > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > >> > > On Fri, May 15, 2020 at 1:45 PM Konstantine Karantasis <konstant...@confluent.io> wrote:
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > Thanks for the quick response, Aakash.
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > To your last point, modern APIs like this tend to be asynchronous (see
> > > > > > > > > > > > > > > > >> > > > admin, producer in Kafka) and such a definition results in more
> > > > > > > > > > > > > > > > >> > > > expressive and well-defined APIs.
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > >> > > +1
> > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > >> > > > What you describe is easily an opt-in feature for the connector
> > > > > > > > > > > > > > > > >> > > > developer. At the same time, the latest description above gives us
> > > > > > > > > > > > > > > > >> > > > better chances for this API to remain like this for longer, because it
> > > > > > > > > > > > > > > > >> > > > covers both the sync and async per `put` use cases.
> > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > >> > > +1
> > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > >> > > > Given how simple the sync implementation is, just by complying with
> > > > > > > > > > > > > > > > >> > > > the return type of the method, I still think the BiFunction definition
> > > > > > > > > > > > > > > > >> > > > that returns a Future makes sense.
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > Konstantine
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > >
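Konstantine's point is that a synchronous implementation can still satisfy a Future-returning signature. The sketch below illustrates it with two reporters of the same hypothetical type, `BiFunction<String, Throwable, Future<Void>>`. Here `String` stands in for `SinkRecord`, a list stands in for the dead letter queue, and `ReporterFactories` is a hypothetical helper. The synchronous reporter does its work and returns an already-completed future; the asynchronous one returns immediately.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

class ReporterFactories {
    // Synchronous: perform the write first, then return a future that is already done.
    static BiFunction<String, Throwable, Future<Void>> synchronous(List<String> dlq) {
        return (rec, error) -> {
            dlq.add(rec); // stands in for a blocking send to the dead letter queue
            return CompletableFuture.completedFuture(null);
        };
    }

    // Asynchronous: schedule the write on another thread and return immediately.
    static BiFunction<String, Throwable, Future<Void>> asynchronous(List<String> dlq,
                                                                  ExecutorService io) {
        return (rec, error) -> CompletableFuture.runAsync(() -> dlq.add(rec), io);
    }
}

class ReporterFactoriesDemo {
    public static void main(String[] args) throws Exception {
        List<String> dlq = Collections.synchronizedList(new ArrayList<>());
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            Future<Void> done = ReporterFactories.synchronous(dlq).apply("r1", new RuntimeException());
            System.out.println(done.isDone()); // prints true
            Future<Void> pending = ReporterFactories.asynchronous(dlq, io).apply("r2", new RuntimeException());
            pending.get(); // a caller that wants synchronous behavior simply waits here
        } finally {
            io.shutdown();
        }
        System.out.println(dlq); // prints [r1, r2]
    }
}
```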
> > > > > > > > > > > > > > > > >> > > > On Fri, May 15, 2020 at 11:27 AM Aakash Shah <as...@confluent.io> wrote:
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > Thanks for the additional feedback.
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > I see the benefits of adding an overloaded put(...) over alternatives,
> > > > > > > > > > > > > > > > >> > > > > and I am on board with going forward with this approach. It will
> > > > > > > > > > > > > > > > >> > > > > definitely set forth a contract of where the reporter will be used,
> > > > > > > > > > > > > > > > >> > > > > with better aesthetics.
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > The original idea of going with a synchronous approach for the error
> > > > > > > > > > > > > > > > >> > > > > reporter was to ease the connector developer's job of interacting with
> > > > > > > > > > > > > > > > >> > > > > and handling the error reporter. The tradeoff for having a
> > > > > > > > > > > > > > > > >> > > > > synchronous-only reporter would be lower throughput on the reporter;
> > > > > > > > > > > > > > > > >> > > > > this was thought to be fine since arguably most circumstances would not
> > > > > > > > > > > > > > > > >> > > > > involve consistently large numbers of records being sent to the error
> > > > > > > > > > > > > > > > >> > > > > reporter. Even if this were the case, an argument can be made that the
> > > > > > > > > > > > > > > > >> > > > > lower throughput would be of assistance, as it would give the user more
> > > > > > > > > > > > > > > > >> > > > > time to realize the connector is sending records to the error reporter
> > > > > > > > > > > > > > > > >> > > > > before many are sent. However, if we are strongly in favor of having
> > > > > > > > > > > > > > > > >> > > > > the option of asynchronous functionality available for the developer,
> > > > > > > > > > > > > > > > >> > > > > then I am fine with that as well.
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > Lastly, I am on board with changing the name to failedRecordReporter.
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > Please let me know your thoughts.
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > Thanks,
> > > > > > > > > > > > > > > > >> > > > > Aakash
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > On Fri, May 15, 2020 at 9:10 AM Randall Hauch <rha...@gmail.com> wrote:
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > Konstantine said:
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > I notice Randall also used
> > > > BiFunction
> > > > > in
> > > > > > > his
> > > > > > > > > > > > example,
> > > > > > > > > > > > > I
> > > > > > > > > > > > > > > > >> wonder if
> > > > > > > > > > > > > > > > >> > > > it's
> > > > > > > > > > > > > > > > >> > > > > > for
> > > > > > > > > > > > > > > > >> > > > > > > similar reasons.
> > > > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > Nope. Just a typo on my part.
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > There appear to be three
> > outstanding
> > > > > > > > questions.
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > First, Konstantine suggested
> > calling
> > > > > this
> > > > > > > > > > > > > > > > >> "failedRecordReporter". I
> > > > > > > > > > > > > > > > >> > > > think
> > > > > > > > > > > > > > > > >> > > > > > this is minor, but using this
> new
> > > name
> > > > > may
> > > > > > > be
> > > > > > > > a
> > > > > > > > > > bit
> > > > > > > > > > > > more
> > > > > > > > > > > > > > > > precise
> > > > > > > > > > > > > > > > >> > and
> > > > > > > > > > > > > > > > >> > > > I'd
> > > > > > > > > > > > > > > > >> > > > > be
> > > > > > > > > > > > > > > > >> > > > > > fine with this.
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > Second, should the reporter method be synchronous? I think
> > > > > > > > > > > > > > > > >> > > > > > the two options are:
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > 2a. Use `BiConsumer<SinkRecord, Throwable>` that returns nothing
> > > > > > > > > > > > > > > > >> > > > > > and blocks (at this time).
> > > > > > > > > > > > > > > > >> > > > > > 2b. Use `BiFunction<SinkRecord, Throwable, Future<Void>>` that
> > > > > > > > > > > > > > > > >> > > > > > returns a future that the user can optionally use to be
> > > > > > > > > > > > > > > > >> > > > > > synchronous.
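To make the difference between 2a and 2b concrete, here is a hypothetical sketch in plain Java. `SinkRecord` below is a minimal stand-in for the Connect class (so it compiles without Kafka), an in-memory list plays the role of the DLQ producer, and `reportAndWait` is an illustrative helper name, not a proposed API. It shows that a task handed the 2b shape can still get 2a-style blocking by calling `get()` on the returned future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

class ReporterShapes {
    // Hypothetical stand-in for org.apache.kafka.connect.sink.SinkRecord.
    static final class SinkRecord {
        final String topic;
        final long offset;

        SinkRecord(String topic, long offset) {
            this.topic = topic;
            this.offset = offset;
        }
    }

    // In-memory list standing in for the DLQ producer (assumption for illustration).
    private static final List<String> dlq = new ArrayList<>();

    private static synchronized void writeToDlq(SinkRecord record, Throwable error) {
        dlq.add(record.topic + "@" + record.offset + ": " + error.getMessage());
    }

    static synchronized int dlqSize() {
        return dlq.size();
    }

    // Option 2a: returns nothing; in this sketch the write has completed when accept() returns.
    static final BiConsumer<SinkRecord, Throwable> blockingReporter = ReporterShapes::writeToDlq;

    // Option 2b: the write runs asynchronously and the caller receives a Future.
    static final BiFunction<SinkRecord, Throwable, Future<Void>> asyncReporter =
        (record, error) -> CompletableFuture.runAsync(() -> writeToDlq(record, error));

    // A task that wants 2a semantics from 2b simply waits on the returned Future.
    static void reportAndWait(SinkRecord record, Throwable error) {
        try {
            asyncReporter.apply(record, error).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while reporting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to report record", e.getCause());
        }
    }
}
```

A task that skips the `get()` call gets no ordering guarantee from this sketch, which is the concern raised in the next paragraph.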
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > I do agree with Konstantine that option 2b gives us more room
> > > > > > > > > > > > > > > > >> > > > > > for future semantic changes, and since the producer write is
> > > > > > > > > > > > > > > > >> > > > > > already asynchronous this should be straightforward to
> > > > > > > > > > > > > > > > >> > > > > > implement. I think the concern here is that if the sink task
> > > > > > > > > > > > > > > > >> > > > > > does not *use* the future to make this synchronous, it is very
> > > > > > > > > > > > > > > > >> > > > > > possible that the error records could be written out of order
> > > > > > > > > > > > > > > > >> > > > > > (due to retries). But this won't be an issue if the
> > > > > > > > > > > > > > > > >> > > > > > implementation uses `max.in.flight.requests.per.connection=1`
> > > > > > > > > > > > > > > > >> > > > > > for writing the error records. It's a little less clear, but
> > > > > > > > > > > > > > > > >> > > > > > honestly IMO passing the reporter in the `put(...)` method
> > > > > > > > > > > > > > > > >> > > > > > helps make this lambda easier to understand, for some strange
> > > > > > > > > > > > > > > > >> > > > > > reason. So unless there are good reasons to avoid this, I'd be
> > > > > > > > > > > > > > > > >> > > > > > in favor of 2b and returning a Future.
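A minimal sketch of the producer override mentioned above, applied to whatever producer writes the error records. The helper name `orderedDlqOverrides` is hypothetical; only the `max.in.flight.requests.per.connection` setting comes from the text, and how the Connect runtime actually builds its DLQ producer is not shown here.

```java
import java.util.Properties;

class DlqProducerSettings {
    // Hypothetical helper: copies the base producer settings and caps in-flight
    // requests at one, so a retried send cannot overtake a later one and
    // reorder the error records.
    static Properties orderedDlqOverrides(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // At most one unacknowledged request per broker connection.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }
}
```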
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > Third, how do we pass the reporter lambda / method reference to
> > > > > > > > > > > > > > > > >> > > > > > the task? My proposal to pass the reporter via an overloaded
> > > > > > > > > > > > > > > > >> > > > > > `put(...)` is still the most attractive to me, for several
> > > > > > > > > > > > > > > > >> > > > > > reasons:
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > 3a. There's no need to pass the reporter separately *and* to
> > > > > > > > > > > > > > > > >> > > > > > describe the changes in method call ordering.
> > > > > > > > > > > > > > > > >> > > > > > 3b. As mentioned above, for some reason passing it via
> > > > > > > > > > > > > > > > >> > > > > > `put(...)` makes it clearer that the reporter is meant to be used
> > > > > > > > > > > > > > > > >> > > > > > when processing the SinkRecord, and that it shouldn't be used
> > > > > > > > > > > > > > > > >> > > > > > in `start(...)`, `preCommit(...)`, `onPartitionsAssigned(...)`,
> > > > > > > > > > > > > > > > >> > > > > > or any of the other task methods. As Andrew pointed out
> > > > > > > > > > > > > > > > >> > > > > > earlier, *describing* this in the KIP and in JavaDoc in a way
> > > > > > > > > > > > > > > > >> > > > > > that is both exact and succinct will be tough.
> > > > > > > > > > > > > > > > >> > > > > > 3c. There is already precedent for evolving
> > > > > > > > > > > > > > > > >> > > > > > `SourceTask.commitRecord(...)`, and the pattern is identical.
> > > > > > > > > > > > > > > > >> > > > > > 3d. Backward compatibility is easy to understand, and at the
> > > > > > > > > > > > > > > > >> > > > > > same time it's pretty easy to describe what implementations
> > > > > > > > > > > > > > > > >> > > > > > that want to take advantage of this feature should do.
> > > > > > > > > > > > > > > > >> > > > > > 3e. Minimal changes to the interface: we're just *adding* one
> > > > > > > > > > > > > > > > >> > > > > > default method that calls the existing method and deprecating
> > > > > > > > > > > > > > > > >> > > > > > the existing `put(...)`.
> > > > > > > > > > > > > > > > >> > > > > > 3f. Deprecating the existing `put(...)` makes it clearer in a
> > > > > > > > > > > > > > > > >> > > > > > programmatic sense that new sink implementations should use
> > > > > > > > > > > > > > > > >> > > > > > the reporter, and that we recommend old sinks evolve to use it.
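The overload pattern in 3c and 3e might look roughly like the following. Everything here is hypothetical and simplified: `SinkTask` and `SinkRecord` are stand-ins rather than the real Connect classes, the reporter uses the 2b `BiFunction` shape, and treating a `null` value as unwritable is an arbitrary failure condition chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

class PutOverloadSketch {
    // Hypothetical stand-in for the Connect SinkRecord class.
    static final class SinkRecord {
        final Object value;

        SinkRecord(Object value) {
            this.value = value;
        }
    }

    // Hypothetical, simplified SinkTask showing only the proposed evolution of put(...).
    abstract static class SinkTask {
        /** @deprecated implement {@link #put(Collection, BiFunction)} instead. */
        @Deprecated
        public abstract void put(Collection<SinkRecord> records);

        // New overload the runtime would call; by default it delegates to the
        // existing method, so sinks written against the old API keep working.
        public void put(Collection<SinkRecord> records,
                        BiFunction<SinkRecord, Throwable, Future<Void>> reporter) {
            put(records);
        }
    }

    // Legacy sink: overrides only the old method and never sees the reporter.
    static final class LegacySink extends SinkTask {
        final List<SinkRecord> written = new ArrayList<>();

        @Override
        public void put(Collection<SinkRecord> records) {
            written.addAll(records);
        }
    }

    // Updated sink: overrides the new overload and reports records it cannot write.
    static final class ReportingSink extends SinkTask {
        final List<SinkRecord> written = new ArrayList<>();

        @Override
        public void put(Collection<SinkRecord> records) {
            // Assumes the runtime always calls the two-argument overload.
            throw new UnsupportedOperationException("use put(records, reporter)");
        }

        @Override
        public void put(Collection<SinkRecord> records,
                        BiFunction<SinkRecord, Throwable, Future<Void>> reporter) {
            for (SinkRecord record : records) {
                if (record.value == null) {
                    // The returned Future is ignored here; a task that needs
                    // blocking semantics would call get() on it.
                    reporter.apply(record, new IllegalArgumentException("null value"));
                } else {
                    written.add(record);
                }
            }
        }
    }
}
```

One wrinkle the sketch exposes: if the old `put(...)` stays abstract, an updated sink still has to implement it (here by throwing), which only works if the runtime always calls the two-argument form.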
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > Some of these benefits apply to some of the other suggestions,
> > > > > > > > > > > > > > > > >> > > > > > but I think none of the other suggestions have all of these
> > > > > > > > > > > > > > > > >> > > > > > benefits. For example, overloading `initialize(...)` is more
> > > > > > > > > > > > > > > > >> > > > > > difficult since most sink connectors don't override it and
> > > > > > > > > > > > > > > > >> > > > > > therefore would be less subject to deprecation warnings.
> > > > > > > > > > > > > > > > >> > > > > > Overloading `start(...)` is less attractive. Adding a method
> > > > > > > > > > > > > > > > >> > > > > > IMO shares the fewest of these benefits.
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > The one disadvantage of this approach is that sink task
> > > > > > > > > > > > > > > > >> > > > > > implementations can't rely on the reporter during startup. IMO
> > > > > > > > > > > > > > > > >> > > > > > that's an acceptable tradeoff to get the cleaner and more
> > > > > > > > > > > > > > > > >> > > > > > explicit API, especially if the API contract is that Connect
> > > > > > > > > > > > > > > > >> > > > > > will pass the same reporter instance to each call to
> > > > > > > > > > > > > > > > >> > > > > > `put(...)` on a single task instance.
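The contract in the last paragraph (one reporter instance per task, passed on every `put(...)` call) could be sketched from the runtime side as follows. `TaskDriver`, `RecordingTask`, and `Reporter` are hypothetical names, not Connect classes, and the reporter here does nothing except complete immediately.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

class SameReporterSketch {
    // Hypothetical reporter type using the 2b shape, with Object as the record type.
    interface Reporter extends BiFunction<Object, Throwable, Future<Void>> { }

    // Hypothetical task that only remembers which reporter each put(...) received.
    static final class RecordingTask {
        final List<Reporter> seen = new ArrayList<>();

        void put(Collection<Object> records, Reporter reporter) {
            seen.add(reporter);
        }
    }

    // Hypothetical per-task driver: the reporter is created once and reused
    // for every put(...) call on the same task instance.
    static final class TaskDriver {
        private final RecordingTask task;
        private final Reporter reporter;

        TaskDriver(RecordingTask task) {
            this.task = task;
            this.reporter = (record, error) -> CompletableFuture.completedFuture(null);
        }

        void deliver(Collection<Object> batch) {
            task.put(batch, reporter);
        }
    }
}
```

Because the driver owns the reporter, a task only ever receives it through `put(...)`, which is why it cannot rely on the reporter during `start(...)`.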
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > Best regards,
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > Randall
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > On Fri, May 15, 2020 at 6:59 AM Andrew Schofield <andrew_schofi...@live.com> wrote:
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > Hi,
> > > > > > > > > > > > > > > > >> > > > > > > Randall's suggestion is really good. I think it gives the
> > > > > > > > > > > > > > > > >> > > > > > > flexibility required and also keeps the interface the right
> > > > > > > > > > > > > > > > >> > > > > > > way round.
> > > > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > Thanks,
> > > > > > > > > > > > > > > > >> > > > > > > Andrew Schofield
> > > > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > On 15/05/2020, 02:07, "Aakash Shah" <as...@confluent.io> wrote:
> > > > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > > Hi Randall,
> > > > > > > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > > Thanks for the feedback.
> > > > > > > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > > 1. This is a great suggestion, but I find that adding
