Hi Arjun,

This is great news. Given that we're already willing to ask developers to catch a method-not-found error, and it sounds like a missing new interface can be handled in a similar try/catch block in the same place, I like the idea of a new, fleshed-out interface instead of a BiConsumer or a BiFunction.
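The guard discussed in this thread can be sketched as a small simulation. Everything here is a hypothetical stand-in rather than Connect code: `TaskContext` plays `SinkTaskContext`, `String` plays `SinkRecord`, and an "old worker" is modeled by a context whose getter throws `NoSuchMethodError`, which is what the JVM raises at the call site when the runtime's class lacks the method. The reporter is looked up once, in `start()`, and both `NoSuchMethodError` and `NoClassDefFoundError` are caught.

```java
import java.util.ArrayList;
import java.util.List;

class ReporterGuard {

    // Hypothetical stand-in for the proposed ErrantRecordReporter (String replaces SinkRecord).
    public interface ErrantRecordReporter {
        void report(String record, Throwable error);
    }

    // Hypothetical stand-in for SinkTaskContext with the proposed getter.
    public interface TaskContext {
        ErrantRecordReporter failedRecordReporter();
    }

    // Simulates a newer worker that supports the reporter.
    public static class NewWorkerContext implements TaskContext {
        final List<String> reported = new ArrayList<>();

        @Override
        public ErrantRecordReporter failedRecordReporter() {
            return (record, error) -> reported.add(record);
        }
    }

    // Simulates an older worker. On a real old runtime the method would be absent and the
    // JVM would raise NoSuchMethodError at the call site; here it is thrown explicitly.
    public static class OldWorkerContext implements TaskContext {
        @Override
        public ErrantRecordReporter failedRecordReporter() {
            throw new NoSuchMethodError("failedRecordReporter");
        }
    }

    private final TaskContext context;
    private ErrantRecordReporter reporter; // stays null when the worker lacks reporter support

    ReporterGuard(TaskContext context) {
        this.context = context;
    }

    // Resolve the reporter once in start(), never in a field initializer.
    void start() {
        try {
            reporter = context.failedRecordReporter();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            reporter = null; // older worker: keep the pre-KIP behavior
        }
    }

    boolean hasReporter() {
        return reporter != null;
    }

    public static void main(String[] args) {
        ReporterGuard onNew = new ReporterGuard(new NewWorkerContext());
        onNew.start();
        ReporterGuard onOld = new ReporterGuard(new OldWorkerContext());
        onOld.start();
        System.out.println(onNew.hasReporter() + " " + onOld.hasReporter()); // prints "true false"
    }
}
```

The lookup sits in `start()` because a field initializer would run during construction, outside any try/catch the task controls.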
Cheers,
Chris

On Sat, May 16, 2020, 21:29 Arjun Satish <arjun.sat...@gmail.com> wrote:

OK folks, this is my POC branch: https://github.com/wicknicks/kafka/tree/kip-610-2.4.1. Connectors built from this branch were copied into a fresh installation of Kafka Connect (v2.5, the latest version) and run. Without proper try/catch blocks, the tasks would fail. But when the appropriate errors were handled, the tasks proceeded without any issues. I've added some comments here https://github.com/apache/kafka/commit/295e6a46925993060a60b79e646e06432480ed0d that document these errors. I tested this with both Java 8 and 11.

To check whether it is safe to add methods and new classes, I dug around the JVM spec <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html>. It makes the following points:

1. During the loading phase, a number of structural details of the class are added to the run-time constant pool (class descriptor, fields, methods). What these methods themselves do is not considered.

2. During the linking phase, resolution of symbolic references is an optional part of linking. But the important bit is this <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4>: "Whichever strategy [lazy or eager] is followed, any error detected during resolution must be thrown at a point in the program that (directly or indirectly) uses a symbolic reference to the class or interface."

So any new classes that the connector relies on, but that are missing from an older runtime, will be in some internal "not found" state, and no errors will be thrown until the resolution stage.

3. Section 5.4.3 <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4.3> says that executing any of the listed instructions (anewarray, checkcast, etc.) requires resolution of its symbolic reference. This is good: since instructions are executed in order, the connector developer has a chance to run some code before the program has to resolve an unknown reference.

We are also safe in that the new interface will be used only during or after the call to start(), so there should not be any fields that do something like:

    private final ErrantRecordReporter reporter = sinkTaskContext.failedRecordReporter();

The initialization will only be valid in start(), where we can safely wrap the call in a try/catch block. The errors being thrown are very precise: NoSuchMethodError <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4.3.3> for a method that is not found, and NoClassDefFoundError for any class that cannot be loaded during resolution <https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.3>.

BTW, one more example that comes to mind is the JDBC spec (the java.sql package has been changing with every major version of Java). New classes are added (look at java.sql.DriverAction in Java 8 and java.sql.ShardingKey in Java 9). New methods are being added to existing classes as well (for example, java.sql.DriverManager#registerDriver(java.sql.Driver, java.sql.DriverAction) <https://docs.oracle.com/javase/8/docs/api/java/sql/DriverManager.html#registerDriver-java.sql.Driver-java.sql.DriverAction->). The expectation is that the driver should handle any runtime inconsistencies.

Overall, I think it should be safe to add new interfaces and have them loaded safely, with the correct checks.

BTW, one more opportunity we have is that the connector can check whether these new methods are present or not.
For example:

    Class.forName("org.apache.kafka.connect.sink.SinkTaskContext").getMethod("failedRecordReporter");

Based on this, a connector can choose to return a different (older) task class from SinkConnector#taskClass() if it wants to be hyper-safe.

Apologies for throwing such a wrench in at the last minute! But IMHO it'd be good to take this opportunity if we can.

Best,

On Sat, May 16, 2020 at 6:05 PM Randall Hauch <rha...@gmail.com> wrote:

Thanks for updating the KIP, Aakash. A few comments on the updated content there:

> In order to avoid error records being written out of order (for example, due to retries), the developer can use `max.in.flight.requests.per.connection=1` in their implementation for writing error records.

IMO, the DLQ should always use this setting so that retries cannot reorder records, and the developer can always set this property for the DLQ to something larger than 1 if order is not important and they need the extra performance.

> Along with the original errant sink record, the exception thrown will be sent to the error reporter to provide additional context.

This is also not really clear. What thrown exception is this referring to? Isn't the exception *passed* to the reporter method?

I do think we need to better explain what behavior the sink task should expect. Is there any case when the reporter will be null or be a no-op, and if so, what should the sink task do? Should it simply wrap and throw a ConnectException? And if there is a reporter, won't Connect treat this sink record as "processed" with respect to the existing behavior of passing "processed" offsets back to the sink task's `preCommit(...)` method?
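Arjun's reflective check and Randall's question about a missing reporter fit together in one small helper. This is a sketch under assumptions: `hasMethod` uses only the JDK's `Class.forName` and `Class.getMethod` (which report a missing method with the checked `NoSuchMethodException`, not the `NoSuchMethodError` a direct call raises), while `ReporterFallback`, `StandInConnectException`, and `reportOrFail` are hypothetical names, and the reporter is modeled as a plain `BiConsumer`. Wrapping and rethrowing when no reporter exists is one possible answer to Randall's question, not something the thread settled.

```java
import java.util.function.BiConsumer;

class ReporterFallback {

    // Hypothetical stand-in for Connect's ConnectException.
    static class StandInConnectException extends RuntimeException {
        StandInConnectException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // True if the named class can be loaded and has a public no-arg method with this name.
    static boolean hasMethod(String className, String methodName) {
        try {
            Class.forName(className).getMethod(methodName);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    // Hands the record to the reporter if there is one; otherwise wraps and rethrows the
    // original error so the task fails as it would have without a reporter.
    static <R> void reportOrFail(BiConsumer<R, Throwable> reporter, R record, Throwable error) {
        if (reporter == null) {
            throw new StandInConnectException("No errant record reporter for record " + record, error);
        }
        reporter.accept(record, error);
    }
}
```

On a real worker, a connector could call `hasMethod("org.apache.kafka.connect.sink.SinkTaskContext", "failedRecordReporter")`, using the getter name proposed in this thread, to decide which task class `SinkConnector#taskClass()` returns.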
Thanks,

Randall

On Sat, May 16, 2020 at 6:38 PM Arjun Satish <arjun.sat...@gmail.com> wrote:

Yeah, I had tried this locally on Java 8 and 11, and it seemed to work. Let me clean up and publish my code in a branch somewhere so we can take a look at it.

Thanks,

On Sat, May 16, 2020 at 3:39 PM Randall Hauch <rha...@gmail.com> wrote:

Have you tried this? IIUC the problem is with the new type: any class that uses `ErrantRecordReporter` with an import would fail to be loaded by the classloader if the type does not exist (i.e., pre-2.9 Connect runtimes). Catching that ClassNotFoundException and dynamically importing the type is actually much harder.

On Sat, May 16, 2020 at 5:07 PM Aakash Shah <as...@confluent.io> wrote:

Hi Arjun,

Thanks for this suggestion. I actually like this a lot, because a defined interface looks more appealing and is clearer in its intention. Since we are still relying on catching NoSuchMethodError for backwards compatibility, this works for me. I can't see any drawbacks besides having to call the getter method for the processing of every errant record.

I would like to hear others' thoughts on whether this drawback outweighs the added benefit of clarity.

Thanks,
Aakash

On Sat, May 16, 2020 at 2:54 PM Arjun Satish <arjun.sat...@gmail.com> wrote:

Thanks Konstantine, happy to write something up in a KIP. But I think it would be redundant if we add this to this KIP. What do you think?

Also, Randall, yes that API would work.
But if we expect developers to catch NoSuchMethodError, then should we also go ahead and add an interface with a report method (similar to ErrorReporter <https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java>, but maybe with different arguments on the report method)? Developers would then also have to catch NoClassDefFoundError.

That would modify your change in SinkTaskContext to:

    public interface SinkTaskContext {
        /**
         * Get the reporter to which the sink task can report problematic or
         * failed {@link SinkRecord} passed to the {@link SinkTask#put(Collection)} method.
         *
         * @return an errant record reporter
         */
        ErrantRecordReporter failedRecordReporter();
    }

where ErrantRecordReporter is:

    public interface ErrantRecordReporter {

        /**
         * Serialize and produce records to the error topic
         *
         * @param record the errant record
         * @param error the error encountered while processing the record
         */
        void report(SinkRecord record, Throwable error);

    }

Usage in put would be the same, though, if the class is not explicitly named:

    try {
        context.failedRecordReporter().report(record, error);
    } catch (NoSuchMethodError | NoClassDefFoundError e) {
        log.info("Boooooooooo!");
    }

Thoughts?

On Sat, May 16, 2020 at 12:46 PM Konstantine Karantasis <konstant...@confluent.io> wrote:

Thanks for following up Randall.
I agree with your latest suggestion. It was good that we explored several options, but accessing the context to obtain the reporter in Kafka Connect versions that support this feature makes the most sense. The burden for connector developers who want to use this reporter _and_ make connectors compatible with old and new workers is minimal.

We'll have to live with additions like this, and their appearance in both KIP-131 and here in KIP-610 indeed creates a reasonable precedent.

Konstantine

On Sat, May 16, 2020 at 12:34 PM Randall Hauch <rha...@gmail.com> wrote:

Thanks again for the active discussion!

Regarding the future-vs-callback discussion: I did like where Chris was going with the Callback, but he raises a good point that it's unclear what to use for the reporter type, since we'd need three parameters. Introducing a new interface makes it much harder for a sink task to be backward compatible, so sticking with BiFunction is a good compromise. Plus, another significant disadvantage of a callback approach is that a sink task's callback is called from the producer thread, and this risks a poorly written sink task callback killing the reporter's producer without necessarily failing the task.
Using a future avoids this risk altogether and still gives the sink task the ability to do synchronous reporting using Future, which is a standard and conventional design pattern. So we do seem to have converged on using `BiFunction<SinkRecord, Throwable, Future<Void>>` for the reporter type.

Now, we still seem not to have converged on how to pass the reporter to the sink task. I agree with Konstantine that the deprecation affects only newer versions of Connect, and that a sink task should deal with both put methods only when it wants to support older runtimes. I also think that this is a viable approach, but I do concede that this evolution of the sink task API is more complicated than it should be.

In the interest of quickly coming to consensus on how we pass the reporter to the sink task, I'd like to go back to Andrew's original suggestion, which I think we disregarded too quickly: add a getter on the SinkTaskContext interface. We already have precedent for adding methods to one of the context classes with the newly adopted KIP-131, which adds a getter for the OffsetStorageReader on the (new) SourceConnectorContext. That KIP accepts the fact that a source connector wanting to use this feature while also keeping the ability to be installed into older Connect runtimes must guard its use of the context's getter method.
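The Future-based reporter type the thread converges on, `BiFunction<SinkRecord, Throwable, Future<Void>>`, lets the task decide whether to wait. The sketch below is a simulation, not Connect's implementation: `String` stands in for `SinkRecord`, and `InMemoryDlq` and `ReportingTask` are hypothetical names, with `InMemoryDlq` standing in for the DLQ producer by completing each future on a single background thread. Calling `get()` on the returned future is the opt-in synchronous path; ignoring it is fire-and-forget.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

// Hypothetical stand-in for Connect's DLQ producer: one background thread plays the producer.
class InMemoryDlq {
    final List<String> written = new CopyOnWriteArrayList<>();
    private final ExecutorService producerThread = Executors.newSingleThreadExecutor();

    // The reporter handed to the task returns at once; the Future completes after the write.
    BiFunction<String, Throwable, Future<Void>> reporter() {
        return (record, error) -> CompletableFuture.runAsync(
                () -> written.add(record + " <- " + error.getMessage()), producerThread);
    }

    void close() {
        producerThread.shutdown();
    }
}

class ReportingTask {
    // Reports a bad record; when waitForWrite is true, blocks until the DLQ write completes.
    // The task's own code never runs on the producer thread, unlike with a callback.
    static void handleBadRecord(BiFunction<String, Throwable, Future<Void>> reporter,
                                String record, Throwable error, boolean waitForWrite)
            throws InterruptedException, ExecutionException {
        Future<Void> result = reporter.apply(record, error);
        if (waitForWrite) {
            result.get(); // throws ExecutionException if the write failed
        }
    }
}
```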
I think we can use the same pattern for this KIP, and add a getter to the existing SinkTaskContext that is defined something like:

    public interface SinkTaskContext {
        ...
        /**
         * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord}
         * passed to the {@link SinkTask#put(Collection)} method. When reporting a failed record,
         * the sink task will receive a {@link Future} that the task can optionally use to wait until
         * the failed record and exception have been written to Kafka via Connect's DLQ. Note that
         * the result of this method may be null if this connector has not been configured with a DLQ.
         *
         * <p>This method was added in Apache Kafka 2.9. Sink tasks that use this method but want to
         * maintain backward compatibility so they can also be deployed to older Connect runtimes
         * should guard the call to this method with a try-catch block, since calling this method will result in a
         * {@link NoSuchMethodError} when the sink connector is deployed to Connect runtimes
         * older than Kafka 2.9. For example:
         * <pre>
         *     BiFunction<SinkRecord, Throwable, Future<Void>> reporter;
         *     try {
         *         reporter = context.failedRecordReporter();
         *     } catch (NoSuchMethodError e) {
         *         reporter = null;
         *     }
         * </pre>
         *
         * @return the reporter function; null if no error reporter has been configured for the connector
         * @since 2.9
         */
        BiFunction<SinkRecord, Throwable, Future<Void>> failedRecordReporter();
    }

The main advantage is that the KIP no longer has to make *any other* changes to the Sink Connector or Task API. The above is really the only change, and it's merely an addition to the API: no deprecation and no overloaded methods. The KIP does need to explain how the reporter is configured and used (which it already does), but IMO the KIP doesn't need to describe when this reporter can or should be used. After all, this method is on the existing SinkTaskContext, so it is really no different from any other existing method. I think my JavaDoc (which is just a suggestion for a starting point that Aakash can improve as needed) shows how easy it is for a sink to maintain backward compatibility. (The use of `BiFunction` helps tremendously.)
Another not insignificant advantage is that a sink task can use this reporter reference throughout the task's lifetime (after it's started and before it is stopped), making it less invasive for existing sink task implementations that want to use it.

I hope we can all get behind this compromise, which IMO is actually super clean and makes a lot of sense. Thanks, Andrew, for originally suggesting it. I know we're all trying to improve the Connect API in a way that makes sense, and deliberate and constructive discussion is a healthy thing. Thanks again to everyone for participating!

BTW, we've agreed upon a number of other changes, but I don't see any of those changes in the KIP. Aakash, can you please update the KIP quickly so we can make sure the other parts of the KIP are acceptable?

Best regards,

Randall

On Sat, May 16, 2020 at 12:24 PM Konstantine Karantasis <konstant...@confluent.io> wrote:

Thanks for the quick response Aakash.

With respect to deprecation, this refers to deprecating this method in newer versions of Kafka Connect (and eventually removing it).
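Under the overloaded-`put` proposal Konstantine is responding to, a task that must run on both old and new workers keeps the deprecated `put` working and routes both variants through one shared method, as this message goes on to describe. A minimal sketch: `BaseSinkTask` is a hypothetical shape for the base class under that proposal (not the released `SinkTask` API), `DualPutTask` and `putRecords` are illustrative names, `String` stands in for `SinkRecord`, and the reporter is a plain `BiConsumer`. Failing when no reporter is available is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical base class shaped like the overloaded-put proposal (not the released SinkTask API).
abstract class BaseSinkTask {
    @Deprecated
    public abstract void put(Collection<String> records);

    // Newer workers call this overload; by default it ignores the reporter.
    public void put(Collection<String> records, BiConsumer<String, Throwable> reporter) {
        put(records);
    }
}

// A task meant to run on both old and new workers: both entry points are thin shims.
class DualPutTask extends BaseSinkTask {
    final List<String> written = new ArrayList<>();

    @Override
    @Deprecated
    public void put(Collection<String> records) {
        putRecords(records, null); // old workers: no reporter available
    }

    @Override
    public void put(Collection<String> records, BiConsumer<String, Throwable> reporter) {
        putRecords(records, reporter);
    }

    // Shared logic; in this sketch, records starting with "bad" cannot be written.
    private void putRecords(Collection<String> records, BiConsumer<String, Throwable> reporter) {
        for (String record : records) {
            if (!record.startsWith("bad")) {
                written.add(record);
                continue;
            }
            IllegalArgumentException error = new IllegalArgumentException("cannot write " + record);
            if (reporter == null) {
                throw new IllegalStateException("no error reporter; failing the task", error);
            }
            reporter.accept(record, error);
        }
    }
}
```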
As a connector developer, if you want your connector to run across a wide spectrum of Connect versions, you'll have to take this into consideration and retain both methods in a functional state. The good news is that both methods can share a lot of code, so in reality both the old and the new put will be thin shims over a `putRecord` method (or `process` method, as you call it in the KIP).

Given the above, there's no requirement for the framework to conditionally call one method or the other based on configuration. Once you implement the new `put` with something other than its default implementation, as a connector developer, you'll know to adapt to the above.

I definitely suggest extending our docs in a meaningful way in order to make the upgrade path easy to follow. Maybe you'd like to add a note to the compatibility section in this KIP as well.

Regards,
Konstantine

On Sat, May 16, 2020 at 10:13 AM Aakash Shah <as...@confluent.io> wrote:

+1

On Sat, May 16, 2020 at 9:55 AM Konstantine Karantasis <konstant...@confluent.io> wrote:

Hi Arjun,

I think I agree with you that this subject is interesting.
> > Yet, > > > I > > > > > feel > > > > > > > it > > > > > > > > > > > belongs to a separate future KIP. Reading the proposal > in > > > the > > > > > KIP > > > > > > > > > format > > > > > > > > > > > will help, at least myself, to understand it better. > > > > > > > > > > > > > > > > > > > > > > Having said that, for the purpose of simplifying error > > > > handling > > > > > > for > > > > > > > > > sink > > > > > > > > > > > tasks, the discussion on KIP-610 has made some good > > > progress > > > > on > > > > > > the > > > > > > > > > > mailing > > > > > > > > > > > list. If the few open items are reflected on the > > proposal, > > > > > maybe > > > > > > > it'd > > > > > > > > > be > > > > > > > > > > > even worthwhile to consider it for inclusion in the > > > upcoming > > > > > > > release > > > > > > > > > with > > > > > > > > > > > its current scope. > > > > > > > > > > > > > > > > > > > > > > Konstantine > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, May 15, 2020 at 7:44 PM Arjun Satish < > > > > > > > arjun.sat...@gmail.com > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > I'm kinda hoping that we get to an approach on how to > > > > extend > > > > > > the > > > > > > > > > > Connect > > > > > > > > > > > > framework. Adding parameters in the put method is > nice, > > > and > > > > > > maybe > > > > > > > > > works > > > > > > > > > > > for > > > > > > > > > > > > now, but I'm not sure how scalable it is. It'd great > to > > > be > > > > > able > > > > > > > to > > > > > > > > > add > > > > > > > > > > > more > > > > > > > > > > > > functionality in the future. 
A couple of examples:

- make the metrics registry available to a task, so it can report task-level metrics, or
- be able to pass a RestExtension handle to the task, so the task can provide a REST endpoint that users can hit to get some task-level information (its status or health, for example).

In such scenarios, adding new parameters to existing methods may not be immediately acceptable.

Since we are very close to a deadline, I wanted to check whether one more possibility is acceptable :-)

What if we created a library that connector developers could independently integrate into their connectors? The library would be packaged and shipped with their connector like any other library on Maven (and other similar repositories). The new module would be in the AK project, but its jars would *not* be added to the classpath of the Connect worker.
The library would provide a public interface for an error reporter, offering both synchronous and asynchronous functionality (as was brought up above).

This would be an independent library, so it could easily be bundled and loaded with the connectors. The Connect framework would be decoupled from this utility.

I understand that a similar option is in the rejected alternatives, mostly because of configuration overhead, but the configuration required here can come directly from the worker properties (and just be copy-pasted from there, maybe with a prefix). I also wonder whether (maybe as part of a future KIP) we can evaluate a strategy where certain worker configs can be passed to a connector (for example, the producer/consumer/admin ones), so end users do not have to.

Overall, we would get clean APIs and contracts, and developers would get the freedom to use these libraries and functionalities however they want.
The only drawback is how this is configured (end users will have to add more lines to the JSON/properties files). But since all configs can simply come from the worker, I believe this is a relatively minor issue. We should be able to work out compatibility issues in the implementations so that the library can safely run (and degrade functionality if needed) with old workers.

On Fri, May 15, 2020 at 7:04 PM Aakash Shah <as...@confluent.io> wrote:

Just wanted to clarify that I am on board with adding the overloaded put(...) method.

Thanks,
Aakash

On Fri, May 15, 2020 at 7:00 PM Aakash Shah <as...@confluent.io> wrote:

Hi Randall and Konstantine,

As Chris and Arjun mentioned, I think the main concern is the potential gap in which developers don't implement the deprecated method due to a misunderstanding of use cases.
Using the setter method approach ensures that the developer won't break backwards compatibility by mistake when using the new method. That being said, I think the added clarity of the contract for when the error reporter will be invoked, and the overall aesthetics, while still maintaining backwards compatibility, outweigh the risk of a developer mistakenly not implementing the original put(...) method.

With respect to synchrony, I agree with Konstantine's point that we should make synchronous reporting an opt-in feature. At the same time, I believe it is important to relieve the developer of as much of the implementation burden as possible in this case, and thus I think using a Callback rather than a Future would be easier on the developer, while providing asynchronous functionality with the ability to opt in to synchronous functionality.
I also believe making synchronous reporting opt-in, rather than the other way around, simplifies implementation for the developer (blocking vs. creating a new thread).

Please let me know your thoughts. I would like to come to a consensus soon due to the AK 2.6 deadlines; I will then shortly update the KIP and start a vote.

Thanks,
Aakash

On Fri, May 15, 2020 at 2:24 PM Randall Hauch <rha...@gmail.com> wrote:

On Fri, May 15, 2020 at 3:13 PM Arjun Satish <arjun.sat...@gmail.com> wrote:

> Couple of thoughts:
>
> 1. If we add new parameters to put(..), and new connectors implement only this method, it makes them backward incompatible with older workers. I think newer connectors may choose to implement only the latest method, and we are passing the compatibility problems back to the connector developers.
New connectors would have to implement both if they want to run in older runtimes.

> 2. If we deprecate the older put() method and eventually remove it, then old connectors are forward incompatible. If we are not going to remove it, then maybe we should not deprecate it?

I don't think we'll ever remove deprecated methods -- there's no reason to cut off older connectors.

> 3. If a record is found to be erroneous outside put() (say, in flush or preCommit), how will it be reported?

This is a concern no matter how the reporter is passed to the task.
>> Actually, I think it's clearer that the reporter passed through `put(...)` should be used to record errors on the SinkRecords passed in the same method call.

>>> I do think the concern over aesthetics is an important one, but the trade-off here is to exclude many connectors that are out there from running on some worker versions. There may be production deployments that need one old and one new connector that now cannot work on any version of a single worker. Building connectors is complex, and it's kinda unfair to expect folks to make changes for aesthetic reasons alone. This is probably the reason why popular framework APIs very rarely (and probably never) change.

>> I don't see how passing the reporter through an overloaded `put(...)` is less backward compatible.
>> Because the runtime provides the SinkTask base class, the runtime has control over what the methods do by default.

>>> Overall, yes, the "public void errantRecordReporter(BiConsumer<SinkRecord, Throwable> reporter) {}" proposal in the original KIP is somewhat of a mouthful, but are there any simpler alternatives that do not exclude existing connectors or add operational burdens, and yet provide a clean contract?

>> IMO, overloading `put(...)` is cleaner and easier to understand -- plus the other benefits in my earlier email.

>>> Best,

>>> PS: Apologies if the language is incorrect or some points are unclear.
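For comparison, the setter-style shape from the original KIP that Arjun quotes might look roughly like the sketch below. Only the `errantRecordReporter(BiConsumer<SinkRecord, Throwable>)` signature comes from the thread. The class names and the `handleFailure(...)` helper are hypothetical, and `SinkRecord` is a stand-in.

The sketch also illustrates Randall's point that the runtime controls the base-class defaults. The no-op default leaves existing tasks untouched, but a task must cope with an older worker that never calls the setter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

final class SetterStyleDemo {
    public static void main(String[] args) {
        List<String> reported = new ArrayList<>();

        // A newer worker would (hypothetically) hand the reporter to the task before any put(...).
        ReportingTask onNewWorker = new ReportingTask();
        onNewWorker.errantRecordReporter((record, error) -> reported.add(record.value));
        onNewWorker.handleFailure(new SinkRecord("x"), new IllegalStateException("boom"));
        System.out.println("reported: " + reported); // reported: [x]

        // An older worker never calls the setter, so the task falls back to failing.
        ReportingTask onOldWorker = new ReportingTask();
        try {
            onOldWorker.handleFailure(new SinkRecord("y"), new IllegalStateException("boom"));
        } catch (IllegalStateException e) {
            System.out.println("task failed: " + e.getMessage()); // task failed: boom
        }
    }
}

// Hypothetical stand-in for org.apache.kafka.connect.sink.SinkRecord.
final class SinkRecord {
    final String value;
    SinkRecord(String value) { this.value = value; }
}

// Hypothetical base class: the runtime ships a no-op default, so existing tasks are unaffected.
abstract class SetterSinkTaskSketch {
    public void errantRecordReporter(BiConsumer<SinkRecord, Throwable> reporter) {}
}

final class ReportingTask extends SetterSinkTaskSketch {
    // Remains null when running on a worker that never calls the setter.
    private BiConsumer<SinkRecord, Throwable> reporter;

    @Override
    public void errantRecordReporter(BiConsumer<SinkRecord, Throwable> reporter) {
        this.reporter = reporter;
    }

    // Would be called from put(...) when a record cannot be written.
    void handleFailure(SinkRecord record, RuntimeException error) {
        if (reporter == null) {
            throw error;
        }
        reporter.accept(record, error);
    }
}
```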
>>> On Fri, May 15, 2020 at 12:02 PM Randall Hauch <rha...@gmail.com> wrote:

>>>> On Fri, May 15, 2020 at 1:45 PM Konstantine Karantasis <konstant...@confluent.io> wrote:

>>>>> Thanks for the quick response, Aakash.

>>>>> To your last point, modern APIs like this tend to be asynchronous (see admin, producer in Kafka) and such a definition results in more expressive and well-defined APIs.

>>>> +1

>>>>> What you describe is easily an opt-in feature for the connector developer. At the same time, the latest description above gives us better chances for this API to remain like this for longer, because it covers both the sync and async per `put` use cases.
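Konstantine's point that a `Future`-returning reporter covers both the sync and the async case per `put` can be sketched as follows. Assumptions:

- `SinkRecord` is a stand-in.
- The reporter has the `BiFunction<SinkRecord, Throwable, Future<Void>>` shape discussed in this thread.
- `putBatch(...)` is a hypothetical piece of task logic.

The task issues all reports for a batch without waiting. If it opts in, it then waits on every future before returning.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

final class PerPutSyncDemo {
    public static void main(String[] args) {
        List<String> reported = Collections.synchronizedList(new ArrayList<>());
        // Hypothetical reporter that completes each report on another thread.
        BiFunction<SinkRecord, Throwable, Future<Void>> reporter =
                (record, error) -> CompletableFuture.runAsync(() -> reported.add(record.value));

        int failures = putBatch(List.of(new SinkRecord("a"), new SinkRecord(""), new SinkRecord("")),
                                reporter, true);
        // Because waitForReports was true, both reports have finished by now.
        System.out.println(failures + " " + reported.size()); // 2 2
    }

    // Hypothetical task logic: records with an empty value are treated as failures.
    static int putBatch(Collection<SinkRecord> records,
                        BiFunction<SinkRecord, Throwable, Future<Void>> reporter,
                        boolean waitForReports) {
        List<Future<Void>> pending = new ArrayList<>();
        for (SinkRecord record : records) {
            if (record.value.isEmpty()) {
                // Asynchronous: do not wait for this report before moving on.
                pending.add(reporter.apply(record, new IllegalArgumentException("empty value")));
            }
        }
        if (waitForReports) {
            // Opt-in synchronous per put: block until every report in this batch completes.
            for (Future<Void> future : pending) {
                try {
                    future.get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while reporting", e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException("reporting failed", e.getCause());
                }
            }
        }
        return pending.size();
    }
}

// Hypothetical stand-in for org.apache.kafka.connect.sink.SinkRecord.
final class SinkRecord {
    final String value;
    SinkRecord(String value) { this.value = value; }
}
```

Passing `false` for `waitForReports` gives fully asynchronous behavior. In that case the ordering concern Randall raises later in the thread applies.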
>>>> +1

>>>>> Given how simple the sync implementation is, just by complying with the return type of the method, I still think the BiFunction definition that returns a Future makes sense.

>>>>> Konstantine

>>>>> On Fri, May 15, 2020 at 11:27 AM Aakash Shah <as...@confluent.io> wrote:

>>>>>> Thanks for the additional feedback.

>>>>>> I see the benefits of adding an overloaded put(...) over alternatives and I am on board going forward with this approach. It will definitely set forth a contract of where the reporter will be used with better aesthetics.
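Konstantine's remark that a synchronous implementation only needs to comply with the return type can be illustrated from the runtime side. This is a minimal sketch with hypothetical factories and a stand-in `SinkRecord`. A `List` stands in for wherever failed records would actually be written.

The synchronous reporter finishes its work and returns an already-completed future. The asynchronous one runs the work on an executor. A task sees the same `BiFunction` type in both cases.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

final class FutureReporterDemo {
    public static void main(String[] args) {
        List<String> sink = Collections.synchronizedList(new ArrayList<>());

        Future<Void> done = syncReporter(sink).apply(new SinkRecord("s"), new RuntimeException("bad s"));
        System.out.println(done.isDone()); // true

        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            asyncReporter(sink, pool).apply(new SinkRecord("a"), new RuntimeException("bad a")).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        System.out.println(sink); // [s, a]
    }

    // Synchronous: the write happens before apply(...) returns, so the future is already complete.
    static BiFunction<SinkRecord, Throwable, Future<Void>> syncReporter(List<String> sink) {
        return (record, error) -> {
            sink.add(record.value);
            return CompletableFuture.completedFuture(null);
        };
    }

    // Asynchronous: the write runs on the given executor; the caller may wait on the future.
    static BiFunction<SinkRecord, Throwable, Future<Void>> asyncReporter(List<String> sink, ExecutorService pool) {
        return (record, error) -> CompletableFuture.runAsync(() -> sink.add(record.value), pool);
    }
}

// Hypothetical stand-in for org.apache.kafka.connect.sink.SinkRecord.
final class SinkRecord {
    final String value;
    SinkRecord(String value) { this.value = value; }
}
```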
>>>>>> The original idea of going with a synchronous approach for the error reporter was to ease the connector developer's job interacting with and handling the error reporter. The tradeoff for having a synchronous-only reporter would be lower throughput on the reporter; this was thought to be fine since arguably most circumstances would not include consistently large amounts of records being sent to the error reporter. Even if this were the case, an argument can be made that the lower throughput would be of assistance in this case, as it would allow more time for the user to realize the connector is having records sent to the error reporter before many are sent.
>>>>>> However, if we are strongly in favor of having the option of asynchronous functionality available for the developer, then I am fine with that as well.

>>>>>> Lastly, I am on board with changing the name to failedRecordReporter.

>>>>>> Please let me know your thoughts.

>>>>>> Thanks,
>>>>>> Aakash

>>>>>> On Fri, May 15, 2020 at 9:10 AM Randall Hauch <rha...@gmail.com> wrote:

>>>>>>> Konstantine said:

>>>>>>>> I notice Randall also used BiFunction in his example, I wonder if it's for similar reasons.

>>>>>>> Nope. Just a typo on my part.

>>>>>>> There appear to be three outstanding questions.
>>>>>>> First, Konstantine suggested calling this "failedRecordReporter". I think this is minor, but using this new name may be a bit more precise and I'd be fine with this.

>>>>>>> Second, should the reporter method be synchronous? I think the two options are:

>>>>>>> 2a. Use `BiConsumer<SinkRecord, Throwable>` that returns nothing and blocks (at this time).
>>>>>>> 2b. Use `BiFunction<SinkRecord, Throwable, Future<Void>>` that returns a future that the user can optionally use to be synchronous.
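One way to see why option 2b leaves more room than 2a: a blocking, 2a-style reporter can always be derived from a 2b-style one by waiting on the returned future. The reverse would need the caller to supply its own threads. The sketch below assumes a stand-in `SinkRecord` and a hypothetical `blocking(...)` adapter; neither is part of the Connect API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

final class AdapterDemo {
    public static void main(String[] args) {
        List<String> written = Collections.synchronizedList(new ArrayList<>());
        // Option 2b shape: a hypothetical reporter that completes the write on another thread.
        BiFunction<SinkRecord, Throwable, Future<Void>> reporter2b =
                (record, error) -> CompletableFuture.runAsync(() -> written.add(record.value));

        // Option 2a shape derived from it: returns nothing and blocks until the report completes.
        BiConsumer<SinkRecord, Throwable> reporter2a = blocking(reporter2b);
        reporter2a.accept(new SinkRecord("r1"), new RuntimeException("bad r1"));
        // accept(...) returned only after the asynchronous write finished.
        System.out.println(written); // [r1]
    }

    // Hypothetical adapter: wait on the future so the call behaves like option 2a.
    static BiConsumer<SinkRecord, Throwable> blocking(BiFunction<SinkRecord, Throwable, Future<Void>> reporter) {
        return (record, error) -> {
            try {
                reporter.apply(record, error).get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while reporting", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("reporting failed", e.getCause());
            }
        };
    }
}

// Hypothetical stand-in for org.apache.kafka.connect.sink.SinkRecord.
final class SinkRecord {
    final String value;
    SinkRecord(String value) { this.value = value; }
}
```

In this sketch, a failure reported through the future surfaces as an exception from `accept(...)`. That is roughly what a synchronous 2a reporter would do.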
>>>>>>> I do agree with Konstantine that option 2b gives us more room for future semantic changes, and since the producer write is already asynchronous this should be straightforward to implement. I think the concern here is that if the sink task does not *use* the future to make this synchronous, it is very possible that the error records could be written out of order (due to retries). But this won't be an issue if the implementation uses `max.in.flight.requests.per.connection=1` for writing the error records.
>>>>>>> It's a little less clear, but honestly IMO passing the reporter in the `put(...)` method helps make this lambda easier to understand, for some strange reason. So unless there are good reasons to avoid this, I'd be in favor of 2b and returning a Future.

>>>>>>> Third, how do we pass the reporter lambda / method reference to the task? My proposal to pass the reporter via an overloaded `put(...)` is still the most attractive to me, for several reasons:

>>>>>>> 3a. There's no need to pass the reporter separately *and* to describe the changes in method call ordering.
>>>>>>> 3b. As mentioned above, for some reason passing it via `put(...)` makes the intent clearer that it be used when processing the SinkRecord, and that it shouldn't be used in `start(...)`, `preCommit(...)`, `onPartitionsAssigned(...)`, or any of the other task methods. As Andrew pointed out earlier, *describing* this in the KIP and in JavaDoc in a way that is both exact and succinct will be tough.
>>>>>>> 3c. There is already precedent for evolving `SourceTask.commitRecord(...)`, and the pattern is identical.
>>>>>>> 3d. Backward compatibility is easy to understand, and at the same time it's pretty easy to describe what implementations that want to take advantage of this feature should do.
>>>>>>> 3e. Minimal changes to the interface: we're just *adding* one default method that calls the existing method and deprecating the existing `put(...)`.
>>>>>>> 3f. Deprecating the existing `put(...)` makes it clearer in a programmatic sense that new sink implementations should use the reporter, and that we recommend old sinks evolve to use it.

>>>>>>> Some of these benefits apply to some of the other suggestions, but I think none of the other suggestions have all of these benefits. For example, overloading `initialize(...)` is more difficult since most sink connectors don't override it and therefore would be less subject to deprecation warnings. Overloading `start(...)` is less attractive.
>>>>>>> Adding a method IMO shares the fewest of these benefits.

>>>>>>> The one disadvantage of this approach is that sink task implementations can't rely upon the reporter upon startup. IMO that's an acceptable tradeoff to get the cleaner and more explicit API, especially if the API contract is that Connect will pass the same reporter instance to each call to `put(...)` on a single task instance.

>>>>>>> Best regards,

>>>>>>> Randall

>>>>>>> On Fri, May 15, 2020 at 6:59 AM Andrew Schofield <andrew_schofi...@live.com> wrote:

>>>>>>>> Hi,
>>>>>>>> Randall's suggestion is really good.
>>>>>>>> I think it gives the flexibility required and also keeps the interface the right way round.

>>>>>>>> Thanks,
>>>>>>>> Andrew Schofield

>>>>>>>> On 15/05/2020, 02:07, "Aakash Shah" <as...@confluent.io> wrote:

>>>>>>>>> Hi Randall,

>>>>>>>>> Thanks for the feedback.

>>>>>>>>> 1. This is a great suggestion, but I find that adding