Ivan,

Good point about the terminal operation being required. But is that really such a bad thing? If the user doesn't want a defaultBranch, they can just as easily call some other terminal method (noDefaultBranch()?). In fact, I think it creates an opportunity for a nicer API: a user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs. That seems like an improvement over the current branch() API, which allows for the more subtle behavior of records that match none of the predicates silently getting dropped.
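A minimal sketch of the terminal-method idea above, assuming a Kafka-free model in which records are plain in-memory values rather than a KStream. The class Brancher and its methods addBranch, defaultBranch and noDefaultBranch are hypothetical names taken from this discussion, not Kafka Streams API. The point is only to show the behavioral difference: defaultBranch() routes unmatched records to a handler, while noDefaultBranch() fails loudly instead of dropping them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical, Kafka-free sketch: "Brancher", "defaultBranch" and "noDefaultBranch"
// are illustrative names for the API discussed in this thread, not Kafka Streams classes.
// Records are plain in-memory values instead of a KStream.
final class Brancher<V> {
    private final List<Predicate<? super V>> predicates = new ArrayList<>();
    private final List<Consumer<? super V>> handlers = new ArrayList<>();

    // Registers one case; returns this so calls can be chained fluently.
    public Brancher<V> addBranch(Predicate<? super V> predicate, Consumer<? super V> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Terminal method: records matching no predicate go to the given handler.
    public Consumer<V> defaultBranch(Consumer<? super V> handler) {
        return terminate(handler);
    }

    // Terminal method: assumes every record matches some branch and throws
    // instead of silently dropping a record that does not.
    public Consumer<V> noDefaultBranch() {
        return terminate(value -> {
            throw new IllegalStateException("Record reached the default branch: " + value);
        });
    }

    // Builds the router from a snapshot of the cases; the first matching predicate wins.
    private Consumer<V> terminate(Consumer<? super V> fallback) {
        List<Predicate<? super V>> ps = new ArrayList<>(predicates);
        List<Consumer<? super V>> hs = new ArrayList<>(handlers);
        return value -> {
            for (int i = 0; i < ps.size(); i++) {
                if (ps.get(i).test(value)) {
                    hs.get(i).accept(value);
                    return;
                }
            }
            fallback.accept(value);
        };
    }
}
```

Returning a router from the terminal call is one way to make "not finished yet" visible in the types: until defaultBranch() or noDefaultBranch() is called, there is nothing to route records with.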
The need for a terminal operation certainly has to be well documented, but it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue, especially now that there is a "build step" where the topology is actually wired up, when StreamsBuilder.build() is called.

Regarding onTopOf() returning its argument, I agree that it's critical to allow users to do other operations on the input stream. With the fluent solution, it ought to work the same way all other operations do: if you want to process off the original KStream multiple times, you just need the stream as a variable so you can call as many operations on it as you desire.

Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:

> Hello Paul,
>
> I'm afraid this won't work because we do not always need the
> defaultBranch. And without a terminal operation we don't know when to
> finalize and build the 'branch switch'.
>
> In my proposal, onTopOf returns its argument, so we can do something
> more with the original branch after branching.
>
> I understand your point that the need for special object construction
> contrasts with the fluency of most KStream methods. But here we have a
> special case: we build the switch to split the flow, so I think this is
> still idiomatic.
>
> Regards,
>
> Ivan
>
>
> 24.03.2019 4:02, Paul Whalen wrote:
> > Ivan,
> >
> > I think it's a great idea to improve this API, but I find the onTopOf()
> > mechanism a little confusing since it contrasts with the fluency of other
> > KStream method calls. Ideally I'd like to just call a method on the stream
> > so it still reads top to bottom if the branch cases are defined fluently.
> > I think the addBranch(predicate, handleCase) is very nice and the right way
> > to do things, but what if we flipped around how we specify the source
> > stream.
> >
> > Like:
> >
> > stream.branch()
> >     .addBranch(predicate1, this::handle1)
> >     .addBranch(predicate2, this::handle2)
> >     .defaultBranch(this::handleDefault);
> >
> > Where branch() returns a KBranchedStreams or KStreamBrancher or something,
> > which is added to by addBranch() and terminated by defaultBranch() (which
> > returns void). This is obviously incompatible with the current API, so the
> > new stream.branch() would have to have a different name, but that seems
> > like a fairly small problem: we could call it something like branched() or
> > branchedStreams() and deprecate the old API.
> >
> > Does this satisfy the motivations of your KIP? It seems like it does to
> > me, allowing for clear in-line branching while also allowing you to
> > dynamically build branches off of KBranchedStreams if desired.
> >
> > Thanks,
> > Paul
> >
> >
> > On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev <iponoma...@mail.ru.invalid>
> > wrote:
> >
> >> Hi Bill,
> >>
> >> Thank you for your reply!
> >>
> >> This is how I usually do it:
> >>
> >> void handleFirstCase(KStream<String, String> ks){
> >>     ks.filter(....).mapValues(...)
> >> }
> >>
> >>
> >> void handleSecondCase(KStream<String, String> ks){
> >>     ks.selectKey(...).groupByKey()...
> >> }
> >>
> >> ......
> >> new KafkaStreamsBrancher<String, String>()
> >>     .addBranch(predicate1, this::handleFirstCase)
> >>     .addBranch(predicate2, this::handleSecondCase)
> >>     .onTopOf(....)
> >>
> >> Regards,
> >>
> >> Ivan
> >>
> >> 22.03.2019 1:34, Bill Bejeck wrote:
> >>> Hi Ivan,
> >>>
> >>> Thanks for the KIP.
> >>>
> >>> I have one question: the KafkaStreamsBrancher takes a Consumer as a second
> >>> argument which returns nothing, and the example in the KIP shows each
> >>> stream from the branch using a terminal node (KafkaStreams#to() in this
> >>> case).
> >>>
> >>> Maybe I've missed something, but how would we handle the case where the
> >>> user has created a branch but wants to continue processing and not
> >>> necessarily use a terminal node on the branched stream immediately?
> >>>
> >>> For example, using today's logic as is if we had something like this:
> >>>
> >>> KStream<String, String>[] branches = originalStream.branch(predicate1,
> >>>         predicate2);
> >>> branches[0].filter(....).mapValues(...)..
> >>> branches[1].selectKey(...).groupByKey().....
> >>>
> >>>
> >>> Thanks!
> >>> Bill
> >>>
> >>>
> >>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote:
> >>>
> >>>> All,
> >>>>
> >>>> I'd like to jump-start the discussion for KIP-418.
> >>>>
> >>>> Here's the original message:
> >>>>
> >>>> Hello,
> >>>>
> >>>> I'd like to start a discussion about KIP-418. Please take a look at the
> >>>> KIP if you can, I would appreciate any feedback :)
> >>>>
> >>>> KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> >>>> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
> >>>>
> >>>> PR#6164: https://github.com/apache/kafka/pull/6164
> >>>>
> >>>> Regards,
> >>>>
> >>>> Ivan Ponomarev
> >>>>
> >>
> >
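A rough sketch of how the fluent shape proposed in this thread (stream.branch().addBranch(...).defaultBranch(...), with defaultBranch() returning void) could be combined with Paul's suggestion that the topology builder reject dangling branches when build() is called. This is a model under stated assumptions, not how Kafka's StreamsBuilder or InternalTopologyBuilder actually work: TopologySketch, BranchedStream and noDefaultBranch() are hypothetical names, records are plain in-memory values, and build() simply returns a Consumer that routes one record at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for a topology builder (NOT Kafka's StreamsBuilder or
// InternalTopologyBuilder): it remembers every branch() it hands out so that
// build() can reject branches that were never terminated.
final class TopologySketch<V> {
    private final List<BranchedStream> branchers = new ArrayList<>();

    // Fluent entry point, playing the role of stream.branch() in the proposal.
    public BranchedStream branch() {
        BranchedStream b = new BranchedStream();
        branchers.add(b);
        return b;
    }

    // Analogue of the "build step": fail fast with a clear error on dangling branches.
    public Consumer<V> build() {
        List<Consumer<V>> routers = new ArrayList<>();
        for (int i = 0; i < branchers.size(); i++) {
            Consumer<V> router = branchers.get(i).router;
            if (router == null) {
                throw new IllegalStateException("branch() #" + i
                        + " was never terminated with defaultBranch() or noDefaultBranch()");
            }
            routers.add(router);
        }
        // Each record is offered to every branch() call, as if all of them
        // branched off the same source stream.
        return value -> routers.forEach(r -> r.accept(value));
    }

    public final class BranchedStream {
        private final List<Predicate<? super V>> predicates = new ArrayList<>();
        private final List<Consumer<? super V>> handlers = new ArrayList<>();
        private Consumer<V> router; // stays null until a terminal method is called

        public BranchedStream addBranch(Predicate<? super V> predicate, Consumer<? super V> handler) {
            predicates.add(predicate);
            handlers.add(handler);
            return this;
        }

        // Terminal methods return void; build() picks up the finished router.
        public void defaultBranch(Consumer<? super V> handler) {
            router = route(handler);
        }

        public void noDefaultBranch() {
            router = route(value -> {
                throw new IllegalStateException("Record reached the default branch: " + value);
            });
        }

        // First matching predicate wins; unmatched records go to the fallback.
        private Consumer<V> route(Consumer<? super V> fallback) {
            List<Predicate<? super V>> ps = new ArrayList<>(predicates);
            List<Consumer<? super V>> hs = new ArrayList<>(handlers);
            return value -> {
                for (int i = 0; i < ps.size(); i++) {
                    if (ps.get(i).test(value)) {
                        hs.get(i).accept(value);
                        return;
                    }
                }
                fallback.accept(value);
            };
        }
    }
}
```

In this model, Bill's question about continuing to process a branch comes down to what each handler does: a handler receives its records and is free to transform them further, much like Ivan's handleFirstCase and handleSecondCase methods. Forgetting the terminal call is reported once, at build(), rather than showing up later as missing records.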