Hi all,

It's been a few days. If there are no further comments or questions, I'd like
to call for a vote. There is an existing VOTE thread if you search for
KIP-759.

Thank you,
Shay

On Wed, Jul 26, 2023 at 7:30 PM Shay Lin <lqxs...@gmail.com> wrote:

> Very good catch, Matthias. I updated the KIP to state that the new
> DSLOperation will return a new, mutated KStream.
>
> Thank you,
> Shay
>
> On Wed, Jul 26, 2023 at 6:13 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> One last question. What should happen for the following case:
>>
>> KStream myStream = builder.stream(...).map(...);
>> myStream.markAsPartitioned().groupByKey().aggregate(...);
>> myStream.join(...);
>>
>> The question is about the "fan-out" pattern. `myStream`, which is marked
>> for partitioning, is fed into two downstream operations. Thus, it's
>> clear that the aggregation won't trigger a repartition. However, the
>> fan-out happens before `markAsPartitioned` and thus I would assume
>> that the join would trigger a repartitioning?
>>
>> This question is important, because if we follow what I said above,
>> `markAsPartitioned` returns a new KStream object, but does not mutate the
>> upstream KStream object, which are semantically two different things. It
>> also has an impact on how we need to implement the feature. The KIP
>> should explicitly explain this case.
>>
>>
>> -Matthias
>>
>> On 7/26/23 4:58 PM, Shay Lin wrote:
>> > Hi John,
>> >
>> > Thanks for your reply. I updated the KIP to reflect the changes we
>> > discussed in the thread today.
>> > #1 is duly noted, I learned from the examples Sophie sent earlier! =)
>> >
>> > In the new version, I also explained why IQ and joins will not work with
>> > the interface and described the mitigation. The proposal now specifically
>> > states that we are solving the unneeded repartition problem when neither
>> > IQ nor a join is present in the Kafka Streams application. In the concerns
>> > section, the proposal notes that a reverse mapping would make this new
>> > interface compatible with IQ and joins again, but adding one is subject to
>> > demand.
>> >
>> > Let me know what you think. Thanks!
>> > Shay
>> >
>> >
>> >
>> > On Wed, Jul 26, 2023 at 2:35 PM John Roesler <vvcep...@apache.org>
>> wrote:
>> >
>> >> Hello Shay,
>> >>
>> >> Thanks for the KIP!
>> >>
>> >> I just took a look in preparation to vote, and there are two small-ish
>> >> things that I'd like to fix first. Apologies if this stuff has already
>> come
>> >> up in the discussion thread; I only skimmed it.
>> >>
>> >> 1. The KIP only mentions the name of the method instead of providing a
>> >> code snippet showing exactly what the method signature will be in the
>> >> interface. Normally, KIPs do the latter because it removes all
>> ambiguity
>> >> from the proposal. It also gives you an opportunity to write down the
>> >> Javadoc you would add to the method instead of just mentioning the
>> points
>> >> that you plan to document.
>> >>
>> >> 2. The KIP lists some concerns, but not what you will do to mitigate
>> them.
>> >> For example, the concern about IQ not behaving correctly. Will you
>> disable
>> >> the use of the implicit partitioner downstream of one of these
>> >> cancellations? Or provide a new interface to supply the "reverse
>> mapping"
>> >> you mentioned? Or include documentation in the Javadoc for how to deal
>> with
>> >> the situation? I think there are a range of options for each of those
>> >> concerns, and we should state up front what we plan to do.
>> >>
>> >> Thanks again!
>> >> -John
>> >>
>> >> On 2023/07/24 20:33:05 Sophie Blee-Goldman wrote:
>> >>> Thanks Shay! You and Matthias have convinced me, I'm happy with the
>> >> current
>> >>> proposal. I think once you make the minor
>> >>> updates to the KIP document this will be ready for voting again.
>> >>>
>> >>> Cheers,
>> >>> Sophie
>> >>>
>> >>> On Mon, Jul 24, 2023 at 8:26 AM Shay Lin <lqxs...@gmail.com> wrote:
>> >>>
>> >>>> Hi Sophie and Matthias, thanks for your comments and replies.
>> >>>>
>> >>>> 1. Scope of change: KStreams only or KStreams/KTable
>> >>>> I took some time to digest your points, looking through how KStreams
>> >>>> triggers repartitions today. I noticed that `repartitionRequired` is a flag
>> >>>> in KStreamImpl etc. and not in KTableImpl etc. When I look further, in the
>> >>>> case of KTable, instead of passing in a boolean flag, a repartition node
>> >>>> `TableRepartitionMapNode` is directly created. I went back and referenced
>> >>>> the two issue tickets KAFKA-10844 and KAFKA-4835; both requests were
>> >>>> focused on KStreams, i.e. not to change the partition when the input streams
>> >>>> are already correctly keyed. Is it possible that in the case of KTable,
>> >>>> users always intend to repartition (change key) when they call on
>> >>>> aggregate? -- (this was written before I saw Matthias's comment)
>> >>>>
>> >>>> Overall, based on the tickets, I see the benefit of doing a contained
>> >>>> change focusing on KStreams, i.e. repartitionRequired, which would solve
>> >>>> the pain points nicely. If we run into similar complaints/optimization
>> >>>> requests for KTable down the line, we can address them on top of this (let
>> >>>> me know if we have these requests already, I might just be negligent).
>> >>>>
>> >>>> 2. API: markAsPartitioned() vs config
>> >>>> If we go with the KStreams only scope, markAsPartitioned() is more
>> >>>> adequate, i.e. maps nicely to repartitionRequired. There is a list of
>> >>>> NamedOperations that may or may not trigger repartition based on their
>> >>>> context (KStreams or KTable) which would make the implementation more
>> >>>> confusing.
>> >>>>
>> >>>> 3. KIP documentation: Thanks for providing the links to previous
>> KIPs.
>> >> I
>> >>>> will be adding the three use cases and javadoc. I will also document
>> >> the
>> >>>> risks when it relates to IQ and Join.
>> >>>>
>> >>>> Best,
>> >>>> Shay
>> >>>>
>> >>>> On Fri, Jul 21, 2023 at 5:55 PM Matthias J. Sax <mj...@apache.org>
>> >> wrote:
>> >>>>
>> >>>>> I agree that it could easily be misused. There are a few Jira tickets for
>> >>>>> cases when people want to "cancel" a repartition step. I would hope
>> >>>>> those tickets are linked to the KIP (if not, we should do this, and
>> >>>>> maybe even c&p those cases as motivation into the KIP itself)?
>> >>>>>
>> >>>>> It's always a tricky question to what extent we want to guide users, and
>> >>>>> to what extent we need to give levers for advanced cases (and how to
>> >>>>> design those levers...). It's for sure a good idea to call out "use with
>> >>>>> care" in the JavaDocs for the new method.
>> >>>>>
>> >>>>>
>> >>>>> -Matthias
>> >>>>>
>> >>>>> On 7/21/23 3:34 PM, Sophie Blee-Goldman wrote:
>> >>>>>> I guess I felt a bit uneasy about how this could be used/abused
>> >> while
>> >>>>>> reading the KIP, but if we truly believe this is an advanced
>> >> feature,
>> >>>> I'm
>> >>>>>> fine with the way things currently are. It doesn't feel like the
>> >> best
>> >>>>> API,
>> >>>>>> but it does seem to be the best *possible* API given the way things
>> >>>> are.
>> >>>>>>
>> >>>>>> W.r.t the KTable notes, that all makes sense to me. I just wanted
>> >> to
>> >>>> lay
>> >>>>>> out all the potential cases to make sure we had our bases covered.
>> >>>>>>
>> >>>>>> I still think an example or two would help, but the only thing I
>> >> will
>> >>>>>> actually wait on before feeling comfortable enough to vote on this
>> >>>> would
>> >>>>> be
>> >>>>>> a clear method signature (and maybe sample javadocs) in the "Public
>> >>>>>> Interfaces" section.
>> >>>>>>
>> >>>>>> Thanks again for the KIP Shay! Hope I haven't dragged it out too
>> >> much
>> >>>>>>
>> >>>>>> On Fri, Jul 21, 2023 at 3:19 PM Matthias J. Sax <mj...@apache.org>
>> >>>>> wrote:
>> >>>>>>
>> >>>>>>> Some thoughts about the API question.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>>> A. kstream.groupBy(...).aggregate(...)
>> >>>>>>>
>> >>>>>>> This can be rewritten as
>> >>>>>>>
>> >>>>>>> kstream.selectKey(...)
>> >>>>>>>           .markAsPartitioned()
>> >>>>>>>           .groupByKey()
>> >>>>>>>           .aggregate(...)
>> >>>>>>>
>> >>>>>>> Given that `markAsPartitioned` is an advanced feature, I think it would
>> >>>>>>> be ok?
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>>> B. ktable.groupBy(...).aggregate(...)
>> >>>>>>>
>> >>>>>>> For KTable aggregation, not sure how useful it would be? In the end, a
>> >>>>>>> table aggregation does only make sense if we pick something from the
>> >>>>>>> value, ie, we indeed change the key?
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>>> C. kstream.selectKey(...).join(ktable)
>> >>>>>>>
>> >>>>>>> We can just insert a `markAsPartitioned()` after `selectKey` to avoid
>> >>>>>>> repartitioning of the left input KStream.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>> KStream.selectKey(...).toTable().join(...)
>> >>>>>>>
>> >>>>>>> Not sure if I understand what you are trying to say with this example? In
>> >>>>>>> the end, `selectKey(...).toTable()` would repartition. If I know that one
>> >>>>>>> can upsert directly, one inserts a `markAsPartitioned()` in between.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> In general, the use case seems to be that the key is not in the right
>> >>>>>>> "format", or there is no key, but data was partitioned by a
>> >>>>>>> value-attribute upstream and we just want to extract this
>> >>>>>>> value-attribute into the key. Both seem to be KStream cases?
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> -Matthias
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:
>> >>>>>>>> Hey Shay, while I don't have any specific concerns about the new
>> >>>> public
>> >>>>>>> API
>> >>>>>>>> in this KIP, I'd like to better understand how this feature will
>> >> work
>> >>>>>>>> before I vote. We should document the behavior of this new
>> >> operator
>> >>>>>>> clearly
>> >>>>>>>> in the KIP as well -- you don't necessarily need to write the
>> >>>> complete
>> >>>>>>>> javadocs up front, but it should be possible for a user to read
>> >> the
>> >>>> KIP
>> >>>>>>> and
>> >>>>>>>> then understand how this feature will work and how they would
>> >> need to
>> >>>>>>> apply
>> >>>>>>>> it.
>> >>>>>>>>
>> >>>>>>>> To that end, I recommend framing this proposal with a few
>> >> examples to
>> >>>>>>> help
>> >>>>>>>> clarify the semantics. When and where can you apply the
>> >>>>>>> markAsPartitioned()
>> >>>>>>>> operator? Some suggestions below.
>> >>>>>>>>
>> >>>>>>>> Specific notes:
>> >>>>>>>>
>> >>>>>>>> 1. The KIP opens with "Each key changing operation in Kafka Streams
>> >>>>>>>> (selectKey, map, transform, etc.) now leads to automatic repartition before
>> >>>>>>>> an aggregation." We should change "aggregation" to "stateful operation" as
>> >>>>>>>> this is true for things like joins as well as aggregations.
>> >>>>>>>> 2. The callout on IQ makes me a bit uncomfortable -- basically it says this
>> >>>>>>>> should not be a concern "if we use markAsPartitioned correctly". Does this
>> >>>>>>>> mean if we, the devs implementing this, write the feature correctly? Or is
>> >>>>>>>> it saying that this won't be a problem as long as "we", the users of this
>> >>>>>>>> feature, use it correctly? Just wondering if you've put any thought into
>> >>>>>>>> how this would work yet (I personally have not).
>> >>>>>>>> 3. The KIP should lay out the proposed API exactly, even if there's only
>> >>>>>>>> one new method. Check out this KIP
>> >>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
>> >>>>>>>> (or this KIP
>> >>>>>>>> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128651808>)
>> >>>>>>>> for a good reference on what the Public Interfaces section should include.
>> >>>>>>>> 4. Regarding the proposed API itself, I wonder if KStream is
>> >> really
>> >>>> the
>> >>>>>>>> most appropriate interface for the new operator. A repartition
>> >> can be
>> >>>>>>>> triggered on just a KTable. Here's where some examples would
>> >> help.
>> >>>>>>> Perhaps
>> >>>>>>>> we could focus on these three cases:
>> >>>>>>>>
>> >>>>>>>> A. kstream.groupBy(...).aggregate(...)
>> >>>>>>>> B. ktable.groupBy(...).aggregate(...)
>> >>>>>>>> C. kstream.selectKey(...).join(ktable)
>> >>>>>>>>
>> >>>>>>>> I'm sure someone will correct me if I'm missing any additional
>> >> vital
>> >>>>>>>> examples, but at the very least, these are the three to consider:
>> >>>>> either
>> >>>>>>> a
>> >>>>>>>> KStream or KTable followed by a groupBy/aggregation, or a KStream
>> >>>> with
>> >>>>>>>> key-changing operator followed by a join. Note that you could
>> >> have
>> >>>>>>>> something like KStream.selectKey(...).toTable().join(...) as
>> >> well,
>> >>>> but
>> >>>>>>>> since there are no pure key-changing operators (like #selectKey)
>> >> on
>> >>>>>>>> KTables, only groupBy() which must always be followed by
>> >> aggregation,
>> >>>>>>> this
>> >>>>>>>> 4th case can be reduced to an example like C of a KStream with
>> >>>>>>> key-changing
>> >>>>>>>> operation and downstream join -- ie there's no way to do this
>> >> without
>> >>>>>>>> #toTable which is more like syntactic sugar for the purposes of
>> >> this
>> >>>>>>>> repartitioning discussion.
>> >>>>>>>>
>> >>>>>>>> I worry that making this a DSL operator on KStream is too
>> >> generic,
>> >>>> and
>> >>>>> we
>> >>>>>>>> would also need to add it to KTable for example B, despite
>> >> KTables
>> >>>> not
>> >>>>>>>> having any true pure key-changing operators outside of #groupBy.
>> >>>> Would
>> >>>>> we
>> >>>>>>>> throw an exception if you invoked #markAsPartitioned on a KTable
>> >> that
>> >>>>>>>> wasn't followed by a groupBy? If you have multiple key-changing
>> >>>>>>> operators,
>> >>>>>>>> would you need to add markAsPartitioned after each one? If not,
>> >> what
>> >>>>> are
>> >>>>>>>> the semantics of that?  These are the main questions that got me
>> >>>>> thinking
>> >>>>>>>> here, and will definitely need to be clarified in the KIP if we
>> >> do go
>> >>>>>>> with
>> >>>>>>>> the current proposal. But I wanted to throw out another idea for
>> >> an
>> >>>>> API I
>> >>>>>>>> think would help with some of this awkwardness by having clearly
>> >>>>> defined
>> >>>>>>>> semantics:
>> >>>>>>>>
>> >>>>>>>> Fundamentally it seems to me that these issues arise from the fact that "being
>> >>>>>>>> partitioned" is conceptually a property of other operations
>> >> applied
>> >>>> to
>> >>>>> a
>> >>>>>>>> KStream/KTable, rather than an operation itself. So rather than
>> >>>> making
>> >>>>>>> this
>> >>>>>>>> a DSL operator itself, what if we added it to the Grouped and
>> >> various
>> >>>>>>>> Joined configuration classes? It would allow us to more
>> >> carefully hit
>> >>>>>>> only
>> >>>>>>>> the relevant parts of the DSL, so there are no questions about
>> >>>>>>> whether/when
>> >>>>>>>> to throw errors when the operator is incorrectly applied -- there
>> >>>> would
>> >>>>>>> be
>> >>>>>>>> no way to apply it incorrectly. The main drawback I can think of
>> >> is
>> >>>>>>> simply
>> >>>>>>>> that this touches on a larger surface area of the API. I
>> >> personally
>> >>>>> don't
>> >>>>>>>> believe this is a good enough reason to make it a DSL operator
>> >> as one
>> >>>>>>> could
>> >>>>>>>> make that argument for nearly any kind of KStream or KTable
>> >> operator
>> >>>>>>>> configuration going forward, and would explode the
>> >> KStream/KTable API
>> >>>>>>>> surface area instead. Perhaps this was discussed during the
>> >> previous
>> >>>>>>>> iteration of this KIP, or I'm missing something here, so I just
>> >>>> wanted
>> >>>>> to
>> >>>>>>>> put this out there and see what people think
>> >>>>>>>>
>> >>>>>>>> Either way, thanks for picking up this KIP. It's been a long time
>> >>>>> coming
>> >>>>>>> :)
>> >>>>>>>>
>> >>>>>>>> -Sophie
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Mon, Jul 10, 2023 at 2:05 PM Shay Lin <lqxs...@gmail.com>
>> >> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi all,
>> >>>>>>>>>
>> >>>>>>>>> It's been a few days so I went ahead with editing the KIP; the main change
>> >>>>>>>>> is on the method name:
>> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
>> >>>>>>>>> I will follow up with a VOTE separately.
>> >>>>>>>>>
>> >>>>>>>>> Best,
>> >>>>>>>>> Shay
>> >>>>>>>>>
>> >>>>>>>>> On Thu, Jun 29, 2023 at 4:52 PM Matthias J. Sax <
>> >> mj...@apache.org>
>> >>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Shay,
>> >>>>>>>>>>
>> >>>>>>>>>> thanks for picking up this KIP. It's a pity that the discussion
>> >>>>> stalled
>> >>>>>>>>>> for such a long time.
>> >>>>>>>>>>
>> >>>>>>>>>> As expressed previously, I am happy with the name
>> >>>>> `markAsPartitioned()`
>> >>>>>>>>>> and also believe it's ok to just document the impact and leave
>> >> it
>> >>>> to
>> >>>>>>> the
>> >>>>>>>>>> user to do the right thing.
>> >>>>>>>>>>
>> >>>>>>>>>> If we really get a lot of users that ask about it, because
>> >> they did
>> >>>>> not
>> >>>>>>>>>> do the right thing, we could still add something (eg, a
>> >>>>> reverse-mapper
>> >>>>>>>>>> function) in a follow-up KIP. But we don't know if it's
>> >> necessary;
>> >>>>>>> thus,
>> >>>>>>>>>> making a small incremental step sounds like a good approach to
>> >> me.
>> >>>>>>>>>>
>> >>>>>>>>>> Let's see if others agree or not.
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> -Matthias
>> >>>>>>>>>>
>> >>>>>>>>>> On 6/28/23 5:29 PM, Shay Lin wrote:
>> >>>>>>>>>>> Hi all,
>> >>>>>>>>>>>
>> >>>>>>>>>>> Great discussion thread. May I take this KIP up? If it’s
>> >> alright
>> >>>> my
>> >>>>>>>>> plan
>> >>>>>>>>>> is
>> >>>>>>>>>>> to update the KIP with the operator `markAsPartitioned()`.
>> >>>>>>>>>>>
>> >>>>>>>>>>> As you have discussed and pointed out, there are implications
>> >> to
>> >>>>>>>>>> downstream
>> >>>>>>>>>>> joins or aggregation operations. Still, the operator is
>> >> intended
>> >>>> for
>> >>>>>>>>>>> advanced users so my two cents is it would be a valuable
>> >> addition
>> >>>>>>>>>>> nonetheless. We could add this as a caution/consideration as part of the
>> >>>>>>>>>>> Javadoc.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Let me know, thanks.
>> >>>>>>>>>>> Shay
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>
