Hi Randall,

It's been a while for this one but the more I think about it, the more I
feel like the current approach with a new overloaded `SinkTask::put` method
might not be optimal. We're trying to fix a pretty corner case bug here
(usage of topic mutating SMTs with sink connectors that do their own offset
tracking) and I'm not sure that warrants a change to such a central
interface method. The new `SinkTask::put` method just seems somewhat odd
and it may not be very understandable for a new reader - I don't think this
should be the case for a public interface method. Furthermore, even with
elaborate documentation in place, I'm not sure if it'll be very obvious to
most people what the purpose of having these two `put` methods is and how
they should be used by sink task implementations. What do you think?

Thanks,
Yash

On Mon, Oct 10, 2022 at 9:33 PM Yash Mayya <yash.ma...@gmail.com> wrote:

> Hi Randall,
>
> Thanks a lot for your valuable feedback so far! I've updated the KIP based
> on our discussion above. Could you please take another look?
>
> Thanks,
> Yash
>
> On Tue, Oct 4, 2022 at 12:40 AM Randall Hauch <rha...@gmail.com> wrote:
>
>> On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya <yash.ma...@gmail.com> wrote:
>>
>> > Hi Randall,
>> >
>> > Thanks for elaborating. I think these are all very good points and I see
>> > why the overloaded `SinkTask::put` method is a cleaner solution overall.
>> >
>> > > public void put(Collection<SinkRecord> records, Map<SinkRecord,
>> > TopicPartition> updatedTopicPartitions)
>> >
>> > I think this should be
>> >
>> > `public void put(Collection<SinkRecord> records, Map<SinkRecord,
>> > TopicPartition> originalTopicPartitions)`
>> >
>> > instead because the sink records themselves have the updated topic
>> > partitions (i.e. after all transformations have been applied) and the
>> KIP
>> > is proposing a way for the tasks to be able to access the original topic
>> > partition (i.e. before transformations have been applied).
>> >
>>
>> Sounds good.
>>
>>
>> >
>> > > Of course, if the developer does not need separate methods, they can
>> > easily have the older `put` method simply delegate to the newer method.
>> >
>> > If the developer does not need separate methods (i.e. they don't need to
>> > use this new addition), they can simply continue implementing just the
>> > older `put` method right?
>> >
>>
>> Correct. We should update the JavaDoc of both methods to make this clear,
>> and in general how the two methods should are used and should be
>> implemented. That can be part of the PR, and the KIP doesn't need this
>> wording.
>>
>> >
>> > > Finally, this gives us a roadmap for *eventually* deprecating the
>> older
>> > method, once the Connect runtime versions without this change are old
>> > enough.
>> >
>> > I'm not sure we'd ever want to deprecate the older method. Most common
>> sink
>> > connector implementations do not do their own offset tracking with
>> > asynchronous processing and will probably never have a need for the
>> > additional parameter `Map<SinkRecord, TopicPartition>
>> > originalTopicPartitions` in the proposed new `put` method. These
>> connectors
>> > can continue implementing only the existing `SinkTask::put` method which
>> > will be called by the default implementation of the newer overloaded
>> `put`
>> > method.
>> >
>>
>> +1
>>
>>
>> >
>> > > the pre-commit methods use the same `Map<TopicPartition,
>> > OffsetAndMetadata> currentOffsets` data structure I'm suggesting be
>> used.
>> >
>> > The data structure you're suggesting be used is a `Map<SinkRecord,
>> > TopicPartition>` which will map `SinkRecord` objects to the original
>> topic
>> > partition of the corresponding `ConsumerRecord` right? To clarify, this
>> is
>> > a new data structure that will need to be managed in the
>> `WorkerSinkTask`.
>> >
>>
>> Ah, you're right. Thanks for the correction.
>>
>> Best regards,
>> Randall
>>
>>
>> > Thanks,
>> > Yash
>>
>>
>> > On Mon, Oct 3, 2022 at 1:20 AM Randall Hauch <rha...@gmail.com> wrote:
>> >
>> > > Hi, Yash.
>> > >
>> > > I'm not sure I quite understand why it would be "easier" for connector
>> > > > developers to account for implementing two different overloaded
>> `put`
>> > > > methods (assuming that they want to use this new feature) versus
>> using
>> > a
>> > > > try-catch block around `SinkRecord` access methods?
>> > >
>> > >
>> > > Using a try-catch to try around an API method that *might* be there
>> is a
>> > > very unusual thing for most developers. Unfortunately, we've had to
>> > resort
>> > > to this atypical approach with Connect in places when there was no
>> good
>> > > alternative. We seem to relying upon pattern because it's easier for
>> us,
>> > > not because it offers a better experience for Connector developers.
>> IMO,
>> > if
>> > > there's a practical alternative that uses normal development practices
>> > and
>> > > techniques, then we should use that alternative. IIUC, there is at
>> least
>> > > one practical alternative for this KIP that would not require
>> developers
>> > to
>> > > use the unusual try-catch to handle the case where methods are not
>> found.
>> > >
>> > > I also think having two `put` methods is easier when the Connector
>> has to
>> > > do different things for different Connect runtimes, too. One of those
>> > > methods is called by newer Connect runtimes with the new behavior, and
>> > the
>> > > other method is called by an older Connect runtime. Of course, if the
>> > > developer does not need separate methods, they can easily have the
>> older
>> > > `put` method simply delegate to the newer method.
>> > >
>> > > Finally, this gives us a roadmap for *eventually* deprecating the
>> older
>> > > method, once the Connect runtime versions without this change are old
>> > > enough.
>> > >
>> > > I think the advantage of going with the
>> > > > proposed approach in the KIP is that it wouldn't require extra
>> > > book-keeping
>> > > > (the Map<SinkRecord,
>> > > > TopicPartition> in `WorkerSinkTask` in your proposed approach)
>> > > >
>> > >
>> > > The connector does have to do some of this bookkeeping in how they
>> track
>> > > the topic partition offsets used in the `preCommit`, and the
>> pre-commit
>> > > methods use the same `Map<TopicPartition, OffsetAndMetadata>
>> > > currentOffsets`
>> > > data structure I'm suggesting be used.
>> > >
>> > > I hope that helps.
>> > >
>> > > Best regards,
>> > >
>> > > Randall
>> > >
>> > > On Mon, Sep 26, 2022 at 9:38 AM Yash Mayya <yash.ma...@gmail.com>
>> wrote:
>> > >
>> > > > Hi Randall,
>> > > >
>> > > > Thanks for reviewing the KIP!
>> > > >
>> > > > > That latter logic can get quite ugly.
>> > > >
>> > > > I'm not sure I quite understand why it would be "easier" for
>> connector
>> > > > developers to account for implementing two different overloaded
>> `put`
>> > > > methods (assuming that they want to use this new feature) versus
>> using
>> > a
>> > > > try-catch block around `SinkRecord` access methods? In both cases, a
>> > > > connector developer would need to write additional code in order to
>> > > ensure
>> > > > that their connector continues working with older Connect runtimes.
>> > > > Furthermore, we would probably need to carefully document how the
>> > > > implementation for the older `put` method should look like for
>> > connectors
>> > > > that want to use this new feature. I think the advantage of going
>> with
>> > > the
>> > > > proposed approach in the KIP is that it wouldn't require extra
>> > > book-keeping
>> > > > (the Map<SinkRecord,
>> > > > TopicPartition> in `WorkerSinkTask` in your proposed approach) and
>> also
>> > > the
>> > > > fact that the try-catch based logic is an already established
>> pattern
>> > > > through
>> > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
>> > > > and other KIPs which added methods to source/sink connector/task
>> > > contexts.
>> > > >
>> > > > Let me know if you still feel that having a new overloaded put
>> method
>> > is
>> > > a
>> > > > cleaner solution and I'd be happy to reconsider!
>> > > >
>> > > > Thanks,
>> > > > Yash
>> > > >
>> > > > On Thu, Sep 22, 2022 at 11:18 PM Randall Hauch <rha...@gmail.com>
>> > wrote:
>> > > >
>> > > > > Hi, Yash. Thanks for picking up this KIP and discussion.
>> > > > >
>> > > > > The KIP includes this rejected alternative:
>> > > > >
>> > > > > > 4. Update SinkTask.put in any way to pass the new information
>> > outside
>> > > > > > SinkRecord (e.g. a Map or a derived class)
>> > > > > >
>> > > > > >    -
>> > > > > >
>> > > > > >    Much more disruptive change without considerable pros
>> > > > > >
>> > > > > >
>> > > > > One advantage about doing this is that sink connector
>> implementations
>> > > can
>> > > > > more easily implement two different "put(...)" methods to handle
>> > > running
>> > > > in
>> > > > > a variety of runtimes, without having to use try-catch logic
>> around
>> > the
>> > > > > newer SinkRecord access methods. That latter logic can get quite
>> > ugly.
>> > > > >
>> > > > > For example, the existing `put` method has this signature:
>> > > > >
>> > > > > public abstract void put(Collection<SinkRecord> records);
>> > > > >
>> > > > > If we added an overloaded method that passed in a map of the old
>> > > > > topic+partition for each record (and defined the absence of an
>> entry
>> > as
>> > > > > having an unchanged topic and partition):
>> > > > >
>> > > > > public void put(Collection<SinkRecord> records, Map<SinkRecord,
>> > > > > TopicPartition> updatedTopicPartitions) {
>> > > > > put(records);
>> > > > > }
>> > > > >
>> > > > > then a `SinkTask` implementation that wants to use this new
>> feature
>> > > could
>> > > > > simply implement both methods:
>> > > > >
>> > > > > public void put(Collection<SinkRecord> records) {
>> > > > > // Running in an older runtime, so no tracking of SMT-modified
>> topic
>> > > > names
>> > > > > or partitions
>> > > > > put(records, Map.of());
>> > > > > }
>> > > > >
>> > > > > public void put(Collection<SinkRecord> records, Map<SinkRecord,
>> > > > > TopicPartition> updatedTopicPartitions) {
>> > > > > // real logic here
>> > > > > }
>> > > > >
>> > > > > This seems a lot easier than having to use try-catch logic, yet
>> still
>> > > > > allows sink connectors to utilize the new functionality and still
>> > work
>> > > > with
>> > > > > older Connect runtimes.
>> > > > >
>> > > > > WDYT?
>> > > > >
>> > > > > Randall
>> > > > >
>> > > > >
>> > > > > On Thu, Sep 8, 2022 at 7:03 AM Yash Mayya <yash.ma...@gmail.com>
>> > > wrote:
>> > > > >
>> > > > > > Hi all,
>> > > > > >
>> > > > > > I would like to (re)start a new discussion thread on KIP-793
>> (Kafka
>> > > > > > Connect) which proposes some additions to the public SinkRecord
>> > > > interface
>> > > > > > in order to support topic mutating SMTs for sink connectors
>> that do
>> > > > their
>> > > > > > own offset tracking.
>> > > > > >
>> > > > > > Links:
>> > > > > >
>> > > > > > KIP:
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336830
>> > > > > >
>> > > > > > Older discussion thread:
>> > > > > >
>> https://lists.apache.org/thread/00kcth6057jdcsyzgy1x8nb2s1cymy8h,
>> > > > > >
>> https://lists.apache.org/thread/rzqkm0q5y5v3vdjhg8wqppxbkw7nyopj
>> > > > > >
>> > > > > > Jira: https://issues.apache.org/jira/browse/KAFKA-13431
>> > > > > >
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Yash
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

Reply via email to