Thanks for updating the KIP! I also have a few minor comments:

(1) We should rename `KBranchedStream` -> `BranchedKStream`. (Most classes follow this naming pattern now, e.g., `CoGroupedKStream`, `TimeWindowedKStream`, etc. There are just the "legacy" `KGroupedStream` and `KGroupedTable` that we cannot rename without a breaking change, so we just keep them.)

(2) Quote:

> Both branch and defaultBranch operations also have overloaded parameterless
> alternatives.

I think `branch()` always needs to take a `Predicate`, so I assume you meant that `Branched` is optional. Can you maybe rephrase it accordingly, as `branch()` would not be "parameterless"?

(3) Can you maybe add an overview of the newly added and deprecated methods/classes to the "Public Interfaces" section (cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup#KIP-150-Kafka-StreamsCogroup-PublicInterfaces)?

(4) What is unclear from the KIP is the interaction of `withConsumer()` and the finally returned `Map<String, KStream>`. This is related to John's 4th comment:

> It seems like there are really two disjoint use cases: EITHER using chain and
> the result map OR using just the sink.

I don't think that using both `withChain()` and `withConsumer()` is the issue, though, as the KIP clearly states that the result of `withChain()` will be given to the `Consumer`. The issue is really with the `Consumer` and the `Map` returned by `defaultBranch()` and `noDefaultBranch()`. Maybe a reasonable implementation would be to not add the branch to the result map if `withConsumer()` is used? As long as we clearly document it in the JavaDocs, this might be fine?

(5) Reply to John's comments:

> 3. Reading the KIP, I found ‘withConsumer’ confusing; I thought you were
> talking about the kafka Consumer interface (which doesn’t make sense, of
> course). I get that you were referring to the java Consumer interface, but we
> should still probably try to avoid the ambiguity. Just throwing out a
> suggestion, how about ‘withSink’?
IMHO, `withSink` has the issue that it might be confused with a "sink node", i.e., writing the KStream to a topic. Maybe `withJavaConsumer` would make it less ambiguous?

-Matthias

On 5/8/20 7:13 AM, John Roesler wrote: > Hi Ivan, > > It looks like you missed my reply on Apr 23rd. I think it’s close, but I had > a few last comments. > > Thanks, > John > > On Sun, May 3, 2020, at 15:39, Ivan Ponomarev wrote: >> Hello everyone, >> >> will someone please take a look at the reworked KIP? >> >> I believe that now it follows the design principles and takes into account >> all the arguments discussed here. >> >> >> Regards, >> >> Ivan >> >> >> 23.04.2020 2:45, Ivan Ponomarev wrote: >>> Hi, >>> >>> I have read John's "DSL design principles" and have completely >>> rewritten the KIP, see >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream >>> >>> >>> >>> >>> This version includes all the previous discussion results and follows >>> the design principles, with one exception. >>> >>> The exception is >>> >>> branch(Predicate<K,V> predicate, Branched<K,V> branched) >>> >>> which formally violates the 'no more than one parameter' rule, but I think >>> here it is justified. >>> >>> We must provide a predicate for a branch and don't need to provide one >>> for the default branch. Thus for both operations we may use a single >>> Branched parameter class, with an extra method parameter for `branch`. >>> >>> Since the predicate is a natural, necessary part of a branch, no >>> 'proliferation of overloads, deprecations, etc.' is expected here as it >>> is said in the rationale for the 'single parameter rule'. >>> >>> WDYT, is this KIP mature enough to begin voting? >>> >>> Regards, >>> >>> Ivan >>> >>> 21.04.2020 2:09, Matthias J. Sax wrote: >>>> Ivan, >>>> >>>> no worries about getting sidetracked. Glad to have you back! >>>> >>>> The DSL improved further in the meantime and we already have a `Named` >>>> config object to name operators.
It seems reasonable to me to build on >>>> this. >>>> >>>> Furthermore, John did a writeup about "DSL design principles" that we >>>> want to follow: >>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar >>>> >>>> >>>> -- might be worth checking out. >>>> >>>> >>>> -Matthias >>>> >>>> >>>> On 4/17/20 4:30 PM, Ivan Ponomarev wrote: >>>>> Hi everyone! >>>>> >>>>> Let me revive the discussion of this KIP. >>>>> >>>>> I'm very sorry for stopping my participation in the discussion in June >>>>> 2019. My project work was very intensive then and it didn't leave me >>>>> spare time. But I think I must finish this, because we invested >>>>> substantial effort into this discussion and I don't feel entitled to >>>>> propose other things before this one is finalized. >>>>> >>>>> During these months I continued writing and reviewing Kafka >>>>> Streams-related code. Every time I needed branching, Spring-Kafka's >>>>> KafkaStreamBrancher class of my invention (the original idea for this >>>>> KIP) worked for me -- that's another reason why I gave up pushing the >>>>> KIP forward. When I came across the problem with the scope of >>>>> branches, I worked around it this way: >>>>> >>>>> AtomicReference<KStream<...>> result = new AtomicReference<>(); >>>>> new KafkaStreamBrancher<....>() >>>>> .branch(....) >>>>> .defaultBranch(result::set) >>>>> .onTopOf(someStream); >>>>> result.get()... >>>>> >>>>> >>>>> And yes, of course I don't feel very happy with this approach. >>>>> >>>>> I think that Matthias came up with a bright solution in his post from >>>>> May 24th, 2019.
Let me quote it: >>>>> >>>>> KStream#split() -> KBranchedStream >>>>> // branch is not easily accessible in current scope >>>>> KBranchedStream#branch(Predicate, Consumer<KStream>) >>>>> -> KBranchedStream >>>>> // assign a name to the branch and >>>>> // return the sub-stream to the current scope later >>>>> // >>>>> // can be as simple as `#branch(p, s->s, "name")` >>>>> // or as complex as `#branch(p, s->s.filter(...), "name")` >>>>> KBranchedStream#branch(Predicate, Function<KStream,KStream>, String) >>>>> -> KBranchedStream >>>>> // default branch is not easily accessible >>>>> // return map of all named sub-streams into current scope >>>>> KBranchedStream#defaultBranch(Consumer<KStream>) >>>>> -> Map<String,KStream> >>>>> // assign custom name to default-branch >>>>> // return map of all named sub-streams into current scope >>>>> KBranchedStream#defaultBranch(Function<KStream,KStream>, String) >>>>> -> Map<String,KStream> >>>>> // assign a default name for default >>>>> // return map of all named sub-streams into current scope >>>>> KBranchedStream#defaultBranch(Function<KStream,KStream>) >>>>> -> Map<String,KStream> >>>>> // return map of all named sub-streams into current scope >>>>> KBranchedStream#noDefaultBranch() >>>>> -> Map<String,KStream> >>>>> >>>>> I believe this would satisfy everyone. Optional names seem to be a good >>>>> idea: when you don't need to have the branches in the same scope, you >>>>> just don't use names and you don't risk making your code brittle. Or, >>>>> you might want to add names just for debugging purposes. Or, finally, >>>>> you might use the returned Map to have the named branches in the >>>>> original scope. >>>>> >>>>> There was also input from John Roesler on June 4th, 2019, who >>>>> suggested using the Named class. I can't comment on this. The idea seems >>>>> reasonable, but in this matter I'd rather trust people who are more >>>>> familiar with Streams API design principles than me.
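To make the semantics of this proposal concrete, here is a minimal in-memory sketch. It is not Kafka Streams code: `ToyBranched` and `ToySplit` are hypothetical names, a `List` of values stands in for a `KStream`, and keys are ignored. It assumes predicates are evaluated in declaration order with the first match winning (as with the existing `KStream#branch`), that unmatched records go to the default branch or are dropped by `noDefaultBranch()`, that a branch handed to a `Consumer` is not added to the returned map (the reading suggested in point (4) of the reply above), and that duplicate names are rejected at runtime, as the original proposal suggests an implementation could do.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical Branched-style config: either a Consumer (branch handled in place)
// or a Function plus a name (branch returned in the result map).
final class ToyBranched<V> {
    final String name;
    final Consumer<List<V>> consumer;
    final Function<List<V>, List<V>> function;

    private ToyBranched(String name, Consumer<List<V>> consumer, Function<List<V>, List<V>> function) {
        this.name = name;
        this.consumer = consumer;
        this.function = function;
    }

    static <V> ToyBranched<V> withConsumer(Consumer<List<V>> consumer) {
        return new ToyBranched<>(null, consumer, null);
    }

    static <V> ToyBranched<V> withFunction(Function<List<V>, List<V>> function, String name) {
        return new ToyBranched<>(name, null, function);
    }
}

// Hypothetical stand-in for the object returned by split().
final class ToySplit<V> {
    private final List<V> input;
    private final List<Predicate<V>> predicates = new ArrayList<>();
    private final List<ToyBranched<V>> branches = new ArrayList<>();

    ToySplit(List<V> input) {
        this.input = input;
    }

    ToySplit<V> branch(Predicate<V> predicate, ToyBranched<V> branched) {
        predicates.add(predicate);
        branches.add(branched);
        return this;
    }

    Map<String, List<V>> defaultBranch(ToyBranched<V> branched) {
        return finish(branched);
    }

    Map<String, List<V>> noDefaultBranch() {
        return finish(null); // unmatched records are dropped
    }

    private Map<String, List<V>> finish(ToyBranched<V> defaultBranched) {
        int n = predicates.size();
        List<List<V>> buckets = new ArrayList<>();
        for (int i = 0; i <= n; i++) {
            buckets.add(new ArrayList<>()); // bucket n collects unmatched records
        }
        for (V value : input) {
            int target = n;
            for (int i = 0; i < n; i++) {
                if (predicates.get(i).test(value)) { // first match wins
                    target = i;
                    break;
                }
            }
            buckets.get(target).add(value);
        }
        Map<String, List<V>> result = new LinkedHashMap<>();
        for (int i = 0; i < n; i++) {
            route(branches.get(i), buckets.get(i), result);
        }
        if (defaultBranched != null) {
            route(defaultBranched, buckets.get(n), result);
        }
        return result;
    }

    private static <T> void route(ToyBranched<T> branched, List<T> records, Map<String, List<T>> result) {
        if (branched.consumer != null) {
            branched.consumer.accept(records); // handled in place, not returned
        } else {
            if (result.containsKey(branched.name)) { // runtime uniqueness check
                throw new IllegalArgumentException("Duplicate branch name: " + branched.name);
            }
            result.put(branched.name, branched.function.apply(records));
        }
    }
}
```

Under these assumptions, a branch consumed via `withConsumer` never appears in the returned map, so the map only holds branches that were explicitly named; that is the behaviour that would need to be spelled out in the JavaDocs.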
>>>>> >>>>> Regards, >>>>> >>>>> Ivan >>>>> >>>>> >>>>> >>>>> 08.10.2019 1:38, Matthias J. Sax wrote: >>>>>> I am moving this KIP into "inactive status". Feel free to resume the >>>>>> KIP >>>>>> at any point. >>>>>> >>>>>> If anybody else is interested in picking up this KIP, feel free to >>>>>> do so. >>>>>> >>>>>> >>>>>> >>>>>> -Matthias >>>>>> >>>>>> On 7/11/19 4:00 PM, Matthias J. Sax wrote: >>>>>>> Ivan, >>>>>>> >>>>>>> did you see my last reply? What do you think about my proposal to mix >>>>>>> both approaches and try to get the best of both worlds? >>>>>>> >>>>>>> >>>>>>> -Matthias >>>>>>> >>>>>>> On 6/11/19 3:56 PM, Matthias J. Sax wrote: >>>>>>>> Thanks for the input, John! >>>>>>>> >>>>>>>>> under your suggestion, it seems that the name is required >>>>>>>> >>>>>>>> If you want to get the `KStream` as part of the `Map` back using a >>>>>>>> `Function`, yes. If you follow the "embedded chaining" pattern >>>>>>>> using a >>>>>>>> `Consumer`, no. >>>>>>>> >>>>>>>> Allowing for a default name via `split()` can of course be done. >>>>>>>> Similarly, using `Named` instead of `String` is possible. >>>>>>>> >>>>>>>> I only wanted to sketch out a high-level proposal to merge both >>>>>>>> patterns. Your suggestions to align the new API with the existing API >>>>>>>> make >>>>>>>> total sense. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> One follow-up question: Would `Named` be optional or required in >>>>>>>> `split()` and `branch()`? It's unclear from your example. >>>>>>>> >>>>>>>> If both are mandatory, what do we gain by it? The returned `Map` only >>>>>>>> contains the corresponding branches, so why should we prefix all of >>>>>>>> them? If only `Named` is mandatory in `branch()`, but optional in >>>>>>>> `split()`, doesn't the same question arise? >>>>>>>> >>>>>>>> Requiring `Named` in `split()` only seems to make sense if >>>>>>>> `Named` is >>>>>>>> optional in `branch()` and we generate a `-X` suffix using a counter >>>>>>>> for >>>>>>>> different branch names.
However, this might lead to the problem of >>>>>>>> changing names if branches are added/removed. Also, how would the >>>>>>>> names >>>>>>>> be generated if `Consumer` is mixed in (i.e., not all branches are >>>>>>>> returned in the `Map`)? >>>>>>>> >>>>>>>> If `Named` is optional for both, it could happen that a user >>>>>>>> forgets to >>>>>>>> specify a name for a branch, which would lead to runtime issues. >>>>>>>> >>>>>>>> >>>>>>>> Hence, I am actually in favor of not allowing a default name, but keeping >>>>>>>> `split()` without parameters and making `Named` in `branch()` required >>>>>>>> if a >>>>>>>> `Function` is used. This makes it explicit to the user that >>>>>>>> specifying a >>>>>>>> name is required if a `Function` is used. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> About >>>>>>>> >>>>>>>>> KBranchedStream#branch(BranchConfig) >>>>>>>> >>>>>>>> I don't think that the branching predicate is a configuration and >>>>>>>> hence >>>>>>>> would not include it in a configuration object. >>>>>>>> >>>>>>>>> withChain(...); >>>>>>>> >>>>>>>> Similarly, `withChain()` (that would only take a `Consumer`?) does not >>>>>>>> seem to be a configuration. We also cannot prevent a user from calling >>>>>>>> `withName()` in combination with `withChain()`, which does not make sense >>>>>>>> IMHO. We could of course throw an RTE, but not having a compile-time >>>>>>>> check >>>>>>>> seems less appealing. Also, it could happen that neither >>>>>>>> `withChain()` >>>>>>>> nor `withName()` is called and the branch is missing in the returned >>>>>>>> `Map`, which leads to runtime issues, too. >>>>>>>> >>>>>>>> Hence, I don't think that we should add `BranchConfig`. A config >>>>>>>> object >>>>>>>> is helpful if each configuration can be set independently of all >>>>>>>> others, >>>>>>>> but this does not seem to be the case here.
If we add new configurations >>>>>>>> later, we can also just move forward by deprecating the methods that >>>>>>>> accept `Named` and adding new methods that accept `BranchConfig` (that >>>>>>>> would of course implement `Named`). >>>>>>>> >>>>>>>> >>>>>>>> Thoughts? >>>>>>>> >>>>>>>> >>>>>>>> @Ivan, what do you think about the general idea to blend the two main >>>>>>>> approaches of returning a `Map` plus an "embedded chaining"? >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> -Matthias >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On 6/4/19 10:33 AM, John Roesler wrote: >>>>>>>>> Thanks for the idea, Matthias, it does seem like this would satisfy >>>>>>>>> everyone. Returning the map from the terminal operations also solves >>>>>>>>> the problem of merging/joining the branched streams, if we want >>>>>>>>> to add >>>>>>>>> support for the complement later on. >>>>>>>>> >>>>>>>>> Under your suggestion, it seems that the name is required. >>>>>>>>> Otherwise, >>>>>>>>> we wouldn't have keys for the map to return. I think this is actually >>>>>>>>> not too bad, since experience has taught us that, although names for >>>>>>>>> operations are not required to define stream processing logic, they >>>>>>>>> do >>>>>>>>> significantly improve the operational experience when you can map >>>>>>>>> the >>>>>>>>> topology, logs, metrics, etc. back to the source code. Since you >>>>>>>>> wouldn't (have to) reference the name to chain extra processing onto >>>>>>>>> the branch (thanks to the second argument), you can avoid the >>>>>>>>> "unchecked name" problem that Ivan pointed out. >>>>>>>>> >>>>>>>>> In the current implementation of Branch, you can name the branch >>>>>>>>> operator itself, and then all the branches get index-suffixed names >>>>>>>>> built from the branch operator name.
I guess under this proposal, we >>>>>>>>> could naturally append the branch name to the branching operator >>>>>>>>> name, >>>>>>>>> like this: >>>>>>>>> >>>>>>>>> stream.split(Named.withName("mysplit")) //creates node >>>>>>>>> "mysplit" >>>>>>>>> .branch(..., ..., "abranch") // creates node >>>>>>>>> "mysplit-abranch" >>>>>>>>> .defaultBranch(...) // creates node >>>>>>>>> "mysplit-default" >>>>>>>>> >>>>>>>>> It does make me wonder about the DSL syntax itself, though. >>>>>>>>> >>>>>>>>> We don't have a defined grammar, so there's plenty of room to debate >>>>>>>>> the "best" syntax in the context of each operation, but in general, >>>>>>>>> the KStream DSL operators follow this pattern: >>>>>>>>> >>>>>>>>> operator(function, config_object?) OR operator(config_object) >>>>>>>>> >>>>>>>>> where config_object is often just Named in the "function" variant. >>>>>>>>> Even when the config_object isn't a Named, but some other config >>>>>>>>> class, that config class _always_ implements NamedOperation. >>>>>>>>> >>>>>>>>> Here, we're introducing a totally different pattern: >>>>>>>>> >>>>>>>>> operator(function, function, string) >>>>>>>>> >>>>>>>>> where the string is the name. >>>>>>>>> My first question is whether the name should instead be specified >>>>>>>>> with >>>>>>>>> the NamedOperation interface. >>>>>>>>> >>>>>>>>> My second question is whether we should just roll all these >>>>>>>>> arguments >>>>>>>>> up into a config object like: >>>>>>>>> >>>>>>>>> KBranchedStream#branch(BranchConfig) >>>>>>>>> >>>>>>>>> interface BranchConfig extends NamedOperation { >>>>>>>>> withPredicate(...); >>>>>>>>> withChain(...); >>>>>>>>> withName(...); >>>>>>>>> } >>>>>>>>> >>>>>>>>> Although I guess we'd like to call BranchConfig something more like >>>>>>>>> "Branched", even if I don't particularly like that pattern. 
>>>>>>>>> >>>>>>>>> This makes the source code a little noisier, but it also makes us >>>>>>>>> more >>>>>>>>> future-proof, as we can deal with a wide range of alternatives >>>>>>>>> purely >>>>>>>>> in the config interface, and never have to deal with adding >>>>>>>>> overloads >>>>>>>>> to the KBranchedStream if/when we decide we want the name to be >>>>>>>>> optional, or the KStream->KStream to be optional. >>>>>>>>> >>>>>>>>> WDYT? >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> -John >>>>>>>>> >>>>>>>>> On Fri, May 24, 2019 at 5:25 PM Michael Drogalis >>>>>>>>> <michael.droga...@confluent.io> wrote: >>>>>>>>>> >>>>>>>>>> Matthias: I think that's pretty reasonable from my point of view. >>>>>>>>>> Good >>>>>>>>>> suggestion. >>>>>>>>>> >>>>>>>>>> On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax >>>>>>>>>> <matth...@confluent.io> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Interesting discussion. >>>>>>>>>>> >>>>>>>>>>> I am wondering if we can unify the advantages of both >>>>>>>>>>> approaches: >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> KStream#split() -> KBranchedStream >>>>>>>>>>> >>>>>>>>>>> // branch is not easily accessible in current scope >>>>>>>>>>> KBranchedStream#branch(Predicate, Consumer<KStream>) >>>>>>>>>>> -> KBranchedStream >>>>>>>>>>> >>>>>>>>>>> // assign a name to the branch and >>>>>>>>>>> // return the sub-stream to the current scope later >>>>>>>>>>> // >>>>>>>>>>> // can be as simple as `#branch(p, s->s, "name")` >>>>>>>>>>> // or as complex as `#branch(p, s->s.filter(...), "name")` >>>>>>>>>>> KBranchedStream#branch(Predicate, Function<KStream,KStream>, >>>>>>>>>>> String) >>>>>>>>>>> -> KBranchedStream >>>>>>>>>>> >>>>>>>>>>> // default branch is not easily accessible >>>>>>>>>>> // return map of all named sub-streams into current scope >>>>>>>>>>> KBranchedStream#defaultBranch(Consumer<KStream>) >>>>>>>>>>> -> Map<String,KStream> >>>>>>>>>>> >>>>>>>>>>> // assign custom name to default-branch >>>>>>>>>>> // return map of all named sub-streams into current scope >>>>>>>>>>> KBranchedStream#defaultBranch(Function<KStream,KStream>, String) >>>>>>>>>>> -> Map<String,KStream> >>>>>>>>>>> >>>>>>>>>>> // assign a default name for default >>>>>>>>>>> // return map of all named sub-streams into current scope >>>>>>>>>>> KBranchedStream#defaultBranch(Function<KStream,KStream>) >>>>>>>>>>> -> Map<String,KStream> >>>>>>>>>>> >>>>>>>>>>> // return map of all named sub-streams into current scope >>>>>>>>>>> KBranchedStream#noDefaultBranch() >>>>>>>>>>> -> Map<String,KStream> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Hence, for each sub-stream, the user can choose whether to add a name and >>>>>>>>>>> return >>>>>>>>>>> the branch "result" to the calling scope or not. The >>>>>>>>>>> implementation can >>>>>>>>>>> also check at runtime that all returned names are unique. The >>>>>>>>>>> returned >>>>>>>>>>> Map can be empty and it's also optional to use the Map. >>>>>>>>>>> >>>>>>>>>>> To me, it seems like a good way to get the best of both worlds. >>>>>>>>>>> >>>>>>>>>>> Thoughts? >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -Matthias >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On 5/6/19 5:15 PM, John Roesler wrote: >>>>>>>>>>>> Ivan, >>>>>>>>>>>> >>>>>>>>>>>> That's a very good point about the "start" operator in the >>>>>>>>>>>> dynamic case. >>>>>>>>>>>> I had no problem with "split()"; I was just questioning the >>>>>>>>>>>> necessity. >>>>>>>>>>>> Since you've provided a proof of necessity, I'm in favor of the >>>>>>>>>>>> "split()" start operator. Thanks! >>>>>>>>>>>> >>>>>>>>>>>> Separately, I'm interested to see where the present discussion >>>>>>>>>>>> leads. >>>>>>>>>>>> I've written enough JavaScript code in my life to be >>>>>>>>>>>> suspicious of >>>>>>>>>>>> nested closures. You have a good point about using method >>>>>>>>>>>> references (or >>>>>>>>>>>> indeed function literals also work).
It should be validating >>>>>>>>>>>> that this >>>>>>>>>>>> was also the JS community's first approach to flattening the >>>>>>>>>>>> logic when >>>>>>>>>>>> their nested closure situation got out of hand. Unfortunately, >>>>>>>>>>>> it's >>>>>>>>>>>> replacing nesting with redirection, both of which disrupt code >>>>>>>>>>>> readability (but in different ways for different reasons). In >>>>>>>>>>>> other >>>>>>>>>>>> words, I agree that function references are *the* first-order >>>>>>>>>>>> solution if >>>>>>>>>>>> the nested code does indeed become a problem. >>>>>>>>>>>> >>>>>>>>>>>> However, the history of JS also tells us that function >>>>>>>>>>>> references aren't >>>>>>>>>>>> the end of the story either, and you can see that by observing >>>>>>>>>>>> that >>>>>>>>>>>> there have been two follow-on eras, as they continue trying to >>>>>>>>>>>> cope with >>>>>>>>>>>> the consequences of living in such a callback-heavy language. >>>>>>>>>>>> First, you >>>>>>>>>>>> have Futures/Promises, which essentially let you convert nested >>>>>>>>>>>> code to >>>>>>>>>>>> method-chained code (Observables/FP is a popular variation on >>>>>>>>>>>> this). >>>>>>>>>>>> Most recently, you have async/await, which is an effort to apply >>>>>>>>>>>> language >>>>>>>>>>>> (not just API) syntax to the problem, and offer the "flattest" >>>>>>>>>>>> possible >>>>>>>>>>>> programming style to solve the problem (because you get back to >>>>>>>>>>>> just one >>>>>>>>>>>> code block per functional unit). >>>>>>>>>>>> >>>>>>>>>>>> Stream-processing is a different domain, and Java+KStreams is >>>>>>>>>>>> nowhere >>>>>>>>>>>> near as callback-heavy as JS, so I don't think we have to take >>>>>>>>>>>> the JS >>>>>>>>>>>> story for granted, but then again, I think we can derive some >>>>>>>>>>>> valuable >>>>>>>>>>>> lessons by looking sideways to adjacent domains. I'm just >>>>>>>>>>>> bringing this >>>>>>>>>>>> up to inspire further/deeper discussion.
At the same time, just >>>>>>>>>>>> like JS, >>>>>>>>>>>> we can afford to take an iterative approach to the problem. >>>>>>>>>>>> >>>>>>>>>>>> Separately again, I'm interested in the post-branch merge (and >>>>>>>>>>>> I'd also >>>>>>>>>>>> add join) problem that Paul brought up. We can clearly punt on >>>>>>>>>>>> it by >>>>>>>>>>>> terminating the nested branches with sink operators. But is >>>>>>>>>>>> there a DSL >>>>>>>>>>>> way to do it? >>>>>>>>>>>> >>>>>>>>>>>> Thanks again for driving this, >>>>>>>>>>>> -John >>>>>>>>>>>> >>>>>>>>>>>> On Thu, May 2, 2019 at 7:39 PM Paul Whalen <pgwha...@gmail.com >>>>>>>>>>>> <mailto:pgwha...@gmail.com>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> Ivan, I’ll definitely forfeit my point on the clumsiness of >>>>>>>>>>>> the >>>>>>>>>>>> branch(predicate, consumer) solution; I don’t see any real >>>>>>>>>>>> drawbacks >>>>>>>>>>>> for the dynamic case. >>>>>>>>>>>> >>>>>>>>>>>> IMO the one trade-off to consider at this point is the >>>>>>>>>>>> scope >>>>>>>>>>>> question. I don’t know if I totally agree that “we rarely >>>>>>>>>>>> need them >>>>>>>>>>>> in the same scope” since merging the branches back together >>>>>>>>>>>> later >>>>>>>>>>>> seems like a perfectly plausible use case that can be a lot >>>>>>>>>>>> nicer >>>>>>>>>>>> when the branched streams are in the same scope. That being >>>>>>>>>>>> said, >>>>>>>>>>>> for the reasons Ivan listed, I think it is overall the >>>>>>>>>>>> better >>>>>>>>>>>> solution - working around the scope thing is easy enough if >>>>>>>>>>>> you need >>>>>>>>>>>> to. >>>>>>>>>>>> >>>>>>>>>>>> > On May 2, 2019, at 7:00 PM, Ivan Ponomarev >>>>>>>>>>>> <iponoma...@mail.ru.invalid> wrote: >>>>>>>>>>>> > >>>>>>>>>>>> > Hello everyone, thank you all for joining the discussion!
>>>>>>>>>>>> > >>>>>>>>>>>> > Well, I don't think the idea of named branches, be it a >>>>>>>>>>>> LinkedHashMap (no other Map will do, because order of >>>>>>>>>>>> definition >>>>>>>>>>>> matters) or a `branch` method taking a name and a Consumer, >>>>>>>>>>>> has more >>>>>>>>>>>> advantages than drawbacks. >>>>>>>>>>>> > >>>>>>>>>>>> > In my opinion, the only real positive outcome from >>>>>>>>>>>> Michael's >>>>>>>>>>>> proposal is that all the returned branches are in the same >>>>>>>>>>>> scope. >>>>>>>>>>>> But 1) we rarely need them in the same scope, and 2) there is a >>>>>>>>>>>> workaround for the scope problem, described in the KIP. >>>>>>>>>>>> > >>>>>>>>>>>> > 'Inlining the complex logic' is not a problem, because we >>>>>>>>>>>> can use >>>>>>>>>>>> method references instead of lambdas. In real-world >>>>>>>>>>>> scenarios you >>>>>>>>>>>> tend to split the complex logic into methods anyway, so the >>>>>>>>>>>> code is >>>>>>>>>>>> going to be clean. >>>>>>>>>>>> > >>>>>>>>>>>> > The drawbacks are strong. The cohesion between predicates >>>>>>>>>>>> and >>>>>>>>>>>> handlers is lost. We have to define predicates in one >>>>>>>>>>>> place, and >>>>>>>>>>>> handlers in another. This opens the door to bugs: >>>>>>>>>>>> > >>>>>>>>>>>> > - what if we forget to define a handler for a name? or a >>>>>>>>>>>> name for >>>>>>>>>>>> a handler? >>>>>>>>>>>> > - what if we misspell a name? >>>>>>>>>>>> > - what if we copy-paste and duplicate a name? >>>>>>>>>>>> > >>>>>>>>>>>> > What Michael proposes would have been totally OK if we had >>>>>>>>>>>> been >>>>>>>>>>>> writing the API in Lua, Ruby or Python. In those >>>>>>>>>>>> languages the >>>>>>>>>>>> "dynamic naming" approach would have looked most concise >>>>>>>>>>>> and >>>>>>>>>>>> beautiful. But in Java we expect all the problems >>>>>>>>>>>> related to >>>>>>>>>>>> identifiers to be eliminated at compile time. >>>>>>>>>>>> > >>>>>>>>>>>> > Do we have to invent duck-typing for the Java API?
>>>>>>>>>>>> > >>>>>>>>>>>> > And if we do, what advantage are we supposed to get >>>>>>>>>>>> besides having >>>>>>>>>>>> all the branches in the same scope? Michael, maybe I'm >>>>>>>>>>>> missing your >>>>>>>>>>>> point? >>>>>>>>>>>> > >>>>>>>>>>>> > --- >>>>>>>>>>>> > >>>>>>>>>>>> > Earlier in this discussion John Roesler also proposed >>>>>>>>>>>> to do >>>>>>>>>>>> without a "start branching" operator, and later Paul >>>>>>>>>>>> mentioned that in >>>>>>>>>>>> the case where we have to add a dynamic number of >>>>>>>>>>>> branches, the >>>>>>>>>>>> current KIP is 'clumsier' compared to Michael's 'Map' >>>>>>>>>>>> solution. Let >>>>>>>>>>>> me address both comments here. >>>>>>>>>>>> > >>>>>>>>>>>> > 1) A "start branching" operator (I think that *split* is a >>>>>>>>>>>> good name >>>>>>>>>>>> for it indeed) is critical when we need to do dynamic >>>>>>>>>>>> branching; >>>>>>>>>>>> see the example below. >>>>>>>>>>>> > >>>>>>>>>>>> > 2) No, dynamic branching in the current KIP is not clumsy at >>>>>>>>>>>> all. >>>>>>>>>>>> Imagine a real-world scenario where you need one branch per >>>>>>>>>>>> enum >>>>>>>>>>>> value (say, RecordType). You can have something like this: >>>>>>>>>>>> > >>>>>>>>>>>> > /*John: if we had to start with stream.branch(...) here, >>>>>>>>>>>> it would >>>>>>>>>>>> have been much messier.*/ >>>>>>>>>>>> > KBranchedStream branched = stream.split(); >>>>>>>>>>>> > >>>>>>>>>>>> > /*Not clumsy at all :-)*/ >>>>>>>>>>>> > for (RecordType recordType : RecordType.values()) >>>>>>>>>>>> > branched = branched.branch((k, v) -> >>>>>>>>>>>> v.getRecType() == >>>>>>>>>>>> recordType, >>>>>>>>>>>> > recordType::processRecords); >>>>>>>>>>>> > >>>>>>>>>>>> > Regards, >>>>>>>>>>>> > >>>>>>>>>>>> > Ivan >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > 02.05.2019 14:40, Matthias J. Sax wrote: >>>>>>>>>>>> >> I also agree with Michael's observation about the core >>>>>>>>>>>> problem of >>>>>>>>>>>> >> the current `branch()` implementation.
>>>>>>>>>>>> >> >>>>>>>>>>>> >> However, I also don't like passing in a clumsy Map >>>>>>>>>>>> object. My >>>>>>>>>>>> thinking >>>>>>>>>>>> >> was more aligned with Paul's proposal to just add a name >>>>>>>>>>>> to each >>>>>>>>>>>> >> `branch()` statement and return a `Map<String,KStream>`. >>>>>>>>>>>> >> >>>>>>>>>>>> >> It makes the code easier to read, and also makes the >>>>>>>>>>>> order of >>>>>>>>>>>> >> `Predicates` (which is essential) easier to grasp. >>>>>>>>>>>> >> >>>>>>>>>>>> >>>>>> Map<String, KStream<K, V>> branches = stream.split() >>>>>>>>>>>> >>>>>> .branch("branchOne", Predicate<K, V>) >>>>>>>>>>>> >>>>>> .branch("branchTwo", Predicate<K, V>) >>>>>>>>>>>> >>>>>> .defaultBranch("defaultBranch"); >>>>>>>>>>>> >> An open question is the case for which no >>>>>>>>>>>> defaultBranch() should >>>>>>>>>>> be >>>>>>>>>>>> >> specified. Atm, `split()` and `branch()` would return >>>>>>>>>>>> `BranchedKStream` >>>>>>>>>>>> >> and the call to `defaultBranch()` that returns the >>>>>>>>>>>> `Map` is >>>>>>>>>>> mandatory >>>>>>>>>>>> >> (which is not the case atm). Or is this actually not a >>>>>>>>>>>> real >>>>>>>>>>> problem, >>>>>>>>>>>> >> because users can just ignore the branch returned by >>>>>>>>>>>> `defaultBranch()` >>>>>>>>>>>> >> in the result `Map`? >>>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>>> >> About "inlining": So far, it seems to be a matter of >>>>>>>>>>>> personal >>>>>>>>>>>> >> preference. I can see arguments for both, but no "killer >>>>>>>>>>>> argument" yet >>>>>>>>>>>> >> that clearly makes the case for one or the other. >>>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>>> >> -Matthias >>>>>>>>>>>> >> >>>>>>>>>>>> >>> On 5/1/19 6:26 PM, Paul Whalen wrote: >>>>>>>>>>>> >>> Perhaps inlining is the wrong terminology. It doesn’t >>>>>>>>>>>> require >>>>>>>>>>>> that a lambda with the full downstream topology be defined >>>>>>>>>>>> inline; >>>>>>>>>>>> it can be a method reference as with Ivan’s original >>>>>>>>>>>> suggestion.
>>>>>>>>>>>> The advantage of putting the predicate and its downstream >>>>>>>>>>>> logic >>>>>>>>>>>> (Consumer) together in branch() is that they are required >>>>>>>>>>>> to be near >>>>>>>>>>>> to each other. >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> Ultimately the downstream code has to live somewhere, >>>>>>>>>>>> and deep >>>>>>>>>>>> branch trees will be hard to read regardless. >>>>>>>>>>>> >>> >>>>>>>>>>>> >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis >>>>>>>>>>>> <michael.droga...@confluent.io >>>>>>>>>>>> <mailto:michael.droga...@confluent.io>> wrote: >>>>>>>>>>>> >>>> >>>>>>>>>>>> >>>> I'm less enthusiastic about inlining the branch logic >>>>>>>>>>>> with its >>>>>>>>>>>> downstream >>>>>>>>>>>> >>>> functionality. Programs that have deep branch trees >>>>>>>>>>>> will >>>>>>>>>>>> quickly become >>>>>>>>>>>> >>>> harder to read as a single unit. >>>>>>>>>>>> >>>> >>>>>>>>>>>> >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen >>>>>>>>>>>> <pgwha...@gmail.com <mailto:pgwha...@gmail.com>> wrote: >>>>>>>>>>>> >>>>> >>>>>>>>>>>> >>>>> Also +1 on the issues/goals as Michael outlined them, >>>>>>>>>>>> I think >>>>>>>>>>>> that sets a >>>>>>>>>>>> >>>>> great framework for the discussion. 
>>>>>>>>>>>> >>>>> >>>>>>>>>>>> >>>>> Regarding the SortedMap solution, my understanding is >>>>>>>>>>>> that the >>>>>>>>>>>> current >>>>>>>>>>>> >>>>> proposal in the KIP is what is in my PR which >>>>>>>>>>>> (pending naming >>>>>>>>>>>> decisions) is >>>>>>>>>>>> >>>>> roughly this: >>>>>>>>>>>> >>>>> >>>>>>>>>>>> >>>>> stream.split() >>>>>>>>>>>> >>>>> .branch(Predicate<K, V>, Consumer<KStream<K, V>>) >>>>>>>>>>>> >>>>> .branch(Predicate<K, V>, Consumer<KStream<K, V>>) >>>>>>>>>>>> >>>>> .defaultBranch(Consumer<KStream<K, V>>); >>>>>>>>>>>> >>>>> >>>>>>>>>>>> >>>>> Obviously some ordering is necessary, since branching >>>>>>>>>>>> as a >>>>>>>>>>>> construct >>>>>>>>>>>> >>>>> doesn't work without it, but this solution seems >>>>>>>>>>>> like it >>>>>>>>>>>> provides as much >>>>>>>>>>>> >>>>> associativity as the SortedMap solution, because each >>>>>>>>>>>> branch() >>>>>>>>>>>> call >>>>>>>>>>>> >>>>> directly associates the "conditional" with the "code >>>>>>>>>>>> block." >>>>>>>>>>>> The value it >>>>>>>>>>>> >>>>> provides over the KIP solution is the accessing of >>>>>>>>>>>> streams in >>>>>>>>>>>> the same >>>>>>>>>>>> >>>>> scope. >>>>>>>>>>>> >>>>> >>>>>>>>>>>> >>>>> The KIP solution is less "dynamic" than the SortedMap >>>>>>>>>>>> solution >>>>>>>>>>>> in the sense >>>>>>>>>>>> >>>>> that it is slightly clumsier to add a dynamic >>>>>>>>>>>> number of >>>>>>>>>>>> branches, but it is >>>>>>>>>>>> >>>>> certainly possible. It seems to me like the API >>>>>>>>>>>> should favor >>>>>>>>>>>> the "static" >>>>>>>>>>>> >>>>> case anyway, and should make it simple and >>>>>>>>>>>> readable to >>>>>>>>>>>> fluently declare and >>>>>>>>>>>> >>>>> access your branches in-line. It also makes it >>>>>>>>>>>> impossible to >>>>>>>>>>>> ignore a >>>>>>>>>>>> >>>>> branch, and it is possible to build an (almost) >>>>>>>>>>>> identical >>>>>>>>>>>> SortedMap >>>>>>>>>>>> >>>>> solution on top of it. 
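The "dynamic number of branches" case can be sketched as a runnable approximation of Ivan's one-branch-per-enum-value loop quoted earlier. Everything here is a hypothetical stand-in: `RecordType`, `Rec`, and the minimal `Brancher` are invented for illustration, a `List` replaces the `KStream`, and each branch just records how many values it received instead of calling `processRecords`. It only illustrates that a `split()`-style start object lets a plain loop add branches one at a time.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class DynamicBranching {
    // Hypothetical record type enum and record class.
    enum RecordType { ORDER, PAYMENT, REFUND }

    static final class Rec {
        final RecordType type;
        final String payload;

        Rec(RecordType type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    // Minimal hypothetical brancher: predicates are tried in order, first match wins.
    static final class Brancher<V> {
        private final List<Predicate<V>> predicates = new ArrayList<>();
        private final List<Consumer<List<V>>> handlers = new ArrayList<>();

        Brancher<V> branch(Predicate<V> predicate, Consumer<List<V>> handler) {
            predicates.add(predicate);
            handlers.add(handler);
            return this;
        }

        void run(List<V> input) {
            List<List<V>> buckets = new ArrayList<>();
            for (int i = 0; i < predicates.size(); i++) {
                buckets.add(new ArrayList<>());
            }
            for (V value : input) {
                for (int i = 0; i < predicates.size(); i++) {
                    if (predicates.get(i).test(value)) {
                        buckets.get(i).add(value);
                        break;
                    }
                }
            }
            for (int i = 0; i < handlers.size(); i++) {
                handlers.get(i).accept(buckets.get(i));
            }
        }
    }

    // One branch per enum constant, added in a loop; each branch records its size.
    static Map<RecordType, Integer> countPerType(List<Rec> input) {
        Map<RecordType, Integer> counts = new EnumMap<>(RecordType.class);
        Brancher<Rec> branched = new Brancher<>();
        for (RecordType recordType : RecordType.values()) {
            branched = branched.branch(r -> r.type == recordType,
                    records -> counts.put(recordType, records.size()));
        }
        branched.run(input);
        return counts;
    }
}
```

Because each loop iteration captures its own `recordType`, adding a constant to the enum adds a branch without touching the branching code.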
>>>>>>>>>>>> >>>>> >>>>>>>>>>>> >>>>> I could also see a middle ground where instead of >>>>>>>>>>>> a raw >>>>>>>>>>>> SortedMap being >>>>>>>>>>>> >>>>> taken in, branch() takes a name and not a Consumer. >>>>>>>>>>>> Something >>>>>>>>>>>> like this: >>>>>>>>>>>> >>>>> >>>>>>>>>>>> >>>>> Map<String, KStream<K, V>> branches = stream.split() >>>>>>>>>>>> >>>>> .branch("branchOne", Predicate<K, V>) >>>>>>>>>>>> >>>>> .branch( "branchTwo", Predicate<K, V>) >>>>>>>>>>>> >>>>> .defaultBranch("defaultBranch", >>>>>>>>>>>> Consumer<KStream<K, V>>); >>>>>>>>>>>> >>>>> >>>>>>>>>>>> >>>>> Pros for that solution: >>>>>>>>>>>> >>>>> - accessing branched KStreams in same scope >>>>>>>>>>>> >>>>> - no double brace initialization, hopefully slightly >>>>>>>>>>>> more >>>>>>>>>>>> readable than >>>>>>>>>>>> >>>>> SortedMap >>>>>>>>>>>> >>>>> >>>>>>>>>>>> >>>>> Cons >>>>>>>>>>>> >>>>> - downstream branch logic cannot be specified inline >>>>>>>>>>>> which >>>>>>>>>>>> makes it harder >>>>>>>>>>>> >>>>> to read top to bottom (like existing API and >>>>>>>>>>>> SortedMap, but >>>>>>>>>>>> unlike the KIP) >>>>>>>>>>>> >>>>> - you can forget to "handle" one of the branched >>>>>>>>>>>> streams (like >>>>>>>>>>>> existing >>>>>>>>>>>> >>>>> API and SortedMap, but unlike the KIP) >>>>>>>>>>>> >>>>> >>>>>>>>>>>> >>>>> (KBranchedStreams could even work *both* ways but >>>>>>>>>>>> perhaps >>>>>>>>>>>> that's overdoing >>>>>>>>>>>> >>>>> it). >>>>>>>>>>>> >>>>> >>>>>>>>>>>> >>>>> Overall I'm curious how important it is to be able to >>>>>>>>>>>> easily >>>>>>>>>>>> access the >>>>>>>>>>>> >>>>> branched KStream in the same scope as the original. >>>>>>>>>>>> It's >>>>>>>>>>>> possible that it >>>>>>>>>>>> >>>>> doesn't need to be handled directly by the API, but >>>>>>>>>>>> instead >>>>>>>>>>>> left up to the >>>>>>>>>>>> >>>>> user. I'm sort of in the middle on it. 
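If getting a branch back into the enclosing scope is left to the user, one way to do it is the `AtomicReference` workaround Ivan describes further up the thread. The sketch below reproduces that pattern with a hypothetical consumer-only `CallbackBrancher`, loosely modelled on the `branch`/`defaultBranch`/`onTopOf` calls in Ivan's snippet, with a `List` instead of a `KStream`. It is not Spring Kafka's `KafkaStreamBrancher` itself, and the `nonNegatives` helper is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical brancher whose branches are only reachable inside callbacks.
final class CallbackBrancher<V> {
    private final List<Predicate<V>> predicates = new ArrayList<>();
    private final List<Consumer<List<V>>> handlers = new ArrayList<>();
    private Consumer<List<V>> defaultHandler = records -> { };

    CallbackBrancher<V> branch(Predicate<V> predicate, Consumer<List<V>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    CallbackBrancher<V> defaultBranch(Consumer<List<V>> handler) {
        defaultHandler = handler;
        return this;
    }

    void onTopOf(List<V> input) {
        List<List<V>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<V> rest = new ArrayList<>();
        for (V value : input) {
            boolean matched = false;
            for (int i = 0; i < predicates.size() && !matched; i++) {
                if (predicates.get(i).test(value)) { // first match wins
                    buckets.get(i).add(value);
                    matched = true;
                }
            }
            if (!matched) {
                rest.add(value);
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        defaultHandler.accept(rest);
    }

    // The workaround: smuggle the default branch out of its callback.
    static List<Integer> nonNegatives(List<Integer> input) {
        AtomicReference<List<Integer>> result = new AtomicReference<>();
        new CallbackBrancher<Integer>()
                .branch(v -> v < 0, negatives -> { /* e.g. log and drop */ })
                .defaultBranch(result::set)
                .onTopOf(input);
        return result.get();
    }
}
```

The extra mutable holder is the cost Paul alludes to: it works, but the branch is only available after `onTopOf` has run.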
Paul


On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:

I'd like to +1 what Michael said about the issues with the existing branch method. I agree with what he's outlined, and I think we should proceed by trying to alleviate these problems. Specifically, it seems important to be able to cleanly access the individual branches (e.g. by mapping name->stream), which I thought was the original intention of this KIP.

That said, I don't think we should so easily give in to the double brace anti-pattern, or force our users into it if at all possible to avoid... just my two cents.

Cheers,
Sophie

On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <michael.droga...@confluent.io> wrote:

I’d like to propose a different way of thinking about this. To me, there are three problems with the existing branch signature:

1. If you use it the way most people do, Java raises unsafe type warnings.
2. The way in which you use the stream branches is positionally coupled to the ordering of the conditionals.
3. It is brittle to extend existing branch calls with additional code paths.

Using associative constructs instead of relying on ordered constructs would be a stronger approach. Consider a signature that instead looks like this:

    Map<String, KStream<K,V>> KStream#branch(SortedMap<String, Predicate<? super K,? super V>>);

Branches are given names in a map, and as a result, the API returns a mapping of names to streams. The ordering of the conditionals is maintained because it’s a sorted map. Insert order determines the order of evaluation. This solves problem 1 because there are no more varargs. It solves problem 2 because you no longer lean on ordering to access the branch you’re interested in. It solves problem 3 because you can introduce another conditional by simply attaching another name to the structure, rather than messing with the existing indices.

One of the drawbacks is that creating the map inline is historically awkward in Java.
I know it’s an anti-pattern to use voluminously, but double brace initialization would clean up the aesthetics.

On Tue, Apr 30, 2019 at 9:10 AM John Roesler <j...@confluent.io> wrote:

Hi Ivan,

Thanks for the update.

FWIW, I agree with Matthias that the current "start branching" operator is confusing when named the same way as the actual branches. "Split" seems like a good name. Alternatively, we can do without a "start branching" operator at all, and just do:

    stream
        .branch(Predicate)
        .branch(Predicate)
        .defaultBranch();

Tentatively, I think that this branching operation should be terminal. That way, we don't create ambiguity about how to use it. That is, `branch` should return `KBranchedStream`, while `defaultBranch` is `void`, to enforce that it comes last, and that there is only one definition of the default branch. Potentially, we should log a warning if there's no default, and additionally log a warning (or throw an exception) if a record falls through with no default.
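The terminal-operation idea above can be sketched in plain Java. In this hypothetical toy (`TerminalSplit` is not a Kafka Streams class, and `List`s stand in for `KStream`s), `branch` returns the brancher while `defaultBranch` and `noDefaultBranch` return `void`, so nothing can be chained after the default. Records that fall through with no default are handed to a caller-supplied hook; in a real implementation that hook would presumably be the logged warning (or exception) suggested here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical toy model of a branching chain with void terminals.
// A List<V> stands in for a KStream; this is NOT Kafka Streams code.
final class TerminalSplit<V> {
    private List<V> remaining;
    // Stand-in for "log a warning": receives each record that no branch claimed.
    private final Consumer<? super V> onFallThrough;

    TerminalSplit(List<V> records, Consumer<? super V> onFallThrough) {
        this.remaining = new ArrayList<>(records);
        this.onFallThrough = onFallThrough;
    }

    // Non-terminal: returns the brancher so more branches can follow.
    TerminalSplit<V> branch(Predicate<? super V> predicate, Consumer<? super List<V>> consumer) {
        List<V> matched = new ArrayList<>();
        List<V> rest = new ArrayList<>();
        for (V record : remaining) {
            (predicate.test(record) ? matched : rest).add(record);
        }
        remaining = rest;
        consumer.accept(matched);
        return this;
    }

    // Terminal (void): the chain ends here, so no further branch or
    // second default can be chained after it.
    void defaultBranch(Consumer<? super List<V>> consumer) {
        consumer.accept(remaining);
        remaining = new ArrayList<>();
    }

    // Terminal (void): unmatched records are reported instead of silently vanishing.
    void noDefaultBranch() {
        remaining.forEach(onFallThrough);
        remaining = new ArrayList<>();
    }
}
```

With `new TerminalSplit<>(List.of(1, 2, 3, 4), warned::add).branch(x -> x % 2 == 0, evens::addAll).noDefaultBranch()`, `evens` ends up as `[2, 4]` and the hook sees 1 and 3. Note that `void` only stops further chaining; it cannot stop a caller from omitting the terminal call altogether.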
Thoughts?

Thanks,
-John

On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matth...@confluent.io> wrote:

Thanks for updating the KIP and your answers.

> this is to make the name similar to String#split
> that also returns an array, right?

The intent was to avoid name duplication. The return type should _not_ be an array.

The current proposal is

    stream.branch()
        .branch(Predicate)
        .branch(Predicate)
        .defaultBranch();

IMHO, this reads a little odd, because the first `branch()` does not take any parameters and has different semantics than the later `branch()` calls. Note that from the code snippet above, it's hidden that the first call is `KStream#branch()` while the others are `KBranchedStream#branch()`, which makes reading the code harder.
Because I suggested renaming `addBranch()` -> `branch()`, I thought it might be better to also rename `KStream#branch()` to avoid the naming overlap that seems to be confusing. The following reads much cleaner to me:

    stream.split()
        .branch(Predicate)
        .branch(Predicate)
        .defaultBranch();

Maybe there is a better alternative to `split()` though, to avoid the naming overlap.

> 'default' is, however, a reserved word, so unfortunately we cannot have
> a method with such name :-)

Bummer. Didn't consider this. Maybe we can still come up with a short name?

Can you add the interface `KBranchedStream` to the KIP with all its methods? It will be part of the public API and should be contained in the KIP. For example, it's unclear atm what the return type of `defaultBranch()` is.
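To make Matthias's `get(int index)` idea and his 4/11 suggestion to pass a `Function` instead of a `Consumer` concrete, here is a hypothetical toy. `IndexedSplit` is not part of any Kafka API, and `List`s stand in for `KStream`s. Each branch's `Function` result is stored in declaration order and `get(index)` returns it. In this sketch `defaultBranch` returns the brancher rather than `void`, since otherwise `get` would be unreachable from the chain; that is one possible answer to the return-type question, not the KIP's.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical toy model (NOT Kafka Streams code); a List<V> stands in for a KStream.
// Each branch takes a Function instead of a Consumer, and get(index) returns
// whatever that Function produced for the branch declared at that position.
final class IndexedSplit<V> {
    private List<V> remaining;
    private final List<List<V>> results = new ArrayList<>();

    IndexedSplit(List<V> records) {
        this.remaining = new ArrayList<>(records);
    }

    IndexedSplit<V> branch(Predicate<? super V> predicate, Function<List<V>, List<V>> function) {
        List<V> matched = new ArrayList<>();
        List<V> rest = new ArrayList<>();
        for (V record : remaining) {
            (predicate.test(record) ? matched : rest).add(record);
        }
        remaining = rest;
        results.add(function.apply(matched));
        return this;
    }

    // Returns the brancher instead of void so that get() is still reachable.
    IndexedSplit<V> defaultBranch(Function<List<V>, List<V>> function) {
        results.add(function.apply(remaining));
        remaining = new ArrayList<>();
        return this;
    }

    // Index i is the i-th declared branch; the default branch, if declared, comes last.
    List<V> get(int index) {
        return results.get(index);
    }
}
```

One design consequence worth weighing: index-based access brings back the positional coupling that Michael lists as problem 2 of the existing API, whereas a name-keyed lookup avoids it.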
You did not comment on the idea to add a `KBranchedStream#get(int index) -> KStream` method to get the individually branched KStreams. It would be nice to get your feedback on it. It seems you suggest that users would otherwise need to write custom utility code to access them. We should discuss the pros and cons of both approaches. It feels "incomplete" to me atm if the API has no built-in support to get the branched KStreams directly.

-Matthias

On 4/13/19 2:13 AM, Ivan Ponomarev wrote:

Hi all!

I have updated KIP-418 according to the new vision.

Matthias, thanks for your comment!

> Renaming KStream#branch() -> #split()

I can see your point: this is to make the name similar to String#split that also returns an array, right? But is it worth the loss of backwards compatibility? We can have an overloaded branch() as well without affecting the existing code. Maybe the old array-based `branch` method should be deprecated, but this is a subject for discussion.

> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
> KBranchedStream#defaultBranch() -> BranchingKStream#default()

Totally agree with the 'addBranch->branch' rename. 'default' is, however, a reserved word, so unfortunately we cannot have a method with such name :-)

> defaultBranch() does take a `Predicate` as argument, but I think that
> is not required?

Absolutely! I think that was just a copy-paste error or something.

Dear colleagues,

please review the new version of the KIP and Paul's PR (https://github.com/apache/kafka/pull/6512).

Any new suggestions/objections?

Regards,

Ivan


On 11.04.2019 11:47, Matthias J. Sax wrote:

Thanks for driving the discussion of this KIP. It seems that everybody agrees that the current branch() method using arrays is not optimal.
I had a quick look into the PR and I like the overall proposal. There are some minor things we need to consider. I would recommend the following renaming:

    KStream#branch() -> #split()
    KBranchedStream#addBranch() -> BranchingKStream#branch()
    KBranchedStream#defaultBranch() -> BranchingKStream#default()

It's just a suggestion to get slightly shorter method names.

In the current PR, defaultBranch() does take a `Predicate` as argument, but I think that is not required?

Also, we should consider KIP-307, which was recently accepted and is currently being implemented:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL

I.e., we should add overloads that accept a `Named` parameter.

For the issue that the created `KStream` objects are in different scopes: could we extend `KBranchedStream` with a `get(int index)` method that returns the corresponding "branched" result `KStream` object? Maybe the second argument of `addBranch()` should not be a `Consumer<KStream>` but a `Function<KStream,KStream>`, and `get()` could return whatever the `Function` returns?

Finally, I would also suggest updating the KIP with the current proposal. That makes it easier to review.

-Matthias

On 3/31/19 12:22 PM, Paul Whalen wrote:

Ivan,

I'm a bit of a novice here as well, but I think it makes sense for you to revise the KIP and continue the discussion. Obviously we'll need some buy-in from committers that have actual binding votes on whether the KIP could be adopted. It would be great to hear if they think this is a good idea overall. I'm not sure if that happens just by starting a vote, or if there is generally some indication of interest beforehand.
That being said, I'll continue the discussion a bit: assuming we do move forward with the solution of "stream.branch() returns KBranchedStream", do we deprecate "stream.branch(...) returns KStream[]"? I would favor deprecating, since having two mutually exclusive APIs that accomplish the same thing is confusing, especially when they're fairly similar anyway. We just need to be sure we're not making something impossible/difficult that is currently possible/easy.

Regarding my PR - I think the general structure would work, it's just a little sloppy overall in terms of naming and clarity. In particular, passing in the "predicates" and "children" lists, which get modified in KBranchedStream but are read all the way over in KStreamLazyBranch, is a bit complicated to follow.
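Keeping the array-returning `branch(Predicate...)` alongside a new parameterless `branch()` relies on Java overload resolution: variable-arity candidates are only considered after no fixed-arity method applies (JLS §15.12.2), so a call with no arguments picks `branch()`, and the two overloads may have different return types. `OverloadDemo` and its nested `Builder` are hypothetical stand-ins, not the `KStream` interface.

```java
import java.util.function.Predicate;

// Hypothetical stand-in (NOT KStream) showing a varargs overload and a
// parameterless overload coexisting with different return types.
final class OverloadDemo {
    // Hypothetical fluent builder returned by the new parameterless overload.
    static final class Builder {
        int branches = 0;

        Builder branch(Predicate<String> predicate) {
            branches++;
            return this;
        }
    }

    // Old style, mirroring an array-returning branch(Predicate...).
    @SafeVarargs
    static String[] branch(Predicate<String>... predicates) {
        String[] names = new String[predicates.length];
        for (int i = 0; i < predicates.length; i++) {
            names[i] = "branch-" + i;
        }
        return names;
    }

    // New style: chosen for a zero-argument call, because fixed-arity
    // candidates are tried before variable-arity ones.
    static Builder branch() {
        return new Builder();
    }
}
```

By the same rule, existing calls that pass at least one predicate keep resolving to the varargs overload; only a degenerate call with zero predicates would change meaning once the parameterless overload exists.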
Thanks,
Paul

On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hi Paul!

I read your code carefully and now I am fully convinced: your proposal looks better and should work. We just have to document the crucial fact that KStream consumers are invoked as they're added. And then it's all going to be very nice.

What shall we do now? I should rewrite the KIP and resume the discussion here, right?

Why do you say that your PR 'should not be even a starting point if we go in this direction'? To me it looks like a good starting point. But as a novice in this project I might miss some important details.
Regards,

Ivan


On 28.03.2019 17:38, Paul Whalen wrote:

Ivan,

Maybe I’m missing the point, but I believe the stream.branch() solution supports this. The couponIssuer::set* consumers will be invoked as they’re added, not during streamsBuilder.build(). So the user still ought to be able to call couponIssuer.coupons() afterward and depend on the branched streams having been set.

The issue I mean to point out is that it is hard to access the branched streams in the same scope as the original stream (that is, not inside the couponIssuer), which is a problem with both proposed solutions. It can be worked around, though.

[Also, great to hear additional interest in 401, I’m excited to hear your thoughts!]
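The point that the `couponIssuer::set*` consumers run as they are added, so `coupons()` can be called right after the chain, can be illustrated with a hypothetical toy. `Purchase`, `EagerBrancher`, and this `CouponIssuer` are stand-ins written for this sketch, not Kafka Streams classes; `List`s replace `KStream`s, and `coupons()` replaces the stream join with a simple in-memory match on customer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical record type for this sketch.
final class Purchase {
    final String customer;
    final String category;

    Purchase(String customer, String category) {
        this.customer = customer;
        this.category = category;
    }
}

// Hypothetical eager brancher: each consumer is invoked inside branch(...).
final class EagerBrancher<V> {
    private List<V> remaining;

    EagerBrancher(List<V> records) {
        this.remaining = new ArrayList<>(records);
    }

    EagerBrancher<V> branch(Predicate<? super V> predicate, Consumer<? super List<V>> consumer) {
        List<V> matched = new ArrayList<>();
        List<V> rest = new ArrayList<>();
        for (V record : remaining) {
            (predicate.test(record) ? matched : rest).add(record);
        }
        remaining = rest;
        consumer.accept(matched);
        return this;
    }
}

// Hypothetical holder populated by the branch consumers (setters instead of Lombok's @Setter).
final class CouponIssuer {
    private List<Purchase> coffeePurchases;
    private List<Purchase> electronicsPurchases;

    void setCoffeePurchases(List<Purchase> purchases) {
        this.coffeePurchases = purchases;
    }

    void setElectronicsPurchases(List<Purchase> purchases) {
        this.electronicsPurchases = purchases;
    }

    // Stand-in for the stream join: customers who appear in both branches.
    List<String> coupons() {
        if (coffeePurchases == null || electronicsPurchases == null) {
            throw new IllegalStateException("branches have not been wired yet");
        }
        List<String> result = new ArrayList<>();
        for (Purchase coffee : coffeePurchases) {
            boolean boughtElectronics = electronicsPurchases.stream()
                    .anyMatch(p -> p.customer.equals(coffee.customer));
            if (boughtElectronics && !result.contains(coffee.customer)) {
                result.add(coffee.customer);
            }
        }
        return result;
    }
}
```

Calling `coupons()` before the two `branch(...)` calls would throw, which is the ordering constraint discussed in this exchange; with eager consumers, the chain itself performs the wiring.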
Paul

On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hi Paul!

The idea to postpone the wiring of branches to streamsBuilder.build() also looked great to me at first glance, but ---

> the newly branched streams are not available in the same scope as each
> other. That is, if we wanted to merge them back together again I don't
> see a way to do that.

You just took the words right out of my mouth; I was just going to write in detail about this issue.

Consider the example from Bill's book, p. 101: say we need to identify customers who have bought coffee and made a purchase in the electronics store, to give them coupons.
This is the code I usually write under these circumstances using my 'brancher' class:

    @Setter
    class CouponIssuer {
        private KStream<....> coffeePurchases;
        private KStream<....> electronicsPurchases;

        KStream<...> coupons() {
            return coffeePurchases.join(electronicsPurchases...)...whatever

            /* In the real world the code here can be complex, so creating
               a separate CouponIssuer class is fully justified, in order to
               separate the classes' responsibilities. */
        }
    }

    CouponIssuer couponIssuer = new CouponIssuer();

    new KafkaStreamsBrancher<....>()
        .branch(predicate1, couponIssuer::setCoffeePurchases)
        .branch(predicate2, couponIssuer::setElectronicsPurchases)
        .onTopOf(transactionStream);

    /* Alas, this won't work if we're going to wire up everything later,
       without the terminal operation!!! */
    couponIssuer.coupons()...

Does this make sense?
In order to properly >>>>>>>>>>>> initialize the >>>>>>>>>>>> >>>>>>>>> CouponIssuer >>>>>>>>>>>> >>>>>>>>>>>>> we need the terminal operation to be called >>>>>>>>>>>> before >>>>>>>>>>>> >>>>>>>>> streamsBuilder.build() >>>>>>>>>>>> >>>>>>>>>>>>> is called. >>>>>>>>>>>> >>>>>>>>>>>>>>> [BTW Paul, I just found out that your >>>>>>>>>>>> KIP-401 is >>>>>>>>>>>> essentially >>>>>>>>>>>> >>>>>> the >>>>>>>>>>>> >>>>>>>>> next >>>>>>>>>>>> >>>>>>>>>>>>> KIP I was going to write here. I have some >>>>>>>>>>>> thoughts >>>>>>>>>>>> based on >>>>>>>>>>>> >>>>> my >>>>>>>>>>>> >>>>>>>>> experience, >>>>>>>>>>>> >>>>>>>>>>>>> so I will join the discussion on KIP-401 >>>>>>>>>>>> soon.] >>>>>>>>>>>> >>>>>>>>>>>>>>> Regards, >>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>>>>> Ivan >>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen пишет: >>>>>>>>>>>> >>>>>>>>>>>>>>>> Ivan, >>>>>>>>>>>> >>>>>>>>>>>>>>>> I tried to make a very rough proof of >>>>>>>>>>>> concept of a >>>>>>>>>>>> fluent >>>>>>>>>>>> >>>>> API >>>>>>>>>>>> >>>>>>>> based >>>>>>>>>>>> >>>>>>>>>>>>> off of >>>>>>>>>>>> >>>>>>>>>>>>>>>> KStream here >>>>>>>>>>>> (https://github.com/apache/kafka/pull/6512), >>>>>>>>>>>> >>>>>> and >>>>>>>>>>>> >>>>>>> I >>>>>>>>>>>> >>>>>>>>> think >>>>>>>>>>>> >>>>>>>>>>>>> I >>>>>>>>>>>> >>>>>>>>>>>>>>>> succeeded at removing both cons. >>>>>>>>>>>> >>>>>>>>>>>>>>>> - Compatibility: I was incorrect >>>>>>>>>>>> earlier about >>>>>>>>>>>> >>>>>>> compatibility >>>>>>>>>>>> >>>>>>>>>>>>> issues, >>>>>>>>>>>> >>>>>>>>>>>>>>>> there aren't any direct ones. I was >>>>>>>>>>>> unaware >>>>>>>>>>>> that Java >>>>>>>>>>>> >>>>> is >>>>>>>>>>>> >>>>>>>> smart >>>>>>>>>>>> >>>>>>>>>>>>> enough to >>>>>>>>>>>> >>>>>>>>>>>>>>>> distinguish between a >>>>>>>>>>>> branch(varargs...) 
>>>>>>>>>>>> returning one >>>>>>>>>>>> >>>>>>> thing >>>>>>>>>>>> >>>>>>>>> and >>>>>>>>>>>> >>>>>>>>>>>>> branch() >>>>>>>>>>>> >>>>>>>>>>>>>>>> with no arguments returning another >>>>>>>>>>>> thing. >>>>>>>>>>>> >>>>>>>>>>>>>>>> - Requiring a terminal method: We don't >>>>>>>>>>>> actually >>>>>>>>>>>> need >>>>>>>>>>>> >>>>> it. >>>>>>>>>>>> >>>>>>> We >>>>>>>>>>>> >>>>>>>>> can >>>>>>>>>>>> >>>>>>>>>>>>> just >>>>>>>>>>>> >>>>>>>>>>>>>>>> build up the branches in the >>>>>>>>>>>> KBranchedStream who >>>>>>>>>>>> shares >>>>>>>>>>>> >>>>>> its >>>>>>>>>>>> >>>>>>>>> state >>>>>>>>>>>> >>>>>>>>>>>>> with the >>>>>>>>>>>> >>>>>>>>>>>>>>>> ProcessorSupplier that will actually do >>>>>>>>>>>> the >>>>>>>>>>>> branching. >>>>>>>>>>>> >>>>>>> It's >>>>>>>>>>>> >>>>>>>>> not >>>>>>>>>>>> >>>>>>>>>>>>> terribly >>>>>>>>>>>> >>>>>>>>>>>>>>>> pretty in its current form, but I >>>>>>>>>>>> think it >>>>>>>>>>>> demonstrates >>>>>>>>>>>> >>>>>> its >>>>>>>>>>>> >>>>>>>>>>>>> feasibility. >>>>>>>>>>>> >>>>>>>>>>>>>>>> To be clear, I don't think that pull >>>>>>>>>>>> request should >>>>>>>>>>> be >>>>>>>>>>>> >>>>> final >>>>>>>>>>>> >>>>>> or >>>>>>>>>>>> >>>>>>>>> even a >>>>>>>>>>>> >>>>>>>>>>>>>>>> starting point if we go in this direction, >>>>>>>>>>>> I just >>>>>>>>>>>> wanted to >>>>>>>>>>>> >>>>>> see >>>>>>>>>>>> >>>>>>>> how >>>>>>>>>>>> >>>>>>>>>>>>>>>> challenging it would be to get the API >>>>>>>>>>>> working. >>>>>>>>>>>> >>>>>>>>>>>>>>>> I will say though, that I'm not sure the >>>>>>>>>>>> existing >>>>>>>>>>>> solution >>>>>>>>>>>> >>>>>>> could >>>>>>>>>>>> >>>>>>>> be >>>>>>>>>>>> >>>>>>>>>>>>>>>> deprecated in favor of this, which I had >>>>>>>>>>>> originally >>>>>>>>>>>> >>>>> suggested >>>>>>>>>>>> >>>>>>>> was a >>>>>>>>>>>> >>>>>>>>>>>>>>>> possibility. 
The reason is that the newly branched streams are not available in the same scope as each other. That is, if we wanted to merge them back together again, I don't see a way to do that. The KIP proposal has the same issue, though - all this means is that for either solution, deprecating the existing branch(...) is not on the table.

Thanks,
Paul

On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:

OK, let me summarize what we have discussed up to this point.

First, it seems to be commonly agreed that the branch API needs improvement. Motivation is given in the KIP.

There are two potential ways to do it:

1. (as originally proposed)

    new KafkaStreamsBrancher<..>()
        .branch(predicate1, ks -> ..)
        .branch(predicate2, ks -> ..)
        .defaultBranch(ks -> ..) // optional
        .onTopOf(stream).mapValues(...)....
        // onTopOf returns its argument

PROS: 1) Fully backwards compatible. 2) The code won't make sense until all the necessary ingredients are provided.

CONS: The need to create a KafkaStreamsBrancher instance contrasts with the fluency of other KStream methods.

2. (as Paul proposes)

    stream
        .branch(predicate1, ks -> ...)
        .branch(predicate2, ks -> ...)
        .defaultBranch(ks -> ...) // or noDefault(). Both defaultBranch(..) and noDefault() return void

PROS: Generally follows the way the KStreams interface is defined.

CONS: We need to define two terminal methods (defaultBranch(ks->) and noDefault()). And for a user it is very easy to miss the fact that one of the terminal methods should be called. If these methods are not called, we can throw an exception at runtime.

Colleagues, what are your thoughts?
Can we do better?

Regards,

Ivan

27.03.2019 18:46, Ivan Ponomarev wrote:
25.03.2019 17:43, Ivan Ponomarev wrote:
Paul,

I see your point when you are talking about stream..branch..branch...default..

Still, I believe that this cannot be implemented the easy way. Maybe we all should think further.

Let me comment on two of your ideas.

> user could specify a terminal method that assumes nothing will reach the
> default branch, throwing an exception if such a case occurs.

1) OK, apparently this should not be the only option besides `default`, because there are scenarios when we want to just silently drop the messages that didn't match any predicate. 2) Throwing an exception in the middle of data flow processing looks like a bad idea. In the stream processing paradigm, I would prefer to emit a special message to a dedicated stream. This is exactly where `default` can be used.

> it would be fairly easy for the InternalTopologyBuilder to track dangling
> branches that haven't been terminated and raise a clear error before it
> becomes an issue.

You mean a runtime exception, when the program is compiled and run? Well, I'd prefer an API that simply won't compile if used incorrectly. Can we build such an API as a method chain starting from a KStream object? There is a huge cost difference between runtime and compile-time errors.
Even if a failure is uncovered instantly by unit tests, it costs the project more than a compilation failure.

Regards,

Ivan

25.03.2019 0:38, Paul Whalen wrote:
Ivan,

Good point about the terminal operation being required. But is that really such a bad thing? If the user doesn't want a defaultBranch, they can call some other terminal method (noDefaultBranch()?) just as easily. In fact, I think it creates an opportunity for a nicer API: a user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs.
That seems like an improvement over the current branch() API, which allows for the more subtle behavior of records unexpectedly getting dropped.

The need for a terminal operation certainly has to be well documented, but it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue. This is especially true now that there is a "build step" where the topology is actually wired up, when StreamsBuilder.build() is called.

Regarding onTopOf() returning its argument, I agree that it's critical to allow users to do other operations on the input stream.
With the fluent solution, it ought to work the same way all other operations do: if you want to process off the original KStream multiple times, you just need the stream as a variable so you can call as many operations on it as you desire.

Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hello Paul,

I'm afraid this won't work because we do not always need the defaultBranch. And without a terminal operation we don't know when to finalize and build the 'branch switch'.
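To make the terminal-operation question concrete, here is a minimal, Kafka-free sketch of option 2 from Ivan's summary combined with Paul's build-time check for dangling branches. It is an illustration under stated assumptions, not the Kafka Streams API: `SketchTopology`, `SketchStream` and `SketchBranchedStream` are hypothetical names, an in-memory `List` stands in for a `KStream`, and each handler runs as soon as its branch is declared. `defaultBranch(...)` and `noDefaultBranch()` are the two terminal calls, and `build()` rejects a chain that was never terminated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical, Kafka-free sketch: none of these classes exist in Kafka Streams.
// A List<T> of records stands in for a KStream, and handlers run eagerly.
class SketchTopology {
    private final List<SketchBranchedStream<?>> branchedStreams = new ArrayList<>();

    <T> SketchStream<T> stream(List<T> records) {
        return new SketchStream<>(this, records);
    }

    void register(SketchBranchedStream<?> branched) {
        branchedStreams.add(branched);
    }

    // Paul's idea: report dangling branch chains when the topology is built,
    // before any data flows, instead of failing in the middle of processing.
    void build() {
        for (SketchBranchedStream<?> branched : branchedStreams) {
            if (!branched.terminated) {
                throw new IllegalStateException(
                        "branch chain not terminated: call defaultBranch() or noDefaultBranch()");
            }
        }
    }
}

class SketchStream<T> {
    private final SketchTopology topology;
    private final List<T> records;

    SketchStream(SketchTopology topology, List<T> records) {
        this.topology = topology;
        this.records = records;
    }

    // Option 2: the branch chain starts directly on the stream.
    SketchBranchedStream<T> branch(Predicate<? super T> predicate,
                                   Consumer<? super List<T>> handler) {
        SketchBranchedStream<T> branched = new SketchBranchedStream<>(records);
        topology.register(branched);
        return branched.branch(predicate, handler);
    }
}

class SketchBranchedStream<T> {
    private final List<T> unmatched;
    boolean terminated = false;

    SketchBranchedStream(List<T> records) {
        this.unmatched = new ArrayList<>(records);
    }

    // Claims the records that no earlier predicate matched (first match wins).
    SketchBranchedStream<T> branch(Predicate<? super T> predicate,
                                   Consumer<? super List<T>> handler) {
        List<T> matched = new ArrayList<>();
        for (Iterator<T> it = unmatched.iterator(); it.hasNext(); ) {
            T record = it.next();
            if (predicate.test(record)) {
                matched.add(record);
                it.remove();
            }
        }
        handler.accept(matched);
        return this;
    }

    // Terminal: leftover records go to the default branch.
    void defaultBranch(Consumer<? super List<T>> handler) {
        terminated = true;
        List<T> rest = new ArrayList<>(unmatched);
        handler.accept(rest);
    }

    // Terminal: leftover records are dropped on purpose.
    void noDefaultBranch() {
        terminated = true;
    }
}

class FluentBranchDemo {
    public static void main(String[] args) {
        SketchTopology topology = new SketchTopology();
        topology.stream(Arrays.asList(1, 2, 3, 4, 15, 20))
                .branch(x -> x % 2 == 0, evens -> System.out.println("even: " + evens))
                .branch(x -> x > 10, big -> System.out.println("big: " + big))
                .defaultBranch(rest -> System.out.println("default: " + rest));
        topology.build(); // passes: the chain was terminated
    }
}
```

A chain that forgets its terminal call still compiles in this sketch; the mistake only surfaces as a clear error from build(), before any records are processed. That is the runtime-versus-compile-time trade-off Ivan raises: the Java compiler by itself cannot force a caller to invoke a particular method at the end of a chain.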
In my proposal, onTopOf returns its argument, so we can do something more with the original branch after branching.

I understand your point that the need for special object construction contrasts with the fluency of most KStream methods. But here we have a special case: we build the switch to split the flow, so I think this is still idiomatic.

Regards,

Ivan

24.03.2019 4:02, Paul Whalen wrote:
Ivan,

I think it's a great idea to improve this API, but I find the onTopOf() mechanism a little confusing since it contrasts with the fluency of other KStream method calls.
Ideally I'd like to just call a method on the stream so it still reads top to bottom if the branch cases are defined fluently. I think the addBranch(predicate, handleCase) is very nice and the right way to do things, but what if we flipped around how we specify the source stream?

Like:

    stream.branch()
        .addBranch(predicate1, this::handle1)
        .addBranch(predicate2, this::handle2)
        .defaultBranch(this::handleDefault);

Where branch() returns a KBranchedStreams or KStreamBrancher or something, which is added to by addBranch() and terminated by defaultBranch() (which returns void).
This is obviously incompatible with the current API, so the new stream.branch() would have to have a different name, but that seems like a fairly small problem: we could call it something like branched() or branchedStreams() and deprecate the old API.

Does this satisfy the motivations of your KIP? It seems like it does to me, allowing for clear in-line branching while also allowing you to dynamically build branches off of KBranchedStreams if desired.

Thanks,
Paul

On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:

Hi Bill,

Thank you for your reply!

This is how I usually do it:

    void handleFirstCase(KStream<String, String> ks){
        ks.filter(....).mapValues(...)
    }

    void handleSecondCase(KStream<String, String> ks){
        ks.selectKey(...).groupByKey()...
    }

    ......
    new KafkaStreamsBrancher<String, String>()
        .addBranch(predicate1, this::handleFirstCase)
        .addBranch(predicate2, this::handleSecondCase)
        .onTopOf(....)

Regards,

Ivan

22.03.2019 1:34, Bill Bejeck wrote:
Hi Ivan,

Thanks for the KIP.

I have one question: the KafkaStreamsBrancher takes a Consumer as a second argument, which returns nothing, and the example in the KIP shows each stream from the branch using a terminal node (KStream#to() in this case).

Maybe I've missed something, but how would we handle the case where the user has created a branch but wants to continue processing and not necessarily use a terminal node on the branched stream immediately?

For example, using today's logic as is, if we had something like this:

    KStream<String, String>[] branches = originalStream.branch(predicate1, predicate2);
    branches[0].filter(....).mapValues(...)..
    branches[1].selectKey(...).groupByKey().....

Thanks!
Bill

On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote:

All,

I'd like to jump-start the discussion for KIP-418.

Here's the original message:

Hello,

I'd like to start a discussion about KIP-418.
Please take a look at the KIP if you can; I would appreciate any feedback :)

KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream

JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488

PR#6164: https://github.com/apache/kafka/pull/6164

Regards,

Ivan Ponomarev
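For comparison, the brancher object from the original proposal (option 1 in Ivan's summary, written with addBranch as in his reply to Bill) can be sketched the same way. Again this is a hypothetical, Kafka-free illustration: `SketchBrancher` is an illustrative name and a `List` stands in for a `KStream`. Routing follows the same first-match rule as the existing `KStream#branch(Predicate...)`: each record goes to the first branch whose predicate matches, and records that match nothing go to the optional default branch or are dropped. `onTopOf` returns its argument, so the input stays available for further processing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical, Kafka-free sketch of the brancher object from the original proposal.
// SketchBrancher is an illustrative name; a List<T> stands in for a KStream.
class SketchBrancher<T> {
    private final List<Predicate<? super T>> predicates = new ArrayList<>();
    private final List<Consumer<? super List<T>>> handlers = new ArrayList<>();
    // null means records that match no predicate are silently dropped
    private Consumer<? super List<T>> defaultHandler;

    SketchBrancher<T> addBranch(Predicate<? super T> predicate,
                                Consumer<? super List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    SketchBrancher<T> defaultBranch(Consumer<? super List<T>> handler) {
        defaultHandler = handler;
        return this;
    }

    // Sends each record to the FIRST branch whose predicate matches, hands each
    // branch to its handler, and returns the input so the caller can keep chaining.
    List<T> onTopOf(List<T> stream) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            branches.add(new ArrayList<>());
        }
        List<T> unmatched = new ArrayList<>();
        for (T record : stream) {
            int target = -1;
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(record)) {
                    target = i;
                    break;
                }
            }
            if (target >= 0) {
                branches.get(target).add(record);
            } else {
                unmatched.add(record);
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(branches.get(i));
        }
        if (defaultHandler != null) {
            defaultHandler.accept(unmatched);
        }
        return stream;
    }
}

class BrancherDemo {
    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 15, 20);
        List<Integer> same = new SketchBrancher<Integer>()
                .addBranch(x -> x % 2 == 0, evens -> System.out.println("even: " + evens))
                .addBranch(x -> x > 10, big -> System.out.println("big: " + big))
                .defaultBranch(rest -> System.out.println("default: " + rest))
                .onTopOf(input);
        // onTopOf returned its argument, so the input can be processed further.
        System.out.println("input size: " + same.size());
    }
}
```

This also bears on Bill's question: a branch handler receives the whole branched stream, so it can keep transforming it (as handleFirstCase does in Ivan's example) instead of being limited to a terminal node such as to().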