I am moving this KIP into "inactive status". Feel free to resume the KIP at any point.
If anybody else is interested in picking up this KIP, feel free to do so.

-Matthias

On 7/11/19 4:00 PM, Matthias J. Sax wrote:
> Ivan,
>
> did you see my last reply? What do you think about my proposal to mix
> both approaches and try to get best-of-both worlds?
>
>
> -Matthias
>
> On 6/11/19 3:56 PM, Matthias J. Sax wrote:
>> Thanks for the input John!
>>
>>> under your suggestion, it seems that the name is required
>>
>> If you want to get the `KStream` as part of the `Map` back using a
>> `Function`, yes. If you follow the "embedded chaining" pattern using a
>> `Consumer`, no.
>>
>> Allowing for a default name via `split()` can of course be done.
>> Similarly, using `Named` instead of `String` is possible.
>>
>> I wanted to sketch out a high level proposal to merge both patterns
>> only. Your suggestions to align the new API with the existing API make
>> total sense.
>>
>>
>> One follow up question: Would `Named` be optional or required in
>> `split()` and `branch()`? It's unclear from your example.
>>
>> If both are mandatory, what do we gain by it? The returned `Map` only
>> contains the corresponding branches, so why should we prefix all of
>> them? If `Named` is mandatory only in `branch()`, but optional in
>> `split()`, the same question arises.
>>
>> Requiring `Named` in `split()` seems to make sense only if `Named` is
>> optional in `branch()` and we generate a `-X` suffix using a counter for
>> the different branch names. However, this might lead to the problem of
>> changing names if branches are added/removed. Also, how would the names
>> be generated if `Consumer` is mixed in (ie, not all branches are
>> returned in the `Map`)?
>>
>> If `Named` is optional for both, it could happen that a user forgets to
>> specify a name for a branch, which would lead to runtime issues.
>>
>>
>> Hence, I am actually in favor of not allowing a default name, keeping
>> `split()` without parameter, and making `Named` in `branch()` required if a
>> `Function` is used.
>> This makes it explicit to the user that specifying a
>> name is required if a `Function` is used.
>>
>>
>> About
>>
>>> KBranchedStream#branch(BranchConfig)
>>
>> I don't think that the branching predicate is a configuration and hence
>> would not include it in a configuration object.
>>
>>> withChain(...);
>>
>> Similarly, `withChain()` (that would only take a `Consumer`?) does not
>> seem to be a configuration. We also cannot prevent a user from calling
>> `withName()` in combination with `withChain()`, which does not make sense
>> IMHO. We could of course throw an RTE, but not having a compile time check
>> seems less appealing. Also, it could happen that neither `withChain()`
>> nor `withName()` is called and the branch is missing in the returned
>> `Map`, which leads to runtime issues, too.
>>
>> Hence, I don't think that we should add `BranchConfig`. A config object
>> is helpful if each configuration can be set independently of all others,
>> but this seems not to be the case here. If we add new configuration
>> later, we can also just move forward by deprecating the methods that
>> accept `Named` and adding new methods that accept `BranchConfig` (that
>> would of course implement `Named`).
>>
>>
>> Thoughts?
>>
>>
>> @Ivan, what do you think about the general idea to blend the two main
>> approaches of returning a `Map` plus an "embedded chaining"?
>>
>>
>> -Matthias
>>
>>
>> On 6/4/19 10:33 AM, John Roesler wrote:
>>> Thanks for the idea, Matthias, it does seem like this would satisfy
>>> everyone. Returning the map from the terminal operations also solves
>>> the problem of merging/joining the branched streams, if we want to add
>>> support for the complement later on.
>>>
>>> Under your suggestion, it seems that the name is required. Otherwise,
>>> we wouldn't have keys for the map to return.
>>> I think this is actually
>>> not too bad, since experience has taught us that, although names for
>>> operations are not required to define stream processing logic, it does
>>> significantly improve the operational experience when you can map the
>>> topology, logs, metrics, etc. back to the source code. Since you
>>> wouldn't (have to) reference the name to chain extra processing onto
>>> the branch (thanks to the second argument), you can avoid the
>>> "unchecked name" problem that Ivan pointed out.
>>>
>>> In the current implementation of Branch, you can name the branch
>>> operator itself, and then all the branches get index-suffixed names
>>> built from the branch operator name. I guess under this proposal, we
>>> could naturally append the branch name to the branching operator name,
>>> like this:
>>>
>>> stream.split(Named.withName("mysplit")) // creates node "mysplit"
>>>       .branch(..., ..., "abranch")      // creates node "mysplit-abranch"
>>>       .defaultBranch(...)               // creates node "mysplit-default"
>>>
>>> It does make me wonder about the DSL syntax itself, though.
>>>
>>> We don't have a defined grammar, so there's plenty of room to debate
>>> the "best" syntax in the context of each operation, but in general,
>>> the KStream DSL operators follow this pattern:
>>>
>>> operator(function, config_object?) OR operator(config_object)
>>>
>>> where config_object is often just Named in the "function" variant.
>>> Even when the config_object isn't a Named, but some other config
>>> class, that config class _always_ implements NamedOperation.
>>>
>>> Here, we're introducing a totally different pattern:
>>>
>>> operator(function, function, string)
>>>
>>> where the string is the name.
>>> My first question is whether the name should instead be specified with
>>> the NamedOperation interface.
>>>
>>> My second question is whether we should just roll all these arguments
>>> up into a config object like:
>>>
>>> KBranchedStream#branch(BranchConfig)
>>>
>>> interface BranchConfig extends NamedOperation {
>>>     withPredicate(...);
>>>     withChain(...);
>>>     withName(...);
>>> }
>>>
>>> Although I guess we'd like to call BranchConfig something more like
>>> "Branched", even if I don't particularly like that pattern.
>>>
>>> This makes the source code a little noisier, but it also makes us more
>>> future-proof, as we can deal with a wide range of alternatives purely
>>> in the config interface, and never have to deal with adding overloads
>>> to the KBranchedStream if/when we decide we want the name to be
>>> optional, or the KStream->KStream to be optional.
>>>
>>> WDYT?
>>>
>>> Thanks,
>>> -John
>>>
>>> On Fri, May 24, 2019 at 5:25 PM Michael Drogalis
>>> <michael.droga...@confluent.io> wrote:
>>>>
>>>> Matthias: I think that's pretty reasonable from my point of view. Good
>>>> suggestion.
>>>>
>>>> On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> Interesting discussion.
>>>>>
>>>>> I am wondering, if we cannot unify the advantage of both approaches:
>>>>>
>>>>>
>>>>> KStream#split() -> KBranchedStream
>>>>>
>>>>> // branch is not easily accessible in current scope
>>>>> KBranchedStream#branch(Predicate, Consumer<KStream>)
>>>>>   -> KBranchedStream
>>>>>
>>>>> // assign a name to the branch and
>>>>> // return the sub-stream to the current scope later
>>>>> //
>>>>> // can be simple as `#branch(p, s->s, "name")`
>>>>> // or also complex as `#branch(p, s->s.filter(...), "name")`
>>>>> KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
>>>>>   -> KBranchedStream
>>>>>
>>>>> // default branch is not easily accessible
>>>>> // return map of all named sub-stream into current scope
>>>>> KBranchedStream#default(Consumer<KStream>)
>>>>>   -> Map<String,KStream>
>>>>>
>>>>> // assign custom name to default-branch
>>>>> // return map of all named sub-stream into current scope
>>>>> KBranchedStream#default(Function<KStream,KStream>, String)
>>>>>   -> Map<String,KStream>
>>>>>
>>>>> // assign a default name for default
>>>>> // return map of all named sub-stream into current scope
>>>>> KBranchedStream#defaultBranch(Function<KStream,KStream>)
>>>>>   -> Map<String,KStream>
>>>>>
>>>>> // return map of all named sub-stream into current scope
>>>>> KBranchedStream#noDefaultBranch()
>>>>>   -> Map<String,KStream>
>>>>>
>>>>>
>>>>> Hence, for each sub-stream, the user can pick to add a name and return
>>>>> the branch "result" to the calling scope or not. The implementation can
>>>>> also check at runtime that all returned names are unique. The returned
>>>>> Map can be empty and it's also optional to use the Map.
>>>>>
>>>>> To me, it seems like a good way to get best of both worlds.
>>>>>
>>>>> Thoughts?
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 5/6/19 5:15 PM, John Roesler wrote:
>>>>>> Ivan,
>>>>>>
>>>>>> That's a very good point about the "start" operator in the dynamic case.
>>>>>> I had no problem with "split()"; I was just questioning the necessity. >>>>>> Since you've provided a proof of necessity, I'm in favor of the >>>>>> "split()" start operator. Thanks! >>>>>> >>>>>> Separately, I'm interested to see where the present discussion leads. >>>>>> I've written enough Javascript code in my life to be suspicious of >>>>>> nested closures. You have a good point about using method references (or >>>>>> indeed function literals also work). It should be validating that this >>>>>> was also the JS community's first approach to flattening the logic when >>>>>> their nested closure situation got out of hand. Unfortunately, it's >>>>>> replacing nesting with redirection, both of which disrupt code >>>>>> readability (but in different ways for different reasons). In other >>>>>> words, I agree that function references is *the* first-order solution if >>>>>> the nested code does indeed become a problem. >>>>>> >>>>>> However, the history of JS also tells us that function references aren't >>>>>> the end of the story either, and you can see that by observing that >>>>>> there have been two follow-on eras, as they continue trying to cope with >>>>>> the consequences of living in such a callback-heavy language. First, you >>>>>> have Futures/Promises, which essentially let you convert nested code to >>>>>> method-chained code (Observables/FP is a popular variation on this). >>>>>> Most lately, you have async/await, which is an effort to apply language >>>>>> (not just API) syntax to the problem, and offer the "flattest" possible >>>>>> programming style to solve the problem (because you get back to just one >>>>>> code block per functional unit). >>>>>> >>>>>> Stream-processing is a different domain, and Java+KStreams is nowhere >>>>>> near as callback heavy as JS, so I don't think we have to take the JS >>>>>> story for granted, but then again, I think we can derive some valuable >>>>>> lessons by looking sideways to adjacent domains. 
I'm just bringing this >>>>>> up to inspire further/deeper discussion. At the same time, just like JS, >>>>>> we can afford to take an iterative approach to the problem. >>>>>> >>>>>> Separately again, I'm interested in the post-branch merge (and I'd also >>>>>> add join) problem that Paul brought up. We can clearly punt on it, by >>>>>> terminating the nested branches with sink operators. But is there a DSL >>>>>> way to do it? >>>>>> >>>>>> Thanks again for your driving this, >>>>>> -John >>>>>> >>>>>> On Thu, May 2, 2019 at 7:39 PM Paul Whalen <pgwha...@gmail.com >>>>>> <mailto:pgwha...@gmail.com>> wrote: >>>>>> >>>>>> Ivan, I’ll definitely forfeit my point on the clumsiness of the >>>>>> branch(predicate, consumer) solution, I don’t see any real drawbacks >>>>>> for the dynamic case. >>>>>> >>>>>> IMO the one trade off to consider at this point is the scope >>>>>> question. I don’t know if I totally agree that “we rarely need them >>>>>> in the same scope” since merging the branches back together later >>>>>> seems like a perfectly plausible use case that can be a lot nicer >>>>>> when the branched streams are in the same scope. That being said, >>>>>> for the reasons Ivan listed, I think it is overall the better >>>>>> solution - working around the scope thing is easy enough if you need >>>>>> to. >>>>>> >>>>>> > On May 2, 2019, at 7:00 PM, Ivan Ponomarev >>>>>> <iponoma...@mail.ru.invalid> wrote: >>>>>> > >>>>>> > Hello everyone, thank you all for joining the discussion! >>>>>> > >>>>>> > Well, I don't think the idea of named branches, be it a >>>>>> LinkedHashMap (no other Map will do, because order of definition >>>>>> matters) or `branch` method taking name and Consumer has more >>>>>> advantages than drawbacks. >>>>>> > >>>>>> > In my opinion, the only real positive outcome from Michael's >>>>>> proposal is that all the returned branches are in the same scope. 
>>>>>> > But 1) we rarely need them in the same scope 2) there is a
>>>>>> > workaround for the scope problem, described in the KIP.
>>>>>> >
>>>>>> > 'Inlining the complex logic' is not a problem, because we can use
>>>>>> > method references instead of lambdas. In real world scenarios you
>>>>>> > tend to split the complex logic to methods anyway, so the code is
>>>>>> > going to be clean.
>>>>>> >
>>>>>> > The drawbacks are strong. The cohesion between predicates and
>>>>>> > handlers is lost. We have to define predicates in one place, and
>>>>>> > handlers in another. This opens the door for bugs:
>>>>>> >
>>>>>> > - what if we forget to define a handler for a name? or a name for
>>>>>> >   a handler?
>>>>>> > - what if we misspell a name?
>>>>>> > - what if we copy-paste and duplicate a name?
>>>>>> >
>>>>>> > What Michael proposes would have been totally OK if we had been
>>>>>> > writing the API in Lua, Ruby or Python. In those languages the
>>>>>> > "dynamic naming" approach would have looked most concise and
>>>>>> > beautiful. But in Java we expect all the problems related to
>>>>>> > identifiers to be eliminated at compile time.
>>>>>> >
>>>>>> > Do we have to invent duck-typing for the Java API?
>>>>>> >
>>>>>> > And if we do, what advantage are we supposed to get besides having
>>>>>> > all the branches in the same scope? Michael, maybe I'm missing your
>>>>>> > point?
>>>>>> >
>>>>>> > ---
>>>>>> >
>>>>>> > Earlier in this discussion John Roesler also proposed to do
>>>>>> > without a "start branching" operator, and later Paul mentioned that in
>>>>>> > the case when we have to add a dynamic number of branches, the
>>>>>> > current KIP is 'clumsier' compared to Michael's 'Map' solution. Let
>>>>>> > me address both comments here.
>>>>>> >
>>>>>> > 1) The "start branching" operator (I think that *split* is a good name
>>>>>> > for it indeed) is critical when we need to do dynamic branching,
>>>>>> > see example below.
>>>>>> >
>>>>>> > 2) No, dynamic branching in current KIP is not clumsy at all.
>>>>>> > Imagine a real-world scenario when you need one branch per enum
>>>>>> > value (say, RecordType). You can have something like this:
>>>>>> >
>>>>>> > /* John: if we had to start with stream.branch(...) here, it would
>>>>>> >    have been much messier. */
>>>>>> > KBranchedStream branched = stream.split();
>>>>>> >
>>>>>> > /* Not clumsy at all :-) */
>>>>>> > for (RecordType recordType : RecordType.values())
>>>>>> >     branched = branched.branch((k, v) -> v.getRecType() == recordType,
>>>>>> >                                recordType::processRecords);
>>>>>> >
>>>>>> > Regards,
>>>>>> >
>>>>>> > Ivan
>>>>>> >
>>>>>> >
>>>>>> > 02.05.2019 14:40, Matthias J. Sax wrote:
>>>>>> >> I also agree with Michael's observation about the core problem of
>>>>>> >> current `branch()` implementation.
>>>>>> >>
>>>>>> >> However, I also don't like to pass in a clumsy Map object. My thinking
>>>>>> >> was more aligned with Paul's proposal to just add a name to each
>>>>>> >> `branch()` statement and return a `Map<String,KStream>`.
>>>>>> >>
>>>>>> >> It makes the code easier to read, and also makes the order of
>>>>>> >> `Predicates` (that is essential) easier to grasp.
>>>>>> >>
>>>>>> >>>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>> >>>>>>     .branch("branchOne", Predicate<K, V>)
>>>>>> >>>>>>     .branch("branchTwo", Predicate<K, V>)
>>>>>> >>>>>>     .defaultBranch("defaultBranch");
>>>>>> >> An open question is the case for which no defaultBranch() should be
>>>>>> >> specified. Atm, `split()` and `branch()` would return `BranchedKStream`
>>>>>> >> and the call to `defaultBranch()` that returns the `Map` is mandatory
>>>>>> >> (which is not the case atm). Or is this actually not a real problem,
>>>>>> >> because users can just ignore the branch returned by `defaultBranch()`
>>>>>> >> in the result `Map`?
>>>>>> >>
>>>>>> >>
>>>>>> >> About "inlining": So far, it seems to be a matter of personal
>>>>>> >> preference.
I can see arguments for both, but no "killer >>>>>> argument" yet >>>>>> >> that clearly make the case for one or the other. >>>>>> >> >>>>>> >> >>>>>> >> -Matthias >>>>>> >> >>>>>> >>> On 5/1/19 6:26 PM, Paul Whalen wrote: >>>>>> >>> Perhaps inlining is the wrong terminology. It doesn’t require >>>>>> that a lambda with the full downstream topology be defined inline - >>>>>> it can be a method reference as with Ivan’s original suggestion. >>>>>> The advantage of putting the predicate and its downstream logic >>>>>> (Consumer) together in branch() is that they are required to be near >>>>>> to each other. >>>>>> >>> >>>>>> >>> Ultimately the downstream code has to live somewhere, and deep >>>>>> branch trees will be hard to read regardless. >>>>>> >>> >>>>>> >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis >>>>>> <michael.droga...@confluent.io >>>>>> <mailto:michael.droga...@confluent.io>> wrote: >>>>>> >>>> >>>>>> >>>> I'm less enthusiastic about inlining the branch logic with its >>>>>> downstream >>>>>> >>>> functionality. Programs that have deep branch trees will >>>>>> quickly become >>>>>> >>>> harder to read as a single unit. >>>>>> >>>> >>>>>> >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen >>>>>> <pgwha...@gmail.com <mailto:pgwha...@gmail.com>> wrote: >>>>>> >>>>> >>>>>> >>>>> Also +1 on the issues/goals as Michael outlined them, I think >>>>>> that sets a >>>>>> >>>>> great framework for the discussion. 
>>>>>> >>>>> >>>>>> >>>>> Regarding the SortedMap solution, my understanding is that the >>>>>> current >>>>>> >>>>> proposal in the KIP is what is in my PR which (pending naming >>>>>> decisions) is >>>>>> >>>>> roughly this: >>>>>> >>>>> >>>>>> >>>>> stream.split() >>>>>> >>>>> .branch(Predicate<K, V>, Consumer<KStream<K, V>>) >>>>>> >>>>> .branch(Predicate<K, V>, Consumer<KStream<K, V>>) >>>>>> >>>>> .defaultBranch(Consumer<KStream<K, V>>); >>>>>> >>>>> >>>>>> >>>>> Obviously some ordering is necessary, since branching as a >>>>>> construct >>>>>> >>>>> doesn't work without it, but this solution seems like it >>>>>> provides as much >>>>>> >>>>> associativity as the SortedMap solution, because each branch() >>>>>> call >>>>>> >>>>> directly associates the "conditional" with the "code block." >>>>>> The value it >>>>>> >>>>> provides over the KIP solution is the accessing of streams in >>>>>> the same >>>>>> >>>>> scope. >>>>>> >>>>> >>>>>> >>>>> The KIP solution is less "dynamic" than the SortedMap solution >>>>>> in the sense >>>>>> >>>>> that it is slightly clumsier to add a dynamic number of >>>>>> branches, but it is >>>>>> >>>>> certainly possible. It seems to me like the API should favor >>>>>> the "static" >>>>>> >>>>> case anyway, and should make it simple and readable to >>>>>> fluently declare and >>>>>> >>>>> access your branches in-line. It also makes it impossible to >>>>>> ignore a >>>>>> >>>>> branch, and it is possible to build an (almost) identical >>>>>> SortedMap >>>>>> >>>>> solution on top of it. >>>>>> >>>>> >>>>>> >>>>> I could also see a middle ground where instead of a raw >>>>>> SortedMap being >>>>>> >>>>> taken in, branch() takes a name and not a Consumer. 
Something >>>>>> like this: >>>>>> >>>>> >>>>>> >>>>> Map<String, KStream<K, V>> branches = stream.split() >>>>>> >>>>> .branch("branchOne", Predicate<K, V>) >>>>>> >>>>> .branch( "branchTwo", Predicate<K, V>) >>>>>> >>>>> .defaultBranch("defaultBranch", Consumer<KStream<K, V>>); >>>>>> >>>>> >>>>>> >>>>> Pros for that solution: >>>>>> >>>>> - accessing branched KStreams in same scope >>>>>> >>>>> - no double brace initialization, hopefully slightly more >>>>>> readable than >>>>>> >>>>> SortedMap >>>>>> >>>>> >>>>>> >>>>> Cons >>>>>> >>>>> - downstream branch logic cannot be specified inline which >>>>>> makes it harder >>>>>> >>>>> to read top to bottom (like existing API and SortedMap, but >>>>>> unlike the KIP) >>>>>> >>>>> - you can forget to "handle" one of the branched streams (like >>>>>> existing >>>>>> >>>>> API and SortedMap, but unlike the KIP) >>>>>> >>>>> >>>>>> >>>>> (KBranchedStreams could even work *both* ways but perhaps >>>>>> that's overdoing >>>>>> >>>>> it). >>>>>> >>>>> >>>>>> >>>>> Overall I'm curious how important it is to be able to easily >>>>>> access the >>>>>> >>>>> branched KStream in the same scope as the original. It's >>>>>> possible that it >>>>>> >>>>> doesn't need to be handled directly by the API, but instead >>>>>> left up to the >>>>>> >>>>> user. I'm sort of in the middle on it. >>>>>> >>>>> >>>>>> >>>>> Paul >>>>>> >>>>> >>>>>> >>>>> >>>>>> >>>>> >>>>>> >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman >>>>>> <sop...@confluent.io <mailto:sop...@confluent.io>> >>>>>> >>>>> wrote: >>>>>> >>>>> >>>>>> >>>>>> I'd like to +1 what Michael said about the issues with the >>>>>> existing >>>>>> >>>>> branch >>>>>> >>>>>> method, I agree with what he's outlined and I think we should >>>>>> proceed by >>>>>> >>>>>> trying to alleviate these problems. 
>>>>>> >>>>>> Specifically it seems important to be
>>>>>> >>>>>> able to cleanly access the individual branches (eg by mapping
>>>>>> >>>>>> name->stream), which I thought was the original intention of this KIP.
>>>>>> >>>>>>
>>>>>> >>>>>> That said, I don't think we should so easily give in to the double brace
>>>>>> >>>>>> anti-pattern or force our users into it if at all possible to avoid...just
>>>>>> >>>>>> my two cents.
>>>>>> >>>>>>
>>>>>> >>>>>> Cheers,
>>>>>> >>>>>> Sophie
>>>>>> >>>>>>
>>>>>> >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
>>>>>> >>>>>> michael.droga...@confluent.io> wrote:
>>>>>> >>>>>>
>>>>>> >>>>>>> I’d like to propose a different way of thinking about this. To me, there
>>>>>> >>>>>>> are three problems with the existing branch signature:
>>>>>> >>>>>>>
>>>>>> >>>>>>> 1. If you use it the way most people do, Java raises unsafe type warnings.
>>>>>> >>>>>>> 2. The way in which you use the stream branches is positionally coupled to
>>>>>> >>>>>>>    the ordering of the conditionals.
>>>>>> >>>>>>> 3. It is brittle to extend existing branch calls with additional code
>>>>>> >>>>>>>    paths.
>>>>>> >>>>>>>
>>>>>> >>>>>>> Using associative constructs instead of relying on ordered constructs would
>>>>>> >>>>>>> be a stronger approach. Consider a signature that instead looks like this:
>>>>>> >>>>>>>
>>>>>> >>>>>>> Map<String, KStream<K,V>> KStream#branch(SortedMap<String, Predicate<?
>>>>>> >>>>>>> super K,? super V>>);
>>>>>> >>>>>>>
>>>>>> >>>>>>> Branches are given names in a map, and as a result, the API returns a
>>>>>> >>>>>>> mapping of names to streams. The ordering of the conditionals is maintained
>>>>>> >>>>>>> because it’s a sorted map. Insert order determines the order of evaluation.
>>>>>> >>>>>>> This solves problem 1 because there are no more varargs. It >>>>>> solves >>>>>> >>>>>> problem >>>>>> >>>>>>> 2 because you no longer lean on ordering to access the >>>>>> branch you’re >>>>>> >>>>>>> interested in. It solves problem 3 because you can introduce >>>>>> another >>>>>> >>>>>>> conditional by simply attaching another name to the >>>>>> structure, rather >>>>>> >>>>>> than >>>>>> >>>>>>> messing with the existing indices. >>>>>> >>>>>>> >>>>>> >>>>>>> One of the drawbacks is that creating the map inline is >>>>>> historically >>>>>> >>>>>>> awkward in Java. I know it’s an anti-pattern to use >>>>>> voluminously, but >>>>>> >>>>>>> double brace initialization would clean up the aesthetics. >>>>>> >>>>>>> >>>>>> >>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler >>>>>> <j...@confluent.io <mailto:j...@confluent.io>> >>>>>> >>>>> wrote: >>>>>> >>>>>>>> Hi Ivan, >>>>>> >>>>>>>> >>>>>> >>>>>>>> Thanks for the update. >>>>>> >>>>>>>> >>>>>> >>>>>>>> FWIW, I agree with Matthias that the current "start >>>>> branching" >>>>>> >>>>> operator >>>>>> >>>>>>> is >>>>>> >>>>>>>> confusing when named the same way as the actual branches. >>>>>> "Split" >>>>>> >>>>> seems >>>>>> >>>>>>>> like a good name. Alternatively, we can do without a "start >>>>>> >>>>> branching" >>>>>> >>>>>>>> operator at all, and just do: >>>>>> >>>>>>>> >>>>>> >>>>>>>> stream >>>>>> >>>>>>>> .branch(Predicate) >>>>>> >>>>>>>> .branch(Predicate) >>>>>> >>>>>>>> .defaultBranch(); >>>>>> >>>>>>>> >>>>>> >>>>>>>> Tentatively, I think that this branching operation should be >>>>>> >>>>> terminal. >>>>>> >>>>>>> That >>>>>> >>>>>>>> way, we don't create ambiguity about how to use it. That >>>>>> is, `branch` >>>>>> >>>>>>>> should return `KBranchedStream`, while `defaultBranch` is >>>>>> `void`, to >>>>>> >>>>>>>> enforce that it comes last, and that there is only one >>>>>> definition of >>>>>> >>>>>> the >>>>>> >>>>>>>> default branch. 
>>>>>> >>>>>>>> Potentially, we should log a warning if there's no default,
>>>>>> >>>>>>>> and additionally log a warning (or throw an exception) if a record falls
>>>>>> >>>>>>>> through with no default.
>>>>>> >>>>>>>>
>>>>>> >>>>>>>> Thoughts?
>>>>>> >>>>>>>>
>>>>>> >>>>>>>> Thanks,
>>>>>> >>>>>>>> -John
>>>>>> >>>>>>>>
>>>>>> >>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matth...@confluent.io>
>>>>>> >>>>>>>> wrote:
>>>>>> >>>>>>>>
>>>>>> >>>>>>>>> Thanks for updating the KIP and your answers.
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>>> this is to make the name similar to String#split
>>>>>> >>>>>>>>>> that also returns an array, right?
>>>>>> >>>>>>>>> The intent was to avoid name duplication. The return type should _not_
>>>>>> >>>>>>>>> be an array.
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> The current proposal is
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> stream.branch()
>>>>>> >>>>>>>>>     .branch(Predicate)
>>>>>> >>>>>>>>>     .branch(Predicate)
>>>>>> >>>>>>>>>     .defaultBranch();
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> IMHO, this reads a little odd, because the first `branch()` does not
>>>>>> >>>>>>>>> take any parameters and has different semantics than the later
>>>>>> >>>>>>>>> `branch()` calls. Note that from the code snippet above, it's hidden
>>>>>> >>>>>>>>> that the first call is `KStream#branch()` while the others are
>>>>>> >>>>>>>>> `KBranchedStream#branch()`, which makes reading the code harder.
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> Because I suggested to rename `addBranch()` -> `branch()`, I thought it
>>>>>> >>>>>>>>> might be better to also rename `KStream#branch()` to avoid the naming
>>>>>> >>>>>>>>> overlap that seems to be confusing.
>>>>>> >>>>>>>>> The following reads much cleaner to me:
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> stream.split()
>>>>>> >>>>>>>>>     .branch(Predicate)
>>>>>> >>>>>>>>>     .branch(Predicate)
>>>>>> >>>>>>>>>     .defaultBranch();
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> Maybe there is a better alternative to `split()` though to avoid the
>>>>>> >>>>>>>>> naming overlap.
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>>> 'default' is, however, a reserved word, so unfortunately we cannot have
>>>>>> >>>>>>>>>> a method with such name :-)
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> Bummer. Didn't consider this. Maybe we can still come up with a short
>>>>>> >>>>>>>>> name?
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> Can you add the interface `KBranchedStream` to the KIP with all its
>>>>>> >>>>>>>>> methods? It will be part of public API and should be contained in the
>>>>>> >>>>>>>>> KIP. For example, it's unclear atm, what the return type of
>>>>>> >>>>>>>>> `defaultBranch()` is.
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> You did not comment on the idea to add a `KBranchedStream#get(int index)
>>>>>> >>>>>>>>> -> KStream` method to get the individually branched-KStreams. Would be
>>>>>> >>>>>>>>> nice to get your feedback about it. It seems you suggest that users
>>>>>> >>>>>>>>> would need to write custom utility code otherwise, to access them. We
>>>>>> >>>>>>>>> should discuss the pros and cons of both approaches. It feels
>>>>>> >>>>>>>>> "incomplete" to me atm, if the API has no built-in support to get the
>>>>>> >>>>>>>>> branched-KStreams directly.
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> -Matthias
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
>>>>>> >>>>>>>>>> Hi all!
>>>>>> >>>>>>>>>> >>>>>> >>>>>>>>>> I have updated the KIP-418 according to the new vision. >>>>>> >>>>>>>>>> >>>>>> >>>>>>>>>> Matthias, thanks for your comment! >>>>>> >>>>>>>>>> >>>>>> >>>>>>>>>>> Renaming KStream#branch() -> #split() >>>>>> >>>>>>>>>> I can see your point: this is to make the name similar to >>>>>> >>>>>>> String#split >>>>>> >>>>>>>>>> that also returns an array, right? But is it worth the >>>>>> loss of >>>>>> >>>>>>>> backwards >>>>>> >>>>>>>>>> compatibility? We can have overloaded branch() as well >>>>>> without >>>>>> >>>>>>>> affecting >>>>>> >>>>>>>>>> the existing code. Maybe the old array-based `branch` >>>>> method >>>>>> >>>>> should >>>>>> >>>>>>> be >>>>>> >>>>>>>>>> deprecated, but this is a subject for discussion. >>>>>> >>>>>>>>>> >>>>>> >>>>>>>>>>> Renaming KBranchedStream#addBranch() -> >>>>>> >>>>> BranchingKStream#branch(), >>>>>> >>>>>>>>>> KBranchedStream#defaultBranch() -> >>>>> BranchingKStream#default() >>>>>> >>>>>>>>>> >>>>>> >>>>>>>>>> Totally agree with 'addBranch->branch' rename. 'default' >>>>> is, >>>>>> >>>>>>> however, a >>>>>> >>>>>>>>>> reserved word, so unfortunately we cannot have a method >>>>>> with such >>>>>> >>>>>>> name >>>>>> >>>>>>>>> :-) >>>>>> >>>>>>>>>>> defaultBranch() does take an `Predicate` as argument, >>>>> but I >>>>>> >>>>> think >>>>>> >>>>>>> that >>>>>> >>>>>>>>>> is not required? >>>>>> >>>>>>>>>> >>>>>> >>>>>>>>>> Absolutely! I think that was just copy-paste error or >>>>>> something. >>>>>> >>>>>>>>>> >>>>>> >>>>>>>>>> Dear colleagues, >>>>>> >>>>>>>>>> >>>>>> >>>>>>>>>> please revise the new version of the KIP and Paul's PR >>>>>> >>>>>>>>>> (https://github.com/apache/kafka/pull/6512) >>>>>> >>>>>>>>>> >>>>>> >>>>>>>>>> Any new suggestions/objections? >>>>>> >>>>>>>>>> >>>>>> >>>>>>>>>> Regards, >>>>>> >>>>>>>>>> >>>>>> >>>>>>>>>> Ivan >>>>>> >>>>>>>>>> >>>>>> >>>>>>>>>> >>>>>> >>>>>>>>>> 11.04.2019 11:47, Matthias J. 
Sax wrote:
>>>>>> >>>>>>>>>>> Thanks for driving the discussion of this KIP. It seems that everybody
>>>>>> >>>>>>>>>>> agrees that the current branch() method using arrays is not optimal.
>>>>>> >>>>>>>>>>> I had a quick look into the PR and I like the overall proposal. There
>>>>>> >>>>>>>>>>> are some minor things we need to consider. I would recommend the
>>>>>> >>>>>>>>>>> following renaming:
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>> KStream#branch() -> #split()
>>>>>> >>>>>>>>>>> KBranchedStream#addBranch() -> BranchingKStream#branch()
>>>>>> >>>>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>> It's just a suggestion to get slightly shorter method names.
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>> In the current PR, defaultBranch() does take a `Predicate` as argument,
>>>>>> >>>>>>>>>>> but I think that is not required?
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>> Also, we should consider KIP-307, that was recently accepted and is
>>>>>> >>>>>>>>>>> currently implemented:
>>>>>> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>>> >>>>>>>>>>> Ie, we should add overloads that accept a `Named` parameter.
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>> For the issue that the created `KStream` objects are in different scopes:
>>>>>> >>>>>>>>>>> could we extend `KBranchedStream` with a `get(int index)` method that
>>>>>> >>>>>>>>>>> returns the corresponding "branched" result `KStream` object?
>>>>>> >>>>>> Maybe, >>>>>> >>>>>>>> the >>>>>> >>>>>>>>>>> second argument of `addBranch()` should not be a >>>>>> >>>>>> `Consumer<KStream>` >>>>>> >>>>>>>> but >>>>>> >>>>>>>>>>> a `Function<KStream,KStream>` and `get()` could return >>>>>> whatever >>>>>> >>>>>> the >>>>>> >>>>>>>>>>> `Function` returns? >>>>>> >>>>>>>>>>> >>>>>> >>>>>>>>>>> >>>>>> >>>>>>>>>>> Finally, I would also suggest to update the KIP with the >>>>>> current >>>>>> >>>>>>>>>>> proposal. That makes it easier to review. >>>>>> >>>>>>>>>>> >>>>>> >>>>>>>>>>> >>>>>> >>>>>>>>>>> -Matthias >>>>>> >>>>>>>>>>> >>>>>> >>>>>>>>>>> >>>>>> >>>>>>>>>>> >>>>>> >>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote: >>>>>> >>>>>>>>>>>> Ivan, >>>>>> >>>>>>>>>>>> >>>>>> >>>>>>>>>>>> I'm a bit of a novice here as well, but I think it >>>>>> makes sense >>>>>> >>>>>> for >>>>>> >>>>>>>> you >>>>>> >>>>>>>>> to >>>>>> >>>>>>>>>>>> revise the KIP and continue the discussion. Obviously >>>>>> we'll >>>>>> >>>>> need >>>>>> >>>>>>>> some >>>>>> >>>>>>>>>>>> buy-in from committers that have actual binding votes on >>>>>> >>>>> whether >>>>>> >>>>>>> the >>>>>> >>>>>>>>> KIP >>>>>> >>>>>>>>>>>> could be adopted. It would be great to hear if they >>>>>> think this >>>>>> >>>>>> is >>>>>> >>>>>>> a >>>>>> >>>>>>>>> good >>>>>> >>>>>>>>>>>> idea overall. I'm not sure if that happens just by >>>>>> starting a >>>>>> >>>>>>> vote, >>>>>> >>>>>>>>> or if >>>>>> >>>>>>>>>>>> there is generally some indication of interest >>>>> beforehand. >>>>>> >>>>>>>>>>>> >>>>>> >>>>>>>>>>>> That being said, I'll continue the discussion a bit: >>>>>> assuming >>>>>> >>>>> we >>>>>> >>>>>> do >>>>>> >>>>>>>>> move >>>>>> >>>>>>>>>>>> forward the solution of "stream.branch() returns >>>>>> >>>>>> KBranchedStream", >>>>>> >>>>>>> do >>>>>> >>>>>>>>> we >>>>>> >>>>>>>>>>>> deprecate "stream.branch(...) returns KStream[]"? 
I >>>>> would >>>>>> >>>>> favor >>>>>> >>>>>>>>>>>> deprecating, since having two mutually exclusive APIs >>>>> that >>>>>> >>>>>>> accomplish >>>>>> >>>>>>>>> the >>>>>> >>>>>>>>>>>> same thing is confusing, especially when they're fairly >>>>>> similar >>>>>> >>>>>>>>> anyway. We >>>>>> >>>>>>>>>>>> just need to be sure we're not making something >>>>>> >>>>>>> impossible/difficult >>>>>> >>>>>>>>> that >>>>>> >>>>>>>>>>>> is currently possible/easy. >>>>>> >>>>>>>>>>>> >>>>>> >>>>>>>>>>>> Regarding my PR - I think the general structure would >>>>> work, >>>>>> >>>>> it's >>>>>> >>>>>>>> just a >>>>>> >>>>>>>>>>>> little sloppy overall in terms of naming and clarity. In >>>>>> >>>>>>> particular, >>>>>> >>>>>>>>>>>> passing in the "predicates" and "children" lists which >>>>> get >>>>>> >>>>>> modified >>>>>> >>>>>>>> in >>>>>> >>>>>>>>>>>> KBranchedStream but read from all the way >>>>>> KStreamLazyBranch is >>>>>> >>>>> a >>>>>> >>>>>>> bit >>>>>> >>>>>>>>>>>> complicated to follow. >>>>>> >>>>>>>>>>>> >>>>>> >>>>>>>>>>>> Thanks, >>>>>> >>>>>>>>>>>> Paul >>>>>> >>>>>>>>>>>> >>>>>> >>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev < >>>>>> >>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru> >>>>>> >>>>>>>>> wrote: >>>>>> >>>>>>>>>>>>> Hi Paul! >>>>>> >>>>>>>>>>>>> >>>>>> >>>>>>>>>>>>> I read your code carefully and now I am fully >>>>>> convinced: your >>>>>> >>>>>>>> proposal >>>>>> >>>>>>>>>>>>> looks better and should work. We just have to document >>>>> the >>>>>> >>>>>> crucial >>>>>> >>>>>>>>> fact >>>>>> >>>>>>>>>>>>> that KStream consumers are invoked as they're added. >>>>>> And then >>>>>> >>>>>> it's >>>>>> >>>>>>>> all >>>>>> >>>>>>>>>>>>> going to be very nice. >>>>>> >>>>>>>>>>>>> >>>>>> >>>>>>>>>>>>> What shall we do now? I should re-write the KIP and >>>>>> resume the >>>>>> >>>>>>>>>>>>> discussion here, right? 
>>>>>> >>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>> Why are you saying that your PR 'should not be even a starting point if we go in this direction'? To me it looks like a good starting point. But as a novice in this project I might be missing some important details.
>>>>>> >>>>>>>>>>>>> Regards,
>>>>>> >>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>> Ivan
>>>>>> >>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen wrote:
>>>>>> >>>>>>>>>>>>>> Ivan,
>>>>>> >>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>> Maybe I’m missing the point, but I believe the stream.branch() solution supports this. The couponIssuer::set* consumers will be invoked as they’re added, not during streamsBuilder.build(). So the user still ought to be able to call couponIssuer.coupons() afterward and depend on the branched streams having been set.
>>>>>> >>>>>>>>>>>>>> The issue I mean to point out is that it is hard to access the branched streams in the same scope as the original stream (that is, not inside the couponIssuer), which is a problem with both proposed solutions. It can be worked around though.
>>>>>> >>>>>>>>>>>>>> [Also, great to hear additional interest in 401, I’m excited to hear your thoughts!]
>>>>>> >>>>>>>>>>>>>> Paul
>>>>>> >>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>> >>>>>>>>>>>>>>> Hi Paul!
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> The idea to postpone the wiring of branches until streamsBuilder.build() also looked great to me at first glance, but ---
>>>>>> >>>>>>>>>>>>>>>> the newly branched streams are not available in the same scope as each other. That is, if we wanted to merge them back together again I don't see a way to do that.
>>>>>> >>>>>>>>>>>>>>> You just took the words right out of my mouth, I was just going to write in detail about this issue.
>>>>>> >>>>>>>>>>>>>>> Consider the example from Bill's book, p. 101: say we need to identify customers who have bought coffee and made a purchase in the electronics store to give them coupons.
>>>>>> >>>>>>>>>>>>>>> This is the code I usually write under these circumstances using my 'brancher' class:
>>>>>> >>>>>>>>>>>>>>> @Setter
>>>>>> >>>>>>>>>>>>>>> class CouponIssuer{
>>>>>> >>>>>>>>>>>>>>>    private KStream<....> coffePurchases;
>>>>>> >>>>>>>>>>>>>>>    private KStream<....> electronicsPurchases;
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>    KStream<...> coupons(){
>>>>>> >>>>>>>>>>>>>>>        return coffePurchases.join(electronicsPurchases...)...whatever
>>>>>> >>>>>>>>>>>>>>>        /*In the real world the code here can be complex, so creation of a separate CouponIssuer class is fully justified, in order to separate classes' responsibilities.*/
>>>>>> >>>>>>>>>>>>>>>    }
>>>>>> >>>>>>>>>>>>>>> }
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> CouponIssuer couponIssuer = new CouponIssuer();
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
>>>>>> >>>>>>>>>>>>>>>     .branch(predicate1, couponIssuer::setCoffePurchases)
>>>>>> >>>>>>>>>>>>>>>     .branch(predicate2, couponIssuer::setElectronicsPurchases)
>>>>>> >>>>>>>>>>>>>>>     .onTopOf(transactionStream);
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> /*Alas, this won't work if we're going to wire up everything later, without the terminal operation!!!*/
>>>>>> >>>>>>>>>>>>>>> couponIssuer.coupons()...
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> Does this make sense? In order to properly initialize the CouponIssuer we need the terminal operation to be called before streamsBuilder.build() is called.
>>>>>> >>>>>>>>>>>>>>> [BTW Paul, I just found out that your KIP-401 is essentially the next KIP I was going to write here. I have some thoughts based on my experience, so I will join the discussion on KIP-401 soon.]
>>>>>> >>>>>>>>>>>>>>> Regards,
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> Ivan
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen wrote:
>>>>>> >>>>>>>>>>>>>>>> Ivan,
>>>>>> >>>>>>>>>>>>>>>> I tried to make a very rough proof of concept of a fluent API based off of KStream here (https://github.com/apache/kafka/pull/6512), and I think I succeeded at removing both cons.
>>>>>> >>>>>>>>>>>>>>>>    - Compatibility: I was incorrect earlier about compatibility issues, there aren't any direct ones. I was unaware that Java is smart enough to distinguish between a branch(varargs...) returning one thing and branch() with no arguments returning another thing.
>>>>>> >>>>>>>>>>>>>>>>    - Requiring a terminal method: We don't actually need it. We can just build up the branches in the KBranchedStream, which shares its state with the ProcessorSupplier that will actually do the branching. It's not terribly pretty in its current form, but I think it demonstrates its feasibility.
>>>>>> >>>>>>>>>>>>>>>> To be clear, I don't think that pull request should be final or even a starting point if we go in this direction, I just wanted to see how challenging it would be to get the API working.
>>>>>> >>>>>>>>>>>>>>>> I will say though, that I'm not sure the existing solution could be deprecated in favor of this, which I had originally suggested was a possibility. The reason is that the newly branched streams are not available in the same scope as each other. That is, if we wanted to merge them back together again I don't see a way to do that. The KIP proposal has the same issue, though - all this means is that for either solution, deprecating the existing branch(...) is not on the table.
>>>>>> >>>>>>>>>>>>>>>> Thanks,
>>>>>> >>>>>>>>>>>>>>>> Paul
>>>>>> >>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>> >>>>>>>>>>>>>>>>> OK, let me summarize what we have discussed up to this point.
>>>>>> >>>>>>>>>>>>>>>>> First, it seems that it's commonly agreed that the branch API needs improvement. Motivation is given in the KIP.
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> There are two potential ways to do it:
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> 1. (as originally proposed)
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
>>>>>> >>>>>>>>>>>>>>>>>     .branch(predicate1, ks ->..)
>>>>>> >>>>>>>>>>>>>>>>>     .branch(predicate2, ks->..)
>>>>>> >>>>>>>>>>>>>>>>>     .defaultBranch(ks->..) //optional
>>>>>> >>>>>>>>>>>>>>>>>     .onTopOf(stream).mapValues(...).... //onTopOf returns its argument
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2) The code won't make sense until all the necessary ingredients are provided.
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance contrasts with the fluency of other KStream methods.
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> 2. (as Paul proposes)
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> stream
>>>>>> >>>>>>>>>>>>>>>>>     .branch(predicate1, ks ->...)
>>>>>> >>>>>>>>>>>>>>>>>     .branch(predicate2, ks->...)
>>>>>> >>>>>>>>>>>>>>>>>     .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and noDefault() return void
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> PROS: Generally follows the way the KStreams interface is defined.
>>>>>> >>>>>>>>>>>>>>>>> CONS: We need to define two terminal methods (defaultBranch(ks->) and noDefault()). And for a user it is very easy to miss the fact that one of the terminal methods should be called. If these methods are not called, we can throw an exception at runtime.
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can we do better?
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> Regards,
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> Ivan
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
>>>>>> >>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
>>>>>> >>>>>>>>>>>>>>>>>>> Paul,
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>> I see your point when you are talking about
>>>>>> >>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>> Still, I believe that this cannot be implemented the easy way. Maybe we all should think further.
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>> user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs.
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be the only option besides `default`, because there are scenarios when we want to just silently drop the messages that didn't match any predicate. 2) Throwing an exception in the middle of data flow processing looks like a bad idea. In the stream processing paradigm, I would prefer to emit a special message to a dedicated stream. This is exactly where `default` can be used.
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>> it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue.
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>> You mean a runtime exception, when the program is compiled and run? Well, I'd prefer an API that simply won't compile if used incorrectly. Can we build such an API as a method chain starting from a KStream object? There is a huge cost difference between runtime and compile-time errors. Even if a failure is uncovered instantly by unit tests, it costs the project more than a compilation failure.
>>>>>> >>>>>>>>>>>>>>>>>>> Regards,
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>> Ivan
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen wrote:
>>>>>> >>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>> Good point about the terminal operation being required. But is that really such a bad thing? If the user doesn't want a defaultBranch they can call some other terminal method (noDefaultBranch()?) just as easily. In fact I think it creates an opportunity for a nicer API - a user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs. That seems like an improvement over the current branch() API, which allows for the more subtle behavior of records unexpectedly getting dropped.
>>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>> The need for a terminal operation certainly has to be well documented, but it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue. Especially now that there is a "build step" where the topology is actually wired up, when StreamsBuilder.build() is called.
>>>>>> >>>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its argument, I agree that it's critical to allow users to do other operations on the input stream. With the fluent solution, it ought to work the same way all other operations do - if you want to process off the original KStream multiple times, you just need the stream as a variable so you can call as many operations on it as you desire.
>>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>> Thoughts?
>>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>> Best,
>>>>>> >>>>>>>>>>>>>>>>>>>> Paul
>>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>> Hello Paul,
>>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>> I'm afraid this won't work because we do not always need the defaultBranch. And without a terminal operation we don't know when to finalize and build the 'branch switch'.
>>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its argument, so we can do something more with the original branch after branching.
>>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>> I understand your point that the need for a special object construction contrasts with the fluency of most KStream methods. But here we have a special case: we build the switch to split the flow, so I think this is still idiomatic.
>>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>> Ivan
>>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
>>>>>> >>>>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve this API, but I find the onTopOf() mechanism a little confusing since it contrasts with the fluency of other KStream method calls. Ideally I'd like to just call a method on the stream so it still reads top to bottom if the branch cases are defined fluently.
>>>>>> >>>>>>>>>>>>>>>>>>>>>> I think the addBranch(predicate, handleCase) is very nice and the right way to do things, but what if we flipped around how we specify the source stream?
>>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>> Like:
>>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>> stream.branch()
>>>>>> >>>>>>>>>>>>>>>>>>>>>>     .addBranch(predicate1, this::handle1)
>>>>>> >>>>>>>>>>>>>>>>>>>>>>     .addBranch(predicate2, this::handle2)
>>>>>> >>>>>>>>>>>>>>>>>>>>>>     .defaultBranch(this::handleDefault);
>>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>> Where branch() returns a KBranchedStreams or KStreamBrancher or something, which is added to by addBranch() and terminated by defaultBranch() (which returns void). This is obviously incompatible with the current API, so the new stream.branch() would have to have a different name, but that seems like a fairly small problem - we could call it something like branched() or branchedStreams() and deprecate the old API.
>>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of your KIP? It seems like it does to me, allowing for clear in-line branching while also allowing you to dynamically build branches off of KBranchedStreams if desired.
>>>>>> >>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>> >>>>>>>>>>>>>>>>>>>>>> Paul
>>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:
>>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>     ks.filter(....).mapValues(...)
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> }
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>     ks.selectKey(...).groupByKey()...
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> }
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> ......
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>     .addBranch(predicate1, this::handleFirstCase)
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>     .addBranch(predicate2, this::handleSecondCase)
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>     .onTopOf(....)
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Ivan
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> I have one question: the KafkaStreamsBrancher takes a Consumer as a second argument, which returns nothing, and the example in the KIP shows each stream from the branch using a terminal node (KStream#to() in this case).
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but how would we handle the case where the user has created a branch but wants to continue processing and not necessarily use a terminal node on the branched stream immediately?
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic as is, if we had something like this:
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> KStream<String, String>[] branches = originalStream.branch(predicate1, predicate2);
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Thanks!
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Bill
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote:
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> All,
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP-418.
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418. Please take a look at the KIP if you can, I would appreciate any feedback :)
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
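The scoping question debated in this thread (whether branch consumers such as couponIssuer::set* run as they are added, so that the holder object is fully populated right after the chain) can be illustrated without Kafka. What follows is only a minimal sketch under stated assumptions, not the KIP-418 implementation: `ToyStream`, `ToyBrancher` and `ToyCouponIssuer` are hypothetical stand-ins for KStream, the proposed brancher and the CouponIssuer example, and they work eagerly on an in-memory list rather than on a lazily built topology.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for KStream: wraps an in-memory list of records.
final class ToyStream<T> {
    final List<T> records;

    ToyStream(List<T> records) { this.records = records; }

    // Mirrors the fluent entry point discussed above: stream.branch() returns a brancher.
    ToyBrancher<T> branch() { return new ToyBrancher<>(this); }
}

// Hypothetical stand-in for the proposed KBranchedStream / BranchingKStream.
final class ToyBrancher<T> {
    private List<T> remaining;

    ToyBrancher(ToyStream<T> source) { this.remaining = new ArrayList<>(source.records); }

    // First match wins: matching records leave the pool, and the consumer is
    // invoked immediately, i.e. "as it is added", not at some later build step.
    ToyBrancher<T> branch(Predicate<T> predicate, Consumer<ToyStream<T>> consumer) {
        List<T> matched = new ArrayList<>();
        List<T> rest = new ArrayList<>();
        for (T record : remaining) {
            (predicate.test(record) ? matched : rest).add(record);
        }
        remaining = rest;
        consumer.accept(new ToyStream<>(matched));
        return this;
    }

    // Terminal operation: receives every record that no predicate claimed.
    void defaultBranch(Consumer<ToyStream<T>> consumer) {
        consumer.accept(new ToyStream<>(remaining));
    }
}

// Hypothetical holder in the spirit of the CouponIssuer example from the thread.
final class ToyCouponIssuer {
    ToyStream<String> coffee;
    ToyStream<String> electronics;
    ToyStream<String> other;
}

final class BranchScopeDemo {
    public static void main(String[] args) {
        ToyCouponIssuer issuer = new ToyCouponIssuer();

        new ToyStream<String>(Arrays.asList("coffee:latte", "electronics:tv", "books:novel"))
            .branch()
            .branch(r -> r.startsWith("coffee:"), s -> issuer.coffee = s)
            .branch(r -> r.startsWith("electronics:"), s -> issuer.electronics = s)
            .defaultBranch(s -> issuer.other = s);

        // Because each consumer ran while the chain was being built, the holder
        // can be used here, in the same scope as the original stream.
        System.out.println(issuer.coffee.records);
        System.out.println(issuer.electronics.records);
        System.out.println(issuer.other.records);
    }
}
```

If the consumers were instead deferred until a later build step, the three fields of the holder would still be null at the point where main reads them, which is the problem raised for the "wire everything up later" variant.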