Hi Chris,

Thanks for pointing that out; I hadn't realized that the SubmittedRecords class has almost exactly the semantics needed for handling offset commits in the per-sink-record ack API case. However, I agree that it isn't worth the tradeoff, and we've already discussed the backward compatibility concerns that deprecating / removing the preCommit hook in favor of a new ack-based API would impose on connector developers.
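For anyone following along, the kind of bookkeeping being discussed could look roughly like the sketch below. To be clear, this is not the actual SubmittedRecords class: AckTracker, Submitted, and the plain String partition key are hypothetical names made up for illustration. The only point it tries to show is that the committable offset can be derived from the order in which records were consumed, so gaps from log compaction or transaction control records shouldn't matter.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch only; not the real SubmittedRecords implementation.
class AckTracker {

    // Handle handed out per record; the sink would call ack() once the write succeeds.
    static final class Submitted {
        private final long offset;
        private volatile boolean acked;

        Submitted(long offset) {
            this.offset = offset;
        }

        void ack() {
            acked = true;
        }
    }

    // One FIFO queue per topic partition, in consumption order.
    private final Map<String, Deque<Submitted>> pending = new HashMap<>();

    Submitted submit(String topicPartition, long offset) {
        Submitted record = new Submitted(offset);
        pending.computeIfAbsent(topicPartition, k -> new ArrayDeque<>()).addLast(record);
        return record;
    }

    // Drains the acked prefix of the queue and returns (last drained offset + 1),
    // or empty if the oldest pending record has not been acked yet.
    // Offsets are never compared with each other, so gaps are harmless.
    OptionalLong committableOffset(String topicPartition) {
        Deque<Submitted> queue = pending.get(topicPartition);
        OptionalLong committable = OptionalLong.empty();
        if (queue == null) {
            return committable;
        }
        while (!queue.isEmpty() && queue.peekFirst().acked) {
            committable = OptionalLong.of(queue.pollFirst().offset + 1);
        }
        return committable;
    }
}
```

With offsets 5, 9 and 10 submitted for one partition, acking only 9 yields nothing committable, because 5 is still outstanding; acking 5 afterwards makes 10 committable.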

Thanks,
Yash

On Thu, Jun 29, 2023 at 7:31 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

> Hi Yash,
>
> Thanks for your continued work on this tricky feature. I have no further comments or suggestions on the KIP and am ready to vote in favor of it.
>
> That said, I did want to quickly respond to this comment:
>
> > On a side note, this also means that the per sink record ack API that was proposed earlier wouldn't really work for this case since Kafka consumers themselves don't support per message acknowledgement semantics (and any sort of manual book-keeping based on offset linearity in a topic partition would be affected by things like log compaction, control records for transactional use cases etc.) right?
>
> I believe we could still use the SubmittedRecords class [1] (with some small tweaks) to track ack'd messages and the latest-committable offsets per topic partition, without relying on assumptions about offsets for consecutive records consumed from Kafka always differing by one. But at this point I think that, although this approach does come with the advantage of also enabling fine-grained metrics on record delivery to the sink system, it's not worth the tradeoff in intuition since it's less clear why users should prefer that API instead of using SinkTask::preCommit.
>
> [1] - https://github.com/apache/kafka/blob/12be344fdd3b20f338ccab87933b89049ce202a4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
>
> Cheers,
>
> Chris
>
> On Wed, Jun 21, 2023 at 9:46 AM Yash Mayya <yash.ma...@gmail.com> wrote:
>
> > Hi Chris,
> >
> > Firstly, thanks for sharing your detailed thoughts on this thorny issue! Point taken on Kafka Connect being a brownfield project and I guess we might just need to trade off elegant / "clean" interfaces for fixing this gap in functionality.
> > Also, thanks for calling out all the existing cross-plugin interactions and also the fact that connectors are not and should not be developed in silos ignoring the rest of the ecosystem. That said, here are my thoughts:
> >
> > > we could replace these methods with headers that the Connect runtime automatically injects into records directly before dispatching them to SinkTask::put.
> >
> > Hm, that's an interesting idea to get around the need for connectors to handle potential 'NoSuchMethodError's in calls to SinkRecord::originalTopic/originalKafkaPartition/originalKafkaOffset. However, I'm inclined to agree that retrieving these values from the record headers seems even less intuitive and I'm okay with adding this to the rejected alternatives list.
> >
> > > we can consider eliminating the overridden SinkTask::open/close methods
> >
> > I tried to further explore the idea of keeping just the existing SinkTask::open / SinkTask::close methods but only calling them with post-transform topic partitions and ended up coming to the same conclusion that you did earlier in this thread :)
> >
> > The overloaded SinkTask::open / SinkTask::close are currently the biggest sticking points with the latest iteration of this KIP and I'd prefer this elimination for now. The primary reasoning is that the information from open / close on pre-transform topic partitions can be combined with the per record information of both pre-transform and post-transform topic partitions to handle most practical use cases without significantly muddying the sink connector related public interfaces. The argument that this makes it harder for sink connectors to deal with post-transform topic partitions (i.e.
> > in terms of grouping together or batching records for writing to the sink system) can be countered with the fact that it'll be similarly challenging even with the overloaded method approach of calling open / close with both pre-transform and post-transform topic partitions since the batching would be done on post-transform topic partitions whereas offset tracking and reporting for commits would be done on pre-transform topic partitions (and the two won't necessarily serially advance in lockstep). On a side note, this also means that the per sink record ack API that was proposed earlier wouldn't really work for this case since Kafka consumers themselves don't support per message acknowledgement semantics (and any sort of manual book-keeping based on offset linearity in a topic partition would be affected by things like log compaction, control records for transactional use cases etc.) right? Overall, I think that the only benefit of the overloaded open / close methods approach is that the framework can enable the eventual closure of any post-transform topic partition based writers created by sink tasks using the heuristics we discussed earlier (via a cache with a time-based eviction policy) which doesn't seem worth it at this point.
> >
> > Thanks,
> > Yash
> >
> > On Mon, May 22, 2023 at 7:30 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
> >
> > > Hi Yash,
> > >
> > > I've been following the discussion and have some thoughts. Ultimately I'm still in favor of this KIP and would hate to see it go dormant, though we may end up settling for a less-invasive option.
> > >
> > > On the topic of abstraction and inter-plugin interactions:
> > >
> > > First, there already are instances of cross-plugin interactions.
> > > Logical type handling is probably the biggest example: a source connector embeds metadata in the schema for record keys/values it emits that notifies downstream converters about how to handle them. We provide support for some logical types in Connect out of the box, but there's nothing stopping connector and converter developers from implementing their own logical type support using the exact same mechanism and different logical type names, which is already done by Debezium, to name one example.
> > >
> > > Second, although it's been a goal of Connect to abstract away parts of building a data pipeline so that, e.g., connector developers don't have to be concerned with converters or consumers, in reality, this layer of abstraction has already been eroded. The example that most-readily comes to mind is how source tasks are notified of the offsets of records that they've emitted after they've been published to Kafka via SourceTask::commitRecord [1].
> > >
> > > But, more importantly, it's unlikely that connectors are being developed in complete isolation. Nobody's going to implement the SinkConnector / SinkTask interfaces and then throw that code off to someone else to figure out all the details of deployment, configuration, testing, etc. Developers will probably have to be aware of at least the converter interface, some of the available implementations of it, and some details of Kafka clients (e.g., consumer groups for sink connectors). And this isn't a bad thing--it's unlikely that someone will write a Kafka connector without having or benefitting from some understanding of Kafka and the steps of the data pipeline that it will be a part of.
> > >
> > > Bringing this to the practical topic of discussion--transformations--I think it's actually in everyone's best interests for connector developers to be aware of transformations. This isn't just because of the specific problem that the KIP is trying to address. It's because there's plenty of logic that can be implemented via SMT that a naive connector developer will think that they have to implement on their own, which will ultimately lead to a sub-par experience for people who end up using those connectors due to inconsistent semantics (especially lack of predicates), inconsistent configuration syntax, increased chances for bugs, and FUD ("why wasn't this implemented as an SMT?").
> > >
> > > Finally, although preserving clean, composable interfaces that can be understood in isolation is a great principle to start with, we are now in what Anna McDonald recently referred to as "brownfield" space for Connect. We can't go back in time and redesign the SMT interface/contracts to make things cleaner. And I don't think it's fair to anyone to suddenly drop support for SMTs that mutate t/p/o information for sink records, especially since these can be used gainfully with plenty of existing sink connectors.
> > >
> > > Ultimately I still think the path forward that's best for the users is to make the impossible possible by addressing this long-standing API gap in Connect. Yes, it adds to the cognitive burden for connector developers, but if they can tolerate it, the end result is better for everyone involved, and if they can't, it's likely that the end result will be a preservation of existing behavior, which leaves us no worse than before.
> > >
> > > With all that said, I've thought about how to minimize or at least hide the API changes as much as possible.
> > > I've had two thoughts:
> > >
> > > 1. On the SinkRecord::originalTopic/originalKafkaPartition/originalKafkaOffset front, we could replace these methods with headers that the Connect runtime automatically injects into records directly before dispatching them to SinkTask::put. The names can be the proposed method names (e.g., "originalTopic"). I believe this is inferior to the current proposal and should be a rejected alternative, but it at least seemed worth floating in the name of compromise. I dislike this approach for two reasons: first, it seems even less intuitive, and second, it doesn't come with the benefit of encouraging connector developers to understand the SMT interface and take it into account when designing connectors.
> > >
> > > 2. Although I'd hate to see the same bookkeeping logic implemented in multiple connectors, we can consider eliminating the overridden SinkTask::open/close methods. A note should be added to both methods clarifying that they are only invoked with the original, pre-transform topic partitions, and developers will be on their own if they want to deal with post-transform topic partitions instead. I'm on the fence with this one, but if it's a choice between passing this KIP without modifying SinkTask::open/close, or letting the KIP go dormant, I'd happily choose the former.
> > >
> > > Thanks Yash and Greg for the discussion so far, and apologies for the wall of text. Looking forward to your thoughts.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > [1] - https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#commitRecord(org.apache.kafka.connect.source.SourceRecord,org.apache.kafka.clients.producer.RecordMetadata)
> > >
> > > On Sun, Apr 23, 2023 at 11:20 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > >
> > > > Hi Greg,
> > > >
> > > > Thanks for the response and sorry for the late reply.
> > > >
> > > > > Currently the AK tests have a lot of calls to, for example, new SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset), a constructor without the original T/P/O values. I assumed that for backwards compatibility these constructors would still be usable in new runtimes. I imagine that there are also tests in downstream projects which make use of these constructors, whenever a Transform, Predicate, or Task is tested without a corresponding Converter. My question was about what values are chosen for the original T/P/O methods when these constructors are used after an upgrade to the latest connect-api.
> > > >
> > > > That's a good question - since this should only primarily affect testing I think it should be acceptable to simply use the topic, partition and kafkaOffset values as the originalTopic, originalKafkaPartition and originalKafkaOffset?
> > > >
> > > > > If you inject the original T/P/O only before and after the chain, SMTs after an SMT which changes the original T/P/O will see whatever the earlier SMT emitted. Is this intentional, or should this be avoided?
> > > >
> > > > Hmm, this sounds like a misbehaving / badly implemented SMT since there doesn't seem to be any reasonable situation where an SMT should modify a sink record's original topic / partition / offset data so I'm not in favor of introducing checks and guards in the framework for this.
> > > >
> > > > Another point that I've been pondering about is the one you raised about the composability of Connect's plugin ecosystem and the special case handling we're adding to sink connector plugins to work with certain transformation plugin types. This really doesn't seem like a good precedent to be setting / starting (since there don't seem to be any other such "snowflake" inter-plugin interactions) in my opinion. The alternative of completely managing this in the framework (and only exposing the virtual coordinates to the sink tasks) doesn't seem too appealing either due to the backward compatibility concerns while maintaining existing support and functionality such as the possibility of implementing exactly-once semantics, ability for tasks to rewind consumer offsets arbitrarily (which might require the introduction of some form of persistence for the physical <-> virtual coordinate mapping) etc. Unfortunately, even though this is a long standing problem that all of us want to fix, I'm considering moving this KIP into a dormant / inactive state since there doesn't seem to be a design that satisfies all the general principles that the Kafka Connect framework has striven to uphold.
> > > >
> > > > Thanks,
> > > > Yash
> > > >
> > > > On Tue, Mar 14, 2023 at 3:31 AM Greg Harris <greg.har...@aiven.io.invalid> wrote:
> > > >
> > > > > Yash,
> > > > >
> > > > > > I'm not sure I follow - are you asking about how the tests will be updated post this change or about how upgrades will look like for clusters in production?
> > > > >
> > > > > Currently the AK tests have a lot of calls to, for example, new SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset), a constructor without the original T/P/O values. I assumed that for backwards compatibility these constructors would still be usable in new runtimes.
> > > > > I imagine that there are also tests in downstream projects which make use of these constructors, whenever a Transform, Predicate, or Task is tested without a corresponding Converter. My question was about what values are chosen for the original T/P/O methods when these constructors are used after an upgrade to the latest connect-api.
> > > > >
> > > > > > There shouldn't be any difference in behavior here - the framework will add the original T/P/O metadata to the record after the entire transformation chain has been applied and just before sending the record to the task for processing. The KIP doesn't propose that transformations themselves should also be able to retrieve original T/P/O information for a sink record.
> > > > >
> > > > > The KIP includes this: "Note that while the record's offset can't be modified via the standard SinkRecord::newRecord methods that SMTs are expected to use, SinkRecord has public constructors that would allow SMTs to return records with modified offsets. This is why the proposed changes include a new SinkRecord::originalKafkaOffset method as well."
> > > > > In order to use the new or old SinkRecord constructors outside of the newRecord methods, SMTs will downcast the previous record and may access the original T/P/O methods. They may or may not forward this to the next SMT, and they may or may not use it in their own computation.
> > > > > Since this is acknowledged as a possible implementation, I was just asking about when one SMT changes the original T/P/O, what should later SMTs and predicates see from the original T/P/O methods?
> > > > > If you inject the original T/P/O only before and after the chain, SMTs after an SMT which changes the original T/P/O will see whatever the earlier SMT emitted. Is this intentional, or should this be avoided?
> > > > > If existing SMTs use the SinkRecord constructor, either directly or via subclasses of ConnectRecord, they will drop the original T/P/O and fall back to the logic from question (1).
> > > > >
> > > > > > The rejected alternative basically says that we can't do a deterministic mapping from virtual coordinates to physical coordinates without doing a lot of book-keeping.
> > > > >
> > > > > I suppose there is a possible implementation of metadata book-keeping which provides a reasonable system of virtual coordinates, it just ended up equivalent to hydrating intermediate topics to compute a consistent record ordering.
> > > > > I wasn't convinced by calling it "book-keeping" since I've seen that phrase used to disregard much less complicated state management, and had to see exactly where that solution becomes unreasonable.
> > > > >
> > > > > Thanks,
> > > > > Greg
> > > > >
> > > > > On Sun, Mar 12, 2023 at 6:30 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > >
> > > > > > Hi Greg,
> > > > > >
> > > > > > Thanks for the detailed review!
> > > > > >
> > > > > > > What is the expected state/behavior for SinkRecords which do not have original T/P/O information after the upgrade? Just browsing, it appears that tests make extensive use of the existing public SinkRecord constructors for both Transformations and Connectors.
> > > > > >
> > > > > > I'm not sure I follow - are you asking about how the tests will be updated post this change or about how upgrades will look like for clusters in production? For the latter, we won't have to worry about sink records without original T/P/O information at all once a cluster is fully rolled and we will make it (hopefully) abundantly clear that connectors need to account for missing original T/P/O getter methods if they expect to be deployed on older Connect runtimes.
> > > > > >
> > > > > > > What is the expected behavior for Transformation implementations which do not use the newRecord methods and instead use public SinkRecord constructors? The KIP mentions this as a justification for the originalKafkaOffset method, but if existing implementations are using the existing constructors, those constructors won't forward the original T/P/O information to later transforms or the task.
> > > > > >
> > > > > > There shouldn't be any difference in behavior here - the framework will add the original T/P/O metadata to the record after the entire transformation chain has been applied and just before sending the record to the task for processing. The KIP doesn't propose that transformations themselves should also be able to retrieve original T/P/O information for a sink record.
> > > > > >
> > > > > > > This reasoning and the KIP design seems to imply that the connector is better equipped to solve this problem than the framework, but the stated reasons are not convincing for me.
> > > > > >
> > > > > > This was added to the KIP by the original author, but I don't think the intention was to imply that the connector is better equipped to solve this problem than the framework. The intention is to provide complete information to the connector ("physical" and "virtual coordinates" instead of the currently incomplete "virtual coordinates" as you've termed it) so that connectors can use the virtual coordinates for writing data to the sink system and physical coordinates for offset reporting back to the framework. The rejected alternative basically says that we can't do a deterministic mapping from virtual coordinates to physical coordinates without doing a lot of book-keeping.
> > > > > >
> > > > > > I agree with the rest of your analysis on the tradeoffs between the proposed approach versus the seemingly more attractive approach of handling everything purely in the framework and only exposing "virtual coordinates" to the connectors.
> > > > > > I think the biggest thorn here is maintaining backward compatibility with the considerable ecosystem of existing connectors which is something Connect has always been burdened by.
> > > > > >
> > > > > > Thanks,
> > > > > > Yash
> > > > > >
> > > > > > On Wed, Mar 8, 2023 at 6:54 AM Greg Harris <greg.har...@aiven.io.invalid> wrote:
> > > > > >
> > > > > > > Hi Yash,
> > > > > > >
> > > > > > > I always use this issue as an example of a bug being caused by design rather than by implementation error, and once it's fixed I'll need to find something else to talk about :)
> > > > > > > So glad to see this get fixed!
> > > > > > >
> > > > > > > I'll chime in to support some of the earlier discussions that seem to have been resolved:
> > > > > > >
> > > > > > > 1. With respect to SinkRecord methods vs an overloaded put(): I agree with the current design but I justify it a little bit differently than has already been discussed.
> > > > > > > If we were designing this interface on day 1 without backwards compatibility in mind, which design would make more sense? Or for a different framing: In the future when old runtimes and connectors are retired and the old interfaces are removed, which design is going to look more strange and unmotivated?
> > > > > > > Applied to this design decision, I would say that the original T/P/O are properties of a single SinkRecord and make sense as getters, and it would be strange to store them in an auxiliary map.
> > > > > > >
> > > > > > > 2.
> > > > > > > Following up this change with a compatibility library to make the interface easier to use is the right choice to make here. This change should be focused on correctness in allowing developers to fix the incompatibility and we can be concerned with coming up with a more ergonomic solution in the compatibility library.
> > > > > > > The API should be focused on generality, correctness, and performance because those cannot be worked-around after the fact. Connector implementations and/or libraries can be concerned with trading off some generality and/or performance for ease-of-use.
> > > > > > >
> > > > > > > 3. I think that the difference in behavior of the new open/close methods as compared to the old methods is significant, and requires good documentation to help connector developers avoid lazy and incorrect migrations. I am happy to have that addressed in code review after the KIP is approved.
> > > > > > >
> > > > > > > I had some questions:
> > > > > > >
> > > > > > > 4. What is the expected state/behavior for SinkRecords which do not have original T/P/O information after the upgrade? Just browsing, it appears that tests make extensive use of the existing public SinkRecord constructors for both Transformations and Connectors.
> > > > > > >
> > > > > > > 5. What is the expected behavior for Transformation implementations which do not use the newRecord methods and instead use public SinkRecord constructors?
> > > > > > > The KIP mentions this as a justification for the originalKafkaOffset method, but if existing implementations are using the existing constructors, those constructors won't forward the original T/P/O information to later transforms or the task.
> > > > > > >
> > > > > > > For the last few points, I want to discuss this rejected alternative:
> > > > > > >
> > > > > > > > Address the offsets problem entirely within the framework, doing some kind of mapping from the transformed topic back to the original topic.
> > > > > > > > * This would only work in the cases where there’s no overlap between the transformed topic names, but would break for the rest of the transformations (e.g. static transformation, topic = “a”).
> > > > > > > > * Even if we wanted to limit the support to those cases, it would require considerable bookkeeping to add a validation to verify that the transformation chain adheres to that expectation (and fail fast if it doesn’t).
> > > > > > >
> > > > > > > 6. This reasoning and the KIP design seems to imply that the connector is better equipped to solve this problem than the framework, but the stated reasons are not convincing for me.
> > > > > > > * A static transformation still causes an offset collision in the connector
> > > > > > > * The connector is not permitted to see the transformation chain to do any fail-fast assertions
> > > > > > >
> > > > > > > Suppose we were to think of the records at the end of the transformation chain as being in "virtual partitions" with "virtual offsets".
> > > > > > > For example, with identity-routing SMTs, the virtual coordinates are exactly the same as the underlying physical coordinates. For 1-1 renames, each virtual topic would be the renamed topic corresponding to the underlying topic. For fan-out from one topic to multiple virtual topics, virtual offsets would use the underlying Kafka offsets with gaps for records going to other virtual partitions. Virtual topics with dropped records have similar gaps in the offsets.
> > > > > > > Currently, these virtual coordinates are passed into the connector via SinkTask::put, but SinkTask::open/close/preCommit and SinkTaskContext::assignment/offsets/pause/resume all use physical coordinates.
> > > > > > > This proposal patches put, open, and close to have both physical and virtual coordinates, but leaves the other methods with physical coordinates. After this proposal, connectors would be intentionally made aware of the distinction between physical and virtual coordinates, and manage their own bookkeeping for the two systems.
> > > > > > >
> > > > > > > To avoid that connector logic, we could use virtual coordinates in all connector calls, never revealing that they are different from the physical coordinates.
> > > > > > > There's a whole design shopping list that we'd need:
> > > > > > > * Renumbering mechanism for disambiguating and making virtual offsets monotonic in the case of topic/partition collisions
> > > > > > > * Data structure and strategy for translating virtual offsets back to physical offsets
> > > > > > > * New limits on SinkTaskContext::offsets() calls to prevent rewinding before the latest commit
> > > > > > > * Backwards compatibility and upgrade design
> > > > > > >
> > > > > > > 7. This alternative was very appealing to me, because the strength of a plugin framework is the composability of different components. Among a collection of N connectors and M transforms, it should ideally only take N + M work to understand how the components combine to build the whole. However, once you start adding special cases to some plugins to support interactions with others, the whole system can take N * M work to understand. From a complexity standpoint, it would be very good for the framework to solve this in a way which was connector-agnostic.
> > > > > > > The current design compromises the logical isolation of the plugins slightly, but they can collapse offsets very memory-efficiently, and re-use the existing raw coordinate functions and keep everything else backwards compatible. After deriving all of the above, I think that's a reasonable tradeoff to make.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Greg
> > > > > > >
> > > > > > > On Tue, Feb 21, 2023 at 10:17 AM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > > > > > >
> > > > > > > > Hi Yash,
> > > > > > > >
> > > > > > > > We'll probably want to make a few tweaks to the Javadocs for the new methods (I'm imagining that notes on compatibility with older versions will be required), but I believe what's proposed in the KIP is good enough to approve with the understanding that it may not exactly match what gets implemented/merged.
> > > > > > > >
> > > > > > > > LGTM, thanks again for the KIP!
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > >
> > > > > > > > Chris
> > > > > > > >
> > > > > > > > On Tue, Feb 21, 2023 at 12:18 PM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Chris,
> > > > > > > > >
> > > > > > > > > > we might try to introduce a framework-level configuration property to dictate which of the pre-transform and post-transform topic partitions are used for the fallback call to the single-arg variant if a task class has not overridden the multi-arg variant
> > > > > > > > >
> > > > > > > > > Thanks for the explanation and I agree that this will be a tad bit too convoluted. :)
> > > > > > > > >
> > > > > > > > > Please do let me know if you'd like any further amendments to the KIP!

Thanks,
Yash

On Tue, Feb 21, 2023 at 8:42 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Yash,

I think the use case for pre-transform TPO coordinates (and topic partition writers created/destroyed in close/open) tends to boil down to exactly-once semantics, where it's desirable to preserve the guarantees that Kafka provides (every record has a unique TPO trio, and records are ordered by offset within a topic partition).

It's my understanding that this approach is utilized in several connectors out there today, and it might break these connectors to start using the post-transform topic partitions automatically in their open/close methods.

If we want to get really fancy with this and try to obviate or at least reduce the need for per-connector code changes, we might try to introduce a framework-level configuration property to dictate which of the pre-transform and post-transform topic partitions are used for the fallback call to the single-arg variant if a task class has not overridden the multi-arg variant. But I think this is going a bit too far and would prefer to keep things simple(r) for now.
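
As a rough illustration of the fallback discussed above, here is a minimal sketch of a two-argument `open` whose default implementation delegates to the existing single-argument variant. It assumes that the fallback passes the pre-transform partitions so that existing connectors keep their current behavior; `TopicPartitionSketch`, `SinkTaskSketch`, `LegacyTask` and `AwareTask` are hypothetical stand-ins and not the real Connect classes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final class TopicPartitionSketch {
    final String topic;
    final int partition;

    TopicPartitionSketch(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Hypothetical stand-in for SinkTask; only the open variants are modelled.
abstract class SinkTaskSketch {
    // Existing single-arg variant (no-op by default).
    public void open(Collection<TopicPartitionSketch> partitions) {
    }

    // Proposed multi-arg variant. Assumption: tasks that do not override it
    // fall back to the single-arg variant with the pre-transform partitions.
    public void open(Collection<TopicPartitionSketch> originalPartitions,
                     Collection<TopicPartitionSketch> transformedPartitions) {
        open(originalPartitions);
    }
}

// A task written against the old API: overrides only the single-arg variant.
class LegacyTask extends SinkTaskSketch {
    final List<TopicPartitionSketch> opened = new ArrayList<>();

    @Override
    public void open(Collection<TopicPartitionSketch> partitions) {
        opened.addAll(partitions);
    }
}

// A task that opts in to the new variant and allocates per transformed partition.
class AwareTask extends SinkTaskSketch {
    final List<TopicPartitionSketch> opened = new ArrayList<>();

    @Override
    public void open(Collection<TopicPartitionSketch> originalPartitions,
                     Collection<TopicPartitionSketch> transformedPartitions) {
        opened.addAll(transformedPartitions);
    }
}
```

With this shape the runtime would only ever call the two-argument variant; a legacy task still sees the original partitions, while an updated task can choose which set to allocate resources for.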

Cheers,

Chris

On Sun, Feb 19, 2023 at 2:34 AM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Chris,

> I was actually envisioning something like `void open(Collection<TopicPartition> originalPartitions, Collection<TopicPartition> transformedPartitions)`

Ah okay, this does make a lot more sense. Sorry, I think I misunderstood you earlier. I do agree with you that this seems better than splitting it off into two new sets of open / close methods from a complexity standpoint.

> Plus, if a connector is intentionally designed to use pre-transformation topic partitions in its open/close methods, wouldn't we just be trading one form of the problem for another by making this switch?

On thinking about this a bit more, I'm not so convinced that we need to expose the pre-transform / original topic partitions in the new open / close methods.
The purpose of the open / close methods is to allow sink tasks to allocate and deallocate resources for each topic partition assigned to the task, and the purpose of topic-mutating SMTs is to essentially modify the source topic name from the point of view of the sink connector. Why would a sink connector ever need to or want to allocate resources for pre-transform topic partitions? Is the argument here that since we'll be exposing both the pre-transform and post-transform topic partitions per record, we should also expose the same info via open / close and allow sink connector implementations to disregard topic-mutating SMTs completely if they wanted to?

Either way, I've gone ahead and updated the KIP to reflect all of our previous discussion here since it had become quite outdated. I've also updated the KIP title from "Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit users)" to "Allow sink connectors to be used with topic-mutating SMTs" since the improvements to the open / close mechanism don't pertain only to asynchronous sink connectors.
The new KIP URL is:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-793%3A+Allow+sink+connectors+to+be+used+with+topic-mutating+SMTs

Thanks,
Yash

On Tue, Feb 14, 2023 at 11:39 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Yash,

I was actually envisioning something like `void open(Collection<TopicPartition> originalPartitions, Collection<TopicPartition> transformedPartitions)`, since we already convert and transform each batch of records that we poll from the sink task's consumer en masse, meaning we could discover several new transformed partitions in between consecutive calls to SinkTask::put.

It's also worth noting that we'll probably want to deprecate the existing open/close methods, at which point keeping one non-deprecated variant of each seems more appealing and less complex than keeping two.

Honestly though, I think we're both on the same page enough that I wouldn't object to either approach. We've probably reached the saturation point for ROI here and as long as we provide developers a way to get the information they need from the runtime and take care to add Javadocs and update our docs page (possibly including the connector development quickstart), it should be fine.

At this point, it might be worth updating the KIP based on recent discussion so that others can see the latest proposal, and we can both take a look and make sure everything looks good enough before opening a vote thread.

Finally, I think you make a convincing case for a time-based eviction policy. I wasn't thinking about the fairly common SMT pattern of deriving a topic name from, e.g., a record field or header.
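
To make the time-based eviction idea a little more concrete, the following is a minimal sketch of how a runtime might track post-transform topic partitions and decide when to close idle ones. Everything here is an assumption for illustration: `TransformedPartitionTracker`, its method names, and the use of plain "topic-partition" strings as keys are hypothetical, and the thread deliberately leaves the actual policy unspecified.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the Connect runtime. Partitions are keyed
// by "topic-partition" strings purely to keep the sketch short.
class TransformedPartitionTracker {
    private final long idleTimeoutMs;
    private final Map<String, Long> lastSeenMs = new HashMap<>();

    TransformedPartitionTracker(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    // Records activity for a partition. Returns true if the partition was not
    // tracked yet, i.e. the caller would need to invoke open for it first.
    boolean touch(String partition, long nowMs) {
        return lastSeenMs.put(partition, nowMs) == null;
    }

    // Removes and returns every partition that has been idle for at least the
    // timeout; the caller would invoke close for each of them.
    List<String> evictIdle(long nowMs) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastSeenMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() >= idleTimeoutMs) {
                evicted.add(entry.getKey());
                it.remove();
            }
        }
        return evicted;
    }
}
```

Because the policy looks only at timestamps, it makes no assumption about how an SMT derives the topic name; the cost is that a partition that goes quiet and then reappears is closed and opened again.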

Cheers,

Chris

On Tue, Feb 14, 2023 at 11:42 AM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Chris,

> Plus, if a connector is intentionally designed to use pre-transformation topic partitions in its open/close methods, wouldn't we just be trading one form of the problem for another by making this switch?

Thanks, this makes sense, and given that the KIP already proposes a way for sink connector implementations to distinguish between pre-transform and post-transform topics per record, I think I'm convinced that going with new `open()` / `close()` methods is the right approach.
However, I still feel like having overloaded methods will make it a lot less intuitive given that the two sets of methods would be different in terms of when they're called and what arguments they are passed (also I'm presuming that the overloaded methods you're prescribing will only have a single `TopicPartition` rather than a `Collection<TopicPartition>` as their parameters). I guess my concern is largely around the fact that it won't be possible to distinguish between the overloaded methods' use cases just from the method signatures. I agree that naming is going to be difficult here, but I think that having two sets of `SinkTask::openXyz` / `SinkTask::closeXyz` methods will be less complicated to understand from a connector developer perspective (as compared to overloaded methods with only differing documentation). Of your suggested options, I think `openPreTransform` / `openPostTransform` are the most comprehensible ones.

> BTW, I wouldn't say that we can't make assumptions about the relationships between pre- and post-transformation topic partitions.

I meant that the framework wouldn't be able to deterministically know when to close a post-transform topic partition given that SMTs could use per-record data / metadata to manipulate the topic names as and how required (which supports the suggestion to use an eviction policy based mechanism to call SinkTask::close for post-transform topic partitions).

> We might utilize a policy that assumes a deterministic mapping from the former to the latter, for example.

Wouldn't this be making the assumption that SMTs only use the topic name itself and no other data / metadata while computing the new topic name?
Are you suggesting that since this assumption could work for a majority of SMTs, it might be more efficient overall in terms of reducing the number of "false-positive" calls to `SinkTask::closePostTransform` (and we'll also be able to call `SinkTask::closePostTransform` immediately after topic partitions are revoked from the consumer)? I was thinking something more generic along the lines of a simple time-based eviction policy that wouldn't be making any assumptions regarding the SMT implementations. Either way, I do like your earlier suggestion of keeping this logic internal and not painting ourselves into a corner by promising any particular behavior in the KIP.

Thanks,
Yash

On Tue, Feb 14, 2023 at 1:08 AM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Yash,

I think the key difference between adding methods/overloads related to SinkTask::open/SinkTask::close and SinkTask::put is that this isn't auxiliary information that may or may not be useful to connector developers. It's actually critical for them to understand the difference between the two concepts here, even if they look very similar. And yes, I do believe that switching from pre-transform to post-transform topic partitions is too big a change in behavior here. Plus, if a connector is intentionally designed to use pre-transformation topic partitions in its open/close methods, wouldn't we just be trading one form of the problem for another by making this switch?

One possible alternative to overloading the existing methods is to split SinkTask::open into openOriginal (or possibly openPhysical or openPreTransform) and openTransformed (or openLogical or openPostTransform), with a similar change for SinkTask::close. The default implementation for SinkTask::openOriginal can be to call SinkTask::open, and the same can go for SinkTask::close. However, I prefer overloading the existing methods since this alternative increases complexity and none of the names are very informative.

BTW, I wouldn't say that we can't make assumptions about the relationships between pre- and post-transformation topic partitions. We might utilize a policy that assumes a deterministic mapping from the former to the latter, for example.
The distinction I'd draw is that the assumptions we make can and probably should favor some cases in terms of performance (i.e., reducing the number of unnecessary calls to close/open over a given sink task's lifetime), but should not lead to guaranteed resource leaks or failure to obey the API contract in any case.

Cheers,

Chris

On Mon, Feb 13, 2023 at 10:54 AM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Chris,

> especially if connectors are intentionally designed around original topic partitions instead of transformed ones.

Ha, that's a good point and reminds me of Hyrum's Law [1] :)

> I think we have to provide connector developers with some way to differentiate between the two, but maybe there's a way to do this that I haven't thought of yet

I can't think of a better way to do this either; would invoking the existing `SinkTask::open` and `SinkTask::close` methods with post-transform topic partitions instead of pre-transform topic partitions not be acceptable even in a minor / major AK release? I feel like the proposed approach of adding overloaded `SinkTask::open` / `SinkTask::close` methods to differentiate between pre-transform and post-transform topic partitions has similar pitfalls to the idea of the overloaded `SinkTask::put` method we discarded earlier.

> Either way, I'm glad that the general idea of a cache and eviction policy for SinkTask::close seems reasonable; if we decide to go this route, it might make sense for the KIP to include an outline of one or more high-level strategies we might take, but without promising any particular behavior beyond occasionally calling SinkTask::close for post-transform topic partitions. I'm hoping that this logic can stay internal, and by not painting ourselves into a corner with the KIP, we give ourselves leeway to tweak it in the future if necessary without filing another KIP or introducing a pluggable interface.

Thanks, that's a good idea. Given the flexibility of SMTs, the framework can't really make any assumptions around topic partitions post transformation, nor does it have any way to definitively get any such information from transformations, which is why the idea of a cache with an eviction policy makes perfect sense!

[1] - https://www.hyrumslaw.com/

Thanks,
Yash

On Thu, Feb 9, 2023 at 9:38 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Yash,

> So it looks like with the current state of affairs, sink tasks that only instantiate writers in the SinkTask::open method (and don't do the lazy instantiation in SinkTask::put that you mentioned) might fail when used with topic/partition mutating SMTs even if they don't do any asynchronous processing?

Yep, exactly 👍

> What do you think about retaining just the existing methods but changing when they're called in the Connect runtime?
> For instance, instead of calling SinkTask::open after partition assignment post a consumer group rebalance, we could cache the currently "seen" topic partitions (post transformation) and before each call to SinkTask::put check whether there are any new "unseen" topic partitions, and if so call SinkTask::open (and also update the cache of course).

IMO the issue here is that it's a drastic change in behavior to start invoking SinkTask::open and SinkTask::close with post-transform topic partitions instead of pre-transform, especially if connectors are intentionally designed around original topic partitions instead of transformed ones. I think we have to provide connector developers with some way to differentiate between the two, but maybe there's a way to do this that I haven't thought of yet. Interested to hear your thoughts.
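
For reference, the "seen partitions" idea quoted above can be sketched roughly as follows. This is only an illustration of the ordering (open for unseen post-transform partitions, then put); `LazyOpenDispatcher` is a hypothetical name, partitions are plain strings, and the task calls are simulated by a log. It does not address the compatibility objection raised in the reply.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of a runtime-side dispatcher; not actual Connect code.
class LazyOpenDispatcher {
    // Post-transform partitions for which open has already been issued.
    private final Set<String> seen = new LinkedHashSet<>();
    // Log of the simulated task calls, in the order they would be made.
    final List<String> calls = new ArrayList<>();

    // batchPartitions holds the post-transform partition of each record in the batch.
    void deliver(List<String> batchPartitions) {
        Set<String> unseen = new LinkedHashSet<>(batchPartitions);
        unseen.removeAll(seen);
        if (!unseen.isEmpty()) {
            calls.add("open" + unseen); // stands in for SinkTask::open
            seen.addAll(unseen);
        }
        calls.add("put" + batchPartitions); // stands in for SinkTask::put
    }
}
```

The sketch keeps the invariant that open precedes the first put for every partition the task observes, which is the part of the contract the proposal relies on.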

Either way, I'm glad that the general idea of a cache and eviction policy for SinkTask::close seems reasonable; if we decide to go this route, it might make sense for the KIP to include an outline of one or more high-level strategies we might take, but without promising any particular behavior beyond occasionally calling SinkTask::close for post-transform topic partitions. I'm hoping that this logic can stay internal, and by not painting ourselves into a corner with the KIP, we give ourselves leeway to tweak it in the future if necessary without filing another KIP or introducing a pluggable interface.

Cheers,

Chris

On Thu, Feb 9, 2023 at 7:39 AM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Chris,

Thanks for the feedback.

1) That's a fair point; while I did scan everything publicly available on GitHub, you're right in that it won't cover all possible SMTs that are out there. Thanks for the example use-case as well, I've updated the KIP to add the two new proposed methods.

2) So it looks like with the current state of affairs, sink tasks that only instantiate writers in the SinkTask::open method (and don't do the lazy instantiation in SinkTask::put that you mentioned) might fail when used with topic/partition mutating SMTs even if they don't do any asynchronous processing? Since they could encounter records in SinkTask::put with topics/partitions that they might not have created writers for. Thanks for pointing this out, it's definitely another incompatibility that needs to be called out and fixed.
The overloaded method approach is interesting, but comes with the caveat of yet more new methods that will need to be implemented by existing connectors if they want to make use of this new functionality. What do you think about retaining just the existing methods but changing when they're called in the Connect runtime? For instance, instead of calling SinkTask::open after partition assignment post a consumer group rebalance, we could cache the currently "seen" topic partitions (post transformation) and before each call to SinkTask::put check whether there are any new "unseen" topic partitions, and if so call SinkTask::open (and also update the cache of course).
I don't think this would break the existing contract with sink tasks where SinkTask::open is expected to be called for a topic partition before any records from the topic partition are sent via SinkTask::put? The SinkTask::close case is a lot trickier however, and would require some sort of cache eviction policy that would be deemed appropriate as you pointed out too.

Thanks,
Yash

On Mon, Feb 6, 2023 at 11:27 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Yash,

I've had some time to think on this KIP and I think I'm in agreement about not blocking it on an official compatibility library or adding the "ack" API for sink records.

I only have two more thoughts:

1. Because it is possible to manipulate sink record partitions and offsets with the current API we provide for transformations, I still believe methods should be added to the SinkRecord class to expose the original partition and offset, not just the original topic. The additional cognitive burden from these two methods is going to be minimal anyways; once users understand the difference between the transformed topic name and the original one, it's going to be trivial for them to understand how that same difference applies for partitions and offsets.
It's not enough to scan the set of SMTs provided out of the box with Connect, ones developed by Confluent, or even everything available on GitHub, since there may be closed-source projects out there that rely on this ability. One potential use case could be re-routing partitions between Kafka and some other sharded system.

2. We still have to address the SinkTask::open [1] and SinkTask::close [2] methods. If a connector writes to the external system using the transformed topic partitions it reads from Kafka, then it's possible for the connector to lazily instantiate writers for topic partitions as it encounters them from records provided in SinkTask::put.
However, connectors also need a way to de-allocate those writers (and the resources used by them) over time, which they can't do as easily. One possible approach here is to overload SinkTask::open and SinkTask::close with variants that distinguish between transformed and original topic partitions, and default to invoking the existing methods with just the original topic partitions.
We would then have several options for how the Connect runtime can invoke these methods, but in general, an approach that guarantees that tasks are notified of transformed topic partitions in SinkTask::open before any records for that partition are given to it in SinkTask::put, and makes a best-effort attempt to close transformed topic partitions that appear to no longer be in use based on some eviction policy, would probably be sufficient.
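
To make the open-before-put guarantee and the best-effort eviction a bit more concrete, here is a minimal sketch of what the runtime-side bookkeeping might look like. Everything in it is an assumption for illustration: `SeenPartitionTracker`, `Lifecycle`, `beforePut`, `evictIdle` and the idle-timeout policy are hypothetical names that do not exist in Kafka Connect, and topic partitions are plain strings standing in for `TopicPartition`.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical runtime-side helper; not part of Kafka Connect. */
class SeenPartitionTracker {
    /** Stand-in for the open/close half of a sink task. */
    interface Lifecycle {
        void open(String topicPartition);
        void close(String topicPartition);
    }

    // Transformed topic partition -> timestamp of the last batch that contained it.
    private final Map<String, Long> lastSeenMs = new HashMap<>();
    private final Lifecycle task;
    private final long idleTimeoutMs;

    SeenPartitionTracker(Lifecycle task, long idleTimeoutMs) {
        this.task = task;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Call before handing a batch to put(): opens any partition not seen before. */
    void beforePut(List<String> transformedPartitions, long nowMs) {
        for (String tp : transformedPartitions) {
            // put() returns null only when the partition was not tracked yet.
            if (lastSeenMs.put(tp, nowMs) == null) {
                task.open(tp);
            }
        }
    }

    /** Best-effort eviction: closes partitions idle for longer than the timeout. */
    void evictIdle(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = lastSeenMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > idleTimeoutMs) {
                it.remove();
                task.close(entry.getKey());
            }
        }
    }
}
```

An idle timeout is only one possible eviction policy; an evicted partition that shows up again later would simply be opened again on the next `beforePut` call.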

[1] - https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open(java.util.Collection)
[2] - https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close(java.util.Collection)

Cheers,

Chris

On Sat, Nov 5, 2022 at 5:46 AM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Chris,

Thanks a lot for your inputs!

> would provide a simple, clean interface for developers to determine which features are supported by the version of the Connect runtime that their plugin has been deployed onto

I do like the idea of having such a public compatibility library - I think it would remove a lot of restrictions from framework development if it were to be widely adopted.

> we might consider adding an API to "ack" sink records

I agree that this does seem like a more intuitive and clean API, but I'm concerned about the backward compatibility headache we'd be imposing on all existing sink connectors.
Connector developers will have to maintain two separate ways of doing offset management if they want to use the new API but continue supporting older versions of Kafka Connect.

For now, I've reverted the KIP to the previous iteration which proposed the addition of a new `SinkRecord` method to obtain the original Kafka topic pre-transformation. One thing to note is that I've removed the method for obtaining the original Kafka partition after a cursory search showed that use cases for partition modifying SMTs are primarily on the source connector side.

Thanks,
Yash

On Tue, Nov 1, 2022 at 9:22 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi all,

I have more comments I'd like to make on this KIP when I have time (sorry for the delay, Yash, and thanks for your patience!), but I did want to chime in and say that I'm also not sure about overloading SinkTask::put. I share the concerns about creating an intuitive, simple API that Yash has raised. In addition, this approach doesn't seem very sustainable--what do we do if we encounter another case in the future that would warrant a similar solution?
We probably don't want to create three, four, etc. overloaded variants of the method, each of which would have to be implemented by connector developers who want to both leverage the latest and greatest connector APIs and maintain compatibility with Connect clusters running older versions.

I haven't been able to flesh this out into a design worth publishing in its own KIP yet, but one alternative I've pitched to a few people with generally positive interest has been to develop an official compatibility library for Connect developers.
This library would be released as its own Maven artifact (separate from connect-api, connect-runtime, etc.) and would provide a simple, clean interface for developers to determine which features are supported by the version of the Connect runtime that their plugin has been deployed onto. Under the hood, this library might use reflection to determine whether classes, methods, etc.
are available, but the developer wouldn't have to do anything more than check (for example) `Features.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()` to know at any point in the lifetime of their connector/task whether that feature is provided by the runtime.

One other high-level comment: this doesn't address every case, but we might consider adding an API to "ack" sink records. This could use the SubmittedRecords class [1] (with some slight tweaks) under the hood to track the latest-acked offset for each topic partition.
This way, connector developers won't be responsible for tracking offsets at all in their sink tasks (eliminating issues with the accuracy of post-transformation T/P/O sink record information), and they'll only have to notify the Connect framework when a record has been successfully dispatched to the external system. This provides a cleaner, friendlier API, and also enables more fine-grained metrics like the ones proposed in KIP-767 [2].
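
As a rough illustration of the kind of bookkeeping an "ack" API would need, here is a simplified sketch. It is not the actual SubmittedRecords implementation, and `AckTracker`, `Pending`, `submit` and `committableOffsets` are hypothetical names chosen for this example. The assumption is that the framework registers each record under its original topic partition (a plain string here) in consumption order, and that only the contiguous prefix of acked records per partition is committable.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-record ack bookkeeping; not the real SubmittedRecords class. */
class AckTracker {
    /** One in-flight record; the task calls ack() once the write has succeeded. */
    static final class Pending {
        final String topicPartition;
        final long offset;
        private volatile boolean acked;

        Pending(String topicPartition, long offset) {
            this.topicPartition = topicPartition;
            this.offset = offset;
        }

        void ack() {
            acked = true;
        }
    }

    // Original topic partition -> records in consumption order, oldest first.
    private final Map<String, Deque<Pending>> inFlight = new HashMap<>();

    /** Registers a record before it is handed to the task. */
    Pending submit(String originalTopicPartition, long offset) {
        Pending pending = new Pending(originalTopicPartition, offset);
        inFlight.computeIfAbsent(originalTopicPartition, k -> new ArrayDeque<>()).addLast(pending);
        return pending;
    }

    /**
     * Pops the contiguous acked prefix of each partition and returns the offset
     * to commit (last acked offset + 1). Partitions with nothing newly
     * committable are omitted from the result.
     */
    Map<String, Long> committableOffsets() {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Deque<Pending>> entry : inFlight.entrySet()) {
            Deque<Pending> queue = entry.getValue();
            while (!queue.isEmpty() && queue.peekFirst().acked) {
                result.put(entry.getKey(), queue.pollFirst().offset + 1);
            }
        }
        return result;
    }
}
```

Because the committable offset is derived from queue order rather than from offset arithmetic, gaps between consecutive offsets do not matter in this sketch, and an out-of-order ack simply waits until every earlier record in the same partition has been acked.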

[1] - https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
[2] - https://cwiki.apache.org/confluence/display/KAFKA/KIP-767%3A+Connect+Latency+Metrics

Cheers,

Chris

On Tue, Nov 1, 2022 at 11:21 AM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Randall,

It's been a while for this one but the more I think
about it, the more I feel like the current approach with a new overloaded `SinkTask::put` method might not be optimal. We're trying to fix a pretty corner case bug here (usage of topic mutating SMTs with sink connectors that do their own offset tracking) and I'm not sure that warrants a change to such a central interface method. The new `SinkTask::put` method just seems somewhat odd and it may not be very understandable for a new reader - I don't think this should be the case for a public interface method.
Furthermore, even with elaborate documentation in place, I'm not sure if it'll be very obvious to most people what the purpose of having these two `put` methods is and how they should be used by sink task implementations. What do you think?

Thanks,
Yash

On Mon, Oct 10, 2022 at 9:33 PM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Randall,

Thanks a lot for your valuable feedback so far! I've updated the KIP based on our discussion above. Could you please take another look?

Thanks,
Yash

On Tue, Oct 4, 2022 at 12:40 AM Randall Hauch <rha...@gmail.com> wrote:

On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya <yash.ma...@gmail.com> wrote:

> Hi Randall,
>
> Thanks for elaborating. I think these are all very good points and I see why the overloaded `SinkTask::put` method is a cleaner solution overall.
>
> > public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> updatedTopicPartitions)
>
> I think this should be
>
> `public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> originalTopicPartitions)`
>
> instead because the sink records themselves have the updated topic partitions (i.e. after all transformations have been applied) and the KIP is proposing a way for the tasks to be able to access the original topic partition (i.e. before transformations have been applied).

Sounds good.

> > Of course, if the developer does not need separate methods, they can easily have the older `put` method simply delegate to the newer method.
>
> If the developer does not need separate methods (i.e. they don't need to use this new addition), they can simply continue implementing just the older `put` method right?

Correct.
We should update the JavaDoc of both methods to make this clear, and in general how the two methods should be used and implemented. That can be part of the PR, and the KIP doesn't need this wording.

> > Finally, this gives us a roadmap for *eventually* deprecating the older method, once the Connect runtime versions without this change are old enough.
>
> I'm not sure we'd ever want to deprecate the older method.
> > > > > > > > > > > > > > > > > Most > > > > > > > > > > > > > > > > > > > > common > > > > > > > > > > > > > > > > > > > > > >> sink > > > > > > > > > > > > > > > > > > > > > >> > connector implementations do > not > > > do > > > > > > their > > > > > > > > own > > > > > > > > > > > offset > > > > > > > > > > > > > > > > tracking > > > > > > > > > > > > > > > > > > with > > > > > > > > > > > > > > > > > > > > > >> > asynchronous processing and > will > > > > > > probably > > > > > > > > > never > > > > > > > > > > > > have a > > > > > > > > > > > > > > > need > > > > > > > > > > > > > > > > > for > > > > > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > > > >> > additional parameter > > > > `Map<SinkRecord, > > > > > > > > > > > > TopicPartition> > > > > > > > > > > > > > > > > > > > > > >> > originalTopicPartitions` in > the > > > > > proposed > > > > > > > new > > > > > > > > > > `put` > > > > > > > > > > > > > > method. > > > > > > > > > > > > > > > > > These > > > > > > > > > > > > > > > > > > > > > >> connectors > > > > > > > > > > > > > > > > > > > > > >> > can continue implementing only > > the > > > > > > > existing > > > > > > > > > > > > > > > `SinkTask::put` > > > > > > > > > > > > > > > > > > method > > > > > > > > > > > > > > > > > > > > > which > > > > > > > > > > > > > > > > > > > > > >> > will be called by the default > > > > > > > implementation > > > > > > > > > of > > > > > > > > > > > the > > > > > > > > > > > > > > newer > > > > > > > > > > > > > > > > > > > overloaded > > > > > > > > > > > > > > > > > > > > > >> `put` > > > > > > > > > > > > > > > > > > > > > >> > method. 
> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > >> +1 > > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > >> > > the pre-commit methods use > the > > > > same > > > > > > > > > > > > > > `Map<TopicPartition, > > > > > > > > > > > > > > > > > > > > > >> > OffsetAndMetadata> > > currentOffsets` > > > > > data > > > > > > > > > > structure > > > > > > > > > > > > I'm > > > > > > > > > > > > > > > > > suggesting > > > > > > > > > > > > > > > > > > > be > > > > > > > > > > > > > > > > > > > > > >> used. > > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > >> > The data structure you're > > > suggesting > > > > > be > > > > > > > used > > > > > > > > > is > > > > > > > > > > a > > > > > > > > > > > > > > > > > > `Map<SinkRecord, > > > > > > > > > > > > > > > > > > > > > >> > TopicPartition>` which will > map > > > > > > > `SinkRecord` > > > > > > > > > > > objects > > > > > > > > > > > > > to > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > original > > > > > > > > > > > > > > > > > > > > > >> topic > > > > > > > > > > > > > > > > > > > > > >> > partition of the corresponding > > > > > > > > > `ConsumerRecord` > > > > > > > > > > > > right? > > > > > > > > > > > > > > To > > > > > > > > > > > > > > > > > > clarify, > > > > > > > > > > > > > > > > > > > > > this > > > > > > > > > > > > > > > > > > > > > >> is > > > > > > > > > > > > > > > > > > > > > >> > a new data structure that will > > > need > > > > to > > > > > > be > > > > > > > > > > managed > > > > > > > > > > > in > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > > > >> `WorkerSinkTask`. 
>
> Ah, you're right. Thanks for the correction.
>
> Best regards,
> Randall
>
> > Thanks,
> > Yash
> >
> > On Mon, Oct 3, 2022 at 1:20 AM Randall Hauch <rha...@gmail.com> wrote:
> >
> > > Hi, Yash.
> > >
> > > > I'm not sure I quite understand why it would be "easier" for connector
> > > > developers to account for implementing two different overloaded `put`
> > > > methods (assuming that they want to use this new feature) versus using
> > > > a try-catch block around `SinkRecord` access methods?
> > >
> > > Using a try-catch around an API method that *might* be there is a very
> > > unusual thing for most developers. Unfortunately, we've had to resort to
> > > this atypical approach with Connect in places when there was no good
> > > alternative. We seem to be relying upon this pattern because it's easier
> > > for us, not because it offers a better experience for Connector
> > > developers.
> > > IMO, if there's a practical alternative that uses normal development
> > > practices and techniques, then we should use that alternative. IIUC,
> > > there is at least one practical alternative for this KIP that would not
> > > require developers to use the unusual try-catch to handle the case where
> > > methods are not found.
> > >
> > > I also think having two `put` methods is easier when the Connector has to
> > > do different things for different Connect runtimes, too. One of those
> > > methods is called by newer Connect runtimes with the new behavior, and
> > > the other method is called by an older Connect runtime. Of course, if the
> > > developer does not need separate methods, they can easily have the older
> > > `put` method simply delegate to the newer method.
> > >
> > > Finally, this gives us a roadmap for *eventually* deprecating the older
> > > method, once the Connect runtime versions without this change are old
> > > enough.
> > >
> > > > I think the advantage of going with the proposed approach in the KIP is
> > > > that it wouldn't require extra book-keeping (the Map<SinkRecord,
> > > > TopicPartition> in `WorkerSinkTask` in your proposed approach)
> > >
> > > The connector does have to do some of this bookkeeping in how they track
> > > the topic partition offsets used in the `preCommit`, and the pre-commit
> > > methods use the same `Map<TopicPartition, OffsetAndMetadata>
> > > currentOffsets` data structure I'm suggesting be used.
> > >
> > > I hope that helps.
> > >
> > > Best regards,
> > >
> > > Randall
> > >
> > > On Mon, Sep 26, 2022 at 9:38 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > >
> > > > Hi Randall,
> > > >
> > > > Thanks for reviewing the KIP!
> > > >
> > > > > That latter logic can get quite ugly.
> > > >
> > > > I'm not sure I quite understand why it would be "easier" for connector
> > > > developers to account for implementing two different overloaded `put`
> > > > methods (assuming that they want to use this new feature) versus using
> > > > a try-catch block around `SinkRecord` access methods? In both cases, a
> > > > connector developer would need to write additional code in order to
> > > > ensure that their connector continues working with older Connect
> > > > runtimes. Furthermore, we would probably need to carefully document how
> > > > the implementation for the older `put` method should look for
> > > > connectors that want to use this new feature. I think the advantage of
> > > > going with the proposed approach in the KIP is that it wouldn't require
> > > > extra book-keeping (the Map<SinkRecord, TopicPartition> in
> > > > `WorkerSinkTask` in your proposed approach) and also the fact that the
> > > > try-catch based logic is an already established pattern through
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
> > > > and other KIPs which added methods to source/sink connector/task
> > > > contexts.
> > > >
> > > > Let me know if you still feel that having a new overloaded put method
> > > > is a cleaner solution and I'd be happy to reconsider!
> > > >
> > > > Thanks,
> > > > Yash
> > > >
> > > > On Thu, Sep 22, 2022 at 11:18 PM Randall Hauch <rha...@gmail.com> wrote:
> > > >
> > > > > Hi, Yash. Thanks for picking up this KIP and discussion.
> > > > >
> > > > > The KIP includes this rejected alternative:
> > > > >
> > > > > > 4. Update SinkTask.put in any way to pass the new information
> > > > > >    outside SinkRecord (e.g. a Map or a derived class)
> > > > > >
> > > > > >    - Much more disruptive change without considerable pros
> > > > >
> > > > > One advantage of doing this is that sink connector implementations
> > > > > can more easily implement two different "put(...)" methods to handle
> > > > > running in a variety of runtimes, without having to use try-catch
> > > > > logic around the newer SinkRecord access methods. That latter logic
> > > > > can get quite ugly.
> > > > >
> > > > > For example, the existing `put` method has this signature:
> > > > >
> > > > >     public abstract void put(Collection<SinkRecord> records);
> > > > >
> > > > > If we added an overloaded method that passed in a map of the old
> > > > > topic+partition for each record (and defined the absence of an entry
> > > > > as having an unchanged topic and partition):
> > > > >
> > > > >     public void put(Collection<SinkRecord> records,
> > > > >                     Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
> > > > >         put(records);
> > > > >     }
> > > > >
> > > > > then a `SinkTask` implementation that wants to use this new feature
> > > > > could simply implement both methods:
> > > > >
> > > > >     public void put(Collection<SinkRecord> records) {
> > > > >         // Running in an older runtime, so no tracking of SMT-modified
> > > > >         // topic names or partitions
> > > > >         put(records, Map.of());
> > > > >     }
> > > > >
> > > > >     public void put(Collection<SinkRecord> records,
> > > > >                     Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
> > > > >         // real logic here
> > > > >     }
> > > > >
> > > > > This seems a lot easier than having to use try-catch logic, yet still
> > > > > allows sink connectors to utilize the new functionality and still
> > > > > work with older Connect runtimes.
> > > > >
> > > > > WDYT?
> > > > >
> > > > > Randall
> > > > >
> > > > > On Thu, Sep 8, 2022 at 7:03 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I would like to (re)start a new discussion thread on KIP-793 (Kafka
> > > > > > Connect) which proposes some additions to the public SinkRecord
> > > > > > interface in order to support topic mutating SMTs for sink
> > > > > > connectors that do their own offset tracking.
> > > > > >
> > > > > > Links:
> > > > > >
> > > > > > KIP:
> > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336830
> > > > > >
> > > > > > Older discussion thread:
> > > > > > https://lists.apache.org/thread/00kcth6057jdcsyzgy1x8nb2s1cymy8h,
> > > > > > https://lists.apache.org/thread/rzqkm0q5y5v3vdjhg8wqppxbkw7nyopj
> > > > > >
> > > > > > Jira: https://issues.apache.org/jira/browse/KAFKA-13431
> > > > > >
> > > > > > Thanks,
> > > > > > Yash