I'm less enthusiastic about inlining the branch logic with its downstream functionality. Programs that have deep branch trees will quickly become harder to read as a single unit.
On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen <pgwha...@gmail.com> wrote:

Also +1 on the issues/goals as Michael outlined them; I think that sets a great framework for the discussion.

Regarding the SortedMap solution, my understanding is that the current proposal in the KIP is what is in my PR, which (pending naming decisions) is roughly this:

    stream.split()
        .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
        .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
        .defaultBranch(Consumer<KStream<K, V>>);

Obviously some ordering is necessary, since branching as a construct doesn't work without it, but this solution seems to provide as much associativity as the SortedMap solution, because each branch() call directly associates the "conditional" with the "code block." The value the SortedMap solution provides over the KIP solution is access to the branched streams in the same scope.

The KIP solution is less "dynamic" than the SortedMap solution in the sense that it is slightly clumsier to add a dynamic number of branches, but it is certainly possible. It seems to me that the API should favor the "static" case anyway, and should make it simple and readable to fluently declare and access your branches in-line. It also makes it impossible to ignore a branch, and it is possible to build an (almost) identical SortedMap solution on top of it.

I could also see a middle ground where, instead of a raw SortedMap being taken in, branch() takes a name and not a Consumer.
Something like this:

    Map<String, KStream<K, V>> branches = stream.split()
        .branch("branchOne", Predicate<K, V>)
        .branch("branchTwo", Predicate<K, V>)
        .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);

Pros for that solution:
- accessing branched KStreams in the same scope
- no double brace initialization, hopefully slightly more readable than SortedMap

Cons:
- downstream branch logic cannot be specified inline, which makes it harder to read top to bottom (like the existing API and SortedMap, but unlike the KIP)
- you can forget to "handle" one of the branched streams (like the existing API and SortedMap, but unlike the KIP)

(KBranchedStreams could even work *both* ways, but perhaps that's overdoing it.)

Overall I'm curious how important it is to be able to easily access the branched KStream in the same scope as the original. It's possible that it doesn't need to be handled directly by the API, but instead left up to the user. I'm sort of in the middle on it.

Paul

On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:

I'd like to +1 what Michael said about the issues with the existing branch method. I agree with what he's outlined, and I think we should proceed by trying to alleviate these problems. Specifically, it seems important to be able to cleanly access the individual branches (e.g. by mapping name->stream), which I thought was the original intention of this KIP.

That said, I don't think we should so easily give in to the double brace anti-pattern or force our users into it if at all possible to avoid... just my two cents.

Cheers,
Sophie

On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <michael.droga...@confluent.io> wrote:

I’d like to propose a different way of thinking about this. To me, there are three problems with the existing branch signature:

1. If you use it the way most people do, Java raises unsafe type warnings.
2. The way in which you use the stream branches is positionally coupled to the ordering of the conditionals.
3. It is brittle to extend existing branch calls with additional code paths.

Using associative constructs instead of relying on ordered constructs would be a stronger approach. Consider a signature that instead looks like this:

    Map<String, KStream<K,V>> KStream#branch(SortedMap<String, Predicate<? super K,? super V>>);

Branches are given names in a map, and as a result, the API returns a mapping of names to streams. The ordering of the conditionals is maintained because it’s a sorted map, so the map's ordering determines the order of evaluation.

This solves problem 1 because there are no more varargs. It solves problem 2 because you no longer lean on ordering to access the branch you’re interested in. It solves problem 3 because you can introduce another conditional by simply attaching another name to the structure, rather than messing with the existing indices.

One of the drawbacks is that creating the map inline is historically awkward in Java. I know it’s an anti-pattern to use voluminously, but double brace initialization would clean up the aesthetics.

On Tue, Apr 30, 2019 at 9:10 AM John Roesler <j...@confluent.io> wrote:

Hi Ivan,

Thanks for the update.

FWIW, I agree with Matthias that the current "start branching" operator is confusing when named the same way as the actual branches. "Split" seems like a good name.
Alternatively, we can do without a "start branching" operator at all, and just do:

    stream
        .branch(Predicate)
        .branch(Predicate)
        .defaultBranch();

Tentatively, I think that this branching operation should be terminal. That way, we don't create ambiguity about how to use it. That is, `branch` should return `KBranchedStream`, while `defaultBranch` is `void`, to enforce that it comes last and that there is only one definition of the default branch. Potentially, we should log a warning if there's no default, and additionally log a warning (or throw an exception) if a record falls through with no default.

Thoughts?

Thanks,
-John

On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matth...@confluent.io> wrote:

Thanks for updating the KIP and your answers.

> this is to make the name similar to String#split
> that also returns an array, right?

The intent was to avoid name duplication. The return type should _not_ be an array.

The current proposal is

    stream.branch()
        .branch(Predicate)
        .branch(Predicate)
        .defaultBranch();

IMHO, this reads a little odd, because the first `branch()` does not take any parameters and has different semantics than the later `branch()` calls. Note that from the code snippet above, it's hidden that the first call is `KStream#branch()` while the others are `KBranchedStream#branch()`, which makes reading the code harder.
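The SortedMap-based signature and Paul's name-based middle ground quoted above both return a mapping from branch names to branched streams. The sketch below is a toy model of that idea, not the Kafka Streams API: plain `List`s stand in for `KStream`s, and `NamedBranching` is a hypothetical name. It assumes, as with the existing array-based `branch()`, that each record goes only to the first branch whose predicate matches. One detail worth checking against the proposal: a `SortedMap` such as `TreeMap` iterates in key order, not insertion order; if insertion order is meant to decide evaluation order, `LinkedHashMap` is the map that preserves it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Toy model only: Lists stand in for KStreams; NamedBranching is hypothetical.
final class NamedBranching {
    private NamedBranching() {}

    // Evaluates predicates in the map's iteration order and sends each record to
    // the FIRST matching branch. Records that match nothing are dropped.
    static <T> Map<String, List<T>> branch(List<T> source,
                                           Map<String, Predicate<? super T>> predicates) {
        Map<String, List<T>> result = new LinkedHashMap<>();
        for (String name : predicates.keySet()) {
            result.put(name, new ArrayList<>());
        }
        for (T record : source) {
            for (Map.Entry<String, Predicate<? super T>> entry : predicates.entrySet()) {
                if (entry.getValue().test(record)) {
                    result.get(entry.getKey()).add(record);
                    break; // first match wins
                }
            }
        }
        return result;
    }
}
```

With a `LinkedHashMap` holding `"small" -> x < 10` and then `"even" -> x % 2 == 0`, branching `[4, 12, 15, 7]` yields `small = [4, 7]` and `even = [12]`, and 15 is dropped. Copying the same entries into a `TreeMap` evaluates `"even"` first (alphabetical key order), so 4 lands in `even` instead. Looking a branch up by name rather than by index is what decouples callers from the position of each conditional.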
Because I suggested renaming `addBranch()` -> `branch()`, I thought it might be better to also rename `KStream#branch()` to avoid the naming overlap that seems to be confusing. The following reads much cleaner to me:

    stream.split()
        .branch(Predicate)
        .branch(Predicate)
        .defaultBranch();

Maybe there is a better alternative to `split()`, though, to avoid the naming overlap.

> 'default' is, however, a reserved word, so unfortunately we cannot have a method with such name :-)

Bummer. Didn't consider this. Maybe we can still come up with a short name?

Can you add the interface `KBranchedStream` to the KIP with all its methods? It will be part of the public API and should be contained in the KIP. For example, it's unclear at the moment what the return type of `defaultBranch()` is.

You did not comment on the idea to add a `KBranchedStream#get(int index) -> KStream` method to get the individually branched KStreams. It would be nice to get your feedback on it. It seems you suggest that users would otherwise need to write custom utility code to access them. We should discuss the pros and cons of both approaches. It feels "incomplete" to me at the moment if the API has no built-in support to get the branched KStreams directly.

-Matthias

On 4/13/19 2:13 AM, Ivan Ponomarev wrote:

Hi all!

I have updated KIP-418 according to the new vision.

Matthias, thanks for your comment!
> Renaming KStream#branch() -> #split()

I can see your point: this is to make the name similar to String#split, which also returns an array, right? But is it worth the loss of backwards compatibility? We can have an overloaded branch() as well without affecting the existing code. Maybe the old array-based `branch` method should be deprecated, but this is a subject for discussion.

> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(), KBranchedStream#defaultBranch() -> BranchingKStream#default()

Totally agree with the 'addBranch->branch' rename. 'default' is, however, a reserved word, so unfortunately we cannot have a method with such a name :-)

> defaultBranch() does take a `Predicate` as argument, but I think that is not required?

Absolutely! I think that was just a copy-paste error or something.

Dear colleagues,

please review the new version of the KIP and Paul's PR (https://github.com/apache/kafka/pull/6512).

Any new suggestions/objections?

Regards,

Ivan

On 11.04.2019 11:47, Matthias J. Sax wrote:

Thanks for driving the discussion of this KIP. It seems that everybody agrees that the current branch() method using arrays is not optimal.

I had a quick look into the PR and I like the overall proposal. There are some minor things we need to consider.
I would recommend the following renaming:

    KStream#branch() -> #split()
    KBranchedStream#addBranch() -> BranchingKStream#branch()
    KBranchedStream#defaultBranch() -> BranchingKStream#default()

It's just a suggestion to get slightly shorter method names.

In the current PR, defaultBranch() does take a `Predicate` as argument, but I think that is not required?

Also, we should consider KIP-307, which was recently accepted and is currently being implemented:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL

I.e., we should add overloads that accept a `Named` parameter.

For the issue that the created `KStream` objects are in different scopes: could we extend `KBranchedStream` with a `get(int index)` method that returns the corresponding "branched" result `KStream` object? Maybe the second argument of `addBranch()` should not be a `Consumer<KStream>` but a `Function<KStream,KStream>`, and `get()` could return whatever the `Function` returns?

Finally, I would also suggest updating the KIP with the current proposal. That makes it easier to review.

-Matthias

On 3/31/19 12:22 PM, Paul Whalen wrote:

Ivan,

I'm a bit of a novice here as well, but I think it makes sense for you to revise the KIP and continue the discussion.
Obviously we'll > need > > > > some > > > > > >>> buy-in from committers that have actual binding votes on > whether > > > the > > > > > KIP > > > > > >>> could be adopted. It would be great to hear if they think this > > is > > > a > > > > > good > > > > > >>> idea overall. I'm not sure if that happens just by starting a > > > vote, > > > > > or if > > > > > >>> there is generally some indication of interest beforehand. > > > > > >>> > > > > > >>> That being said, I'll continue the discussion a bit: assuming > we > > do > > > > > move > > > > > >>> forward the solution of "stream.branch() returns > > KBranchedStream", > > > do > > > > > we > > > > > >>> deprecate "stream.branch(...) returns KStream[]"? I would > favor > > > > > >>> deprecating, since having two mutually exclusive APIs that > > > accomplish > > > > > the > > > > > >>> same thing is confusing, especially when they're fairly similar > > > > > anyway. We > > > > > >>> just need to be sure we're not making something > > > impossible/difficult > > > > > that > > > > > >>> is currently possible/easy. > > > > > >>> > > > > > >>> Regarding my PR - I think the general structure would work, > it's > > > > just a > > > > > >>> little sloppy overall in terms of naming and clarity. In > > > particular, > > > > > >>> passing in the "predicates" and "children" lists which get > > modified > > > > in > > > > > >>> KBranchedStream but read from all the way KStreamLazyBranch is > a > > > bit > > > > > >>> complicated to follow. > > > > > >>> > > > > > >>> Thanks, > > > > > >>> Paul > > > > > >>> > > > > > >>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev < > > iponoma...@mail.ru > > > > > > > > > wrote: > > > > > >>> > > > > > >>>> Hi Paul! > > > > > >>>> > > > > > >>>> I read your code carefully and now I am fully convinced: your > > > > proposal > > > > > >>>> looks better and should work. We just have to document the > > crucial > > > > > fact > > > > > >>>> that KStream consumers are invoked as they're added. 
And then > > it's > > > > all > > > > > >>>> going to be very nice. > > > > > >>>> > > > > > >>>> What shall we do now? I should re-write the KIP and resume the > > > > > >>>> discussion here, right? > > > > > >>>> > > > > > >>>> Why are you telling that your PR 'should not be even a > starting > > > > point > > > > > if > > > > > >>>> we go in this direction'? To me it looks like a good starting > > > point. > > > > > But > > > > > >>>> as a novice in this project I might miss some important > details. > > > > > >>>> > > > > > >>>> Regards, > > > > > >>>> > > > > > >>>> Ivan > > > > > >>>> > > > > > >>>> > > > > > >>>> 28.03.2019 17:38, Paul Whalen пишет: > > > > > >>>>> Ivan, > > > > > >>>>> > > > > > >>>>> Maybe I’m missing the point, but I believe the > stream.branch() > > > > > solution > > > > > >>>> supports this. The couponIssuer::set* consumers will be > invoked > > as > > > > > they’re > > > > > >>>> added, not during streamsBuilder.build(). So the user still > > ought > > > to > > > > > be > > > > > >>>> able to call couponIssuer.coupons() afterward and depend on > the > > > > > branched > > > > > >>>> streams having been set. > > > > > >>>>> The issue I mean to point out is that it is hard to access > the > > > > > branched > > > > > >>>> streams in the same scope as the original stream (that is, not > > > > inside > > > > > the > > > > > >>>> couponIssuer), which is a problem with both proposed > solutions. > > It > > > > > can be > > > > > >>>> worked around though. > > > > > >>>>> [Also, great to hear additional interest in 401, I’m excited > to > > > > hear > > > > > >>>> your thoughts!] > > > > > >>>>> Paul > > > > > >>>>> > > > > > >>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev < > > iponoma...@mail.ru > > > > > > > > > wrote: > > > > > >>>>>> > > > > > >>>>>> Hi Paul! 
The idea to postpone the wiring of branches to streamsBuilder.build() also looked great to me at first glance, but ---

> the newly branched streams are not available in the same scope as each other. That is, if we wanted to merge them back together again I don't see a way to do that.

You just took the words right out of my mouth; I was just going to write about this issue in detail.

Consider the example from Bill's book, p. 101: say we need to identify customers who have bought coffee and made a purchase in the electronics store, to give them coupons.

This is the code I usually write under these circumstances using my 'brancher' class:

    @Setter
    class CouponIssuer {
        private KStream<....> coffeePurchases;
        private KStream<....> electronicsPurchases;

        KStream<...> coupons() {
            /* In the real world the code here can be complex, so creating a separate
               CouponIssuer class is fully justified, in order to separate classes'
               responsibilities. */
            return coffeePurchases.join(electronicsPurchases...)...whatever
        }
    }

    CouponIssuer couponIssuer = new CouponIssuer();

    new KafkaStreamsBrancher<....>()
        .branch(predicate1, couponIssuer::setCoffeePurchases)
        .branch(predicate2, couponIssuer::setElectronicsPurchases)
        .onTopOf(transactionStream);

    /* Alas, this won't work if we're going to wire up everything later,
       without the terminal operation!!! */
    couponIssuer.coupons()...

Does this make sense? In order to properly initialize the CouponIssuer, we need the terminal operation to be called before streamsBuilder.build() is called.

[BTW Paul, I just found out that your KIP-401 is essentially the next KIP I was going to write here. I have some thoughts based on my experience, so I will join the discussion on KIP-401 soon.]

Regards,

Ivan

On 28.03.2019 6:29, Paul Whalen wrote:

Ivan,

I tried to make a very rough proof of concept of a fluent API based off of KStream here (https://github.com/apache/kafka/pull/6512), and I think I succeeded at removing both cons.

- Compatibility: I was incorrect earlier about compatibility issues; there aren't any direct ones. I was unaware that Java is smart enough to distinguish between a branch(varargs...) returning one thing and branch() with no arguments returning another thing.
- Requiring a terminal method: We don't actually need it. We can just build up the branches in the KBranchedStream, which shares its state with the ProcessorSupplier that will actually do the branching. It's not terribly pretty in its current form, but I think it demonstrates its feasibility.
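Paul's point that no terminal method is needed, together with Ivan's observation that consumers are invoked as branches are added, can be illustrated with a toy model that again uses plain `List`s in place of `KStream`s. This is not the code in PR 6512; `LazyBranchedStream` and `process` are hypothetical names. `branch()` appends a predicate and a child sink to state that the routing step reads later, and it hands the child to the consumer immediately rather than at build time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Toy model of branching without a terminal method. Lists stand in for KStreams;
// LazyBranchedStream and process() are hypothetical names.
final class LazyBranchedStream<T> {
    // State shared between the fluent builder and the routing step.
    private final List<Predicate<? super T>> predicates = new ArrayList<>();
    private final List<List<T>> children = new ArrayList<>();

    // Registers a branch and hands its child "stream" to the consumer right away.
    LazyBranchedStream<T> branch(Predicate<? super T> predicate, Consumer<List<T>> consumer) {
        List<T> child = new ArrayList<>();
        predicates.add(predicate);
        children.add(child);
        consumer.accept(child); // invoked as the branch is added, not at build time
        return this;
    }

    // Stands in for the processor: routes one record to the first matching branch,
    // reading whatever branches have been registered so far.
    void process(T record) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(record)) {
                children.get(i).add(record);
                return;
            }
        }
        // No match and no default branch: the record is dropped.
    }
}
```

Because each consumer receives its child immediately, a setter-based consumer like the ones in Ivan's CouponIssuer example would already have run before any record is routed. The trade-off Paul mentions remains: the children are only reachable through whatever the consumers did with them.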
To be clear, I don't think that pull request should be final or even a starting point if we go in this direction; I just wanted to see how challenging it would be to get the API working.

I will say, though, that I'm not sure the existing solution could be deprecated in favor of this, which I had originally suggested was a possibility. The reason is that the newly branched streams are not available in the same scope as each other. That is, if we wanted to merge them back together again I don't see a way to do that. The KIP proposal has the same issue, though - all this means is that for either solution, deprecating the existing branch(...) is not on the table.

Thanks,
Paul

On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:

OK, let me summarize what we have discussed up to this point.

First, it seems to be commonly agreed that the branch API needs improvement. Motivation is given in the KIP.

There are two potential ways to do it:

1. (as originally proposed)

    new KafkaStreamsBrancher<..>()
        .branch(predicate1, ks -> ..)
        .branch(predicate2, ks -> ..)
        .defaultBranch(ks -> ..) // optional
        .onTopOf(stream).mapValues(...).... // onTopOf returns its argument

PROS: 1) Fully backwards compatible. 2) The code won't make sense until all the necessary ingredients are provided.

CONS: The need to create a KafkaStreamsBrancher instance contrasts with the fluency of other KStream methods.

2. (as Paul proposes)

    stream
        .branch(predicate1, ks -> ...)
        .branch(predicate2, ks -> ...)
        .defaultBranch(ks -> ...) // or noDefault(). Both defaultBranch(..) and noDefault() return void

PROS: Generally follows the way the KStream interface is defined.

CONS: We need to define two terminal methods (defaultBranch(ks->) and noDefault()). And for a user it is very easy to miss the fact that one of the terminal methods should be called. If these methods are not called, we can throw an exception at runtime.

Colleagues, what are your thoughts? Can we do better?

Regards,

Ivan

On 27.03.2019 18:46, Ivan Ponomarev wrote:

On 25.03.2019 17:43, Ivan Ponomarev wrote:

Paul,

I see your point when you are talking about stream..branch..branch...default..

Still, I believe that this cannot be implemented the easy way. Maybe we all should think further.

Let me comment on two of your ideas.
> user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs.

1) OK, apparently this should not be the only option besides `default`, because there are scenarios where we want to just silently drop the messages that didn't match any predicate.

2) Throwing an exception in the middle of data flow processing looks like a bad idea. In the stream processing paradigm, I would prefer to emit a special message to a dedicated stream. This is exactly where `default` can be used.

> it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue.

You mean a runtime exception, when the program is compiled and run? Well, I'd prefer an API that simply won't compile if used incorrectly. Can we build such an API as a method chain starting from a KStream object? There is a huge cost difference between runtime and compile-time errors. Even if a failure is uncovered instantly by unit tests, it costs the project more than a compilation failure.
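The three options debated here for records that match no predicate (route them to a default branch, drop them silently, or fail fast) can be compared side by side in a toy model. `FallThrough` and its `Policy` enum are hypothetical names, lists stand in for streams, and the last list in the result plays the role of the default branch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model of fall-through handling; FallThrough and Policy are hypothetical.
final class FallThrough {
    enum Policy { DEFAULT_BRANCH, DROP, THROW }

    private FallThrough() {}

    // Returns one list per predicate plus a final list for the default branch.
    static <T> List<List<T>> route(List<T> source,
                                   List<Predicate<? super T>> predicates,
                                   Policy policy) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i <= predicates.size(); i++) {
            out.add(new ArrayList<>()); // index predicates.size() is the default branch
        }
        for (T record : source) {
            int match = -1;
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(record)) {
                    match = i;
                    break;
                }
            }
            if (match >= 0) {
                out.get(match).add(record);
            } else if (policy == Policy.DEFAULT_BRANCH) {
                out.get(predicates.size()).add(record);
            } else if (policy == Policy.THROW) {
                throw new IllegalStateException("No branch matched record: " + record);
            }
            // Policy.DROP: the unmatched record is silently discarded.
        }
        return out;
    }
}
```

Ivan's preference corresponds to `DEFAULT_BRANCH`, with the last list acting as the dedicated stream for unmatched messages; Paul's fail-fast terminal method corresponds to `THROW`, which, as Ivan notes, surfaces only while records are being processed.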
Regards,

Ivan

On 25.03.2019 0:38, Paul Whalen wrote:

Ivan,

Good point about the terminal operation being required. But is that really such a bad thing? If the user doesn't want a defaultBranch, they can call some other terminal method (noDefaultBranch()?) just as easily. In fact, I think it creates an opportunity for a nicer API - a user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs. That seems like an improvement over the current branch() API, which allows for the more subtle behavior of records unexpectedly getting dropped.

The need for a terminal operation certainly has to be well documented, but it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue, especially now that there is a "build step" where the topology is actually wired up, when StreamsBuilder.build() is called.
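Paul's suggestion that the builder could track unterminated branches and fail when the topology is built can be sketched as follows. This is a toy, not `InternalTopologyBuilder`: `ToyTopologyBuilder`, `split`, and the predicate-less `branch()` are hypothetical names, and the check only runs when `build()` is called, so (as Ivan points out in his reply) the error is a runtime one rather than a compile-time one.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of build-time detection of dangling branches; all names are hypothetical.
final class ToyTopologyBuilder {
    static final class Brancher {
        private final String name;
        private int branchCount;
        private boolean terminated;

        private Brancher(String name) {
            this.name = name;
        }

        Brancher branch() {          // predicate and consumer omitted for brevity
            branchCount++;
            return this;
        }

        void defaultBranch() {       // terminal: returns void
            terminated = true;
        }

        void noDefaultBranch() {     // terminal: returns void
            terminated = true;
        }
    }

    // Every brancher handed out is remembered so build() can inspect it.
    private final List<Brancher> branchers = new ArrayList<>();

    Brancher split(String name) {
        Brancher brancher = new Brancher(name);
        branchers.add(brancher);
        return brancher;
    }

    // Fails with a descriptive error instead of silently wiring an incomplete switch.
    void build() {
        for (Brancher b : branchers) {
            if (!b.terminated) {
                throw new IllegalStateException("Brancher '" + b.name + "' has " + b.branchCount
                        + " branch(es) but neither defaultBranch() nor noDefaultBranch() was called");
            }
        }
    }
}
```

Making the terminal methods return `void` means nothing can be chained after them, but Java cannot force a caller to invoke one; that gap is exactly what the `build()` check covers.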
Regarding onTopOf() returning its argument, I agree that it's critical to allow users to do other operations on the input stream. With the fluent solution, it ought to work the same way all other operations do - if you want to process off the original KStream multiple times, you just need the stream as a variable so you can call as many operations on it as you desire.

Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hello Paul,

I'm afraid this won't work because we do not always need the defaultBranch. And without a terminal operation we don't know when to finalize and build the 'branch switch'.

In my proposal, onTopOf returns its argument, so we can do something more with the original stream after branching.

I understand your point that the need for special object construction contrasts with the fluency of most KStream methods. But here we have a special case: we build the switch to split the flow, so I think this is still idiomatic.
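The original `KafkaStreamsBrancher` shape, where branches are configured first and the terminal `onTopOf()` splits the source and then returns it unchanged, can be modeled the same way. `ToyBrancher` is a hypothetical stand-in that uses lists instead of `KStream`s and assumes first-match routing; it sketches the design and is not the code from PR 6164.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Toy model of the KafkaStreamsBrancher shape: configure branches first, then call
// onTopOf(source). Lists stand in for KStreams; ToyBrancher is a hypothetical name.
final class ToyBrancher<T> {
    private final List<Predicate<? super T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();
    private Consumer<List<T>> defaultHandler; // optional; null means "no default"

    ToyBrancher<T> branch(Predicate<? super T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    ToyBrancher<T> defaultBranch(Consumer<List<T>> handler) {
        defaultHandler = handler;
        return this;
    }

    // Terminal step: split the source, pass each branch to its handler,
    // then return the source itself so the caller can keep using it.
    List<T> onTopOf(List<T> source) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            parts.add(new ArrayList<>());
        }
        List<T> unmatched = new ArrayList<>();
        for (T record : source) {
            int i = 0;
            while (i < predicates.size() && !predicates.get(i).test(record)) {
                i++; // first matching predicate wins
            }
            if (i < predicates.size()) {
                parts.get(i).add(record);
            } else {
                unmatched.add(record);
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(parts.get(i));
        }
        if (defaultHandler != null) {
            defaultHandler.accept(unmatched);
        }
        return source;
    }
}
```

Because `onTopOf()` is the one place where branching happens, the brancher knows exactly when the switch is complete, which is the property Ivan relies on; returning the argument lets the caller keep chaining on the original input.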
Regards,

Ivan

On 24.03.2019 4:02, Paul Whalen wrote:

Ivan,

I think it's a great idea to improve this API, but I find the onTopOf() mechanism a little confusing, since it contrasts with the fluency of other KStream method calls. Ideally I'd like to just call a method on the stream so it still reads top to bottom if the branch cases are defined fluently. I think the addBranch(predicate, handleCase) is very nice and the right way to do things, but what if we flipped around how we specify the source stream?

Like:

    stream.branch()
        .addBranch(predicate1, this::handle1)
        .addBranch(predicate2, this::handle2)
        .defaultBranch(this::handleDefault);

Where branch() returns a KBranchedStreams or KStreamBrancher or something, which is added to by addBranch() and terminated by defaultBranch() (which returns void).
This is obviously incompatible with > the > > > > > current > > > > > >>>>>>>>>>>>> API, so > > > > > >>>>>>>>>>>> the > > > > > >>>>>>>>>>>>> new stream.branch() would have to have a different > > name, > > > > but > > > > > that > > > > > >>>>>>>>>>>>> seems > > > > > >>>>>>>>>>>>> like a fairly small problem - we could call it > > something > > > > like > > > > > >>>>>>>>>>>>> branched() > > > > > >>>>>>>>>>>> or > > > > > >>>>>>>>>>>>> branchedStreams() and deprecate the old API. > > > > > >>>>>>>>>>>>> > > > > > >>>>>>>>>>>>> Does this satisfy the motivations of your KIP? It > > seems > > > > > like it > > > > > >>>>>>>>>>>>> does to > > > > > >>>>>>>>>>>>> me, allowing for clear in-line branching while also > > > > allowing > > > > > you > > > > > >>>> to > > > > > >>>>>>>>>>>>> dynamically build of branches off of KBranchedStreams > > if > > > > > desired. > > > > > >>>>>>>>>>>>> > > > > > >>>>>>>>>>>>> Thanks, > > > > > >>>>>>>>>>>>> Paul > > > > > >>>>>>>>>>>>> > > > > > >>>>>>>>>>>>> > > > > > >>>>>>>>>>>>> > > > > > >>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev > > > > > >>>>>>>>>>>> <iponoma...@mail.ru.invalid> > > > > > >>>>>>>>>>>>> wrote: > > > > > >>>>>>>>>>>>> > > > > > >>>>>>>>>>>>>> Hi Bill, > > > > > >>>>>>>>>>>>>> > > > > > >>>>>>>>>>>>>> Thank you for your reply! > > > > > >>>>>>>>>>>>>> > > > > > >>>>>>>>>>>>>> This is how I usually do it: > > > > > >>>>>>>>>>>>>> > > > > > >>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){ > > > > > >>>>>>>>>>>>>> ks.filter(....).mapValues(...) > > > > > >>>>>>>>>>>>>> } > > > > > >>>>>>>>>>>>>> > > > > > >>>>>>>>>>>>>> > > > > > >>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){ > > > > > >>>>>>>>>>>>>> ks.selectKey(...).groupByKey()... > > > > > >>>>>>>>>>>>>> } > > > > > >>>>>>>>>>>>>> > > > > > >>>>>>>>>>>>>> ...... 
    new KafkaStreamsBrancher<String, String>()
        .addBranch(predicate1, this::handleFirstCase)
        .addBranch(predicate2, this::handleSecondCase)
        .onTopOf(....)

Regards,

Ivan


On 22.03.2019 1:34, Bill Bejeck wrote:

Hi Ivan,

Thanks for the KIP.

I have one question: the KafkaStreamsBrancher takes a Consumer as its second argument, which returns nothing, and the example in the KIP shows each stream from the branch ending in a terminal node (KStream#to() in this case).

Maybe I've missed something, but how would we handle the case where the user has created a branch but wants to continue processing, and not necessarily use a terminal node on the branched stream immediately? For example, using today's logic as is, if we had something like this:

    KStream<String, String>[] branches = originalStream.branch(predicate1, predicate2);
    branches[0].filter(....).mapValues(...)..
    branches[1].selectKey(...).groupByKey().....
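For reference, a plain-Java sketch of the routing rule behind the existing array-returning branch(): per the KStream#branch(Predicate...) Javadoc, each record is placed in the branch of the first predicate it matches, and a record matching no predicate is dropped. Java lists stand in for KStreams here, and the `BranchSemantics` class and its `branch` helper are hypothetical, not part of Kafka Streams. Because each branch comes back as its own value, each one can keep being processed independently, which is what Bill's example relies on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical simulation of KStream#branch(Predicate...) routing using plain lists.
final class BranchSemantics {

    // Returns one list per predicate. Each record goes to the FIRST matching
    // predicate only; records that match no predicate are dropped.
    @SafeVarargs
    static <V> List<List<V>> branch(List<V> records, Predicate<V>... predicates) {
        List<List<V>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (V record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record)) {
                    branches.get(i).add(record);
                    break; // first match wins
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        List<String> input = List.of("apple", "avocado", "banana", "kiwi", "cherry");
        List<List<String>> branches = branch(input,
                s -> s.startsWith("a"),   // branch 0
                s -> s.length() == 6);    // branch 1
        // Each branch can be processed further on its own; "kiwi" matched nothing.
        System.out.println(branches.get(0)); // [apple, avocado]
        System.out.println(branches.get(1)); // [banana, cherry]
    }
}
```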
Thanks!
Bill


On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote:

All,

I'd like to jump-start the discussion for KIP-418.

Here's the original message:

Hello,

I'd like to start a discussion about KIP-418. Please take a look at the KIP if you can; I would appreciate any feedback :)

KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
PR#6164: https://github.com/apache/kafka/pull/6164

Regards,

Ivan Ponomarev
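A minimal sketch of the fluent shape Paul proposed above: branch(), then addBranch(predicate, handler) calls, terminated by a void defaultBranch(handler). It assumes first-match routing like the existing branch(). `FluentBrancher` is a hypothetical class and plain lists stand in for KStreams; in Kafka Streams the handlers would instead receive the branch KStreams while the topology is being built, not materialized records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical fluent brancher; NOT the Kafka Streams API.
final class FluentBrancher<V> {
    private final List<V> source;
    private final List<Predicate<V>> predicates = new ArrayList<>();
    private final List<Consumer<List<V>>> handlers = new ArrayList<>();

    private FluentBrancher(List<V> source) {
        this.source = source;
    }

    // Stand-in for the proposed stream.branch() entry point.
    static <T> FluentBrancher<T> branch(List<T> source) {
        return new FluentBrancher<>(source);
    }

    // Associates a predicate directly with the code that handles its branch.
    FluentBrancher<V> addBranch(Predicate<V> predicate, Consumer<List<V>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Terminal call (returns void): routes each record to the first matching
    // branch, collects unmatched records for the default handler, then runs
    // every handler in declaration order, the default handler last.
    void defaultBranch(Consumer<List<V>> defaultHandler) {
        List<List<V>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<V> rest = new ArrayList<>();
        for (V record : source) {
            boolean matched = false;
            for (int i = 0; i < predicates.size() && !matched; i++) {
                if (predicates.get(i).test(record)) {
                    buckets.get(i).add(record);
                    matched = true;
                }
            }
            if (!matched) {
                rest.add(record);
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        defaultHandler.accept(rest);
    }

    public static void main(String[] args) {
        FluentBrancher.branch(List.of("apple", "kiwi", "banana"))
                .addBranch(s -> s.startsWith("a"), b -> System.out.println("a-words: " + b))
                .addBranch(s -> s.length() == 6, b -> System.out.println("six letters: " + b))
                .defaultBranch(b -> System.out.println("default: " + b));
    }
}
```

Running main prints `a-words: [apple]`, `six letters: [banana]` and `default: [kiwi]`. Because defaultBranch() returns void, every branch must be handled inline; that is the trade-off weighed elsewhere in this thread between accessing branches in the same scope and not being able to forget a branch.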