Hi Ivan,

It looks like you missed my reply on Apr 23rd. I think it’s close, but I had a few last comments.

Thanks,
John

On Sun, May 3, 2020, at 15:39, Ivan Ponomarev wrote:
> Hello everyone,
>
> will someone please take a look at the reworked KIP?
>
> I believe that now it follows design principles and takes into account
> all the arguments discussed here.
>
> Regards,
>
> Ivan
>
> 23.04.2020 2:45, Ivan Ponomarev wrote:
> > Hi,
> >
> > I have read John's "DSL design principles" and have completely
> > rewritten the KIP, see
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> >
> > This version includes all the previous discussion results and follows
> > the design principles, with one exception.
> >
> > The exception is
> >
> > branch(Predicate<K,V> predicate, Branched<K,V> branched)
> >
> > which formally violates the 'no more than one parameter' rule, but I think
> > here it is justified.
> >
> > We must provide a predicate for a branch and don't need to provide one
> > for the default branch. Thus for both operations we may use a single
> > Branched parameter class, with an extra method parameter for `branch`.
> >
> > Since the predicate is a natural, necessary part of a branch, no
> > 'proliferation of overloads, deprecations, etc.' is expected here, as it
> > is said in the rationale for the 'single parameter rule'.
> >
> > WDYT, is this KIP mature enough to begin voting?
> >
> > Regards,
> >
> > Ivan
> >
> > 21.04.2020 2:09, Matthias J. Sax wrote:
> >> Ivan,
> >>
> >> no worries about getting side tracked. Glad to have you back!
> >>
> >> The DSL improved further in the meantime and we already have a `Named`
> >> config object to name operators. It seems reasonable to me to build on
> >> this.
> >>
> >> Furthermore, John did a writeup about "DSL design principles" that we
> >> want to follow:
> >> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
> >>
> >> -- might be worth checking out.
> >>
> >> -Matthias
> >>
> >> On 4/17/20 4:30 PM, Ivan Ponomarev wrote:
> >>> Hi everyone!
> >>>
> >>> Let me revive the discussion of this KIP.
> >>>
> >>> I'm very sorry for stopping my participation in the discussion in June
> >>> 2019. My project work was very intensive then and it didn't leave me
> >>> spare time. But I think I must finish this, because we invested
> >>> substantial effort into this discussion and I don't feel entitled to
> >>> propose other things before this one is finalized.
> >>>
> >>> During these months I proceeded with writing and reviewing Kafka
> >>> Streams-related code. Every time I needed branching, Spring-Kafka's
> >>> KafkaStreamBrancher class of my invention (the original idea for this
> >>> KIP) worked for me -- that's another reason why I gave up pushing the
> >>> KIP forward. When I came across the problem with the scope of
> >>> branches, I worked around it this way:
> >>>
> >>> AtomicReference<KStream<...>> result = new AtomicReference<>();
> >>> new KafkaStreamBrancher<....>()
> >>>     .branch(....)
> >>>     .defaultBranch(result::set)
> >>>     .onTopOf(someStream);
> >>> result.get()...
> >>>
> >>> And yes, of course I don't feel very happy with this approach.
> >>>
> >>> I think that Matthias came up with a bright solution in his post from
> >>> May 24th, 2019.
> >>> Let me quote it:
> >>>
> >>> KStream#split() -> KBranchedStream
> >>>
> >>> // branch is not easily accessible in current scope
> >>> KBranchedStream#branch(Predicate, Consumer<KStream>)
> >>>   -> KBranchedStream
> >>>
> >>> // assign a name to the branch and
> >>> // return the sub-stream to the current scope later
> >>> //
> >>> // can be simple as `#branch(p, s->s, "name")`
> >>> // or also complex as `#branch(p, s->s.filter(...), "name")`
> >>> KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
> >>>   -> KBranchedStream
> >>>
> >>> // default branch is not easily accessible
> >>> // return map of all named sub-stream into current scope
> >>> KBranchedStream#default(Consumer<KStream>)
> >>>   -> Map<String,KStream>
> >>>
> >>> // assign custom name to default-branch
> >>> // return map of all named sub-stream into current scope
> >>> KBranchedStream#default(Function<KStream,KStream>, String)
> >>>   -> Map<String,KStream>
> >>>
> >>> // assign a default name for default
> >>> // return map of all named sub-stream into current scope
> >>> KBranchedStream#defaultBranch(Function<KStream,KStream>)
> >>>   -> Map<String,KStream>
> >>>
> >>> // return map of all named sub-stream into current scope
> >>> KBranchedStream#noDefaultBranch()
> >>>   -> Map<String,KStream>
> >>>
> >>> I believe this would satisfy everyone. Optional names seem to be a good
> >>> idea: when you don't need to have the branches in the same scope, you
> >>> just don't use names and you don't risk making your code brittle. Or,
> >>> you might want to add names just for debugging purposes. Or, finally,
> >>> you might use the returned Map to have the named branches in the
> >>> original scope.
> >>>
> >>> There also was an input from John Roesler on June 4th, 2019, who
> >>> suggested using the Named class. I can't comment on this. The idea seems
> >>> reasonable, but in this matter I'd rather trust people who are more
> >>> familiar with Streams API design principles than me.
> >>>
> >>> Regards,
> >>>
> >>> Ivan
> >>>
> >>> 08.10.2019 1:38, Matthias J. Sax wrote:
> >>>> I am moving this KIP into "inactive status". Feel free to resume the
> >>>> KIP at any point.
> >>>>
> >>>> If anybody else is interested in picking up this KIP, feel free to
> >>>> do so.
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 7/11/19 4:00 PM, Matthias J. Sax wrote:
> >>>>> Ivan,
> >>>>>
> >>>>> did you see my last reply? What do you think about my proposal to mix
> >>>>> both approaches and try to get best-of-both worlds?
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 6/11/19 3:56 PM, Matthias J. Sax wrote:
> >>>>>> Thanks for the input John!
> >>>>>>
> >>>>>>> under your suggestion, it seems that the name is required
> >>>>>>
> >>>>>> If you want to get the `KStream` as part of the `Map` back using a
> >>>>>> `Function`, yes. If you follow the "embedded chaining" pattern
> >>>>>> using a `Consumer`, no.
> >>>>>>
> >>>>>> Allowing for a default name via `split()` can of course be done.
> >>>>>> Similarly, using `Named` instead of `String` is possible.
> >>>>>>
> >>>>>> I wanted to sketch out a high level proposal to merge both patterns
> >>>>>> only. Your suggestions to align the new API with the existing API
> >>>>>> make total sense.
> >>>>>>
> >>>>>> One follow up question: Would `Named` be optional or required in
> >>>>>> `split()` and `branch()`? It's unclear from your example.
> >>>>>>
> >>>>>> If both are mandatory, what do we gain by it? The returned `Map` only
> >>>>>> contains the corresponding branches, so why should we prefix all of
> >>>>>> them? If only `Named` is mandatory in `branch()`, but optional in
> >>>>>> `split()`, the same question arises.
> >>>>>>
> >>>>>> Requiring `Named` in `split()` seems only to make sense if
> >>>>>> `Named` is optional in `branch()` and we generate a `-X` suffix
> >>>>>> using a counter for different branch names.
> >>>>>> However, this might lead to the problem of
> >>>>>> changing names if branches are added/removed. Also, how would the
> >>>>>> names be generated if `Consumer` is mixed in (ie, not all branches
> >>>>>> are returned in the `Map`).
> >>>>>>
> >>>>>> If `Named` is optional for both, it could happen that a user
> >>>>>> forgets to specify a name for a branch, which would lead to runtime
> >>>>>> issues.
> >>>>>>
> >>>>>> Hence, I am actually in favor of not allowing a default name but
> >>>>>> keeping `split()` without parameter and making `Named` in `branch()`
> >>>>>> required if a `Function` is used. This makes it explicit to the user
> >>>>>> that specifying a name is required if a `Function` is used.
> >>>>>>
> >>>>>> About
> >>>>>>
> >>>>>>> KBranchedStream#branch(BranchConfig)
> >>>>>>
> >>>>>> I don't think that the branching predicate is a configuration and
> >>>>>> hence would not include it in a configuration object.
> >>>>>>
> >>>>>>> withChain(...);
> >>>>>>
> >>>>>> Similarly, `withChain()` (that would only take a `Consumer`?) does not
> >>>>>> seem to be a configuration. We also cannot prevent a user from calling
> >>>>>> `withName()` in combination with `withChain()`, which does not make
> >>>>>> sense IMHO. We could of course throw an RTE, but not having a compile
> >>>>>> time check seems less appealing. Also, it could happen that neither
> >>>>>> `withChain()` nor `withName()` is called and the branch is missing in
> >>>>>> the returned `Map`, which leads to runtime issues, too.
> >>>>>>
> >>>>>> Hence, I don't think that we should add `BranchConfig`. A config
> >>>>>> object is helpful if each configuration can be set independently of
> >>>>>> all others, but this seems not to be the case here.
> >>>>>> If we add new configuration
> >>>>>> later, we can also just move forward by deprecating the methods that
> >>>>>> accept `Named` and add new methods that accept `BranchConfig` (that
> >>>>>> would of course implement `Named`).
> >>>>>>
> >>>>>> Thoughts?
> >>>>>>
> >>>>>> @Ivan, what do you think about the general idea to blend the two main
> >>>>>> approaches of returning a `Map` plus an "embedded chaining"?
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 6/4/19 10:33 AM, John Roesler wrote:
> >>>>>>> Thanks for the idea, Matthias, it does seem like this would satisfy
> >>>>>>> everyone. Returning the map from the terminal operations also solves
> >>>>>>> the problem of merging/joining the branched streams, if we want
> >>>>>>> to add support for the complement later on.
> >>>>>>>
> >>>>>>> Under your suggestion, it seems that the name is required.
> >>>>>>> Otherwise, we wouldn't have keys for the map to return. I think this
> >>>>>>> is actually not too bad, since experience has taught us that,
> >>>>>>> although names for operations are not required to define stream
> >>>>>>> processing logic, they do significantly improve the operational
> >>>>>>> experience when you can map the topology, logs, metrics, etc. back
> >>>>>>> to the source code. Since you wouldn't (have to) reference the name
> >>>>>>> to chain extra processing onto the branch (thanks to the second
> >>>>>>> argument), you can avoid the "unchecked name" problem that Ivan
> >>>>>>> pointed out.
> >>>>>>>
> >>>>>>> In the current implementation of Branch, you can name the branch
> >>>>>>> operator itself, and then all the branches get index-suffixed names
> >>>>>>> built from the branch operator name.
> >>>>>>> I guess under this proposal, we
> >>>>>>> could naturally append the branch name to the branching operator
> >>>>>>> name, like this:
> >>>>>>>
> >>>>>>> stream.split(Named.withName("mysplit")) // creates node "mysplit"
> >>>>>>>     .branch(..., ..., "abranch")        // creates node "mysplit-abranch"
> >>>>>>>     .defaultBranch(...)                 // creates node "mysplit-default"
> >>>>>>>
> >>>>>>> It does make me wonder about the DSL syntax itself, though.
> >>>>>>>
> >>>>>>> We don't have a defined grammar, so there's plenty of room to debate
> >>>>>>> the "best" syntax in the context of each operation, but in general,
> >>>>>>> the KStream DSL operators follow this pattern:
> >>>>>>>
> >>>>>>> operator(function, config_object?) OR operator(config_object)
> >>>>>>>
> >>>>>>> where config_object is often just Named in the "function" variant.
> >>>>>>> Even when the config_object isn't a Named, but some other config
> >>>>>>> class, that config class _always_ implements NamedOperation.
> >>>>>>>
> >>>>>>> Here, we're introducing a totally different pattern:
> >>>>>>>
> >>>>>>> operator(function, function, string)
> >>>>>>>
> >>>>>>> where the string is the name.
> >>>>>>> My first question is whether the name should instead be specified
> >>>>>>> with the NamedOperation interface.
> >>>>>>>
> >>>>>>> My second question is whether we should just roll all these
> >>>>>>> arguments up into a config object like:
> >>>>>>>
> >>>>>>> KBranchedStream#branch(BranchConfig)
> >>>>>>>
> >>>>>>> interface BranchConfig extends NamedOperation {
> >>>>>>>     withPredicate(...);
> >>>>>>>     withChain(...);
> >>>>>>>     withName(...);
> >>>>>>> }
> >>>>>>>
> >>>>>>> Although I guess we'd like to call BranchConfig something more like
> >>>>>>> "Branched", even if I don't particularly like that pattern.
> >>>>>>>
> >>>>>>> This makes the source code a little noisier, but it also makes us
> >>>>>>> more future-proof, as we can deal with a wide range of alternatives
> >>>>>>> purely in the config interface, and never have to deal with adding
> >>>>>>> overloads to the KBranchedStream if/when we decide we want the name
> >>>>>>> to be optional, or the KStream->KStream to be optional.
> >>>>>>>
> >>>>>>> WDYT?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> -John
> >>>>>>>
> >>>>>>> On Fri, May 24, 2019 at 5:25 PM Michael Drogalis
> >>>>>>> <michael.droga...@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>> Matthias: I think that's pretty reasonable from my point of view.
> >>>>>>>> Good suggestion.
> >>>>>>>>
> >>>>>>>> On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax
> >>>>>>>> <matth...@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>>> Interesting discussion.
> >>>>>>>>>
> >>>>>>>>> I am wondering, if we cannot unify the advantage of both
> >>>>>>>>> approaches:
> >>>>>>>>>
> >>>>>>>>> KStream#split() -> KBranchedStream
> >>>>>>>>>
> >>>>>>>>> // branch is not easily accessible in current scope
> >>>>>>>>> KBranchedStream#branch(Predicate, Consumer<KStream>)
> >>>>>>>>>   -> KBranchedStream
> >>>>>>>>>
> >>>>>>>>> // assign a name to the branch and
> >>>>>>>>> // return the sub-stream to the current scope later
> >>>>>>>>> //
> >>>>>>>>> // can be simple as `#branch(p, s->s, "name")`
> >>>>>>>>> // or also complex as `#branch(p, s->s.filter(...), "name")`
> >>>>>>>>> KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
> >>>>>>>>>   -> KBranchedStream
> >>>>>>>>>
> >>>>>>>>> // default branch is not easily accessible
> >>>>>>>>> // return map of all named sub-stream into current scope
> >>>>>>>>> KBranchedStream#default(Consumer<KStream>)
> >>>>>>>>>   -> Map<String,KStream>
> >>>>>>>>>
> >>>>>>>>> // assign custom name to default-branch
> >>>>>>>>> // return map of all named sub-stream into current scope
> >>>>>>>>> KBranchedStream#default(Function<KStream,KStream>, String)
> >>>>>>>>>   -> Map<String,KStream>
> >>>>>>>>>
> >>>>>>>>> // assign a default name for default
> >>>>>>>>> // return map of all named sub-stream into current scope
> >>>>>>>>> KBranchedStream#defaultBranch(Function<KStream,KStream>)
> >>>>>>>>>   -> Map<String,KStream>
> >>>>>>>>>
> >>>>>>>>> // return map of all named sub-stream into current scope
> >>>>>>>>> KBranchedStream#noDefaultBranch()
> >>>>>>>>>   -> Map<String,KStream>
> >>>>>>>>>
> >>>>>>>>> Hence, for each sub-stream, the user can pick to add a name and
> >>>>>>>>> return the branch "result" to the calling scope or not. The
> >>>>>>>>> implementation can also check at runtime that all returned names are
> >>>>>>>>> unique. The returned Map can be empty and it's also optional to use
> >>>>>>>>> the Map.
> >>>>>>>>>
> >>>>>>>>> To me, it seems like a good way to get best of both worlds.
> >>>>>>>>>
> >>>>>>>>> Thoughts?
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>> On 5/6/19 5:15 PM, John Roesler wrote:
> >>>>>>>>>> Ivan,
> >>>>>>>>>>
> >>>>>>>>>> That's a very good point about the "start" operator in the
> >>>>>>>>>> dynamic case. I had no problem with "split()"; I was just
> >>>>>>>>>> questioning the necessity. Since you've provided a proof of
> >>>>>>>>>> necessity, I'm in favor of the "split()" start operator. Thanks!
> >>>>>>>>>>
> >>>>>>>>>> Separately, I'm interested to see where the present discussion
> >>>>>>>>>> leads. I've written enough Javascript code in my life to be
> >>>>>>>>>> suspicious of nested closures. You have a good point about using
> >>>>>>>>>> method references (or indeed function literals also work).
> >>>>>>>>>> It should be validating that this was also the JS community's
> >>>>>>>>>> first approach to flattening the logic when their nested closure
> >>>>>>>>>> situation got out of hand. Unfortunately, it's replacing nesting
> >>>>>>>>>> with redirection, both of which disrupt code readability (but in
> >>>>>>>>>> different ways for different reasons). In other words, I agree
> >>>>>>>>>> that function references are *the* first-order solution if the
> >>>>>>>>>> nested code does indeed become a problem.
> >>>>>>>>>>
> >>>>>>>>>> However, the history of JS also tells us that function references
> >>>>>>>>>> aren't the end of the story either, and you can see that by
> >>>>>>>>>> observing that there have been two follow-on eras, as they
> >>>>>>>>>> continue trying to cope with the consequences of living in such a
> >>>>>>>>>> callback-heavy language. First, you have Futures/Promises, which
> >>>>>>>>>> essentially let you convert nested code to method-chained code
> >>>>>>>>>> (Observables/FP is a popular variation on this). Most recently, you
> >>>>>>>>>> have async/await, which is an effort to apply language (not just
> >>>>>>>>>> API) syntax to the problem, and offer the "flattest" possible
> >>>>>>>>>> programming style to solve the problem (because you get back to
> >>>>>>>>>> just one code block per functional unit).
> >>>>>>>>>>
> >>>>>>>>>> Stream-processing is a different domain, and Java+KStreams is
> >>>>>>>>>> nowhere near as callback heavy as JS, so I don't think we have to
> >>>>>>>>>> take the JS story for granted, but then again, I think we can
> >>>>>>>>>> derive some valuable lessons by looking sideways to adjacent
> >>>>>>>>>> domains. I'm just bringing this up to inspire further/deeper
> >>>>>>>>>> discussion.
> >>>>>>>>>> At the same time, just like JS, we can afford to take an
> >>>>>>>>>> iterative approach to the problem.
> >>>>>>>>>>
> >>>>>>>>>> Separately again, I'm interested in the post-branch merge (and
> >>>>>>>>>> I'd also add join) problem that Paul brought up. We can clearly
> >>>>>>>>>> punt on it, by terminating the nested branches with sink
> >>>>>>>>>> operators. But is there a DSL way to do it?
> >>>>>>>>>>
> >>>>>>>>>> Thanks again for driving this,
> >>>>>>>>>> -John
> >>>>>>>>>>
> >>>>>>>>>> On Thu, May 2, 2019 at 7:39 PM Paul Whalen
> >>>>>>>>>> <pgwha...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Ivan, I’ll definitely forfeit my point on the clumsiness of the
> >>>>>>>>>> branch(predicate, consumer) solution, I don’t see any real
> >>>>>>>>>> drawbacks for the dynamic case.
> >>>>>>>>>>
> >>>>>>>>>> IMO the one trade off to consider at this point is the scope
> >>>>>>>>>> question. I don’t know if I totally agree that “we rarely need
> >>>>>>>>>> them in the same scope” since merging the branches back together
> >>>>>>>>>> later seems like a perfectly plausible use case that can be a lot
> >>>>>>>>>> nicer when the branched streams are in the same scope. That being
> >>>>>>>>>> said, for the reasons Ivan listed, I think it is overall the
> >>>>>>>>>> better solution - working around the scope thing is easy enough
> >>>>>>>>>> if you need to.
> >>>>>>>>>>
> >>>>>>>>>> > On May 2, 2019, at 7:00 PM, Ivan Ponomarev
> >>>>>>>>>> > <iponoma...@mail.ru.invalid> wrote:
> >>>>>>>>>> >
> >>>>>>>>>> > Hello everyone, thank you all for joining the discussion!
> >>>>>>>>>> >
> >>>>>>>>>> > Well, I don't think the idea of named branches, be it a
> >>>>>>>>>> > LinkedHashMap (no other Map will do, because order of definition
> >>>>>>>>>> > matters) or a `branch` method taking a name and a Consumer, has
> >>>>>>>>>> > more advantages than drawbacks.
> >>>>>>>>>> >
> >>>>>>>>>> > In my opinion, the only real positive outcome from Michael's
> >>>>>>>>>> > proposal is that all the returned branches are in the same scope.
> >>>>>>>>>> > But 1) we rarely need them in the same scope 2) there is a
> >>>>>>>>>> > workaround for the scope problem, described in the KIP.
> >>>>>>>>>> >
> >>>>>>>>>> > 'Inlining the complex logic' is not a problem, because we can use
> >>>>>>>>>> > method references instead of lambdas. In real world scenarios you
> >>>>>>>>>> > tend to split the complex logic to methods anyway, so the code is
> >>>>>>>>>> > going to be clean.
> >>>>>>>>>> >
> >>>>>>>>>> > The drawbacks are strong. The cohesion between predicates and
> >>>>>>>>>> > handlers is lost. We have to define predicates in one place, and
> >>>>>>>>>> > handlers in another. This opens the door for bugs:
> >>>>>>>>>> >
> >>>>>>>>>> > - what if we forget to define a handler for a name? or a name for
> >>>>>>>>>> >   a handler?
> >>>>>>>>>> > - what if we misspell a name?
> >>>>>>>>>> > - what if we copy-paste and duplicate a name?
> >>>>>>>>>> >
> >>>>>>>>>> > What Michael proposes would have been totally OK if we had been
> >>>>>>>>>> > writing the API in Lua, Ruby or Python. In those languages the
> >>>>>>>>>> > "dynamic naming" approach would have looked most concise and
> >>>>>>>>>> > beautiful. But in Java we expect all the problems related to
> >>>>>>>>>> > identifiers to be eliminated at compile time.
> >>>>>>>>>> >
> >>>>>>>>>> > Do we have to invent duck-typing for the Java API?
> >>>>>>>>>> >
> >>>>>>>>>> > And if we do, what advantage are we supposed to get besides having
> >>>>>>>>>> > all the branches in the same scope? Michael, maybe I'm missing your
> >>>>>>>>>> > point?
> >>>>>>>>>> >
> >>>>>>>>>> > ---
> >>>>>>>>>> >
> >>>>>>>>>> > Earlier in this discussion John Roesler also proposed to do
> >>>>>>>>>> > without a "start branching" operator, and later Paul mentioned that
> >>>>>>>>>> > in the case when we have to add a dynamic number of branches, the
> >>>>>>>>>> > current KIP is 'clumsier' compared to Michael's 'Map' solution. Let
> >>>>>>>>>> > me address both comments here.
> >>>>>>>>>> >
> >>>>>>>>>> > 1) "Start branching" operator (I think that *split* is a good name
> >>>>>>>>>> > for it indeed) is critical when we need to do a dynamic branching,
> >>>>>>>>>> > see example below.
> >>>>>>>>>> >
> >>>>>>>>>> > 2) No, dynamic branching in current KIP is not clumsy at all.
> >>>>>>>>>> > Imagine a real-world scenario when you need one branch per enum
> >>>>>>>>>> > value (say, RecordType). You can have something like this:
> >>>>>>>>>> >
> >>>>>>>>>> > /*John:if we had to start with stream.branch(...) here, it would
> >>>>>>>>>> > have been much messier.*/
> >>>>>>>>>> > KBranchedStream branched = stream.split();
> >>>>>>>>>> >
> >>>>>>>>>> > /*Not clumsy at all :-)*/
> >>>>>>>>>> > for (RecordType recordType : RecordType.values())
> >>>>>>>>>> >     branched = branched.branch((k, v) -> v.getRecType() == recordType,
> >>>>>>>>>> >         recordType::processRecords);
> >>>>>>>>>> >
> >>>>>>>>>> > Regards,
> >>>>>>>>>> >
> >>>>>>>>>> > Ivan
> >>>>>>>>>> >
> >>>>>>>>>> > 02.05.2019 14:40, Matthias J. Sax wrote:
> >>>>>>>>>> >> I also agree with Michael's observation about the core problem of
> >>>>>>>>>> >> current `branch()` implementation.
> >>>>>>>>>> >>
> >>>>>>>>>> >> However, I also don't like to pass in a clumsy Map object. My
> >>>>>>>>>> >> thinking was more aligned with Paul's proposal to just add a name
> >>>>>>>>>> >> to each `branch()` statement and return a `Map<String,KStream>`.
> >>>>>>>>>> >>
> >>>>>>>>>> >> It makes the code easier to read, and also makes the order of
> >>>>>>>>>> >> `Predicates` (that is essential) easier to grasp.
> >>>>>>>>>> >>
> >>>>>>>>>> >>>>>> Map<String, KStream<K, V>> branches = stream.split()
> >>>>>>>>>> >>>>>>     .branch("branchOne", Predicate<K, V>)
> >>>>>>>>>> >>>>>>     .branch("branchTwo", Predicate<K, V>)
> >>>>>>>>>> >>>>>>     .defaultBranch("defaultBranch");
> >>>>>>>>>> >>
> >>>>>>>>>> >> An open question is the case for which no defaultBranch() should
> >>>>>>>>>> >> be specified. Atm, `split()` and `branch()` would return
> >>>>>>>>>> >> `BranchedKStream` and the call to `defaultBranch()` that returns
> >>>>>>>>>> >> the `Map` is mandatory (which is not the case atm). Or is this
> >>>>>>>>>> >> actually not a real problem, because users can just ignore the
> >>>>>>>>>> >> branch returned by `defaultBranch()` in the result `Map`?
> >>>>>>>>>> >>
> >>>>>>>>>> >> About "inlining": So far, it seems to be a matter of personal
> >>>>>>>>>> >> preference. I can see arguments for both, but no "killer
> >>>>>>>>>> >> argument" yet that clearly makes the case for one or the other.
> >>>>>>>>>> >>
> >>>>>>>>>> >> -Matthias
> >>>>>>>>>> >>
> >>>>>>>>>> >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
> >>>>>>>>>> >>> Perhaps inlining is the wrong terminology. It doesn’t require
> >>>>>>>>>> >>> that a lambda with the full downstream topology be defined
> >>>>>>>>>> >>> inline - it can be a method reference as with Ivan’s original
> >>>>>>>>>> >>> suggestion.
> >>>>>>>>>> >>> The advantage of putting the predicate and its downstream logic
> >>>>>>>>>> >>> (Consumer) together in branch() is that they are required to be
> >>>>>>>>>> >>> near to each other.
> >>>>>>>>>> >>>
> >>>>>>>>>> >>> Ultimately the downstream code has to live somewhere, and deep
> >>>>>>>>>> >>> branch trees will be hard to read regardless.
> >>>>>>>>>> >>>
> >>>>>>>>>> >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis
> >>>>>>>>>> >>>> <michael.droga...@confluent.io> wrote:
> >>>>>>>>>> >>>>
> >>>>>>>>>> >>>> I'm less enthusiastic about inlining the branch logic with its
> >>>>>>>>>> >>>> downstream functionality. Programs that have deep branch trees
> >>>>>>>>>> >>>> will quickly become harder to read as a single unit.
> >>>>>>>>>> >>>>
> >>>>>>>>>> >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen
> >>>>>>>>>> >>>>> <pgwha...@gmail.com> wrote:
> >>>>>>>>>> >>>>>
> >>>>>>>>>> >>>>> Also +1 on the issues/goals as Michael outlined them, I think
> >>>>>>>>>> >>>>> that sets a great framework for the discussion.
> >>>>>>>>>> >>>>>
> >>>>>>>>>> >>>>> Regarding the SortedMap solution, my understanding is that the
> >>>>>>>>>> >>>>> current proposal in the KIP is what is in my PR which (pending
> >>>>>>>>>> >>>>> naming decisions) is roughly this:
> >>>>>>>>>> >>>>>
> >>>>>>>>>> >>>>> stream.split()
> >>>>>>>>>> >>>>>     .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
> >>>>>>>>>> >>>>>     .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
> >>>>>>>>>> >>>>>     .defaultBranch(Consumer<KStream<K, V>>);
> >>>>>>>>>> >>>>>
> >>>>>>>>>> >>>>> Obviously some ordering is necessary, since branching as a
> >>>>>>>>>> >>>>> construct doesn't work without it, but this solution seems like
> >>>>>>>>>> >>>>> it provides as much associativity as the SortedMap solution,
> >>>>>>>>>> >>>>> because each branch() call directly associates the "conditional"
> >>>>>>>>>> >>>>> with the "code block." The value the SortedMap solution provides
> >>>>>>>>>> >>>>> over the KIP solution is access to the streams in the same scope.
> >>>>>>>>>> >>>>>
> >>>>>>>>>> >>>>> The KIP solution is less "dynamic" than the SortedMap solution
> >>>>>>>>>> >>>>> in the sense that it is slightly clumsier to add a dynamic
> >>>>>>>>>> >>>>> number of branches, but it is certainly possible. It seems to
> >>>>>>>>>> >>>>> me like the API should favor the "static" case anyway, and
> >>>>>>>>>> >>>>> should make it simple and readable to fluently declare and
> >>>>>>>>>> >>>>> access your branches in-line. It also makes it impossible to
> >>>>>>>>>> >>>>> ignore a branch, and it is possible to build an (almost)
> >>>>>>>>>> >>>>> identical SortedMap solution on top of it.
> >>>>>>>>>> >>>>>
> >>>>>>>>>> >>>>> I could also see a middle ground where instead of a raw
> >>>>>>>>>> >>>>> SortedMap being taken in, branch() takes a name and not a
> >>>>>>>>>> >>>>> Consumer. Something like this:
> >>>>>>>>>> >>>>>
> >>>>>>>>>> >>>>> Map<String, KStream<K, V>> branches = stream.split()
> >>>>>>>>>> >>>>>     .branch("branchOne", Predicate<K, V>)
> >>>>>>>>>> >>>>>     .branch("branchTwo", Predicate<K, V>)
> >>>>>>>>>> >>>>>     .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
> >>>>>>>>>> >>>>>
> >>>>>>>>>> >>>>> Pros for that solution:
> >>>>>>>>>> >>>>> - accessing branched KStreams in same scope
> >>>>>>>>>> >>>>> - no double brace initialization, hopefully slightly more
> >>>>>>>>>> >>>>>   readable than SortedMap
> >>>>>>>>>> >>>>>
> >>>>>>>>>> >>>>> Cons
> >>>>>>>>>> >>>>> - downstream branch logic cannot be specified inline which
> >>>>>>>>>> >>>>>   makes it harder to read top to bottom (like existing API and
> >>>>>>>>>> >>>>>   SortedMap, but unlike the KIP)
> >>>>>>>>>> >>>>> - you can forget to "handle" one of the branched streams (like
> >>>>>>>>>> >>>>>   existing API and SortedMap, but unlike the KIP)
> >>>>>>>>>> >>>>>
> >>>>>>>>>> >>>>> (KBranchedStreams could even work *both* ways but perhaps
> >>>>>>>>>> >>>>> that's overdoing it).
> >>>>>>>>>> >>>>>
> >>>>>>>>>> >>>>> Overall I'm curious how important it is to be able to easily
> >>>>>>>>>> >>>>> access the branched KStream in the same scope as the original.
> >>>>>>>>>> >>>>> It's possible that it doesn't need to be handled directly by
> >>>>>>>>>> >>>>> the API, but instead left up to the user. I'm sort of in the
> >>>>>>>>>> >>>>> middle on it.
> >>>>>>>>>> >>>>>
> >>>>>>>>>> >>>>> Paul
> >>>>>>>>>> >>>>>
> >>>>>>>>>> >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman <sop...@confluent.io>
> >>>>>>>>>> >>>>> wrote:
> >>>>>>>>>> >>>>>
> >>>>>>>>>> >>>>>> I'd like to +1 what Michael said about the issues with the existing branch
> >>>>>>>>>> >>>>>> method, I agree with what he's outlined and I think we should proceed by
> >>>>>>>>>> >>>>>> trying to alleviate these problems. Specifically it seems important to be
> >>>>>>>>>> >>>>>> able to cleanly access the individual branches (eg by mapping
> >>>>>>>>>> >>>>>> name->stream), which I thought was the original intention of this KIP.
> >>>>>>>>>> >>>>>>
> >>>>>>>>>> >>>>>> That said, I don't think we should so easily give in to the double brace
> >>>>>>>>>> >>>>>> anti-pattern or force our users into it if at all possible to avoid...just
> >>>>>>>>>> >>>>>> my two cents.
> >>>>>>>>>> >>>>>>
> >>>>>>>>>> >>>>>> Cheers,
> >>>>>>>>>> >>>>>> Sophie
> >>>>>>>>>> >>>>>>
> >>>>>>>>>> >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis
> >>>>>>>>>> >>>>>> <michael.droga...@confluent.io> wrote:
> >>>>>>>>>> >>>>>>
> >>>>>>>>>> >>>>>>> I’d like to propose a different way of thinking about this. To me, there
> >>>>>>>>>> >>>>>>> are three problems with the existing branch signature:
> >>>>>>>>>> >>>>>>>
> >>>>>>>>>> >>>>>>> 1. If you use it the way most people do, Java raises unsafe type warnings.
> >>>>>>>>>> >>>>>>> 2. The way in which you use the stream branches is positionally coupled to
> >>>>>>>>>> >>>>>>>    the ordering of the conditionals.
> >>>>>>>>>> >>>>>>> 3. It is brittle to extend existing branch calls with additional code
> >>>>>>>>>> >>>>>>>    paths.
> >>>>>>>>>> >>>>>>>
> >>>>>>>>>> >>>>>>> Using associative constructs instead of relying on ordered constructs would
> >>>>>>>>>> >>>>>>> be a stronger approach. Consider a signature that instead looks like this:
> >>>>>>>>>> >>>>>>>
> >>>>>>>>>> >>>>>>> Map<String, KStream<K,V>> KStream#branch(SortedMap<String, Predicate<?
> >>>>>>>>>> >>>>>>> super K,? super V>>);
> >>>>>>>>>> >>>>>>>
> >>>>>>>>>> >>>>>>> Branches are given names in a map, and as a result, the API returns a
> >>>>>>>>>> >>>>>>> mapping of names to streams. The ordering of the conditionals is maintained
> >>>>>>>>>> >>>>>>> because it’s a sorted map. Insert order determines the order of evaluation.
> >>>>>>>>>> >>>>>>> This solves problem 1 because there are no more varargs. It solves problem
> >>>>>>>>>> >>>>>>> 2 because you no longer lean on ordering to access the branch you’re
> >>>>>>>>>> >>>>>>> interested in. It solves problem 3 because you can introduce another
> >>>>>>>>>> >>>>>>> conditional by simply attaching another name to the structure, rather than
> >>>>>>>>>> >>>>>>> messing with the existing indices.
> >>>>>>>>>> >>>>>>>
> >>>>>>>>>> >>>>>>> One of the drawbacks is that creating the map inline is historically
> >>>>>>>>>> >>>>>>> awkward in Java.
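[Editorial sketch, not part of the original thread.] The ordering remark above can be checked with plain JDK collections: a `SortedMap` such as `TreeMap` iterates in key order, whereas `LinkedHashMap` iterates in insertion order. The code below has no Kafka dependency; `BranchOrderSketch`, `firstMatch`, `fill`, and the branch names are hypothetical and only model "evaluate the predicates in map iteration order". Under that assumption, an insertion-ordered map would probably be the closer fit if insertion is meant to decide evaluation order.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

final class BranchOrderSketch {

    // Hypothetical helper: name of the first predicate, in map iteration order, that matches.
    static <T> String firstMatch(Map<String, Predicate<T>> branches, T value) {
        for (Map.Entry<String, Predicate<T>> entry : branches.entrySet()) {
            if (entry.getValue().test(value)) {
                return entry.getKey();
            }
        }
        return "default";
    }

    // Inserts a narrow branch first and a catch-all second.
    static void fill(Map<String, Predicate<Integer>> branches) {
        branches.put("small", n -> n < 10);
        branches.put("any", n -> true);
    }

    public static void main(String[] args) {
        Map<String, Predicate<Integer>> sorted = new TreeMap<>();
        Map<String, Predicate<Integer>> linked = new LinkedHashMap<>();
        fill(sorted);
        fill(linked);
        // TreeMap iterates by key ("any" sorts before "small"), so the catch-all is tested first.
        System.out.println("TreeMap:       " + firstMatch(sorted, 5));
        // LinkedHashMap iterates in insertion order, so "small" is tested first.
        System.out.println("LinkedHashMap: " + firstMatch(linked, 5));
    }
}
```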
> >>>>>>>>>> >>>>>>> I know it’s an anti-pattern to use voluminously, but double brace
> >>>>>>>>>> >>>>>>> initialization would clean up the aesthetics.
> >>>>>>>>>> >>>>>>>
> >>>>>>>>>> >>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler <j...@confluent.io> wrote:
> >>>>>>>>>> >>>>>>>
> >>>>>>>>>> >>>>>>>> Hi Ivan,
> >>>>>>>>>> >>>>>>>>
> >>>>>>>>>> >>>>>>>> Thanks for the update.
> >>>>>>>>>> >>>>>>>>
> >>>>>>>>>> >>>>>>>> FWIW, I agree with Matthias that the current "start branching" operator is
> >>>>>>>>>> >>>>>>>> confusing when named the same way as the actual branches. "Split" seems
> >>>>>>>>>> >>>>>>>> like a good name. Alternatively, we can do without a "start branching"
> >>>>>>>>>> >>>>>>>> operator at all, and just do:
> >>>>>>>>>> >>>>>>>>
> >>>>>>>>>> >>>>>>>> stream
> >>>>>>>>>> >>>>>>>>     .branch(Predicate)
> >>>>>>>>>> >>>>>>>>     .branch(Predicate)
> >>>>>>>>>> >>>>>>>>     .defaultBranch();
> >>>>>>>>>> >>>>>>>>
> >>>>>>>>>> >>>>>>>> Tentatively, I think that this branching operation should be terminal. That
> >>>>>>>>>> >>>>>>>> way, we don't create ambiguity about how to use it. That is, `branch`
> >>>>>>>>>> >>>>>>>> should return `KBranchedStream`, while `defaultBranch` is `void`, to
> >>>>>>>>>> >>>>>>>> enforce that it comes last, and that there is only one definition of the
> >>>>>>>>>> >>>>>>>> default branch. Potentially, we should log a warning if there's no default,
> >>>>>>>>>> >>>>>>>> and additionally log a warning (or throw an exception) if a record falls
> >>>>>>>>>> >>>>>>>> through with no default.
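[Editorial sketch, not part of the original thread.] To make the "terminal operation" idea concrete, here is a minimal plain-Java model. It is not the Kafka Streams API: `BranchedSketch`, `split`, and the list-based routing are hypothetical stand-ins, and records are modelled as a `List` rather than a stream. The point is only the shape of the types: `branch` returns the builder, while `defaultBranch` returns `void`, so nothing can be chained after it and the compiler rejects a second default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class BranchedSketch<T> {

    // Elements that no earlier branch has claimed yet.
    private List<T> remaining;

    private BranchedSketch(List<T> input) {
        this.remaining = new ArrayList<>(input);
    }

    static <T> BranchedSketch<T> split(List<T> input) {
        return new BranchedSketch<>(input);
    }

    // Hands matching elements to the sink and keeps the rest for later branches.
    BranchedSketch<T> branch(Predicate<T> predicate, Consumer<List<T>> sink) {
        List<T> matched = new ArrayList<>();
        List<T> rest = new ArrayList<>();
        for (T element : remaining) {
            (predicate.test(element) ? matched : rest).add(element);
        }
        remaining = rest;
        sink.accept(matched);
        return this;
    }

    // Terminal: returns void, so the chain cannot continue after the default branch.
    void defaultBranch(Consumer<List<T>> sink) {
        sink.accept(new ArrayList<>(remaining));
    }

    public static void main(String[] args) {
        List<Integer> evens = new ArrayList<>();
        List<Integer> others = new ArrayList<>();
        BranchedSketch.split(List.of(1, 2, 3, 4))
                .branch(n -> n % 2 == 0, evens::addAll)
                .defaultBranch(others::addAll);
        // Appending .branch(...) after defaultBranch(...) would not compile.
        System.out.println(evens + " " + others);
    }
}
```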
> >>>>>>>>>> >>>>>>>>
> >>>>>>>>>> >>>>>>>> Thoughts?
> >>>>>>>>>> >>>>>>>>
> >>>>>>>>>> >>>>>>>> Thanks,
> >>>>>>>>>> >>>>>>>> -John
> >>>>>>>>>> >>>>>>>>
> >>>>>>>>>> >>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matth...@confluent.io>
> >>>>>>>>>> >>>>>>>> wrote:
> >>>>>>>>>> >>>>>>>>
> >>>>>>>>>> >>>>>>>>> Thanks for updating the KIP and your answers.
> >>>>>>>>>> >>>>>>>>>
> >>>>>>>>>> >>>>>>>>>> this is to make the name similar to String#split
> >>>>>>>>>> >>>>>>>>>> that also returns an array, right?
> >>>>>>>>>> >>>>>>>>>
> >>>>>>>>>> >>>>>>>>> The intent was to avoid name duplication. The return type should _not_
> >>>>>>>>>> >>>>>>>>> be an array.
> >>>>>>>>>> >>>>>>>>>
> >>>>>>>>>> >>>>>>>>> The current proposal is
> >>>>>>>>>> >>>>>>>>>
> >>>>>>>>>> >>>>>>>>> stream.branch()
> >>>>>>>>>> >>>>>>>>>     .branch(Predicate)
> >>>>>>>>>> >>>>>>>>>     .branch(Predicate)
> >>>>>>>>>> >>>>>>>>>     .defaultBranch();
> >>>>>>>>>> >>>>>>>>>
> >>>>>>>>>> >>>>>>>>> IMHO, this reads a little odd, because the first `branch()` does not
> >>>>>>>>>> >>>>>>>>> take any parameters and has different semantics than the later
> >>>>>>>>>> >>>>>>>>> `branch()` calls. Note, that from the code snippet above, it's hidden
> >>>>>>>>>> >>>>>>>>> that the first call is `KStream#branch()` while the others are
> >>>>>>>>>> >>>>>>>>> `KBranchedStream#branch()`, which makes reading the code harder.
> >>>>>>>>>> >>>>>>>>>
> >>>>>>>>>> >>>>>>>>> Because I suggested to rename `addBranch()` -> `branch()`, I thought it
> >>>>>>>>>> >>>>>>>>> might be better to also rename `KStream#branch()` to avoid the naming
> >>>>>>>>>> >>>>>>>>> overlap that seems to be confusing. The following reads much cleaner to
> >>>>>>>>>> >>>>>>>>> me:
> >>>>>>>>>> >>>>>>>>>
> >>>>>>>>>> >>>>>>>>> stream.split()
> >>>>>>>>>> >>>>>>>>>     .branch(Predicate)
> >>>>>>>>>> >>>>>>>>>     .branch(Predicate)
> >>>>>>>>>> >>>>>>>>>     .defaultBranch();
> >>>>>>>>>> >>>>>>>>>
> >>>>>>>>>> >>>>>>>>> Maybe there is a better alternative to `split()` though to avoid the
> >>>>>>>>>> >>>>>>>>> naming overlap.
> >>>>>>>>>> >>>>>>>>>
> >>>>>>>>>> >>>>>>>>>> 'default' is, however, a reserved word, so unfortunately we cannot have
> >>>>>>>>>> >>>>>>>>>> a method with such name :-)
> >>>>>>>>>> >>>>>>>>>
> >>>>>>>>>> >>>>>>>>> Bummer. Didn't consider this. Maybe we can still come up with a short
> >>>>>>>>>> >>>>>>>>> name?
> >>>>>>>>>> >>>>>>>>>
> >>>>>>>>>> >>>>>>>>> Can you add the interface `KBranchedStream` to the KIP with all its
> >>>>>>>>>> >>>>>>>>> methods? It will be part of public API and should be contained in the
> >>>>>>>>>> >>>>>>>>> KIP. For example, it's unclear atm, what the return type of
> >>>>>>>>>> >>>>>>>>> `defaultBranch()` is.
> >>>>>>>>>> >>>>>>>>>
> >>>>>>>>>> >>>>>>>>> You did not comment on the idea to add a `KBranchedStream#get(int index)
> >>>>>>>>>> >>>>>>>>> -> KStream` method to get the individually branched-KStreams. Would be
> >>>>>>>>>> >>>>>>>>> nice to get your feedback about it. It seems you suggest that users
> >>>>>>>>>> >>>>>>>>> would need to write custom utility code otherwise, to access them. We
> >>>>>>>>>> >>>>>>>>> should discuss the pros and cons of both approaches. It feels
> >>>>>>>>>> >>>>>>>>> "incomplete" to me atm, if the API has no built-in support to get the
> >>>>>>>>>> >>>>>>>>> branched-KStreams directly.
> >>>>>>>>>> >>>>>>>>>
> >>>>>>>>>> >>>>>>>>> -Matthias
> >>>>>>>>>> >>>>>>>>>
> >>>>>>>>>> >>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> >>>>>>>>>> >>>>>>>>>> Hi all!
> >>>>>>>>>> >>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>> I have updated the KIP-418 according to the new vision.
> >>>>>>>>>> >>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>> Matthias, thanks for your comment!
> >>>>>>>>>> >>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>> Renaming KStream#branch() -> #split()
> >>>>>>>>>> >>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>> I can see your point: this is to make the name similar to String#split
> >>>>>>>>>> >>>>>>>>>> that also returns an array, right? But is it worth the loss of backwards
> >>>>>>>>>> >>>>>>>>>> compatibility? We can have overloaded branch() as well without affecting
> >>>>>>>>>> >>>>>>>>>> the existing code.
> >>>>>>>>>> >>>>>>>>>> Maybe the old array-based `branch` method should be deprecated, but
> >>>>>>>>>> >>>>>>>>>> this is a subject for discussion.
> >>>>>>>>>> >>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
> >>>>>>>>>> >>>>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> >>>>>>>>>> >>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>> Totally agree with 'addBranch->branch' rename. 'default' is, however, a
> >>>>>>>>>> >>>>>>>>>> reserved word, so unfortunately we cannot have a method with such name
> >>>>>>>>>> >>>>>>>>>> :-)
> >>>>>>>>>> >>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>> defaultBranch() does take an `Predicate` as argument, but I think that
> >>>>>>>>>> >>>>>>>>>>> is not required?
> >>>>>>>>>> >>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>> Absolutely! I think that was just copy-paste error or something.
> >>>>>>>>>> >>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>> Dear colleagues,
> >>>>>>>>>> >>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>> please review the new version of the KIP and Paul's PR
> >>>>>>>>>> >>>>>>>>>> (https://github.com/apache/kafka/pull/6512)
> >>>>>>>>>> >>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>> Any new suggestions/objections?
> >>>>>>>>>> >>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>> Regards,
> >>>>>>>>>> >>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>> Ivan
> >>>>>>>>>> >>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>> 11.04.2019 11:47, Matthias J. Sax wrote:
> >>>>>>>>>> >>>>>>>>>>> Thanks for driving the discussion of this KIP. It seems that everybody
> >>>>>>>>>> >>>>>>>>>>> agrees that the current branch() method using arrays is not optimal.
> >>>>>>>>>> >>>>>>>>>>> I had a quick look into the PR and I like the overall proposal. There
> >>>>>>>>>> >>>>>>>>>>> are some minor things we need to consider. I would recommend the
> >>>>>>>>>> >>>>>>>>>>> following renaming:
> >>>>>>>>>> >>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>> KStream#branch() -> #split()
> >>>>>>>>>> >>>>>>>>>>> KBranchedStream#addBranch() -> BranchingKStream#branch()
> >>>>>>>>>> >>>>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> >>>>>>>>>> >>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>> It's just a suggestion to get slightly shorter method names.
> >>>>>>>>>> >>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>> In the current PR, defaultBranch() does take an `Predicate` as argument,
> >>>>>>>>>> >>>>>>>>>>> but I think that is not required?
> >>>>>>>>>> >>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>> Also, we should consider KIP-307, that was recently accepted and is
> >>>>>>>>>> >>>>>>>>>>> currently implemented:
> >>>>>>>>>> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> >>>>>>>>>> >>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>> Ie, we should add overloads that accept a `Named` parameter.
> >>>>>>>>>> >>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>> For the issue that the created `KStream` objects are in different scopes:
> >>>>>>>>>> >>>>>>>>>>> could we extend `KBranchedStream` with a `get(int index)` method that
> >>>>>>>>>> >>>>>>>>>>> returns the corresponding "branched" result `KStream` object?
> >>>>>>>>>> >>>>>>>>>>> Maybe, the second argument of `addBranch()` should not be a
> >>>>>>>>>> >>>>>>>>>>> `Consumer<KStream>` but a `Function<KStream,KStream>` and `get()` could
> >>>>>>>>>> >>>>>>>>>>> return whatever the `Function` returns?
> >>>>>>>>>> >>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>> Finally, I would also suggest to update the KIP with the current
> >>>>>>>>>> >>>>>>>>>>> proposal. That makes it easier to review.
> >>>>>>>>>> >>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>> -Matthias
> >>>>>>>>>> >>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
> >>>>>>>>>> >>>>>>>>>>>> Ivan,
> >>>>>>>>>> >>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>> I'm a bit of a novice here as well, but I think it makes sense for you
> >>>>>>>>>> >>>>>>>>>>>> to revise the KIP and continue the discussion. Obviously we'll need some
> >>>>>>>>>> >>>>>>>>>>>> buy-in from committers that have actual binding votes on whether the KIP
> >>>>>>>>>> >>>>>>>>>>>> could be adopted. It would be great to hear if they think this is a good
> >>>>>>>>>> >>>>>>>>>>>> idea overall. I'm not sure if that happens just by starting a vote, or
> >>>>>>>>>> >>>>>>>>>>>> if there is generally some indication of interest beforehand.
> >>>>>>>>>> >>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>> That being said, I'll continue the discussion a bit: assuming we do move
> >>>>>>>>>> >>>>>>>>>>>> forward the solution of "stream.branch() returns KBranchedStream", do we
> >>>>>>>>>> >>>>>>>>>>>> deprecate "stream.branch(...) returns KStream[]"? I would favor
> >>>>>>>>>> >>>>>>>>>>>> deprecating, since having two mutually exclusive APIs that accomplish
> >>>>>>>>>> >>>>>>>>>>>> the same thing is confusing, especially when they're fairly similar
> >>>>>>>>>> >>>>>>>>>>>> anyway. We just need to be sure we're not making something
> >>>>>>>>>> >>>>>>>>>>>> impossible/difficult that is currently possible/easy.
> >>>>>>>>>> >>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>> Regarding my PR - I think the general structure would work, it's just a
> >>>>>>>>>> >>>>>>>>>>>> little sloppy overall in terms of naming and clarity. In particular,
> >>>>>>>>>> >>>>>>>>>>>> passing in the "predicates" and "children" lists which get modified in
> >>>>>>>>>> >>>>>>>>>>>> KBranchedStream but read from all the way KStreamLazyBranch is a bit
> >>>>>>>>>> >>>>>>>>>>>> complicated to follow.
> >>>>>>>>>> >>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>> Thanks,
> >>>>>>>>>> >>>>>>>>>>>> Paul
> >>>>>>>>>> >>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponoma...@mail.ru>
> >>>>>>>>>> >>>>>>>>>>>> wrote:
> >>>>>>>>>> >>>>>>>>>>>>> Hi Paul!
> >>>>>>>>>> >>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>> I read your code carefully and now I am fully convinced: your proposal
> >>>>>>>>>> >>>>>>>>>>>>> looks better and should work. We just have to document the crucial
> >>>>>>>>>> >>>>>>>>>>>>> fact that KStream consumers are invoked as they're added. And then it's
> >>>>>>>>>> >>>>>>>>>>>>> all going to be very nice.
> >>>>>>>>>> >>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>> What shall we do now? I should re-write the KIP and resume the
> >>>>>>>>>> >>>>>>>>>>>>> discussion here, right?
> >>>>>>>>>> >>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>> Why do you say that your PR 'should not be even a starting point if
> >>>>>>>>>> >>>>>>>>>>>>> we go in this direction'? To me it looks like a good starting point.
> >>>>>>>>>> >>>>>>>>>>>>> But as a novice in this project I might miss some important details.
> >>>>>>>>>> >>>>>>>>>>>>> Regards,
> >>>>>>>>>> >>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>> Ivan
> >>>>>>>>>> >>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen wrote:
> >>>>>>>>>> >>>>>>>>>>>>>> Ivan,
> >>>>>>>>>> >>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>> Maybe I’m missing the point, but I believe the stream.branch() solution
> >>>>>>>>>> >>>>>>>>>>>>>> supports this. The couponIssuer::set* consumers will be invoked as
> >>>>>>>>>> >>>>>>>>>>>>>> they’re added, not during streamsBuilder.build(). So the user still
> >>>>>>>>>> >>>>>>>>>>>>>> ought to be able to call couponIssuer.coupons() afterward and depend on
> >>>>>>>>>> >>>>>>>>>>>>>> the branched streams having been set.
> >>>>>>>>>> >>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>> The issue I mean to point out is that it is hard to access the branched
> >>>>>>>>>> >>>>>>>>>>>>>> streams in the same scope as the original stream (that is, not inside
> >>>>>>>>>> >>>>>>>>>>>>>> the couponIssuer), which is a problem with both proposed solutions. It
> >>>>>>>>>> >>>>>>>>>>>>>> can be worked around though.
> >>>>>>>>>> >>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>> [Also, great to hear additional interest in 401, I’m excited to hear
> >>>>>>>>>> >>>>>>>>>>>>>> your thoughts!]
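[Editorial sketch, not part of the original thread.] The difference between "consumers are invoked as they're added" and "consumers are invoked during build()" can be modelled in a few lines of plain Java. Everything here is a hypothetical stand-in (`Holder` plays the role of the CouponIssuer, a `String` stands in for a branched stream), so this says nothing about how the actual PR is implemented; it only shows why the timing matters to code that reads the holder right after the chain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class WiringSketch {

    // Hypothetical holder: records whatever "stream" it was handed.
    static final class Holder {
        String coffee;

        void setCoffee(String stream) {
            coffee = stream;
        }
    }

    // Eager variant: the consumer runs inside branch().
    static final class EagerBrancher {
        EagerBrancher branch(String stream, Consumer<String> consumer) {
            consumer.accept(stream);
            return this;
        }
    }

    // Deferred variant: consumers are queued and only run in build().
    static final class DeferredBrancher {
        private final List<Runnable> pending = new ArrayList<>();

        DeferredBrancher branch(String stream, Consumer<String> consumer) {
            pending.add(() -> consumer.accept(stream));
            return this;
        }

        void build() {
            pending.forEach(Runnable::run);
        }
    }

    public static void main(String[] args) {
        Holder eager = new Holder();
        new EagerBrancher().branch("coffee-stream", eager::setCoffee);
        System.out.println("eager, right after the chain:    " + eager.coffee);

        Holder deferred = new Holder();
        DeferredBrancher brancher = new DeferredBrancher().branch("coffee-stream", deferred::setCoffee);
        System.out.println("deferred, right after the chain: " + deferred.coffee);
        brancher.build();
        System.out.println("deferred, after build():         " + deferred.coffee);
    }
}
```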
> >>>>>>>>>> >>>>>>>>>>>>>> Paul
> >>>>>>>>>> >>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru>
> >>>>>>>>>> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>> >>>>>>>>>>>>>>> Hi Paul!
> >>>>>>>>>> >>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>> The idea to postpone the wiring of branches to the
> >>>>>>>>>> >>>>>>>>>>>>>>> streamsBuilder.build() also looked great to me at first glance, but ---
> >>>>>>>>>> >>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>> the newly branched streams are not available in the same scope as each
> >>>>>>>>>> >>>>>>>>>>>>>>>> other. That is, if we wanted to merge them back together again I
> >>>>>>>>>> >>>>>>>>>>>>>>>> don't see a way to do that.
> >>>>>>>>>> >>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>> You just took the words right out of my mouth, I was just going to
> >>>>>>>>>> >>>>>>>>>>>>>>> write in detail about this issue.
> >>>>>>>>>> >>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>> Consider the example from Bill's book, p. 101: say we need to identify
> >>>>>>>>>> >>>>>>>>>>>>>>> customers who have bought coffee and made a purchase in the electronics
> >>>>>>>>>> >>>>>>>>>>>>>>> store to give them coupons.
> >>>>>>>>>> >>>>>>>>>>>>>>> This is the code I usually write under these circumstances using my
> >>>>>>>>>> >>>>>>>>>>>>>>> 'brancher' class:
> >>>>>>>>>> >>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>> @Setter
> >>>>>>>>>> >>>>>>>>>>>>>>> class CouponIssuer{
> >>>>>>>>>> >>>>>>>>>>>>>>>     private KStream<....> coffePurchases;
> >>>>>>>>>> >>>>>>>>>>>>>>>     private KStream<....> electronicsPurchases;
> >>>>>>>>>> >>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>     KStream<...> coupons(){
> >>>>>>>>>> >>>>>>>>>>>>>>>         return coffePurchases.join(electronicsPurchases...)...whatever
> >>>>>>>>>> >>>>>>>>>>>>>>>         /*In the real world the code here can be complex, so creation of
> >>>>>>>>>> >>>>>>>>>>>>>>>         a separate CouponIssuer class is fully justified, in order to
> >>>>>>>>>> >>>>>>>>>>>>>>>         separate classes' responsibilities.*/
> >>>>>>>>>> >>>>>>>>>>>>>>>     }
> >>>>>>>>>> >>>>>>>>>>>>>>> }
> >>>>>>>>>> >>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>> CouponIssuer couponIssuer = new CouponIssuer();
> >>>>>>>>>> >>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
> >>>>>>>>>> >>>>>>>>>>>>>>>     .branch(predicate1, couponIssuer::setCoffePurchases)
> >>>>>>>>>> >>>>>>>>>>>>>>>     .branch(predicate2, couponIssuer::setElectronicsPurchases)
> >>>>>>>>>> >>>>>>>>>>>>>>>     .onTopOf(transactionStream);
> >>>>>>>>>> >>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>> /*Alas, this won't work if we're going to wire up everything later,
> >>>>>>>>>> >>>>>>>>>>>>>>> without the terminal operation!!!*/
> >>>>>>>>>> >>>>>>>>>>>>>>> couponIssuer.coupons()...
> >>>>>>>>>> >>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>> Does this make sense?
> >>>>>>>>>> >>>>>>>>>>>>>>> In order to properly initialize the CouponIssuer we need the terminal
> >>>>>>>>>> >>>>>>>>>>>>>>> operation to be called before streamsBuilder.build() is called.
> >>>>>>>>>> >>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>> [BTW Paul, I just found out that your KIP-401 is essentially the next
> >>>>>>>>>> >>>>>>>>>>>>>>> KIP I was going to write here. I have some thoughts based on my
> >>>>>>>>>> >>>>>>>>>>>>>>> experience, so I will join the discussion on KIP-401 soon.]
> >>>>>>>>>> >>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>> Regards,
> >>>>>>>>>> >>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>> Ivan
> >>>>>>>>>> >>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen wrote:
> >>>>>>>>>> >>>>>>>>>>>>>>>> Ivan,
> >>>>>>>>>> >>>>>>>>>>>>>>>> I tried to make a very rough proof of concept of a fluent API based
> >>>>>>>>>> >>>>>>>>>>>>>>>> off of KStream here (https://github.com/apache/kafka/pull/6512), and I
> >>>>>>>>>> >>>>>>>>>>>>>>>> think I succeeded at removing both cons.
> >>>>>>>>>> >>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>> - Compatibility: I was incorrect earlier about compatibility issues,
> >>>>>>>>>> >>>>>>>>>>>>>>>>   there aren't any direct ones. I was unaware that Java is smart
> >>>>>>>>>> >>>>>>>>>>>>>>>>   enough to distinguish between a branch(varargs...) returning one
> >>>>>>>>>> >>>>>>>>>>>>>>>>   thing and branch() with no arguments returning another thing.
> >>>>>>>>>> >>>>>>>>>>>>>>>> - Requiring a terminal method: We don't actually need it. We can just
> >>>>>>>>>> >>>>>>>>>>>>>>>>   build up the branches in the KBranchedStream, which shares its state
> >>>>>>>>>> >>>>>>>>>>>>>>>>   with the ProcessorSupplier that will actually do the branching. It's
> >>>>>>>>>> >>>>>>>>>>>>>>>>   not terribly pretty in its current form, but I think it demonstrates
> >>>>>>>>>> >>>>>>>>>>>>>>>>   its feasibility.
> >>>>>>>>>> >>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>> To be clear, I don't think that pull request should be final or even a
> >>>>>>>>>> >>>>>>>>>>>>>>>> starting point if we go in this direction, I just wanted to see how
> >>>>>>>>>> >>>>>>>>>>>>>>>> challenging it would be to get the API working.
> >>>>>>>>>> >>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>> I will say though, that I'm not sure the existing solution could be
> >>>>>>>>>> >>>>>>>>>>>>>>>> deprecated in favor of this, which I had originally suggested was a
> >>>>>>>>>> >>>>>>>>>>>>>>>> possibility.
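[Editorial sketch, not part of the original thread.] The compatibility point can be reproduced without Kafka: Java allows a varargs method and a no-argument method of the same name to coexist with different return types, and a call with no arguments resolves to the no-argument overload because fixed-arity candidates are considered before variable-arity ones. `OverloadSketch` and its return types are hypothetical stand-ins for the existing array-returning `branch(...)` and the proposed `branch()`.

```java
import java.util.function.Predicate;

final class OverloadSketch {

    // Stands in for the existing method: varargs in, array out.
    @SafeVarargs
    static String[] branch(Predicate<String>... predicates) {
        return new String[predicates.length];
    }

    // Stands in for the proposed method: no arguments, a different return type.
    static StringBuilder branch() {
        return new StringBuilder("fluent");
    }

    public static void main(String[] args) {
        StringBuilder fluent = branch();               // picks the no-argument overload
        String[] legacy = branch(s -> s.isEmpty());    // picks the varargs overload
        System.out.println(fluent + " " + legacy.length);
    }
}
```

Existing callers that pass at least one predicate keep resolving to the varargs method, which is why adding the no-argument overload does not break them at the source level.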
> >>>>>>>>>> >>>>>>>>>>>>>>>> The reason is that the newly branched streams are not available in the
> >>>>>>>>>> >>>>>>>>>>>>>>>> same scope as each other. That is, if we wanted to merge them back
> >>>>>>>>>> >>>>>>>>>>>>>>>> together again I don't see a way to do that. The KIP proposal has the
> >>>>>>>>>> >>>>>>>>>>>>>>>> same issue, though - all this means is that for either solution,
> >>>>>>>>>> >>>>>>>>>>>>>>>> deprecating the existing branch(...) is not on the table.
> >>>>>>>>>> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>> >>>>>>>>>>>>>>>> Paul
> >>>>>>>>>> >>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru>
> >>>>>>>>>> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>> >>>>>>>>>>>>>>>>> OK, let me summarize what we have discussed up to this point.
> >>>>>>>>>> >>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>> First, it seems that it's commonly agreed that branch API needs
> >>>>>>>>>> >>>>>>>>>>>>>>>>> improvement. Motivation is given in the KIP.
> >>>>>>>>>> >>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>> There are two potential ways to do it:
> >>>>>>>>>> >>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>> 1. (as originally proposed)
> >>>>>>>>>> >>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
> >>>>>>>>>> >>>>>>>>>>>>>>>>>     .branch(predicate1, ks ->..)
> >>>>>>>>>> >>>>>>>>>>>>>>>>>     .branch(predicate2, ks->..)
> >>>>>>>>>> >>>>>>>>>>>>>>>>>     .defaultBranch(ks->..) //optional
> >>>>>>>>>> >>>>>>>>>>>>>>>>>     .onTopOf(stream).mapValues(...).... //onTopOf returns its argument
> >>>>>>>>>> >>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2) The code won't make sense until
> >>>>>>>>>> >>>>>>>>>>>>>>>>> all the necessary ingredients are provided.
> >>>>>>>>>> >>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance contrasts with
> >>>>>>>>>> >>>>>>>>>>>>>>>>> the fluency of other KStream methods.
> >>>>>>>>>> >>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>> 2. (as Paul proposes)
> >>>>>>>>>> >>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>> stream
> >>>>>>>>>> >>>>>>>>>>>>>>>>>     .branch(predicate1, ks ->...)
> >>>>>>>>>> >>>>>>>>>>>>>>>>>     .branch(predicate2, ks->...)
> >>>>>>>>>> >>>>>>>>>>>>>>>>>     .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and
> >>>>>>>>>> >>>>>>>>>>>>>>>>>                             //noDefault() return void
> >>>>>>>>>> >>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>> PROS: Generally follows the way KStreams interface is defined.
> >>>>>>>>>> >>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>> CONS: We need to define two terminal methods (defaultBranch(ks->) and
> >>>>>>>>>> >>>>>>>>>>>>>>>>> noDefault()). And for a user it is very easy to miss the fact that one
> >>>>>>>>>> >>>>>>>>>>>>>>>>> of the terminal methods should be called. If these methods are not
> >>>>>>>>>> >>>>>>>>>>>>>>>>> called, we can throw an exception at runtime.
> >>>>>>>>>> >>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>> Colleagues, what are your thoughts?
> >>>>>>>>>> >>>>>>>>>>>>>>>>> Can we do better?
> >>>>>>>>>> >>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>> >>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>> Ivan
> >>>>>>>>>> >>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
> >>>>>>>>>> >>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
> >>>>>>>>>> >>>>>>>>>>>>>>>>>>> Paul,
> >>>>>>>>>> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>>>> I see your point when you are talking about
> >>>>>>>>>> >>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
> >>>>>>>>>> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>>>> Still, I believe that this cannot be implemented the easy way.
> >>>>>>>>>> >>>>>>>>>>>>>>>>>>> Maybe we all should think further.
> >>>>>>>>>> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
> >>>>>>>>>> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>>>>> user could specify a terminal method that assumes nothing will reach
> >>>>>>>>>> >>>>>>>>>>>>>>>>>>>> the default branch, throwing an exception if such a case occurs.
> >>>>>>>>>> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>> >>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be the only option besides
> >>>>>>>>>> >>>>>>>>>>>>>>>>>>> `default`, because there are scenarios when we want to just silently
> >>>>>>>>>> >>>>>>>>>>>>>>>>>>> drop the messages that didn't match any predicate.
2) Throwing an exception in the middle of data flow processing looks like a bad idea. In the stream processing paradigm, I would prefer to emit a special message to a dedicated stream. This is exactly where `default` can be used.

> it would be fairly easy for the InternalTopologyBuilder to track
> dangling branches that haven't been terminated and raise a clear error
> before it becomes an issue.

You mean a runtime exception, when the program is compiled and run? Well, I'd prefer an API that simply won't compile if used incorrectly. Can we build such an API as a method chain starting from a KStream object? There is a huge cost difference between runtime and compile-time errors.
Even if a failure is uncovered instantly by unit tests, it costs more for the project than a compilation failure.

Regards,

Ivan

25.03.2019 0:38, Paul Whalen wrote:

Ivan,

Good point about the terminal operation being required. But is that really such a bad thing? If the user doesn't want a defaultBranch they can call some other terminal method (noDefaultBranch()?) just as easily. In fact I think it creates an opportunity for a nicer API - a user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs.
That seems like an improvement over the current branch() API, which allows for the more subtle behavior of records unexpectedly getting dropped.

The need for a terminal operation certainly has to be well documented, but it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue. Especially now that there is a "build step" where the topology is actually wired up, when StreamsBuilder.build() is called.

Regarding onTopOf() returning its argument, I agree that it's critical to allow users to do other operations on the input stream.
With the fluent solution, it ought to work the same way all other operations do - if you want to process off the original KStream multiple times, you just need the stream as a variable so you can call as many operations on it as you desire.

Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hello Paul,

I'm afraid this won't work because we do not always need the defaultBranch. And without a terminal operation we don't know when to finalize and build the 'branch switch'.
In my proposal, onTopOf returns its argument, so we can do something more with the original branch after branching.

I understand your point that the need for special object construction contrasts with the fluency of most KStream methods. But here we have a special case: we build the switch to split the flow, so I think this is still idiomatic.

Regards,

Ivan

24.03.2019 4:02, Paul Whalen wrote:

Ivan,

I think it's a great idea to improve this API, but I find the onTopOf() mechanism a little confusing since it contrasts with the fluency of other KStream method calls.
Ideally I'd like to just call a method on the stream so it still reads top to bottom if the branch cases are defined fluently. I think the addBranch(predicate, handleCase) is very nice and the right way to do things, but what if we flipped around how we specify the source stream.

Like:

    stream.branch()
        .addBranch(predicate1, this::handle1)
        .addBranch(predicate2, this::handle2)
        .defaultBranch(this::handleDefault);

Where branch() returns a KBranchedStreams or KStreamBrancher or something, which is added to by addBranch() and terminated by defaultBranch() (which returns void).
This is obviously incompatible with the current API, so the new stream.branch() would have to have a different name, but that seems like a fairly small problem - we could call it something like branched() or branchedStreams() and deprecate the old API.

Does this satisfy the motivations of your KIP? It seems like it does to me, allowing for clear in-line branching while also allowing you to dynamically build branches off of KBranchedStreams if desired.

Thanks,
Paul

On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:

Hi Bill,

Thank you for your reply!
This is how I usually do it:

    void handleFirstCase(KStream<String, String> ks){
        ks.filter(....).mapValues(...)
    }

    void handleSecondCase(KStream<String, String> ks){
        ks.selectKey(...).groupByKey()...
    }

    ......
    new KafkaStreamsBrancher<String, String>()
        .addBranch(predicate1, this::handleFirstCase)
        .addBranch(predicate2, this::handleSecondCase)
        .onTopOf(....)

Regards,

Ivan

22.03.2019 1:34, Bill Bejeck wrote:

Hi Ivan,

Thanks for the KIP.
I have one question: the KafkaStreamsBrancher takes a Consumer as a second argument which returns nothing, and the example in the KIP shows each stream from the branch using a terminal node (KStream#to() in this case).

Maybe I've missed something, but how would we handle the case where the user has created a branch but wants to continue processing and not necessarily use a terminal node on the branched stream immediately?

For example, using today's logic as is, if we had something like this:

    KStream<String, String>[] branches = originalStream.branch(predicate1, predicate2);
    branches[0].filter(....).mapValues(...)..
    branches[1].selectKey(...).groupByKey().....

Thanks!
Bill

On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote:

All,

I'd like to jump-start the discussion for KIP-418.

Here's the original message:

Hello,

I'd like to start a discussion about KIP-418.
Please take a look at the KIP if you can, I would appreciate any feedback :)

KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
PR#6164: https://github.com/apache/kafka/pull/6164

Regards,

Ivan Ponomarev
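
For reference, the chained shape Paul proposes in this thread (branch(...) calls closed by a mandatory terminal method) can be sketched without Kafka. This is only an illustration under loud assumptions: BranchSketch, of, and route are hypothetical names, a plain List stands in for a KStream, and the first-match routing is a choice made for this sketch. It is not the API that KIP-418 or Kafka Streams defines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for the chained API discussed in the thread.
// It routes the elements of a plain List and is not part of Kafka Streams.
final class BranchSketch<T> {
    private final List<T> records;
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();

    private BranchSketch(List<T> records) {
        this.records = records;
    }

    static <T> BranchSketch<T> of(List<T> records) {
        return new BranchSketch<>(records);
    }

    // Registers a branch; nothing is routed until a terminal method runs.
    BranchSketch<T> branch(Predicate<T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Terminal method: records matching no predicate go to the given handler.
    void defaultBranch(Consumer<List<T>> handler) {
        route(handler);
    }

    // Terminal method: records matching no predicate are silently dropped.
    void noDefaultBranch() {
        route(unmatched -> { });
    }

    // Sends each record to the first branch whose predicate accepts it.
    private void route(Consumer<List<T>> defaultHandler) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<T> unmatched = new ArrayList<>();
        for (T record : records) {
            boolean matched = false;
            for (int i = 0; i < predicates.size() && !matched; i++) {
                if (predicates.get(i).test(record)) {
                    buckets.get(i).add(record);
                    matched = true;
                }
            }
            if (!matched) {
                unmatched.add(record);
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        defaultHandler.accept(unmatched);
    }
}
```

In this sketch, forgetting to call defaultBranch(...) or noDefaultBranch() compiles fine but routes nothing, which is the pitfall Ivan raises about terminal methods that are easy to miss.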