I also agree with Michael's observation about the core problem of the current `branch()` implementation.
However, I also don't like passing in a clumsy Map object. My thinking was more aligned with Paul's proposal to just add a name to each `branch()` statement and return a `Map<String,KStream>`. It makes the code easier to read, and also makes the order of `Predicates` (which is essential) easier to grasp.

    Map<String, KStream<K, V>> branches = stream.split()
        .branch("branchOne", Predicate<K, V>)
        .branch("branchTwo", Predicate<K, V>)
        .defaultBranch("defaultBranch");

An open question is the case for which no defaultBranch() should be specified. Atm, `split()` and `branch()` would return `BranchedKStream`, and the call to `defaultBranch()` that returns the `Map` is mandatory (which is not the case atm). Or is this actually not a real problem, because users can just ignore the branch returned by `defaultBranch()` in the result `Map`?

About "inlining": So far, it seems to be a matter of personal preference. I can see arguments for both, but no "killer argument" yet that clearly makes the case for one or the other.

-Matthias

On 5/1/19 6:26 PM, Paul Whalen wrote:
> Perhaps inlining is the wrong terminology. It doesn’t require that a lambda with the full downstream topology be defined inline - it can be a method reference as with Ivan’s original suggestion. The advantage of putting the predicate and its downstream logic (Consumer) together in branch() is that they are required to be near each other.
>
> Ultimately the downstream code has to live somewhere, and deep branch trees will be hard to read regardless.
>
>> On May 1, 2019, at 1:07 PM, Michael Drogalis <michael.droga...@confluent.io> wrote:
>>
>> I'm less enthusiastic about inlining the branch logic with its downstream functionality. Programs that have deep branch trees will quickly become harder to read as a single unit.
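A minimal in-memory sketch can make the named-branch proposal above concrete. It assumes plain `List`s in place of `KStream`s, and `Splitter`, `split()` and the branch names are hypothetical illustrations, not the Kafka Streams API. It shows records going to the first matching predicate in declaration order, a terminal `defaultBranch(name)` that collects everything else, and a resulting `Map` in which a caller can simply ignore the default entry. Branches are kept in a `LinkedHashMap` because a `java.util.SortedMap` (as in the `SortedMap` signature proposed elsewhere in the thread) orders entries by key, not by insertion, so evaluation order would silently follow the branch names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical in-memory stand-in for the proposed chain
//   stream.split().branch(name, predicate)...defaultBranch(name) -> Map<String, KStream<K, V>>
// Plain Lists replace KStreams; Splitter is an illustrative name, not a Kafka Streams class.
class NamedSplitSketch {

    static final class Splitter<V> {
        private final List<V> records;
        // LinkedHashMap iterates in insertion order, so evaluation order == declaration order.
        private final Map<String, Predicate<? super V>> branches = new LinkedHashMap<>();

        Splitter(List<V> records) {
            this.records = records;
        }

        Splitter<V> branch(String name, Predicate<? super V> predicate) {
            branches.put(name, predicate);
            return this;
        }

        // Terminal call: each record goes to the first matching branch, otherwise to the default.
        Map<String, List<V>> defaultBranch(String defaultName) {
            Map<String, List<V>> result = new LinkedHashMap<>();
            for (String name : branches.keySet()) {
                result.put(name, new ArrayList<>());
            }
            result.put(defaultName, new ArrayList<>());
            for (V record : records) {
                String target = defaultName;
                for (Map.Entry<String, Predicate<? super V>> e : branches.entrySet()) {
                    if (e.getValue().test(record)) {
                        target = e.getKey();
                        break;
                    }
                }
                result.get(target).add(record);
            }
            return result;
        }
    }

    static <V> Splitter<V> split(List<V> records) {
        return new Splitter<>(records);
    }

    static Map<String, List<Integer>> demo() {
        return split(List.of(1, 2, 3, 4, 5, 6))
                .branch("large", n -> n > 4)        // declared first, so tested first
                .branch("even", n -> n % 2 == 0)
                .defaultBranch("rest");
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> branches = demo();
        System.out.println(branches); // {large=[5, 6], even=[2, 4], rest=[1, 3]}
        // A TreeMap (the JDK's standard SortedMap) would order the same names by key instead:
        System.out.println(new TreeMap<>(branches).keySet()); // [even, large, rest]
    }
}
```

Because the result is keyed by name, adding a branch does not shift how existing branches are looked up; the trade-off raised in the thread is that, unlike the `Consumer`-based variant, nothing forces the caller to handle every entry.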
>>
>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>
>>> Also +1 on the issues/goals as Michael outlined them; I think that sets a great framework for the discussion.
>>>
>>> Regarding the SortedMap solution, my understanding is that the current proposal in the KIP is what is in my PR, which (pending naming decisions) is roughly this:
>>>
>>>     stream.split()
>>>         .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>         .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>         .defaultBranch(Consumer<KStream<K, V>>);
>>>
>>> Obviously some ordering is necessary, since branching as a construct doesn't work without it, but this solution seems to provide as much associativity as the SortedMap solution, because each branch() call directly associates the "conditional" with the "code block." The value the SortedMap solution provides over the KIP solution is the accessing of streams in the same scope.
>>>
>>> The KIP solution is less "dynamic" than the SortedMap solution in the sense that it is slightly clumsier to add a dynamic number of branches, but it is certainly possible. It seems to me like the API should favor the "static" case anyway, and should make it simple and readable to fluently declare and access your branches in-line. It also makes it impossible to ignore a branch, and it is possible to build an (almost) identical SortedMap solution on top of it.
>>>
>>> I could also see a middle ground where instead of a raw SortedMap being taken in, branch() takes a name and not a Consumer.
>>> Something like this:
>>>
>>>     Map<String, KStream<K, V>> branches = stream.split()
>>>         .branch("branchOne", Predicate<K, V>)
>>>         .branch("branchTwo", Predicate<K, V>)
>>>         .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
>>>
>>> Pros for that solution:
>>> - accessing branched KStreams in the same scope
>>> - no double brace initialization, hopefully slightly more readable than SortedMap
>>>
>>> Cons:
>>> - downstream branch logic cannot be specified inline, which makes it harder to read top to bottom (like the existing API and SortedMap, but unlike the KIP)
>>> - you can forget to "handle" one of the branched streams (like the existing API and SortedMap, but unlike the KIP)
>>>
>>> (KBranchedStreams could even work *both* ways, but perhaps that's overdoing it.)
>>>
>>> Overall I'm curious how important it is to be able to easily access the branched KStream in the same scope as the original. It's possible that it doesn't need to be handled directly by the API, but instead left up to the user. I'm sort of in the middle on it.
>>>
>>> Paul
>>>
>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
>>>
>>>> I'd like to +1 what Michael said about the issues with the existing branch method. I agree with what he's outlined and I think we should proceed by trying to alleviate these problems. Specifically, it seems important to be able to cleanly access the individual branches (e.g. by mapping name->stream), which I thought was the original intention of this KIP.
>>>>
>>>> That said, I don't think we should so easily give in to the double brace anti-pattern or force our users into it if at all possible to avoid... just my two cents.
>>>>
>>>> Cheers,
>>>> Sophie
>>>>
>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <michael.droga...@confluent.io> wrote:
>>>>
>>>>> I’d like to propose a different way of thinking about this.
>>>>> To me, there are three problems with the existing branch signature:
>>>>>
>>>>> 1. If you use it the way most people do, Java raises unsafe type warnings.
>>>>> 2. The way in which you use the stream branches is positionally coupled to the ordering of the conditionals.
>>>>> 3. It is brittle to extend existing branch calls with additional code paths.
>>>>>
>>>>> Using associative constructs instead of relying on ordered constructs would be a stronger approach. Consider a signature that instead looks like this:
>>>>>
>>>>>     Map<String, KStream<K,V>> KStream#branch(SortedMap<String, Predicate<? super K,? super V>>);
>>>>>
>>>>> Branches are given names in a map, and as a result, the API returns a mapping of names to streams. The ordering of the conditionals is maintained because it’s a sorted map. Insert order determines the order of evaluation.
>>>>>
>>>>> This solves problem 1 because there are no more varargs. It solves problem 2 because you no longer lean on ordering to access the branch you’re interested in. It solves problem 3 because you can introduce another conditional by simply attaching another name to the structure, rather than messing with the existing indices.
>>>>>
>>>>> One of the drawbacks is that creating the map inline is historically awkward in Java. I know it’s an anti-pattern to use voluminously, but double brace initialization would clean up the aesthetics.
>>>>>
>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler <j...@confluent.io> wrote:
>>>>>
>>>>>> Hi Ivan,
>>>>>>
>>>>>> Thanks for the update.
>>>>>>
>>>>>> FWIW, I agree with Matthias that the current "start branching" operator is confusing when named the same way as the actual branches. "Split" seems like a good name.
>>>>>> Alternatively, we can do without a "start branching" operator at all, and just do:
>>>>>>
>>>>>>     stream
>>>>>>         .branch(Predicate)
>>>>>>         .branch(Predicate)
>>>>>>         .defaultBranch();
>>>>>>
>>>>>> Tentatively, I think that this branching operation should be terminal. That way, we don't create ambiguity about how to use it. That is, `branch` should return `KBranchedStream`, while `defaultBranch` is `void`, to enforce that it comes last, and that there is only one definition of the default branch. Potentially, we should log a warning if there's no default, and additionally log a warning (or throw an exception) if a record falls through with no default.
>>>>>>
>>>>>> Thoughts?
>>>>>>
>>>>>> Thanks,
>>>>>> -John
>>>>>>
>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>
>>>>>>> Thanks for updating the KIP and your answers.
>>>>>>>
>>>>>>>> this is to make the name similar to String#split that also returns an array, right?
>>>>>>>
>>>>>>> The intent was to avoid name duplication. The return type should _not_ be an array.
>>>>>>>
>>>>>>> The current proposal is
>>>>>>>
>>>>>>>     stream.branch()
>>>>>>>         .branch(Predicate)
>>>>>>>         .branch(Predicate)
>>>>>>>         .defaultBranch();
>>>>>>>
>>>>>>> IMHO, this reads a little odd, because the first `branch()` does not take any parameters and has different semantics than the later `branch()` calls. Note that from the code snippet above, it's hidden that the first call is `KStream#branch()` while the others are `KBranchedStream#branch()`, which makes reading the code harder.
>>>>>>>
>>>>>>> Because I suggested to rename `addBranch()` -> `branch()`, I thought it might be better to also rename `KStream#branch()` to avoid the naming overlap that seems to be confusing.
>>>>>>> The following reads much cleaner to me:
>>>>>>>
>>>>>>>     stream.split()
>>>>>>>         .branch(Predicate)
>>>>>>>         .branch(Predicate)
>>>>>>>         .defaultBranch();
>>>>>>>
>>>>>>> Maybe there is a better alternative to `split()` though to avoid the naming overlap.
>>>>>>>
>>>>>>>> 'default' is, however, a reserved word, so unfortunately we cannot have a method with such name :-)
>>>>>>>
>>>>>>> Bummer. Didn't consider this. Maybe we can still come up with a short name?
>>>>>>>
>>>>>>> Can you add the interface `KBranchedStream` to the KIP with all its methods? It will be part of the public API and should be contained in the KIP. For example, it's unclear atm what the return type of `defaultBranch()` is.
>>>>>>>
>>>>>>> You did not comment on the idea to add a `KBranchedStream#get(int index) -> KStream` method to get the individually branched KStreams. It would be nice to get your feedback about it. It seems you suggest that users would otherwise need to write custom utility code to access them. We should discuss the pros and cons of both approaches. It feels "incomplete" to me atm if the API has no built-in support to get the branched KStreams directly.
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
>>>>>>>> Hi all!
>>>>>>>>
>>>>>>>> I have updated KIP-418 according to the new vision.
>>>>>>>>
>>>>>>>> Matthias, thanks for your comment!
>>>>>>>>
>>>>>>>>> Renaming KStream#branch() -> #split()
>>>>>>>>
>>>>>>>> I can see your point: this is to make the name similar to String#split that also returns an array, right? But is it worth the loss of backwards compatibility? We can have overloaded branch() as well without affecting the existing code.
>>>>>>>> Maybe the old array-based `branch` method should be deprecated, but this is a subject for discussion.
>>>>>>>>
>>>>>>>>> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(), KBranchedStream#defaultBranch() -> BranchingKStream#default()
>>>>>>>>
>>>>>>>> Totally agree with the 'addBranch->branch' rename. 'default' is, however, a reserved word, so unfortunately we cannot have a method with such name :-)
>>>>>>>>
>>>>>>>>> defaultBranch() does take a `Predicate` as argument, but I think that is not required?
>>>>>>>>
>>>>>>>> Absolutely! I think that was just a copy-paste error or something.
>>>>>>>>
>>>>>>>> Dear colleagues,
>>>>>>>>
>>>>>>>> please review the new version of the KIP and Paul's PR (https://github.com/apache/kafka/pull/6512)
>>>>>>>>
>>>>>>>> Any new suggestions/objections?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Ivan
>>>>>>>>
>>>>>>>> 11.04.2019 11:47, Matthias J. Sax wrote:
>>>>>>>>> Thanks for driving the discussion of this KIP. It seems that everybody agrees that the current branch() method using arrays is not optimal.
>>>>>>>>>
>>>>>>>>> I had a quick look into the PR and I like the overall proposal. There are some minor things we need to consider. I would recommend the following renaming:
>>>>>>>>>
>>>>>>>>>     KStream#branch() -> #split()
>>>>>>>>>     KBranchedStream#addBranch() -> BranchingKStream#branch()
>>>>>>>>>     KBranchedStream#defaultBranch() -> BranchingKStream#default()
>>>>>>>>>
>>>>>>>>> It's just a suggestion to get slightly shorter method names.
>>>>>>>>>
>>>>>>>>> In the current PR, defaultBranch() does take a `Predicate` as argument, but I think that is not required?
>>>>>>>>>
>>>>>>>>> Also, we should consider KIP-307, which was recently accepted and is currently being implemented:
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>>>>>>
>>>>>>>>> I.e., we should add overloads that accept a `Named` parameter.
>>>>>>>>>
>>>>>>>>> For the issue that the created `KStream` objects are in different scopes: could we extend `KBranchedStream` with a `get(int index)` method that returns the corresponding "branched" result `KStream` object? Maybe the second argument of `addBranch()` should not be a `Consumer<KStream>` but a `Function<KStream,KStream>`, and `get()` could return whatever the `Function` returns?
>>>>>>>>>
>>>>>>>>> Finally, I would also suggest updating the KIP with the current proposal. That makes it easier to review.
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
>>>>>>>>>> Ivan,
>>>>>>>>>>
>>>>>>>>>> I'm a bit of a novice here as well, but I think it makes sense for you to revise the KIP and continue the discussion. Obviously we'll need some buy-in from committers that have actual binding votes on whether the KIP could be adopted. It would be great to hear if they think this is a good idea overall. I'm not sure if that happens just by starting a vote, or if there is generally some indication of interest beforehand.
>>>>>>>>>>
>>>>>>>>>> That being said, I'll continue the discussion a bit: assuming we do move forward with the solution of "stream.branch() returns KBranchedStream", do we deprecate "stream.branch(...) returns KStream[]"? I would favor deprecating, since having two mutually exclusive APIs that accomplish the same thing is confusing, especially when they're fairly similar anyway. We just need to be sure we're not making something impossible/difficult that is currently possible/easy.
>>>>>>>>>>
>>>>>>>>>> Regarding my PR - I think the general structure would work, it's just a little sloppy overall in terms of naming and clarity. In particular, passing in the "predicates" and "children" lists, which get modified in KBranchedStream but read all the way down in KStreamLazyBranch, is a bit complicated to follow.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Paul
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Paul!
>>>>>>>>>>>
>>>>>>>>>>> I read your code carefully and now I am fully convinced: your proposal looks better and should work. We just have to document the crucial fact that KStream consumers are invoked as they're added. And then it's all going to be very nice.
>>>>>>>>>>>
>>>>>>>>>>> What shall we do now? I should re-write the KIP and resume the discussion here, right?
>>>>>>>>>>>
>>>>>>>>>>> Why do you say that your PR 'should not be even a starting point if we go in this direction'? To me it looks like a good starting point. But as a novice in this project I might miss some important details.
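Matthias's suggestion in his 4/11 message (a `Function<KStream,KStream>` per branch instead of a `Consumer`, plus a `KBranchedStream#get(int index)` accessor) can be sketched the same way. In this hypothetical model, `Brancher` is an illustrative name and `List`s stand in for `KStream`s; each record still goes to the first matching predicate. Because `get(i)` hands back whatever branch `i`'s function returned, every branch result is reachable in the caller's scope, which addresses the scope problem raised in this thread (for example, merging two branches again).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical model of "branch(Predicate, Function) + get(int index)"; Lists stand in for
// KStreams and Brancher is an illustrative name, not a Kafka Streams class.
class IndexedBranchSketch {

    static final class Brancher<V> {
        private final List<V> source;
        private final List<Predicate<? super V>> predicates = new ArrayList<>();
        private final List<Function<List<V>, List<V>>> functions = new ArrayList<>();

        Brancher(List<V> source) {
            this.source = source;
        }

        Brancher<V> branch(Predicate<? super V> predicate, Function<List<V>, List<V>> function) {
            predicates.add(predicate);
            functions.add(function);
            return this;
        }

        // Collects the records whose FIRST matching predicate is at `index`,
        // then returns whatever that branch's Function produces from them.
        List<V> get(int index) {
            List<V> matched = new ArrayList<>();
            for (V record : source) {
                for (int i = 0; i < predicates.size(); i++) {
                    if (predicates.get(i).test(record)) {
                        if (i == index) {
                            matched.add(record);
                        }
                        break;
                    }
                }
            }
            return functions.get(index).apply(matched);
        }
    }

    static List<Integer> demo() {
        Brancher<Integer> brancher = new Brancher<>(List.of(1, 2, 3, 4, 5, 6))
                .branch(n -> n % 2 == 0, evens -> evens.stream().map(n -> n * 10).collect(Collectors.toList()))
                .branch(n -> n > 3, large -> large);
        // Both branch results are reachable in the caller's scope, e.g. to merge them again.
        List<Integer> merged = new ArrayList<>(brancher.get(0));
        merged.addAll(brancher.get(1));
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [20, 40, 60, 5]
    }
}
```

One design consequence: index-based access reintroduces the positional coupling Michael lists as problem 2, which the name-keyed `Map` variants avoid.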
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>>
>>>>>>>>>>> Ivan
>>>>>>>>>>>
>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen wrote:
>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>
>>>>>>>>>>>> Maybe I’m missing the point, but I believe the stream.branch() solution supports this. The couponIssuer::set* consumers will be invoked as they’re added, not during streamsBuilder.build(). So the user still ought to be able to call couponIssuer.coupons() afterward and depend on the branched streams having been set.
>>>>>>>>>>>>
>>>>>>>>>>>> The issue I mean to point out is that it is hard to access the branched streams in the same scope as the original stream (that is, not inside the couponIssuer), which is a problem with both proposed solutions. It can be worked around, though.
>>>>>>>>>>>>
>>>>>>>>>>>> [Also, great to hear additional interest in 401, I’m excited to hear your thoughts!]
>>>>>>>>>>>>
>>>>>>>>>>>> Paul
>>>>>>>>>>>>
>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Paul!
>>>>>>>>>>>>>
>>>>>>>>>>>>> The idea to postpone the wiring of branches to streamsBuilder.build() also looked great to me at first glance, but ---
>>>>>>>>>>>>>
>>>>>>>>>>>>>> the newly branched streams are not available in the same scope as each other. That is, if we wanted to merge them back together again I don't see a way to do that.
>>>>>>>>>>>>>
>>>>>>>>>>>>> You just took the words right out of my mouth; I was just going to write in detail about this issue.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Consider the example from Bill's book, p. 101: say we need to identify customers who have bought coffee and made a purchase in the electronics store to give them coupons.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is the code I usually write under these circumstances using my 'brancher' class:
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Setter
>>>>>>>>>>>>>     class CouponIssuer {
>>>>>>>>>>>>>         private KStream<....> coffeePurchases;
>>>>>>>>>>>>>         private KStream<....> electronicsPurchases;
>>>>>>>>>>>>>
>>>>>>>>>>>>>         KStream<...> coupons() {
>>>>>>>>>>>>>             /* In the real world the code here can be complex, so creation of a separate
>>>>>>>>>>>>>                CouponIssuer class is fully justified, in order to separate classes' responsibilities. */
>>>>>>>>>>>>>             return coffeePurchases.join(electronicsPurchases...)...whatever
>>>>>>>>>>>>>         }
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     CouponIssuer couponIssuer = new CouponIssuer();
>>>>>>>>>>>>>
>>>>>>>>>>>>>     new KafkaStreamsBrancher<....>()
>>>>>>>>>>>>>         .branch(predicate1, couponIssuer::setCoffeePurchases)
>>>>>>>>>>>>>         .branch(predicate2, couponIssuer::setElectronicsPurchases)
>>>>>>>>>>>>>         .onTopOf(transactionStream);
>>>>>>>>>>>>>
>>>>>>>>>>>>>     /* Alas, this won't work if we're going to wire up everything later, without the terminal operation!!! */
>>>>>>>>>>>>>     couponIssuer.coupons()...
>>>>>>>>>>>>>
>>>>>>>>>>>>> Does this make sense? In order to properly initialize the CouponIssuer we need the terminal operation to be called before streamsBuilder.build() is called.
>>>>>>>>>>>>>
>>>>>>>>>>>>> [BTW Paul, I just found out that your KIP-401 is essentially the next KIP I was going to write here. I have some thoughts based on my experience, so I will join the discussion on KIP-401 soon.]
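Paul's reply above argues that the `couponIssuer::set*` consumers run as soon as each branch is declared, not during `streamsBuilder.build()`. The sketch below models Ivan's coupon example under that assumption. `EagerBrancher`, `Purchase` and the list-based "streams" are hypothetical, and the "join" is just a set intersection of customer names; a real topology would wire processor nodes rather than materialize records. Because each `Consumer` is invoked inside `branch(...)`, both setters have run before `coupons()` is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical model of Ivan's coupon example with eagerly invoked branch consumers.
// Lists stand in for KStreams; Purchase, EagerBrancher and CouponIssuer are illustrative names.
class CouponIssuerSketch {

    static final class Purchase {
        final String customer;
        final String department;

        Purchase(String customer, String department) {
            this.customer = customer;
            this.department = department;
        }
    }

    static final class EagerBrancher<V> {
        private final List<V> source;
        private final List<Predicate<? super V>> earlier = new ArrayList<>();

        EagerBrancher(List<V> source) {
            this.source = source;
        }

        EagerBrancher<V> branch(Predicate<? super V> predicate, Consumer<List<V>> consumer) {
            List<V> branch = new ArrayList<>();
            for (V v : source) {
                // First-match semantics: skip records already claimed by an earlier branch.
                boolean claimed = earlier.stream().anyMatch(p -> p.test(v));
                if (!claimed && predicate.test(v)) {
                    branch.add(v);
                }
            }
            earlier.add(predicate);
            consumer.accept(branch); // invoked right away, while the branch is being declared
            return this;
        }
    }

    static final class CouponIssuer {
        private List<Purchase> coffeePurchases;
        private List<Purchase> electronicsPurchases;

        void setCoffeePurchases(List<Purchase> purchases) { coffeePurchases = purchases; }

        void setElectronicsPurchases(List<Purchase> purchases) { electronicsPurchases = purchases; }

        // Stand-in for the join: customers who appear in both branches.
        Set<String> coupons() {
            Set<String> coffeeBuyers = new TreeSet<>();
            for (Purchase p : coffeePurchases) coffeeBuyers.add(p.customer);
            Set<String> result = new TreeSet<>();
            for (Purchase p : electronicsPurchases) {
                if (coffeeBuyers.contains(p.customer)) result.add(p.customer);
            }
            return result;
        }
    }

    static Set<String> demo() {
        List<Purchase> transactions = List.of(
                new Purchase("alice", "coffee"),
                new Purchase("bob", "coffee"),
                new Purchase("alice", "electronics"),
                new Purchase("carol", "electronics"));
        CouponIssuer couponIssuer = new CouponIssuer();
        new EagerBrancher<>(transactions)
                .branch(p -> p.department.equals("coffee"), couponIssuer::setCoffeePurchases)
                .branch(p -> p.department.equals("electronics"), couponIssuer::setElectronicsPurchases);
        // Safe: both setters have already run by the time coupons() is called.
        return couponIssuer.coupons();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [alice]
    }
}
```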
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>
>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen wrote:
>>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I tried to make a very rough proof of concept of a fluent API based off of KStream here (https://github.com/apache/kafka/pull/6512), and I think I succeeded at removing both cons.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - Compatibility: I was incorrect earlier about compatibility issues; there aren't any direct ones. I was unaware that Java is smart enough to distinguish between a branch(varargs...) returning one thing and branch() with no arguments returning another thing.
>>>>>>>>>>>>>> - Requiring a terminal method: We don't actually need it. We can just build up the branches in the KBranchedStream, which shares its state with the ProcessorSupplier that will actually do the branching. It's not terribly pretty in its current form, but I think it demonstrates its feasibility.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> To be clear, I don't think that pull request should be final or even a starting point if we go in this direction; I just wanted to see how challenging it would be to get the API working.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I will say, though, that I'm not sure the existing solution could be deprecated in favor of this, which I had originally suggested was a possibility. The reason is that the newly branched streams are not available in the same scope as each other. That is, if we wanted to merge them back together again I don't see a way to do that.
>>>>>>>>>>>>>> The KIP proposal has the same issue, though - all this means is that for either solution, deprecating the existing branch(...) is not on the table.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>>>>>>>>>>> OK, let me summarize what we have discussed up to this point.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> First, it seems to be commonly agreed that the branch API needs improvement. Motivation is given in the KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> There are two potential ways to do it:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1. (as originally proposed)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     new KafkaStreamsBrancher<..>()
>>>>>>>>>>>>>>>         .branch(predicate1, ks -> ..)
>>>>>>>>>>>>>>>         .branch(predicate2, ks -> ..)
>>>>>>>>>>>>>>>         .defaultBranch(ks -> ..) //optional
>>>>>>>>>>>>>>>         .onTopOf(stream).mapValues(...).... //onTopOf returns its argument
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2) The code won't make sense until all the necessary ingredients are provided.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance contrasts with the fluency of other KStream methods.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2. (as Paul proposes)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     stream
>>>>>>>>>>>>>>>         .branch(predicate1, ks -> ...)
>>>>>>>>>>>>>>>         .branch(predicate2, ks -> ...)
>>>>>>>>>>>>>>>         .defaultBranch(ks -> ...) //or noDefault(). Both defaultBranch(..) and noDefault() return void
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> PROS: Generally follows the way the KStream interface is defined.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> CONS: We need to define two terminal methods (defaultBranch(ks->) and noDefault()).
>>>>>>>>>>>>>>> And for a user it is very easy to miss the fact that one of the terminal methods should be called. If these methods are not called, we can throw an exception at runtime.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can we do better?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
>>>>>>>>>>>>>>>>> Paul,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I see your point when you are talking about stream..branch..branch...default..
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Still, I believe that this cannot be implemented the easy way. Maybe we all should think further.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be the only option besides `default`, because there are scenarios when we want to just silently drop the messages that didn't match any predicate.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2) Throwing an exception in the middle of data flow processing looks like a bad idea. In the stream processing paradigm, I would prefer to emit a special message to a dedicated stream. This is exactly where `default` can be used.
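The options debated here for records that match no predicate (send them to a default branch such as a dedicated stream, drop them silently as the existing `branch()` does, or fail with an exception) can be compared side by side. In this hypothetical sketch, `route`, the `NoMatch` enum and the reserved "default" key are illustrative names, and lists stand in for streams.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical comparison of three ways to treat records that match no branch predicate.
// route, NoMatch and the "default" key are illustrative; Lists stand in for streams.
class FallThroughSketch {

    enum NoMatch { TO_DEFAULT, DROP, FAIL }

    static <V> Map<String, List<V>> route(List<V> records,
                                          Map<String, Predicate<? super V>> branches,
                                          NoMatch policy) {
        Map<String, List<V>> out = new LinkedHashMap<>();
        for (String name : branches.keySet()) {
            out.put(name, new ArrayList<>());
        }
        if (policy == NoMatch.TO_DEFAULT) {
            out.put("default", new ArrayList<>());
        }
        for (V record : records) {
            String target = null;
            for (Map.Entry<String, Predicate<? super V>> e : branches.entrySet()) {
                if (e.getValue().test(record)) {
                    target = e.getKey();
                    break;
                }
            }
            if (target != null) {
                out.get(target).add(record);
            } else if (policy == NoMatch.TO_DEFAULT) {
                out.get("default").add(record);        // e.g. a dedicated "unmatched" stream
            } else if (policy == NoMatch.FAIL) {
                throw new IllegalStateException("No branch matched record: " + record);
            }
            // NoMatch.DROP: the record is silently discarded.
        }
        return out;
    }

    static Map<String, Predicate<? super Integer>> evens() {
        Map<String, Predicate<? super Integer>> branches = new LinkedHashMap<>();
        branches.put("even", n -> n % 2 == 0);
        return branches;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3, 4);
        System.out.println(route(data, evens(), NoMatch.TO_DEFAULT)); // {even=[2, 4], default=[1, 3]}
        System.out.println(route(data, evens(), NoMatch.DROP));       // {even=[2, 4]}
        try {
            route(data, evens(), NoMatch.FAIL);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());                         // No branch matched record: 1
        }
    }
}
```

Routing unmatched records to a default branch keeps them visible without stopping processing, which is the behavior Ivan argues for; the `FAIL` policy corresponds to the terminal method Paul floats that assumes nothing reaches the default branch.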
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> You mean a runtime exception, when the program is compiled and run? Well, I'd prefer an API that simply won't compile if used incorrectly. Can we build such an API as a method chain starting from a KStream object? There is a huge cost difference between runtime and compile-time errors. Even if a failure surfaces instantly in unit tests, it costs the project more than a compilation failure.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen wrote:
>>>>>>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Good point about the terminal operation being required. But is that really such a bad thing? If the user doesn't want a defaultBranch they can call some other terminal method (noDefaultBranch()?) just as easily. In fact I think it creates an opportunity for a nicer API - a user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs.
>>>>>>>>>>>>>>>>>> That seems like an improvement over the current branch() API, which allows for the more subtle behavior of records unexpectedly getting dropped.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The need for a terminal operation certainly has to be well documented, but it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue. Especially now that there is a "build step" where the topology is actually wired up, when StreamsBuilder.build() is called.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its argument, I agree that it's critical to allow users to do other operations on the input stream. With the fluent solution, it ought to work the same way all other operations do - if you want to process off the original KStream multiple times, you just need the stream as a variable so you can call as many operations on it as you desire.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thoughts?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hello Paul,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I'm afraid this won't work because we do not always need the defaultBranch.
>>>>>>>>>>>>>>>>>>> And without a terminal operation we don't know when to finalize and build the 'branch switch'.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its argument, so we can do something more with the original branch after branching.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I understand your point that the need for special object construction contrasts with the fluency of most KStream methods. But here we have a special case: we build the switch to split the flow, so I think this is still idiomatic.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
>>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve this API, but I find the onTopOf() mechanism a little confusing since it contrasts with the fluency of other KStream method calls. Ideally I'd like to just call a method on the stream so it still reads top to bottom if the branch cases are defined fluently. I think the addBranch(predicate, handleCase) is very nice and the right way to do things, but what if we flipped around how we specify the source stream?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Like:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     stream.branch()
>>>>>>>>>>>>>>>>>>>>         .addBranch(predicate1, this::handle1)
>>>>>>>>>>>>>>>>>>>>         .addBranch(predicate2, this::handle2)
>>>>>>>>>>>>>>>>>>>>         .defaultBranch(this::handleDefault);
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Where branch() returns a KBranchedStreams or KStreamBrancher or something, which is added to by addBranch() and terminated by defaultBranch() (which returns void). This is obviously incompatible with the current API, so the new stream.branch() would have to have a different name, but that seems like a fairly small problem - we could call it something like branched() or branchedStreams() and deprecate the old API.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of your KIP? It seems like it does to me, allowing for clear in-line branching while also allowing you to dynamically build branches off of KBranchedStreams if desired.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Bill,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
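Paul's remark above that a new no-argument `stream.branch()` "would have to have a different name" is one he corrects in his 3/28 message: Java can tell `branch()` apart from a varargs `branch(...)`. The hypothetical classes below (`DemoStream`, `Builder`) have nothing to do with Kafka; they only demonstrate the overload-resolution rule involved. Fixed-arity candidates are tried before variable-arity ones, so a call with no arguments selects `branch()`, while a call with predicates can only match the varargs overload.

```java
import java.util.function.Predicate;

// Hypothetical classes, unrelated to Kafka: they only show that a varargs branch(...) and a
// no-argument branch() can coexist on one type with different return types.
class OverloadSketch {

    static final class Builder {
        Builder addBranch(Predicate<String> predicate) {
            return this;
        }
    }

    static final class DemoStream {
        // Stands in for the old array-returning branch(Predicate...).
        @SafeVarargs
        final String[] branch(Predicate<String>... predicates) {
            return new String[predicates.length];
        }

        // Stands in for the new fluent entry point.
        Builder branch() {
            return new Builder();
        }
    }

    static String demo() {
        DemoStream stream = new DemoStream();
        String[] old = stream.branch(s -> s.isEmpty(), s -> s.length() > 3); // variable-arity overload
        Builder fluent = stream.branch().addBranch(s -> s.startsWith("a"));  // fixed-arity branch() wins
        return old.length + " " + fluent.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 2 Builder
    }
}
```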
This is how I usually do it:

    void handleFirstCase(KStream<String, String> ks) {
        ks.filter(....).mapValues(...)
    }

    void handleSecondCase(KStream<String, String> ks) {
        ks.selectKey(...).groupByKey()...
    }

    ......
    new KafkaStreamsBrancher<String, String>()
        .addBranch(predicate1, this::handleFirstCase)
        .addBranch(predicate2, this::handleSecondCase)
        .onTopOf(....)

Regards,

Ivan

On 22.03.2019 1:34, Bill Bejeck wrote:

Hi Ivan,

Thanks for the KIP.

I have one question: the KafkaStreamsBrancher takes a Consumer as its second argument, which returns nothing, and the example in the KIP shows each stream from the branch using a terminal node (KStream#to() in this case).

Maybe I've missed something, but how would we handle the case where the user has created a branch but wants to continue processing, and not necessarily use a terminal node on the branched stream immediately?
For example, using today's logic as is, if we had something like this:

    KStream<String, String>[] branches = originalStream.branch(predicate1, predicate2);
    branches[0].filter(....).mapValues(...)..
    branches[1].selectKey(...).groupByKey().....

Thanks!
Bill

On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote:

All,

I'd like to jump-start the discussion for KIP-418.

Here's the original message:

Hello,

I'd like to start a discussion about KIP-418. Please take a look at the KIP if you can; I would appreciate any feedback :)

KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
PR#6164: https://github.com/apache/kafka/pull/6164

Regards,

Ivan Ponomarev
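To make the semantics under discussion concrete, here is a minimal, dependency-free sketch in plain Java. It is not the Kafka Streams API: a `List<Map.Entry<K, V>>` stands in for `KStream<K, V>`, `java.util.function.BiPredicate` stands in for the Kafka `Predicate<K, V>`, and the class names `ListBrancher` and `BranchDemo` are hypothetical. It models the `addBranch(predicate, handler)` / `onTopOf(source)` style with the routing rule of the existing array-based `KStream#branch()`: each record goes only to the first branch whose predicate matches, and records that match no predicate are dropped. Returning the source from `onTopOf()` mirrors Ivan's point that the original stream can still be used after branching.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Consumer;

// Hypothetical model of a fluent brancher. A List of entries stands in for a
// KStream; this is an illustration only, not the Kafka Streams API.
class ListBrancher<K, V> {
    private final List<BiPredicate<? super K, ? super V>> predicates = new ArrayList<>();
    private final List<Consumer<? super List<Map.Entry<K, V>>>> handlers = new ArrayList<>();

    // Registers one branch: records matching `predicate` (and no earlier
    // predicate) are handed to `handler` as a single sub-"stream".
    ListBrancher<K, V> addBranch(BiPredicate<? super K, ? super V> predicate,
                                 Consumer<? super List<Map.Entry<K, V>>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Routes every record of `source` using first-match semantics, calls each
    // handler once with its bucket, and returns `source` unchanged.
    List<Map.Entry<K, V>> onTopOf(List<Map.Entry<K, V>> source) {
        List<List<Map.Entry<K, V>>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> record : source) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(record.getKey(), record.getValue())) {
                    buckets.get(i).add(record);
                    break; // first match wins; records matching nothing are dropped
                }
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        return source;
    }
}

class BranchDemo {
    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> source = List.of(
            Map.entry("a", 1), Map.entry("b", -2), Map.entry("c", 0), Map.entry("d", 5));
        List<Map.Entry<String, Integer>> positives = new ArrayList<>();
        List<Map.Entry<String, Integer>> negatives = new ArrayList<>();

        List<Map.Entry<String, Integer>> same = new ListBrancher<String, Integer>()
            .addBranch((k, v) -> v > 0, positives::addAll)
            .addBranch((k, v) -> v < 0, negatives::addAll)
            .onTopOf(source);

        System.out.println(positives);      // [a=1, d=5]
        System.out.println(negatives);      // [b=-2]   ("c"=0 matched nothing and was dropped)
        System.out.println(same == source); // true
    }
}
```

The handlers in this model only collect their records; in the proposals above each handler would receive a `KStream<K, V>` and could keep chaining operations on it, which is the case Bill's question is about.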