Ivan, no worries about getting sidetracked. Glad to have you back!
The DSL has improved further in the meantime, and we already have a `Named` config object to name operators. It seems reasonable to me to build on this. Furthermore, John did a write-up about "DSL design principles" that we want to follow; it might be worth checking out: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar

-Matthias

On 4/17/20 4:30 PM, Ivan Ponomarev wrote:
> Hi everyone!
>
> Let me revive the discussion of this KIP.
>
> I'm very sorry for dropping out of the discussion in June 2019. My project work was very intensive then and didn't leave me any spare time. But I think I must finish this, because we invested substantial effort into this discussion, and I don't feel entitled to propose other things before this one is finalized.
>
> During these months I kept writing and reviewing Kafka Streams-related code. Every time I needed branching, Spring-Kafka's KafkaStreamBrancher class of my invention (the original idea for this KIP) worked for me -- that's another reason why I gave up pushing the KIP forward. When I came across the problem with the scope of branches, I worked around it this way:
>
>     AtomicReference<KStream<...>> result = new AtomicReference<>();
>     new KafkaStreamBrancher<....>()
>         .branch(....)
>         .defaultBranch(result::set)
>         .onTopOf(someStream);
>     result.get()...
>
> And yes, of course I'm not very happy with this approach.
>
> I think that Matthias came up with a bright solution in his post from May 24th, 2019. Let me quote it:
>
>     KStream#split() -> KBranchedStream
>
>     // branch is not easily accessible in current scope
>     KBranchedStream#branch(Predicate, Consumer<KStream>)
>       -> KBranchedStream
>
>     // assign a name to the branch and
>     // return the sub-stream to the current scope later
>     //
>     // can be as simple as `#branch(p, s->s, "name")`
>     // or as complex as `#branch(p, s->s.filter(...), "name")`
>     KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
>       -> KBranchedStream
>
>     // default branch is not easily accessible
>     // return map of all named sub-streams into current scope
>     KBranchedStream#default(Consumer<KStream>)
>       -> Map<String,KStream>
>
>     // assign custom name to default-branch
>     // return map of all named sub-streams into current scope
>     KBranchedStream#default(Function<KStream,KStream>, String)
>       -> Map<String,KStream>
>
>     // assign a default name for default
>     // return map of all named sub-streams into current scope
>     KBranchedStream#defaultBranch(Function<KStream,KStream>)
>       -> Map<String,KStream>
>
>     // return map of all named sub-streams into current scope
>     KBranchedStream#noDefaultBranch()
>       -> Map<String,KStream>
>
> I believe this would satisfy everyone. Optional names seem to be a good idea: when you don't need to have the branches in the same scope, you just don't use names and you don't risk making your code brittle. Or you might want to add names just for debugging purposes. Or, finally, you might use the returned Map to have the named branches in the original scope.
>
> There was also input from John Roesler on June 4th, 2019, who suggested using the Named class. I can't comment on this. The idea seems reasonable, but in this matter I'd rather trust people who are more familiar with Streams API design principles than I am.
>
> Regards,
>
> Ivan
>
> 08.10.2019 1:38, Matthias J. Sax wrote:
>> I am moving this KIP into "inactive status". Feel free to resume the KIP at any point.
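Ivan's `AtomicReference` workaround can be illustrated without a Kafka dependency. The sketch below is a hypothetical in-memory stand-in. `ToyBrancher` only imitates the shape of Spring-Kafka's `KafkaStreamBrancher` (`branch`, `defaultBranch`, `onTopOf`) and operates on a `java.util.List` rather than a `KStream`. `ScopeWorkaround` is an illustrative name. The sketch shows how a branch that is reachable only inside a callback gets carried back into the enclosing scope.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical, in-memory stand-in for a brancher: it splits a List instead of a KStream.
final class ToyBrancher<T> {
    private final List<Predicate<? super T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> consumers = new ArrayList<>();
    private Consumer<List<T>> defaultConsumer = ignored -> { };

    ToyBrancher<T> branch(Predicate<? super T> predicate, Consumer<List<T>> consumer) {
        predicates.add(predicate);
        consumers.add(consumer);
        return this;
    }

    ToyBrancher<T> defaultBranch(Consumer<List<T>> consumer) {
        defaultConsumer = consumer;
        return this;
    }

    // Sends each element to the first branch whose predicate matches (or to the default),
    // then hands every branch's elements to its consumer.
    void onTopOf(List<T> input) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<T> rest = new ArrayList<>();
        for (T element : input) {
            int target = -1;
            for (int i = 0; i < predicates.size() && target < 0; i++) {
                if (predicates.get(i).test(element)) {
                    target = i;
                }
            }
            if (target >= 0) {
                buckets.get(target).add(element);
            } else {
                rest.add(element);
            }
        }
        for (int i = 0; i < consumers.size(); i++) {
            consumers.get(i).accept(buckets.get(i));
        }
        defaultConsumer.accept(rest);
    }
}

final class ScopeWorkaround {
    // The default branch is only visible inside its callback, so an AtomicReference
    // carries it back into this method's scope.
    static List<Integer> oddNumbers(List<Integer> input, List<Integer> evenSink) {
        AtomicReference<List<Integer>> result = new AtomicReference<>();
        new ToyBrancher<Integer>()
            .branch(n -> n % 2 == 0, evenSink::addAll)
            .defaultBranch(result::set)
            .onTopOf(input);
        return result.get();
    }
}
```

In this toy, `result.get()` is non-null only after `onTopOf(...)` has invoked the callbacks. That ordering dependency is part of why the thread is looking for an API that returns the branches directly.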
>>
>> If anybody else is interested in picking up this KIP, feel free to do so.
>>
>> -Matthias
>>
>> On 7/11/19 4:00 PM, Matthias J. Sax wrote:
>>> Ivan,
>>>
>>> did you see my last reply? What do you think about my proposal to mix both approaches and try to get the best of both worlds?
>>>
>>> -Matthias
>>>
>>> On 6/11/19 3:56 PM, Matthias J. Sax wrote:
>>>> Thanks for the input, John!
>>>>
>>>>> under your suggestion, it seems that the name is required
>>>>
>>>> If you want to get the `KStream` back as part of the `Map` using a `Function`, yes. If you follow the "embedded chaining" pattern using a `Consumer`, no.
>>>>
>>>> Allowing for a default name via `split()` can of course be done. Similarly, using `Named` instead of `String` is possible.
>>>>
>>>> I only wanted to sketch out a high-level proposal to merge both patterns. Your suggestions to align the new API with the existing API make total sense.
>>>>
>>>> One follow-up question: would `Named` be optional or required in `split()` and `branch()`? It's unclear from your example.
>>>>
>>>> If both are mandatory, what do we gain by it? The returned `Map` only contains the corresponding branches, so why should we prefix all of them? If `Named` is mandatory only in `branch()` but optional in `split()`, doesn't the same question arise?
>>>>
>>>> Requiring `Named` in `split()` only seems to make sense if `Named` is optional in `branch()` and we generate a `-X` suffix using a counter for the different branch names. However, this might lead to the problem of changing names if branches are added or removed. Also, how would the names be generated if a `Consumer` is mixed in (i.e., not all branches are returned in the `Map`)?
>>>>
>>>> If `Named` is optional for both, a user could forget to specify a name for a branch, which would lead to runtime issues.
>>>>
>>>> Hence, I am actually in favor of not allowing a default name. Instead, I would keep `split()` without parameters and make `Named` in `branch()` required if a `Function` is used. This makes it explicit to the user that specifying a name is required if a `Function` is used.
>>>>
>>>> About
>>>>
>>>>> KBranchedStream#branch(BranchConfig)
>>>>
>>>> I don't think that the branching predicate is a configuration, and hence I would not include it in a configuration object.
>>>>
>>>>> withChain(...);
>>>>
>>>> Similarly, `withChain()` (which would only take a `Consumer`?) does not seem to be a configuration. We also cannot prevent a user from calling `withName()` in combination with `withChain()`, which does not make sense IMHO. We could of course throw an RTE, but not having a compile-time check seems less appealing. Also, it could happen that neither `withChain()` nor `withName()` is called, and the branch is missing from the returned `Map`, which leads to runtime issues, too.
>>>>
>>>> Hence, I don't think that we should add `BranchConfig`. A config object is helpful if each configuration can be set independently of all others, but this seems not to be the case here. If we add new configuration later, we can still move forward by deprecating the methods that accept `Named` and adding new methods that accept `BranchConfig` (which would of course implement `Named`).
>>>>
>>>> Thoughts?
>>>>
>>>> @Ivan, what do you think about the general idea of blending the two main approaches: returning a `Map` plus "embedded chaining"?
>>>>
>>>> -Matthias
>>>>
>>>> On 6/4/19 10:33 AM, John Roesler wrote:
>>>>> Thanks for the idea, Matthias, it does seem like this would satisfy everyone. Returning the map from the terminal operations also solves the problem of merging/joining the branched streams, if we want to add support for the complement later on.
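Matthias objects to a `BranchConfig` object (John's proposal is quoted further down) because invalid combinations can only be rejected at runtime. The sketch below makes that concrete. It assumes a hypothetical immutable `BranchConfig<T>` with `withName` and `withChain` builders, which is not an actual Kafka Streams class. Both misuse cases compile without complaint and fail only when `validate()` runs.

```java
import java.util.function.Consumer;

// Hypothetical immutable config object in the spirit of the BranchConfig idea.
// It is not a Kafka Streams class; it only illustrates where misuse gets detected.
final class BranchConfig<T> {
    private final String name;         // null: branch is not returned in the result Map
    private final Consumer<T> chain;   // null: no embedded chain

    private BranchConfig(String name, Consumer<T> chain) {
        this.name = name;
        this.chain = chain;
    }

    static <T> BranchConfig<T> empty() {
        return new BranchConfig<>(null, null);
    }

    BranchConfig<T> withName(String newName) {
        return new BranchConfig<>(newName, chain);
    }

    BranchConfig<T> withChain(Consumer<T> newChain) {
        return new BranchConfig<>(name, newChain);
    }

    // Both problem cases compile fine; they can only be rejected here, at runtime.
    void validate() {
        if (name != null && chain != null) {
            throw new IllegalStateException("withName() and withChain() should not be combined");
        }
        if (name == null && chain == null) {
            throw new IllegalStateException("branch is neither chained nor returned in the Map");
        }
    }
}
```

With overloads such as `branch(Predicate, Consumer)` and `branch(Predicate, Function, String)`, the compiler rejects both mistakes instead. There is no overload that takes a chain and a name together, and no overload that takes neither. That is the trade-off being weighed here.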
>>>>>
>>>>> Under your suggestion, it seems that the name is required. Otherwise, we wouldn't have keys for the map to return. I think this is actually not too bad. Experience has taught us that, although names for operations are not required to define stream processing logic, they do significantly improve the operational experience when you can map the topology, logs, metrics, etc. back to the source code. Since you wouldn't (have to) reference the name to chain extra processing onto the branch (thanks to the second argument), you can avoid the "unchecked name" problem that Ivan pointed out.
>>>>>
>>>>> In the current implementation of Branch, you can name the branch operator itself, and then all the branches get index-suffixed names built from the branch operator name. I guess under this proposal, we could naturally append the branch name to the branching operator name, like this:
>>>>>
>>>>>     stream.split(Named.withName("mysplit")) // creates node "mysplit"
>>>>>           .branch(..., ..., "abranch")      // creates node "mysplit-abranch"
>>>>>           .defaultBranch(...)               // creates node "mysplit-default"
>>>>>
>>>>> It does make me wonder about the DSL syntax itself, though.
>>>>>
>>>>> We don't have a defined grammar, so there's plenty of room to debate the "best" syntax in the context of each operation, but in general, the KStream DSL operators follow this pattern:
>>>>>
>>>>>     operator(function, config_object?) OR operator(config_object)
>>>>>
>>>>> where config_object is often just Named in the "function" variant. Even when the config_object isn't a Named, but some other config class, that config class _always_ implements NamedOperation.
>>>>>
>>>>> Here, we're introducing a totally different pattern:
>>>>>
>>>>>     operator(function, function, string)
>>>>>
>>>>> where the string is the name.
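The naming concern raised in this part of the thread can be shown with plain strings. In this sketch, `BranchNaming` is a hypothetical helper. The `-0`, `-1` suffix format is an assumption, not the exact format Kafka Streams generates. Counter-based names shift when a branch is inserted ahead of the others. Names derived from explicit labels, like John's `mysplit-abranch`, stay stable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers contrasting two ways of deriving branch node names.
final class BranchNaming {
    // Counter-based: a branch's name depends on its position among the branches.
    static List<String> indexSuffixed(String splitName, List<String> branchLabels) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < branchLabels.size(); i++) {
            names.add(splitName + "-" + i);
        }
        return names;
    }

    // Explicit: a branch's name depends only on the label the user gave it.
    static List<String> explicitlyNamed(String splitName, List<String> branchLabels) {
        List<String> names = new ArrayList<>();
        for (String label : branchLabels) {
            names.add(splitName + "-" + label);
        }
        return names;
    }
}
```

With the labels `errors, orders`, the `errors` branch is `mysplit-0`. After an `audit` branch is inserted in front, it becomes `mysplit-1`. The explicit scheme keeps it as `mysplit-errors` in both cases.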
>>>>>
>>>>> My first question is whether the name should instead be specified with the NamedOperation interface.
>>>>>
>>>>> My second question is whether we should just roll all these arguments up into a config object like:
>>>>>
>>>>>     KBranchedStream#branch(BranchConfig)
>>>>>
>>>>>     interface BranchConfig extends NamedOperation {
>>>>>         withPredicate(...);
>>>>>         withChain(...);
>>>>>         withName(...);
>>>>>     }
>>>>>
>>>>> Although I guess we'd like to call BranchConfig something more like "Branched", even if I don't particularly like that pattern.
>>>>>
>>>>> This makes the source code a little noisier, but it also makes us more future-proof. We can deal with a wide range of alternatives purely in the config interface, and never have to add overloads to KBranchedStream if/when we decide we want the name to be optional, or the KStream->KStream function to be optional.
>>>>>
>>>>> WDYT?
>>>>>
>>>>> Thanks,
>>>>> -John
>>>>>
>>>>> On Fri, May 24, 2019 at 5:25 PM Michael Drogalis <michael.droga...@confluent.io> wrote:
>>>>>>
>>>>>> Matthias: I think that's pretty reasonable from my point of view. Good suggestion.
>>>>>>
>>>>>> On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>
>>>>>>> Interesting discussion.
>>>>>>>
>>>>>>> I am wondering if we can unify the advantages of both approaches:
>>>>>>>
>>>>>>>     KStream#split() -> KBranchedStream
>>>>>>>
>>>>>>>     // branch is not easily accessible in current scope
>>>>>>>     KBranchedStream#branch(Predicate, Consumer<KStream>)
>>>>>>>       -> KBranchedStream
>>>>>>>
>>>>>>>     // assign a name to the branch and
>>>>>>>     // return the sub-stream to the current scope later
>>>>>>>     //
>>>>>>>     // can be as simple as `#branch(p, s->s, "name")`
>>>>>>>     // or as complex as `#branch(p, s->s.filter(...), "name")`
>>>>>>>     KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
>>>>>>>       -> KBranchedStream
>>>>>>>
>>>>>>>     // default branch is not easily accessible
>>>>>>>     // return map of all named sub-streams into current scope
>>>>>>>     KBranchedStream#default(Consumer<KStream>)
>>>>>>>       -> Map<String,KStream>
>>>>>>>
>>>>>>>     // assign custom name to default-branch
>>>>>>>     // return map of all named sub-streams into current scope
>>>>>>>     KBranchedStream#default(Function<KStream,KStream>, String)
>>>>>>>       -> Map<String,KStream>
>>>>>>>
>>>>>>>     // assign a default name for default
>>>>>>>     // return map of all named sub-streams into current scope
>>>>>>>     KBranchedStream#defaultBranch(Function<KStream,KStream>)
>>>>>>>       -> Map<String,KStream>
>>>>>>>
>>>>>>>     // return map of all named sub-streams into current scope
>>>>>>>     KBranchedStream#noDefaultBranch()
>>>>>>>       -> Map<String,KStream>
>>>>>>>
>>>>>>> Hence, for each sub-stream, the user can choose whether to add a name and return the branch "result" to the calling scope. The implementation can also check at runtime that all returned names are unique. The returned Map can be empty, and using the Map is optional.
>>>>>>>
>>>>>>> To me, it seems like a good way to get the best of both worlds.
>>>>>>>
>>>>>>> Thoughts?
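A small in-memory model makes the semantics of this proposal concrete. It is a sketch under stated assumptions, not the Kafka Streams API:

- A "stream" is a `java.util.List`.
- `ToySplit` is a hypothetical class.
- The named default branch is called `defaultBranch`, because `default` is a reserved word in Java.

Each `branch()` call claims the remaining records its predicate matches, so every record lands in the first matching branch. Named branches are collected in a `LinkedHashMap`, and duplicate names are rejected at runtime, as the proposal suggests.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical in-memory model of the proposed split()/branch() shape; a List stands in for a KStream.
final class ToySplit<T> {
    private final List<T> remaining;                                  // records no branch has claimed yet
    private final Map<String, List<T>> named = new LinkedHashMap<>(); // keeps definition order

    ToySplit(List<T> input) {
        this.remaining = new ArrayList<>(input);
    }

    // Removes and returns the remaining records that match, so earlier branches win.
    private List<T> take(Predicate<? super T> predicate) {
        List<T> matched = new ArrayList<>();
        for (Iterator<T> it = remaining.iterator(); it.hasNext(); ) {
            T item = it.next();
            if (predicate.test(item)) {
                matched.add(item);
                it.remove();
            }
        }
        return matched;
    }

    // Unnamed branch: the sub-stream is only visible inside the Consumer.
    ToySplit<T> branch(Predicate<? super T> predicate, Consumer<List<T>> chain) {
        chain.accept(take(predicate));
        return this;
    }

    // Named branch: the (possibly transformed) sub-stream ends up in the returned Map.
    ToySplit<T> branch(Predicate<? super T> predicate, Function<List<T>, List<T>> chain, String name) {
        if (named.containsKey(name)) {
            throw new IllegalArgumentException("duplicate branch name: " + name);
        }
        named.put(name, chain.apply(take(predicate)));
        return this;
    }

    // Named default branch: everything still unclaimed; returns the Map of all named branches.
    Map<String, List<T>> defaultBranch(Function<List<T>, List<T>> chain, String name) {
        return branch(item -> true, chain, name).named;
    }

    // No default branch: unclaimed records are dropped.
    Map<String, List<T>> noDefaultBranch() {
        return named;
    }
}
```

For example, with the input `1, 5, 12, 50, 7, 120`:

- `.branch(n -> n < 5, small::addAll)` hands `[1]` to the consumer.
- `.branch(n -> n < 100, s -> s, "medium")` claims `5, 12, 50, 7`.
- `.defaultBranch(s -> s, "large")` takes the rest.

The returned map is `{medium=[5, 12, 50, 7], large=[120]}`, in definition order.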
>>>>>>> >>>>>>> >>>>>>> >>>>>>> -Matthias >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> On 5/6/19 5:15 PM, John Roesler wrote: >>>>>>>> Ivan, >>>>>>>> >>>>>>>> That's a very good point about the "start" operator in the >>>>>>>> dynamic case. >>>>>>>> I had no problem with "split()"; I was just questioning the >>>>>>>> necessity. >>>>>>>> Since you've provided a proof of necessity, I'm in favor of the >>>>>>>> "split()" start operator. Thanks! >>>>>>>> >>>>>>>> Separately, I'm interested to see where the present discussion >>>>>>>> leads. >>>>>>>> I've written enough Javascript code in my life to be suspicious of >>>>>>>> nested closures. You have a good point about using method >>>>>>>> references (or >>>>>>>> indeed function literals also work). It should be validating >>>>>>>> that this >>>>>>>> was also the JS community's first approach to flattening the >>>>>>>> logic when >>>>>>>> their nested closure situation got out of hand. Unfortunately, it's >>>>>>>> replacing nesting with redirection, both of which disrupt code >>>>>>>> readability (but in different ways for different reasons). In other >>>>>>>> words, I agree that function references is *the* first-order >>>>>>>> solution if >>>>>>>> the nested code does indeed become a problem. >>>>>>>> >>>>>>>> However, the history of JS also tells us that function >>>>>>>> references aren't >>>>>>>> the end of the story either, and you can see that by observing that >>>>>>>> there have been two follow-on eras, as they continue trying to >>>>>>>> cope with >>>>>>>> the consequences of living in such a callback-heavy language. >>>>>>>> First, you >>>>>>>> have Futures/Promises, which essentially let you convert nested >>>>>>>> code to >>>>>>>> method-chained code (Observables/FP is a popular variation on >>>>>>>> this). 
>>>>>>>> Most lately, you have async/await, which is an effort to apply >>>>>>>> language >>>>>>>> (not just API) syntax to the problem, and offer the "flattest" >>>>>>>> possible >>>>>>>> programming style to solve the problem (because you get back to >>>>>>>> just one >>>>>>>> code block per functional unit). >>>>>>>> >>>>>>>> Stream-processing is a different domain, and Java+KStreams is >>>>>>>> nowhere >>>>>>>> near as callback heavy as JS, so I don't think we have to take >>>>>>>> the JS >>>>>>>> story for granted, but then again, I think we can derive some >>>>>>>> valuable >>>>>>>> lessons by looking sideways to adjacent domains. I'm just >>>>>>>> bringing this >>>>>>>> up to inspire further/deeper discussion. At the same time, just >>>>>>>> like JS, >>>>>>>> we can afford to take an iterative approach to the problem. >>>>>>>> >>>>>>>> Separately again, I'm interested in the post-branch merge (and >>>>>>>> I'd also >>>>>>>> add join) problem that Paul brought up. We can clearly punt on >>>>>>>> it, by >>>>>>>> terminating the nested branches with sink operators. But is >>>>>>>> there a DSL >>>>>>>> way to do it? >>>>>>>> >>>>>>>> Thanks again for your driving this, >>>>>>>> -John >>>>>>>> >>>>>>>> On Thu, May 2, 2019 at 7:39 PM Paul Whalen <pgwha...@gmail.com >>>>>>>> <mailto:pgwha...@gmail.com>> wrote: >>>>>>>> >>>>>>>> Ivan, I’ll definitely forfeit my point on the clumsiness of >>>>>>>> the >>>>>>>> branch(predicate, consumer) solution, I don’t see any real >>>>>>>> drawbacks >>>>>>>> for the dynamic case. >>>>>>>> >>>>>>>> IMO the one trade off to consider at this point is the scope >>>>>>>> question. I don’t know if I totally agree that “we rarely >>>>>>>> need them >>>>>>>> in the same scope” since merging the branches back together >>>>>>>> later >>>>>>>> seems like a perfectly plausible use case that can be a lot >>>>>>>> nicer >>>>>>>> when the branched streams are in the same scope. 
That being >>>>>>>> said, >>>>>>>> for the reasons Ivan listed, I think it is overall the better >>>>>>>> solution - working around the scope thing is easy enough if >>>>>>>> you need >>>>>>>> to. >>>>>>>> >>>>>>>> > On May 2, 2019, at 7:00 PM, Ivan Ponomarev >>>>>>>> <iponoma...@mail.ru.invalid> wrote: >>>>>>>> > >>>>>>>> > Hello everyone, thank you all for joining the discussion! >>>>>>>> > >>>>>>>> > Well, I don't think the idea of named branches, be it a >>>>>>>> LinkedHashMap (no other Map will do, because order of >>>>>>>> definition >>>>>>>> matters) or `branch` method taking name and Consumer has more >>>>>>>> advantages than drawbacks. >>>>>>>> > >>>>>>>> > In my opinion, the only real positive outcome from Michael's >>>>>>>> proposal is that all the returned branches are in the same >>>>>>>> scope. >>>>>>>> But 1) we rarely need them in the same scope 2) there is a >>>>>>>> workaround for the scope problem, described in the KIP. >>>>>>>> > >>>>>>>> > 'Inlining the complex logic' is not a problem, because we >>>>>>>> can use >>>>>>>> method references instead of lambdas. In real world >>>>>>>> scenarios you >>>>>>>> tend to split the complex logic to methods anyway, so the >>>>>>>> code is >>>>>>>> going to be clean. >>>>>>>> > >>>>>>>> > The drawbacks are strong. The cohesion between predicates >>>>>>>> and >>>>>>>> handlers is lost. We have to define predicates in one >>>>>>>> place, and >>>>>>>> handlers in another. This opens the door for bugs: >>>>>>>> > >>>>>>>> > - what if we forget to define a handler for a name? or a >>>>>>>> name for >>>>>>>> a handler? >>>>>>>> > - what if we misspell a name? >>>>>>>> > - what if we copy-paste and duplicate a name? >>>>>>>> > >>>>>>>> > What Michael propose would have been totally OK if we had >>>>>>>> been >>>>>>>> writing the API in Lua, Ruby or Python. In those languages the >>>>>>>> "dynamic naming" approach would have looked most concise and >>>>>>>> beautiful. 
But in Java we expect all the problems related to >>>>>>>> identifiers to be eliminated at compile time. >>>>>>>> > >>>>>>>> > Do we have to invent duck-typing for the Java API? >>>>>>>> > >>>>>>>> > And if we do, what advantage are we supposed to get >>>>>>>> besides having >>>>>>>> all the branches in the same scope? Michael, maybe I'm >>>>>>>> missing your >>>>>>>> point? >>>>>>>> > >>>>>>>> > --- >>>>>>>> > >>>>>>>> > Earlier in this discussion John Roesler also proposed to do >>>>>>>> without a "start branching" operator, and later Paul >>>>>>>> mentioned that in >>>>>>>> the case when we have to add a dynamic number of branches, the >>>>>>>> current KIP is 'clumsier' compared to Michael's 'Map' >>>>>>>> solution. Let >>>>>>>> me address both comments here. >>>>>>>> > >>>>>>>> > 1) A "start branching" operator (I think that *split* is a >>>>>>>> good name >>>>>>>> for it indeed) is critical when we need to do dynamic >>>>>>>> branching, >>>>>>>> see example below. >>>>>>>> > >>>>>>>> > 2) No, dynamic branching in the current KIP is not clumsy at >>>>>>>> all. >>>>>>>> Imagine a real-world scenario where you need one branch per >>>>>>>> enum >>>>>>>> value (say, RecordType). You can have something like this: >>>>>>>> > >>>>>>>> > /*John: if we had to start with stream.branch(...) here, >>>>>>>> it would >>>>>>>> have been much messier.*/ >>>>>>>> > KBranchedStream branched = stream.split(); >>>>>>>> > >>>>>>>> > /*Not clumsy at all :-)*/ >>>>>>>> > for (RecordType recordType : RecordType.values()) >>>>>>>> > branched = branched.branch((k, v) -> >>>>>>>> v.getRecType() == >>>>>>>> recordType, >>>>>>>> > recordType::processRecords); >>>>>>>> > >>>>>>>> > Regards, >>>>>>>> > >>>>>>>> > Ivan >>>>>>>> > >>>>>>>> > >>>>>>>> > 02.05.2019 14:40, Matthias J. Sax wrote: >>>>>>>> >> I also agree with Michael's observation about the core >>>>>>>> problem of >>>>>>>> >> the current `branch()` implementation.
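Ivan's enum loop relies on two things. The branched object must be reassignable, and each predicate must capture its own `recordType`. Below is a hedged, Kafka-free sketch of the same pattern. `RecordType`, `TypedEvent`, `DynamicBranches`, and `PerTypeRouting` are all hypothetical names, and a `List` stands in for a `KStream`.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical record types and event class, used only for this illustration.
enum RecordType { ORDER, PAYMENT, REFUND }

final class TypedEvent {
    final RecordType type;
    final String payload;

    TypedEvent(RecordType type, String payload) {
        this.type = type;
        this.payload = payload;
    }
}

// Minimal stand-in for a branched stream: each branch() claims the events its predicate matches.
final class DynamicBranches {
    private final List<TypedEvent> remaining;

    DynamicBranches(List<TypedEvent> input) {
        this.remaining = new ArrayList<>(input);
    }

    DynamicBranches branch(Predicate<TypedEvent> predicate, Consumer<List<TypedEvent>> handler) {
        List<TypedEvent> matched = new ArrayList<>();
        for (Iterator<TypedEvent> it = remaining.iterator(); it.hasNext(); ) {
            TypedEvent event = it.next();
            if (predicate.test(event)) {
                matched.add(event);
                it.remove();
            }
        }
        handler.accept(matched);
        return this;
    }
}

final class PerTypeRouting {
    // One branch per enum constant, added in a loop: the "dynamic" case from the thread.
    static Map<RecordType, List<String>> route(List<TypedEvent> input) {
        Map<RecordType, List<String>> payloadsByType = new EnumMap<>(RecordType.class);
        DynamicBranches branched = new DynamicBranches(input);
        for (RecordType recordType : RecordType.values()) {
            // recordType is effectively final in each iteration, so the lambdas may capture it.
            branched = branched.branch(
                event -> event.type == recordType,
                events -> {
                    List<String> payloads = new ArrayList<>();
                    for (TypedEvent event : events) {
                        payloads.add(event.payload);
                    }
                    payloadsByType.put(recordType, payloads);
                });
        }
        return payloadsByType;
    }
}
```

Because the enhanced-for variable is effectively final, no extra local copy is needed inside the loop. Every enum constant gets a branch, even one with no matching events, in which case its handler receives an empty list.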
>>>>>>>> >> >>>>>>>> >> However, I also don't like to pass in a clumsy Map >>>>>>>> object. My >>>>>>>> thinking >>>>>>>> >> was more aligned with Paul's proposal to just add a name >>>>>>>> to each >>>>>>>> >> `branch()` statement and return a `Map<String,KStream>`. >>>>>>>> >> >>>>>>>> >> It makes the code easier to read, and also make the >>>>>>>> order of >>>>>>>> >> `Predicates` (that is essential) easier to grasp. >>>>>>>> >> >>>>>>>> >>>>>> Map<String, KStream<K, V>> branches = stream.split() >>>>>>>> >>>>>> .branch("branchOne", Predicate<K, V>) >>>>>>>> >>>>>> .branch( "branchTwo", Predicate<K, V>) >>>>>>>> >>>>>> .defaultBranch("defaultBranch"); >>>>>>>> >> An open question is the case for which no >>>>>>>> defaultBranch() should >>>>>>> be >>>>>>>> >> specified. Atm, `split()` and `branch()` would return >>>>>>>> `BranchedKStream` >>>>>>>> >> and the call to `defaultBranch()` that returns the `Map` is >>>>>>> mandatory >>>>>>>> >> (what is not the case atm). Or is this actually not a real >>>>>>> problem, >>>>>>>> >> because users can just ignore the branch returned by >>>>>>>> `defaultBranch()` >>>>>>>> >> in the result `Map` ? >>>>>>>> >> >>>>>>>> >> >>>>>>>> >> About "inlining": So far, it seems to be a matter of >>>>>>>> personal >>>>>>>> >> preference. I can see arguments for both, but no "killer >>>>>>>> argument" yet >>>>>>>> >> that clearly make the case for one or the other. >>>>>>>> >> >>>>>>>> >> >>>>>>>> >> -Matthias >>>>>>>> >> >>>>>>>> >>> On 5/1/19 6:26 PM, Paul Whalen wrote: >>>>>>>> >>> Perhaps inlining is the wrong terminology. It doesn’t >>>>>>>> require >>>>>>>> that a lambda with the full downstream topology be defined >>>>>>>> inline - >>>>>>>> it can be a method reference as with Ivan’s original >>>>>>>> suggestion. >>>>>>>> The advantage of putting the predicate and its downstream >>>>>>>> logic >>>>>>>> (Consumer) together in branch() is that they are required >>>>>>>> to be near >>>>>>>> to each other. 
>>>>>>>> >>> >>>>>>>> >>> Ultimately the downstream code has to live somewhere, >>>>>>>> and deep >>>>>>>> branch trees will be hard to read regardless. >>>>>>>> >>> >>>>>>>> >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis >>>>>>>> <michael.droga...@confluent.io >>>>>>>> <mailto:michael.droga...@confluent.io>> wrote: >>>>>>>> >>>> >>>>>>>> >>>> I'm less enthusiastic about inlining the branch logic >>>>>>>> with its >>>>>>>> downstream >>>>>>>> >>>> functionality. Programs that have deep branch trees will >>>>>>>> quickly become >>>>>>>> >>>> harder to read as a single unit. >>>>>>>> >>>> >>>>>>>> >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen >>>>>>>> <pgwha...@gmail.com <mailto:pgwha...@gmail.com>> wrote: >>>>>>>> >>>>> >>>>>>>> >>>>> Also +1 on the issues/goals as Michael outlined them, >>>>>>>> I think >>>>>>>> that sets a >>>>>>>> >>>>> great framework for the discussion. >>>>>>>> >>>>> >>>>>>>> >>>>> Regarding the SortedMap solution, my understanding is >>>>>>>> that the >>>>>>>> current >>>>>>>> >>>>> proposal in the KIP is what is in my PR which >>>>>>>> (pending naming >>>>>>>> decisions) is >>>>>>>> >>>>> roughly this: >>>>>>>> >>>>> >>>>>>>> >>>>> stream.split() >>>>>>>> >>>>> .branch(Predicate<K, V>, Consumer<KStream<K, V>>) >>>>>>>> >>>>> .branch(Predicate<K, V>, Consumer<KStream<K, V>>) >>>>>>>> >>>>> .defaultBranch(Consumer<KStream<K, V>>); >>>>>>>> >>>>> >>>>>>>> >>>>> Obviously some ordering is necessary, since branching >>>>>>>> as a >>>>>>>> construct >>>>>>>> >>>>> doesn't work without it, but this solution seems like it >>>>>>>> provides as much >>>>>>>> >>>>> associativity as the SortedMap solution, because each >>>>>>>> branch() >>>>>>>> call >>>>>>>> >>>>> directly associates the "conditional" with the "code >>>>>>>> block." >>>>>>>> The value it >>>>>>>> >>>>> provides over the KIP solution is the accessing of >>>>>>>> streams in >>>>>>>> the same >>>>>>>> >>>>> scope. 
>>>>>>>> >>>>> >>>>>>>> >>>>> The KIP solution is less "dynamic" than the SortedMap >>>>>>>> solution >>>>>>>> in the sense >>>>>>>> >>>>> that it is slightly clumsier to add a dynamic number of >>>>>>>> branches, but it is >>>>>>>> >>>>> certainly possible. It seems to me like the API >>>>>>>> should favor >>>>>>>> the "static" >>>>>>>> >>>>> case anyway, and should make it simple and readable to >>>>>>>> fluently declare and >>>>>>>> >>>>> access your branches in-line. It also makes it >>>>>>>> impossible to >>>>>>>> ignore a >>>>>>>> >>>>> branch, and it is possible to build an (almost) >>>>>>>> identical >>>>>>>> SortedMap >>>>>>>> >>>>> solution on top of it. >>>>>>>> >>>>> >>>>>>>> >>>>> I could also see a middle ground where instead of a raw >>>>>>>> SortedMap being >>>>>>>> >>>>> taken in, branch() takes a name and not a Consumer. >>>>>>>> Something >>>>>>>> like this: >>>>>>>> >>>>> >>>>>>>> >>>>> Map<String, KStream<K, V>> branches = stream.split() >>>>>>>> >>>>> .branch("branchOne", Predicate<K, V>) >>>>>>>> >>>>> .branch( "branchTwo", Predicate<K, V>) >>>>>>>> >>>>> .defaultBranch("defaultBranch", >>>>>>>> Consumer<KStream<K, V>>); >>>>>>>> >>>>> >>>>>>>> >>>>> Pros for that solution: >>>>>>>> >>>>> - accessing branched KStreams in same scope >>>>>>>> >>>>> - no double brace initialization, hopefully slightly >>>>>>>> more >>>>>>>> readable than >>>>>>>> >>>>> SortedMap >>>>>>>> >>>>> >>>>>>>> >>>>> Cons >>>>>>>> >>>>> - downstream branch logic cannot be specified inline >>>>>>>> which >>>>>>>> makes it harder >>>>>>>> >>>>> to read top to bottom (like existing API and >>>>>>>> SortedMap, but >>>>>>>> unlike the KIP) >>>>>>>> >>>>> - you can forget to "handle" one of the branched >>>>>>>> streams (like >>>>>>>> existing >>>>>>>> >>>>> API and SortedMap, but unlike the KIP) >>>>>>>> >>>>> >>>>>>>> >>>>> (KBranchedStreams could even work *both* ways but >>>>>>>> perhaps >>>>>>>> that's overdoing >>>>>>>> >>>>> it). 
>>>>>>>> >>>>> >>>>>>>> >>>>> Overall I'm curious how important it is to be able to >>>>>>>> easily >>>>>>>> access the >>>>>>>> >>>>> branched KStream in the same scope as the original. >>>>>>>> It's >>>>>>>> possible that it >>>>>>>> >>>>> doesn't need to be handled directly by the API, but >>>>>>>> instead >>>>>>>> left up to the >>>>>>>> >>>>> user. I'm sort of in the middle on it. >>>>>>>> >>>>> >>>>>>>> >>>>> Paul >>>>>>>> >>>>> >>>>>>>> >>>>> >>>>>>>> >>>>> >>>>>>>> >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman >>>>>>>> <sop...@confluent.io <mailto:sop...@confluent.io>> >>>>>>>> >>>>> wrote: >>>>>>>> >>>>> >>>>>>>> >>>>>> I'd like to +1 what Michael said about the issues >>>>>>>> with the >>>>>>>> existing >>>>>>>> >>>>> branch >>>>>>>> >>>>>> method, I agree with what he's outlined and I think >>>>>>>> we should >>>>>>>> proceed by >>>>>>>> >>>>>> trying to alleviate these problems. Specifically it >>>>>>>> seems >>>>>>>> important to be >>>>>>>> >>>>>> able to cleanly access the individual branches (eg >>>>>>>> by mapping >>>>>>>> >>>>>> name->stream), which I thought was the original >>>>>>>> intention of >>>>>>>> this KIP. >>>>>>>> >>>>>> >>>>>>>> >>>>>> That said, I don't think we should so easily give in >>>>>>>> to the >>>>>>>> double brace >>>>>>>> >>>>>> anti-pattern or force ours users into it if at all >>>>>>>> possible to >>>>>>>> >>>>> avoid...just >>>>>>>> >>>>>> my two cents. >>>>>>>> >>>>>> >>>>>>>> >>>>>> Cheers, >>>>>>>> >>>>>> Sophie >>>>>>>> >>>>>> >>>>>>>> >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis < >>>>>>>> >>>>>> michael.droga...@confluent.io >>>>>>>> <mailto:michael.droga...@confluent.io>> wrote: >>>>>>>> >>>>>> >>>>>>>> >>>>>>> I’d like to propose a different way of thinking >>>>>>>> about this. >>>>>>>> To me, >>>>>>>> >>>>> there >>>>>>>> >>>>>>> are three problems with the existing branch signature: >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> 1. 
If you use it the way most people do, Java >>>>>>>> raises unsafe >>>>>>> type >>>>>>>> >>>>>> warnings. >>>>>>>> >>>>>>> 2. The way in which you use the stream branches is >>>>>>>> positionally coupled >>>>>>>> >>>>>> to >>>>>>>> >>>>>>> the ordering of the conditionals. >>>>>>>> >>>>>>> 3. It is brittle to extend existing branch calls with >>>>>>>> additional code >>>>>>>> >>>>>>> paths. >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> Using associative constructs instead of relying on >>>>>>>> ordered >>>>>>>> constructs >>>>>>>> >>>>>> would >>>>>>>> >>>>>>> be a stronger approach. Consider a signature that >>>>>>>> instead >>>>>>>> looks like >>>>>>>> >>>>>> this: >>>>>>>> >>>>>>> Map<String, KStream<K,V>> >>>>>>>> KStream#branch(SortedMap<String, >>>>>>>> Predicate<? >>>>>>>> >>>>>>> super K,? super V>>); >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> Branches are given names in a map, and as a result, >>>>>>>> the API >>>>>>>> returns a >>>>>>>> >>>>>>> mapping of names to streams. The ordering of the >>>>>>> conditionals is >>>>>>>> >>>>>> maintained >>>>>>>> >>>>>>> because it’s a sorted map. Insert order determines >>>>>>>> the order >>>>>>> of >>>>>>>> >>>>>> evaluation. >>>>>>>> >>>>>>> This solves problem 1 because there are no more >>>>>>>> varargs. It >>>>>>>> solves >>>>>>>> >>>>>> problem >>>>>>>> >>>>>>> 2 because you no longer lean on ordering to access the >>>>>>>> branch you’re >>>>>>>> >>>>>>> interested in. It solves problem 3 because you can >>>>>>>> introduce >>>>>>>> another >>>>>>>> >>>>>>> conditional by simply attaching another name to the >>>>>>>> structure, rather >>>>>>>> >>>>>> than >>>>>>>> >>>>>>> messing with the existing indices. >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> One of the drawbacks is that creating the map >>>>>>>> inline is >>>>>>>> historically >>>>>>>> >>>>>>> awkward in Java. I know it’s an anti-pattern to use >>>>>>>> voluminously, but >>>>>>>> >>>>>>> double brace initialization would clean up the >>>>>>>> aesthetics. 
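One detail of the `SortedMap` signature is worth checking. A `SortedMap` such as `TreeMap` iterates in key order, as defined by its comparator, not in insertion order. A `LinkedHashMap`, the type Ivan argues for further up the thread, preserves insertion order. The sketch below uses a hypothetical `PredicateOrder` helper to show how that choice changes which predicate wins when the map is evaluated in iteration order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical helper: evaluates named predicates in the map's own iteration order.
final class PredicateOrder {
    // Registers the same three predicates, in the same insertion order, into any Map.
    static Map<String, Predicate<Integer>> fill(Map<String, Predicate<Integer>> target) {
        target.put("small", n -> n < 10);
        target.put("medium", n -> n < 100);
        target.put("large", n -> true);
        return target;
    }

    // The order in which a branch operator iterating this map would test the predicates.
    static List<String> evaluationOrder(Map<String, Predicate<Integer>> predicates) {
        return new ArrayList<>(predicates.keySet());
    }

    // Name of the first predicate (in iteration order) that accepts the value, or null.
    static String firstMatch(Map<String, Predicate<Integer>> predicates, int value) {
        for (Map.Entry<String, Predicate<Integer>> entry : predicates.entrySet()) {
            if (entry.getValue().test(value)) {
                return entry.getKey();
            }
        }
        return null;
    }
}
```

With a `TreeMap`, the keys come back as `large`, `medium`, `small`, so the catch-all `large` predicate is tested first and claims the value `5`. With a `LinkedHashMap`, `small` is tested first and wins.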
>>>>>>>> >>>>>>> >>>>>>>> >>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler >>>>>>>> <j...@confluent.io <mailto:j...@confluent.io>> >>>>>>>> >>>>> wrote: >>>>>>>> >>>>>>>> Hi Ivan, >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Thanks for the update. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> FWIW, I agree with Matthias that the current "start >>>>>>> branching" >>>>>>>> >>>>> operator >>>>>>>> >>>>>>> is >>>>>>>> >>>>>>>> confusing when named the same way as the actual >>>>>>>> branches. >>>>>>>> "Split" >>>>>>>> >>>>> seems >>>>>>>> >>>>>>>> like a good name. Alternatively, we can do without >>>>>>>> a "start >>>>>>>> >>>>> branching" >>>>>>>> >>>>>>>> operator at all, and just do: >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> stream >>>>>>>> >>>>>>>> .branch(Predicate) >>>>>>>> >>>>>>>> .branch(Predicate) >>>>>>>> >>>>>>>> .defaultBranch(); >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Tentatively, I think that this branching operation >>>>>>>> should be >>>>>>>> >>>>> terminal. >>>>>>>> >>>>>>> That >>>>>>>> >>>>>>>> way, we don't create ambiguity about how to use >>>>>>>> it. That >>>>>>>> is, `branch` >>>>>>>> >>>>>>>> should return `KBranchedStream`, while >>>>>>>> `defaultBranch` is >>>>>>>> `void`, to >>>>>>>> >>>>>>>> enforce that it comes last, and that there is only >>>>>>>> one >>>>>>>> definition of >>>>>>>> >>>>>> the >>>>>>>> >>>>>>>> default branch. Potentially, we should log a >>>>>>>> warning if >>>>>>>> there's no >>>>>>>> >>>>>>> default, >>>>>>>> >>>>>>>> and additionally log a warning (or throw an >>>>>>>> exception) if a >>>>>>>> record >>>>>>>> >>>>>> falls >>>>>>>> >>>>>>>> though with no default. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Thoughts? >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Thanks, >>>>>>>> >>>>>>>> -John >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax < >>>>>>>> >>>>> matth...@confluent.io <mailto:matth...@confluent.io> >>>>>>>> >>>>>>>> wrote: >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>>> Thanks for updating the KIP and your answers. 
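John's suggestion has two parts: a `void` `defaultBranch`, plus a warning or exception for records that fall through. It can be sketched with a small hypothetical builder, `TerminalBranches`, which is not the Kafka Streams API. The `void` return type is what stops anything from being chained after the default branch. When no default exists, records that match no predicate are only counted here. A real implementation might log or throw at that point instead, as John suggests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of a terminal branching builder; not the Kafka Streams API.
final class TerminalBranches<T> {
    private final List<Predicate<? super T>> predicates = new ArrayList<>();
    private final List<Consumer<? super T>> handlers = new ArrayList<>();
    private Consumer<? super T> defaultHandler;   // null until defaultBranch() is called
    private int fallThrough;                      // records that matched nothing and had no default

    TerminalBranches<T> branch(Predicate<? super T> predicate, Consumer<? super T> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Returns void, so nothing can be chained after it: the default branch comes last.
    void defaultBranch(Consumer<? super T> handler) {
        if (defaultHandler != null) {
            throw new IllegalStateException("default branch already defined");
        }
        defaultHandler = handler;
    }

    // Routes one record to the first matching branch, else to the default branch if one exists.
    void route(T item) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(item)) {
                handlers.get(i).accept(item);
                return;
            }
        }
        if (defaultHandler != null) {
            defaultHandler.accept(item);
        } else {
            fallThrough++;   // a real implementation might log a warning or throw here
        }
    }

    int fallThroughCount() {
        return fallThrough;
    }
}
```

For example, routing `1, 2, 3, 4` through a builder that has only an even-number branch and no default sends `2` and `4` to that branch and counts two fall-through records.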
>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>>> this is to make the name similar to String#split >>>>>>>> >>>>>>>>>>> that also returns an array, right? >>>>>>>> >>>>>>>>> The intend was to avoid name duplication. The >>>>>>>> return type >>>>>>>> should >>>>>>>> >>>>>> _not_ >>>>>>>> >>>>>>>>> be an array. >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> The current proposal is >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> stream.branch() >>>>>>>> >>>>>>>>> .branch(Predicate) >>>>>>>> >>>>>>>>> .branch(Predicate) >>>>>>>> >>>>>>>>> .defaultBranch(); >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> IMHO, this reads a little odd, because the first >>>>>>>> `branch()` does >>>>>>>> >>>>> not >>>>>>>> >>>>>>>>> take any parameters and has different semantics >>>>>>>> than the >>>>>>> later >>>>>>>> >>>>>>>>> `branch()` calls. Note, that from the code >>>>>>>> snippet above, >>>>>>> it's >>>>>>>> >>>>> hidden >>>>>>>> >>>>>>>>> that the first call is `KStream#branch()` while >>>>>>>> the others >>>>>>> are >>>>>>>> >>>>>>>>> `KBranchedStream#branch()` what makes reading the >>>>>>>> code >>>>>>> harder. >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> Because I suggested to rename `addBranch()` -> >>>>>>>> `branch()`, >>>>>>>> I though >>>>>>>> >>>>>> it >>>>>>>> >>>>>>>>> might be better to also rename `KStream#branch()` >>>>>>>> to avoid >>>>>>> the >>>>>>>> >>>>> naming >>>>>>>> >>>>>>>>> overlap that seems to be confusing. The following >>>>>>>> reads >>>>>>> much >>>>>>>> >>>>> cleaner >>>>>>>> >>>>>> to >>>>>>>> >>>>>>>> me: >>>>>>>> >>>>>>>>> stream.split() >>>>>>>> >>>>>>>>> .branch(Predicate) >>>>>>>> >>>>>>>>> .branch(Predicate) >>>>>>>> >>>>>>>>> .defaultBranch(); >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> Maybe there is a better alternative to `split()` >>>>>>>> though to >>>>>>>> avoid >>>>>>>> >>>>> the >>>>>>>> >>>>>>>>> naming overlap. 

> 'default' is, however, a reserved word, so unfortunately we cannot
> have a method with such name :-)

Bummer. Didn't consider this. Maybe we can still come up with a short name?

Can you add the interface `KBranchedStream` to the KIP with all its methods? It will be part of the public API and should be contained in the KIP. For example, it's unclear atm what the return type of `defaultBranch()` is.

You did not comment on the idea to add a `KBranchedStream#get(int index) -> KStream` method to get the individually branched KStreams. Would be nice to get your feedback about it. It seems you suggest that users would otherwise need to write custom utility code to access them. We should discuss the pros and cons of both approaches. It feels "incomplete" to me atm if the API has no built-in support to get the branched KStreams directly.

-Matthias

On 4/13/19 2:13 AM, Ivan Ponomarev wrote:

Hi all!

I have updated KIP-418 according to the new vision.

Matthias, thanks for your comment!

> Renaming KStream#branch() -> #split()

I can see your point: this is to make the name similar to String#split that also returns an array, right? But is it worth the loss of backwards compatibility? We can have an overloaded branch() as well without affecting the existing code. Maybe the old array-based `branch` method should be deprecated, but this is a subject for discussion.

> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
> KBranchedStream#defaultBranch() -> BranchingKStream#default()

Totally agree with the 'addBranch->branch' rename. 'default' is, however, a reserved word, so unfortunately we cannot have a method with such name :-)

> defaultBranch() does take a `Predicate` as argument, but I think that
> is not required?

Absolutely! I think that was just a copy-paste error or something.

Dear colleagues,

please review the new version of the KIP and Paul's PR (https://github.com/apache/kafka/pull/6512).

Any new suggestions/objections?

Regards,

Ivan

On 11.04.2019 11:47, Matthias J. Sax wrote:

Thanks for driving the discussion of this KIP. It seems that everybody agrees that the current branch() method using arrays is not optimal.

I had a quick look into the PR and I like the overall proposal. There are some minor things we need to consider. I would recommend the following renaming:

    KStream#branch() -> #split()
    KBranchedStream#addBranch() -> BranchingKStream#branch()
    KBranchedStream#defaultBranch() -> BranchingKStream#default()

It's just a suggestion to get slightly shorter method names.

In the current PR, defaultBranch() does take a `Predicate` as argument, but I think that is not required?

Also, we should consider KIP-307, which was recently accepted and is currently being implemented:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL

I.e., we should add overloads that accept a `Named` parameter.

For the issue that the created `KStream` objects are in different scopes: could we extend `KBranchedStream` with a `get(int index)` method that returns the corresponding "branched" result `KStream` object?
Maybe the second argument of `addBranch()` should not be a `Consumer<KStream>` but a `Function<KStream,KStream>`, and `get()` could return whatever the `Function` returns?

Finally, I would also suggest updating the KIP with the current proposal. That makes it easier to review.

-Matthias

On 3/31/19 12:22 PM, Paul Whalen wrote:

Ivan,

I'm a bit of a novice here as well, but I think it makes sense for you to revise the KIP and continue the discussion. Obviously we'll need some buy-in from committers that have actual binding votes on whether the KIP could be adopted. It would be great to hear if they think this is a good idea overall. I'm not sure if that happens just by starting a vote, or if there is generally some indication of interest beforehand.
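Matthias's `get(int index)` idea, with a `Function` instead of a `Consumer` as the second argument, could look roughly like the following. This is a hypothetical model: `IndexedBranches` is not a Kafka class, and a `List` stands in for a `KStream`. Each branch claims the records that match its predicate and that no earlier branch has taken, and `get(i)` returns whatever the i-th `Function` produced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model of KBranchedStream#get(int index): a List<T> stands in for
// a KStream, and each branch takes a Function instead of a Consumer.
final class IndexedBranches<T> {
    private List<T> unclaimed;                        // records no earlier branch took
    private final List<List<T>> results = new ArrayList<>();

    IndexedBranches(List<T> input) {
        this.unclaimed = new ArrayList<>(input);
    }

    IndexedBranches<T> branch(Predicate<? super T> predicate,
                              Function<List<T>, List<T>> fn) {
        List<T> matched = new ArrayList<>();
        List<T> rest = new ArrayList<>();
        for (T record : unclaimed) {
            if (predicate.test(record)) {
                matched.add(record);
            } else {
                rest.add(record);
            }
        }
        unclaimed = rest;                    // first matching branch wins
        results.add(fn.apply(matched));      // keep whatever the Function returned
        return this;
    }

    // Result of the index-th branch's Function (0-based, in the order added).
    List<T> get(int index) {
        return results.get(index);
    }
}
```

With this shape, every branch result is reachable from the scope that built the chain, which is the scoping problem the thread keeps coming back to. Index-based lookup is only one possible answer to it.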

That being said, I'll continue the discussion a bit: assuming we do move forward with the solution of "stream.branch() returns KBranchedStream", do we deprecate "stream.branch(...) returns KStream[]"? I would favor deprecating, since having two mutually exclusive APIs that accomplish the same thing is confusing, especially when they're fairly similar anyway. We just need to be sure we're not making something impossible/difficult that is currently possible/easy.

Regarding my PR - I think the general structure would work, it's just a little sloppy overall in terms of naming and clarity. In particular, passing in the "predicates" and "children" lists, which get modified in KBranchedStream but are read all the way over in KStreamLazyBranch, is a bit complicated to follow.

Thanks,
Paul

On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hi Paul!

I read your code carefully and now I am fully convinced: your proposal looks better and should work. We just have to document the crucial fact that KStream consumers are invoked as they're added. And then it's all going to be very nice.

What shall we do now? I should rewrite the KIP and resume the discussion here, right?

Why are you saying that your PR 'should not be even a starting point if we go in this direction'? To me it looks like a good starting point. But as a novice in this project I might miss some important details.

Regards,

Ivan

On 28.03.2019 17:38, Paul Whalen wrote:

Ivan,

Maybe I’m missing the point, but I believe the stream.branch() solution supports this. The couponIssuer::set* consumers will be invoked as they’re added, not during streamsBuilder.build(). So the user still ought to be able to call couponIssuer.coupons() afterward and depend on the branched streams having been set.
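Paul's point is that the `couponIssuer::set*` consumers run while the chain is being built, not at `streamsBuilder.build()`. That ordering can be modelled as follows. `EagerBrancher`, `BranchNode` and `Issuer` are hypothetical names, and `Issuer` plays the role of `CouponIssuer`. The sketch illustrates only one claim: the branch handles exist as soon as the chain returns, and records are routed into them later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for a branch KStream: it only collects the records
// routed to it later.
final class BranchNode<T> {
    final List<T> received = new ArrayList<>();
}

// Hypothetical brancher: each Consumer is called while the chain is built,
// so the caller's fields are set before any record is processed.
final class EagerBrancher<T> {
    private final List<Predicate<? super T>> predicates = new ArrayList<>();
    private final List<BranchNode<T>> children = new ArrayList<>();

    EagerBrancher<T> branch(Predicate<? super T> predicate, Consumer<BranchNode<T>> handler) {
        BranchNode<T> child = new BranchNode<>();
        predicates.add(predicate);
        children.add(child);
        handler.accept(child);             // invoked now, not at "build" time
        return this;
    }

    // Called later, once per record: the first matching branch wins,
    // and records that match nothing are dropped in this model.
    void process(T record) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(record)) {
                children.get(i).received.add(record);
                return;
            }
        }
    }
}

// Plays the role of CouponIssuer from the thread (illustrative names only).
final class Issuer {
    BranchNode<String> coffee;
    BranchNode<String> electronics;

    void setCoffee(BranchNode<String> s) { coffee = s; }
    void setElectronics(BranchNode<String> s) { electronics = s; }
}
```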

The issue I mean to point out is that it is hard to access the branched streams in the same scope as the original stream (that is, not inside the couponIssuer), which is a problem with both proposed solutions. It can be worked around, though.

[Also, great to hear additional interest in 401, I’m excited to hear your thoughts!]

Paul

On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hi Paul!

The idea to postpone the wiring of branches to streamsBuilder.build() also looked great to me at first glance, but:

> the newly branched streams are not available in the same scope as each
> other. That is, if we wanted to merge them back together again I don't
> see a way to do that.

You just took the words right out of my mouth; I was just going to write in detail about this issue.

Consider the example from Bill's book, p. 101: say we need to identify customers who have bought coffee and made a purchase in the electronics store to give them coupons.

This is the code I usually write under these circumstances using my 'brancher' class:

    @Setter
    class CouponIssuer {
        private KStream<....> coffeePurchases;
        private KStream<....> electronicsPurchases;

        KStream<...> coupons() {
            return coffeePurchases.join(electronicsPurchases...)...whatever
            /* In the real world the code here can be complex, so creating
               a separate CouponIssuer class is fully justified, in order to
               separate the classes' responsibilities. */
        }
    }

    CouponIssuer couponIssuer = new CouponIssuer();

    new KafkaStreamsBrancher<....>()
        .branch(predicate1, couponIssuer::setCoffeePurchases)
        .branch(predicate2, couponIssuer::setElectronicsPurchases)
        .onTopOf(transactionStream);

    /* Alas, this won't work if we're going to wire up everything later,
       without the terminal operation!!! */
    couponIssuer.coupons()...

Does this make sense?
In order to properly initialize the CouponIssuer, we need the terminal operation to be called before streamsBuilder.build() is called.

[BTW Paul, I just found out that your KIP-401 is essentially the next KIP I was going to write here. I have some thoughts based on my experience, so I will join the discussion on KIP-401 soon.]

Regards,

Ivan

On 28.03.2019 6:29, Paul Whalen wrote:

Ivan,

I tried to make a very rough proof of concept of a fluent API based off of KStream here (https://github.com/apache/kafka/pull/6512), and I think I succeeded at removing both cons.

- Compatibility: I was incorrect earlier about compatibility issues; there aren't any direct ones. I was unaware that Java is smart enough to distinguish between a branch(varargs...) returning one thing and branch() with no arguments returning another thing.
- Requiring a terminal method: We don't actually need it. We can just build up the branches in the KBranchedStream, which shares its state with the ProcessorSupplier that will actually do the branching. It's not terribly pretty in its current form, but I think it demonstrates its feasibility.

To be clear, I don't think that pull request should be final or even a starting point if we go in this direction; I just wanted to see how challenging it would be to get the API working.

I will say, though, that I'm not sure the existing solution could be deprecated in favor of this, which I had originally suggested was a possibility. The reason is that the newly branched streams are not available in the same scope as each other. That is, if we wanted to merge them back together again, I don't see a way to do that. The KIP proposal has the same issue, though - all this means is that for either solution, deprecating the existing branch(...) is not on the table.
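Paul's compatibility observation was that a zero-argument `branch()` and the existing varargs `branch(...)` can coexist with different return types. This follows from Java's overload resolution: applicable fixed-arity methods are tried before variable-arity ones, so a call to `stream.branch()` with no arguments selects the no-argument overload. Here is a toy illustration with hypothetical types, not the Kafka API:

```java
import java.util.function.Predicate;

// Hypothetical toy types, only to illustrate overload resolution.
final class ToyBranchedStream { }

final class ToyStream {
    // Old style (mirrors the array-returning branch): variable arity.
    @SafeVarargs
    final ToyStream[] branch(Predicate<String>... predicates) {
        ToyStream[] result = new ToyStream[predicates.length];
        for (int i = 0; i < result.length; i++) {
            result[i] = new ToyStream();
        }
        return result;
    }

    // New style: no arguments, returns a builder object instead of an array.
    ToyBranchedStream branch() {
        return new ToyBranchedStream();
    }
}
```

Existing callers that pass predicates still get the array-returning method. Only the empty-argument call, which was previously useless, is redirected to the new builder.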

Thanks,
Paul

On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:

OK, let me summarize what we have discussed up to this point.

First, it seems to be commonly agreed that the branch API needs improvement. The motivation is given in the KIP.

There are two potential ways to do it:

1. (as originally proposed)

    new KafkaStreamsBrancher<..>()
        .branch(predicate1, ks -> ..)
        .branch(predicate2, ks -> ..)
        .defaultBranch(ks -> ..) //optional
        .onTopOf(stream).mapValues(...).... //onTopOf returns its argument

PROS: 1) Fully backwards compatible. 2) The code won't make sense until all the necessary ingredients are provided.

CONS: The need to create a KafkaStreamsBrancher instance contrasts with the fluency of other KStream methods.

2. (as Paul proposes)

    stream
        .branch(predicate1, ks -> ...)
        .branch(predicate2, ks -> ...)
        .defaultBranch(ks -> ...) //or noDefault(). Both defaultBranch(..) and noDefault() return void

PROS: Generally follows the way the KStream interface is defined.

CONS: We need to define two terminal methods (defaultBranch(ks->) and noDefault()). And for a user it is very easy to miss the fact that one of the terminal methods should be called. If these methods are not called, we can throw an exception at runtime.

Colleagues, what are your thoughts? Can we do better?

Regards,

Ivan

On 27.03.2019 18:46, Ivan Ponomarev wrote:

On 25.03.2019 17:43, Ivan Ponomarev wrote:

Paul,

I see your point when you are talking about stream..branch..branch...default..

Still, I believe that this cannot be implemented the easy way. Maybe we all should think further.

Let me comment on two of your ideas.

> user could specify a terminal method that assumes nothing will reach
> the default branch, throwing an exception if such a case occurs.

1) OK, apparently this should not be the only option besides `default`, because there are scenarios when we want to just silently drop the messages that didn't match any predicate. 2) Throwing an exception in the middle of data flow processing looks like a bad idea. In the stream processing paradigm, I would prefer to emit a special message to a dedicated stream. This is exactly where `default` can be used.

> it would be fairly easy for the InternalTopologyBuilder to track
> dangling branches that haven't been terminated and raise a clear error
> before it becomes an issue.

You mean a runtime exception, when the program is compiled and run?
Well, I'd prefer an API that simply won't compile if used incorrectly. Can we build such an API as a method chain starting from a KStream object? There is a huge cost difference between runtime and compile-time errors. Even if a failure surfaces instantly in unit tests, it costs the project more than a compilation failure.

Regards,

Ivan

On 25.03.2019 0:38, Paul Whalen wrote:

Ivan,

Good point about the terminal operation being required. But is that really such a bad thing? If the user doesn't want a defaultBranch, they can call some other terminal method (noDefaultBranch()?) just as easily.
In fact, I think it creates an opportunity for a nicer API - a user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs. That seems like an improvement over the current branch() API, which allows for the more subtle behavior of records unexpectedly getting dropped.

The need for a terminal operation certainly has to be well documented, but it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue. Especially now that there is a "build step" where the topology is actually wired up, when StreamsBuilder.build() is called.
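Paul suggests that the builder could detect dangling branches at build time. That can be sketched with a toy builder. `ToyTopologyBuilder` and `ToySplit` are hypothetical and much simpler than Kafka's `InternalTopologyBuilder`. The sketch shows only the bookkeeping idea: every started split is registered, either terminal method deregisters it, and `build()` fails with a clear message if anything is still open.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy builder (not Kafka's InternalTopologyBuilder): it tracks
// every split that was started and refuses to build while any of them lacks
// a terminal call.
final class ToyTopologyBuilder {
    private final List<ToySplit> open = new ArrayList<>();

    ToySplit split(String name) {
        ToySplit split = new ToySplit(this, name);
        open.add(split);
        return split;
    }

    void markTerminated(ToySplit split) {
        open.remove(split);
    }

    String build() {
        if (!open.isEmpty()) {
            List<String> names = new ArrayList<>();
            for (ToySplit split : open) {
                names.add(split.name);
            }
            throw new IllegalStateException("branching without a terminal call: " + names);
        }
        return "topology";   // placeholder for a real topology object
    }
}

final class ToySplit {
    private final ToyTopologyBuilder owner;
    final String name;
    final List<String> branchLabels = new ArrayList<>();

    ToySplit(ToyTopologyBuilder owner, String name) {
        this.owner = owner;
        this.name = name;
    }

    ToySplit branch(String label) {
        branchLabels.add(label);
        return this;
    }

    // Either terminal call marks the split as complete.
    void defaultBranch() { owner.markTerminated(this); }
    void noDefaultBranch() { owner.markTerminated(this); }
}
```

This check still fires at runtime, when `build()` is called, rather than at compile time. That is the trade-off Ivan objects to in his reply.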

Regarding onTopOf() returning its argument, I agree that it's critical to allow users to do other operations on the input stream. With the fluent solution, it ought to work the same way all other operations do - if you want to process off the original KStream multiple times, you just need the stream as a variable so you can call as many operations on it as you desire.

Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hello Paul,

I'm afraid this won't work because we do not always need the defaultBranch. And without a terminal operation we don't know when to finalize and build the 'branch switch'.
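The `onTopOf` style Ivan defends can be modelled in a few lines. Branches are registered first. `onTopOf(stream)` is the single terminal call that builds the 'branch switch', and it returns its argument unchanged so the original stream can still be processed further. `ModelBrancher` is hypothetical, and so is its choice to drop unmatched records when no default is given. It is not the actual `KafkaStreamsBrancher` class discussed in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical model of the onTopOf(...) style (not the real KafkaStreamsBrancher):
// branches are registered first, onTopOf(stream) is the single terminal call
// that "builds the switch", and it hands back its argument unchanged.
final class ModelBrancher<T> {
    private final List<Predicate<? super T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();
    private Consumer<List<T>> defaultHandler = unmatched -> { };  // assumed: drop if no default

    ModelBrancher<T> branch(Predicate<? super T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Optional, so this style needs no separate "noDefault" terminal method.
    ModelBrancher<T> defaultBranch(Consumer<List<T>> handler) {
        defaultHandler = handler;
        return this;
    }

    List<T> onTopOf(List<T> stream) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<T> unmatched = new ArrayList<>();
        for (T record : stream) {
            int i = 0;
            while (i < predicates.size() && !predicates.get(i).test(record)) {
                i++;
            }
            if (i < predicates.size()) {
                buckets.get(i).add(record);    // first matching branch wins
            } else {
                unmatched.add(record);
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        defaultHandler.accept(unmatched);
        return stream;                         // same object, so chaining can continue
    }
}
```

Because the terminal call is also the call that supplies the source stream, the user cannot forget it and still have the branches take effect. Ivan contrasts this with the `stream.branch()...` variants.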

In my proposal, onTopOf returns its argument, so we can do something more with the original branch after branching.

I understand your point that the need for special object construction contrasts with the fluency of most KStream methods. But here we have a special case: we build the switch to split the flow, so I think this is still idiomatic.

Regards,

Ivan

On 24.03.2019 4:02, Paul Whalen wrote:

Ivan,

I think it's a great idea to improve this API, but I find the onTopOf() mechanism a little confusing since it contrasts with the fluency of other KStream method calls.
Ideally I'd like to just call a method on the stream so it still reads top to bottom if the branch cases are defined fluently. I think the addBranch(predicate, handleCase) is very nice and the right way to do things, but what if we flipped around how we specify the source stream?

Like:

    stream.branch()
          .addBranch(predicate1, this::handle1)
          .addBranch(predicate2, this::handle2)
          .defaultBranch(this::handleDefault);

Where branch() returns a KBranchedStreams or KStreamBrancher or something, which is added to by addBranch() and terminated by defaultBranch() (which returns void).
This is obviously incompatible with the current API, so the new stream.branch() would need a different name, but that seems like a fairly small problem: we could call it something like branched() or branchedStreams() and deprecate the old API.

Does this satisfy the motivations of your KIP? It seems to me that it does, allowing for clear in-line branching while also allowing you to dynamically build branches off of KBranchedStreams if desired.

Thanks,
Paul

On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:

Hi Bill,

Thank you for your reply!

This is how I usually do it:

    void handleFirstCase(KStream<String, String> ks) {
        ks.filter(....).mapValues(...)
    }

    void handleSecondCase(KStream<String, String> ks) {
        ks.selectKey(...).groupByKey()...
    }

    ......
    new KafkaStreamsBrancher<String, String>()
        .addBranch(predicate1, this::handleFirstCase)
        .addBranch(predicate2, this::handleSecondCase)
        .onTopOf(....)

Regards,

Ivan

22.03.2019 1:34, Bill Bejeck wrote:

Hi Ivan,

Thanks for the KIP.

I have one question: the KafkaStreamsBrancher takes a Consumer as its second argument, which returns nothing, and the example in the KIP shows each stream from the branch ending in a terminal node (KStream#to() in this case).

Maybe I've missed something, but how would we handle the case where the user has created a branch but wants to continue processing, rather than immediately ending the branched stream in a terminal node? For example, with today's logic as is, we could have something like this:

    KStream<String, String>[] branches = originalStream.branch(predicate1, predicate2);
    branches[0].filter(....).mapValues(...)..
    branches[1].selectKey(...).groupByKey().....

Thanks!
Bill

On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote:

All,

I'd like to jump-start the discussion for KIP-418.
Here's the original message:

Hello,

I'd like to start a discussion about KIP-418. Please take a look at the KIP if you can; I would appreciate any feedback :)

KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
PR#6164: https://github.com/apache/kafka/pull/6164

Regards,

Ivan Ponomarev
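Two of the branching styles discussed in this thread, Ivan's brancher applied with onTopOf() and Paul's fluent stream-first chain ending in defaultBranch(), can be compared with a small in-memory sketch. This is a toy model under stated assumptions, not Kafka Streams or Spring-Kafka code: ToyStream, ToyBrancher, ToyBranchedStream, Routing and BranchingDemo are hypothetical names, a "stream" is just a list of values, and Paul's branch() is called branched() here, one of the alternative names he suggests. Routing assumes that each value goes to the first branch whose predicate matches, and that unmatched values are dropped unless a default branch is given.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical toy model: a "stream" is just an in-memory list of values.
// None of these classes are Kafka Streams or Spring-Kafka APIs.
final class ToyStream<V> {
    final List<V> values;

    ToyStream(List<V> values) { this.values = values; }

    // Paul's style: start the branching chain from the stream itself.
    ToyBranchedStream<V> branched() { return new ToyBranchedStream<>(this); }
}

// Each value goes to the FIRST branch whose predicate matches; unmatched
// values go to the default handler, or are dropped if there is none.
final class Routing {
    static <V> void route(ToyStream<V> source, List<Predicate<V>> predicates,
                          List<Consumer<ToyStream<V>>> handlers,
                          Consumer<ToyStream<V>> defaultHandler) {
        List<List<V>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) buckets.add(new ArrayList<>());
        List<V> unmatched = new ArrayList<>();
        for (V v : source.values) {
            int i = 0;
            while (i < predicates.size() && !predicates.get(i).test(v)) i++;
            if (i < predicates.size()) buckets.get(i).add(v);
            else unmatched.add(v);
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(new ToyStream<>(buckets.get(i)));
        }
        if (defaultHandler != null) defaultHandler.accept(new ToyStream<>(unmatched));
    }
}

// Ivan's style: configure branches first, then apply them with onTopOf(),
// which returns its argument so the caller can keep using the source stream.
final class ToyBrancher<V> {
    private final List<Predicate<V>> predicates = new ArrayList<>();
    private final List<Consumer<ToyStream<V>>> handlers = new ArrayList<>();

    ToyBrancher<V> addBranch(Predicate<V> predicate, Consumer<ToyStream<V>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    ToyStream<V> onTopOf(ToyStream<V> source) {
        Routing.route(source, predicates, handlers, null);
        return source;
    }
}

// Paul's style: addBranch() adds cases; defaultBranch() ends the chain and returns void.
final class ToyBranchedStream<V> {
    private final ToyStream<V> source;
    private final List<Predicate<V>> predicates = new ArrayList<>();
    private final List<Consumer<ToyStream<V>>> handlers = new ArrayList<>();

    ToyBranchedStream(ToyStream<V> source) { this.source = source; }

    ToyBranchedStream<V> addBranch(Predicate<V> predicate, Consumer<ToyStream<V>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    void defaultBranch(Consumer<ToyStream<V>> handler) {
        Routing.route(source, predicates, handlers, handler);
    }
}

class BranchingDemo {
    public static void main(String[] args) {
        ToyStream<Integer> numbers = new ToyStream<>(List.of(1, 2, 3, 4, 5, 6));
        List<Integer> evens = new ArrayList<>();
        List<Integer> others = new ArrayList<>();

        // Ivan's style: a handler that returns nothing can still hand its branch
        // back to the enclosing scope by capturing a local collection.
        ToyStream<Integer> same = new ToyBrancher<Integer>()
                .addBranch(n -> n % 2 == 0, s -> evens.addAll(s.values))
                .onTopOf(numbers);

        // Paul's style: reads top to bottom, starting from the stream.
        numbers.branched()
                .addBranch(n -> n > 4, s -> System.out.println("big: " + s.values))
                .defaultBranch(s -> others.addAll(s.values));

        // prints "big: [5, 6]" first, then "[2, 4, 6] [1, 2, 3, 4] true"
        System.out.println(evens + " " + others + " " + (same == numbers));
    }
}
```

The demo also touches on Bill's question: even when each handler is a Consumer that returns nothing, it can pass its sub-stream back to the caller by capturing a variable from the enclosing scope, which is essentially the AtomicReference workaround Ivan describes later in the thread. Paul's variant keeps the source stream at the top of the chain, at the cost of needing a new method name on the stream.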