Hi Yash,

We'll probably want to make a few tweaks to the Javadocs for the new
methods (I'm imagining that notes on compatibility with older versions will
be required), but I believe what's proposed in the KIP is good enough to
approve with the understanding that it may not exactly match what gets
implemented/merged.

LGTM, thanks again for the KIP!

Cheers,

Chris

On Tue, Feb 21, 2023 at 12:18 PM Yash Mayya <yash.ma...@gmail.com> wrote:

> Hi Chris,
>
> > we might try to introduce a framework-level configuration
> > property to dictate which of the pre-transform and post-transform
> > topic partitions are used for the fallback call to the single-arg
> > variant if a task class has not overridden the multi-arg variant
>
> Thanks for the explanation and I agree that this will be a tad bit too
> convoluted. :)
>
> Please do let me know if you'd like any further amendments to the KIP!
>
> Thanks,
> Yash
>
> On Tue, Feb 21, 2023 at 8:42 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
>
> > Hi Yash,
> >
> > I think the use case for pre-transform TPO coordinates (and topic partition
> > writers created/destroyed in close/open) tends to boil down to exactly-once
> > semantics, where it's desirable to preserve the guarantees that Kafka
> > provides (every record has a unique TPO trio, and records are ordered by
> > offset within a topic partition).
> >
> > It's my understanding that this approach is utilized in several connectors
> > out there today, and it might break these connectors to start using the
> > post-transform topic partitions automatically in their open/close methods.
> >
> > If we want to get really fancy with this and try to obviate or at least
> > reduce the need for per-connector code changes, we might try to introduce a
> > framework-level configuration property to dictate which of the
> > pre-transform and post-transform topic partitions are used for the fallback
> > call to the single-arg variant if a task class has not overridden the
> > multi-arg variant. But I think this is going a bit too far and would prefer
> > to keep things simple(r) for now.
> >
> > Cheers,
> >
> > Chris
> >
> >
> > On Sun, Feb 19, 2023 at 2:34 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> >
> > > Hi Chris,
> > >
> > > > I was actually envisioning something like `void
> > > > open(Collection<TopicPartition> originalPartitions,
> > > > Collection<TopicPartition> transformedPartitions)`
> > >
> > > Ah okay, this does make a lot more sense. Sorry, I think I misunderstood
> > > you earlier. I do agree with you that this seems better than splitting it
> > > off into two new sets of open / close methods from a complexity
> > > standpoint.
> > >
> > > > Plus, if a connector is intentionally designed to use
> > > > pre-transformation topic partitions in its open/close
> > > > methods, wouldn't we just be trading one form of the
> > > > problem for another by making this switch?
> > >
> > > On thinking about this a bit more, I'm not so convinced that we need to
> > > expose the pre-transform / original topic partitions in the new open /
> > > close methods. The purpose of the open / close methods is to allow sink
> > > tasks to allocate and deallocate resources for each topic partition
> > > assigned to the task, and the purpose of topic-mutating SMTs is to
> > > essentially modify the source topic name from the point of view of the
> > > sink connector. Why would a sink connector ever need or want to allocate
> > > resources for pre-transform topic partitions? Is the argument here that
> > > since we'll be exposing both the pre-transform and post-transform topic
> > > partitions per record, we should also expose the same info via open /
> > > close and allow sink connector implementations to disregard
> > > topic-mutating SMTs completely if they wanted to?
> > >
> > > Either way, I've gone ahead and updated the KIP to reflect all of our
> > > previous discussion here, since it had become quite outdated. I've also
> > > updated the KIP title from "Sink Connectors: Support topic-mutating SMTs
> > > for async connectors (preCommit users)" to "Allow sink connectors to be
> > > used with topic-mutating SMTs", since the improvements to the open / close
> > > mechanism don't pertain only to asynchronous sink connectors. The new KIP
> > > URL is:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-793%3A+Allow+sink+connectors+to+be+used+with+topic-mutating+SMTs
> > >
> > >
> > > Thanks,
> > > Yash
> > >
> > > On Tue, Feb 14, 2023 at 11:39 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > >
> > > > Hi Yash,
> > > >
> > > > I was actually envisioning something like
> > > > `void open(Collection<TopicPartition> originalPartitions, Collection<TopicPartition> transformedPartitions)`,
> > > > since we already convert and transform each batch of records that we
> > > > poll from the sink task's consumer en masse, meaning we could discover
> > > > several new transformed partitions in between consecutive calls to
> > > > SinkTask::put.
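The two-argument signature and its fallback behavior can be sketched roughly as follows. This is a hypothetical, self-contained illustration, not the Connect API: `TopicPartition` is a stand-in record for `org.apache.kafka.common.TopicPartition`, and `SketchSinkTask` and `LegacyTask` are invented names. It assumes, as suggested elsewhere in this thread, that the new variants default to calling the existing single-argument methods with only the original (pre-transform) partitions, so tasks that never override them keep their current behavior.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) { }

// Hypothetical, trimmed-down stand-in for SinkTask; not the real Connect API.
abstract class SketchSinkTask {
    // Existing single-argument variant (receives pre-transform partitions).
    public void open(Collection<TopicPartition> partitions) { }

    // Proposed two-argument variant. Tasks that don't override it fall back
    // to the existing method, called with only the original partitions.
    public void open(Collection<TopicPartition> originalPartitions,
                     Collection<TopicPartition> transformedPartitions) {
        open(originalPartitions);
    }

    public void close(Collection<TopicPartition> partitions) { }

    // Same fallback for close.
    public void close(Collection<TopicPartition> originalPartitions,
                      Collection<TopicPartition> transformedPartitions) {
        close(originalPartitions);
    }
}

// A legacy task that only knows about the single-argument method.
class LegacyTask extends SketchSinkTask {
    final List<TopicPartition> opened = new ArrayList<>();

    @Override
    public void open(Collection<TopicPartition> partitions) {
        opened.addAll(partitions);
    }
}
```

Because the fallback lives in a default implementation, only tasks that opt in by overriding the two-argument variants ever see post-transform partitions in open/close.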
> > > >
> > > > It's also worth noting that we'll probably want to deprecate the
> > > > existing open/close methods, at which point keeping one non-deprecated
> > > > variant of each seems more appealing and less complex than keeping two.
> > > >
> > > > Honestly though, I think we're both on the same page enough that I
> > > > wouldn't object to either approach. We've probably reached the
> > > > saturation point for ROI here, and as long as we provide developers a
> > > > way to get the information they need from the runtime and take care to
> > > > add Javadocs and update our docs page (possibly including the connector
> > > > development quickstart), it should be fine.
> > > >
> > > > At this point, it might be worth updating the KIP based on recent
> > > > discussion so that others can see the latest proposal, and we can both
> > > > take a look and make sure everything looks good enough before opening a
> > > > vote thread.
> > > >
> > > > Finally, I think you make a convincing case for a time-based eviction
> > > > policy. I wasn't thinking about the fairly common SMT pattern of
> > > > deriving a topic name from, e.g., a record field or header.
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > On Tue, Feb 14, 2023 at 11:42 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > >
> > > > > Hi Chris,
> > > > >
> > > > > > Plus, if a connector is intentionally designed to
> > > > > > use pre-transformation topic partitions in its
> > > > > > open/close methods, wouldn't we just be trading
> > > > > > one form of the problem for another by making this
> > > > > > switch?
> > > > >
> > > > > Thanks, this makes sense, and given that the KIP already proposes a
> > > > > way for sink connector implementations to distinguish between
> > > > > pre-transform and post-transform topics per record, I think I'm
> > > > > convinced that going with new `open()` / `close()` methods is the
> > > > > right approach. However, I still feel like having overloaded methods
> > > > > will make things a lot less intuitive, given that the two sets of
> > > > > methods would differ in terms of when they're called and what
> > > > > arguments they're passed (also, I'm presuming that the overloaded
> > > > > methods you're proposing will only have a single `TopicPartition`
> > > > > rather than a `Collection<TopicPartition>` as their parameters). I
> > > > > guess my concern is largely around the fact that it won't be possible
> > > > > to distinguish between the overloaded methods' use cases just from
> > > > > the method signatures. I agree that naming is going to be difficult
> > > > > here, but I think that having two sets of `SinkTask::openXyz` /
> > > > > `SinkTask::closeXyz` methods will be less complicated to understand
> > > > > from a connector developer's perspective (as compared to overloaded
> > > > > methods with only differing documentation). Of your suggested
> > > > > options, I think `openPreTransform` / `openPostTransform` are the
> > > > > most comprehensible ones.
> > > > >
> > > > > > BTW, I wouldn't say that we can't make assumptions
> > > > > > about the relationships between pre- and post-transformation
> > > > > > topic partitions.
> > > > >
> > > > > I meant that the framework wouldn't be able to deterministically know
> > > > > when to close a post-transform topic partition, given that SMTs could
> > > > > use per-record data / metadata to manipulate the topic names as and
> > > > > how required (which supports the suggestion to use an eviction
> > > > > policy-based mechanism to call SinkTask::close for post-transform
> > > > > topic partitions).
> > > > >
> > > > > > We might utilize a policy that assumes a deterministic
> > > > > > mapping from the former to the latter, for example.
> > > > >
> > > > > Wouldn't this be making the assumption that SMTs only use the topic
> > > > > name itself and no other data / metadata while computing the new
> > > > > topic name? Are you suggesting that since this assumption could work
> > > > > for a majority of SMTs, it might be more efficient overall in terms of
> > > > > reducing the number of "false-positive" calls to
> > > > > `SinkTask::closePostTransform` (and we'll also be able to call
> > > > > `SinkTask::closePostTransform` immediately after topic partitions are
> > > > > revoked from the consumer)? I was thinking of something more generic
> > > > > along the lines of a simple time-based eviction policy that wouldn't
> > > > > be making any assumptions regarding the SMT implementations. Either
> > > > > way, I do like your earlier suggestion of keeping this logic internal
> > > > > and not painting ourselves into a corner by promising any particular
> > > > > behavior in the KIP.
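A simple time-based eviction policy of the kind described here might look like the sketch below. It is hypothetical: `IdlePartitionTracker`, `markSeen`, `evictIdle`, and the idle timeout are invented for illustration, timestamps are passed in explicitly to keep the example deterministic, and `TopicPartition` is a stand-in record rather than the Kafka class. It makes no assumptions about how an SMT derives topic names; it only looks at which post-transform partitions have recently carried records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) { }

// Hypothetical helper: remembers when each post-transform partition last
// carried a record and reports partitions idle for longer than the timeout.
class IdlePartitionTracker {
    private final long idleTimeoutMs;
    private final Map<TopicPartition, Long> lastSeenMs = new HashMap<>();

    IdlePartitionTracker(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    // Returns true if the partition wasn't tracked yet, i.e. the runtime
    // would need to call open for it before delivering its records.
    boolean markSeen(TopicPartition tp, long nowMs) {
        return lastSeenMs.put(tp, nowMs) == null;
    }

    // Removes and returns partitions idle for longer than the timeout; these
    // are the candidates the runtime would pass to close.
    List<TopicPartition> evictIdle(long nowMs) {
        List<TopicPartition> evicted = new ArrayList<>();
        Iterator<Map.Entry<TopicPartition, Long>> it = lastSeenMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TopicPartition, Long> entry = it.next();
            if (nowMs - entry.getValue() > idleTimeoutMs) {
                evicted.add(entry.getKey());
                it.remove();
            }
        }
        return evicted;
    }
}
```

A partition that shows up again after being evicted is simply reported as new, so the runtime would call open for it again; a too-short timeout costs extra open/close churn rather than leaking resources.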
> > > > >
> > > > > Thanks,
> > > > > Yash
> > > > >
> > > > > On Tue, Feb 14, 2023 at 1:08 AM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > > > >
> > > > > > Hi Yash,
> > > > > >
> > > > > > I think the key difference between adding methods/overloads related
> > > > > > to SinkTask::open/SinkTask::close and SinkTask::put is that this
> > > > > > isn't auxiliary information that may or may not be useful to
> > > > > > connector developers. It's actually critical for them to understand
> > > > > > the difference between the two concepts here, even if they look very
> > > > > > similar. And yes, I do believe that switching from pre-transform to
> > > > > > post-transform topic partitions is too big a change in behavior
> > > > > > here. Plus, if a connector is intentionally designed to use
> > > > > > pre-transformation topic partitions in its open/close methods,
> > > > > > wouldn't we just be trading one form of the problem for another by
> > > > > > making this switch?
> > > > > >
> > > > > > One possible alternative to overloading the existing methods is to
> > > > > > split SinkTask::open into openOriginal (or possibly openPhysical or
> > > > > > openPreTransform) and openTransformed (or openLogical or
> > > > > > openPostTransform), with a similar change for SinkTask::close. The
> > > > > > default implementation for SinkTask::openOriginal can be to call
> > > > > > SinkTask::open, and the same can go for SinkTask::close. However, I
> > > > > > prefer overloading the existing methods since this alternative
> > > > > > increases complexity and none of the names are very informative.
> > > > > >
> > > > > > BTW, I wouldn't say that we can't make assumptions about the
> > > > > > relationships between pre- and post-transformation topic partitions.
> > > > > > We might utilize a policy that assumes a deterministic mapping from
> > > > > > the former to the latter, for example. The distinction I'd draw is
> > > > > > that the assumptions we make can and probably should favor some
> > > > > > cases in terms of performance (i.e., reducing the number of
> > > > > > unnecessary calls to close/open over a given sink task's lifetime),
> > > > > > but should not lead to guaranteed resource leaks or failure to obey
> > > > > > the API contract in any case.
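One way a mapping-based policy could let the runtime close post-transform partitions as soon as their sources are revoked, rather than waiting for a timeout, is sketched below. This is a hypothetical illustration (`RevocationClosePolicy`, `observe`, and `onRevoked` are invented names, and `TopicPartition` is a stand-in record). It records which original partitions have actually been observed feeding each transformed partition, so it also handles many-to-one routing, and treats a transformed partition as closable once all of those sources have been revoked.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) { }

// Hypothetical revocation-driven policy: tracks the observed
// original -> transformed routing of records.
class RevocationClosePolicy {
    private final Map<TopicPartition, Set<TopicPartition>> sourcesByTransformed = new HashMap<>();

    // Called for each record: remember that `original` fed `transformed`.
    void observe(TopicPartition original, TopicPartition transformed) {
        sourcesByTransformed.computeIfAbsent(transformed, k -> new HashSet<>()).add(original);
    }

    // Returns transformed partitions whose every observed source has been
    // revoked; these could be passed to close right away.
    List<TopicPartition> onRevoked(Collection<TopicPartition> revokedOriginals) {
        List<TopicPartition> toClose = new ArrayList<>();
        var it = sourcesByTransformed.entrySet().iterator();
        while (it.hasNext()) {
            var entry = it.next();
            entry.getValue().removeAll(revokedOriginals);
            if (entry.getValue().isEmpty()) {
                toClose.add(entry.getKey());
                it.remove();
            }
        }
        return toClose;
    }
}
```

Assuming the runtime also opens unseen partitions before each put, a still-assigned original partition that later routes records to a partition closed this way would just cause it to be opened again. The policy therefore errs toward extra open/close calls rather than leaked resources.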
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Chris
> > > > > >
> > > > > > On Mon, Feb 13, 2023 at 10:54 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Chris,
> > > > > > >
> > > > > > > > especially if connectors are intentionally designed around
> > > > > > > > original topic partitions instead of transformed ones.
> > > > > > >
> > > > > > > Ha, that's a good point and reminds me of Hyrum's Law [1] :)
> > > > > > >
> > > > > > > > I think we have to provide connector developers with some
> > > > > > > > way to differentiate between the two, but maybe there's a way
> > > > > > > > to do this that I haven't thought of yet
> > > > > > >
> > > > > > > I can't think of a better way to do this either; would invoking the
> > > > > > > existing `SinkTask::open` and `SinkTask::close` methods with
> > > > > > > post-transform topic partitions instead of pre-transform topic
> > > > > > > partitions not be acceptable even in a minor / major AK release? I
> > > > > > > feel like the proposed approach of adding overloaded
> > > > > > > `SinkTask::open` / `SinkTask::close` methods to differentiate
> > > > > > > between pre-transform and post-transform topic partitions has
> > > > > > > similar pitfalls to the idea of the overloaded `SinkTask::put`
> > > > > > > method we discarded earlier.
> > > > > > >
> > > > > > > > Either way, I'm glad that the general idea of a cache and
> > > > > > > > eviction policy for SinkTask::close seems reasonable; if
> > > > > > > > we decide to go this route, it might make sense for the KIP
> > > > > > > > to include an outline of one or more high-level strategies
> > > > > > > > we might take, but without promising any particular behavior
> > > > > > > > beyond occasionally calling SinkTask::close for post-transform
> > > > > > > > topic partitions. I'm hoping that this logic can stay internal,
> > > > > > > > and by not painting ourselves into a corner with the KIP, we
> > > > > > > > give ourselves leeway to tweak it in the future if necessary
> > > > > > > > without filing another KIP or introducing a pluggable interface.
> > > > > > >
> > > > > > > Thanks, that's a good idea. Given the flexibility of SMTs, the
> > > > > > > framework can't really make any assumptions about topic
> > > > > > > partitions post-transformation, nor does it have any way to
> > > > > > > definitively get such information from transformations, which is
> > > > > > > why the idea of a cache with an eviction policy makes perfect
> > > > > > > sense!
> > > > > > >
> > > > > > > [1] - https://www.hyrumslaw.com/
> > > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Yash
> > > > > > >
> > > > > > > On Thu, Feb 9, 2023 at 9:38 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > > > > > >
> > > > > > > > Hi Yash,
> > > > > > > >
> > > > > > > > > So it looks like with the current state of affairs, sink tasks
> > > > > > > > > that only instantiate writers in the SinkTask::open method (and
> > > > > > > > > don't do the lazy instantiation in SinkTask::put that you
> > > > > > > > > mentioned) might fail when used with topic/partition mutating
> > > > > > > > > SMTs even if they don't do any asynchronous processing?
> > > > > > > >
> > > > > > > > Yep, exactly 👍
> > > > > > > >
> > > > > > > > > What do you think about retaining just the existing methods but
> > > > > > > > > changing when they're called in the Connect runtime? For
> > > > > > > > > instance, instead of calling SinkTask::open after partition
> > > > > > > > > assignment following a consumer group rebalance, we could cache
> > > > > > > > > the currently "seen" topic partitions (post-transformation) and
> > > > > > > > > before each call to SinkTask::put check whether there are any
> > > > > > > > > new "unseen" topic partitions, and if so call SinkTask::open
> > > > > > > > > (and also update the cache of course).
> > > > > > > >
> > > > > > > > IMO the issue here is that it's a drastic change in behavior to
> > > > > > > > start invoking SinkTask::open and SinkTask::close with
> > > > > > > > post-transform topic partitions instead of pre-transform,
> > > > > > > > especially if connectors are intentionally designed around
> > > > > > > > original topic partitions instead of transformed ones. I think we
> > > > > > > > have to provide connector developers with some way to
> > > > > > > > differentiate between the two, but maybe there's a way to do this
> > > > > > > > that I haven't thought of yet. Interested to hear your thoughts.
> > > > > > > >
> > > > > > > > Either way, I'm glad that the general idea of a cache and
> > > > > > > > eviction policy for SinkTask::close seems reasonable; if we
> > > > > > > > decide to go this route, it might make sense for the KIP to
> > > > > > > > include an outline of one or more high-level strategies we might
> > > > > > > > take, but without promising any particular behavior beyond
> > > > > > > > occasionally calling SinkTask::close for post-transform topic
> > > > > > > > partitions. I'm hoping that this logic can stay internal, and by
> > > > > > > > not painting ourselves into a corner with the KIP, we give
> > > > > > > > ourselves leeway to tweak it in the future if necessary without
> > > > > > > > filing another KIP or introducing a pluggable interface.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > >
> > > > > > > > Chris
> > > > > > > >
> > > > > > > > On Thu, Feb 9, 2023 at 7:39 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Chris,
> > > > > > > > >
> > > > > > > > > Thanks for the feedback.
> > > > > > > > >
> > > > > > > > > 1) That's a fair point; while I did scan everything publicly
> > > > > > > > > available on GitHub, you're right in that it won't cover all
> > > > > > > > > possible SMTs that are out there. Thanks for the example
> > > > > > > > > use-case as well, I've updated the KIP to add the two new
> > > > > > > > > proposed methods.
> > > > > > > > >
> > > > > > > > > 2) So it looks like with the current state of affairs, sink
> > > > > > > > > tasks that only instantiate writers in the SinkTask::open
> > > > > > > > > method (and don't do the lazy instantiation in SinkTask::put
> > > > > > > > > that you mentioned) might fail when used with topic/partition
> > > > > > > > > mutating SMTs even if they don't do any asynchronous
> > > > > > > > > processing, since they could encounter records in
> > > > > > > > > SinkTask::put with topics/partitions that they might not have
> > > > > > > > > created writers for? Thanks for pointing this out; it's
> > > > > > > > > definitely another incompatibility that needs to be called out
> > > > > > > > > and fixed. The overloaded method approach is interesting, but
> > > > > > > > > comes with the caveat of yet more new methods that will need to
> > > > > > > > > be implemented by existing connectors if they want to make use
> > > > > > > > > of this new functionality. What do you think about retaining
> > > > > > > > > just the existing methods but changing when they're called in
> > > > > > > > > the Connect runtime? For instance, instead of calling
> > > > > > > > > SinkTask::open after partition assignment following a consumer
> > > > > > > > > group rebalance, we could cache the currently "seen" topic
> > > > > > > > > partitions (post-transformation) and before each call to
> > > > > > > > > SinkTask::put check whether there are any new "unseen" topic
> > > > > > > > > partitions, and if so call SinkTask::open (and also update the
> > > > > > > > > cache of course). I don't think this would break the existing
> > > > > > > > > contract with sink tasks, where SinkTask::open is expected to
> > > > > > > > > be called for a topic partition before any records from the
> > > > > > > > > topic partition are sent via SinkTask::put? The SinkTask::close
> > > > > > > > > case is a lot trickier, however, and would require some sort of
> > > > > > > > > cache eviction policy that would be deemed appropriate, as you
> > > > > > > > > pointed out too.
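The open-before-put check proposed here can be sketched as a small runtime-side helper. It is hypothetical: `MinimalSinkTask`, `TransformedRecord`, and `OpenBeforePut` are invented stand-ins for `SinkTask`, `SinkRecord`, and the worker logic, `TopicPartition` is a stand-in record, and the sketch deliberately ignores close, which, as noted, needs a separate eviction policy.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for TopicPartition and a post-transform SinkRecord.
record TopicPartition(String topic, int partition) { }
record TransformedRecord(TopicPartition partition, String value) { }

// Hypothetical minimal task interface standing in for SinkTask::open/put.
interface MinimalSinkTask {
    void open(Collection<TopicPartition> partitions);
    void put(Collection<TransformedRecord> records);
}

// Hypothetical runtime-side helper: guarantees open() is called for every
// post-transform partition before any of its records reach put().
class OpenBeforePut {
    private final Set<TopicPartition> seen = new HashSet<>();
    private final MinimalSinkTask task;

    OpenBeforePut(MinimalSinkTask task) {
        this.task = task;
    }

    void deliver(List<TransformedRecord> batch) {
        // Collect partitions not seen before, preserving encounter order.
        Set<TopicPartition> unseen = new LinkedHashSet<>();
        for (TransformedRecord r : batch) {
            if (!seen.contains(r.partition())) {
                unseen.add(r.partition());
            }
        }
        if (!unseen.isEmpty()) {
            task.open(unseen);
            seen.addAll(unseen);
        }
        task.put(batch);
    }
}
```

Because a whole transformed batch is inspected before put, several new partitions discovered in one poll are opened together in a single call.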
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Yash
> > > > > > > > >
> > > > > > > > > On Mon, Feb 6, 2023 at 11:27 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Yash,
> > > > > > > > > >
> > > > > > > > > > I've had some time to think on this KIP and I think I'm in
> > > > > > > > > > agreement about not blocking it on an official compatibility
> > > > > > > > > > library or adding the "ack" API for sink records.
> > > > > > > > > >
> > > > > > > > > > I only have two more thoughts:
> > > > > > > > > >
> > > > > > > > > > 1. Because it is possible to manipulate sink record partitions
> > > > > > > > > > and offsets with the current API we provide for
> > > > > > > > > > transformations, I still believe methods should be added to
> > > > > > > > > > the SinkRecord class to expose the original partition and
> > > > > > > > > > offset, not just the original topic. The additional cognitive
> > > > > > > > > > burden from these two methods is going to be minimal anyway;
> > > > > > > > > > once users understand the difference between the transformed
> > > > > > > > > > topic name and the original one, it's going to be trivial for
> > > > > > > > > > them to understand how that same difference applies to
> > > > > > > > > > partitions and offsets. It's not enough to scan the set of
> > > > > > > > > > SMTs provided out of the box with Connect, ones developed by
> > > > > > > > > > Confluent, or even everything available on GitHub, since
> > > > > > > > > > there may be closed-source projects out there that rely on
> > > > > > > > > > this ability. One potential use case could be re-routing
> > > > > > > > > > partitions between Kafka and some other sharded system.
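To make the re-routing case concrete, the sketch below shows why post-transform coordinates can stop being unique once an SMT merges partitions, while the original topic/partition/offset still identifies each record exactly once. `RoutedRecord` and `DedupCheck` are hypothetical stand-ins, not the `SinkRecord` API; the field names for the original coordinates are assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a sink record exposing both the post-transform
// coordinates and the original (pre-transform) topic/partition/offset.
record RoutedRecord(String topic, int partition, long offset,
                    String originalTopic, int originalPartition, long originalOffset) {

    String transformedKey() {
        return topic + "/" + partition + "/" + offset;
    }

    String originalKey() {
        return originalTopic + "/" + originalPartition + "/" + originalOffset;
    }
}

class DedupCheck {
    // Counts distinct keys; a key usable for exactly-once delivery must be
    // unique per record.
    static int distinct(List<RoutedRecord> records, boolean useOriginal) {
        Set<String> keys = new HashSet<>();
        for (RoutedRecord r : records) {
            keys.add(useOriginal ? r.originalKey() : r.transformedKey());
        }
        return keys.size();
    }
}
```

For two records at offset 42 in partitions 0 and 1 of `clicks`, both routed into partition 0 of `clicks-merged`, the transformed key collapses them into one, while the original key keeps them distinct.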
> > > > > > > > > >
> > > > > > > > > > 2. We still have to address the SinkTask::open [1] and
> > > > > > > > > > SinkTask::close [2] methods. If a connector writes to the
> > > > > > > > > > external system using the transformed topic partitions it
> > > > > > > > > > reads from Kafka, then it's possible for the connector to
> > > > > > > > > > lazily instantiate writers for topic partitions as it
> > > > > > > > > > encounters them from records provided in SinkTask::put.
> > > > > > > > > > However, connectors also need a way to de-allocate those
> > > > > > > > > > writers (and the resources used by them) over time, which
> > > > > > > > > > they can't do as easily. One possible approach here is to
> > > > > > > > > > overload SinkTask::open and SinkTask::close with variants that
> > > > > > > > > > distinguish between transformed and original topic
> > > > > > > > > > partitions, and default to invoking the existing methods with
> > > > > > > > > > just the original topic partitions. We would then have
> > > > > > > > > > several options for how the Connect runtime can invoke these
> > > > > > > > > > methods, but in general, an approach that guarantees that
> > > > > > > > > > tasks are notified of transformed topic partitions in
> > > > > > > > > > SinkTask::open before any records for that partition are
> > > > > > > > > > given to it in SinkTask::put, and makes a best-effort attempt
> > > > > > > > > > to close transformed topic partitions that appear to no
> > > > > > > > > > longer be in use based on some eviction policy, would
> > > > > > > > > > probably be sufficient.
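The connector-side pattern described in point 2 (lazily creating writers in put and releasing them in close) might look like the sketch below. `LazyWriterTask`, `PartitionWriter`, and `TransformedRecord` are hypothetical names, `TopicPartition` is a stand-in record, and a real writer would wrap a file, buffer, or client for the external system. The sketch shows that the writers map only shrinks when close is called with the same post-transform partitions that put created writers for.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for TopicPartition and a post-transform SinkRecord.
record TopicPartition(String topic, int partition) { }
record TransformedRecord(TopicPartition partition, String value) { }

// Hypothetical per-partition writer; a real one might hold a file handle or client.
class PartitionWriter {
    final StringBuilder buffer = new StringBuilder();
    boolean closed;

    void write(String value) {
        if (closed) {
            throw new IllegalStateException("writer already closed");
        }
        buffer.append(value).append('\n');
    }

    void close() {
        closed = true;
    }
}

// Connector-side sketch: writers are created lazily in put() and released in
// close(). Without close() calls for post-transform partitions, the writers
// map would only ever grow.
class LazyWriterTask {
    final Map<TopicPartition, PartitionWriter> writers = new HashMap<>();

    void put(Collection<TransformedRecord> records) {
        for (TransformedRecord r : records) {
            writers.computeIfAbsent(r.partition(), tp -> new PartitionWriter()).write(r.value());
        }
    }

    void close(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            PartitionWriter w = writers.remove(tp);
            if (w != null) {
                w.close();
            }
        }
    }
}
```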
> > > > > > > > > >
> > > > > > > > > > [1] -
> > > > > > > > > > https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open(java.util.Collection)
> > > > > > > > > > [2] -
> > > > > > > > > > https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close(java.util.Collection)
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > >
> > > > > > > > > > Chris
> > > > > > > > > >
> > > > > > > > > > On Sat, Nov 5, 2022 at 5:46 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Chris,
> > > > > > > > > > >
> > > > > > > > > > > Thanks a lot for your inputs!
> > > > > > > > > > >
> > > > > > > > > > > > would provide a simple, clean interface for developers to
> > > > > > > > > > > > determine which features are supported by the version of
> > > > > > > > > > > > the Connect runtime that their plugin has been deployed onto
> > > > > > > > > > >
> > > > > > > > > > > I do like the idea of having such a public compatibility
> > > > > > > > > > > library; I think it would remove a lot of restrictions from
> > > > > > > > > > > framework development if it were to be widely adopted.
> > > > > > > > > > >
> > > > > > > > > > > > we might consider adding an API to "ack" sink records
> > > > > > > > > > >
> > > > > > > > > > > I agree that this does seem like a more intuitive and clean
> > > > > > > > > > > API, but I'm concerned about the backward compatibility
> > > > > > > > > > > headache we'd be imposing on all existing sink connectors.
> > > > > > > > > > > Connector developers will have to maintain two separate ways
> > > > > > > > > > > of doing offset management if they want to use the new API
> > > > > > > > > > > but continue supporting older versions of Kafka Connect.
> > > > > > > > > > >
> > > > > > > > > > > For now, I've reverted the KIP to the previous iteration,
> > > > > > > > > > > which proposed the addition of a new `SinkRecord` method to
> > > > > > > > > > > obtain the original Kafka topic pre-transformation. One thing
> > > > > > > > > > > to note is that I've removed the method for obtaining the
> > > > > > > > > > > original Kafka partition after a cursory search showed that
> > > > > > > > > > > use cases for partition-modifying SMTs are primarily on the
> > > > > > > > > > > source connector side.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Yash
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Nov 1, 2022 at 9:22 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi all,
> > > > > > > > > > > >
> > > > > > > > > > > > I have more comments I'd like to make on this KIP when I
> > > > > > > > > > > > have time (sorry for the delay, Yash, and thanks for your
> > > > > > > > > > > > patience!), but I did want to chime in and say that I'm
> > > > > > > > > > > > also not sure about overloading SinkTask::put. I share the
> > > > > > > > > > > > concerns about creating an intuitive, simple API that Yash
> > > > > > > > > > > > has raised. In addition, this approach doesn't seem very
> > > > > > > > > > > > sustainable: what do we do if we encounter another case in
> > > > > > > > > > > > the future that would warrant a similar solution? We
> > > > > > > > > > > > probably don't want to create three, four, etc. overloaded
> > > > > > > > > > > > variants of the method, each of which would have to be
> > > > > > > > > > > > implemented by connector developers who want to both
> > > > > > > > > > > > leverage the latest and greatest connector APIs and
> > > > > > > > > > > > maintain compatibility with Connect clusters running older
> > > > > > > > > > > > versions.
> > > > > > > > > > > >
> > > > > > > > > > > I haven't been able to flesh this out into a design worth publishing in
> > > > > > > > > > > its own KIP yet, but one alternative I've pitched to a few people, with
> > > > > > > > > > > generally positive interest, has been to develop an official
> > > > > > > > > > > compatibility library for Connect developers. This library would be
> > > > > > > > > > > released as its own Maven artifact (separate from connect-api,
> > > > > > > > > > > connect-runtime, etc.) and would provide a simple, clean interface for
> > > > > > > > > > > developers to determine which features are supported by the version of
> > > > > > > > > > > the Connect runtime that their plugin has been deployed onto. Under the
> > > > > > > > > > > hood, this library might use reflection to determine whether classes,
> > > > > > > > > > > methods, etc. are available, but the developer wouldn't have to do
> > > > > > > > > > > anything more than check (for example)
> > > > > > > > > > > `Features.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()` to know at any
> > > > > > > > > > > point in the lifetime of their connector/task whether that feature is
> > > > > > > > > > > provided by the runtime.
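A minimal sketch of the reflection check such a compatibility library might perform. `Features` and `SINK_TASK_ERRANT_RECORD_REPORTER` are the hypothetical names from the message above, not a published artifact; the only real Connect API referenced is `SinkTaskContext.errantRecordReporter()`, which KIP-610 added. Classes and methods are looked up by name so that the sketch compiles without connect-api on the classpath.

```java
// Hypothetical compatibility-library enum; not a published Kafka artifact.
enum Features {
    // KIP-610 added SinkTaskContext.errantRecordReporter() to the Connect API.
    SINK_TASK_ERRANT_RECORD_REPORTER(
            "org.apache.kafka.connect.sink.SinkTaskContext", "errantRecordReporter");

    private final boolean enabled;

    Features(String className, String methodName, Class<?>... parameterTypes) {
        // Probe once when the enum is initialized and cache the answer.
        this.enabled = isMethodAvailable(className, methodName, parameterTypes);
    }

    /** True if the runtime this plugin is deployed on provides the feature. */
    public boolean enabled() {
        return enabled;
    }

    /** Reflection probe: does the class exist and expose the public method? */
    static boolean isMethodAvailable(String className, String methodName, Class<?>... parameterTypes) {
        try {
            // initialize = false: only check presence, do not run static initializers.
            Class<?> type = Class.forName(className, false, Features.class.getClassLoader());
            type.getMethod(methodName, parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }
}
```

Probing once in the constructor keeps `enabled()` cheap to call at any point in a task's lifetime. A real library would also have to account for Connect's plugin class loading, which this sketch does not attempt.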
> > > > > > > > > > > >
> > > > > > > > > > > One other high-level comment: this doesn't address every case, but we
> > > > > > > > > > > might consider adding an API to "ack" sink records. This could use the
> > > > > > > > > > > SubmittedRecords class [1] (with some slight tweaks) under the hood to
> > > > > > > > > > > track the latest-acked offset for each topic partition. This way,
> > > > > > > > > > > connector developers won't be responsible for tracking offsets at all in
> > > > > > > > > > > their sink tasks (eliminating issues with the accuracy of
> > > > > > > > > > > post-transformation T/P/O sink record information), and they'll only
> > > > > > > > > > > have to notify the Connect framework when a record has been successfully
> > > > > > > > > > > dispatched to the external system. This provides a cleaner, friendlier
> > > > > > > > > > > API, and also enables more fine-grained metrics like the ones proposed in
> > > > > > > > > > > KIP-767 [2].
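A hedged sketch of the per-partition bookkeeping an "ack" API could perform, in the spirit of what `SubmittedRecords` does: an offset only becomes committable once every earlier record in the same partition has been acknowledged. `AckTracker` and `Submitted` are hypothetical names, not the real `SubmittedRecords` API, and partitions are plain strings in the usage below to keep the example self-contained.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-partition ack tracker; not the real SubmittedRecords API.
final class AckTracker<P> {

    // One record handed to the task; acked once it reaches the external system.
    static final class Submitted<P> {
        final P partition;
        final long offset;
        private volatile boolean acked;

        Submitted(P partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        void ack() {
            acked = true;
        }
    }

    // Records per partition, in the order they were submitted (offset order).
    private final Map<P, Deque<Submitted<P>>> pending = new HashMap<>();
    // Next offset to commit per partition: last contiguously acked offset + 1.
    private final Map<P, Long> committable = new HashMap<>();

    synchronized Submitted<P> submit(P partition, long offset) {
        Submitted<P> submitted = new Submitted<>(partition, offset);
        pending.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(submitted);
        return submitted;
    }

    // Drains acked records from the head of each queue and stops at the first
    // unacked one, so offsets are never committed past a gap.
    synchronized Map<P, Long> committableOffsets() {
        for (Map.Entry<P, Deque<Submitted<P>>> entry : pending.entrySet()) {
            Deque<Submitted<P>> queue = entry.getValue();
            while (!queue.isEmpty() && queue.peekFirst().acked) {
                committable.put(entry.getKey(), queue.pollFirst().offset + 1);
            }
        }
        return new HashMap<>(committable);
    }
}
```

With this shape, the task only calls `ack()` after a successful write, and the framework derives commit offsets itself, which is the division of labor the message suggests.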
> > > > > > > > > > > >
> > > > > > > > > > > [1] -
> > > > > > > > > > > https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
> > > > > > > > > > > [2] -
> > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-767%3A+Connect+Latency+Metrics
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > >
> > > > > > > > > > > > Chris
> > > > > > > > > > > >
> > > > > > > > > > > On Tue, Nov 1, 2022 at 11:21 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Randall,
> > > > > > > > > > > > >
> > > > > > > > > > > > It's been a while for this one, but the more I think about it, the more
> > > > > > > > > > > > I feel like the current approach with a new overloaded `SinkTask::put`
> > > > > > > > > > > > method might not be optimal. We're trying to fix a fairly niche
> > > > > > > > > > > > corner-case bug here (usage of topic-mutating SMTs with sink connectors
> > > > > > > > > > > > that do their own offset tracking), and I'm not sure that warrants a
> > > > > > > > > > > > change to such a central interface method. The new `SinkTask::put`
> > > > > > > > > > > > method just seems somewhat odd, and it may not be very understandable
> > > > > > > > > > > > for a new reader; I don't think this should be the case for a public
> > > > > > > > > > > > interface method. Furthermore, even with elaborate documentation in
> > > > > > > > > > > > place, I'm not sure it'll be very obvious to most people what the
> > > > > > > > > > > > purpose of having these two `put` methods is and how they should be used
> > > > > > > > > > > > by sink task implementations. What do you think?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Yash
> > > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Oct 10, 2022 at 9:33 PM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Randall,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks a lot for your valuable feedback so far! I've updated the KIP
> > > > > > > > > > > > > based on our discussion above. Could you please take another look?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > Yash
> > > > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Oct 4, 2022 at 12:40 AM Randall Hauch <rha...@gmail.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > >> On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> > Hi Randall,
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Thanks for elaborating. I think these are all very good points, and I
> > > > > > > > > > > > >> > see why the overloaded `SinkTask::put` method is a cleaner solution
> > > > > > > > > > > > >> > overall.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> updatedTopicPartitions)
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > I think this should be
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > `public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> originalTopicPartitions)`
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > instead, because the sink records themselves have the updated topic
> > > > > > > > > > > > >> > partitions (i.e. after all transformations have been applied), and the
> > > > > > > > > > > > >> > KIP is proposing a way for the tasks to be able to access the original
> > > > > > > > > > > > >> > topic partition (i.e. before transformations have been applied).
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Sounds good.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > Of course, if the developer does not need separate methods, they can
> > > > > > > > > > > > >> > > easily have the older `put` method simply delegate to the newer method.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > If the developer does not need separate methods (i.e. they don't need to
> > > > > > > > > > > > >> > use this new addition), they can simply continue implementing just the
> > > > > > > > > > > > >> > older `put` method, right?
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > >> Correct. We should update the Javadoc of both methods to make this
> > > > > > > > > > > > >> clear and, in general, to explain how the two methods are used and
> > > > > > > > > > > > >> should be implemented. That can be part of the PR; the KIP doesn't need
> > > > > > > > > > > > >> this wording.
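The default-method fallback agreed on above can be sketched as follows. `MiniSinkTask`, `Rec`, and `Tp` are hypothetical stand-ins for `SinkTask`, `SinkRecord`, and `TopicPartition` so the example runs without connect-api; only the dispatch matters here: a newer runtime calls the two-argument `put`, whose default forwards to the single-argument `put`, so a task that overrides only the older method keeps working unchanged.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for TopicPartition.
final class Tp {
    final String topic;
    final int partition;

    Tp(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Hypothetical stand-in for a post-transformation SinkRecord.
final class Rec {
    final String topic;
    final int partition;
    final long offset;

    Rec(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical stand-in for SinkTask with the proposed overload added.
abstract class MiniSinkTask {
    public abstract void put(Collection<Rec> records);

    // Default: ignore the extra map and fall back to the single-argument put.
    public void put(Collection<Rec> records, Map<Rec, Tp> originalTopicPartitions) {
        put(records);
    }
}

// A connector that never needed the new information overrides only put(records).
class LegacyTask extends MiniSinkTask {
    final List<Rec> written = new ArrayList<>();

    @Override
    public void put(Collection<Rec> records) {
        written.addAll(records);
    }
}
```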
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > Finally, this gives us a roadmap for *eventually* deprecating the older
> > > > > > > > > > > > >> > > method, once the Connect runtime versions without this change are old
> > > > > > > > > > > > >> > > enough.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I'm not sure we'd ever want to deprecate the older method. Most common
> > > > > > > > > > > > >> > sink connector implementations do not do their own offset tracking with
> > > > > > > > > > > > >> > asynchronous processing and will probably never have a need for the
> > > > > > > > > > > > >> > additional parameter
> > > > > > > > > > > > >> > `Map<SinkRecord, TopicPartition> originalTopicPartitions` in the
> > > > > > > > > > > > >> > proposed new `put` method. These connectors can continue implementing
> > > > > > > > > > > > >> > only the existing `SinkTask::put` method, which will be called by the
> > > > > > > > > > > > >> > default implementation of the newer overloaded `put` method.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> +1
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > the pre-commit methods use the same
> > > > > > > > > > > > >> > > `Map<TopicPartition, OffsetAndMetadata> currentOffsets` data structure
> > > > > > > > > > > > >> > > I'm suggesting be used.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > The data structure you're suggesting be used is a
> > > > > > > > > > > > >> > `Map<SinkRecord, TopicPartition>`, which will map `SinkRecord` objects
> > > > > > > > > > > > >> > to the original topic partition of the corresponding `ConsumerRecord`,
> > > > > > > > > > > > >> > right? To clarify, this is a new data structure that will need to be
> > > > > > > > > > > > >> > managed in the `WorkerSinkTask`.
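A sketch, under stated assumptions, of the framework-side bookkeeping described above: after the transformation chain runs, an entry is recorded only when a record's topic or partition actually changed. `TransformedBatch`, `Msg`, and `Partition` are hypothetical stand-ins, not `WorkerSinkTask` internals. The map is an `IdentityHashMap` because the real `SinkRecord` uses value-based `equals()`, so two distinct transformed records could otherwise collide as keys; that choice belongs to this sketch, not to the KIP.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for TopicPartition (value equality).
final class Partition {
    final String topic;
    final int partition;

    Partition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof Partition
                && ((Partition) o).topic.equals(topic)
                && ((Partition) o).partition == partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical stand-in for a record flowing through the SMT chain.
final class Msg {
    final Partition tp;
    final long offset;

    Msg(Partition tp, long offset) {
        this.tp = tp;
        this.offset = offset;
    }
}

// Hypothetical framework-side batch builder; not WorkerSinkTask's actual code.
final class TransformedBatch {
    final List<Msg> records = new ArrayList<>();
    // Identity-keyed: lookups use the exact record instances handed to the task.
    final Map<Msg, Partition> originalTopicPartitions = new IdentityHashMap<>();

    static TransformedBatch build(List<Msg> consumed, UnaryOperator<Msg> transformations) {
        TransformedBatch batch = new TransformedBatch();
        for (Msg original : consumed) {
            Msg transformed = transformations.apply(original);
            if (transformed == null) {
                continue; // filtered out by an SMT
            }
            batch.records.add(transformed);
            // Absence of an entry means "topic and partition unchanged".
            if (!transformed.tp.equals(original.tp)) {
                batch.originalTopicPartitions.put(transformed, original.tp);
            }
        }
        return batch;
    }
}
```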
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Ah, you're right. Thanks for the correction.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Best regards,
> > > > > > > > > > > > > >> Randall
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> > Thanks,
> > > > > > > > > > > > > >> > Yash
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > >> > On Mon, Oct 3, 2022 at 1:20 AM Randall Hauch <rha...@gmail.com> wrote:
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > > Hi, Yash.
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > > I'm not sure I quite understand why it would be "easier" for connector
> > > > > > > > > > > > >> > > > developers to account for implementing two different overloaded `put`
> > > > > > > > > > > > >> > > > methods (assuming that they want to use this new feature) versus using a
> > > > > > > > > > > > >> > > > try-catch block around `SinkRecord` access methods?
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > Using a try-catch around an API method that *might* be there is a very
> > > > > > > > > > > > >> > > unusual thing for most developers. Unfortunately, we've had to resort to
> > > > > > > > > > > > >> > > this atypical approach with Connect in places when there was no good
> > > > > > > > > > > > >> > > alternative. We seem to be relying upon this pattern because it's easier
> > > > > > > > > > > > >> > > for us, not because it offers a better experience for Connector
> > > > > > > > > > > > >> > > developers. IMO, if there's a practical alternative that uses normal
> > > > > > > > > > > > >> > > development practices and techniques, then we should use that
> > > > > > > > > > > > >> > > alternative. IIUC, there is at least one practical alternative for this
> > > > > > > > > > > > >> > > KIP that would not require developers to use the unusual try-catch to
> > > > > > > > > > > > >> > > handle the case where methods are not found.
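For comparison, the try-catch pattern under discussion usually reduces to a helper like the one below. Calling a method that exists in the connect-api a connector was compiled against, but not in the runtime it is deployed on, fails at link time with `NoSuchMethodError` (or `NoClassDefFoundError` for a missing type); this is the pattern the thread attributes to KIP-610. `RuntimeCompat.ifSupported` is a hypothetical helper name.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper wrapping the try-catch pattern for optional Connect APIs.
final class RuntimeCompat {
    private RuntimeCompat() {
    }

    // Runs a call that may target an API missing from an older runtime. The
    // linkage error surfaces when the call first executes, so it is caught here.
    static <T> Optional<T> ifSupported(Supplier<T> newerApiCall) {
        try {
            return Optional.ofNullable(newerApiCall.get());
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            return Optional.empty();
        }
    }
}
```

In a task, usage might look like `RuntimeCompat.ifSupported(() -> context.errantRecordReporter())`. Even wrapped in a helper, every call site still has to remember that the result may be absent, which is the developer-experience cost Randall is pointing at.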
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > I also think having two `put` methods is easier when the Connector has
> > > > > > > > > > > > >> > > to do different things for different Connect runtimes, too. One of those
> > > > > > > > > > > > >> > > methods is called by newer Connect runtimes with the new behavior, and
> > > > > > > > > > > > >> > > the other method is called by an older Connect runtime. Of course, if
> > > > > > > > > > > > >> > > the developer does not need separate methods, they can easily have the
> > > > > > > > > > > > >> > > older `put` method simply delegate to the newer method.
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > Finally, this gives us a roadmap for *eventually* deprecating the older
> > > > > > > > > > > > >> > > method, once the Connect runtime versions without this change are old
> > > > > > > > > > > > >> > > enough.
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > > I think the advantage of going with the proposed approach in the KIP is
> > > > > > > > > > > > >> > > > that it wouldn't require extra book-keeping (the
> > > > > > > > > > > > >> > > > Map<SinkRecord, TopicPartition> in `WorkerSinkTask` in your proposed
> > > > > > > > > > > > >> > > > approach)
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > The connector does have to do some of this bookkeeping in how they track
> > > > > > > > > > > > >> > > the topic partition offsets used in `preCommit`, and the pre-commit
> > > > > > > > > > > > >> > > methods use the same
> > > > > > > > > > > > >> > > `Map<TopicPartition, OffsetAndMetadata> currentOffsets` data structure
> > > > > > > > > > > > >> > > I'm suggesting be used.
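A sketch of the connector-side bookkeeping Randall describes, keyed the same way as the `currentOffsets` map handed to `preCommit`. `TopicPartitionKey` and `CommitOffset` are hypothetical stand-ins for `TopicPartition` and `OffsetAndMetadata`, and the sketch assumes the external system confirms writes in offset order within each partition. Returning only partitions present in `currentOffsets` is a choice made here for illustration; it also shows why offsets recorded under a post-transform topic name never line up with the partitions the framework consumed from.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for TopicPartition.
final class TopicPartitionKey {
    final String topic;
    final int partition;

    TopicPartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionKey)) {
            return false;
        }
        TopicPartitionKey other = (TopicPartitionKey) o;
        return topic.equals(other.topic) && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical stand-in for OffsetAndMetadata (offset only).
final class CommitOffset {
    final long offset;

    CommitOffset(long offset) {
        this.offset = offset;
    }
}

// Sketch of a task that writes asynchronously and reports only confirmed offsets.
class AsyncWritingTask {
    // Next offset to commit per ORIGINAL (pre-transform) partition.
    private final Map<TopicPartitionKey, Long> confirmed = new ConcurrentHashMap<>();

    // Called, possibly from another thread, when the external system confirms a write.
    void onWriteConfirmed(TopicPartitionKey originalPartition, long offset) {
        confirmed.put(originalPartition, offset + 1);
    }

    // Same shape as SinkTask.preCommit: return only offsets known to be durable.
    Map<TopicPartitionKey, CommitOffset> preCommit(Map<TopicPartitionKey, CommitOffset> currentOffsets) {
        Map<TopicPartitionKey, CommitOffset> toCommit = new HashMap<>();
        for (TopicPartitionKey tp : currentOffsets.keySet()) {
            Long next = confirmed.get(tp);
            if (next != null) {
                toCommit.put(tp, new CommitOffset(next));
            }
        }
        return toCommit;
    }
}
```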
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > I hope that helps.
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > Best regards,
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > Randall
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > On Mon, Sep 26, 2022 at 9:38 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > > Hi Randall,
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > > Thanks for reviewing the KIP!
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > > > That latter logic can get quite ugly.
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > I'm not sure I quite understand why it would be "easier" for connector
> > > > > > > > > > > > >> > > > developers to account for implementing two different overloaded `put`
> > > > > > > > > > > > >> > > > methods (assuming that they want to use this new feature) versus using a
> > > > > > > > > > > > >> > > > try-catch block around `SinkRecord` access methods? In both cases, a
> > > > > > > > > > > > >> > > > connector developer would need to write additional code in order to
> > > > > > > > > > > > >> > > > ensure that their connector continues working with older Connect
> > > > > > > > > > > > >> > > > runtimes. Furthermore, we would probably need to carefully document what
> > > > > > > > > > > > >> > > > the implementation of the older `put` method should look like for
> > > > > > > > > > > > >> > > > connectors that want to use this new feature. I think the advantage of
> > > > > > > > > > > > >> > > > going with the proposed approach in the KIP is that it wouldn't require
> > > > > > > > > > > > >> > > > extra book-keeping (the Map<SinkRecord, TopicPartition> in
> > > > > > > > > > > > >> > > > `WorkerSinkTask` in your proposed approach), and also that the try-catch
> > > > > > > > > > > > >> > > > based logic is an already established pattern through
> > > > > > > > > > > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
> > > > > > > > > > > > >> > > > and other KIPs which added methods to source/sink connector/task
> > > > > > > > > > > > >> > > > contexts.
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > Let me know if you still feel that having a new overloaded put method is
> > > > > > > > > > > > >> > > > a cleaner solution and I'd be happy to reconsider!
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > > Thanks,
> > > > > > > > > > > > > >> > > > Yash
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > On Thu, Sep 22, 2022 at 11:18 PM Randall Hauch <rha...@gmail.com> wrote:
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > > Hi, Yash. Thanks for picking up this KIP and discussion.
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > The KIP includes this rejected alternative:
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > > 4. Update SinkTask.put in any way to pass the new information outside
> > > > > > > > > > > > >> > > > > > SinkRecord (e.g. a Map or a derived class)
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > >    - Much more disruptive change without considerable pros
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > One advantage of doing this is that sink connector implementations can
> > > > > > > > > > > > >> > > > > more easily implement two different "put(...)" methods to handle running
> > > > > > > > > > > > >> > > > > in a variety of runtimes, without having to use try-catch logic around
> > > > > > > > > > > > >> > > > > the newer SinkRecord access methods. That latter logic can get quite
> > > > > > > > > > > > >> > > > > ugly.
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > For example, the existing `put` method has this signature:
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > >     public abstract void put(Collection<SinkRecord> records);
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > If we added an overloaded method that passed in a map of the old
> > > > > > > > > > > > >> > > > > topic+partition for each record (and defined the absence of an entry as
> > > > > > > > > > > > >> > > > > having an unchanged topic and partition):
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > >     public void put(Collection<SinkRecord> records,
> > > > > > > > > > > > >> > > > >                     Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
> > > > > > > > > > > > >> > > > >         put(records);
> > > > > > > > > > > > >> > > > >     }
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > then a `SinkTask` implementation that wants to use this new feature could
> > > > > > > > > > > > >> > > > > simply implement both methods:
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > >     public void put(Collection<SinkRecord> records) {
> > > > > > > > > > > > >> > > > >         // Running in an older runtime, so no tracking of SMT-modified
> > > > > > > > > > > > >> > > > >         // topic names or partitions
> > > > > > > > > > > > >> > > > >         put(records, Map.of());
> > > > > > > > > > > > >> > > > >     }
> > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > >     public void put(Collection<SinkRecord> records,
> > > > > > > > > > > > >> > > > >                     Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
> > > > > > > > > > > > >> > > > >         // real logic here
> > > > > > > > > > > > >> > > > >     }
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > This seems a lot easier than having to use try-catch logic, yet still
> > > > > > > > > > > > >> > > > > allows sink connectors to utilize the new functionality and still work
> > > > > > > > > > > > >> > > > > with older Connect runtimes.
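A runnable variant of the two-method pattern above, with the "real logic" filled in as simple offset tracking. `OverloadedSinkTask`, `SinkRec`, and `TopicPart` are hypothetical stand-ins for the Connect classes. As in the proposal, a missing map entry means the topic and partition were unchanged, so `getOrDefault` falls back to the record's own topic partition.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for TopicPartition (value equality so it can key a map).
final class TopicPart {
    final String topic;
    final int partition;

    TopicPart(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof TopicPart
                && ((TopicPart) o).topic.equals(topic)
                && ((TopicPart) o).partition == partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical stand-in for a SinkRecord as seen after SMTs have run.
final class SinkRec {
    final TopicPart tp;
    final long offset;

    SinkRec(TopicPart tp, long offset) {
        this.tp = tp;
        this.offset = offset;
    }
}

// Hypothetical stand-in for SinkTask with the proposed overload.
abstract class OverloadedSinkTask {
    public abstract void put(Collection<SinkRec> records);

    public void put(Collection<SinkRec> records, Map<SinkRec, TopicPart> originalTopicPartitions) {
        put(records);
    }
}

class OffsetTrackingTask extends OverloadedSinkTask {
    // Next offset to commit, keyed by the ORIGINAL (pre-transform) partition.
    final Map<TopicPart, Long> nextOffsets = new HashMap<>();

    @Override
    public void put(Collection<SinkRec> records) {
        // Older runtime: no information about SMT-modified topics or partitions.
        put(records, Map.of());
    }

    @Override
    public void put(Collection<SinkRec> records, Map<SinkRec, TopicPart> originalTopicPartitions) {
        for (SinkRec r : records) {
            // ... write r to the external system here ...
            TopicPart original = originalTopicPartitions.getOrDefault(r, r.tp);
            nextOffsets.put(original, r.offset + 1);
        }
    }
}
```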
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > WDYT?
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > Randall
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > On Thu, Sep 8, 2022 at 7:03 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > > Hi all,
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > I would like to (re)start a new discussion thread on KIP-793 (Kafka
> > > > > > > > > > > > >> > > > > > Connect), which proposes some additions to the public SinkRecord
> > > > > > > > > > > > >> > > > > > interface in order to support topic-mutating SMTs for sink connectors
> > > > > > > > > > > > >> > > > > > that do their own offset tracking.
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > Links:
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > KIP:
> > > > > > > > > > > > >> > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336830
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > Older discussion thread:
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > https://lists.apache.org/thread/00kcth6057jdcsyzgy1x8nb2s1cymy8h,
> > > > > > > > > > > > >> > > > > > https://lists.apache.org/thread/rzqkm0q5y5v3vdjhg8wqppxbkw7nyopj
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > Jira: https://issues.apache.org/jira/browse/KAFKA-13431
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > Thanks,
> > > > > > > > > > > > > >> > > > > > Yash
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
