Thanks, Arjun! This has been very helpful.

Looking at your POC and thinking in terms of the current KIP, it sounds
like the suggestion is to keep the same method signature for reporting
errors, but to change from the `BiFunction<SinkRecord, Throwable,
Future<Void>>` to a new `ErrantRecordReporter` interface. More concretely,
I'd suggest the KIP propose adding the following new interface:

import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.sink.SinkRecord;

/**
 * Reporter of problematic records and the corresponding problems.
 *
 * @since 2.9
 */
public interface ErrantRecordReporter {

    /**
     * Report a problematic record and the corresponding error to be written to the sink
     * connector's dead letter queue (DLQ).
     *
     * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future} for the
     * {@link RecordMetadata} that will be assigned to the record in the DLQ topic. Invoking
     * {@link java.util.concurrent.Future#get() get()} on this future will block until the
     * record has been written and then return the metadata for the record
     * or throw any exception that occurred while sending the record.
     * If you want to simulate a simple blocking call you can call the <code>get()</code> method
     * immediately.
     *
     * @param record the problematic record; may not be null
     * @param error  the error capturing the problem with the record; may not be null
     * @return a future that can be used to block until the record and error are written
     *         to the DLQ
     * @since 2.9
     */
    default Future<RecordMetadata> report(SinkRecord record, Throwable error) {
        return report(record, error, null);
    }

    /**
     * Report a problematic record and the corresponding error to be written to the sink
     * connector's dead letter queue (DLQ).
     *
     * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future} for the
     * {@link RecordMetadata} that will be assigned to the record in the DLQ topic. Invoking
     * {@link java.util.concurrent.Future#get() get()} on this future will block until the
     * record has been written and then return the metadata for the record
     * or throw any exception that occurred while sending the record.
     * If you want to simulate a simple blocking call you can call the <code>get()</code> method
     * immediately.
     *
     * <p>Fully non-blocking usage can make use of the {@link Callback} parameter to provide a
     * callback that will be invoked when the request is complete. Callbacks for records being
     * sent to the same partition are guaranteed to execute in order.
     *
     * @param record   the problematic record; may not be null
     * @param error    the error capturing the problem with the record; may not be null
     * @param callback A user-supplied callback to execute when the record has been acknowledged
     *                 by the server; may be null for no callback
     * @return a future that can be used to block until the record and error are written
     *         to the DLQ
     * @since 2.9
     */
    Future<RecordMetadata> report(SinkRecord record, Throwable error, Callback callback);
}

and then modify the proposed change to the SinkTaskContext to be:

public interface SinkTaskContext {
    ...
    /**
     * Get the reporter to which the sink task can report problematic {@link SinkRecord} objects
     * passed to the {@link SinkTask#put(Collection)} method.
     *
     * <p>This method was added in Apache Kafka 2.9. Sink tasks that use this method and want to
     * maintain backward compatibility so they can also be installed in older Connect runtimes
     * should guard its use with a try-catch block, since calling this method will result in a
     * {@link NoClassDefFoundError} or {@link NoSuchMethodError} when the sink connector is
     * used in Connect runtimes older than Kafka 2.9. For example:
     * <pre>
     *     ErrantRecordReporter reporter;
     *     try {
     *         reporter = context.failedRecordReporter();
     *     } catch (NoClassDefFoundError | NoSuchMethodError e) {
     *         reporter = null;
     *     }
     * </pre>
     *
     * @return the reporter; null if no error reporter has been configured for the connector
     * @since 2.9
     */
    ErrantRecordReporter failedRecordReporter();
}

The example usage then becomes:

private ErrantRecordReporter reporter;

@Override
public void start(Map<String, String> props) {
  ...
  try {
    reporter = context.failedRecordReporter(); // may be null if DLQ not enabled
  } catch (NoClassDefFoundError | NoSuchMethodError e) {
    // Will occur in Connect runtimes earlier than 2.9
    reporter = null;
  }
}

@Override
public void put(Collection<SinkRecord> records) {
  for (SinkRecord record : records) {
    try {
      // attempt to send record to data sink
      process(record);
    } catch (Exception e) {
      if (reporter != null) {
        // Send errant record to error reporter
        reporter.report(record, e, (metadata, error) -> {
          // do something
        });
      } else {
        // There's no error reporter, so fail
        throw new ConnectException("Failed on record", e);
      }
    }
  }
}

My suggestion is for the `report(...)` method to return a `Future`, as
discussed above, to enable synchronous usage, but to change the parameterized
type of the future to be `Future<RecordMetadata>` to mirror the
`Producer.send(...)` methods. This gives the sink task the ability to use
the record metadata information, and the implementation is much more
straightforward since the future is identical to the one returned by
`Producer.send(...)`. I also suggest we overload the `report(...)` method to
take a `Callback`, again just like the `Producer.send(...)` methods, to
enable non-blocking usage.
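To make the blocking vs. non-blocking distinction concrete, here is a minimal, self-contained sketch that mirrors the shape of the proposed interface without depending on Kafka. `SimpleReporter`, `InMemoryReporter`, `DlqMetadata`, and `ReporterUsage` are hypothetical stand-ins: records are plain strings, a `BiConsumer` plays the role of the producer's `Callback`, and the toy reporter completes its future immediately, whereas a real reporter would complete it only once the DLQ write is acknowledged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

// Hypothetical stand-in for RecordMetadata: only the DLQ offset is modeled.
final class DlqMetadata {
    final long offset;

    DlqMetadata(long offset) {
        this.offset = offset;
    }
}

// Hypothetical, simplified mirror of the proposed ErrantRecordReporter:
// the two-arg method delegates to the callback overload with a null callback.
interface SimpleReporter {
    default Future<DlqMetadata> report(String record, Throwable error) {
        return report(record, error, null);
    }

    Future<DlqMetadata> report(String record, Throwable error,
                               BiConsumer<DlqMetadata, Exception> callback);
}

// Toy reporter that appends to an in-memory "DLQ" and completes immediately.
final class InMemoryReporter implements SimpleReporter {
    final List<String> dlq = new ArrayList<>();

    @Override
    public Future<DlqMetadata> report(String record, Throwable error,
                                      BiConsumer<DlqMetadata, Exception> callback) {
        dlq.add(record + ": " + error.getMessage());
        DlqMetadata metadata = new DlqMetadata(dlq.size() - 1);
        if (callback != null) {
            callback.accept(metadata, null); // null exception means success
        }
        return CompletableFuture.completedFuture(metadata);
    }
}

class ReporterUsage {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        SimpleReporter reporter = new InMemoryReporter();

        // Synchronous usage: block on the future, as with Producer.send(...).get().
        DlqMetadata md = reporter.report("record-1", new RuntimeException("bad value")).get();
        System.out.println("written at offset " + md.offset);

        // Non-blocking usage: react in the callback instead of calling get().
        reporter.report("record-2", new RuntimeException("bad schema"),
                (metadata, exception) -> {
                    if (exception == null) {
                        System.out.println("callback saw offset " + metadata.offset);
                    }
                });
    }
}
```

With a real `KafkaProducer`, callbacks generally execute on the producer's I/O thread, so a sink task's callback should be kept short.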

In light of Arjun's prototype, I think the KIP should also propose updating
the FileSinkTask example to use the new API, with proper error handling
that would allow the newer file sink connector to be deployed in older
Connect runtimes.
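For the FileSinkTask update, one option is to keep the compatibility logic in a small helper. The sketch below is self-contained and hypothetical: `OldSinkTaskContext` and `NewSinkTaskContext` stand in for pre-2.9 and 2.9+ runtimes, and `ReporterSupport` is not a Connect API. It combines Arjun's reflective `getMethod("failedRecordReporter")` check with the try-catch guard. I have not verified that routing the real getter through a lambda is as safe as the inline try-catch in the Javadoc above, so the inline form remains the conservative choice.

```java
import java.util.function.Supplier;

// Hypothetical stand-ins: the "old" context lacks the new getter.
interface OldSinkTaskContext {
    void requestCommit();
}

interface NewSinkTaskContext extends OldSinkTaskContext {
    Object failedRecordReporter(); // return type simplified to Object
}

final class ReporterSupport {
    private ReporterSupport() {
    }

    // Reflective feature check: true if the context type exposes a public,
    // no-arg failedRecordReporter() method (declared or inherited).
    static boolean supportsReporter(Class<?> contextType) {
        try {
            contextType.getMethod("failedRecordReporter");
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Guarded lookup: the getter runs inside the try block, so the linkage
    // errors an older runtime would raise are turned into a null reporter.
    static <T> T guarded(Supplier<T> getter) {
        try {
            return getter.get();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            return null;
        }
    }
}
```

A task would then treat a null result exactly like "no DLQ configured" and fall back to throwing a `ConnectException`, as in the `put(...)` example above.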

Finally, the KIP's wording about backward compatibility should be updated
to something more like:

"This proposal is backward compatible such that existing sink connector
implementations will continue to work as before. Developers can optionally
modify sink connector implementations to use the new error reporting
feature, yet still easily support installing and running those connectors
in older Connect runtimes where the feature does not exist."

WDYT?

Best regards,

Randall

On Sat, May 16, 2020 at 11:43 PM Chris Egerton <chr...@confluent.io> wrote:

> Hi Arjun,
>
> This is great news. Given that we're already willing to ask developers to
> catch a method-not-found exception and it sounds like a new interface can
> be handled in a similar try/catch block in the same place, I like the idea of
> a new fleshed-out interface instead of a BiConsumer or a BiFunction.
>
> Cheers,
>
> Chris
>
> On Sat, May 16, 2020, 21:29 Arjun Satish <arjun.sat...@gmail.com> wrote:
>
> > OK folks, this is my POC PR:
> > https://github.com/wicknicks/kafka/tree/kip-610-2.4.1. Connectors built
> > from this were copied into a fresh installation of Kafka Connect (v2.5,
> > the latest version) and run. Without proper try-catch, the tasks would
> > fail. But when the appropriate exceptions were handled, the task proceeded
> > without any issues. I've added some comments here
> >
> > https://github.com/apache/kafka/commit/295e6a46925993060a60b79e646e06432480ed0d
> > that document these errors. Tested this with both Java 8 and 11.
> >
> > To check if it is safe to add methods and new classes, I dug around the
> > JVM spec <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html>.
> > The following points are relevant here:
> >
> > 1. During the loading phase, a number of structural details of the class
> > are added to the run-time constant pool (class descriptor, fields,
> > methods). What these methods themselves do is not considered.
> > 2. During the linking phase, the JVM may resolve symbolic references either
> > lazily or eagerly. But the important bit is this
> > <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4>:
> > Whichever strategy (lazy or eager) is followed, any error detected during
> > resolution must be thrown at a point in the program that (directly or
> > indirectly) uses a symbolic reference to the class or interface.
> >
> > So this means that any new classes the connector relies upon but that are
> > missing in the runtime will be in some internal "not found" state, but no
> > errors will be thrown until the resolution stage.
> >
> > 3. Section 5.4.3
> > <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4.3>
> > says execution of any of the (anewarray, checkcast, etc.) instructions
> > requires resolution of its symbolic reference, which is good since the
> > instructions are executed in order, and the connector developer has time to
> > execute some code before the program has to look for an unknown object.
> >
> > We are also safe in that the new interface will be used only during or
> > after start() is called, so there should not be any fields that do
> > something like:
> >
> > private final ErrantRecordReporter reporter = sinkTaskContext.failedRecordReporter();
> >
> > The initialization will only be valid in start(), where we can safely wrap
> > it in a try-catch block. The errors being thrown are very precise:
> > NoSuchMethodError
> > <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4.3.3>
> > for a method that is not found, and NoClassDefFoundError for any classes
> > that cannot be loaded during resolution
> > <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.3>.
> >
> > BTW, one more example that comes to mind is the JDBC spec (the java.sql
> > package has been changing with every major version of Java). New classes
> > are added (look at java.sql.DriverAction (in Java 8) and
> > java.sql.ShardingKey (in Java 9)). New methods are being added to existing
> > classes as well (for example,
> > java.sql.DriverManager#registerDriver(java.sql.Driver, java.sql.DriverAction)
> > <https://docs.oracle.com/javase/8/docs/api/java/sql/DriverManager.html#registerDriver-java.sql.Driver-java.sql.DriverAction->),
> > and the expectation is that the driver should handle any runtime
> > inconsistencies.
> >
> > Overall, I think it should be safe to have new interfaces, and have them
> > loaded safely, with correct checks.
> >
> > BTW, one more opportunity we have is that the connector can check whether
> > these new methods are present. For example:
> >
> > Class.forName("org.apache.kafka.connect.sink.SinkTaskContext").getMethod("failedRecordReporter");
> >
> > And based on this, it could choose to return an older or different task
> > class from SinkConnector#taskClass() if the developer wants to be
> > hyper-safe.
> >
> > Apologies for throwing in such a wrench at the last minute! But IMHO it'd
> > be good to take this opportunity if we can.
> >
> > Best,
> >
> > On Sat, May 16, 2020 at 6:05 PM Randall Hauch <rha...@gmail.com> wrote:
> >
> > > Thanks for updating the KIP, Aakash. A few comments on the updated content
> > > there:
> > >
> > > > In order to avoid error records being written out of order (for example,
> > > > due to retries), the developer can use
> > > > `max.in.flight.requests.per.connection=1` in their implementation for
> > > > writing error records.
> > > >
> > >
> > > IMO, the DLQ should always use this setting to prevent retries from
> > > reordering records, and the developer can always set this property for the
> > > DLQ to something larger than 1 if order is not important and they need the
> > > extra performance.
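To make the default-plus-override idea concrete, here is a small, self-contained sketch. `DlqProducerConfig` and `build` are hypothetical names, not Connect's actual DLQ code; the only real piece is the producer property name `max.in.flight.requests.per.connection`.

```java
import java.util.Map;
import java.util.Properties;

final class DlqProducerConfig {
    // Real producer property name; everything else here is hypothetical.
    static final String MAX_IN_FLIGHT = "max.in.flight.requests.per.connection";

    private DlqProducerConfig() {
    }

    // Default the DLQ producer to one in-flight request per connection so a
    // retried batch cannot overtake a later one, then apply user overrides,
    // which win (e.g. a larger value when ordering does not matter).
    static Properties build(Map<String, String> userOverrides) {
        Properties props = new Properties();
        props.setProperty(MAX_IN_FLIGHT, "1");
        props.putAll(userOverrides);
        return props;
    }
}
```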
> > >
> > > > Along with the original errant sink record, the exception thrown will be
> > > > sent to the error reporter to provide additional context.
> > > >
> > >
> > > This is also not really clear. What thrown exception is this referring to?
> > > Isn't the exception *passed* to the reporter method?
> > >
> > > I do think we need to better explain, from the sink task's perspective,
> > > what behavior it should expect. Is there any case when the reporter will be
> > > null or be a no-op, and if so what should the sink task do? Should it
> > > simply wrap and throw a ConnectException? And if there is a reporter,
> > > won't Connect treat this sink record as "processed" with respect to the
> > > existing behavior of passing "processed" offsets back to the sink task's
> > > `preCommit(...)` method?
> > >
> > > Thanks,
> > >
> > > Randall
> > >
> > > On Sat, May 16, 2020 at 6:38 PM Arjun Satish <arjun.sat...@gmail.com> wrote:
> > >
> > > > Yeah, I had tried this locally on Java 8 and 11, and it seemed to work.
> > > > Let me clean up and publish my code in a branch somewhere so we can take
> > > > a look at it.
> > > >
> > > > Thanks,
> > > >
> > > > On Sat, May 16, 2020 at 3:39 PM Randall Hauch <rha...@gmail.com> wrote:
> > > >
> > > > > Have you tried this? IIUC the problem is with the new type, and any class
> > > > > that uses ‘ErrantRecordReporter’ with an import would fail to be loaded
> > > > > by the classloader if the type does not exist (i.e., pre-2.9 Connect
> > > > > runtimes). Catching that ClassNotFoundException and dynamically importing
> > > > > the type is actually much harder.
> > > > >
> > > > > On Sat, May 16, 2020 at 5:07 PM Aakash Shah <as...@confluent.io> wrote:
> > > > >
> > > > > > Hi Arjun,
> > > > > >
> > > > > > Thanks for this suggestion. I actually like this a lot because a defined
> > > > > > interface looks more appealing and is clearer in its intention. Since we
> > > > > > are still using NoSuchMethodError to account for backwards
> > > > > > compatibility, this works for me. I can't see any drawbacks besides
> > > > > > having to call the getter method for the processing of every errant
> > > > > > record.
> > > > > >
> > > > > > I would like to hear others' thoughts on whether this drawback outweighs
> > > > > > the added benefit of clarity.
> > > > > >
> > > > > > Thanks,
> > > > > > Aakash
> > > > > >
> > > > > > On Sat, May 16, 2020 at 2:54 PM Arjun Satish <arjun.sat...@gmail.com> wrote:
> > > > > >
> > > > > > > Thanks Konstantine, happy to write something up in a KIP. But I think it
> > > > > > > would be redundant if we add this KIP. What do you think?
> > > > > > >
> > > > > > > Also, Randall, yes that API would work. But if we expect the developers
> > > > > > > to catch NoSuchMethodErrors, then should we also go ahead and make a
> > > > > > > class that would have a report method (similar to ErrorReporter
> > > > > > > <https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java>
> > > > > > > but maybe with different arguments on the report method)? They would
> > > > > > > also have to catch NoClassDefFoundError.
> > > > > > >
> > > > > > > That would modify your change in SinkTaskContext to:
> > > > > > >
> > > > > > > public interface SinkTaskContext {
> > > > > > >     /**
> > > > > > >      * Get the reporter to which the sink task can report problematic or
> > > > > > >      * failed {@link SinkRecord} passed to the {@link SinkTask#put(Collection)} method.
> > > > > > >      *
> > > > > > >      * @return an errant record reporter
> > > > > > >      */
> > > > > > >     ErrantRecordReporter failedRecordReporter();
> > > > > > > }
> > > > > > >
> > > > > > > where ErrantRecordReporter is:
> > > > > > >
> > > > > > > public interface ErrantRecordReporter {
> > > > > > >
> > > > > > >     /**
> > > > > > >      * Serialize and produce records to the error topic
> > > > > > >      *
> > > > > > >      * @param record the errant record
> > > > > > >      */
> > > > > > >     void report(SinkRecord record, Throwable error);
> > > > > > >
> > > > > > > }
> > > > > > >
> > > > > > > Usage in put would be the same, though, if the class is not explicitly
> > > > > > > named:
> > > > > > >
> > > > > > >         try {
> > > > > > >             context.failedRecordReporter().report(record, error);
> > > > > > >         } catch (NoSuchMethodError | NoClassDefFoundError e) {
> > > > > > >             log.info("Boooooooooo!");
> > > > > > >         }
> > > > > > >
> > > > > > > Thoughts?
> > > > > > >
> > > > > > > On Sat, May 16, 2020 at 12:46 PM Konstantine Karantasis <konstant...@confluent.io> wrote:
> > > > > > >
> > > > > > > > Thanks for following up Randall.
> > > > > > > >
> > > > > > > > I agree with your latest suggestion. It was good that we explored several
> > > > > > > > options, but accessing the context to obtain the reporter in Kafka Connect
> > > > > > > > versions that support this feature makes the most sense. The burden for
> > > > > > > > connector developers that want to use this reporter _and_ make connectors
> > > > > > > > compatible with old and new workers is minimal.
> > > > > > > >
> > > > > > > > We'll have to live with additions like this, and the appearance in both
> > > > > > > > KIP-131 and here in KIP-610 indeed creates a reasonable precedent.
> > > > > > > >
> > > > > > > > Konstantine
> > > > > > > >
> > > > > > > >
> > > > > > > > On Sat, May 16, 2020 at 12:34 PM Randall Hauch <rha...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Thanks again for the active discussion!
> > > > > > > > >
> > > > > > > > > Regarding the future-vs-callback discussion: I did like where Chris was
> > > > > > > > > going with the Callback, but he raises a good point that it's unclear what
> > > > > > > > > to use for the reporter type, since we'd need three parameters. Introducing
> > > > > > > > > a new interface makes it much harder for a sink task to be backward
> > > > > > > > > compatible, so sticking with BiFunction is a good compromise. Plus, another
> > > > > > > > > significant disadvantage of a callback approach is that a sink task's
> > > > > > > > > callback is called from the producer thread, and this risks a poorly
> > > > > > > > > written sink task callback killing the reporter's producer without
> > > > > > > > > necessarily failing the task. Using a future avoids this risk altogether
> > > > > > > > > and still provides the sink task with the ability to do synchronous
> > > > > > > > > reporting using Future, which is a standard and conventional design
> > > > > > > > > pattern. So we do seem to have converged on using `BiFunction<SinkRecord,
> > > > > > > > > Throwable, Future<Void>>` for the reporter type.
> > > > > > > > >
> > > > > > > > > Now, we still seem not to have converged on how to pass the reporter to
> > > > > > > > > the sink task. I agree with Konstantine that the deprecation affects only
> > > > > > > > > newer versions of Connect, and that a sink task should deal with both put
> > > > > > > > > methods only when it wants to support older runtimes. I also think that
> > > > > > > > > this is a viable approach, but I do concede that this evolution of the sink
> > > > > > > > > task API is more complicated than it should be.
> > > > > > > > >
> > > > > > > > > In the interest of quickly coming to consensus on how we pass the reporter
> > > > > > > > > to the sink task, I'd like to go back to Andrew's original suggestion,
> > > > > > > > > which I think we disregarded too quickly: add a getter on the
> > > > > > > > > SinkTaskContext interface. We already have precedent for adding methods to
> > > > > > > > > one of the context classes with the newly-adopted KIP-131, which adds a
> > > > > > > > > getter for the OffsetStorageReader on the (new) SourceConnectorContext.
> > > > > > > > > That KIP accepts the fact that a source connector wanting to use this
> > > > > > > > > feature while also keeping the ability to be installed into older Connect
> > > > > > > > > runtimes must guard its use of the context's getter method.
> > > > > > > > >
> > > > > > > > > I think we can use the same pattern for this KIP, and add a getter to the
> > > > > > > > > existing SinkTaskContext that is defined something like:
> > > > > > > > >
> > > > > > > > > public interface SinkTaskContext {
> > > > > > > > >     ...
> > > > > > > > >     /**
> > > > > > > > >      * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord}
> > > > > > > > >      * passed to the {@link SinkTask#put(Collection)} method. When reporting a failed record,
> > > > > > > > >      * the sink task will receive a {@link Future} that the task can optionally use to wait until
> > > > > > > > >      * the failed record and exception have been written to Kafka via Connect's DLQ. Note that
> > > > > > > > >      * the result of this method may be null if this connector has not been configured with a DLQ.
> > > > > > > > >      *
> > > > > > > > >      * <p>This method was added in Apache Kafka 2.9. Sink tasks that use this method but want to
> > > > > > > > >      * maintain backward compatibility so they can also be deployed to older Connect runtimes
> > > > > > > > >      * should guard the call to this method with a try-catch block, since calling this method will result in a
> > > > > > > > >      * {@link NoSuchMethodError} when the sink connector is deployed to Connect runtimes
> > > > > > > > >      * older than Kafka 2.9. For example:
> > > > > > > > >      * <pre>
> > > > > > > > >      *     BiFunction&lt;SinkRecord, Throwable, Future&lt;Void&gt;&gt; reporter;
> > > > > > > > >      *     try {
> > > > > > > > >      *         reporter = context.failedRecordReporter();
> > > > > > > > >      *     } catch (NoSuchMethodError e) {
> > > > > > > > >      *         reporter = null;
> > > > > > > > >      *     }
> > > > > > > > >      * </pre>
> > > > > > > > >      *
> > > > > > > > >      * @return the reporter function; null if no error reporter has been configured for the connector
> > > > > > > > >      * @since 2.9
> > > > > > > > >      */
> > > > > > > > >     BiFunction<SinkRecord, Throwable, Future<Void>> failedRecordReporter();
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > The main advantage is that the KIP no longer has to make *any other*
> > > > > > > > > changes to the Sink Connector or Task API. The above is really the only
> > > > > > > > > change, and it's merely an addition to the API. No deprecation and no
> > > > > > > > > overloading methods. The KIP does need to explain how the reporter is
> > > > > > > > > configured and used (which it already does), but IMO the KIP doesn't need
> > > > > > > > > to describe when this reporter can/should be used. After all, this method
> > > > > > > > > is on the existing SinkTaskContext, so this method is really no different
> > > > > > > > > than any other existing method. I think my JavaDoc (which is just a
> > > > > > > > > suggestion for a starting point that Aakash can improve as needed)
> > > > > > > > > describes how easy it is for a sink to maintain backward compatibility.
> > > > > > > > > (The use of `BiFunction` helps tremendously.) Another not insignificant
> > > > > > > > > advantage is that a sink task can use this reporter reference throughout
> > > > > > > > > the task's lifetime (after it's started and before it is stopped), making
> > > > > > > > > it less invasive for existing sink task implementations that want to use
> > > > > > > > > it.
> > > > > > > > >
> > > > > > > > > I hope we can all get behind this compromise, which IMO is actually super
> > > > > > > > > clean and makes a lot of sense. Thanks, Andrew, for originally suggesting
> > > > > > > > > it. I know we're all trying to improve the Connect API in a way that makes
> > > > > > > > > sense, and deliberate and constructive discussion is a healthy thing.
> > > > > > > > > Thanks again to everyone for participating!
> > > > > > > > >
> > > > > > > > > BTW, we've agreed upon a number of other changes, but I don't see any of
> > > > > > > > > those changes on the KIP. Aakash, can you please update the KIP quickly so
> > > > > > > > > we can make sure the other parts of the KIP are acceptable?
> > > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > >
> > > > > > > > > Randall
> > > > > > > > >
> > > > > > > > > On Sat, May 16, 2020 at 12:24 PM Konstantine Karantasis <konstant...@confluent.io> wrote:
> > > > > > > > >
> > > > > > > > > > Thanks for the quick response Aakash.
> > > > > > > > > >
> > > > > > > > > > With respect to deprecation, this refers to deprecating this method in
> > > > > > > > > > newer versions of Kafka Connect (and eventually removing it).
> > > > > > > > > >
> > > > > > > > > > As a connector developer, if you want your connector to run across a wide
> > > > > > > > > > spectrum of Connect versions, you'll have to take this into consideration
> > > > > > > > > > and retain both methods in a functional state. The good news is that both
> > > > > > > > > > methods can share a lot of code, so in reality both the old and the new put
> > > > > > > > > > will be thin shims over a `putRecord` method (or `process` method as you
> > > > > > > > > > call it in the KIP).
> > > > > > > > > >
> > > > > > > > > > Given the above, there's no requirement to conditionally call one method or
> > > > > > > > > > the other in the framework based on configuration. Once you implement the
> > > > > > > > > > new `put` with something other than its default implementation, as a
> > > > > > > > > > connector developer, you'll know to adapt to the above.
> > > > > > > > > >
> > > > > > > > > > I definitely suggest extending our docs in a meaningful way in order to
> > > > > > > > > > make the upgrade path easy to follow. Maybe you'd like to add a note to
> > > > > > > > > > your compatibility section in this KIP as well.
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > > Konstantine
> > > > > > > > > >
> > > > > > > > > > On Sat, May 16, 2020 at 10:13 AM Aakash Shah <as...@confluent.io> wrote:
> > > > > > > > > >
> > > > > > > > > > > +1
> > > > > > > > > > >
> > > > > > > > > > > On Sat, May 16, 2020 at 9:55 AM Konstantine Karantasis <konstant...@confluent.io> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Arjun,
> > > > > > > > > > > >
> > > > > > > > > > > > I think I agree with you that the subject is interesting. Yet, I feel it
> > > > > > > > > > > > belongs to a separate future KIP. Reading the proposal in the KIP format
> > > > > > > > > > > > will help, at least me, to understand it better.
> > > > > > > > > > > >
> > > > > > > > > > > > Having said that, for the purpose of simplifying error handling for sink
> > > > > > > > > > > > tasks, the discussion on KIP-610 has made some good progress on the
> > > > > > > > > > > > mailing list. If the few open items are reflected in the proposal, maybe
> > > > > > > > > > > > it'd even be worthwhile to consider it for inclusion in the upcoming
> > > > > > > > > > > > release with its current scope.
> > > > > > > > > > > >
> > > > > > > > > > > > Konstantine
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, May 15, 2020 at 7:44 PM Arjun Satish <arjun.sat...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > I'm kinda hoping that we get to an approach on how to extend the Connect
> > > > > > > > > > > > > framework. Adding parameters in the put method is nice, and maybe works
> > > > > > > > > > > > > for now, but I'm not sure how scalable it is. It'd be great to be able to
> > > > > > > > > > > > > add more functionality in the future. A couple of examples:
> > > > > > > > > > > > >
> > > > > > > > > > > > > - make the metrics registry available to a task, so it can report
> > > > > > > > > > > > >   task-level metrics, or
> > > > > > > > > > > > > - be able to pass in a RestExtension handle to the task, so the task can
> > > > > > > > > > > > >   provide a REST endpoint which users can hit to get some task-level
> > > > > > > > > > > > >   information (about its status or health, for example)
> > > > > > > > > > > > >
> > > > > > > > > > > > > In such scenarios, adding new parameters to existing methods may not be
> > > > > > > > > > > > > immediately acceptable.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Since we are very close to a deadline, I wanted to check if one more
> > > > > > > > > > > > > possibility is acceptable :-)
> > > > > > > > > > > > >
> > > > > > > > > > > > > What if we could create a library that could be independently integrated
> > > > > > > > > > > > > by connector developers into their connectors? The library would be
> > > > > > > > > > > > > packaged and shipped with their connector like any other library on Maven
> > > > > > > > > > > > > (and other similar repositories). The new module would be in the AK
> > > > > > > > > > > > > project, but its jars would *not* be added to the classpath of the
> > > > > > > > > > > > > Connect worker.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > The library would provide a public interface for an error reporter, which
> > > > > > > > > > > > > > provides both synchronous and asynchronous functionality (as was brought
> > > > > > > > > > > > > > up above).
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Since this would be an independent library, it can easily be bundled and
> > > > > > > > > > > > > > loaded with the connectors. The Connect framework would be decoupled
> > > > > > > > > > > > > > from this utility.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > I understand that a similar option is in the rejected alternatives, mostly
> > > > > > > > > > > > > > because of configuration overhead, but the configuration required here can
> > > > > > > > > > > > > > come directly from the worker properties (and just be copy-pasted from
> > > > > > > > > > > > > > there, maybe with a prefix). I also wonder whether (maybe as part of a
> > > > > > > > > > > > > > future KIP) we can evaluate a strategy where certain worker configs (for
> > > > > > > > > > > > > > example, the producer/consumer/admin ones) can be passed to a connector,
> > > > > > > > > > > > > > so end users do not have to.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Overall, we would get clean APIs and contracts, and developers get the
> > > > > > > > > > > > > > freedom to use these libraries and functionalities however they want. The
> > > > > > > > > > > > > > only drawback is how this is configured (end users will have to add more
> > > > > > > > > > > > > > lines to the JSON/properties files). But since all configs can simply come
> > > > > > > > > > > > > > from the worker, I believe this is a relatively minor issue. We should be
> > > > > > > > > > > > > > able to work out compatibility issues in the implementations, so that the
> > > > > > > > > > > > > > library can safely run (and degrade functionality if needed) with old
> > > > > > > > > > > > > > workers.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
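The following is a minimal sketch of the library idea above. It assumes a hypothetical `ConnectorErrorReporter` interface that the connector would bundle itself. None of these names exist in Kafka Connect, and plain strings stand in for records. The asynchronous method is primary, and the synchronous one is a default method that waits on the future. `FailFastReporter` shows one possible "degraded" mode for workers that cannot provide a DLQ.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class LibraryReporterSketch {

    /** Hypothetical reporter API that a connector-side library could expose. */
    interface ConnectorErrorReporter {
        /** Asynchronous variant: returns at once with a future for the DLQ write. */
        Future<Void> reportAsync(String record, Throwable error);

        /** Synchronous variant, built on the asynchronous one by waiting on the future. */
        default void report(String record, Throwable error) {
            try {
                reportAsync(record, error).get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while reporting", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("Failed to report record", e.getCause());
            }
        }
    }

    /** Normal mode: writes to a DLQ, modeled here as an in-memory list. */
    static final class InMemoryDlqReporter implements ConnectorErrorReporter {
        final List<String> dlq = new CopyOnWriteArrayList<>();

        @Override
        public Future<Void> reportAsync(String record, Throwable error) {
            return CompletableFuture.runAsync(() -> dlq.add(record + " -> " + error.getMessage()));
        }
    }

    /** Degraded mode (e.g. an older worker): the report fails, so the task fails. */
    static final class FailFastReporter implements ConnectorErrorReporter {
        @Override
        public Future<Void> reportAsync(String record, Throwable error) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(error);
            return failed;
        }
    }

    public static void main(String[] args) {
        InMemoryDlqReporter reporter = new InMemoryDlqReporter();
        reporter.report("rec-1", new IllegalArgumentException("bad schema"));
        System.out.println(reporter.dlq); // [rec-1 -> bad schema]
    }
}
```

Whether "degrade" should mean failing fast, logging, or silently dropping is a design choice this sketch does not settle.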
> > > > > > > > > > > > > > On Fri, May 15, 2020 at 7:04 PM Aakash Shah <as...@confluent.io> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > > Just wanted to clarify that I am on board with adding the overloaded
> > > > > > > > > > > > > > > put(...) method.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > Aakash
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Fri, May 15, 2020 at 7:00 PM Aakash Shah <as...@confluent.io> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Randall and Konstantine,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > As Chris and Arjun mentioned, I think the main concern is the potential
> > > > > > > > > > > > > > > > gap in which developers don't implement the deprecated method due to a
> > > > > > > > > > > > > > > > misunderstanding of use cases. Using the setter method approach ensures
> > > > > > > > > > > > > > > > that the developer won't break backward compatibility by mistake when
> > > > > > > > > > > > > > > > using the new method. That being said, I think the added clarity in the
> > > > > > > > > > > > > > > > contract of when the error reporter will be invoked, and the overall
> > > > > > > > > > > > > > > > aesthetics, while maintaining backward compatibility, outweigh the
> > > > > > > > > > > > > > > > potential mistake of a developer not implementing the original put(...)
> > > > > > > > > > > > > > > > method.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > With respect to synchrony, I agree with Konstantine's point that we should
> > > > > > > > > > > > > > > > make synchronous reporting an opt-in feature. At the same time, I believe
> > > > > > > > > > > > > > > > it is important to relieve the developer of as much of the implementation
> > > > > > > > > > > > > > > > burden as possible in this case, and thus I think using a Callback rather
> > > > > > > > > > > > > > > > than a Future would be easier on the developer, while adding asynchronous
> > > > > > > > > > > > > > > > functionality with the ability to opt in to synchronous behavior. I also
> > > > > > > > > > > > > > > > believe that making synchronous behavior opt-in, rather than the other way
> > > > > > > > > > > > > > > > around, simplifies implementation for the developer (blocking vs. creating
> > > > > > > > > > > > > > > > a new thread).
> > > > > > > > > > > > > > >
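The Callback-vs-Future question can be sketched as follows. The sketch assumes the reporter returns a `CompletableFuture`, because a plain `java.util.concurrent.Future` has no way to register a completion hook. `ReportCallback` and `reportWithCallback` are hypothetical names. The sketch shows two things. First, a callback-style API can be layered on top of a future-returning one. Second, a caller that needs synchronous behavior still has to block somewhere.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

public class CallbackVsFutureSketch {

    /** Hypothetical callback type: failure is null when the DLQ write succeeded. */
    @FunctionalInterface
    interface ReportCallback {
        void onCompletion(Throwable failure);
    }

    /** Adapts a CompletableFuture-returning reporter to callback style. */
    static void reportWithCallback(BiFunction<String, Throwable, CompletableFuture<Void>> reporter,
                                   String record, Throwable error, ReportCallback callback) {
        reporter.apply(record, error)
                .whenComplete((ignored, failure) -> callback.onCompletion(failure));
    }

    static String demo() {
        // Stand-in reporter whose "DLQ write" completes successfully on another thread.
        BiFunction<String, Throwable, CompletableFuture<Void>> reporter =
                (record, error) -> CompletableFuture.runAsync(() -> { });
        AtomicReference<String> outcome = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        reportWithCallback(reporter, "rec-1", new RuntimeException("bad value"), failure -> {
            outcome.set(failure == null ? "written" : "failed: " + failure.getMessage());
            done.countDown();
        });
        // Even with callbacks, a caller wanting synchronous behavior must wait somewhere.
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                return "timed out";
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // written
    }
}
```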
> > > > > > > > > > > > > > > > Please let me know your thoughts. I would like to come to a consensus soon
> > > > > > > > > > > > > > > > due to the AK 2.6 deadlines; I will then update the KIP shortly and start
> > > > > > > > > > > > > > > > a vote.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Aakash
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Fri, May 15, 2020 at 2:24 PM Randall Hauch <rha...@gmail.com> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >> On Fri, May 15, 2020 at 3:13 PM Arjun Satish <arjun.sat...@gmail.com> wrote:
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> > Couple of thoughts:
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > 1. If we add new parameters to put(..), and new connectors implement only
> > > > > > > > > > > > > > > >> > this method, it makes them backward incompatible with older workers. I
> > > > > > > > > > > > > > > >> > think newer connectors may choose to implement only the latest method,
> > > > > > > > > > > > > > > >> > and we are passing the compatibility problems back to the connector
> > > > > > > > > > > > > > > >> > developers.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> New connectors would have to implement both if they want to run in older
> > > > > > > > > > > > > > > >> runtimes.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> > 2. If we deprecate the older put() method and eventually remove it, then
> > > > > > > > > > > > > > > >> > old connectors are forward incompatible. If we are not going to remove it,
> > > > > > > > > > > > > > > >> > then maybe we should not deprecate it?
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> I don't think we'll ever remove deprecated methods -- there's no reason to
> > > > > > > > > > > > > > > >> cut off older connectors.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> > 3. If a record is found to be erroneous outside put() (say, in flush or
> > > > > > > > > > > > > > > >> > preCommit), how will it be reported?
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> This is a concern no matter how the reporter is passed to the task.
> > > > > > > > > > > > > > > >> Actually, I think it's clearer that the reporter passed through `put(...)`
> > > > > > > > > > > > > > > >> should be used to record errors on the SinkRecords passed in the same
> > > > > > > > > > > > > > > >> method call.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > I do think the concern over aesthetics is an important one, but the
> > > > > > > > > > > > > > > >> > trade-off here is to exclude many connectors that are out there from
> > > > > > > > > > > > > > > >> > running on some worker versions. There may be production deployments that
> > > > > > > > > > > > > > > >> > need one old and one new connector, which now cannot work on any version
> > > > > > > > > > > > > > > >> > of a single worker. Building connectors is complex, and it's kinda unfair
> > > > > > > > > > > > > > > >> > to expect folks to make changes for aesthetic reasons alone. This is
> > > > > > > > > > > > > > > >> > probably the reason why popular framework APIs very rarely (and probably
> > > > > > > > > > > > > > > >> > never) change.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> I don't see how passing the reporter through an overloaded `put(...)` is
> > > > > > > > > > > > > > > >> less backward compatible. Because the runtime provides the SinkTask base
> > > > > > > > > > > > > > > >> class, the runtime has control over what the methods do by default.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > Overall, yes, the "public void errantRecordReporter(BiConsumer<SinkRecord,
> > > > > > > > > > > > > > > >> > Throwable> reporter) {}" proposal in the original KIP is somewhat of a
> > > > > > > > > > > > > > > >> > mouthful, but are there any simpler alternatives that do not exclude
> > > > > > > > > > > > > > > >> > existing connectors or add operational burdens, and yet provide a clean
> > > > > > > > > > > > > > > >> > contract?
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> IMO, overloading `put(...)` is cleaner and easier to understand -- plus
> > > > > > > > > > > > > > > >> the other benefits in my earlier email.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > Best,
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > PS: Apologies if the language is incorrect or some points are unclear.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > On Fri, May 15, 2020 at 12:02 PM Randall Hauch <rha...@gmail.com> wrote:
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > > On Fri, May 15, 2020 at 1:45 PM Konstantine Karantasis
> > > > > > > > > > > > > > > >> > > <konstant...@confluent.io> wrote:
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > > Thanks for the quick response, Aakash.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > >> > > > To your last point, modern APIs like this tend to be asynchronous (see the
> > > > > > > > > > > > > > > >> > > > admin client and producer in Kafka), and such a definition results in more
> > > > > > > > > > > > > > > >> > > > expressive and well-defined APIs.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > +1
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > > What you describe is easily an opt-in feature for the connector developer.
> > > > > > > > > > > > > > > >> > > > At the same time, the latest description above gives us better chances for
> > > > > > > > > > > > > > > >> > > > this API to remain like this for longer, because it covers both the sync
> > > > > > > > > > > > > > > >> > > > and async per-`put` use cases.
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > +1
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > > Given how simple the sync implementation is, just by complying with the
> > > > > > > > > > > > > > > >> > > > return type of the method, I still think the BiFunction definition that
> > > > > > > > > > > > > > > >> > > > returns a Future makes sense.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > Konstantine
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > >
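The "sync and async per `put`" usage Konstantine describes might look like the sketch below. It assumes the `BiFunction<SinkRecord, Throwable, Future<Void>>` shape. Strings stand in for `SinkRecord`, an in-memory list stands in for the DLQ topic, and the rule that records starting with "bad" are errant is purely illustrative. The simplified `put(...)` reports bad records without blocking. It then waits for all outstanding reports before returning, which is one way a task could opt in to synchronous semantics at the batch level.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

public class PerPutSyncSketch {
    // Hypothetical in-memory stand-in for the DLQ topic.
    static final List<String> DLQ = new CopyOnWriteArrayList<>();

    // Reporter with the shape discussed in the thread: (record, error) -> Future<Void>.
    static final BiFunction<String, Throwable, Future<Void>> REPORTER =
            (record, error) -> CompletableFuture.runAsync(() -> DLQ.add(record));

    /** Simplified put(): report bad records asynchronously, then wait for every report. */
    static List<String> put(Collection<String> records,
                            BiFunction<String, Throwable, Future<Void>> reporter) {
        List<String> written = new ArrayList<>();
        List<Future<Void>> pending = new ArrayList<>();
        for (String record : records) {
            if (record.startsWith("bad")) {
                pending.add(reporter.apply(record, new IllegalArgumentException("unparseable")));
            } else {
                written.add(record); // stand-in for writing to the external system
            }
        }
        // Opting in to synchronous semantics for this put(): block until all reports finish.
        for (Future<Void> f : pending) {
            try {
                f.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("DLQ write failed", e.getCause());
            }
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> written = put(List.of("a", "bad-1", "b", "bad-2"), REPORTER);
        System.out.println(written + " " + DLQ.size()); // [a, b] 2
    }
}
```

A task that does not care about completion before `put(...)` returns could simply skip the second loop.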
> > > > > > > > > > > > > > > >> > > > On Fri, May 15, 2020 at 11:27 AM Aakash Shah <as...@confluent.io> wrote:
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > > Thanks for the additional feedback.
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > >> > > > > I see the benefits of adding an overloaded put(...) over the alternatives,
> > > > > > > > > > > > > > > >> > > > > and I am on board with going forward with this approach. It will
> > > > > > > > > > > > > > > >> > > > > definitely set forth a contract of where the reporter will be used, with
> > > > > > > > > > > > > > > >> > > > > better aesthetics.
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > >> > > > > The original idea of going with a synchronous approach for the error
> > > > > > > > > > > > > > > >> > > > > reporter was to ease the connector developer's job of interacting with and
> > > > > > > > > > > > > > > >> > > > > handling the error reporter. The tradeoff of a synchronous-only reporter
> > > > > > > > > > > > > > > >> > > > > would be lower throughput on the reporter; this was thought to be fine
> > > > > > > > > > > > > > > >> > > > > since arguably most circumstances would not involve consistently large
> > > > > > > > > > > > > > > >> > > > > numbers of records being sent to the error reporter. Even if that were the
> > > > > > > > > > > > > > > >> > > > > case, an argument can be made that the lower throughput would help, as it
> > > > > > > > > > > > > > > >> > > > > would allow the user more time to realize that the connector is sending
> > > > > > > > > > > > > > > >> > > > > records to the error reporter before many are sent. However, if we are
> > > > > > > > > > > > > > > >> > > > > strongly in favor of having the option of asynchronous functionality
> > > > > > > > > > > > > > > >> > > > > available to the developer, then I am fine with that as well.
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > >> > > > > Lastly, I am on board with changing the name to failedRecordReporter.
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > > > Please let me know your thoughts.
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > > > Thanks,
> > > > > > > > > > > > > > >> > > > > Aakash
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > >> > > > > On Fri, May 15, 2020 at 9:10 AM Randall Hauch <rha...@gmail.com> wrote:
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > > > > Konstantine said:
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > >> > > > > > > I notice Randall also used BiFunction in his example; I wonder if it's
> > > > > > > > > > > > > > > >> > > > > > > for similar reasons.
> > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > >> > > > > > Nope. Just a typo on my part.
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > >> > > > > > There appear to be three outstanding questions.
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > >> > > > > > First, Konstantine suggested calling this "failedRecordReporter". I think
> > > > > > > > > > > > > > > >> > > > > > this is minor, but using this new name may be a bit more precise, and I'd
> > > > > > > > > > > > > > > >> > > > > > be fine with it.
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > >> > > > > > Second, should the reporter method be synchronous? I think the two options
> > > > > > > > > > > > > > > >> > > > > > are:
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > >> > > > > > 2a. Use `BiConsumer<SinkRecord, Throwable>` that returns nothing and
> > > > > > > > > > > > > > > >> > > > > > blocks (at this time).
> > > > > > > > > > > > > > > >> > > > > > 2b. Use `BiFunction<SinkRecord, Throwable, Future<Void>>` that returns a
> > > > > > > > > > > > > > > >> > > > > > future that the user can optionally use to be synchronous.
> > > > > > > > > > > > > > >> > > > > >
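One way to see why option 2b leaves more room than 2a is that a blocking `BiConsumer` can be derived from the future-returning `BiFunction` just by calling `get()`. Going the other way would require the task to create its own thread. The sketch below uses hypothetical names, with strings in place of `SinkRecord` and an in-memory list in place of the DLQ topic.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

public class ReporterShapesSketch {
    // Hypothetical in-memory stand-in for the DLQ topic.
    static final List<String> DLQ = new CopyOnWriteArrayList<>();

    // Option 2b: asynchronous; the caller decides whether to wait on the future.
    static final BiFunction<String, Throwable, Future<Void>> ASYNC_2B =
            (record, error) -> CompletableFuture.runAsync(() -> DLQ.add(record + ": " + error.getMessage()));

    // Option 2a recovered from 2b: block on the returned future before returning.
    static BiConsumer<String, Throwable> blocking(BiFunction<String, Throwable, Future<Void>> async) {
        return (record, error) -> {
            try {
                async.apply(record, error).get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("DLQ write failed", e.getCause());
            }
        };
    }

    public static void main(String[] args) {
        BiConsumer<String, Throwable> sync2a = blocking(ASYNC_2B);
        sync2a.accept("rec-1", new IllegalArgumentException("bad schema"));
        // The DLQ write has completed by the time accept() returns.
        System.out.println(DLQ); // [rec-1: bad schema]
    }
}
```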
> > > > > > > > > > > > > > > >> > > > > > I do agree with Konstantine that option 2b gives us more room for future
> > > > > > > > > > > > > > > >> > > > > > semantic changes, and since the producer write is already asynchronous
> > > > > > > > > > > > > > > >> > > > > > this should be straightforward to implement. I think the concern here is
> > > > > > > > > > > > > > > >> > > > > > that if the sink task does not *use* the future to make this synchronous,
> > > > > > > > > > > > > > > >> > > > > > it is very possible that the error records could be written out of order
> > > > > > > > > > > > > > > >> > > > > > (due to retries). But this won't be an issue if the implementation uses
> > > > > > > > > > > > > > > >> > > > > > `max.in.flight.requests.per.connection=1` for writing the error records.
> > > > > > > > > > > > > > > >> > > > > > It's a little less clear, but honestly IMO passing the reporter in the
> > > > > > > > > > > > > > > >> > > > > > `put(...)` method helps make this lambda easier to understand, for some
> > > > > > > > > > > > > > > >> > > > > > strange reason. So unless there are good reasons to avoid this, I'd be in
> > > > > > > > > > > > > > > >> > > > > > favor of 2b and returning a Future.
> > > > > > > > > > > > > > >> > > > > >
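If the runtime writes error records with its own producer, the ordering safeguard mentioned above amounts to a single producer override. The sketch below builds such overrides for a hypothetical DLQ producer. Only the config key comes from the text. The helper class is illustrative and does not depend on kafka-clients.

```java
import java.util.Properties;

public class DlqProducerConfigSketch {
    /**
     * Producer overrides for a hypothetical DLQ producer. With at most one
     * in-flight request per connection, a retried batch cannot be overtaken by
     * a later batch, so error records within a partition keep their send order.
     */
    static Properties dlqProducerOverrides() {
        Properties props = new Properties();
        // Same key as ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_CONFIG in kafka-clients.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(dlqProducerOverrides());
    }
}
```

The cost is throughput on the DLQ producer, which the thread already treats as acceptable for error records.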
> > > > > > > > > > > > > > > >> > > > > > Third, how do we pass the reporter lambda / method reference to the task?
> > > > > > > > > > > > > > > >> > > > > > My proposal to pass the reporter via an overloaded `put(...)` is still the
> > > > > > > > > > > > > > > >> > > > > > most attractive to me, for several reasons:
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > >> > > > > > 3a. There's no need to pass the reporter separately *and* to describe the
> > > > > > > > > > > > > > > >> > > > > > changes in method call ordering.
> > > > > > > > > > > > > > > >> > > > > > 3b. As mentioned above, for some reason passing it via `put(...)` makes it
> > > > > > > > > > > > > > > >> > > > > > clearer that it is meant to be used when processing the SinkRecord, and
> > > > > > > > > > > > > > > >> > > > > > that it shouldn't be used in `start(...)`, `preCommit(...)`,
> > > > > > > > > > > > > > > >> > > > > > `onPartitionsAssigned(...)`, or any of the other task methods. As Andrew
> > > > > > > > > > > > > > > >> > > > > > pointed out earlier, *describing* this in the KIP and in JavaDoc in a way
> > > > > > > > > > > > > > > >> > > > > > that is both exact and succinct will be tough.
> > > > > > > > > > > > > > > >> > > > > > 3c. There is already precedent in the evolution of
> > > > > > > > > > > > > > > >> > > > > > `SourceTask.commitRecord(...)`, and the pattern is identical.
> > > > > > > > > > > > > > > >> > > > > > 3d. Backward compatibility is easy to understand, and at the same time
> > > > > > > > > > > > > > > >> > > > > > it's pretty easy to describe what implementations that want to take
> > > > > > > > > > > > > > > >> > > > > > advantage of this feature should do.
> > > > > > > > > > > > > > > >> > > > > > 3e. Minimal changes to the interface: we're just *adding* one default
> > > > > > > > > > > > > > > >> > > > > > method that calls the existing method, and deprecating the existing
> > > > > > > > > > > > > > > >> > > > > > `put(...)`.
> > > > > > > > > > > > > > > >> > > > > > 3f. Deprecating the existing `put(...)` makes it clearer in a programmatic
> > > > > > > > > > > > > > > >> > > > > > sense that new sink implementations should use the reporter, and that we
> > > > > > > > > > > > > > > >> > > > > > recommend old sinks evolve to use it.
> > > > > > > > > > > > > > >> > > > > >
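The overload pattern in 3e can be sketched with a hypothetical `BaseSinkTask` standing in for the runtime-provided `SinkTask`. Strings stand in for `SinkRecord`, a `BiConsumer` reporter is used for brevity, and the rule that an empty string is an errant record is purely illustrative. A task written against the old API keeps working because the new overload delegates to the original `put(...)`. A new task overrides both methods, as noted earlier in the thread, so it can still run on runtimes that only call the original one.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;

public class OverloadedPutSketch {

    /** Hypothetical stand-in for the runtime-provided SinkTask base class. */
    abstract static class BaseSinkTask {
        /** The existing method: deprecated under the proposal, but never removed. */
        @Deprecated
        public abstract void put(Collection<String> records);

        /** The new overload: by default it ignores the reporter and delegates. */
        public void put(Collection<String> records, BiConsumer<String, Throwable> reporter) {
            put(records);
        }
    }

    /** A task written before the overload existed: it only knows the original put(). */
    static class LegacyTask extends BaseSinkTask {
        final List<String> written = new ArrayList<>();

        @Override
        public void put(Collection<String> records) {
            written.addAll(records);
        }
    }

    /** A newer task that uses the reporter, plus a fallback for older runtimes. */
    static class ReportingTask extends BaseSinkTask {
        final List<String> written = new ArrayList<>();

        @Override
        public void put(Collection<String> records) {
            // Older runtime: no reporter is available, so fail on the first bad record.
            put(records, (record, error) -> { throw new IllegalStateException(error); });
        }

        @Override
        public void put(Collection<String> records, BiConsumer<String, Throwable> reporter) {
            for (String record : records) {
                if (record.isEmpty()) {
                    reporter.accept(record, new IllegalArgumentException("empty record"));
                } else {
                    written.add(record);
                }
            }
        }
    }

    /** Simulates a newer runtime that always calls the two-argument put(). */
    static List<String> demo() {
        List<String> dlq = new ArrayList<>();
        BiConsumer<String, Throwable> reporter = (record, error) -> dlq.add(error.getMessage());
        LegacyTask legacy = new LegacyTask();
        ReportingTask modern = new ReportingTask();
        legacy.put(List.of("a", "b"), reporter);
        modern.put(List.of("c", "", "d"), reporter);
        return List.of(String.join(",", legacy.written), String.join(",", modern.written),
                String.join(",", dlq));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a,b, c,d, empty record]
    }
}
```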
> > > > > > > > > > > > > > > >> > > > > > Some of these benefits apply to some of the other suggestions, but I think
> > > > > > > > > > > > > > > >> > > > > > none of the other suggestions have all of these benefits. For example,
> > > > > > > > > > > > > > > >> > > > > > overloading `initialize(...)` is more difficult since most sink connectors
> > > > > > > > > > > > > > > >> > > > > > don't override it and therefore would be less subject to deprecation
> > > > > > > > > > > > > > > >> > > > > > warnings. Overloading `start(...)` is less attractive. Adding a method IMO
> > > > > > > > > > > > > > > >> > > > > > shares the fewest of these benefits.
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > >> > > > > > The one disadvantage of this approach is that sink task implementations
> > > > > > > > > > > > > > > >> > > > > > can't rely upon the reporter upon startup. IMO that's an acceptable
> > > > > > > > > > > > > > > >> > > > > > tradeoff to get the cleaner and more explicit API, especially if the API
> > > > > > > > > > > > > > > >> > > > > > contract is that Connect will pass the same reporter instance to each call
> > > > > > > > > > > > > > > >> > > > > > to `put(...)` on a single task instance.
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > >> > > > > > Best regards,
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > >> > > > > > Randall
> > > > > > > > > > > > > > >> > > > > >
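A minimal sketch of the contract Randall describes, assuming the overloaded `put(records, reporter)` signature under discussion in this thread (a proposal, not a released Kafka Connect API). Every type below is a hypothetical, self-contained stand-in so the example compiles without Kafka: records are plain `String`s, `ExampleSinkTask` and `ReporterContractDemo` are illustrative names, and `report(...)` returns `Future<Void>` as in the earlier `BiFunction` proposal. Because the reporter only arrives with `put(...)`, the task has none at startup; because Connect would pass the same instance on every call, the task can cache it. Here the task rejects a different instance to make that assumption explicit.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in for the proposed reporter; not the real Kafka interface.
interface ErrantRecordReporter {
    Future<Void> report(String record, Throwable error);
}

// Hypothetical sink task using the proposed put(records, reporter) overload.
class ExampleSinkTask {
    // Null until the first put(...) call: the reporter is not available at startup.
    private ErrantRecordReporter reporter;
    final List<String> written = new ArrayList<>();

    public void put(Collection<String> records, ErrantRecordReporter reporter) {
        // Assumed contract: the same reporter instance arrives on every call.
        if (this.reporter != null && this.reporter != reporter) {
            throw new IllegalStateException("reporter instance changed between put() calls");
        }
        this.reporter = reporter;
        for (String record : records) {
            try {
                write(record);
            } catch (IllegalArgumentException e) {
                // Hand the bad record to the DLQ instead of failing the task.
                reporter.report(record, e);
            }
        }
    }

    boolean hasReporter() {
        return reporter != null;
    }

    // Hypothetical external write: empty records are treated as unwritable.
    private void write(String record) {
        if (record.isEmpty()) {
            throw new IllegalArgumentException("empty record");
        }
        written.add(record);
    }
}

public class ReporterContractDemo {
    public static void main(String[] args) {
        List<String> dlq = new ArrayList<>();
        // Simulated Connect-side reporter that just collects what was reported.
        ErrantRecordReporter reporter = (record, error) -> {
            dlq.add(record + ": " + error.getMessage());
            return CompletableFuture.completedFuture(null);
        };

        ExampleSinkTask task = new ExampleSinkTask();
        System.out.println(task.hasReporter()); // false
        task.put(List.of("a", ""), reporter);
        task.put(List.of("b"), reporter);
        System.out.println(task.written); // [a, b]
        System.out.println(dlq);          // [: empty record]
    }
}
```

Caching the reporter is only safe under the same-instance guarantee; without it, a task would have to use whichever reporter accompanies the current `put(...)` call.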
> On Fri, May 15, 2020 at 6:59 AM Andrew Schofield <
> andrew_schofi...@live.com> wrote:
>
> > Hi,
> > Randall's suggestion is really good. I think it gives the flexibility
> > required and also keeps the interface the right way round.
> >
> > Thanks,
> > Andrew Schofield
> >
> > On 15/05/2020, 02:07, "Aakash Shah" <as...@confluent.io> wrote:
> >
> > > Hi Randall,
> > >
> > > Thanks for the feedback.
> > >
> > > 1. This is a great suggestion, but I find that adding