Hi Ivan,

It looks like you missed my reply on Apr 23rd. I think it’s close, but I had a 
few last comments. 

Thanks,
John

On Sun, May 3, 2020, at 15:39, Ivan Ponomarev wrote:
> Hello everyone,
> 
> will someone please take a look at the reworked KIP?
> 
> I believe that now it follows design principles and takes into account 
> all the arguments discussed here.
> 
> 
> Regards,
> 
> Ivan
> 
> 
> 23.04.2020 2:45, Ivan Ponomarev wrote:
> > Hi,
> > 
> > I have read John's "DSL design principles" and have completely 
> > rewritten the KIP, see 
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> >  
> > 
> > 
> > 
> > This version includes all the previous discussion results and follows 
> > the design principles, with one exception.
> > 
> > The exception is
> > 
> > branch(Predicate<K,V> predicate, Branched<K,V> branched)
> > 
> > which formally violates 'no more than one parameter' rule, but I think 
> > here it is justified.
> > 
> > We must provide a predicate for a branch and don't need to provide one 
> > for the default branch. Thus for both operations we may use a single 
> > Branched parameter class, with an extra method parameter for `branch`.
> > 
> > Since predicate is a natural, necessary part of a branch, no 
> > 'proliferation of overloads, deprecations, etc.' is expected here as it 
> > is said in the rationale for the 'single parameter rule'.
> > 
> > WDYT, is this KIP mature enough to begin voting?
> > 
> > Regards,
> > 
> > Ivan
> > 
> > 21.04.2020 2:09, Matthias J. Sax wrote:
> >> Ivan,
> >>
> >> no worries about getting side tracked. Glad to have you back!
> >>
> >> The DSL improved further in the meantime and we already have a `Named`
> >> config object to name operators. It seems reasonable to me to build on 
> >> this.
> >>
> >> Furthermore, John did a writeup about "DSL design principles" that we
> >> want to follow:
> >> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
> >>  
> >>
> >> -- might be worth checking out.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 4/17/20 4:30 PM, Ivan Ponomarev wrote:
> >>> Hi everyone!
> >>>
> >>> Let me revive the discussion of this KIP.
> >>>
> >>> I'm very sorry for stopping my participation in the discussion in June
> >>> 2019. My project work was very intensive then and it didn't leave me
> >>> spare time. But I think I must finish this, because we invested
> >>> substantial effort into this discussion and I don't feel entitled to
> >>> propose other things before this one is finalized.
> >>>
> >>> During these months I proceeded with writing and reviewing Kafka
> >>> Streams-related code. Every time I needed branching, Spring-Kafka's
> >>> KafkaStreamBrancher class of my invention (the original idea for this
> >>> KIP) worked for me -- that's another reason why I gave up pushing the
> >>> KIP forward. When I was coming across the problem with the scope of
> >>> branches, I worked around it this way:
> >>>
> >>> AtomicReference<KStream<...>> result = new AtomicReference<>();
> >>> new KafkaStreamBrancher<....>()
> >>>      .branch(....)
> >>>      .defaultBranch(result::set)
> >>>      .onTopOf(someStream);
> >>> result.get()...
> >>>
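A minimal, Kafka-free sketch of the scope workaround quoted above, for readers who want to run it. `MiniBrancher` is a hypothetical stand-in that branches a plain `List` instead of a `KStream`; it is not the Spring-Kafka class, and only the `branch` / `defaultBranch` / `onTopOf` call shape is borrowed from the snippet. The point it illustrates is that the default branch is only reachable inside a callback, so it has to be smuggled out through an `AtomicReference`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class ScopeWorkaroundSketch {
    // Captures the default branch via AtomicReference, as in the quoted workaround.
    static List<Integer> defaultBranchOf(List<Integer> source) {
        AtomicReference<List<Integer>> result = new AtomicReference<>();
        new MiniBrancher<Integer>()
            .branch(n -> n < 0, negatives -> { /* handled in place */ })
            .defaultBranch(result::set)
            .onTopOf(source);
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(defaultBranchOf(List.of(-1, 2, -3, 4))); // prints [2, 4]
    }
}

// Hypothetical stand-in for a brancher; operates on List<T>, not KStream.
final class MiniBrancher<T> {
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();
    private Consumer<List<T>> defaultHandler = branch -> { };

    MiniBrancher<T> branch(Predicate<T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    MiniBrancher<T> defaultBranch(Consumer<List<T>> handler) {
        defaultHandler = handler;
        return this;
    }

    // Routes each element to the first matching branch, then invokes every handler.
    void onTopOf(List<T> source) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<T> rest = new ArrayList<>();
        for (T item : source) {
            boolean matched = false;
            for (int i = 0; i < predicates.size() && !matched; i++) {
                if (predicates.get(i).test(item)) {
                    buckets.get(i).add(item);
                    matched = true;
                }
            }
            if (!matched) {
                rest.add(item);
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        defaultHandler.accept(rest);
    }
}
```

The extra `AtomicReference` plus the trailing `result.get()` is the awkwardness being discussed; nothing in the types tells the caller that the reference is only populated after `onTopOf` has run.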
> >>>
> >>> And yes, of course I don't feel very happy with this approach.
> >>>
> >>> I think that Matthias came up with a bright solution in his post from
> >>> May, 24th 2019. Let me quote it:
> >>>
> >>> KStream#split() -> KBranchedStream
> >>> // branch is not easily accessible in current scope
> >>> KBranchedStream#branch(Predicate, Consumer<KStream>)
> >>>    -> KBranchedStream
> >>> // assign a name to the branch and
> >>> // return the sub-stream to the current scope later
> >>> //
> >>> // can be simple as `#branch(p, s->s, "name")`
> >>> // or also complex as `#branch(p, s->s.filter(...), "name")`
> >>> KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
> >>>    -> KBranchedStream
> >>> // default branch is not easily accessible
> >>> // return map of all named sub-streams into current scope
> >>> KBranchedStream#default(Consumer<KStream>)
> >>>    -> Map<String,KStream>
> >>> // assign custom name to default-branch
> >>> // return map of all named sub-streams into current scope
> >>> KBranchedStream#default(Function<KStream,KStream>, String)
> >>>    -> Map<String,KStream>
> >>> // assign a default name for default
> >>> // return map of all named sub-streams into current scope
> >>> KBranchedStream#defaultBranch(Function<KStream,KStream>)
> >>>    -> Map<String,KStream>
> >>> // return map of all named sub-streams into current scope
> >>> KBranchedStream#noDefaultBranch()
> >>>    -> Map<String,KStream>
> >>>
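To make the quoted proposal concrete, here is a rough sketch under stated assumptions: `MiniBranched` is a hypothetical class that splits a plain `List` rather than a `KStream`, and its method names only mirror the signatures quoted above. It shows the two flavours side by side: a `Consumer` branch is handled in place and does not appear in the result, while a named `Function` branch is returned to the calling scope in the `Map`. The duplicate-name check is one possible reading of "check at runtime that all returned names are unique", not a confirmed design.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

final class NamedBranchSketch {
    static Map<String, List<Integer>> demo() {
        List<Integer> seenNegatives = new ArrayList<>();
        return MiniBranched.split(List.of(-5, 0, 3, 8, 12))
            // Consumer variant: processed in place, not part of the returned map
            .branch(n -> n < 0, seenNegatives::addAll)
            // Function variant: named, so it is returned to the calling scope
            .branch(n -> n < 5, s -> s, "small")
            .defaultBranch(s -> s, "rest");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {small=[0, 3], rest=[8, 12]}
    }
}

// Hypothetical stand-in for the proposed KBranchedStream; works on List<T>.
final class MiniBranched<T> {
    private final List<T> remaining;
    // LinkedHashMap keeps the branches in definition order.
    private final Map<String, List<T>> named = new LinkedHashMap<>();

    private MiniBranched(List<T> source) {
        this.remaining = new ArrayList<>(source);
    }

    static <T> MiniBranched<T> split(List<T> source) {
        return new MiniBranched<>(source);
    }

    MiniBranched<T> branch(Predicate<T> predicate, Consumer<List<T>> chain) {
        chain.accept(take(predicate));
        return this;
    }

    MiniBranched<T> branch(Predicate<T> predicate,
                           Function<List<T>, List<T>> chain,
                           String name) {
        put(name, chain.apply(take(predicate)));
        return this;
    }

    Map<String, List<T>> defaultBranch(Function<List<T>, List<T>> chain, String name) {
        put(name, chain.apply(take(item -> true)));
        return named;
    }

    Map<String, List<T>> noDefaultBranch() {
        return named;
    }

    // Removes and returns the not-yet-claimed elements matching the predicate,
    // which gives first-match-wins semantics across successive branch() calls.
    private List<T> take(Predicate<T> predicate) {
        List<T> matched = new ArrayList<>();
        remaining.removeIf(item -> {
            if (predicate.test(item)) {
                matched.add(item);
                return true;
            }
            return false;
        });
        return matched;
    }

    // Runtime uniqueness check for branch names.
    private void put(String name, List<T> branch) {
        if (named.putIfAbsent(name, branch) != null) {
            throw new IllegalArgumentException("Duplicate branch name: " + name);
        }
    }
}
```

Note that a misspelled or duplicated name is still only caught at runtime here, which is the trade-off debated elsewhere in this thread.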
> >>> I believe this would satisfy everyone. Optional names seem to be a good
> >>> idea: when you don't need to have the branches in the same scope, you
> >>> just don't use names and you don't risk making your code brittle. Or,
> >>> you might want to add names just for debugging purposes. Or, finally,
> >>> you might use the returned Map to have the named branches in the
> >>> original scope.
> >>>
> >>> There also was an input from John Roesler on June 4th, 2019, who
> >>> suggested using Named class. I can't comment on this. The idea seems
> >>> reasonable, but in this matter I'd rather trust people who are more
> >>> familiar with Streams API design principles than me.
> >>>
> >>> Regards,
> >>>
> >>> Ivan
> >>>
> >>>
> >>>
> >>> 08.10.2019 1:38, Matthias J. Sax wrote:
> >>>> I am moving this KIP into "inactive status". Feel free to resume the KIP
> >>>> at any point.
> >>>>
> >>>> If anybody else is interested in picking up this KIP, feel free to do so.
> >>>>
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 7/11/19 4:00 PM, Matthias J. Sax wrote:
> >>>>> Ivan,
> >>>>>
> >>>>> did you see my last reply? What do you think about my proposal to mix
> >>>>> both approaches and try to get best-of-both worlds?
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 6/11/19 3:56 PM, Matthias J. Sax wrote:
> >>>>>> Thanks for the input John!
> >>>>>>
> >>>>>>> under your suggestion, it seems that the name is required
> >>>>>>
> >>>>>> If you want to get the `KStream` as part of the `Map` back using a
> >>>>>> `Function`, yes. If you follow the "embedded chaining" pattern 
> >>>>>> using a
> >>>>>> `Consumer`, no.
> >>>>>>
> >>>>>> Allowing for a default name via `split()` can of course be done.
> >>>>>> Similarly, using `Named` instead of `String` is possible.
> >>>>>>
> >>>>>> I wanted to sketch out a high level proposal to merge both patterns
> >>>>>> only. Your suggestions to align the new API with the existing API
> >>>>>> make total sense.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> One follow up question: Would `Named` be optional or required in
> >>>>>> `split()` and `branch()`? It's unclear from your example.
> >>>>>>
> >>>>>> If both are mandatory, what do we gain by it? The returned `Map` only
> >>>>>> contains the corresponding branches, so why should we prefix all of
> >>>>>> them? If `Named` is mandatory only in `branch()`, but optional in
> >>>>>> `split()`, the same question arises.
> >>>>>>
> >>>>>> Requiring `Named` in `split()` seems to make sense only if `Named` is
> >>>>>> optional in `branch()` and we generate a `-X` suffix using a counter
> >>>>>> to get distinct branch names. However, this might lead to the problem of
> >>>>>> names changing if branches are added/removed. Also, how would the names
> >>>>>> be generated if `Consumer` is mixed in (i.e., not all branches are
> >>>>>> returned in the `Map`)?
> >>>>>>
> >>>>>> If `Named` is optional for both, it could happen that a user forgets to
> >>>>>> specify a name for a branch, which would lead to runtime issues.
> >>>>>>
> >>>>>>
> >>>>>> Hence, I am actually in favor of not allowing a default name, keeping
> >>>>>> `split()` without a parameter, and making `Named` in `branch()` required
> >>>>>> if a `Function` is used. This makes it explicit to the user that
> >>>>>> specifying a name is required if a `Function` is used.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> About
> >>>>>>
> >>>>>>> KBranchedStream#branch(BranchConfig)
> >>>>>>
> >>>>>> I don't think that the branching predicate is a configuration and hence
> >>>>>> would not include it in a configuration object.
> >>>>>>
> >>>>>>>       withChain(...);
> >>>>>>
> >>>>>> Similarly, `withChain()` (that would only take a `Consumer`?) does not
> >>>>>> seem to be a configuration. We also cannot prevent a user from calling
> >>>>>> `withName()` in combination with `withChain()`, which does not make sense
> >>>>>> IMHO. We could of course throw an RTE, but not having a compile-time
> >>>>>> check seems less appealing. Also, it could happen that neither
> >>>>>> `withChain()` nor `withName()` is called and the branch is missing in the
> >>>>>> returned `Map`, which leads to runtime issues, too.
> >>>>>>
> >>>>>> Hence, I don't think that we should add `BranchConfig`. A config object
> >>>>>> is helpful if each configuration can be set independently of all others,
> >>>>>> but this seems not to be the case here. If we add new configuration
> >>>>>> later, we can also just move forward by deprecating the methods that
> >>>>>> accept `Named` and add new methods that accept `BranchConfig` (that
> >>>>>> would of course implement `Named`).
> >>>>>>
> >>>>>>
> >>>>>> Thoughts?
> >>>>>>
> >>>>>>
> >>>>>> @Ivan, what do you think about the general idea to blend the two main
> >>>>>> approaches of returning a `Map` plus an "embedded chaining"?
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 6/4/19 10:33 AM, John Roesler wrote:
> >>>>>>> Thanks for the idea, Matthias, it does seem like this would satisfy
> >>>>>>> everyone. Returning the map from the terminal operations also solves
> >>>>>>> the problem of merging/joining the branched streams, if we want to add
> >>>>>>> support for the complement later on.
> >>>>>>>
> >>>>>>> Under your suggestion, it seems that the name is required. Otherwise,
> >>>>>>> we wouldn't have keys for the map to return. I think this is actually
> >>>>>>> not too bad, since experience has taught us that, although names for
> >>>>>>> operations are not required to define stream processing logic, it does
> >>>>>>> significantly improve the operational experience when you can map the
> >>>>>>> topology, logs, metrics, etc. back to the source code. Since you
> >>>>>>> wouldn't (have to) reference the name to chain extra processing onto
> >>>>>>> the branch (thanks to the second argument), you can avoid the
> >>>>>>> "unchecked name" problem that Ivan pointed out.
> >>>>>>>
> >>>>>>> In the current implementation of Branch, you can name the branch
> >>>>>>> operator itself, and then all the branches get index-suffixed names
> >>>>>>> built from the branch operator name. I guess under this proposal, we
> >>>>>>> could naturally append the branch name to the branching operator name,
> >>>>>>> like this:
> >>>>>>>
> >>>>>>>      stream.split(Named.as("mysplit")) // creates node "mysplit"
> >>>>>>>                 .branch(..., ..., "abranch") // creates node "mysplit-abranch"
> >>>>>>>                 .defaultBranch(...) // creates node "mysplit-default"
> >>>>>>>
> >>>>>>> It does make me wonder about the DSL syntax itself, though.
> >>>>>>>
> >>>>>>> We don't have a defined grammar, so there's plenty of room to debate
> >>>>>>> the "best" syntax in the context of each operation, but in general,
> >>>>>>> the KStream DSL operators follow this pattern:
> >>>>>>>
> >>>>>>>       operator(function, config_object?) OR operator(config_object)
> >>>>>>>
> >>>>>>> where config_object is often just Named in the "function" variant.
> >>>>>>> Even when the config_object isn't a Named, but some other config
> >>>>>>> class, that config class _always_ implements NamedOperation.
> >>>>>>>
> >>>>>>> Here, we're introducing a totally different pattern:
> >>>>>>>
> >>>>>>>     operator(function, function, string)
> >>>>>>>
> >>>>>>> where the string is the name.
> >>>>>>> My first question is whether the name should instead be specified with
> >>>>>>> the NamedOperation interface.
> >>>>>>>
> >>>>>>> My second question is whether we should just roll all these arguments
> >>>>>>> up into a config object like:
> >>>>>>>
> >>>>>>>      KBranchedStream#branch(BranchConfig)
> >>>>>>>
> >>>>>>>      interface BranchConfig extends NamedOperation {
> >>>>>>>       withPredicate(...);
> >>>>>>>       withChain(...);
> >>>>>>>       withName(...);
> >>>>>>>     }
> >>>>>>>
> >>>>>>> Although I guess we'd like to call BranchConfig something more like
> >>>>>>> "Branched", even if I don't particularly like that pattern.
> >>>>>>>
> >>>>>>> This makes the source code a little noisier, but it also makes us more
> >>>>>>> future-proof, as we can deal with a wide range of alternatives purely
> >>>>>>> in the config interface, and never have to deal with adding overloads
> >>>>>>> to the KBranchedStream if/when we decide we want the name to be
> >>>>>>> optional, or the KStream->KStream to be optional.
> >>>>>>>
> >>>>>>> WDYT?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> -John
> >>>>>>>
> >>>>>>> On Fri, May 24, 2019 at 5:25 PM Michael Drogalis
> >>>>>>> <michael.droga...@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>> Matthias: I think that's pretty reasonable from my point of view.
> >>>>>>>> Good
> >>>>>>>> suggestion.
> >>>>>>>>
> >>>>>>>> On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax
> >>>>>>>> <matth...@confluent.io>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Interesting discussion.
> >>>>>>>>>
> >>>>>>>>> I am wondering if we can unify the advantages of both approaches:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> KStream#split() -> KBranchedStream
> >>>>>>>>>
> >>>>>>>>> // branch is not easily accessible in current scope
> >>>>>>>>> KBranchedStream#branch(Predicate, Consumer<KStream>)
> >>>>>>>>>     -> KBranchedStream
> >>>>>>>>>
> >>>>>>>>> // assign a name to the branch and
> >>>>>>>>> // return the sub-stream to the current scope later
> >>>>>>>>> //
> >>>>>>>>> // can be simple as `#branch(p, s->s, "name")`
> >>>>>>>>> // or also complex as `#branch(p, s->s.filter(...), "name")`
> >>>>>>>>> KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
> >>>>>>>>>     -> KBranchedStream
> >>>>>>>>>
> >>>>>>>>> // default branch is not easily accessible
> >>>>>>>>> // return map of all named sub-streams into current scope
> >>>>>>>>> KBranchedStream#default(Consumer<KStream>)
> >>>>>>>>>     -> Map<String,KStream>
> >>>>>>>>>
> >>>>>>>>> // assign custom name to default-branch
> >>>>>>>>> // return map of all named sub-streams into current scope
> >>>>>>>>> KBranchedStream#default(Function<KStream,KStream>, String)
> >>>>>>>>>     -> Map<String,KStream>
> >>>>>>>>>
> >>>>>>>>> // assign a default name for default
> >>>>>>>>> // return map of all named sub-streams into current scope
> >>>>>>>>> KBranchedStream#defaultBranch(Function<KStream,KStream>)
> >>>>>>>>>     -> Map<String,KStream>
> >>>>>>>>>
> >>>>>>>>> // return map of all named sub-streams into current scope
> >>>>>>>>> KBranchedStream#noDefaultBranch()
> >>>>>>>>>     -> Map<String,KStream>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Hence, for each sub-stream, the user can pick to add a name and return
> >>>>>>>>> the branch "result" to the calling scope or not. The implementation can
> >>>>>>>>> also check at runtime that all returned names are unique. The returned
> >>>>>>>>> Map can be empty and it's also optional to use the Map.
> >>>>>>>>>
> >>>>>>>>> To me, it seems like a good way to get best of both worlds.
> >>>>>>>>>
> >>>>>>>>> Thoughts?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 5/6/19 5:15 PM, John Roesler wrote:
> >>>>>>>>>> Ivan,
> >>>>>>>>>>
> >>>>>>>>>> That's a very good point about the "start" operator in the
> >>>>>>>>>> dynamic case.
> >>>>>>>>>> I had no problem with "split()"; I was just questioning the
> >>>>>>>>>> necessity.
> >>>>>>>>>> Since you've provided a proof of necessity, I'm in favor of the
> >>>>>>>>>> "split()" start operator. Thanks!
> >>>>>>>>>>
> >>>>>>>>>> Separately, I'm interested to see where the present discussion leads.
> >>>>>>>>>> I've written enough JavaScript code in my life to be suspicious of
> >>>>>>>>>> nested closures. You have a good point about using method references
> >>>>>>>>>> (or indeed function literals also work). It should be validating that
> >>>>>>>>>> this was also the JS community's first approach to flattening the
> >>>>>>>>>> logic when their nested closure situation got out of hand.
> >>>>>>>>>> Unfortunately, it's replacing nesting with redirection, both of which
> >>>>>>>>>> disrupt code readability (but in different ways for different
> >>>>>>>>>> reasons). In other words, I agree that function references are *the*
> >>>>>>>>>> first-order solution if the nested code does indeed become a problem.
> >>>>>>>>>>
> >>>>>>>>>> However, the history of JS also tells us that function
> >>>>>>>>>> references aren't
> >>>>>>>>>> the end of the story either, and you can see that by observing 
> >>>>>>>>>> that
> >>>>>>>>>> there have been two follow-on eras, as they continue trying to
> >>>>>>>>>> cope with
> >>>>>>>>>> the consequences of living in such a callback-heavy language.
> >>>>>>>>>> First, you
> >>>>>>>>>> have Futures/Promises, which essentially let you convert nested
> >>>>>>>>>> code to
> >>>>>>>>>> method-chained code (Observables/FP is a popular variation on
> >>>>>>>>>> this).
> >>>>>>>>>> Most lately, you have async/await, which is an effort to apply
> >>>>>>>>>> language
> >>>>>>>>>> (not just API) syntax to the problem, and offer the "flattest"
> >>>>>>>>>> possible
> >>>>>>>>>> programming style to solve the problem (because you get back to
> >>>>>>>>>> just one
> >>>>>>>>>> code block per functional unit).
> >>>>>>>>>>
> >>>>>>>>>> Stream-processing is a different domain, and Java+KStreams is
> >>>>>>>>>> nowhere
> >>>>>>>>>> near as callback heavy as JS, so I don't think we have to take
> >>>>>>>>>> the JS
> >>>>>>>>>> story for granted, but then again, I think we can derive some
> >>>>>>>>>> valuable
> >>>>>>>>>> lessons by looking sideways to adjacent domains. I'm just
> >>>>>>>>>> bringing this
> >>>>>>>>>> up to inspire further/deeper discussion. At the same time, just
> >>>>>>>>>> like JS,
> >>>>>>>>>> we can afford to take an iterative approach to the problem.
> >>>>>>>>>>
> >>>>>>>>>> Separately again, I'm interested in the post-branch merge (and
> >>>>>>>>>> I'd also
> >>>>>>>>>> add join) problem that Paul brought up. We can clearly punt on
> >>>>>>>>>> it, by
> >>>>>>>>>> terminating the nested branches with sink operators. But is
> >>>>>>>>>> there a DSL
> >>>>>>>>>> way to do it?
> >>>>>>>>>>
> >>>>>>>>>> Thanks again for your driving this,
> >>>>>>>>>> -John
> >>>>>>>>>>
> >>>>>>>>>> On Thu, May 2, 2019 at 7:39 PM Paul Whalen <pgwha...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>       Ivan, I’ll definitely forfeit my point on the clumsiness of
> >>>>>>>>>> the
> >>>>>>>>>>       branch(predicate, consumer) solution, I don’t see any real
> >>>>>>>>>> drawbacks
> >>>>>>>>>>       for the dynamic case.
> >>>>>>>>>>
> >>>>>>>>>>       IMO the one trade off to consider at this point is the 
> >>>>>>>>>> scope
> >>>>>>>>>>       question. I don’t know if I totally agree that “we rarely
> >>>>>>>>>> need them
> >>>>>>>>>>       in the same scope” since merging the branches back together
> >>>>>>>>>> later
> >>>>>>>>>>       seems like a perfectly plausible use case that can be a lot
> >>>>>>>>>> nicer
> >>>>>>>>>>       when the branched streams are in the same scope. That being
> >>>>>>>>>> said,
> >>>>>>>>>>       for the reasons Ivan listed, I think it is overall the 
> >>>>>>>>>> better
> >>>>>>>>>>       solution - working around the scope thing is easy enough if
> >>>>>>>>>> you need
> >>>>>>>>>>       to.
> >>>>>>>>>>
> >>>>>>>>>>       > On May 2, 2019, at 7:00 PM, Ivan Ponomarev
> >>>>>>>>>>       <iponoma...@mail.ru.invalid> wrote:
> >>>>>>>>>>       >
> >>>>>>>>>>       > Hello everyone, thank you all for joining the discussion!
> >>>>>>>>>>       >
> >>>>>>>>>>       > Well, I don't think the idea of named branches, be it a
> >>>>>>>>>>       LinkedHashMap (no other Map will do, because order of
> >>>>>>>>>> definition
> >>>>>>>>>>       matters) or `branch` method  taking name and Consumer 
> >>>>>>>>>> has more
> >>>>>>>>>>       advantages than drawbacks.
> >>>>>>>>>>       >
> >>>>>>>>>>       > In my opinion, the only real positive outcome from 
> >>>>>>>>>> Michael's
> >>>>>>>>>>       proposal is that all the returned branches are in the same
> >>>>>>>>>> scope.
> >>>>>>>>>>       But 1) we rarely need them in the same scope 2) there is a
> >>>>>>>>>>       workaround for the scope problem, described in the KIP.
> >>>>>>>>>>       >
> >>>>>>>>>>       > 'Inlining the complex logic' is not a problem, because we
> >>>>>>>>>> can use
> >>>>>>>>>>       method references instead of lambdas. In real world
> >>>>>>>>>> scenarios you
> >>>>>>>>>>       tend to split the complex logic to methods anyway, so the
> >>>>>>>>>> code is
> >>>>>>>>>>       going to be clean.
> >>>>>>>>>>       >
> >>>>>>>>>>       > The drawbacks are strong. The cohesion between predicates
> >>>>>>>>>> and
> >>>>>>>>>>       handlers is lost. We have to define predicates in one
> >>>>>>>>>> place, and
> >>>>>>>>>>       handlers in another. This opens the door for bugs:
> >>>>>>>>>>       >
> >>>>>>>>>>       > - what if we forget to define a handler for a name? or a
> >>>>>>>>>> name for
> >>>>>>>>>>       a handler?
> >>>>>>>>>>       > - what if we misspell a name?
> >>>>>>>>>>       > - what if we copy-paste and duplicate a name?
> >>>>>>>>>>       >
> >>>>>>>>>>       > What Michael proposes would have been totally OK if we had
> >>>>>>>>>>       been writing the API in Lua, Ruby or Python. In those languages
> >>>>>>>>>>       the "dynamic naming" approach would have looked most concise and
> >>>>>>>>>>       beautiful. But in Java we expect all the problems related to
> >>>>>>>>>>       identifiers to be eliminated at compile time.
> >>>>>>>>>>       >
> >>>>>>>>>>       > Do we have to invent duck-typing for the Java API?
> >>>>>>>>>>       >
> >>>>>>>>>>       > And if we do, what advantage are we supposed to get
> >>>>>>>>>> besides having
> >>>>>>>>>>       all the branches in the same scope? Michael, maybe I'm
> >>>>>>>>>> missing your
> >>>>>>>>>>       point?
> >>>>>>>>>>       >
> >>>>>>>>>>       > ---
> >>>>>>>>>>       >
> >>>>>>>>>>       > Earlier in this discussion John Roesler also proposed 
> >>>>>>>>>> to do
> >>>>>>>>>>       without "start branching" operator, and later Paul
> >>>>>>>>>> mentioned that in
> >>>>>>>>>>       the case when we have to add a dynamic number of 
> >>>>>>>>>> branches, the
> >>>>>>>>>>       current KIP is 'clumsier' compared to Michael's 'Map'
> >>>>>>>>>> solution. Let
> >>>>>>>>>>       me address both comments here.
> >>>>>>>>>>       >
> >>>>>>>>>>       > 1) "Start branching" operator (I think that *split* is a
> >>>>>>>>>> good name
> >>>>>>>>>>       for it indeed) is critical when we need to do a dynamic
> >>>>>>>>>> branching,
> >>>>>>>>>>       see example below.
> >>>>>>>>>>       >
> >>>>>>>>>>       > 2) No, dynamic branching in current KIP is not clumsy at
> >>>>>>>>>> all.
> >>>>>>>>>>       Imagine a real-world scenario when you need one branch per
> >>>>>>>>>> enum
> >>>>>>>>>>       value (say, RecordType). You can have something like this:
> >>>>>>>>>>       >
> >>>>>>>>>>       > /* John: if we had to start with stream.branch(...) here, it would
> >>>>>>>>>>       >    have been much messier. */
> >>>>>>>>>>       > KBranchedStream branched = stream.split();
> >>>>>>>>>>       >
> >>>>>>>>>>       > /* Not clumsy at all :-) */
> >>>>>>>>>>       > for (RecordType recordType : RecordType.values())
> >>>>>>>>>>       >             branched = branched.branch((k, v) -> v.getRecType() == recordType,
> >>>>>>>>>>       >                     recordType::processRecords);
> >>>>>>>>>>       >
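A self-contained sketch of the dynamic-branching loop above, for readers who want to try the pattern without Kafka. Everything here is hypothetical scaffolding: `Branched` stands in for the proposed `KBranchedStream` and works on a plain `List`, `Rec` is an invented record type, and the handler simply counts records instead of calling `recordType::processRecords`. The part that mirrors the quoted code is the loop that starts from a single "split" object and adds one branch per enum value.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class DynamicBranchSketch {
    enum RecordType { ORDER, PAYMENT, REFUND }

    // Hypothetical record carrying its type, analogous to v.getRecType().
    static final class Rec {
        final RecordType type;
        final String payload;

        Rec(RecordType type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    // Hypothetical stand-in for KBranchedStream over a List<Rec>.
    static final class Branched {
        private final List<Rec> remaining;

        Branched(List<Rec> source) {
            this.remaining = new ArrayList<>(source);
        }

        Branched branch(Predicate<Rec> predicate, Consumer<List<Rec>> handler) {
            List<Rec> matched = new ArrayList<>();
            // Move matching records out of 'remaining' (add() always returns true).
            remaining.removeIf(r -> predicate.test(r) && matched.add(r));
            handler.accept(matched);
            return this;
        }
    }

    static Map<RecordType, Integer> demo() {
        List<Rec> source = List.of(
            new Rec(RecordType.ORDER, "a"),
            new Rec(RecordType.REFUND, "b"),
            new Rec(RecordType.ORDER, "c"));
        Map<RecordType, Integer> counts = new EnumMap<>(RecordType.class);

        // Plays the role of stream.split(): a starting point the loop can build on.
        Branched branched = new Branched(source);
        for (RecordType type : RecordType.values()) {
            branched = branched.branch(r -> r.type == type,
                                       matched -> counts.put(type, matched.size()));
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {ORDER=2, PAYMENT=0, REFUND=1}
    }
}
```

Without an explicit starting object, the first iteration of the loop would need a different call than the rest, which is the messiness the comment addressed to John refers to.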
> >>>>>>>>>>       > Regards,
> >>>>>>>>>>       >
> >>>>>>>>>>       > Ivan
> >>>>>>>>>>       >
> >>>>>>>>>>       >
> >>>>>>>>>>       > 02.05.2019 14:40, Matthias J. Sax wrote:
> >>>>>>>>>>       >> I also agree with Michael's observation about the core
> >>>>>>>>>> problem of
> >>>>>>>>>>       >> current `branch()` implementation.
> >>>>>>>>>>       >>
> >>>>>>>>>>       >> However, I also don't like to pass in a clumsy Map
> >>>>>>>>>> object. My
> >>>>>>>>>>       thinking
> >>>>>>>>>>       >> was more aligned with Paul's proposal to just add a name
> >>>>>>>>>> to each
> >>>>>>>>>>       >> `branch()` statement and return a `Map<String,KStream>`.
> >>>>>>>>>>       >>
> >>>>>>>>>>       >> It makes the code easier to read, and also makes the order of
> >>>>>>>>>>       >> `Predicates` (which is essential) easier to grasp.
> >>>>>>>>>>       >>
> >>>>>>>>>>       >>>>>> Map<String, KStream<K, V>> branches = stream.split()
> >>>>>>>>>>       >>>>>>    .branch("branchOne", Predicate<K, V>)
> >>>>>>>>>>       >>>>>>    .branch( "branchTwo", Predicate<K, V>)
> >>>>>>>>>>       >>>>>>    .defaultBranch("defaultBranch");
> >>>>>>>>>>       >> An open question is the case for which no defaultBranch() should be
> >>>>>>>>>>       >> specified. Atm, `split()` and `branch()` would return `BranchedKStream`
> >>>>>>>>>>       >> and the call to `defaultBranch()` that returns the `Map` is mandatory
> >>>>>>>>>>       >> (which is not the case atm). Or is this actually not a real problem,
> >>>>>>>>>>       >> because users can just ignore the branch returned by `defaultBranch()`
> >>>>>>>>>>       >> in the result `Map`?
> >>>>>>>>>>       >>
> >>>>>>>>>>       >>
> >>>>>>>>>>       >> About "inlining": So far, it seems to be a matter of personal
> >>>>>>>>>>       >> preference. I can see arguments for both, but no "killer argument" yet
> >>>>>>>>>>       >> that clearly makes the case for one or the other.
> >>>>>>>>>>       >>
> >>>>>>>>>>       >>
> >>>>>>>>>>       >> -Matthias
> >>>>>>>>>>       >>
> >>>>>>>>>>       >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
> >>>>>>>>>>       >>> Perhaps inlining is the wrong terminology. It doesn’t
> >>>>>>>>>> require
> >>>>>>>>>>       that a lambda with the full downstream topology be defined
> >>>>>>>>>> inline -
> >>>>>>>>>>       it can be a method reference as with Ivan’s original
> >>>>>>>>>> suggestion.
> >>>>>>>>>>       The advantage of putting the predicate and its downstream
> >>>>>>>>>> logic
> >>>>>>>>>>       (Consumer) together in branch() is that they are required
> >>>>>>>>>> to be near
> >>>>>>>>>>       to each other.
> >>>>>>>>>>       >>>
> >>>>>>>>>>       >>> Ultimately the downstream code has to live somewhere,
> >>>>>>>>>> and deep
> >>>>>>>>>>       branch trees will be hard to read regardless.
> >>>>>>>>>>       >>>
> >>>>>>>>>>       >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis <michael.droga...@confluent.io> wrote:
> >>>>>>>>>>       >>>>
> >>>>>>>>>>       >>>> I'm less enthusiastic about inlining the branch logic
> >>>>>>>>>> with its
> >>>>>>>>>>       downstream
> >>>>>>>>>>       >>>> functionality. Programs that have deep branch trees 
> >>>>>>>>>> will
> >>>>>>>>>>       quickly become
> >>>>>>>>>>       >>>> harder to read as a single unit.
> >>>>>>>>>>       >>>>
> >>>>>>>>>>       >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen <pgwha...@gmail.com> wrote:
> >>>>>>>>>>       >>>>>
> >>>>>>>>>>       >>>>> Also +1 on the issues/goals as Michael outlined them,
> >>>>>>>>>> I think
> >>>>>>>>>>       that sets a
> >>>>>>>>>>       >>>>> great framework for the discussion.
> >>>>>>>>>>       >>>>>
> >>>>>>>>>>       >>>>> Regarding the SortedMap solution, my understanding is
> >>>>>>>>>> that the
> >>>>>>>>>>       current
> >>>>>>>>>>       >>>>> proposal in the KIP is what is in my PR which
> >>>>>>>>>> (pending naming
> >>>>>>>>>>       decisions) is
> >>>>>>>>>>       >>>>> roughly this:
> >>>>>>>>>>       >>>>>
> >>>>>>>>>>       >>>>> stream.split()
> >>>>>>>>>>       >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
> >>>>>>>>>>       >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
> >>>>>>>>>>       >>>>>    .defaultBranch(Consumer<KStream<K, V>>);
> >>>>>>>>>>       >>>>>
> >>>>>>>>>>       >>>>> Obviously some ordering is necessary, since branching
> >>>>>>>>>> as a
> >>>>>>>>>>       construct
> >>>>>>>>>>       >>>>> doesn't work without it, but this solution seems 
> >>>>>>>>>> like it
> >>>>>>>>>>       provides as much
> >>>>>>>>>>       >>>>> associativity as the SortedMap solution, because each
> >>>>>>>>>> branch()
> >>>>>>>>>>       call
> >>>>>>>>>>       >>>>> directly associates the "conditional" with the "code
> >>>>>>>>>> block."
> >>>>>>>>>>       The value it
> >>>>>>>>>>       >>>>> provides over the KIP solution is the accessing of
> >>>>>>>>>> streams in
> >>>>>>>>>>       the same
> >>>>>>>>>>       >>>>> scope.
> >>>>>>>>>>       >>>>>
> >>>>>>>>>>       >>>>> The KIP solution is less "dynamic" than the SortedMap
> >>>>>>>>>> solution
> >>>>>>>>>>       in the sense
> >>>>>>>>>>       >>>>> that it is slightly clumsier to add a dynamic 
> >>>>>>>>>> number of
> >>>>>>>>>>       branches, but it is
> >>>>>>>>>>       >>>>> certainly possible.  It seems to me like the API
> >>>>>>>>>> should favor
> >>>>>>>>>>       the "static"
> >>>>>>>>>>       >>>>> case anyway, and should make it simple and 
> >>>>>>>>>> readable to
> >>>>>>>>>>       fluently declare and
> >>>>>>>>>>       >>>>> access your branches in-line.  It also makes it
> >>>>>>>>>> impossible to
> >>>>>>>>>>       ignore a
> >>>>>>>>>>       >>>>> branch, and it is possible to build an (almost)
> >>>>>>>>>> identical
> >>>>>>>>>>       SortedMap
> >>>>>>>>>>       >>>>> solution on top of it.
> >>>>>>>>>>       >>>>>
> >>>>>>>>>>       >>>>> I could also see a middle ground where instead of 
> >>>>>>>>>> a raw
> >>>>>>>>>>       SortedMap being
> >>>>>>>>>>       >>>>> taken in, branch() takes a name and not a Consumer.
> >>>>>>>>>> Something
> >>>>>>>>>>       like this:
> >>>>>>>>>>       >>>>>
> >>>>>>>>>>       >>>>> Map<String, KStream<K, V>> branches = stream.split()
> >>>>>>>>>>       >>>>>    .branch("branchOne", Predicate<K, V>)
> >>>>>>>>>>       >>>>>    .branch( "branchTwo", Predicate<K, V>)
> >>>>>>>>>>       >>>>>    .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
> >>>>>>>>>>       >>>>>
> >>>>>>>>>>       >>>>> Pros for that solution:
> >>>>>>>>>>       >>>>> - accessing branched KStreams in same scope
> >>>>>>>>>>       >>>>> - no double brace initialization, hopefully slightly
> >>>>>>>>>> more
> >>>>>>>>>>       readable than
> >>>>>>>>>>       >>>>> SortedMap
> >>>>>>>>>>       >>>>>
> >>>>>>>>>>       >>>>> Cons
> >>>>>>>>>>       >>>>> - downstream branch logic cannot be specified inline
> >>>>>>>>>> which
> >>>>>>>>>>       makes it harder
> >>>>>>>>>>       >>>>> to read top to bottom (like existing API and
> >>>>>>>>>> SortedMap, but
> >>>>>>>>>>       unlike the KIP)
> >>>>>>>>>>       >>>>> - you can forget to "handle" one of the branched
> >>>>>>>>>> streams (like
> >>>>>>>>>>       existing
> >>>>>>>>>>       >>>>> API and SortedMap, but unlike the KIP)
> >>>>>>>>>>       >>>>>
> >>>>>>>>>>       >>>>> (KBranchedStreams could even work *both* ways but
> >>>>>>>>>> perhaps
> >>>>>>>>>>       that's overdoing
> >>>>>>>>>>       >>>>> it).
> >>>>>>>>>>       >>>>>
> >>>>>>>>>>       >>>>> Overall I'm curious how important it is to be able to
> >>>>>>>>>> easily
> >>>>>>>>>>       access the
> >>>>>>>>>>       >>>>> branched KStream in the same scope as the original.
> >>>>>>>>>> It's
> >>>>>>>>>>       possible that it
> >>>>>>>>>>       >>>>> doesn't need to be handled directly by the API, but
> >>>>>>>>>> instead
> >>>>>>>>>>       left up to the
> >>>>>>>>>>       >>>>> user.  I'm sort of in the middle on it.
> >>>>>>>>>>       >>>>>
> >>>>>>>>>>       >>>>> Paul
> >>>>>>>>>>       >>>>>
> >>>>>>>>>>       >>>>>
> >>>>>>>>>>       >>>>>
> >>>>>>>>>>       >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman
> >>>>>>>>>>       <sop...@confluent.io <mailto:sop...@confluent.io>>
> >>>>>>>>>>       >>>>> wrote:
> >>>>>>>>>>       >>>>>
> >>>>>>>>>>       >>>>>> I'd like to +1 what Michael said about the issues
> >>>>>>>>>> with the
> >>>>>>>>>>       existing
> >>>>>>>>>>       >>>>> branch
> >>>>>>>>>>       >>>>>> method, I agree with what he's outlined and I think
> >>>>>>>>>> we should
> >>>>>>>>>>       proceed by
> >>>>>>>>>>       >>>>>> trying to alleviate these problems. Specifically it
> >>>>>>>>>> seems
> >>>>>>>>>>       important to be
> >>>>>>>>>>       >>>>>> able to cleanly access the individual branches (eg
> >>>>>>>>>> by mapping
> >>>>>>>>>>       >>>>>> name->stream), which I thought was the original
> >>>>>>>>>> intention of
> >>>>>>>>>>       this KIP.
> >>>>>>>>>>       >>>>>>
> >>>>>>>>>>       >>>>>> That said, I don't think we should so easily give in
> >>>>>>>>>> to the
> >>>>>>>>>>       double brace
> >>>>>>>>>>       >>>>>> anti-pattern or force our users into it if at all
> >>>>>>>>>> possible to
> >>>>>>>>>>       >>>>> avoid...just
> >>>>>>>>>>       >>>>>> my two cents.
> >>>>>>>>>>       >>>>>>
> >>>>>>>>>>       >>>>>> Cheers,
> >>>>>>>>>>       >>>>>> Sophie
> >>>>>>>>>>       >>>>>>
> >>>>>>>>>>       >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
> >>>>>>>>>>       >>>>>> michael.droga...@confluent.io
> >>>>>>>>>>       <mailto:michael.droga...@confluent.io>> wrote:
> >>>>>>>>>>       >>>>>>
> >>>>>>>>>>       >>>>>>> I’d like to propose a different way of thinking
> >>>>>>>>>> about this.
> >>>>>>>>>>       To me,
> >>>>>>>>>>       >>>>> there
> >>>>>>>>>>       >>>>>>> are three problems with the existing branch 
> >>>>>>>>>> signature:
> >>>>>>>>>>       >>>>>>>
> >>>>>>>>>>       >>>>>>> 1. If you use it the way most people do, Java
> >>>>>>>>>> raises unsafe
> >>>>>>>>> type
> >>>>>>>>>>       >>>>>> warnings.
> >>>>>>>>>>       >>>>>>> 2. The way in which you use the stream branches is
> >>>>>>>>>>       positionally coupled
> >>>>>>>>>>       >>>>>> to
> >>>>>>>>>>       >>>>>>> the ordering of the conditionals.
> >>>>>>>>>>       >>>>>>> 3. It is brittle to extend existing branch calls 
> >>>>>>>>>> with
> >>>>>>>>>>       additional code
> >>>>>>>>>>       >>>>>>> paths.
> >>>>>>>>>>       >>>>>>>
> >>>>>>>>>>       >>>>>>> Using associative constructs instead of relying on
> >>>>>>>>>> ordered
> >>>>>>>>>>       constructs
> >>>>>>>>>>       >>>>>> would
> >>>>>>>>>>       >>>>>>> be a stronger approach. Consider a signature that
> >>>>>>>>>> instead
> >>>>>>>>>>       looks like
> >>>>>>>>>>       >>>>>> this:
> >>>>>>>>>>       >>>>>>> Map<String, KStream<K,V>>
> >>>>>>>>>> KStream#branch(SortedMap<String,
> >>>>>>>>>>       Predicate<?
> >>>>>>>>>>       >>>>>>> super K,? super V>>);
> >>>>>>>>>>       >>>>>>>
> >>>>>>>>>>       >>>>>>> Branches are given names in a map, and as a result,
> >>>>>>>>>> the API
> >>>>>>>>>>       returns a
> >>>>>>>>>>       >>>>>>> mapping of names to streams. The ordering of the
> >>>>>>>>> conditionals is
> >>>>>>>>>>       >>>>>> maintained
> >>>>>>>>>>       >>>>>>> because it’s a sorted map. Insert order determines
> >>>>>>>>>> the order
> >>>>>>>>> of
> >>>>>>>>>>       >>>>>> evaluation.
> >>>>>>>>>>       >>>>>>> This solves problem 1 because there are no more
> >>>>>>>>>> varargs. It
> >>>>>>>>>>       solves
> >>>>>>>>>>       >>>>>> problem
> >>>>>>>>>>       >>>>>>> 2 because you no longer lean on ordering to 
> >>>>>>>>>> access the
> >>>>>>>>>>       branch you’re
> >>>>>>>>>>       >>>>>>> interested in. It solves problem 3 because you can
> >>>>>>>>>> introduce
> >>>>>>>>>>       another
> >>>>>>>>>>       >>>>>>> conditional by simply attaching another name to the
> >>>>>>>>>>       structure, rather
> >>>>>>>>>>       >>>>>> than
> >>>>>>>>>>       >>>>>>> messing with the existing indices.
> >>>>>>>>>>       >>>>>>>
> >>>>>>>>>>       >>>>>>> One of the drawbacks is that creating the map
> >>>>>>>>>> inline is
> >>>>>>>>>>       historically
> >>>>>>>>>>       >>>>>>> awkward in Java. I know it’s an anti-pattern to use
> >>>>>>>>>>       voluminously, but
> >>>>>>>>>>       >>>>>>> double brace initialization would clean up the
> >>>>>>>>>> aesthetics.
> >>>>>>>>>>       >>>>>>>
> >>>>>>>>>>       >>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler
> >>>>>>>>>>       <j...@confluent.io <mailto:j...@confluent.io>>
> >>>>>>>>>>       >>>>> wrote:
> >>>>>>>>>>       >>>>>>>> Hi Ivan,
> >>>>>>>>>>       >>>>>>>>
> >>>>>>>>>>       >>>>>>>> Thanks for the update.
> >>>>>>>>>>       >>>>>>>>
> >>>>>>>>>>       >>>>>>>> FWIW, I agree with Matthias that the current 
> >>>>>>>>>> "start
> >>>>>>>>> branching"
> >>>>>>>>>>       >>>>> operator
> >>>>>>>>>>       >>>>>>> is
> >>>>>>>>>>       >>>>>>>> confusing when named the same way as the actual
> >>>>>>>>>> branches.
> >>>>>>>>>>       "Split"
> >>>>>>>>>>       >>>>> seems
> >>>>>>>>>>       >>>>>>>> like a good name. Alternatively, we can do without
> >>>>>>>>>> a "start
> >>>>>>>>>>       >>>>> branching"
> >>>>>>>>>>       >>>>>>>> operator at all, and just do:
> >>>>>>>>>>       >>>>>>>>
> >>>>>>>>>>       >>>>>>>> stream
> >>>>>>>>>>       >>>>>>>>      .branch(Predicate)
> >>>>>>>>>>       >>>>>>>>      .branch(Predicate)
> >>>>>>>>>>       >>>>>>>>      .defaultBranch();
> >>>>>>>>>>       >>>>>>>>
> >>>>>>>>>>       >>>>>>>> Tentatively, I think that this branching operation
> >>>>>>>>>> should be
> >>>>>>>>>>       >>>>> terminal.
> >>>>>>>>>>       >>>>>>> That
> >>>>>>>>>>       >>>>>>>> way, we don't create ambiguity about how to use
> >>>>>>>>>> it. That
> >>>>>>>>>>       is, `branch`
> >>>>>>>>>>       >>>>>>>> should return `KBranchedStream`, while
> >>>>>>>>>> `defaultBranch` is
> >>>>>>>>>>       `void`, to
> >>>>>>>>>>       >>>>>>>> enforce that it comes last, and that there is only
> >>>>>>>>>> one
> >>>>>>>>>>       definition of
> >>>>>>>>>>       >>>>>> the
> >>>>>>>>>>       >>>>>>>> default branch. Potentially, we should log a
> >>>>>>>>>> warning if
> >>>>>>>>>>       there's no
> >>>>>>>>>>       >>>>>>> default,
> >>>>>>>>>>       >>>>>>>> and additionally log a warning (or throw an
> >>>>>>>>>> exception) if a
> >>>>>>>>>>       record
> >>>>>>>>>>       >>>>>> falls
> >>>>>>>>>>       >>>>>>>> through with no default.
> >>>>>>>>>>       >>>>>>>>
> >>>>>>>>>>       >>>>>>>> Thoughts?
> >>>>>>>>>>       >>>>>>>>
> >>>>>>>>>>       >>>>>>>> Thanks,
> >>>>>>>>>>       >>>>>>>> -John
> >>>>>>>>>>       >>>>>>>>
> >>>>>>>>>>       >>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <
> >>>>>>>>>>       >>>>> matth...@confluent.io <mailto:matth...@confluent.io>
> >>>>>>>>>>       >>>>>>>> wrote:
> >>>>>>>>>>       >>>>>>>>
> >>>>>>>>>>       >>>>>>>>> Thanks for updating the KIP and your answers.
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>> this is to make the name similar to String#split
> >>>>>>>>>>       >>>>>>>>>>> that also returns an array, right?
> >>>>>>>>>>       >>>>>>>>> The intent was to avoid name duplication. The
> >>>>>>>>>> return type
> >>>>>>>>>>       should
> >>>>>>>>>>       >>>>>> _not_
> >>>>>>>>>>       >>>>>>>>> be an array.
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>> The current proposal is
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>> stream.branch()
> >>>>>>>>>>       >>>>>>>>>      .branch(Predicate)
> >>>>>>>>>>       >>>>>>>>>      .branch(Predicate)
> >>>>>>>>>>       >>>>>>>>>      .defaultBranch();
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>> IMHO, this reads a little odd, because the first
> >>>>>>>>>>       `branch()` does
> >>>>>>>>>>       >>>>> not
> >>>>>>>>>>       >>>>>>>>> take any parameters and has different semantics
> >>>>>>>>>> than the
> >>>>>>>>> later
> >>>>>>>>>>       >>>>>>>>> `branch()` calls. Note, that from the code
> >>>>>>>>>> snippet above,
> >>>>>>>>> it's
> >>>>>>>>>>       >>>>> hidden
> >>>>>>>>>>       >>>>>>>>> that the first call is `KStream#branch()` while
> >>>>>>>>>> the others
> >>>>>>>>> are
> >>>>>>>>>>       >>>>>>>>> `KBranchedStream#branch()`, which makes reading the
> >>>>>>>>>> code
> >>>>>>>>> harder.
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>> Because I suggested to rename `addBranch()` ->
> >>>>>>>>>> `branch()`,
> >>>>>>>>>>       I thought
> >>>>>>>>>>       >>>>>> it
> >>>>>>>>>>       >>>>>>>>> might be better to also rename `KStream#branch()`
> >>>>>>>>>> to avoid
> >>>>>>>>> the
> >>>>>>>>>>       >>>>> naming
> >>>>>>>>>>       >>>>>>>>> overlap that seems to be confusing. The following
> >>>>>>>>>> reads
> >>>>>>>>> much
> >>>>>>>>>>       >>>>> cleaner
> >>>>>>>>>>       >>>>>> to
> >>>>>>>>>>       >>>>>>>> me:
> >>>>>>>>>>       >>>>>>>>> stream.split()
> >>>>>>>>>>       >>>>>>>>>      .branch(Predicate)
> >>>>>>>>>>       >>>>>>>>>      .branch(Predicate)
> >>>>>>>>>>       >>>>>>>>>      .defaultBranch();
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>> Maybe there is a better alternative to `split()`
> >>>>>>>>>> though to
> >>>>>>>>>>       avoid
> >>>>>>>>>>       >>>>> the
> >>>>>>>>>>       >>>>>>>>> naming overlap.
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>> 'default' is, however, a reserved word, so
> >>>>>>>>>> unfortunately
> >>>>>>>>> we
> >>>>>>>>>>       >>>>> cannot
> >>>>>>>>>>       >>>>>>> have
> >>>>>>>>>>       >>>>>>>>> a method with such name :-)
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>> Bummer. Didn't consider this. Maybe we can still
> >>>>>>>>>> come up
> >>>>>>>>>>       with a
> >>>>>>>>>>       >>>>> short
> >>>>>>>>>>       >>>>>>>> name?
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>> Can you add the interface `KBranchedStream` to
> >>>>>>>>>> the KIP
> >>>>>>>>>>       with all
> >>>>>>>>>>       >>>>> its
> >>>>>>>>>>       >>>>>>>>> methods? It will be part of public API and 
> >>>>>>>>>> should be
> >>>>>>>>>>       contained in
> >>>>>>>>>>       >>>>> the
> >>>>>>>>>>       >>>>>>>>> KIP. For example, it's unclear atm, what the
> >>>>>>>>>> return type of
> >>>>>>>>>>       >>>>>>>>> `defaultBranch()` is.
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>> You did not comment on the idea to add a
> >>>>>>>>>>       `KBranchedStream#get(int
> >>>>>>>>>>       >>>>>>> index)
> >>>>>>>>>>       >>>>>>>>> -> KStream` method to get the individually
> >>>>>>>>>>       branched-KStreams. Would
> >>>>>>>>>>       >>>>>> be
> >>>>>>>>>>       >>>>>>>>> nice to get your feedback about it. It seems you
> >>>>>>>>>> suggest
> >>>>>>>>>>       that users
> >>>>>>>>>>       >>>>>>>>> would need to write custom utility code
> >>>>>>>>>> otherwise, to
> >>>>>>>>>>       access them.
> >>>>>>>>>>       >>>>> We
> >>>>>>>>>>       >>>>>>>>> should discuss the pros and cons of both
> >>>>>>>>>> approaches. It
> >>>>>>>>> feels
> >>>>>>>>>>       >>>>>>>>> "incomplete" to me atm, if the API has no
> >>>>>>>>>> built-in support
> >>>>>>>>>>       to get
> >>>>>>>>>>       >>>>> the
> >>>>>>>>>>       >>>>>>>>> branched-KStreams directly.
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>> -Matthias
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> >>>>>>>>>>       >>>>>>>>>> Hi all!
> >>>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>> I have updated the KIP-418 according to the new
> >>>>>>>>>> vision.
> >>>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>> Matthias, thanks for your comment!
> >>>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>> Renaming KStream#branch() -> #split()
> >>>>>>>>>>       >>>>>>>>>> I can see your point: this is to make the name
> >>>>>>>>>> similar to
> >>>>>>>>>>       >>>>>>> String#split
> >>>>>>>>>>       >>>>>>>>>> that also returns an array, right? But is it
> >>>>>>>>>> worth the
> >>>>>>>>>>       loss of
> >>>>>>>>>>       >>>>>>>> backwards
> >>>>>>>>>>       >>>>>>>>>> compatibility? We can have overloaded branch()
> >>>>>>>>>> as well
> >>>>>>>>>>       without
> >>>>>>>>>>       >>>>>>>> affecting
> >>>>>>>>>>       >>>>>>>>>> the existing code. Maybe the old array-based
> >>>>>>>>>> `branch`
> >>>>>>>>> method
> >>>>>>>>>>       >>>>> should
> >>>>>>>>>>       >>>>>>> be
> >>>>>>>>>>       >>>>>>>>>> deprecated, but this is a subject for 
> >>>>>>>>>> discussion.
> >>>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>> Renaming KBranchedStream#addBranch() ->
> >>>>>>>>>>       >>>>> BranchingKStream#branch(),
> >>>>>>>>>>       >>>>>>>>>> KBranchedStream#defaultBranch() ->
> >>>>>>>>> BranchingKStream#default()
> >>>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>> Totally agree with 'addBranch->branch' rename.
> >>>>>>>>>> 'default'
> >>>>>>>>> is,
> >>>>>>>>>>       >>>>>>> however, a
> >>>>>>>>>>       >>>>>>>>>> reserved word, so unfortunately we cannot have a
> >>>>>>>>>> method
> >>>>>>>>>>       with such
> >>>>>>>>>>       >>>>>>> name
> >>>>>>>>>>       >>>>>>>>> :-)
> >>>>>>>>>>       >>>>>>>>>>> defaultBranch() does take a `Predicate` as
> >>>>>>>>>> argument,
> >>>>>>>>> but I
> >>>>>>>>>>       >>>>> think
> >>>>>>>>>>       >>>>>>> that
> >>>>>>>>>>       >>>>>>>>>> is not required?
> >>>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>> Absolutely! I think that was just copy-paste
> >>>>>>>>>> error or
> >>>>>>>>>>       something.
> >>>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>> Dear colleagues,
> >>>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>> please review the new version of the KIP and
> >>>>>>>>>> Paul's PR
> >>>>>>>>>>       >>>>>>>>>> (https://github.com/apache/kafka/pull/6512)
> >>>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>> Any new suggestions/objections?
> >>>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>> Regards,
> >>>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>> Ivan
> >>>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>> 11.04.2019 11:47, Matthias J. Sax wrote:
> >>>>>>>>>>       >>>>>>>>>>> Thanks for driving the discussion of this KIP.
> >>>>>>>>>> It seems
> >>>>>>>>> that
> >>>>>>>>>>       >>>>>>> everybody
> >>>>>>>>>>       >>>>>>>>>>> agrees that the current branch() method using
> >>>>>>>>>> arrays is
> >>>>>>>>> not
> >>>>>>>>>>       >>>>>> optimal.
> >>>>>>>>>>       >>>>>>>>>>> I had a quick look into the PR and I like the
> >>>>>>>>>> overall
> >>>>>>>>>>       proposal.
> >>>>>>>>>>       >>>>>>> There
> >>>>>>>>>>       >>>>>>>>>>> are some minor things we need to consider. I 
> >>>>>>>>>> would
> >>>>>>>>>>       recommend the
> >>>>>>>>>>       >>>>>>>>>>> following renaming:
> >>>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>> KStream#branch() -> #split()
> >>>>>>>>>>       >>>>>>>>>>> KBranchedStream#addBranch() ->
> >>>>>>>>>> BranchingKStream#branch()
> >>>>>>>>>>       >>>>>>>>>>> KBranchedStream#defaultBranch() ->
> >>>>>>>>>>       BranchingKStream#default()
> >>>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>> It's just a suggestion to get slightly shorter
> >>>>>>>>>> method
> >>>>>>>>> names.
> >>>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>> In the current PR, defaultBranch() does take a
> >>>>>>>>>>       `Predicate` as
> >>>>>>>>>>       >>>>>>>> argument,
> >>>>>>>>>>       >>>>>>>>>>> but I think that is not required?
> >>>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>> Also, we should consider KIP-307, that was
> >>>>>>>>>> recently
> >>>>>>>>>>       accepted and
> >>>>>>>>>>       >>>>>> is
> >>>>>>>>>>       >>>>>>>>>>> currently implemented:
> >>>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>>       >>>>>
> >>>>>>>>>>
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> >>>>>>>>>  
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>> I.e., we should add overloads that accept a
> >>>>>>>>>> `Named`
> >>>>>>>>>>       parameter.
> >>>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>> For the issue that the created `KStream` objects
> >>>>>>>>>> are in
> >>>>>>>>>>       different
> >>>>>>>>>>       >>>>>>>> scopes:
> >>>>>>>>>>       >>>>>>>>>>> could we extend `KBranchedStream` with a 
> >>>>>>>>>> `get(int
> >>>>>>>>>>       index)` method
> >>>>>>>>>>       >>>>>>> that
> >>>>>>>>>>       >>>>>>>>>>> returns the corresponding "branched" result
> >>>>>>>>>> `KStream`
> >>>>>>>>>>       object?
> >>>>>>>>>>       >>>>>> Maybe,
> >>>>>>>>>>       >>>>>>>> the
> >>>>>>>>>>       >>>>>>>>>>> second argument of `addBranch()` should not 
> >>>>>>>>>> be a
> >>>>>>>>>>       >>>>>> `Consumer<KStream>`
> >>>>>>>>>>       >>>>>>>> but
> >>>>>>>>>>       >>>>>>>>>>> a `Function<KStream,KStream>` and `get()` could
> >>>>>>>>>> return
> >>>>>>>>>>       whatever
> >>>>>>>>>>       >>>>>> the
> >>>>>>>>>>       >>>>>>>>>>> `Function` returns?
> >>>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>> Finally, I would also suggest updating the KIP
> >>>>>>>>>> with the
> >>>>>>>>>>       current
> >>>>>>>>>>       >>>>>>>>>>> proposal. That makes it easier to review.
> >>>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>> -Matthias
> >>>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
> >>>>>>>>>>       >>>>>>>>>>>> Ivan,
> >>>>>>>>>>       >>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>> I'm a bit of a novice here as well, but I
> >>>>>>>>>> think it
> >>>>>>>>>>       makes sense
> >>>>>>>>>>       >>>>>> for
> >>>>>>>>>>       >>>>>>>> you
> >>>>>>>>>>       >>>>>>>>> to
> >>>>>>>>>>       >>>>>>>>>>>> revise the KIP and continue the discussion.
> >>>>>>>>>> Obviously
> >>>>>>>>>>       we'll
> >>>>>>>>>>       >>>>> need
> >>>>>>>>>>       >>>>>>>> some
> >>>>>>>>>>       >>>>>>>>>>>> buy-in from committers that have actual
> >>>>>>>>>> binding votes on
> >>>>>>>>>>       >>>>> whether
> >>>>>>>>>>       >>>>>>> the
> >>>>>>>>>>       >>>>>>>>> KIP
> >>>>>>>>>>       >>>>>>>>>>>> could be adopted.  It would be great to hear
> >>>>>>>>>> if they
> >>>>>>>>>>       think this
> >>>>>>>>>>       >>>>>> is
> >>>>>>>>>>       >>>>>>> a
> >>>>>>>>>>       >>>>>>>>> good
> >>>>>>>>>>       >>>>>>>>>>>> idea overall.  I'm not sure if that happens
> >>>>>>>>>> just by
> >>>>>>>>>>       starting a
> >>>>>>>>>>       >>>>>>> vote,
> >>>>>>>>>>       >>>>>>>>> or if
> >>>>>>>>>>       >>>>>>>>>>>> there is generally some indication of interest
> >>>>>>>>> beforehand.
> >>>>>>>>>>       >>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>> That being said, I'll continue the discussion
> >>>>>>>>>> a bit:
> >>>>>>>>>>       assuming
> >>>>>>>>>>       >>>>> we
> >>>>>>>>>>       >>>>>> do
> >>>>>>>>>>       >>>>>>>>> move
> >>>>>>>>>>       >>>>>>>>>>>> forward with the solution of "stream.branch() 
> >>>>>>>>>> returns
> >>>>>>>>>>       >>>>>> KBranchedStream",
> >>>>>>>>>>       >>>>>>> do
> >>>>>>>>>>       >>>>>>>>> we
> >>>>>>>>>>       >>>>>>>>>>>> deprecate "stream.branch(...) returns
> >>>>>>>>>> KStream[]"?  I
> >>>>>>>>> would
> >>>>>>>>>>       >>>>> favor
> >>>>>>>>>>       >>>>>>>>>>>> deprecating, since having two mutually
> >>>>>>>>>> exclusive APIs
> >>>>>>>>> that
> >>>>>>>>>>       >>>>>>> accomplish
> >>>>>>>>>>       >>>>>>>>> the
> >>>>>>>>>>       >>>>>>>>>>>> same thing is confusing, especially when
> >>>>>>>>>> they're fairly
> >>>>>>>>>>       similar
> >>>>>>>>>>       >>>>>>>>> anyway.  We
> >>>>>>>>>>       >>>>>>>>>>>> just need to be sure we're not making 
> >>>>>>>>>> something
> >>>>>>>>>>       >>>>>>> impossible/difficult
> >>>>>>>>>>       >>>>>>>>> that
> >>>>>>>>>>       >>>>>>>>>>>> is currently possible/easy.
> >>>>>>>>>>       >>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>> Regarding my PR - I think the general
> >>>>>>>>>> structure would
> >>>>>>>>> work,
> >>>>>>>>>>       >>>>> it's
> >>>>>>>>>>       >>>>>>>> just a
> >>>>>>>>>>       >>>>>>>>>>>> little sloppy overall in terms of naming and
> >>>>>>>>>> clarity. In
> >>>>>>>>>>       >>>>>>> particular,
> >>>>>>>>>>       >>>>>>>>>>>> passing in the "predicates" and "children"
> >>>>>>>>>> lists which
> >>>>>>>>> get
> >>>>>>>>>>       >>>>>> modified
> >>>>>>>>>>       >>>>>>>> in
> >>>>>>>>>>       >>>>>>>>>>>> KBranchedStream but read from all the way
> >>>>>>>>>>       in KStreamLazyBranch is
> >>>>>>>>>>       >>>>> a
> >>>>>>>>>>       >>>>>>> bit
> >>>>>>>>>>       >>>>>>>>>>>> complicated to follow.
> >>>>>>>>>>       >>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>> Thanks,
> >>>>>>>>>>       >>>>>>>>>>>> Paul
> >>>>>>>>>>       >>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan 
> >>>>>>>>>> Ponomarev <
> >>>>>>>>>>       >>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>
> >>>>>>>>>>       >>>>>>>>> wrote:
> >>>>>>>>>>       >>>>>>>>>>>>> Hi Paul!
> >>>>>>>>>>       >>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>> I read your code carefully and now I am fully
> >>>>>>>>>>       convinced: your
> >>>>>>>>>>       >>>>>>>> proposal
> >>>>>>>>>>       >>>>>>>>>>>>> looks better and should work. We just have to
> >>>>>>>>>> document
> >>>>>>>>> the
> >>>>>>>>>>       >>>>>> crucial
> >>>>>>>>>>       >>>>>>>>> fact
> >>>>>>>>>>       >>>>>>>>>>>>> that KStream consumers are invoked as they're
> >>>>>>>>>> added.
> >>>>>>>>>>       And then
> >>>>>>>>>>       >>>>>> it's
> >>>>>>>>>>       >>>>>>>> all
> >>>>>>>>>>       >>>>>>>>>>>>> going to be very nice.
> >>>>>>>>>>       >>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>> What shall we do now? I should re-write the
> >>>>>>>>>> KIP and
> >>>>>>>>>>       resume the
> >>>>>>>>>>       >>>>>>>>>>>>> discussion here, right?
> >>>>>>>>>>       >>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>> Why are you saying that your PR 'should not
> >>>>>>>>>> be even a
> >>>>>>>>>>       >>>>> starting
> >>>>>>>>>>       >>>>>>>> point
> >>>>>>>>>>       >>>>>>>>> if
> >>>>>>>>>>       >>>>>>>>>>>>> we go in this direction'? To me it looks like
> >>>>>>>>>> a good
> >>>>>>>>>>       starting
> >>>>>>>>>>       >>>>>>> point.
> >>>>>>>>>>       >>>>>>>>> But
> >>>>>>>>>>       >>>>>>>>>>>>> as a novice in this project I might miss some
> >>>>>>>>>> important
> >>>>>>>>>>       >>>>> details.
> >>>>>>>>>>       >>>>>>>>>>>>> Regards,
> >>>>>>>>>>       >>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>> Ivan
> >>>>>>>>>>       >>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen wrote:
> >>>>>>>>>>       >>>>>>>>>>>>>> Ivan,
> >>>>>>>>>>       >>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>> Maybe I’m missing the point, but I 
> >>>>>>>>>> believe the
> >>>>>>>>>>       >>>>> stream.branch()
> >>>>>>>>>>       >>>>>>>>> solution
> >>>>>>>>>>       >>>>>>>>>>>>> supports this. The couponIssuer::set*
> >>>>>>>>>> consumers will be
> >>>>>>>>>>       >>>>> invoked
> >>>>>>>>>>       >>>>>> as
> >>>>>>>>>>       >>>>>>>>> they’re
> >>>>>>>>>>       >>>>>>>>>>>>> added, not during streamsBuilder.build(). So
> >>>>>>>>>> the user
> >>>>>>>>>>       still
> >>>>>>>>>>       >>>>>> ought
> >>>>>>>>>>       >>>>>>> to
> >>>>>>>>>>       >>>>>>>>> be
> >>>>>>>>>>       >>>>>>>>>>>>> able to call couponIssuer.coupons() afterward
> >>>>>>>>>> and
> >>>>>>>>>>       depend on
> >>>>>>>>>>       >>>>> the
> >>>>>>>>>>       >>>>>>>>> branched
> >>>>>>>>>>       >>>>>>>>>>>>> streams having been set.
> >>>>>>>>>>       >>>>>>>>>>>>>> The issue I mean to point out is that it is
> >>>>>>>>>> hard to
> >>>>>>>>>>       access
> >>>>>>>>>>       >>>>> the
> >>>>>>>>>>       >>>>>>>>> branched
> >>>>>>>>>>       >>>>>>>>>>>>> streams in the same scope as the original
> >>>>>>>>>> stream (that
> >>>>>>>>>>       is, not
> >>>>>>>>>>       >>>>>>>> inside
> >>>>>>>>>>       >>>>>>>>> the
> >>>>>>>>>>       >>>>>>>>>>>>> couponIssuer), which is a problem with both
> >>>>>>>>>> proposed
> >>>>>>>>>>       >>>>> solutions.
> >>>>>>>>>>       >>>>>> It
> >>>>>>>>>>       >>>>>>>>> can be
> >>>>>>>>>>       >>>>>>>>>>>>> worked around though.
> >>>>>>>>>>       >>>>>>>>>>>>>> [Also, great to hear additional interest in
> >>>>>>>>>> 401, I’m
> >>>>>>>>>>       excited
> >>>>>>>>>>       >>>>> to
> >>>>>>>>>>       >>>>>>>> hear
> >>>>>>>>>>       >>>>>>>>>>>>> your thoughts!]
> >>>>>>>>>>       >>>>>>>>>>>>>> Paul
> >>>>>>>>>>       >>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan 
> >>>>>>>>>> Ponomarev <
> >>>>>>>>>>       >>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>
> >>>>>>>>>>       >>>>>>>>> wrote:
> >>>>>>>>>>       >>>>>>>>>>>>>>> Hi Paul!
> >>>>>>>>>>       >>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>> The idea to postpone the wiring of branches
> >>>>>>>>>> to the
> >>>>>>>>>>       >>>>>>>>>>>>> streamsBuilder.build() also looked great to
> >>>>>>>>>> me at
> >>>>>>>>> first
> >>>>>>>>>>       >>>>> glance,
> >>>>>>>>>>       >>>>>>> but
> >>>>>>>>>>       >>>>>>>>> ---
> >>>>>>>>>>       >>>>>>>>>>>>>>>> the newly branched streams are not
> >>>>>>>>>> available in the
> >>>>>>>>>>       same
> >>>>>>>>>>       >>>>>> scope
> >>>>>>>>>>       >>>>>>> as
> >>>>>>>>>>       >>>>>>>>> each
> >>>>>>>>>>       >>>>>>>>>>>>> other.  That is, if we wanted to merge 
> >>>>>>>>>> them back
> >>>>>>>>> together
> >>>>>>>>>>       >>>>> again
> >>>>>>>>>>       >>>>>> I
> >>>>>>>>>>       >>>>>>>>> don't see
> >>>>>>>>>>       >>>>>>>>>>>>> a way to do that.
> >>>>>>>>>>       >>>>>>>>>>>>>>> You just took the words right out of my
> >>>>>>>>>> mouth, I was
> >>>>>>>>>>       just
> >>>>>>>>>>       >>>>>> going
> >>>>>>>>>>       >>>>>>> to
> >>>>>>>>>>       >>>>>>>>>>>>> write in detail about this issue.
> >>>>>>>>>>       >>>>>>>>>>>>>>> Consider the example from Bill's book, p.
> >>>>>>>>>> 101: say
> >>>>>>>>>>       we need
> >>>>>>>>>>       >>>>> to
> >>>>>>>>>>       >>>>>>>>> identify
> >>>>>>>>>>       >>>>>>>>>>>>> customers who have bought coffee and made a
> >>>>>>>>>> purchase
> >>>>>>>>>>       in the
> >>>>>>>>>>       >>>>>>>>> electronics
> >>>>>>>>>>       >>>>>>>>>>>>> store to give them coupons.
> >>>>>>>>>>       >>>>>>>>>>>>>>> This is the code I usually write under 
> >>>>>>>>>> these
> >>>>>>>>>>       circumstances
> >>>>>>>>>>       >>>>>> using
> >>>>>>>>>>       >>>>>>>> my
> >>>>>>>>>>       >>>>>>>>>>>>> 'brancher' class:
> >>>>>>>>>>       >>>>>>>>>>>>>>> @Setter
> >>>>>>>>>>       >>>>>>>>>>>>>>> class CouponIssuer{
> >>>>>>>>>>       >>>>>>>>>>>>>>>   private KStream<....> coffePurchases;
> >>>>>>>>>>       >>>>>>>>>>>>>>>   private KStream<....> 
> >>>>>>>>>> electronicsPurchases;
> >>>>>>>>>>       >>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>   KStream<...> coupons(){
> >>>>>>>>>>       >>>>>>>>>>>>>>>       return
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>> coffePurchases.join(electronicsPurchases...)...whatever
> >>>>>>>>>>       >>>>>>>>>>>>>>>       /*In the real world the code here 
> >>>>>>>>>> can be
> >>>>>>>>>>       complex, so
> >>>>>>>>>>       >>>>>>>>> creation of
> >>>>>>>>>>       >>>>>>>>>>>>> a separate CouponIssuer class is fully
> >>>>>>>>>> justified, in
> >>>>>>>>>>       order to
> >>>>>>>>>>       >>>>>>>> separate
> >>>>>>>>>>       >>>>>>>>>>>>> classes' responsibilities.*/
> >>>>>>>>>>       >>>>>>>>>>>>>>>  }
> >>>>>>>>>>       >>>>>>>>>>>>>>> }
> >>>>>>>>>>       >>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>> CouponIssuer couponIssuer = new
> >>>>>>>>>> CouponIssuer();
> >>>>>>>>>>       >>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
> >>>>>>>>>>       >>>>>>>>>>>>>>>     .branch(predicate1,
> >>>>>>>>> couponIssuer::setCoffePurchases)
> >>>>>>>>>>       >>>>>>>>>>>>>>>     .branch(predicate2,
> >>>>>>>>>>       >>>>>> couponIssuer::setElectronicsPurchases)
> >>>>>>>>>>       >>>>>>>>>>>>>>>     .onTopOf(transactionStream);
> >>>>>>>>>>       >>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>> /*Alas, this won't work if we're going to
> >>>>>>>>>> wire up
> >>>>>>>>>>       everything
> >>>>>>>>>>       >>>>>>>> later,
> >>>>>>>>>>       >>>>>>>>>>>>> without the terminal operation!!!*/
> >>>>>>>>>>       >>>>>>>>>>>>>>> couponIssuer.coupons()...
> >>>>>>>>>>       >>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>> Does this make sense?  In order to properly
> >>>>>>>>>>       initialize the
> >>>>>>>>>>       >>>>>>>>> CouponIssuer
> >>>>>>>>>>       >>>>>>>>>>>>> we need the terminal operation to be called
> >>>>>>>>>> before
> >>>>>>>>>>       >>>>>>>>> streamsBuilder.build()
> >>>>>>>>>>       >>>>>>>>>>>>> is called.
> >>>>>>>>>>       >>>>>>>>>>>>>>> [BTW Paul, I just found out that your
> >>>>>>>>>> KIP-401 is
> >>>>>>>>>>       essentially
> >>>>>>>>>>       >>>>>> the
> >>>>>>>>>>       >>>>>>>>> next
> >>>>>>>>>>       >>>>>>>>>>>>> KIP I was going to write here. I have some
> >>>>>>>>>> thoughts
> >>>>>>>>>>       based on
> >>>>>>>>>>       >>>>> my
> >>>>>>>>>>       >>>>>>>>> experience,
> >>>>>>>>>>       >>>>>>>>>>>>> so I will join the discussion on KIP-401 
> >>>>>>>>>> soon.]
> >>>>>>>>>>       >>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>       >>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>> Ivan
> >>>>>>>>>>       >>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen wrote:
> >>>>>>>>>>       >>>>>>>>>>>>>>>> Ivan,
> >>>>>>>>>>       >>>>>>>>>>>>>>>> I tried to make a very rough proof of
> >>>>>>>>>> concept of a
> >>>>>>>>>>       fluent
> >>>>>>>>>>       >>>>> API
> >>>>>>>>>>       >>>>>>>> based
> >>>>>>>>>>       >>>>>>>>>>>>> off of
> >>>>>>>>>>       >>>>>>>>>>>>>>>> KStream here
> >>>>>>>>>>       (https://github.com/apache/kafka/pull/6512),
> >>>>>>>>>>       >>>>>> and
> >>>>>>>>>>       >>>>>>> I
> >>>>>>>>>>       >>>>>>>>> think
> >>>>>>>>>>       >>>>>>>>>>>>> I
> >>>>>>>>>>       >>>>>>>>>>>>>>>> succeeded at removing both cons.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>    - Compatibility: I was incorrect
> >>>>>>>>>> earlier about
> >>>>>>>>>>       >>>>>>> compatibility
> >>>>>>>>>>       >>>>>>>>>>>>> issues,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>    there aren't any direct ones.  I was
> >>>>>>>>>> unaware
> >>>>>>>>>>       that Java
> >>>>>>>>>>       >>>>> is
> >>>>>>>>>>       >>>>>>>> smart
> >>>>>>>>>>       >>>>>>>>>>>>> enough to
> >>>>>>>>>>       >>>>>>>>>>>>>>>>    distinguish between a 
> >>>>>>>>>> branch(varargs...)
> >>>>>>>>>>       returning one
> >>>>>>>>>>       >>>>>>> thing
> >>>>>>>>>>       >>>>>>>>> and
> >>>>>>>>>>       >>>>>>>>>>>>> branch()
> >>>>>>>>>>       >>>>>>>>>>>>>>>>    with no arguments returning another 
> >>>>>>>>>> thing.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>    - Requiring a terminal method: We don't actually need it.  We can just
> >>>>>>>>>>       >>>>>>>>>>>>>>>>    build up the branches in the KBranchedStream, which shares its state with the
> >>>>>>>>>>       >>>>>>>>>>>>>>>>    ProcessorSupplier that will actually do the branching.  It's not terribly
> >>>>>>>>>>       >>>>>>>>>>>>>>>>    pretty in its current form, but I think it demonstrates its feasibility.
> >>>>>>>>>>       >>>>>>>>>>>>>>>> To be clear, I don't think that pull
> >>>>>>>>>> request should
> >>>>>>>>> be
> >>>>>>>>>>       >>>>> final
> >>>>>>>>>>       >>>>>> or
> >>>>>>>>>>       >>>>>>>>> even a
> >>>>>>>>>>       >>>>>>>>>>>>>>>> starting point if we go in this direction,
> >>>>>>>>>> I just
> >>>>>>>>>>       wanted to
> >>>>>>>>>>       >>>>>> see
> >>>>>>>>>>       >>>>>>>> how
> >>>>>>>>>>       >>>>>>>>>>>>>>>> challenging it would be to get the API
> >>>>>>>>>> working.
> >>>>>>>>>>       >>>>>>>>>>>>>>>> I will say though, that I'm not sure the
> >>>>>>>>>> existing
> >>>>>>>>>>       solution
> >>>>>>>>>>       >>>>>>> could
> >>>>>>>>>>       >>>>>>>> be
> >>>>>>>>>>       >>>>>>>>>>>>>>>> deprecated in favor of this, which I had
> >>>>>>>>>> originally
> >>>>>>>>>>       >>>>> suggested
> >>>>>>>>>>       >>>>>>>> was a
> >>>>>>>>>>       >>>>>>>>>>>>>>>> possibility.  The reason is that the newly
> >>>>>>>>>> branched
> >>>>>>>>>>       streams
> >>>>>>>>>>       >>>>>> are
> >>>>>>>>>>       >>>>>>>> not
> >>>>>>>>>>       >>>>>>>>>>>>>>>> available in the same scope as each
> >>>>>>>>>> other.  That
> >>>>>>>>>>       is, if we
> >>>>>>>>>>       >>>>>>> wanted
> >>>>>>>>>>       >>>>>>>>> to
> >>>>>>>>>>       >>>>>>>>>>>>> merge
> >>>>>>>>>>       >>>>>>>>>>>>>>>> them back together again I don't see a way
> >>>>>>>>>> to do
> >>>>>>>>>>       that.  The
> >>>>>>>>>>       >>>>>> KIP
> >>>>>>>>>>       >>>>>>>>>>>>> proposal
> >>>>>>>>>>       >>>>>>>>>>>>>>>> has the same issue, though - all this
> >>>>>>>>>> means is that
> >>>>>>>>> for
> >>>>>>>>>>       >>>>>> either
> >>>>>>>>>>       >>>>>>>>>>>>> solution,
> >>>>>>>>>>       >>>>>>>>>>>>>>>> deprecating the existing branch(...) is
> >>>>>>>>>> not on the
> >>>>>>>>>>       table.
> >>>>>>>>>>       >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>       >>>>>>>>>>>>>>>> Paul
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan
> >>>>>>>>>> Ponomarev <
> >>>>>>>>>>       >>>>>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>>
> >>>>>>>>>>       >>>>>>>>>>>>> wrote:
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> OK, let me summarize what we have
> >>>>>>>>>> discussed up to
> >>>>>>>>> this
> >>>>>>>>>>       >>>>>> point.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> First, it seems that it's commonly agreed
> >>>>>>>>>> that
> >>>>>>>>>>       branch API
> >>>>>>>>>>       >>>>>>> needs
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> improvement. Motivation is given in 
> >>>>>>>>>> the KIP.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> There are two potential ways to do it:
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> 1. (as originally proposed)
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->..)
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->..)
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>    .defaultBranch(ks->..) //optional
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>    .onTopOf(stream).mapValues(...).... //onTopOf returns its argument
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2)
> >>>>>>>>>> The code
> >>>>>>>>> won't
> >>>>>>>>>>       >>>>> make
> >>>>>>>>>>       >>>>>>>> sense
> >>>>>>>>>>       >>>>>>>>>>>>> until
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> all the necessary ingredients are 
> >>>>>>>>>> provided.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance contrasts with the
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> fluency of other KStream methods.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> 2. (as Paul proposes)
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> stream
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->...)
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->...)
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>    .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and noDefault() return void
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> PROS: Generally follows the way KStreams
> >>>>>>>>>> interface
> >>>>>>>>> is
> >>>>>>>>>>       >>>>>> defined.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> CONS: We need to define two terminal methods (defaultBranch(ks->) and
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> noDefault()). And for a user it is very easy to miss the fact that one
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> of the terminal methods should be called. If these methods are not
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> called, we can throw an exception at runtime.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can
> >>>>>>>>>> we do
> >>>>>>>>> better?
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> Ivan
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> Paul,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> I see your point when you are talking
> >>>>>>>>>> about
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> Still, I believe that this cannot be implemented the easy way.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> Maybe we all should think further.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> user could specify a terminal method that assumes nothing will reach
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> the default branch, throwing an exception if such a case occurs.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be
> >>>>>>>>>> the only
> >>>>>>>>> option
> >>>>>>>>>>       >>>>>> besides
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> `default`, because there are scenarios
> >>>>>>>>>> when we
> >>>>>>>>>>       want to
> >>>>>>>>>>       >>>>>> just
> >>>>>>>>>>       >>>>>>>>> silently
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> drop the messages that didn't match any
> >>>>>>>>>>       predicate. 2)
> >>>>>>>>>>       >>>>>>> Throwing
> >>>>>>>>>>       >>>>>>>>> an
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> exception in the middle of data flow
> >>>>>>>>>> processing
> >>>>>>>>>>       looks
> >>>>>>>>>>       >>>>>> like a
> >>>>>>>>>>       >>>>>>>> bad
> >>>>>>>>>>       >>>>>>>>>>>>> idea.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> In the stream processing paradigm, I would prefer to emit a special
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> message to a dedicated stream. This is exactly where `default` can be
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> used.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> it would be fairly easy for the InternalTopologyBuilder to track dangling
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> branches that haven't been terminated and raise a clear error before it
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> becomes an issue.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> You mean a runtime exception, when the
> >>>>>>>>>> program is
> >>>>>>>>>>       >>>>> compiled
> >>>>>>>>>>       >>>>>>> and
> >>>>>>>>>>       >>>>>>>>> run?
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> Well,  I'd prefer an API that simply 
> >>>>>>>>>> won't
> >>>>>>>>>>       compile if
> >>>>>>>>>>       >>>>> used
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> incorrectly. Can we build such an 
> >>>>>>>>>> API as a
> >>>>>>>>>>       method chain
> >>>>>>>>>>       >>>>>>>> starting
> >>>>>>>>>>       >>>>>>>>>>>>> from
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> KStream object? There is a huge cost
> >>>>>>>>>> difference
> >>>>>>>>>>       between
> >>>>>>>>>>       >>>>>>>> runtime
> >>>>>>>>>>       >>>>>>>>> and
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> compile-time errors. Even if a failure is uncovered instantly by unit
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> tests, it costs more for the project than a compilation failure.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> Ivan
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen wrote:
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Ivan,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Good point about the terminal
> >>>>>>>>>> operation being
> >>>>>>>>>>       required.
> >>>>>>>>>>       >>>>>>> But
> >>>>>>>>>>       >>>>>>>> is
> >>>>>>>>>>       >>>>>>>>>>>>> that
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> really
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> such a bad thing?  If the user doesn't
> >>>>>>>>>> want a
> >>>>>>>>>>       >>>>>> defaultBranch
> >>>>>>>>>>       >>>>>>>>> they
> >>>>>>>>>>       >>>>>>>>>>>>> can
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> call
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> some other terminal method
> >>>>>>>>>> (noDefaultBranch()?)
> >>>>>>>>>>       just as
> >>>>>>>>>>       >>>>>>>>> easily.  In
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> fact I
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> think it creates an opportunity for a
> >>>>>>>>>> nicer API
> >>>>>>>>> - a
> >>>>>>>>>>       >>>>> user
> >>>>>>>>>>       >>>>>>>> could
> >>>>>>>>>>       >>>>>>>>>>>>> specify
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> a
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> terminal method that assumes nothing
> >>>>>>>>>> will reach
> >>>>>>>>> the
> >>>>>>>>>>       >>>>>> default
> >>>>>>>>>>       >>>>>>>>> branch,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> throwing an exception if such a case
> >>>>>>>>>> occurs.
> >>>>>>>>> That
> >>>>>>>>>>       >>>>> seems
> >>>>>>>>>>       >>>>>>> like
> >>>>>>>>>>       >>>>>>>>> an
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> improvement over the current branch()
> >>>>>>>>>> API,
> >>>>>>>>>>       which allows
> >>>>>>>>>>       >>>>>> for
> >>>>>>>>>>       >>>>>>>> the
> >>>>>>>>>>       >>>>>>>>>>>>> more
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> subtle
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> behavior of records unexpectedly 
> >>>>>>>>>> getting
> >>>>>>>>> dropped.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> The need for a terminal operation
> >>>>>>>>>> certainly has
> >>>>>>>>>>       to be
> >>>>>>>>>>       >>>>>> well
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> documented, but
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> it would be fairly easy for the InternalTopologyBuilder to track dangling
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> branches that haven't been terminated
> >>>>>>>>>> and raise
> >>>>>>>>>>       a clear
> >>>>>>>>>>       >>>>>>> error
> >>>>>>>>>>       >>>>>>>>>>>>> before it
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> becomes an issue.  Especially now that
> >>>>>>>>>> there is
> >>>>>>>>> a
> >>>>>>>>>>       >>>>> "build
> >>>>>>>>>>       >>>>>>>> step"
> >>>>>>>>>>       >>>>>>>>>>>>> where
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> topology is actually wired up, when
> >>>>>>>>>>       >>>>>> StreamsBuilder.build()
> >>>>>>>>>>       >>>>>>> is
> >>>>>>>>>>       >>>>>>>>>>>>> called.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its
> >>>>>>>>>> argument, I
> >>>>>>>>> agree
> >>>>>>>>>>       >>>>> that
> >>>>>>>>>>       >>>>>>> it's
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> critical to
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> allow users to do other operations on
> >>>>>>>>>> the input
> >>>>>>>>>>       stream.
> >>>>>>>>>>       >>>>>>> With
> >>>>>>>>>>       >>>>>>>>> the
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> fluent
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> solution, it ought to work the same
> >>>>>>>>>> way all
> >>>>>>>>> other
> >>>>>>>>>>       >>>>>>> operations
> >>>>>>>>>>       >>>>>>>>> do -
> >>>>>>>>>>       >>>>>>>>>>>>> if
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> you
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> want to process off the original 
> >>>>>>>>>> KStream
> >>>>>>>>> multiple
> >>>>>>>>>>       >>>>> times,
> >>>>>>>>>>       >>>>>>> you
> >>>>>>>>>>       >>>>>>>>> just
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> need the
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> stream as a variable so you can call
> >>>>>>>>>> as many
> >>>>>>>>>>       operations
> >>>>>>>>>>       >>>>>> on
> >>>>>>>>>>       >>>>>>> it
> >>>>>>>>>>       >>>>>>>>> as
> >>>>>>>>>>       >>>>>>>>>>>>> you
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> desire.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Thoughts?
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Paul
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan
> >>>>>>>>>> Ponomarev <
> >>>>>>>>>>       >>>>>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> Hello Paul,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> I'm afraid this won't work because we do not always need the
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> defaultBranch. And without a terminal
> >>>>>>>>> operation we
> >>>>>>>>>>       >>>>> don't
> >>>>>>>>>>       >>>>>>>> know
> >>>>>>>>>>       >>>>>>>>>>>>> when to
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> finalize and build the 'branch 
> >>>>>>>>>> switch'.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its
> >>>>>>>>>> argument,
> >>>>>>>>>>       so we
> >>>>>>>>>>       >>>>> can
> >>>>>>>>>>       >>>>>> do
> >>>>>>>>>>       >>>>>>>>>>>>> something
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> more with the original branch after
> >>>>>>>>>> branching.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> I understand your point that the need for special object construction
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> contrasts with the fluency of most KStream methods. But here we have a
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> special case: we build the switch to split the flow, so I think this is
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> still idiomatic.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> Ivan
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Ivan,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve this API, but I find the onTopOf()
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> mechanism a little confusing since it contrasts with the fluency of other
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> KStream method calls.  Ideally I'd like to just call a method on the stream
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> so it still reads top to bottom if the branch cases are defined fluently.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> I think the addBranch(predicate,
> >>>>>>>>>> handleCase)
> >>>>>>>>>>       is very
> >>>>>>>>>>       >>>>>> nice
> >>>>>>>>>>       >>>>>>>>> and the
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> right
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> way
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> to do things, but what if we flipped
> >>>>>>>>>> around
> >>>>>>>>>>       how we
> >>>>>>>>>>       >>>>>>> specify
> >>>>>>>>>>       >>>>>>>>> the
> >>>>>>>>>>       >>>>>>>>>>>>> source
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> stream.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Like:
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> stream.branch()
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate1, this::handle1)
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate2, this::handle2)
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>           .defaultBranch(this::handleDefault);
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Where branch() returns a
> >>>>>>>>>> KBranchedStreams or
> >>>>>>>>>>       >>>>>>>> KStreamBrancher
> >>>>>>>>>>       >>>>>>>>> or
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> something,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> which is added to by addBranch() and
> >>>>>>>>>>       terminated by
> >>>>>>>>>>       >>>>>>>>>>>>> defaultBranch()
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> (which
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> returns void).  This is obviously
> >>>>>>>>>>       incompatible with
> >>>>>>>>>>       >>>>> the
> >>>>>>>>>>       >>>>>>>>> current
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> API, so
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> new stream.branch() would have to
> >>>>>>>>>> have a
> >>>>>>>>>>       different
> >>>>>>>>>>       >>>>>> name,
> >>>>>>>>>>       >>>>>>>> but
> >>>>>>>>>>       >>>>>>>>> that
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> seems
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> like a fairly small problem - we
> >>>>>>>>>> could call it
> >>>>>>>>>>       >>>>>> something
> >>>>>>>>>>       >>>>>>>> like
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> branched()
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> or
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> branchedStreams() and deprecate the
> >>>>>>>>>> old API.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of your KIP?  It seems like it does to
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> me, allowing for clear in-line branching while also allowing you to
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> dynamically build branches off of KBranchedStreams if desired.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Paul
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan
> >>>>>>>>>> Ponomarev
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> <iponoma...@mail.ru.invalid>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>         ks.filter(....).mapValues(...)
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>         ks.selectKey(...).groupByKey()...
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> ......
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate1, this::handleFirstCase)
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate2, this::handleSecondCase)
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>      .onTopOf(....)
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> Ivan
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> I have one question: the KafkaStreamsBrancher takes a Consumer as a second
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> argument which returns nothing, and the example in the KIP shows each
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> stream from the branch using a terminal node (KStream#to() in this case).
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but
> >>>>>>>>>> how would
> >>>>>>>>> we
> >>>>>>>>>>       >>>>> handle
> >>>>>>>>>>       >>>>>>> the
> >>>>>>>>>>       >>>>>>>>> case
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> where the
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> user has created a branch but
> >>>>>>>>>> wants to
> >>>>>>>>> continue
> >>>>>>>>>>       >>>>>>>> processing
> >>>>>>>>>>       >>>>>>>>> and
> >>>>>>>>>>       >>>>>>>>>>>>> not
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> necessarily use a terminal node on
> >>>>>>>>>> the
> >>>>>>>>> branched
> >>>>>>>>>>       >>>>>> stream
> >>>>>>>>>>       >>>>>>>>>>>>> immediately?
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic as is, if we had something like this:
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> KStream<String, String>[] branches = originalStream.branch(predicate1, predicate2);
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> Bill
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM
> >>>>>>>>>> Bill Bejeck
> >>>>>>>>> <
> >>>>>>>>>>       >>>>>>>>> bbej...@gmail.com <mailto:bbej...@gmail.com>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> All,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP-418.
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> Hello,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion 
> >>>>>>>>>> about
> >>>>>>>>> KIP-418.
> >>>>>>>>>>       >>>>> Please
> >>>>>>>>>>       >>>>>>>> take
> >>>>>>>>>>       >>>>>>>>> a
> >>>>>>>>>>       >>>>>>>>>>>>> look
> >>>>>>>>>>       >>>>>>>>>>>>>>>>> at
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> KIP if you can, I would
> >>>>>>>>>> appreciate any
> >>>>>>>>>>       feedback :)
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>       >>>>>>>>>
> >>>>>>>>>>       >
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> > 
> 
> 
>
