Hi all,

It's been a few days; if there are no further comments or questions, I'd like to call for a vote. There is an existing VOTE thread if you search for KIP-759.
Thank you,
Shay

On Wed, Jul 26, 2023 at 7:30 PM Shay Lin <lqxs...@gmail.com> wrote:
> Very good catch, Matthias. I updated the KIP to state that the new DSL operation will return a new, mutated KStream.
>
> Thank you,
> Shay
>
> On Wed, Jul 26, 2023 at 6:13 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> One last question. What should happen in the following case?
>>
>> KStream myStream = builder.stream(...).map(...);
>> myStream.markAsPartitioned().groupByKey().aggregate(...);
>> myStream.join(...);
>>
>> The question is about the "fan-out" pattern. `myStream`, on which `markAsPartitioned()` is called, is fed into two downstream operations. Thus, it's clear that the aggregation won't trigger a repartition. However, the fan-out happens before `markAsPartitioned()`, and thus I would assume that the join would trigger a repartition?
>>
>> This question is important because, if we follow what I said above, `markAsPartitioned()` returns a new KStream object rather than mutating the upstream KStream object, and those are semantically two different things. It also has an impact on how we need to implement the feature. The KIP should explicitly explain this case.
>>
>>
>> -Matthias
>>
>> On 7/26/23 4:58 PM, Shay Lin wrote:
>> > Hi John,
>> >
>> > Thanks for your reply. I updated the KIP to reflect the changes we discussed in the thread today. #1 is duly noted; I learned from the examples Sophie sent earlier! =)
>> >
>> > In the new version, I also explain why IQ and joins will not work with the interface and describe the mitigation. The proposal now specifically states that we are solving the unneeded repartition problem when IQ or joins are not used alongside it in the Kafka Streams application. In the concerns section, the proposal explains that a reverse mapping would make this new interface compatible with IQ and joins again, but that this is subject to demand.
>> >
>> > Let me know what you think. Thanks!
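Matthias's fan-out question can be illustrated with a small, self-contained Java model. This is not Kafka Streams code: `ToyStream`, its `repartitionRequired` field, and the topic-naming scheme are hypothetical, and the sketch assumes the semantics Matthias describes, namely that `markAsPartitioned()` returns a new stream object and leaves the upstream object unchanged. Under that assumption only the branch that goes through the marked stream skips the repartition; the join on `myStream` still gets one.

```java
import java.util.ArrayList;
import java.util.List;

// Demo class first so that single-file launch (`java FanOutDemo.java`) finds main().
class FanOutDemo {
    public static void main(String[] args) {
        ToyStream myStream = ToyStream.source("input").map("map");
        myStream.markAsPartitioned().stateful("aggregate"); // branch 1: no repartition topic
        myStream.stateful("join");                           // branch 2: myStream is still flagged
        System.out.println(myStream.repartitionTopics);      // prints [join-repartition]
    }
}

/** Toy stand-in for a DSL stream node; this is NOT the Kafka Streams KStream API. */
final class ToyStream {
    final String name;
    final boolean repartitionRequired;
    // Topology-wide log of the repartition topics that would be created (shared by all nodes).
    final List<String> repartitionTopics;

    private ToyStream(String name, boolean repartitionRequired, List<String> repartitionTopics) {
        this.name = name;
        this.repartitionRequired = repartitionRequired;
        this.repartitionTopics = repartitionTopics;
    }

    static ToyStream source(String topic) {
        return new ToyStream(topic, false, new ArrayList<>());
    }

    /** Key-changing operation: downstream stateful operators must repartition. */
    ToyStream map(String opName) {
        return new ToyStream(opName, true, repartitionTopics);
    }

    /**
     * Hypothetical markAsPartitioned(): returns a NEW node with the flag cleared.
     * The receiver is not mutated, so other branches fed by it still repartition.
     */
    ToyStream markAsPartitioned() {
        return new ToyStream(name + "-marked", false, repartitionTopics);
    }

    /** Any stateful operator (aggregation or join) adds a repartition step if the flag is set. */
    void stateful(String opName) {
        if (repartitionRequired) {
            repartitionTopics.add(opName + "-repartition");
        }
    }
}
```

If the operator instead mutated `myStream`, both branches would skip the repartition, which is why the KIP needs to state which of the two semantics it uses.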
>> > Shay
>> >
>> > On Wed, Jul 26, 2023 at 2:35 PM John Roesler <vvcep...@apache.org> wrote:
>> >
>> >> Hello Shay,
>> >>
>> >> Thanks for the KIP!
>> >>
>> >> I just took a look in preparation to vote, and there are two small-ish things that I'd like to fix first. Apologies if this has already come up in the discussion thread; I only skimmed it.
>> >>
>> >> 1. The KIP only mentions the name of the method instead of providing a code snippet showing exactly what the method signature will be in the interface. Normally, KIPs do the latter because it removes all ambiguity from the proposal. It also gives you an opportunity to write down the Javadoc you would add to the method instead of just mentioning the points that you plan to document.
>> >>
>> >> 2. The KIP lists some concerns, but not what you will do to mitigate them. For example, the concern about IQ not behaving correctly. Will you disable the use of the implicit partitioner downstream of one of these cancellations? Or provide a new interface to supply the "reverse mapping" you mentioned? Or include documentation in the Javadoc on how to deal with the situation? I think there is a range of options for each of those concerns, and we should state up front what we plan to do.
>> >>
>> >> Thanks again!
>> >> -John
>> >>
>> >> On 2023/07/24 20:33:05 Sophie Blee-Goldman wrote:
>> >>> Thanks Shay! You and Matthias have convinced me; I'm happy with the current proposal. I think once you make the minor updates to the KIP document, this will be ready for voting again.
>> >>>
>> >>> Cheers,
>> >>> Sophie
>> >>>
>> >>> On Mon, Jul 24, 2023 at 8:26 AM Shay Lin <lqxs...@gmail.com> wrote:
>> >>>
>> >>>> Hi Sophie and Matthias, thanks for your comments and replies.
>> >>>>
>> >>>> 1. Scope of change: KStreams only or KStreams/KTable
>> >>>> I took some time to digest your points, looking through how KStreams triggers repartitions today. I noticed that `repartitionRequired` is a flag in `KStreamImpl` etc. and not in `KTableImpl` etc. Looking further, in the case of KTable, instead of passing in a boolean flag, a repartition node `TableRepartitionMapNode` is created directly. I went back to the two issue tickets, KAFKA-10844 and KAFKA-4835; both requests were focused on KStreams, i.e., not repartitioning when the input streams are already correctly keyed. Is it possible that in the case of KTable, users always intend to repartition (change the key) when they call aggregate? -- (this was written before I saw Matthias's comment)
>> >>>>
>> >>>> Overall, based on the tickets, I see the benefit of a contained change focusing on KStreams, i.e., on `repartitionRequired`, which would solve the pain points nicely. If we run into similar complaints/optimization requests for KTable down the line, we can address them on top of this (let me know if we have these requests already; I might have just missed them).
>> >>>>
>> >>>> 2. API: markAsPartitioned() vs. config
>> >>>> If we go with the KStreams-only scope, markAsPartitioned() is more adequate, i.e., it maps nicely to `repartitionRequired`. There is a list of `NamedOperation`s that may or may not trigger a repartition depending on their context (KStream or KTable), which would make the implementation more confusing.
>> >>>>
>> >>>> 3. KIP documentation: Thanks for providing the links to previous KIPs. I will add the three use cases and the Javadoc. I will also document the risks related to IQ and joins.
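Shay's observation that `markAsPartitioned()` maps onto the `repartitionRequired` flag in `KStreamImpl` can be made concrete with a toy flag-propagation model. Everything here is a hypothetical simplification, not the real `KStreamImpl` logic: operations are plain strings, and `groupByKey` stands for "`groupByKey()` followed by an aggregation". In this model a later key-changing operation sets the flag again, so the marker only helps if it follows the last key-changing operation before the stateful one; this is one possible answer to the question of whether a marker is needed after every key-changing operator.

```java
import java.util.List;
import java.util.Set;

class FlagPropagationDemo {
    public static void main(String[] args) {
        // Marker placed too early: the second selectKey sets the flag again.
        System.out.println(ToyTopology.needsRepartition(
                List.of("selectKey", "markAsPartitioned", "selectKey", "groupByKey"))); // true
        // Marker placed after the last key-changing op; value-only ops keep the flag cleared.
        System.out.println(ToyTopology.needsRepartition(
                List.of("selectKey", "mapValues", "selectKey", "markAsPartitioned", "filter", "groupByKey"))); // false
    }
}

/** Toy model of a per-node repartitionRequired flag; NOT the actual KStreamImpl logic. */
final class ToyTopology {
    // Operations that may change the key and therefore set the flag.
    private static final Set<String> KEY_CHANGING = Set.of("selectKey", "map", "flatMap");
    // Operations before which a repartition is inserted if the flag is set.
    private static final Set<String> STATEFUL = Set.of("groupByKey", "join");

    /** Returns true if the first stateful operation in the chain would be preceded by a repartition. */
    static boolean needsRepartition(List<String> ops) {
        boolean repartitionRequired = false;
        for (String op : ops) {
            if (KEY_CHANGING.contains(op)) {
                repartitionRequired = true;
            } else if (op.equals("markAsPartitioned")) {
                repartitionRequired = false;
            } else if (STATEFUL.contains(op)) {
                return repartitionRequired;
            }
            // Any other operation (mapValues, filter, peek, ...) keeps the key; flag unchanged.
        }
        return false; // no stateful operation, so nothing would be repartitioned
    }
}
```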
>> >>>>
>> >>>> Best,
>> >>>> Shay
>> >>>>
>> >>>> On Fri, Jul 21, 2023 at 5:55 PM Matthias J. Sax <mj...@apache.org> wrote:
>> >>>>
>> >>>>> I agree that it could easily be misused. There are a few Jira tickets for cases where people want to "cancel" a repartition step. I would hope those tickets are linked to the KIP (if not, we should do this, and maybe even copy those cases into the KIP itself as motivation)?
>> >>>>>
>> >>>>> It's always a tricky question to what extent we want to guide users, and to what extent we need to give levers for advanced cases (and how to design those levers...). It's certainly a good idea to call out "use with care" in the Javadocs for the new method.
>> >>>>>
>> >>>>>
>> >>>>> -Matthias
>> >>>>>
>> >>>>> On 7/21/23 3:34 PM, Sophie Blee-Goldman wrote:
>> >>>>>> I guess I felt a bit uneasy about how this could be used/abused while reading the KIP, but if we truly believe this is an advanced feature, I'm fine with the way things currently are. It doesn't feel like the best API, but it does seem to be the best *possible* API given the way things are.
>> >>>>>>
>> >>>>>> W.r.t. the KTable notes, that all makes sense to me. I just wanted to lay out all the potential cases to make sure we had our bases covered.
>> >>>>>>
>> >>>>>> I still think an example or two would help, but the only thing I will actually wait on before feeling comfortable enough to vote on this would be a clear method signature (and maybe sample Javadocs) in the "Public Interfaces" section.
>> >>>>>>
>> >>>>>> Thanks again for the KIP, Shay! Hope I haven't dragged it out too much.
>> >>>>>>
>> >>>>>> On Fri, Jul 21, 2023 at 3:19 PM Matthias J. Sax <mj...@apache.org> wrote:
>> >>>>>>
>> >>>>>>> Some thoughts about the API question.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>>> A. kstream.groupBy(...).aggregate(...)
>> >>>>>>>
>> >>>>>>> This can be rewritten as
>> >>>>>>>
>> >>>>>>>     kstream.selectKey(...)
>> >>>>>>>            .markAsPartitioned()
>> >>>>>>>            .groupByKey()
>> >>>>>>>            .aggregate(...)
>> >>>>>>>
>> >>>>>>> Given that `markAsPartitioned()` is an advanced feature, I think it would be ok?
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>>> B. ktable.groupBy(...).aggregate(...)
>> >>>>>>>
>> >>>>>>> For KTable aggregation, I am not sure how useful it would be. In the end, a table aggregation only makes sense if we pick something from the value, i.e., if we indeed change the key?
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>>> C. kstream.selectKey(...).join(ktable)
>> >>>>>>>
>> >>>>>>> We can just insert a `markAsPartitioned()` after `selectKey` to avoid repartitioning the left input KStream.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>> KStream.selectKey(...).toTable().join(...)
>> >>>>>>>
>> >>>>>>> I am not sure I understand what you are trying to say with this example. In the end, `selectKey(...).toTable()` would repartition. If one knows that one can upsert directly, one inserts a `markAsPartitioned()` in between.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> In general, the use case seems to be that the key is not in the right "format", or there is no key, but the data was partitioned by a value attribute upstream and we just want to extract this value attribute into the key. Both seem to be KStream cases?
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> -Matthias
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:
>> >>>>>>>> Hey Shay, while I don't have any specific concerns about the new public API in this KIP, I'd like to better understand how this feature will work before I vote.
>> >>>>>>>> We should document the behavior of this new operator clearly in the KIP as well -- you don't necessarily need to write the complete Javadocs up front, but it should be possible for a user to read the KIP and then understand how this feature will work and how they would need to apply it.
>> >>>>>>>>
>> >>>>>>>> To that end, I recommend framing this proposal with a few examples to help clarify the semantics. When and where can you apply the markAsPartitioned() operator? Some suggestions below.
>> >>>>>>>>
>> >>>>>>>> Specific notes:
>> >>>>>>>>
>> >>>>>>>> 1. The KIP opens with "Each key changing operation in Kafka Streams (selectKey, map, transform, etc.) now leads to automatic repartition before an aggregation." We should change "aggregation" to "stateful operation", as this is true for things like joins as well as aggregations.
>> >>>>>>>> 2. The callout on IQ makes me a bit uncomfortable -- basically it says this should not be a concern "if we use markAsPartitioned correctly". Does this mean if we, the devs implementing this, write the feature correctly? Or is it saying that this won't be a problem as long as "we", the users of this feature, use it correctly? Just wondering if you've put any thought into how this would work yet (I personally have not).
>> >>>>>>>> 3. The KIP should lay out the proposed API exactly, even if there's only one new method.
>> >>>>>>>> Check out this KIP
>> >>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
>> >>>>>>>> (or this KIP <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128651808>)
>> >>>>>>>> for a good reference on what the Public Interfaces section should include.
>> >>>>>>>> 4. Regarding the proposed API itself, I wonder if KStream is really the most appropriate interface for the new operator. A repartition can also be triggered on a KTable. Here's where some examples would help. Perhaps we could focus on these three cases:
>> >>>>>>>>
>> >>>>>>>>    A. kstream.groupBy(...).aggregate(...)
>> >>>>>>>>    B. ktable.groupBy(...).aggregate(...)
>> >>>>>>>>    C. kstream.selectKey(...).join(ktable)
>> >>>>>>>>
>> >>>>>>>> I'm sure someone will correct me if I'm missing any additional vital examples, but at the very least, these are the three to consider: either a KStream or KTable followed by a groupBy/aggregation, or a KStream with a key-changing operator followed by a join. Note that you could have something like KStream.selectKey(...).toTable().join(...) as well, but since there are no pure key-changing operators (like #selectKey) on KTables, only groupBy(), which must always be followed by an aggregation, this 4th case can be reduced to an example like C of a KStream with a key-changing operation and a downstream join -- i.e., there's no way to do this without #toTable, which is more like syntactic sugar for the purposes of this repartitioning discussion.
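Sophie's IQ concern (note 2), which also applies to the co-partitioning that case C relies on, comes down to a mismatch between where a record actually lives and where a key-based lookup expects it. The toy Java model below illustrates this under stated assumptions: the upstream producer range-partitions user ids in blocks of 100 (a hypothetical rule), and `Math.floorMod(key.hashCode(), n)` stands in for the key-to-partition mapping; Kafka's default partitioner actually applies murmur2 to the serialized key bytes. The aggregation itself would still be correct, because each user id is confined to one partition, but a lookup by the new key goes to the wrong partition for some ids.

```java
import java.util.ArrayList;
import java.util.List;

class IqMismatchDemo {
    static final int NUM_PARTITIONS = 4;

    /** Hypothetical upstream producer rule: user ids 0..399 range-partitioned in blocks of 100. */
    static int upstreamPartition(int userId) {
        return userId / 100;
    }

    /**
     * Stand-in for the key-to-partition lookup used to route a query.
     * Kafka's default partitioner actually applies murmur2 to the serialized key bytes.
     */
    static int lookupPartition(int userId) {
        return Math.floorMod(Integer.hashCode(userId), NUM_PARTITIONS);
    }

    /** User ids whose state lives in a different partition than the one a key-based lookup queries. */
    static List<Integer> misroutedKeys(List<Integer> userIds) {
        List<Integer> wrong = new ArrayList<>();
        for (int id : userIds) {
            // After selectKey(userId) + markAsPartitioned(), state stays where the record already was.
            if (upstreamPartition(id) != lookupPartition(id)) {
                wrong.add(id);
            }
        }
        return wrong;
    }

    public static void main(String[] args) {
        System.out.println(misroutedKeys(List.of(0, 5, 150, 399))); // prints [5, 150]
    }
}
```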
>> >>>>>>>>
>> >>>>>>>> I worry that making this a DSL operator on KStream is too generic, and we would also need to add it to KTable for example B, despite KTables not having any true pure key-changing operators outside of #groupBy. Would we throw an exception if you invoked #markAsPartitioned on a KTable that wasn't followed by a groupBy? If you have multiple key-changing operators, would you need to add markAsPartitioned after each one? If not, what are the semantics of that? These are the main questions that got me thinking here, and they will definitely need to be clarified in the KIP if we do go with the current proposal. But I wanted to throw out another idea for an API that I think would help with some of this awkwardness by having clearly defined semantics:
>> >>>>>>>>
>> >>>>>>>> Fundamentally, it seems to me that these issues arise because "being partitioned" is conceptually a property of other operations applied to a KStream/KTable, rather than an operation itself. So rather than making this a DSL operator, what if we added it to the Grouped and various Joined configuration classes? It would allow us to more carefully hit only the relevant parts of the DSL, so there are no questions about whether/when to throw errors when the operator is incorrectly applied -- there would be no way to apply it incorrectly. The main drawback I can think of is simply that this touches a larger surface area of the API.
>> >>>>>>>> I personally don't believe this is a good enough reason to make it a DSL operator, as one could make that argument for nearly any kind of KStream or KTable operator configuration going forward, and it would explode the KStream/KTable API surface area instead. Perhaps this was discussed during the previous iteration of this KIP, or I'm missing something here, so I just wanted to put this out there and see what people think.
>> >>>>>>>>
>> >>>>>>>> Either way, thanks for picking up this KIP. It's been a long time coming :)
>> >>>>>>>>
>> >>>>>>>> -Sophie
>> >>>>>>>>
>> >>>>>>>> On Mon, Jul 10, 2023 at 2:05 PM Shay Lin <lqxs...@gmail.com> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi all,
>> >>>>>>>>>
>> >>>>>>>>> It's been a few days, so I went ahead and edited the KIP; the main change is the method name:
>> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
>> >>>>>>>>> I will follow up with a VOTE separately.
>> >>>>>>>>>
>> >>>>>>>>> Best,
>> >>>>>>>>> Shay
>> >>>>>>>>>
>> >>>>>>>>> On Thu, Jun 29, 2023 at 4:52 PM Matthias J. Sax <mj...@apache.org> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Shay,
>> >>>>>>>>>>
>> >>>>>>>>>> thanks for picking up this KIP. It's a pity that the discussion stalled for such a long time.
>> >>>>>>>>>>
>> >>>>>>>>>> As expressed previously, I am happy with the name `markAsPartitioned()` and also believe it's ok to just document the impact and leave it to the user to do the right thing.
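Leaving it "to the user to do the right thing" presupposes that users can check the precondition. A minimal sketch of such a check, for example in a unit test over a sample of input records: every key produced by the intended `selectKey` logic must occur in exactly one input partition. The `Rec` type and the `customer=...;order=...` value format are hypothetical. Co-location is sufficient to skip the repartition before an aggregation, but not for joins or IQ, which additionally need the placement to agree with the partitioner.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

class CoLocationCheck {
    /** A consumed record: the input partition it came from plus its value (hypothetical shape). */
    static final class Rec {
        final int partition;
        final String value;

        Rec(int partition, String value) {
            this.partition = partition;
            this.value = value;
        }
    }

    /**
     * Returns the new keys that occur in more than one input partition. An empty result means
     * every key produced by keyExtractor is already co-located in a single partition.
     */
    static <K> Set<K> keysSpanningPartitions(List<Rec> records, Function<String, K> keyExtractor) {
        Map<K, Set<Integer>> partitionsPerKey = new HashMap<>();
        for (Rec r : records) {
            partitionsPerKey.computeIfAbsent(keyExtractor.apply(r.value), k -> new HashSet<>()).add(r.partition);
        }
        Set<K> offenders = new HashSet<>();
        partitionsPerKey.forEach((key, partitions) -> {
            if (partitions.size() > 1) {
                offenders.add(key);
            }
        });
        return offenders;
    }

    public static void main(String[] args) {
        List<Rec> records = List.of(
                new Rec(0, "customer=alice;order=1"),
                new Rec(0, "customer=alice;order=2"),
                new Rec(1, "customer=bob;order=3"),
                new Rec(1, "customer=alice;order=4")); // alice also appears in partition 1
        // Hypothetical selectKey logic: extract the customer id from the value.
        Function<String, String> customerId = v -> v.split(";")[0].substring("customer=".length());
        System.out.println(keysSpanningPartitions(records, customerId)); // prints [alice]
    }
}
```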
>> >>>>>>>>>>
>> >>>>>>>>>> If we really get a lot of users asking about it because they did not do the right thing, we could still add something (e.g., a reverse-mapper function) in a follow-up KIP. But we don't know if it's necessary; thus, making a small incremental step sounds like a good approach to me.
>> >>>>>>>>>>
>> >>>>>>>>>> Let's see if others agree or not.
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> -Matthias
>> >>>>>>>>>>
>> >>>>>>>>>> On 6/28/23 5:29 PM, Shay Lin wrote:
>> >>>>>>>>>>> Hi all,
>> >>>>>>>>>>>
>> >>>>>>>>>>> Great discussion thread. May I take this KIP up? If it's alright, my plan is to update the KIP with the operator `markAsPartitioned()`.
>> >>>>>>>>>>>
>> >>>>>>>>>>> As you have discussed and pointed out, there are implications for downstream joins or aggregation operations. Still, the operator is intended for advanced users, so my two cents is that it would be a valuable addition nonetheless. We could add this as a caution/consideration as part of the Javadoc.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Let me know, thanks.
>> >>>>>>>>>>> Shay
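The reverse-mapper follow-up that Matthias mentions could, for IQ, work roughly as sketched below: route a query by mapping the new key back to the original key and applying the partitioner to that. This is a hypothetical illustration, not a proposed Kafka Streams API. `partitionFor` and `routeQuery` are invented names, `Math.floorMod(key.hashCode(), n)` stands in for the real partitioner (which hashes serialized key bytes with murmur2), and the approach only works when the re-keying is invertible, as in the "key is not in the right format" case.

```java
import java.util.function.Function;

class ReverseMapperRouting {
    static final int NUM_PARTITIONS = 4;

    /** Stand-in partitioner; Kafka's default partitioner actually hashes serialized key bytes with murmur2. */
    static int partitionFor(Object key) {
        return Math.floorMod(key.hashCode(), NUM_PARTITIONS);
    }

    /**
     * Hypothetical query routing with a reverse mapper: map the queried (new) key back to the key
     * the data was originally partitioned by, then apply the partitioner to that original key.
     */
    static <K, O> int routeQuery(K newKey, Function<K, O> reverseMapper) {
        return partitionFor(reverseMapper.apply(newKey));
    }

    public static void main(String[] args) {
        // Upstream String key "u42" is re-keyed to Integer 42; the record stays in partitionFor("u42").
        Function<String, Integer> selectKey = s -> Integer.parseInt(s.substring(1));
        Function<Integer, String> reverseMapper = id -> "u" + id;

        int actual = partitionFor("u42");                 // where the record and its state live
        int naive = partitionFor(selectKey.apply("u42")); // lookup using the new key directly
        int routed = routeQuery(42, reverseMapper);       // lookup via the reverse mapper
        System.out.println(actual + " " + naive + " " + routed); // prints 3 2 3
    }
}
```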