Thanks for updating the KIP!

I also have some minor comment:



(1) We should rename `KBranchedStream` -> `BranchedKStream`

(Most classed follow this naming pattern now, eg, CoGroupedKStream,
TimeWindowedKStream etc -- there is just the "legacy" `KGroupedStream`
and `KGroupedKTable` that we cannot rename without a breaking change...
so we just keep them.)



(2) Quote:

> Both branch and defaultBranch operations also have overloaded parameterless 
> alternatives.

I think `branch()` always needs to take a `Predicate` and assume you
meant that `Branched` is optional. Can you maybe rephrase it accordingly
as `branch()` would not be "parameterless".



(3) Can you maybe add an overview in the "Public Interface" section) of
newly added and deprecated methods/classes (cf.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup#KIP-150-Kafka-StreamsCogroup-PublicInterfaces)



(4) What is unclear from the KIP is the interaction of `withConsumer()`
and the finally returned `Map<String, KStream>`. This related to John's
4th comment:

> It seems like there are really two disjoint use cases: EITHER using chain and 
> the result map OR using just the sink.

I don't think that using both `withChain()` and `withConsumer()` is the
issue though, as the KIP clearly states that the result of `withChain()`
will be given to the `Consumer`. The issue is really with the `Consumer`
and the returned `Map` of `defautBranch()` and `noDefaultBranch()`.

Maybe a reasonable implementation would be to not add the "branch" to
the result map if `withConsumer` is used? As long as we clearly document
it in the JavaDocs, this might be fine?



(5) Reply to John's comments:

> 3. Reading the KIP, I found ‘withConsumer’ confusing; I thought you were 
> talking about the kafka Consumer interface (which doesn’t make sense, of 
> course). I get that you were referring to the java Consumer interface, but we 
> should still probably to to avoid the ambiguity. Just throwing out a 
> suggestion, how about ‘withSink’?

IMHO, `withSink` has the issue that it might be confused with a "sink
node", ie., writing the KStream to a topic.

Maybe `withJavaConsumer` would make it less ambiguous?




-Matthias




On 5/8/20 7:13 AM, John Roesler wrote:
> Hi Ivan,
> 
> It looks like you missed my reply on Apr 23rd. I think it’s close, but I had 
> a few last comments. 
> 
> Thanks,
> John
> 
> On Sun, May 3, 2020, at 15:39, Ivan Ponomarev wrote:
>> Hello everyone,
>>
>> will someone please take a look at the reworked KIP?
>>
>> I believe that now it follows design principles and takes into account 
>> all the arguments discussed here.
>>
>>
>> Regards,
>>
>> Ivan
>>
>>
>> 23.04.2020 2:45, Ivan Ponomarev пишет:
>>> Hi,
>>>
>>> I have read the John's "DSL design principles" and have completely 
>>> rewritten the KIP, see 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
>>>  
>>>
>>>
>>>
>>> This version includes all the previous discussion results and follows 
>>> the design principles, with one exception.
>>>
>>> The exception is
>>>
>>> branch(Predicate<K,V> predicate, Branched<K,V> branched)
>>>
>>> which formally violates 'no more than one parameter' rule, but I think 
>>> here it is justified.
>>>
>>> We must provide a predicate for a branch and don't need to provide one 
>>> for the default branch. Thus for both operations we may use a single 
>>> Branched parameter class, with an extra method parameter for `branch`.
>>>
>>> Since predicate is a natural, necessary part of a branch, no 
>>> 'proliferation of overloads, deprecations, etc.' is expected here as it 
>>> is said in the rationale for the 'single parameter rule'.
>>>
>>> WDYT, is this KIP mature enough to begin voting?
>>>
>>> Regards,
>>>
>>> Ivan
>>>
>>> 21.04.2020 2:09, Matthias J. Sax пишет:
>>>> Ivan,
>>>>
>>>> no worries about getting side tracked. Glad to have you back!
>>>>
>>>> The DSL improved further in the meantime and we already have a `Named`
>>>> config object to name operators. It seems reasonable to me to build on 
>>>> this.
>>>>
>>>> Furthermore, John did a writeup about "DSL design principles" that we
>>>> want to follow:
>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
>>>>  
>>>>
>>>> -- might be worth to checkout.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 4/17/20 4:30 PM, Ivan Ponomarev wrote:
>>>>> Hi everyone!
>>>>>
>>>>> Let me revive the discussion of this KIP.
>>>>>
>>>>> I'm very sorry for stopping my participation in the discussion in June
>>>>> 2019. My project work was very intensive then and it didn't leave me
>>>>> spare time. But I think I must finish this, because we invested
>>>>> substantial effort into this discussion and I'm not feel entitled to
>>>>> propose other things before this one is finalized.
>>>>>
>>>>> During these months I proceeded with writing and reviewing Kafka
>>>>> Streams-related code. Every time I needed branching, Spring-Kafka's
>>>>> KafkaStreamBrancher class of my invention (the original idea for this
>>>>> KIP) worked for me -- that's another reason why I gave up pushing the
>>>>> KIP forward. When I was coming across the problem with the scope of
>>>>> branches, I worked around it this way:
>>>>>
>>>>> AtomicReference<KStream<...>> result = new AtomicReference<>();
>>>>> new KafkaStreamBrancher<....>()
>>>>>      .branch(....)
>>>>>      .defaultBranch(result::set)
>>>>>      .onTopOf(someStream);
>>>>> result.get()...
>>>>>
>>>>>
>>>>> And yes, of course I don't feel very happy with this approach.
>>>>>
>>>>> I think that Matthias came up with a bright solution in his post from
>>>>> May, 24th 2019. Let me quote it:
>>>>>
>>>>> KStream#split() -> KBranchedStream
>>>>> // branch is not easily accessible in current scope
>>>>> KBranchedStream#branch(Predicate, Consumer<KStream>)
>>>>>    -> KBranchedStream
>>>>> // assign a name to the branch and
>>>>> // return the sub-stream to the current scope later
>>>>> //
>>>>> // can be simple as `#branch(p, s->s, "name")`
>>>>> // or also complex as `#branch(p, s->s.filter(...), "name")`
>>>>> KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
>>>>>    -> KBranchedStream
>>>>> // default branch is not easily accessible
>>>>> // return map of all named sub-stream into current scope
>>>>> KBranchedStream#default(Cosumer<KStream>)
>>>>>    -> Map<String,KStream>
>>>>> // assign custom name to default-branch
>>>>> // return map of all named sub-stream into current scope
>>>>> KBranchedStream#default(Function<KStream,KStream>, String)
>>>>>    -> Map<String,KStream>
>>>>> // assign a default name for default
>>>>> // return map of all named sub-stream into current scope
>>>>> KBranchedStream#defaultBranch(Function<KStream,KStream>)
>>>>>    -> Map<String,KStream>
>>>>> // return map of all names sub-stream into current scope
>>>>> KBranchedStream#noDefaultBranch()
>>>>>    -> Map<String,KStream>
>>>>>
>>>>> I believe this would satisfy everyone. Optional names seems to be a good
>>>>> idea: when you don't need to have the branches in the same scope, you
>>>>> just don't use names and you don't risk making your code brittle. Or,
>>>>> you might want to add names just for debugging purposes. Or, finally,
>>>>> you might use the returned Map to have the named branches in the
>>>>> original scope.
>>>>>
>>>>> There also was an input from John Roesler on June 4th, 2019, who
>>>>> suggested using Named class. I can't comment on this. The idea seems
>>>>> reasonable, but in this matter I'd rather trust people who are more
>>>>> familiar with Streams API design principles than me.
>>>>>
>>>>> Regards,
>>>>>
>>>>> Ivan
>>>>>
>>>>>
>>>>>
>>>>> 08.10.2019 1:38, Matthias J. Sax пишет:
>>>>>> I am moving this KIP into "inactive status". Feel free to resume the 
>>>>>> KIP
>>>>>> at any point.
>>>>>>
>>>>>> If anybody else is interested in picking up this KIP, feel free to 
>>>>>> do so.
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 7/11/19 4:00 PM, Matthias J. Sax wrote:
>>>>>>> Ivan,
>>>>>>>
>>>>>>> did you see my last reply? What do you think about my proposal to mix
>>>>>>> both approaches and try to get best-of-both worlds?
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 6/11/19 3:56 PM, Matthias J. Sax wrote:
>>>>>>>> Thanks for the input John!
>>>>>>>>
>>>>>>>>> under your suggestion, it seems that the name is required
>>>>>>>>
>>>>>>>> If you want to get the `KStream` as part of the `Map` back using a
>>>>>>>> `Function`, yes. If you follow the "embedded chaining" pattern 
>>>>>>>> using a
>>>>>>>> `Consumer`, no.
>>>>>>>>
>>>>>>>> Allowing for a default name via `split()` can of course be done.
>>>>>>>> Similarly, using `Named` instead of `String` is possible.
>>>>>>>>
>>>>>>>> I wanted to sketch out a high level proposal to merge both patterns
>>>>>>>> only. Your suggestions to align the new API with the existing API 
>>>>>>>> make
>>>>>>>> totally sense.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> One follow up question: Would `Named` be optional or required in
>>>>>>>> `split()` and `branch()`? It's unclear from your example.
>>>>>>>>
>>>>>>>> If both are mandatory, what do we gain by it? The returned `Map` only
>>>>>>>> contains the corresponding branches, so why should we prefix all of
>>>>>>>> them? If only `Named` is mandatory in `branch()`, but optional in
>>>>>>>> `split()`, the same question raises?
>>>>>>>>
>>>>>>>> Requiring `Named` in `split()` seems only to make sense, if 
>>>>>>>> `Named` is
>>>>>>>> optional in `branch()` and we generate `-X` suffix using a counter 
>>>>>>>> for
>>>>>>>> different branch name. However, this might lead to the problem of
>>>>>>>> changing names if branches are added/removed. Also, how would the 
>>>>>>>> names
>>>>>>>> be generated if `Consumer` is mixed in (ie, not all branches are
>>>>>>>> returned in the `Map`).
>>>>>>>>
>>>>>>>> If `Named` is optional for both, it could happen that a user 
>>>>>>>> misses to
>>>>>>>> specify a name for a branch what would lead to runtime issues.
>>>>>>>>
>>>>>>>>
>>>>>>>> Hence, I am actually in favor to not allow a default name but keep
>>>>>>>> `split()` without parameter and make `Named` in `branch()` required
>>>>>>>> if a
>>>>>>>> `Function` is used. This makes it explicit to the user that
>>>>>>>> specifying a
>>>>>>>> name is required if a `Function` is used.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> About
>>>>>>>>
>>>>>>>>> KBranchedStream#branch(BranchConfig)
>>>>>>>>
>>>>>>>> I don't think that the branching predicate is a configuration and 
>>>>>>>> hence
>>>>>>>> would not include it in a configuration object.
>>>>>>>>
>>>>>>>>>       withChain(...);
>>>>>>>>
>>>>>>>> Similar, `withChain()` (that would only take a `Consumer`?) does not
>>>>>>>> seem to be a configuration. We can also not prevent a user to call
>>>>>>>> `withName()` in combination of `withChain()` what does not make sense
>>>>>>>> IMHO. We could of course throw an RTE but not have a compile time 
>>>>>>>> check
>>>>>>>> seems less appealing. Also, it could happen that neither 
>>>>>>>> `withChain()`
>>>>>>>> not `withName()` is called and the branch is missing in the returned
>>>>>>>> `Map` what lead to runtime issues, too.
>>>>>>>>
>>>>>>>> Hence, I don't think that we should add `BranchConfig`. A config 
>>>>>>>> object
>>>>>>>> is helpful if each configuration can be set independently of all
>>>>>>>> others,
>>>>>>>> but this seems not to be the case here. If we add new configuration
>>>>>>>> later, we can also just move forward by deprecating the methods that
>>>>>>>> accept `Named` and add new methods that accepted `BranchConfig` (that
>>>>>>>> would of course implement `Named`).
>>>>>>>>
>>>>>>>>
>>>>>>>> Thoughts?
>>>>>>>>
>>>>>>>>
>>>>>>>> @Ivan, what do you think about the general idea to blend the two main
>>>>>>>> approaches of returning a `Map` plus an "embedded chaining"?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 6/4/19 10:33 AM, John Roesler wrote:
>>>>>>>>> Thanks for the idea, Matthias, it does seem like this would satisfy
>>>>>>>>> everyone. Returning the map from the terminal operations also solves
>>>>>>>>> the problem of merging/joining the branched streams, if we want 
>>>>>>>>> to add
>>>>>>>>> support for the compliment later on.
>>>>>>>>>
>>>>>>>>> Under your suggestion, it seems that the name is required. 
>>>>>>>>> Otherwise,
>>>>>>>>> we wouldn't have keys for the map to return. I this this is actually
>>>>>>>>> not too bad, since experience has taught us that, although names for
>>>>>>>>> operations are not required to define stream processing logic, it 
>>>>>>>>> does
>>>>>>>>> significantly improve the operational experience when you can map 
>>>>>>>>> the
>>>>>>>>> topology, logs, metrics, etc. back to the source code. Since you
>>>>>>>>> wouldn't (have to) reference the name to chain extra processing onto
>>>>>>>>> the branch (thanks to the second argument), you can avoid the
>>>>>>>>> "unchecked name" problem that Ivan pointed out.
>>>>>>>>>
>>>>>>>>> In the current implementation of Branch, you can name the branch
>>>>>>>>> operator itself, and then all the branches get index-suffixed names
>>>>>>>>> built from the branch operator name. I guess under this proposal, we
>>>>>>>>> could naturally append the branch name to the branching operator 
>>>>>>>>> name,
>>>>>>>>> like this:
>>>>>>>>>
>>>>>>>>>      stream.split(Named.withName("mysplit")) //creates node 
>>>>>>>>> "mysplit"
>>>>>>>>>                 .branch(..., ..., "abranch") // creates node
>>>>>>>>> "mysplit-abranch"
>>>>>>>>>                 .defaultBranch(...) // creates node 
>>>>>>>>> "mysplit-default"
>>>>>>>>>
>>>>>>>>> It does make me wonder about the DSL syntax itself, though.
>>>>>>>>>
>>>>>>>>> We don't have a defined grammar, so there's plenty of room to debate
>>>>>>>>> the "best" syntax in the context of each operation, but in general,
>>>>>>>>> the KStream DSL operators follow this pattern:
>>>>>>>>>
>>>>>>>>>       operator(function, config_object?) OR operator(config_object)
>>>>>>>>>
>>>>>>>>> where config_object is often just Named in the "function" variant.
>>>>>>>>> Even when the config_object isn't a Named, but some other config
>>>>>>>>> class, that config class _always_ implements NamedOperation.
>>>>>>>>>
>>>>>>>>> Here, we're introducing a totally different pattern:
>>>>>>>>>
>>>>>>>>>     operator(function, function, string)
>>>>>>>>>
>>>>>>>>> where the string is the name.
>>>>>>>>> My first question is whether the name should instead be specified 
>>>>>>>>> with
>>>>>>>>> the NamedOperation interface.
>>>>>>>>>
>>>>>>>>> My second question is whether we should just roll all these 
>>>>>>>>> arguments
>>>>>>>>> up into a config object like:
>>>>>>>>>
>>>>>>>>>      KBranchedStream#branch(BranchConfig)
>>>>>>>>>
>>>>>>>>>      interface BranchConfig extends NamedOperation {
>>>>>>>>>       withPredicate(...);
>>>>>>>>>       withChain(...);
>>>>>>>>>       withName(...);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>> Although I guess we'd like to call BranchConfig something more like
>>>>>>>>> "Branched", even if I don't particularly like that pattern.
>>>>>>>>>
>>>>>>>>> This makes the source code a little noisier, but it also makes us 
>>>>>>>>> more
>>>>>>>>> future-proof, as we can deal with a wide range of alternatives 
>>>>>>>>> purely
>>>>>>>>> in the config interface, and never have to deal with adding 
>>>>>>>>> overloads
>>>>>>>>> to the KBranchedStream if/when we decide we want the name to be
>>>>>>>>> optional, or the KStream->KStream to be optional.
>>>>>>>>>
>>>>>>>>> WDYT?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> -John
>>>>>>>>>
>>>>>>>>> On Fri, May 24, 2019 at 5:25 PM Michael Drogalis
>>>>>>>>> <michael.droga...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>> Matthias: I think that's pretty reasonable from my point of view.
>>>>>>>>>> Good
>>>>>>>>>> suggestion.
>>>>>>>>>>
>>>>>>>>>> On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax
>>>>>>>>>> <matth...@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Interesting discussion.
>>>>>>>>>>>
>>>>>>>>>>> I am wondering, if we cannot unify the advantage of both 
>>>>>>>>>>> approaches:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> KStream#split() -> KBranchedStream
>>>>>>>>>>>
>>>>>>>>>>> // branch is not easily accessible in current scope
>>>>>>>>>>> KBranchedStream#branch(Predicate, Consumer<KStream>)
>>>>>>>>>>>     -> KBranchedStream
>>>>>>>>>>>
>>>>>>>>>>> // assign a name to the branch and
>>>>>>>>>>> // return the sub-stream to the current scope later
>>>>>>>>>>> //
>>>>>>>>>>> // can be simple as `#branch(p, s->s, "name")`
>>>>>>>>>>> // or also complex as `#branch(p, s->s.filter(...), "name")`
>>>>>>>>>>> KBranchedStream#branch(Predicate, Function<KStream,KStream>, 
>>>>>>>>>>> String)
>>>>>>>>>>>     -> KBranchedStream
>>>>>>>>>>>
>>>>>>>>>>> // default branch is not easily accessible
>>>>>>>>>>> // return map of all named sub-stream into current scope
>>>>>>>>>>> KBranchedStream#default(Cosumer<KStream>)
>>>>>>>>>>>     -> Map<String,KStream>
>>>>>>>>>>>
>>>>>>>>>>> // assign custom name to default-branch
>>>>>>>>>>> // return map of all named sub-stream into current scope
>>>>>>>>>>> KBranchedStream#default(Function<KStream,KStream>, String)
>>>>>>>>>>>     -> Map<String,KStream>
>>>>>>>>>>>
>>>>>>>>>>> // assign a default name for default
>>>>>>>>>>> // return map of all named sub-stream into current scope
>>>>>>>>>>> KBranchedStream#defaultBranch(Function<KStream,KStream>)
>>>>>>>>>>>     -> Map<String,KStream>
>>>>>>>>>>>
>>>>>>>>>>> // return map of all names sub-stream into current scope
>>>>>>>>>>> KBranchedStream#noDefaultBranch()
>>>>>>>>>>>     -> Map<String,KStream>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Hence, for each sub-stream, the user can pick to add a name and
>>>>>>>>>>> return
>>>>>>>>>>> the branch "result" to the calling scope or not. The
>>>>>>>>>>> implementation can
>>>>>>>>>>> also check at runtime that all returned names are unique. The
>>>>>>>>>>> returned
>>>>>>>>>>> Map can be empty and it's also optional to use the Map.
>>>>>>>>>>>
>>>>>>>>>>> To me, it seems like a good way to get best of both worlds.
>>>>>>>>>>>
>>>>>>>>>>> Thoughts?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 5/6/19 5:15 PM, John Roesler wrote:
>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>
>>>>>>>>>>>> That's a very good point about the "start" operator in the
>>>>>>>>>>>> dynamic case.
>>>>>>>>>>>> I had no problem with "split()"; I was just questioning the
>>>>>>>>>>>> necessity.
>>>>>>>>>>>> Since you've provided a proof of necessity, I'm in favor of the
>>>>>>>>>>>> "split()" start operator. Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>> Separately, I'm interested to see where the present discussion
>>>>>>>>>>>> leads.
>>>>>>>>>>>> I've written enough Javascript code in my life to be 
>>>>>>>>>>>> suspicious of
>>>>>>>>>>>> nested closures. You have a good point about using method
>>>>>>>>>>>> references (or
>>>>>>>>>>>> indeed function literals also work). It should be validating
>>>>>>>>>>>> that this
>>>>>>>>>>>> was also the JS community's first approach to flattening the
>>>>>>>>>>>> logic when
>>>>>>>>>>>> their nested closure situation got out of hand. Unfortunately, 
>>>>>>>>>>>> it's
>>>>>>>>>>>> replacing nesting with redirection, both of which disrupt code
>>>>>>>>>>>> readability (but in different ways for different reasons). In 
>>>>>>>>>>>> other
>>>>>>>>>>>> words, I agree that function references is *the* first-order
>>>>>>>>>>>> solution if
>>>>>>>>>>>> the nested code does indeed become a problem.
>>>>>>>>>>>>
>>>>>>>>>>>> However, the history of JS also tells us that function
>>>>>>>>>>>> references aren't
>>>>>>>>>>>> the end of the story either, and you can see that by observing 
>>>>>>>>>>>> that
>>>>>>>>>>>> there have been two follow-on eras, as they continue trying to
>>>>>>>>>>>> cope with
>>>>>>>>>>>> the consequences of living in such a callback-heavy language.
>>>>>>>>>>>> First, you
>>>>>>>>>>>> have Futures/Promises, which essentially let you convert nested
>>>>>>>>>>>> code to
>>>>>>>>>>>> method-chained code (Observables/FP is a popular variation on
>>>>>>>>>>>> this).
>>>>>>>>>>>> Most lately, you have async/await, which is an effort to apply
>>>>>>>>>>>> language
>>>>>>>>>>>> (not just API) syntax to the problem, and offer the "flattest"
>>>>>>>>>>>> possible
>>>>>>>>>>>> programming style to solve the problem (because you get back to
>>>>>>>>>>>> just one
>>>>>>>>>>>> code block per functional unit).
>>>>>>>>>>>>
>>>>>>>>>>>> Stream-processing is a different domain, and Java+KStreams is
>>>>>>>>>>>> nowhere
>>>>>>>>>>>> near as callback heavy as JS, so I don't think we have to take
>>>>>>>>>>>> the JS
>>>>>>>>>>>> story for granted, but then again, I think we can derive some
>>>>>>>>>>>> valuable
>>>>>>>>>>>> lessons by looking sideways to adjacent domains. I'm just
>>>>>>>>>>>> bringing this
>>>>>>>>>>>> up to inspire further/deeper discussion. At the same time, just
>>>>>>>>>>>> like JS,
>>>>>>>>>>>> we can afford to take an iterative approach to the problem.
>>>>>>>>>>>>
>>>>>>>>>>>> Separately again, I'm interested in the post-branch merge (and
>>>>>>>>>>>> I'd also
>>>>>>>>>>>> add join) problem that Paul brought up. We can clearly punt on
>>>>>>>>>>>> it, by
>>>>>>>>>>>> terminating the nested branches with sink operators. But is
>>>>>>>>>>>> there a DSL
>>>>>>>>>>>> way to do it?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks again for your driving this,
>>>>>>>>>>>> -John
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, May 2, 2019 at 7:39 PM Paul Whalen <pgwha...@gmail.com
>>>>>>>>>>>> <mailto:pgwha...@gmail.com>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>       Ivan, I’ll definitely forfeit my point on the clumsiness of
>>>>>>>>>>>> the
>>>>>>>>>>>>       branch(predicate, consumer) solution, I don’t see any real
>>>>>>>>>>>> drawbacks
>>>>>>>>>>>>       for the dynamic case.
>>>>>>>>>>>>
>>>>>>>>>>>>       IMO the one trade off to consider at this point is the 
>>>>>>>>>>>> scope
>>>>>>>>>>>>       question. I don’t know if I totally agree that “we rarely
>>>>>>>>>>>> need them
>>>>>>>>>>>>       in the same scope” since merging the branches back together
>>>>>>>>>>>> later
>>>>>>>>>>>>       seems like a perfectly plausible use case that can be a lot
>>>>>>>>>>>> nicer
>>>>>>>>>>>>       when the branched streams are in the same scope. That being
>>>>>>>>>>>> said,
>>>>>>>>>>>>       for the reasons Ivan listed, I think it is overall the 
>>>>>>>>>>>> better
>>>>>>>>>>>>       solution - working around the scope thing is easy enough if
>>>>>>>>>>>> you need
>>>>>>>>>>>>       to.
>>>>>>>>>>>>
>>>>>>>>>>>>       > On May 2, 2019, at 7:00 PM, Ivan Ponomarev
>>>>>>>>>>>>       <iponoma...@mail.ru.invalid> wrote:
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > Hello everyone, thank you all for joining the discussion!
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > Well, I don't think the idea of named branches, be it a
>>>>>>>>>>>>       LinkedHashMap (no other Map will do, because order of
>>>>>>>>>>>> definition
>>>>>>>>>>>>       matters) or `branch` method  taking name and Consumer 
>>>>>>>>>>>> has more
>>>>>>>>>>>>       advantages than drawbacks.
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > In my opinion, the only real positive outcome from 
>>>>>>>>>>>> Michael's
>>>>>>>>>>>>       proposal is that all the returned branches are in the same
>>>>>>>>>>>> scope.
>>>>>>>>>>>>       But 1) we rarely need them in the same scope 2) there is a
>>>>>>>>>>>>       workaround for the scope problem, described in the KIP.
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > 'Inlining the complex logic' is not a problem, because we
>>>>>>>>>>>> can use
>>>>>>>>>>>>       method references instead of lambdas. In real world
>>>>>>>>>>>> scenarios you
>>>>>>>>>>>>       tend to split the complex logic to methods anyway, so the
>>>>>>>>>>>> code is
>>>>>>>>>>>>       going to be clean.
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > The drawbacks are strong. The cohesion between predicates
>>>>>>>>>>>> and
>>>>>>>>>>>>       handlers is lost. We have to define predicates in one
>>>>>>>>>>>> place, and
>>>>>>>>>>>>       handlers in another. This opens the door for bugs:
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > - what if we forget to define a handler for a name? or a
>>>>>>>>>>>> name for
>>>>>>>>>>>>       a handler?
>>>>>>>>>>>>       > - what if we misspell a name?
>>>>>>>>>>>>       > - what if we copy-paste and duplicate a name?
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > What Michael propose would have been totally OK if we had
>>>>>>>>>>>> been
>>>>>>>>>>>>       writing the API in Lua, Ruby or Python. In those 
>>>>>>>>>>>> languages the
>>>>>>>>>>>>       "dynamic naming" approach would have looked most concise 
>>>>>>>>>>>> and
>>>>>>>>>>>>       beautiful. But in Java we expect all the problems 
>>>>>>>>>>>> related to
>>>>>>>>>>>>       identifiers to be eliminated in compile time.
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > Do we have to invent duck-typing for the Java API?
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > And if we do, what advantage are we supposed to get
>>>>>>>>>>>> besides having
>>>>>>>>>>>>       all the branches in the same scope? Michael, maybe I'm
>>>>>>>>>>>> missing your
>>>>>>>>>>>>       point?
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > ---
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > Earlier in this discussion John Roesler also proposed 
>>>>>>>>>>>> to do
>>>>>>>>>>>>       without "start branching" operator, and later Paul
>>>>>>>>>>>> mentioned that in
>>>>>>>>>>>>       the case when we have to add a dynamic number of 
>>>>>>>>>>>> branches, the
>>>>>>>>>>>>       current KIP is 'clumsier' compared to Michael's 'Map'
>>>>>>>>>>>> solution. Let
>>>>>>>>>>>>       me address both comments here.
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > 1) "Start branching" operator (I think that *split* is a
>>>>>>>>>>>> good name
>>>>>>>>>>>>       for it indeed) is critical when we need to do a dynamic
>>>>>>>>>>>> branching,
>>>>>>>>>>>>       see example below.
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > 2) No, dynamic branching in current KIP is not clumsy at
>>>>>>>>>>>> all.
>>>>>>>>>>>>       Imagine a real-world scenario when you need one branch per
>>>>>>>>>>>> enum
>>>>>>>>>>>>       value (say, RecordType). You can have something like this:
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > /*John:if we had to start with stream.branch(...) here,
>>>>>>>>>>>> it would
>>>>>>>>>>>>       have been much messier.*/
>>>>>>>>>>>>       > KBranchedStream branched = stream.split();
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > /*Not clumsy at all :-)*/
>>>>>>>>>>>>       > for (RecordType recordType : RecordType.values())
>>>>>>>>>>>>       >             branched = branched.branch((k, v) ->
>>>>>>>>>>>> v.getRecType() ==
>>>>>>>>>>>>       recordType,
>>>>>>>>>>>>       >                     recordType::processRecords);
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > Regards,
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > Ivan
>>>>>>>>>>>>       >
>>>>>>>>>>>>       >
>>>>>>>>>>>>       > 02.05.2019 14:40, Matthias J. Sax пишет:
>>>>>>>>>>>>       >> I also agree with Michael's observation about the core
>>>>>>>>>>>> problem of
>>>>>>>>>>>>       >> current `branch()` implementation.
>>>>>>>>>>>>       >>
>>>>>>>>>>>>       >> However, I also don't like to pass in a clumsy Map
>>>>>>>>>>>> object. My
>>>>>>>>>>>>       thinking
>>>>>>>>>>>>       >> was more aligned with Paul's proposal to just add a name
>>>>>>>>>>>> to each
>>>>>>>>>>>>       >> `branch()` statement and return a `Map<String,KStream>`.
>>>>>>>>>>>>       >>
>>>>>>>>>>>>       >> It makes the code easier to read, and also make the
>>>>>>>>>>>> order of
>>>>>>>>>>>>       >> `Predicates` (that is essential) easier to grasp.
>>>>>>>>>>>>       >>
>>>>>>>>>>>>       >>>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>>>>>>>>       >>>>>>    .branch("branchOne", Predicate<K, V>)
>>>>>>>>>>>>       >>>>>>    .branch( "branchTwo", Predicate<K, V>)
>>>>>>>>>>>>       >>>>>>    .defaultBranch("defaultBranch");
>>>>>>>>>>>>       >> An open question is the case for which no
>>>>>>>>>>>> defaultBranch() should
>>>>>>>>>>> be
>>>>>>>>>>>>       >> specified. Atm, `split()` and `branch()` would return
>>>>>>>>>>>>       `BranchedKStream`
>>>>>>>>>>>>       >> and the call to `defaultBranch()` that returns the 
>>>>>>>>>>>> `Map` is
>>>>>>>>>>> mandatory
>>>>>>>>>>>>       >> (what is not the case atm). Or is this actually not a 
>>>>>>>>>>>> real
>>>>>>>>>>> problem,
>>>>>>>>>>>>       >> because users can just ignore the branch returned by
>>>>>>>>>>>>       `defaultBranch()`
>>>>>>>>>>>>       >> in the result `Map` ?
>>>>>>>>>>>>       >>
>>>>>>>>>>>>       >>
>>>>>>>>>>>>       >> About "inlining": So far, it seems to be a matter of
>>>>>>>>>>>> personal
>>>>>>>>>>>>       >> preference. I can see arguments for both, but no "killer
>>>>>>>>>>>>       argument" yet
>>>>>>>>>>>>       >> that clearly make the case for one or the other.
>>>>>>>>>>>>       >>
>>>>>>>>>>>>       >>
>>>>>>>>>>>>       >> -Matthias
>>>>>>>>>>>>       >>
>>>>>>>>>>>>       >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
>>>>>>>>>>>>       >>> Perhaps inlining is the wrong terminology. It doesn’t
>>>>>>>>>>>> require
>>>>>>>>>>>>       that a lambda with the full downstream topology be defined
>>>>>>>>>>>> inline -
>>>>>>>>>>>>       it can be a method reference as with Ivan’s original
>>>>>>>>>>>> suggestion.
>>>>>>>>>>>>       The advantage of putting the predicate and its downstream
>>>>>>>>>>>> logic
>>>>>>>>>>>>       (Consumer) together in branch() is that they are required
>>>>>>>>>>>> to be near
>>>>>>>>>>>>       to each other.
>>>>>>>>>>>>       >>>
>>>>>>>>>>>>       >>> Ultimately the downstream code has to live somewhere,
>>>>>>>>>>>> and deep
>>>>>>>>>>>>       branch trees will be hard to read regardless.
>>>>>>>>>>>>       >>>
>>>>>>>>>>>>       >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis
>>>>>>>>>>>>       <michael.droga...@confluent.io
>>>>>>>>>>>>       <mailto:michael.droga...@confluent.io>> wrote:
>>>>>>>>>>>>       >>>>
>>>>>>>>>>>>       >>>> I'm less enthusiastic about inlining the branch logic
>>>>>>>>>>>> with its
>>>>>>>>>>>>       downstream
>>>>>>>>>>>>       >>>> functionality. Programs that have deep branch trees 
>>>>>>>>>>>> will
>>>>>>>>>>>>       quickly become
>>>>>>>>>>>>       >>>> harder to read as a single unit.
>>>>>>>>>>>>       >>>>
>>>>>>>>>>>>       >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen
>>>>>>>>>>>>       <pgwha...@gmail.com <mailto:pgwha...@gmail.com>> wrote:
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>       >>>>> Also +1 on the issues/goals as Michael outlined them,
>>>>>>>>>>>> I think
>>>>>>>>>>>>       that sets a
>>>>>>>>>>>>       >>>>> great framework for the discussion.
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>       >>>>> Regarding the SortedMap solution, my understanding is
>>>>>>>>>>>> that the
>>>>>>>>>>>>       current
>>>>>>>>>>>>       >>>>> proposal in the KIP is what is in my PR which
>>>>>>>>>>>> (pending naming
>>>>>>>>>>>>       decisions) is
>>>>>>>>>>>>       >>>>> roughly this:
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>       >>>>> stream.split()
>>>>>>>>>>>>       >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>>>>>>>>>>       >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>>>>>>>>>>       >>>>>    .defaultBranch(Consumer<KStream<K, V>>);
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>       >>>>> Obviously some ordering is necessary, since branching
>>>>>>>>>>>> as a
>>>>>>>>>>>>       construct
>>>>>>>>>>>>       >>>>> doesn't work without it, but this solution seems 
>>>>>>>>>>>> like it
>>>>>>>>>>>>       provides as much
>>>>>>>>>>>>       >>>>> associativity as the SortedMap solution, because each
>>>>>>>>>>>> branch()
>>>>>>>>>>>>       call
>>>>>>>>>>>>       >>>>> directly associates the "conditional" with the "code
>>>>>>>>>>>> block."
>>>>>>>>>>>>       The value it
>>>>>>>>>>>>       >>>>> provides over the KIP solution is the accessing of
>>>>>>>>>>>> streams in
>>>>>>>>>>>>       the same
>>>>>>>>>>>>       >>>>> scope.
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>       >>>>> The KIP solution is less "dynamic" than the SortedMap
>>>>>>>>>>>> solution
>>>>>>>>>>>>       in the sense
>>>>>>>>>>>>       >>>>> that it is slightly clumsier to add a dynamic 
>>>>>>>>>>>> number of
>>>>>>>>>>>>       branches, but it is
>>>>>>>>>>>>       >>>>> certainly possible.  It seems to me like the API
>>>>>>>>>>>> should favor
>>>>>>>>>>>>       the "static"
>>>>>>>>>>>>       >>>>> case anyway, and should make it simple and 
>>>>>>>>>>>> readable to
>>>>>>>>>>>>       fluently declare and
>>>>>>>>>>>>       >>>>> access your branches in-line.  It also makes it
>>>>>>>>>>>> impossible to
>>>>>>>>>>>>       ignore a
>>>>>>>>>>>>       >>>>> branch, and it is possible to build an (almost)
>>>>>>>>>>>> identical
>>>>>>>>>>>>       SortedMap
>>>>>>>>>>>>       >>>>> solution on top of it.
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>       >>>>> I could also see a middle ground where instead of 
>>>>>>>>>>>> a raw
>>>>>>>>>>>>       SortedMap being
>>>>>>>>>>>>       >>>>> taken in, branch() takes a name and not a Consumer.
>>>>>>>>>>>> Something
>>>>>>>>>>>>       like this:
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>       >>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>>>>>>>>       >>>>>    .branch("branchOne", Predicate<K, V>)
>>>>>>>>>>>>       >>>>>    .branch( "branchTwo", Predicate<K, V>)
>>>>>>>>>>>>       >>>>>    .defaultBranch("defaultBranch",
>>>>>>>>>>>> Consumer<KStream<K, V>>);
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>       >>>>> Pros for that solution:
>>>>>>>>>>>>       >>>>> - accessing branched KStreams in same scope
>>>>>>>>>>>>       >>>>> - no double brace initialization, hopefully slightly
>>>>>>>>>>>> more
>>>>>>>>>>>>       readable than
>>>>>>>>>>>>       >>>>> SortedMap
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>       >>>>> Cons
>>>>>>>>>>>>       >>>>> - downstream branch logic cannot be specified inline
>>>>>>>>>>>> which
>>>>>>>>>>>>       makes it harder
>>>>>>>>>>>>       >>>>> to read top to bottom (like existing API and
>>>>>>>>>>>> SortedMap, but
>>>>>>>>>>>>       unlike the KIP)
>>>>>>>>>>>>       >>>>> - you can forget to "handle" one of the branched
>>>>>>>>>>>> streams (like
>>>>>>>>>>>>       existing
>>>>>>>>>>>>       >>>>> API and SortedMap, but unlike the KIP)
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>       >>>>> (KBranchedStreams could even work *both* ways but
>>>>>>>>>>>> perhaps
>>>>>>>>>>>>       that's overdoing
>>>>>>>>>>>>       >>>>> it).
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>       >>>>> Overall I'm curious how important it is to be able to
>>>>>>>>>>>> easily
>>>>>>>>>>>>       access the
>>>>>>>>>>>>       >>>>> branched KStream in the same scope as the original.
>>>>>>>>>>>> It's
>>>>>>>>>>>>       possible that it
>>>>>>>>>>>>       >>>>> doesn't need to be handled directly by the API, but
>>>>>>>>>>>> instead
>>>>>>>>>>>>       left up to the
>>>>>>>>>>>>       >>>>> user.  I'm sort of in the middle on it.
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>       >>>>> Paul
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>       >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman
>>>>>>>>>>>>       <sop...@confluent.io <mailto:sop...@confluent.io>>
>>>>>>>>>>>>       >>>>> wrote:
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>       >>>>>> I'd like to +1 what Michael said about the issues
>>>>>>>>>>>> with the
>>>>>>>>>>>>       existing
>>>>>>>>>>>>       >>>>> branch
>>>>>>>>>>>>       >>>>>> method, I agree with what he's outlined and I think
>>>>>>>>>>>> we should
>>>>>>>>>>>>       proceed by
>>>>>>>>>>>>       >>>>>> trying to alleviate these problems. Specifically it
>>>>>>>>>>>> seems
>>>>>>>>>>>>       important to be
>>>>>>>>>>>>       >>>>>> able to cleanly access the individual branches (eg
>>>>>>>>>>>> by mapping
>>>>>>>>>>>>       >>>>>> name->stream), which I thought was the original
>>>>>>>>>>>> intention of
>>>>>>>>>>>>       this KIP.
>>>>>>>>>>>>       >>>>>>
>>>>>>>>>>>>       >>>>>> That said, I don't think we should so easily give in
>>>>>>>>>>>> to the
>>>>>>>>>>>>       double brace
>>>>>>>>>>>>       >>>>>> anti-pattern or force ours users into it if at all
>>>>>>>>>>>> possible to
>>>>>>>>>>>>       >>>>> avoid...just
>>>>>>>>>>>>       >>>>>> my two cents.
>>>>>>>>>>>>       >>>>>>
>>>>>>>>>>>>       >>>>>> Cheers,
>>>>>>>>>>>>       >>>>>> Sophie
>>>>>>>>>>>>       >>>>>>
>>>>>>>>>>>>       >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
>>>>>>>>>>>>       >>>>>> michael.droga...@confluent.io
>>>>>>>>>>>>       <mailto:michael.droga...@confluent.io>> wrote:
>>>>>>>>>>>>       >>>>>>
>>>>>>>>>>>>       >>>>>>> I’d like to propose a different way of thinking
>>>>>>>>>>>> about this.
>>>>>>>>>>>>       To me,
>>>>>>>>>>>>       >>>>> there
>>>>>>>>>>>>       >>>>>>> are three problems with the existing branch 
>>>>>>>>>>>> signature:
>>>>>>>>>>>>       >>>>>>>
>>>>>>>>>>>>       >>>>>>> 1. If you use it the way most people do, Java
>>>>>>>>>>>> raises unsafe
>>>>>>>>>>> type
>>>>>>>>>>>>       >>>>>> warnings.
>>>>>>>>>>>>       >>>>>>> 2. The way in which you use the stream branches is
>>>>>>>>>>>>       positionally coupled
>>>>>>>>>>>>       >>>>>> to
>>>>>>>>>>>>       >>>>>>> the ordering of the conditionals.
>>>>>>>>>>>>       >>>>>>> 3. It is brittle to extend existing branch calls 
>>>>>>>>>>>> with
>>>>>>>>>>>>       additional code
>>>>>>>>>>>>       >>>>>>> paths.
>>>>>>>>>>>>       >>>>>>>
>>>>>>>>>>>>       >>>>>>> Using associative constructs instead of relying on
>>>>>>>>>>>> ordered
>>>>>>>>>>>>       constructs
>>>>>>>>>>>>       >>>>>> would
>>>>>>>>>>>>       >>>>>>> be a stronger approach. Consider a signature that
>>>>>>>>>>>> instead
>>>>>>>>>>>>       looks like
>>>>>>>>>>>>       >>>>>> this:
>>>>>>>>>>>>       >>>>>>> Map<String, KStream<K,V>>
>>>>>>>>>>>> KStream#branch(SortedMap<String,
>>>>>>>>>>>>       Predicate<?
>>>>>>>>>>>>       >>>>>>> super K,? super V>>);
>>>>>>>>>>>>       >>>>>>>
>>>>>>>>>>>>       >>>>>>> Branches are given names in a map, and as a result,
>>>>>>>>>>>> the API
>>>>>>>>>>>>       returns a
>>>>>>>>>>>>       >>>>>>> mapping of names to streams. The ordering of the
>>>>>>>>>>> conditionals is
>>>>>>>>>>>>       >>>>>> maintained
>>>>>>>>>>>>       >>>>>>> because it’s a sorted map. Insert order determines
>>>>>>>>>>>> the order
>>>>>>>>>>> of
>>>>>>>>>>>>       >>>>>> evaluation.
>>>>>>>>>>>>       >>>>>>> This solves problem 1 because there are no more
>>>>>>>>>>>> varargs. It
>>>>>>>>>>>>       solves
>>>>>>>>>>>>       >>>>>> problem
>>>>>>>>>>>>       >>>>>>> 2 because you no longer lean on ordering to 
>>>>>>>>>>>> access the
>>>>>>>>>>>>       branch you’re
>>>>>>>>>>>>       >>>>>>> interested in. It solves problem 3 because you can
>>>>>>>>>>>> introduce
>>>>>>>>>>>>       another
>>>>>>>>>>>>       >>>>>>> conditional by simply attaching another name to the
>>>>>>>>>>>>       structure, rather
>>>>>>>>>>>>       >>>>>> than
>>>>>>>>>>>>       >>>>>>> messing with the existing indices.
>>>>>>>>>>>>       >>>>>>>
>>>>>>>>>>>>       >>>>>>> One of the drawbacks is that creating the map
>>>>>>>>>>>> inline is
>>>>>>>>>>>>       historically
>>>>>>>>>>>>       >>>>>>> awkward in Java. I know it’s an anti-pattern to use
>>>>>>>>>>>>       voluminously, but
>>>>>>>>>>>>       >>>>>>> double brace initialization would clean up the
>>>>>>>>>>>> aesthetics.
>>>>>>>>>>>>       >>>>>>>
>>>>>>>>>>>>       >>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler
>>>>>>>>>>>>       <j...@confluent.io <mailto:j...@confluent.io>>
>>>>>>>>>>>>       >>>>> wrote:
>>>>>>>>>>>>       >>>>>>>> Hi Ivan,
>>>>>>>>>>>>       >>>>>>>>
>>>>>>>>>>>>       >>>>>>>> Thanks for the update.
>>>>>>>>>>>>       >>>>>>>>
>>>>>>>>>>>>       >>>>>>>> FWIW, I agree with Matthias that the current 
>>>>>>>>>>>> "start
>>>>>>>>>>> branching"
>>>>>>>>>>>>       >>>>> operator
>>>>>>>>>>>>       >>>>>>> is
>>>>>>>>>>>>       >>>>>>>> confusing when named the same way as the actual
>>>>>>>>>>>> branches.
>>>>>>>>>>>>       "Split"
>>>>>>>>>>>>       >>>>> seems
>>>>>>>>>>>>       >>>>>>>> like a good name. Alternatively, we can do without
>>>>>>>>>>>> a "start
>>>>>>>>>>>>       >>>>> branching"
>>>>>>>>>>>>       >>>>>>>> operator at all, and just do:
>>>>>>>>>>>>       >>>>>>>>
>>>>>>>>>>>>       >>>>>>>> stream
>>>>>>>>>>>>       >>>>>>>>      .branch(Predicate)
>>>>>>>>>>>>       >>>>>>>>      .branch(Predicate)
>>>>>>>>>>>>       >>>>>>>>      .defaultBranch();
>>>>>>>>>>>>       >>>>>>>>
>>>>>>>>>>>>       >>>>>>>> Tentatively, I think that this branching operation
>>>>>>>>>>>> should be
>>>>>>>>>>>>       >>>>> terminal.
>>>>>>>>>>>>       >>>>>>> That
>>>>>>>>>>>>       >>>>>>>> way, we don't create ambiguity about how to use
>>>>>>>>>>>> it. That
>>>>>>>>>>>>       is, `branch`
>>>>>>>>>>>>       >>>>>>>> should return `KBranchedStream`, while
>>>>>>>>>>>> `defaultBranch` is
>>>>>>>>>>>>       `void`, to
>>>>>>>>>>>>       >>>>>>>> enforce that it comes last, and that there is only
>>>>>>>>>>>> one
>>>>>>>>>>>>       definition of
>>>>>>>>>>>>       >>>>>> the
>>>>>>>>>>>>       >>>>>>>> default branch. Potentially, we should log a
>>>>>>>>>>>> warning if
>>>>>>>>>>>>       there's no
>>>>>>>>>>>>       >>>>>>> default,
>>>>>>>>>>>>       >>>>>>>> and additionally log a warning (or throw an
>>>>>>>>>>>> exception) if a
>>>>>>>>>>>>       record
>>>>>>>>>>>>       >>>>>> falls
>>>>>>>>>>>>       >>>>>>>> though with no default.
>>>>>>>>>>>>       >>>>>>>>
>>>>>>>>>>>>       >>>>>>>> Thoughts?
>>>>>>>>>>>>       >>>>>>>>
>>>>>>>>>>>>       >>>>>>>> Thanks,
>>>>>>>>>>>>       >>>>>>>> -John
>>>>>>>>>>>>       >>>>>>>>
>>>>>>>>>>>>       >>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <
>>>>>>>>>>>>       >>>>> matth...@confluent.io <mailto:matth...@confluent.io>
>>>>>>>>>>>>       >>>>>>>> wrote:
>>>>>>>>>>>>       >>>>>>>>
>>>>>>>>>>>>       >>>>>>>>> Thanks for updating the KIP and your answers.
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>> this is to make the name similar to String#split
>>>>>>>>>>>>       >>>>>>>>>>> that also returns an array, right?
>>>>>>>>>>>>       >>>>>>>>> The intend was to avoid name duplication. The
>>>>>>>>>>>> return type
>>>>>>>>>>>>       should
>>>>>>>>>>>>       >>>>>> _not_
>>>>>>>>>>>>       >>>>>>>>> be an array.
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>> The current proposal is
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>> stream.branch()
>>>>>>>>>>>>       >>>>>>>>>      .branch(Predicate)
>>>>>>>>>>>>       >>>>>>>>>      .branch(Predicate)
>>>>>>>>>>>>       >>>>>>>>>      .defaultBranch();
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>> IMHO, this reads a little odd, because the first
>>>>>>>>>>>>       `branch()` does
>>>>>>>>>>>>       >>>>> not
>>>>>>>>>>>>       >>>>>>>>> take any parameters and has different semantics
>>>>>>>>>>>> than the
>>>>>>>>>>> later
>>>>>>>>>>>>       >>>>>>>>> `branch()` calls. Note, that from the code
>>>>>>>>>>>> snippet above,
>>>>>>>>>>> it's
>>>>>>>>>>>>       >>>>> hidden
>>>>>>>>>>>>       >>>>>>>>> that the first call is `KStream#branch()` while
>>>>>>>>>>>> the others
>>>>>>>>>>> are
>>>>>>>>>>>>       >>>>>>>>> `KBranchedStream#branch()` what makes reading the
>>>>>>>>>>>> code
>>>>>>>>>>> harder.
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>> Because I suggested to rename `addBranch()` ->
>>>>>>>>>>>> `branch()`,
>>>>>>>>>>>>       I though
>>>>>>>>>>>>       >>>>>> it
>>>>>>>>>>>>       >>>>>>>>> might be better to also rename `KStream#branch()`
>>>>>>>>>>>> to avoid
>>>>>>>>>>> the
>>>>>>>>>>>>       >>>>> naming
>>>>>>>>>>>>       >>>>>>>>> overlap that seems to be confusing. The following
>>>>>>>>>>>> reads
>>>>>>>>>>> much
>>>>>>>>>>>>       >>>>> cleaner
>>>>>>>>>>>>       >>>>>> to
>>>>>>>>>>>>       >>>>>>>> me:
>>>>>>>>>>>>       >>>>>>>>> stream.split()
>>>>>>>>>>>>       >>>>>>>>>      .branch(Predicate)
>>>>>>>>>>>>       >>>>>>>>>      .branch(Predicate)
>>>>>>>>>>>>       >>>>>>>>>      .defaultBranch();
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>> Maybe there is a better alternative to `split()`
>>>>>>>>>>>> though to
>>>>>>>>>>>>       avoid
>>>>>>>>>>>>       >>>>> the
>>>>>>>>>>>>       >>>>>>>>> naming overlap.
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>> 'default' is, however, a reserved word, so
>>>>>>>>>>>> unfortunately
>>>>>>>>>>> we
>>>>>>>>>>>>       >>>>> cannot
>>>>>>>>>>>>       >>>>>>> have
>>>>>>>>>>>>       >>>>>>>>> a method with such name :-)
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>> Bummer. Didn't consider this. Maybe we can still
>>>>>>>>>>>> come up
>>>>>>>>>>>>       with a
>>>>>>>>>>>>       >>>>> short
>>>>>>>>>>>>       >>>>>>>> name?
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>> Can you add the interface `KBranchedStream` to
>>>>>>>>>>>> the KIP
>>>>>>>>>>>>       with all
>>>>>>>>>>>>       >>>>> it's
>>>>>>>>>>>>       >>>>>>>>> methods? It will be part of public API and 
>>>>>>>>>>>> should be
>>>>>>>>>>>>       contained in
>>>>>>>>>>>>       >>>>> the
>>>>>>>>>>>>       >>>>>>>>> KIP. For example, it's unclear atm, what the
>>>>>>>>>>>> return type of
>>>>>>>>>>>>       >>>>>>>>> `defaultBranch()` is.
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>> You did not comment on the idea to add a
>>>>>>>>>>>>       `KBranchedStream#get(int
>>>>>>>>>>>>       >>>>>>> index)
>>>>>>>>>>>>       >>>>>>>>> -> KStream` method to get the individually
>>>>>>>>>>>>       branched-KStreams. Would
>>>>>>>>>>>>       >>>>>> be
>>>>>>>>>>>>       >>>>>>>>> nice to get your feedback about it. It seems you
>>>>>>>>>>>> suggest
>>>>>>>>>>>>       that users
>>>>>>>>>>>>       >>>>>>>>> would need to write custom utility code
>>>>>>>>>>>> otherwise, to
>>>>>>>>>>>>       access them.
>>>>>>>>>>>>       >>>>> We
>>>>>>>>>>>>       >>>>>>>>> should discuss the pros and cons of both
>>>>>>>>>>>> approaches. It
>>>>>>>>>>> feels
>>>>>>>>>>>>       >>>>>>>>> "incomplete" to me atm, if the API has no
>>>>>>>>>>>> built-in support
>>>>>>>>>>>>       to get
>>>>>>>>>>>>       >>>>> the
>>>>>>>>>>>>       >>>>>>>>> branched-KStreams directly.
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>> -Matthias
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
>>>>>>>>>>>>       >>>>>>>>>> Hi all!
>>>>>>>>>>>>       >>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>> I have updated the KIP-418 according to the new
>>>>>>>>>>>> vision.
>>>>>>>>>>>>       >>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>> Matthias, thanks for your comment!
>>>>>>>>>>>>       >>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>> Renaming KStream#branch() -> #split()
>>>>>>>>>>>>       >>>>>>>>>> I can see your point: this is to make the name
>>>>>>>>>>>> similar to
>>>>>>>>>>>>       >>>>>>> String#split
>>>>>>>>>>>>       >>>>>>>>>> that also returns an array, right? But is it
>>>>>>>>>>>> worth the
>>>>>>>>>>>>       loss of
>>>>>>>>>>>>       >>>>>>>> backwards
>>>>>>>>>>>>       >>>>>>>>>> compatibility? We can have overloaded branch()
>>>>>>>>>>>> as well
>>>>>>>>>>>>       without
>>>>>>>>>>>>       >>>>>>>> affecting
>>>>>>>>>>>>       >>>>>>>>>> the existing code. Maybe the old array-based
>>>>>>>>>>>> `branch`
>>>>>>>>>>> method
>>>>>>>>>>>>       >>>>> should
>>>>>>>>>>>>       >>>>>>> be
>>>>>>>>>>>>       >>>>>>>>>> deprecated, but this is a subject for 
>>>>>>>>>>>> discussion.
>>>>>>>>>>>>       >>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>> Renaming KBranchedStream#addBranch() ->
>>>>>>>>>>>>       >>>>> BranchingKStream#branch(),
>>>>>>>>>>>>       >>>>>>>>>> KBranchedStream#defaultBranch() ->
>>>>>>>>>>> BranchingKStream#default()
>>>>>>>>>>>>       >>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>> Totally agree with 'addBranch->branch' rename.
>>>>>>>>>>>> 'default'
>>>>>>>>>>> is,
>>>>>>>>>>>>       >>>>>>> however, a
>>>>>>>>>>>>       >>>>>>>>>> reserved word, so unfortunately we cannot have a
>>>>>>>>>>>> method
>>>>>>>>>>>>       with such
>>>>>>>>>>>>       >>>>>>> name
>>>>>>>>>>>>       >>>>>>>>> :-)
>>>>>>>>>>>>       >>>>>>>>>>> defaultBranch() does take an `Predicate` as
>>>>>>>>>>>> argument,
>>>>>>>>>>> but I
>>>>>>>>>>>>       >>>>> think
>>>>>>>>>>>>       >>>>>>> that
>>>>>>>>>>>>       >>>>>>>>>> is not required?
>>>>>>>>>>>>       >>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>> Absolutely! I think that was just copy-paste
>>>>>>>>>>>> error or
>>>>>>>>>>>>       something.
>>>>>>>>>>>>       >>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>> Dear colleagues,
>>>>>>>>>>>>       >>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>> please revise the new version of the KIP and
>>>>>>>>>>>> Paul's PR
>>>>>>>>>>>>       >>>>>>>>>> (https://github.com/apache/kafka/pull/6512)
>>>>>>>>>>>>       >>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>> Any new suggestions/objections?
>>>>>>>>>>>>       >>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>> Regards,
>>>>>>>>>>>>       >>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>> Ivan
>>>>>>>>>>>>       >>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>> 11.04.2019 11:47, Matthias J. Sax пишет:
>>>>>>>>>>>>       >>>>>>>>>>> Thanks for driving the discussion of this KIP.
>>>>>>>>>>>> It seems
>>>>>>>>>>> that
>>>>>>>>>>>>       >>>>>>> everybody
>>>>>>>>>>>>       >>>>>>>>>>> agrees that the current branch() method using
>>>>>>>>>>>> arrays is
>>>>>>>>>>> not
>>>>>>>>>>>>       >>>>>> optimal.
>>>>>>>>>>>>       >>>>>>>>>>> I had a quick look into the PR and I like the
>>>>>>>>>>>> overall
>>>>>>>>>>>>       proposal.
>>>>>>>>>>>>       >>>>>>> There
>>>>>>>>>>>>       >>>>>>>>>>> are some minor things we need to consider. I 
>>>>>>>>>>>> would
>>>>>>>>>>>>       recommend the
>>>>>>>>>>>>       >>>>>>>>>>> following renaming:
>>>>>>>>>>>>       >>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>> KStream#branch() -> #split()
>>>>>>>>>>>>       >>>>>>>>>>> KBranchedStream#addBranch() ->
>>>>>>>>>>>> BranchingKStream#branch()
>>>>>>>>>>>>       >>>>>>>>>>> KBranchedStream#defaultBranch() ->
>>>>>>>>>>>>       BranchingKStream#default()
>>>>>>>>>>>>       >>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>> It's just a suggestion to get slightly shorter
>>>>>>>>>>>> method
>>>>>>>>>>> names.
>>>>>>>>>>>>       >>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>> In the current PR, defaultBranch() does take an
>>>>>>>>>>>>       `Predicate` as
>>>>>>>>>>>>       >>>>>>>> argument,
>>>>>>>>>>>>       >>>>>>>>>>> but I think that is not required?
>>>>>>>>>>>>       >>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>> Also, we should consider KIP-307, that was
>>>>>>>>>>>> recently
>>>>>>>>>>>>       accepted and
>>>>>>>>>>>>       >>>>>> is
>>>>>>>>>>>>       >>>>>>>>>>> currently implemented:
>>>>>>>>>>>>       >>>>>>>>>>>
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>>>>>>>>  
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>> Ie, we should add overloads that accepted a
>>>>>>>>>>>> `Named`
>>>>>>>>>>>>       parameter.
>>>>>>>>>>>>       >>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>> For the issue that the created `KStream` object
>>>>>>>>>>>> are in
>>>>>>>>>>>>       different
>>>>>>>>>>>>       >>>>>>>> scopes:
>>>>>>>>>>>>       >>>>>>>>>>> could we extend `KBranchedStream` with a 
>>>>>>>>>>>> `get(int
>>>>>>>>>>>>       index)` method
>>>>>>>>>>>>       >>>>>>> that
>>>>>>>>>>>>       >>>>>>>>>>> returns the corresponding "branched" result
>>>>>>>>>>>> `KStream`
>>>>>>>>>>>>       object?
>>>>>>>>>>>>       >>>>>> Maybe,
>>>>>>>>>>>>       >>>>>>>> the
>>>>>>>>>>>>       >>>>>>>>>>> second argument of `addBranch()` should not 
>>>>>>>>>>>> be a
>>>>>>>>>>>>       >>>>>> `Consumer<KStream>`
>>>>>>>>>>>>       >>>>>>>> but
>>>>>>>>>>>>       >>>>>>>>>>> a `Function<KStream,KStream>` and `get()` could
>>>>>>>>>>>> return
>>>>>>>>>>>>       whatever
>>>>>>>>>>>>       >>>>>> the
>>>>>>>>>>>>       >>>>>>>>>>> `Function` returns?
>>>>>>>>>>>>       >>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>> Finally, I would also suggest to update the KIP
>>>>>>>>>>>> with the
>>>>>>>>>>>>       current
>>>>>>>>>>>>       >>>>>>>>>>> proposal. That makes it easier to review.
>>>>>>>>>>>>       >>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>> -Matthias
>>>>>>>>>>>>       >>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
>>>>>>>>>>>>       >>>>>>>>>>>> Ivan,
>>>>>>>>>>>>       >>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>> I'm a bit of a novice here as well, but I
>>>>>>>>>>>> think it
>>>>>>>>>>>>       makes sense
>>>>>>>>>>>>       >>>>>> for
>>>>>>>>>>>>       >>>>>>>> you
>>>>>>>>>>>>       >>>>>>>>> to
>>>>>>>>>>>>       >>>>>>>>>>>> revise the KIP and continue the discussion.
>>>>>>>>>>>> Obviously
>>>>>>>>>>>>       we'll
>>>>>>>>>>>>       >>>>> need
>>>>>>>>>>>>       >>>>>>>> some
>>>>>>>>>>>>       >>>>>>>>>>>> buy-in from committers that have actual
>>>>>>>>>>>> binding votes on
>>>>>>>>>>>>       >>>>> whether
>>>>>>>>>>>>       >>>>>>> the
>>>>>>>>>>>>       >>>>>>>>> KIP
>>>>>>>>>>>>       >>>>>>>>>>>> could be adopted.  It would be great to hear
>>>>>>>>>>>> if they
>>>>>>>>>>>>       think this
>>>>>>>>>>>>       >>>>>> is
>>>>>>>>>>>>       >>>>>>> a
>>>>>>>>>>>>       >>>>>>>>> good
>>>>>>>>>>>>       >>>>>>>>>>>> idea overall.  I'm not sure if that happens
>>>>>>>>>>>> just by
>>>>>>>>>>>>       starting a
>>>>>>>>>>>>       >>>>>>> vote,
>>>>>>>>>>>>       >>>>>>>>> or if
>>>>>>>>>>>>       >>>>>>>>>>>> there is generally some indication of interest
>>>>>>>>>>> beforehand.
>>>>>>>>>>>>       >>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>> That being said, I'll continue the discussion
>>>>>>>>>>>> a bit:
>>>>>>>>>>>>       assuming
>>>>>>>>>>>>       >>>>> we
>>>>>>>>>>>>       >>>>>> do
>>>>>>>>>>>>       >>>>>>>>> move
>>>>>>>>>>>>       >>>>>>>>>>>> forward the solution of "stream.branch() 
>>>>>>>>>>>> returns
>>>>>>>>>>>>       >>>>>> KBranchedStream",
>>>>>>>>>>>>       >>>>>>> do
>>>>>>>>>>>>       >>>>>>>>> we
>>>>>>>>>>>>       >>>>>>>>>>>> deprecate "stream.branch(...) returns
>>>>>>>>>>>> KStream[]"?  I
>>>>>>>>>>> would
>>>>>>>>>>>>       >>>>> favor
>>>>>>>>>>>>       >>>>>>>>>>>> deprecating, since having two mutually
>>>>>>>>>>>> exclusive APIs
>>>>>>>>>>> that
>>>>>>>>>>>>       >>>>>>> accomplish
>>>>>>>>>>>>       >>>>>>>>> the
>>>>>>>>>>>>       >>>>>>>>>>>> same thing is confusing, especially when
>>>>>>>>>>>> they're fairly
>>>>>>>>>>>>       similar
>>>>>>>>>>>>       >>>>>>>>> anyway.  We
>>>>>>>>>>>>       >>>>>>>>>>>> just need to be sure we're not making 
>>>>>>>>>>>> something
>>>>>>>>>>>>       >>>>>>> impossible/difficult
>>>>>>>>>>>>       >>>>>>>>> that
>>>>>>>>>>>>       >>>>>>>>>>>> is currently possible/easy.
>>>>>>>>>>>>       >>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>> Regarding my PR - I think the general
>>>>>>>>>>>> structure would
>>>>>>>>>>> work,
>>>>>>>>>>>>       >>>>> it's
>>>>>>>>>>>>       >>>>>>>> just a
>>>>>>>>>>>>       >>>>>>>>>>>> little sloppy overall in terms of naming and
>>>>>>>>>>>> clarity. In
>>>>>>>>>>>>       >>>>>>> particular,
>>>>>>>>>>>>       >>>>>>>>>>>> passing in the "predicates" and "children"
>>>>>>>>>>>> lists which
>>>>>>>>>>> get
>>>>>>>>>>>>       >>>>>> modified
>>>>>>>>>>>>       >>>>>>>> in
>>>>>>>>>>>>       >>>>>>>>>>>> KBranchedStream but read from all the way
>>>>>>>>>>>>       KStreamLazyBranch is
>>>>>>>>>>>>       >>>>> a
>>>>>>>>>>>>       >>>>>>> bit
>>>>>>>>>>>>       >>>>>>>>>>>> complicated to follow.
>>>>>>>>>>>>       >>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>> Thanks,
>>>>>>>>>>>>       >>>>>>>>>>>> Paul
>>>>>>>>>>>>       >>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan 
>>>>>>>>>>>> Ponomarev <
>>>>>>>>>>>>       >>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>
>>>>>>>>>>>>       >>>>>>>>> wrote:
>>>>>>>>>>>>       >>>>>>>>>>>>> Hi Paul!
>>>>>>>>>>>>       >>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>> I read your code carefully and now I am fully
>>>>>>>>>>>>       convinced: your
>>>>>>>>>>>>       >>>>>>>> proposal
>>>>>>>>>>>>       >>>>>>>>>>>>> looks better and should work. We just have to
>>>>>>>>>>>> document
>>>>>>>>>>> the
>>>>>>>>>>>>       >>>>>> crucial
>>>>>>>>>>>>       >>>>>>>>> fact
>>>>>>>>>>>>       >>>>>>>>>>>>> that KStream consumers are invoked as they're
>>>>>>>>>>>> added.
>>>>>>>>>>>>       And then
>>>>>>>>>>>>       >>>>>> it's
>>>>>>>>>>>>       >>>>>>>> all
>>>>>>>>>>>>       >>>>>>>>>>>>> going to be very nice.
>>>>>>>>>>>>       >>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>> What shall we do now? I should re-write the
>>>>>>>>>>>> KIP and
>>>>>>>>>>>>       resume the
>>>>>>>>>>>>       >>>>>>>>>>>>> discussion here, right?
>>>>>>>>>>>>       >>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>> Why are you telling that your PR 'should not
>>>>>>>>>>>> be even a
>>>>>>>>>>>>       >>>>> starting
>>>>>>>>>>>>       >>>>>>>> point
>>>>>>>>>>>>       >>>>>>>>> if
>>>>>>>>>>>>       >>>>>>>>>>>>> we go in this direction'? To me it looks like
>>>>>>>>>>>> a good
>>>>>>>>>>>>       starting
>>>>>>>>>>>>       >>>>>>> point.
>>>>>>>>>>>>       >>>>>>>>> But
>>>>>>>>>>>>       >>>>>>>>>>>>> as a novice in this project I might miss some
>>>>>>>>>>>> important
>>>>>>>>>>>>       >>>>> details.
>>>>>>>>>>>>       >>>>>>>>>>>>> Regards,
>>>>>>>>>>>>       >>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>> Ivan
>>>>>>>>>>>>       >>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen пишет:
>>>>>>>>>>>>       >>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>       >>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>> Maybe I’m missing the point, but I 
>>>>>>>>>>>> believe the
>>>>>>>>>>>>       >>>>> stream.branch()
>>>>>>>>>>>>       >>>>>>>>> solution
>>>>>>>>>>>>       >>>>>>>>>>>>> supports this. The couponIssuer::set*
>>>>>>>>>>>> consumers will be
>>>>>>>>>>>>       >>>>> invoked
>>>>>>>>>>>>       >>>>>> as
>>>>>>>>>>>>       >>>>>>>>> they’re
>>>>>>>>>>>>       >>>>>>>>>>>>> added, not during streamsBuilder.build(). So
>>>>>>>>>>>> the user
>>>>>>>>>>>>       still
>>>>>>>>>>>>       >>>>>> ought
>>>>>>>>>>>>       >>>>>>> to
>>>>>>>>>>>>       >>>>>>>>> be
>>>>>>>>>>>>       >>>>>>>>>>>>> able to call couponIssuer.coupons() afterward
>>>>>>>>>>>> and
>>>>>>>>>>>>       depend on
>>>>>>>>>>>>       >>>>> the
>>>>>>>>>>>>       >>>>>>>>> branched
>>>>>>>>>>>>       >>>>>>>>>>>>> streams having been set.
>>>>>>>>>>>>       >>>>>>>>>>>>>> The issue I mean to point out is that it is
>>>>>>>>>>>> hard to
>>>>>>>>>>>>       access
>>>>>>>>>>>>       >>>>> the
>>>>>>>>>>>>       >>>>>>>>> branched
>>>>>>>>>>>>       >>>>>>>>>>>>> streams in the same scope as the original
>>>>>>>>>>>> stream (that
>>>>>>>>>>>>       is, not
>>>>>>>>>>>>       >>>>>>>> inside
>>>>>>>>>>>>       >>>>>>>>> the
>>>>>>>>>>>>       >>>>>>>>>>>>> couponIssuer), which is a problem with both
>>>>>>>>>>>> proposed
>>>>>>>>>>>>       >>>>> solutions.
>>>>>>>>>>>>       >>>>>> It
>>>>>>>>>>>>       >>>>>>>>> can be
>>>>>>>>>>>>       >>>>>>>>>>>>> worked around though.
>>>>>>>>>>>>       >>>>>>>>>>>>>> [Also, great to hear additional interest in
>>>>>>>>>>>> 401, I’m
>>>>>>>>>>>>       excited
>>>>>>>>>>>>       >>>>> to
>>>>>>>>>>>>       >>>>>>>> hear
>>>>>>>>>>>>       >>>>>>>>>>>>> your thoughts!]
>>>>>>>>>>>>       >>>>>>>>>>>>>> Paul
>>>>>>>>>>>>       >>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan 
>>>>>>>>>>>> Ponomarev <
>>>>>>>>>>>>       >>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>
>>>>>>>>>>>>       >>>>>>>>> wrote:
>>>>>>>>>>>>       >>>>>>>>>>>>>>> Hi Paul!
>>>>>>>>>>>>       >>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>> The idea to postpone the wiring of branches
>>>>>>>>>>>> to the
>>>>>>>>>>>>       >>>>>>>>>>>>> streamsBuilder.build() also looked great for
>>>>>>>>>>>> me at
>>>>>>>>>>> first
>>>>>>>>>>>>       >>>>> glance,
>>>>>>>>>>>>       >>>>>>> but
>>>>>>>>>>>>       >>>>>>>>> ---
>>>>>>>>>>>>       >>>>>>>>>>>>>>>> the newly branched streams are not
>>>>>>>>>>>> available in the
>>>>>>>>>>>>       same
>>>>>>>>>>>>       >>>>>> scope
>>>>>>>>>>>>       >>>>>>> as
>>>>>>>>>>>>       >>>>>>>>> each
>>>>>>>>>>>>       >>>>>>>>>>>>> other.  That is, if we wanted to merge 
>>>>>>>>>>>> them back
>>>>>>>>>>> together
>>>>>>>>>>>>       >>>>> again
>>>>>>>>>>>>       >>>>>> I
>>>>>>>>>>>>       >>>>>>>>> don't see
>>>>>>>>>>>>       >>>>>>>>>>>>> a way to do that.
>>>>>>>>>>>>       >>>>>>>>>>>>>>> You just took the words right out of my
>>>>>>>>>>>> mouth, I was
>>>>>>>>>>>>       just
>>>>>>>>>>>>       >>>>>> going
>>>>>>>>>>>>       >>>>>>> to
>>>>>>>>>>>>       >>>>>>>>>>>>> write in details about this issue.
>>>>>>>>>>>>       >>>>>>>>>>>>>>> Consider the example from Bill's book, p.
>>>>>>>>>>>> 101: say
>>>>>>>>>>>>       we need
>>>>>>>>>>>>       >>>>> to
>>>>>>>>>>>>       >>>>>>>>> identify
>>>>>>>>>>>>       >>>>>>>>>>>>> customers who have bought coffee and made a
>>>>>>>>>>>> purchase
>>>>>>>>>>>>       in the
>>>>>>>>>>>>       >>>>>>>>> electronics
>>>>>>>>>>>>       >>>>>>>>>>>>> store to give them coupons.
>>>>>>>>>>>>       >>>>>>>>>>>>>>> This is the code I usually write under 
>>>>>>>>>>>> these
>>>>>>>>>>>>       circumstances
>>>>>>>>>>>>       >>>>>> using
>>>>>>>>>>>>       >>>>>>>> my
>>>>>>>>>>>>       >>>>>>>>>>>>> 'brancher' class:
>>>>>>>>>>>>       >>>>>>>>>>>>>>> @Setter
>>>>>>>>>>>>       >>>>>>>>>>>>>>> class CouponIssuer{
>>>>>>>>>>>>       >>>>>>>>>>>>>>>   private KStream<....> coffePurchases;
>>>>>>>>>>>>       >>>>>>>>>>>>>>>   private KStream<....> 
>>>>>>>>>>>> electronicsPurchases;
>>>>>>>>>>>>       >>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>   KStream<...> coupons(){
>>>>>>>>>>>>       >>>>>>>>>>>>>>>       return
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>> coffePurchases.join(electronicsPurchases...)...whatever
>>>>>>>>>>>>       >>>>>>>>>>>>>>>       /*In the real world the code here 
>>>>>>>>>>>> can be
>>>>>>>>>>>>       complex, so
>>>>>>>>>>>>       >>>>>>>>> creation of
>>>>>>>>>>>>       >>>>>>>>>>>>> a separate CouponIssuer class is fully
>>>>>>>>>>>> justified, in
>>>>>>>>>>>>       order to
>>>>>>>>>>>>       >>>>>>>> separate
>>>>>>>>>>>>       >>>>>>>>>>>>> classes' responsibilities.*/
>>>>>>>>>>>>       >>>>>>>>>>>>>>>  }
>>>>>>>>>>>>       >>>>>>>>>>>>>>> }
>>>>>>>>>>>>       >>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>> CouponIssuer couponIssuer = new
>>>>>>>>>>>> CouponIssuer();
>>>>>>>>>>>>       >>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
>>>>>>>>>>>>       >>>>>>>>>>>>>>>     .branch(predicate1,
>>>>>>>>>>> couponIssuer::setCoffePurchases)
>>>>>>>>>>>>       >>>>>>>>>>>>>>>     .branch(predicate2,
>>>>>>>>>>>>       >>>>>> couponIssuer::setElectronicsPurchases)
>>>>>>>>>>>>       >>>>>>>>>>>>>>>     .onTopOf(transactionStream);
>>>>>>>>>>>>       >>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>> /*Alas, this won't work if we're going to
>>>>>>>>>>>> wire up
>>>>>>>>>>>>       everything
>>>>>>>>>>>>       >>>>>>>> later,
>>>>>>>>>>>>       >>>>>>>>>>>>> without the terminal operation!!!*/
>>>>>>>>>>>>       >>>>>>>>>>>>>>> couponIssuer.coupons()...
>>>>>>>>>>>>       >>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>> Does this make sense?  In order to properly
>>>>>>>>>>>>       initialize the
>>>>>>>>>>>>       >>>>>>>>> CouponIssuer
>>>>>>>>>>>>       >>>>>>>>>>>>> we need the terminal operation to be called
>>>>>>>>>>>> before
>>>>>>>>>>>>       >>>>>>>>> streamsBuilder.build()
>>>>>>>>>>>>       >>>>>>>>>>>>> is called.
>>>>>>>>>>>>       >>>>>>>>>>>>>>> [BTW Paul, I just found out that your
>>>>>>>>>>>> KIP-401 is
>>>>>>>>>>>>       essentially
>>>>>>>>>>>>       >>>>>> the
>>>>>>>>>>>>       >>>>>>>>> next
>>>>>>>>>>>>       >>>>>>>>>>>>> KIP I was going to write here. I have some
>>>>>>>>>>>> thoughts
>>>>>>>>>>>>       based on
>>>>>>>>>>>>       >>>>> my
>>>>>>>>>>>>       >>>>>>>>> experience,
>>>>>>>>>>>>       >>>>>>>>>>>>> so I will join the discussion on KIP-401 
>>>>>>>>>>>> soon.]
>>>>>>>>>>>>       >>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>       >>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen пишет:
>>>>>>>>>>>>       >>>>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>> I tried to make a very rough proof of
>>>>>>>>>>>> concept of a
>>>>>>>>>>>>       fluent
>>>>>>>>>>>>       >>>>> API
>>>>>>>>>>>>       >>>>>>>> based
>>>>>>>>>>>>       >>>>>>>>>>>>> off of
>>>>>>>>>>>>       >>>>>>>>>>>>>>>> KStream here
>>>>>>>>>>>>       (https://github.com/apache/kafka/pull/6512),
>>>>>>>>>>>>       >>>>>> and
>>>>>>>>>>>>       >>>>>>> I
>>>>>>>>>>>>       >>>>>>>>> think
>>>>>>>>>>>>       >>>>>>>>>>>>> I
>>>>>>>>>>>>       >>>>>>>>>>>>>>>> succeeded at removing both cons.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>    - Compatibility: I was incorrect
>>>>>>>>>>>> earlier about
>>>>>>>>>>>>       >>>>>>> compatibility
>>>>>>>>>>>>       >>>>>>>>>>>>> issues,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>    there aren't any direct ones.  I was
>>>>>>>>>>>> unaware
>>>>>>>>>>>>       that Java
>>>>>>>>>>>>       >>>>> is
>>>>>>>>>>>>       >>>>>>>> smart
>>>>>>>>>>>>       >>>>>>>>>>>>> enough to
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>    distinguish between a 
>>>>>>>>>>>> branch(varargs...)
>>>>>>>>>>>>       returning one
>>>>>>>>>>>>       >>>>>>> thing
>>>>>>>>>>>>       >>>>>>>>> and
>>>>>>>>>>>>       >>>>>>>>>>>>> branch()
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>    with no arguments returning another 
>>>>>>>>>>>> thing.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>    - Requiring a terminal method: We don't
>>>>>>>>>>>> actually
>>>>>>>>>>>>       need
>>>>>>>>>>>>       >>>>> it.
>>>>>>>>>>>>       >>>>>>> We
>>>>>>>>>>>>       >>>>>>>>> can
>>>>>>>>>>>>       >>>>>>>>>>>>> just
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>    build up the branches in the
>>>>>>>>>>>> KBranchedStream who
>>>>>>>>>>>>       shares
>>>>>>>>>>>>       >>>>>> its
>>>>>>>>>>>>       >>>>>>>>> state
>>>>>>>>>>>>       >>>>>>>>>>>>> with the
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>    ProcessorSupplier that will actually do
>>>>>>>>>>>> the
>>>>>>>>>>>>       branching.
>>>>>>>>>>>>       >>>>>>> It's
>>>>>>>>>>>>       >>>>>>>>> not
>>>>>>>>>>>>       >>>>>>>>>>>>> terribly
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>    pretty in its current form, but I 
>>>>>>>>>>>> think it
>>>>>>>>>>>>       demonstrates
>>>>>>>>>>>>       >>>>>> its
>>>>>>>>>>>>       >>>>>>>>>>>>> feasibility.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>> To be clear, I don't think that pull
>>>>>>>>>>>> request should
>>>>>>>>>>> be
>>>>>>>>>>>>       >>>>> final
>>>>>>>>>>>>       >>>>>> or
>>>>>>>>>>>>       >>>>>>>>> even a
>>>>>>>>>>>>       >>>>>>>>>>>>>>>> starting point if we go in this direction,
>>>>>>>>>>>> I just
>>>>>>>>>>>>       wanted to
>>>>>>>>>>>>       >>>>>> see
>>>>>>>>>>>>       >>>>>>>> how
>>>>>>>>>>>>       >>>>>>>>>>>>>>>> challenging it would be to get the API
>>>>>>>>>>>> working.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>> I will say though, that I'm not sure the
>>>>>>>>>>>> existing
>>>>>>>>>>>>       solution
>>>>>>>>>>>>       >>>>>>> could
>>>>>>>>>>>>       >>>>>>>> be
>>>>>>>>>>>>       >>>>>>>>>>>>>>>> deprecated in favor of this, which I had
>>>>>>>>>>>> originally
>>>>>>>>>>>>       >>>>> suggested
>>>>>>>>>>>>       >>>>>>>> was a
>>>>>>>>>>>>       >>>>>>>>>>>>>>>> possibility.  The reason is that the newly
>>>>>>>>>>>> branched
>>>>>>>>>>>>       streams
>>>>>>>>>>>>       >>>>>> are
>>>>>>>>>>>>       >>>>>>>> not
>>>>>>>>>>>>       >>>>>>>>>>>>>>>> available in the same scope as each
>>>>>>>>>>>> other.  That
>>>>>>>>>>>>       is, if we
>>>>>>>>>>>>       >>>>>>> wanted
>>>>>>>>>>>>       >>>>>>>>> to
>>>>>>>>>>>>       >>>>>>>>>>>>> merge
>>>>>>>>>>>>       >>>>>>>>>>>>>>>> them back together again I don't see a way
>>>>>>>>>>>> to do
>>>>>>>>>>>>       that.  The
>>>>>>>>>>>>       >>>>>> KIP
>>>>>>>>>>>>       >>>>>>>>>>>>> proposal
>>>>>>>>>>>>       >>>>>>>>>>>>>>>> has the same issue, though - all this
>>>>>>>>>>>> means is that
>>>>>>>>>>> for
>>>>>>>>>>>>       >>>>>> either
>>>>>>>>>>>>       >>>>>>>>>>>>> solution,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>> deprecating the existing branch(...) is
>>>>>>>>>>>> not on the
>>>>>>>>>>>>       table.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan
>>>>>>>>>>>> Ponomarev <
>>>>>>>>>>>>       >>>>>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>>
>>>>>>>>>>>>       >>>>>>>>>>>>> wrote:
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> OK, let me summarize what we have
>>>>>>>>>>>> discussed up to
>>>>>>>>>>> this
>>>>>>>>>>>>       >>>>>> point.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> First, it seems that it's commonly agreed
>>>>>>>>>>>> that
>>>>>>>>>>>>       branch API
>>>>>>>>>>>>       >>>>>>> needs
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> improvement. Motivation is given in 
>>>>>>>>>>>> the KIP.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> There are two potential ways to do it:
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> 1. (as origianlly proposed)
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->..)
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->..)
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>    .defaultBranch(ks->..) //optional
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>    .onTopOf(stream).mapValues(...)....
>>>>>>>>>>>> //onTopOf
>>>>>>>>>>>>       returns
>>>>>>>>>>>>       >>>>>> its
>>>>>>>>>>>>       >>>>>>>>> argument
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2)
>>>>>>>>>>>> The code
>>>>>>>>>>> won't
>>>>>>>>>>>>       >>>>> make
>>>>>>>>>>>>       >>>>>>>> sense
>>>>>>>>>>>>       >>>>>>>>>>>>> until
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> all the necessary ingredients are 
>>>>>>>>>>>> provided.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> CONS: The need to create a
>>>>>>>>>>>> KafkaStreamsBrancher
>>>>>>>>>>>>       instance
>>>>>>>>>>>>       >>>>>>>>> contrasts the
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> fluency of other KStream methods.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> 2. (as Paul proposes)
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> stream
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->...)
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->...)
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>    .defaultBranch(ks->...) //or
>>>>>>>>>>>> noDefault(). Both
>>>>>>>>>>>>       >>>>>>>>> defaultBranch(..)
>>>>>>>>>>>>       >>>>>>>>>>>>> and
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> noDefault() return void
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> PROS: Generally follows the way KStreams
>>>>>>>>>>>> interface
>>>>>>>>>>> is
>>>>>>>>>>>>       >>>>>> defined.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> CONS: We need to define two terminal 
>>>>>>>>>>>> methods
>>>>>>>>>>>>       >>>>>>>> (defaultBranch(ks->)
>>>>>>>>>>>>       >>>>>>>>> and
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> noDefault()). And for a user it is very
>>>>>>>>>>>> easy to
>>>>>>>>>>>>       miss the
>>>>>>>>>>>>       >>>>>> fact
>>>>>>>>>>>>       >>>>>>>>> that one
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> of the terminal methods should be called.
>>>>>>>>>>>> If these
>>>>>>>>>>>>       methods
>>>>>>>>>>>>       >>>>>> are
>>>>>>>>>>>>       >>>>>>>> not
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> called, we can throw an exception in
>>>>>>>>>>>> runtime.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can
>>>>>>>>>>>> we do
>>>>>>>>>>> better?
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev пишет:
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev пишет:
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> Paul,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> I see your point when you are talking
>>>>>>>>>>>> about
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> Still, I believe that this cannot 
>>>>>>>>>>>> not be
>>>>>>>>>>>>       implemented the
>>>>>>>>>>>>       >>>>>>> easy
>>>>>>>>>>>>       >>>>>>>>> way.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> Maybe we all should think further.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> user could specify a terminal 
>>>>>>>>>>>> method that
>>>>>>>>>>> assumes
>>>>>>>>>>>>       >>>>> nothing
>>>>>>>>>>>>       >>>>>>>> will
>>>>>>>>>>>>       >>>>>>>>>>>>> reach
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> the default branch,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> throwing an exception if such a case
>>>>>>>>>>>> occurs.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be
>>>>>>>>>>>> the only
>>>>>>>>>>> option
>>>>>>>>>>>>       >>>>>> besides
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> `default`, because there are scenarios
>>>>>>>>>>>> when we
>>>>>>>>>>>>       want to
>>>>>>>>>>>>       >>>>>> just
>>>>>>>>>>>>       >>>>>>>>> silently
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> drop the messages that didn't match any
>>>>>>>>>>>>       predicate. 2)
>>>>>>>>>>>>       >>>>>>> Throwing
>>>>>>>>>>>>       >>>>>>>>> an
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> exception in the middle of data flow
>>>>>>>>>>>> processing
>>>>>>>>>>>>       looks
>>>>>>>>>>>>       >>>>>> like a
>>>>>>>>>>>>       >>>>>>>> bad
>>>>>>>>>>>>       >>>>>>>>>>>>> idea.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> In stream processing paradigm, I would
>>>>>>>>>>>> prefer to
>>>>>>>>>>>>       emit a
>>>>>>>>>>>>       >>>>>>>> special
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> message to a dedicated stream. This is
>>>>>>>>>>>> exactly
>>>>>>>>>>> where
>>>>>>>>>>>>       >>>>>>> `default`
>>>>>>>>>>>>       >>>>>>>>> can
>>>>>>>>>>>>       >>>>>>>>>>>>> be
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> used.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> it would be fairly easily for the
>>>>>>>>>>>>       >>>>> InternalTopologyBuilder
>>>>>>>>>>>>       >>>>>>> to
>>>>>>>>>>>>       >>>>>>>>> track
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> dangling
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> branches that haven't been terminated
>>>>>>>>>>>> and raise
>>>>>>>>>>>>       a clear
>>>>>>>>>>>>       >>>>>>> error
>>>>>>>>>>>>       >>>>>>>>>>>>> before it
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> becomes an issue.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> You mean a runtime exception, when the
>>>>>>>>>>>> program is
>>>>>>>>>>>>       >>>>> compiled
>>>>>>>>>>>>       >>>>>>> and
>>>>>>>>>>>>       >>>>>>>>> run?
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> Well,  I'd prefer an API that simply 
>>>>>>>>>>>> won't
>>>>>>>>>>>>       compile if
>>>>>>>>>>>>       >>>>> used
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> incorrectly. Can we build such an 
>>>>>>>>>>>> API as a
>>>>>>>>>>>>       method chain
>>>>>>>>>>>>       >>>>>>>> starting
>>>>>>>>>>>>       >>>>>>>>>>>>> from
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> KStream object? There is a huge cost
>>>>>>>>>>>> difference
>>>>>>>>>>>>       between
>>>>>>>>>>>>       >>>>>>>> runtime
>>>>>>>>>>>>       >>>>>>>>> and
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> compile-time errors. Even if a failure
>>>>>>>>>>>> uncovers
>>>>>>>>>>>>       >>>>> instantly
>>>>>>>>>>>>       >>>>>> on
>>>>>>>>>>>>       >>>>>>>>> unit
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> tests, it costs more for the project
>>>>>>>>>>>> than a
>>>>>>>>>>>>       compilation
>>>>>>>>>>>>       >>>>>>>> failure.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen пишет:
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Good point about the terminal
>>>>>>>>>>>> operation being
>>>>>>>>>>>>       required.
>>>>>>>>>>>>       >>>>>>> But
>>>>>>>>>>>>       >>>>>>>> is
>>>>>>>>>>>>       >>>>>>>>>>>>> that
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> really
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> such a bad thing?  If the user doesn't
>>>>>>>>>>>> want a
>>>>>>>>>>>>       >>>>>> defaultBranch
>>>>>>>>>>>>       >>>>>>>>> they
>>>>>>>>>>>>       >>>>>>>>>>>>> can
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> call
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> some other terminal method
>>>>>>>>>>>> (noDefaultBranch()?)
>>>>>>>>>>>>       just as
>>>>>>>>>>>>       >>>>>>>>> easily.  In
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> fact I
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> think it creates an opportunity for a
>>>>>>>>>>>> nicer API
>>>>>>>>>>> - a
>>>>>>>>>>>>       >>>>> user
>>>>>>>>>>>>       >>>>>>>> could
>>>>>>>>>>>>       >>>>>>>>>>>>> specify
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> terminal method that assumes nothing
>>>>>>>>>>>> will reach
>>>>>>>>>>> the
>>>>>>>>>>>>       >>>>>> default
>>>>>>>>>>>>       >>>>>>>>> branch,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> throwing an exception if such a case
>>>>>>>>>>>> occurs.
>>>>>>>>>>> That
>>>>>>>>>>>>       >>>>> seems
>>>>>>>>>>>>       >>>>>>> like
>>>>>>>>>>>>       >>>>>>>>> an
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> improvement over the current branch()
>>>>>>>>>>>> API,
>>>>>>>>>>>>       which allows
>>>>>>>>>>>>       >>>>>> for
>>>>>>>>>>>>       >>>>>>>> the
>>>>>>>>>>>>       >>>>>>>>>>>>> more
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> subtle
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> behavior of records unexpectedly 
>>>>>>>>>>>> getting
>>>>>>>>>>> dropped.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> The need for a terminal operation
>>>>>>>>>>>> certainly has
>>>>>>>>>>>>       to be
>>>>>>>>>>>>       >>>>>> well
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> documented, but
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> it would be fairly easily for the
>>>>>>>>>>>>       >>>>> InternalTopologyBuilder
>>>>>>>>>>>>       >>>>>>> to
>>>>>>>>>>>>       >>>>>>>>> track
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> dangling
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> branches that haven't been terminated
>>>>>>>>>>>> and raise
>>>>>>>>>>>>       a clear
>>>>>>>>>>>>       >>>>>>> error
>>>>>>>>>>>>       >>>>>>>>>>>>> before it
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> becomes an issue.  Especially now that
>>>>>>>>>>>> there is
>>>>>>>>>>> a
>>>>>>>>>>>>       >>>>> "build
>>>>>>>>>>>>       >>>>>>>> step"
>>>>>>>>>>>>       >>>>>>>>>>>>> where
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> topology is actually wired up, when
>>>>>>>>>>>>       >>>>>> StreamsBuilder.build()
>>>>>>>>>>>>       >>>>>>> is
>>>>>>>>>>>>       >>>>>>>>>>>>> called.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its
>>>>>>>>>>>> argument, I
>>>>>>>>>>> agree
>>>>>>>>>>>>       >>>>> that
>>>>>>>>>>>>       >>>>>>> it's
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> critical to
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> allow users to do other operations on
>>>>>>>>>>>> the input
>>>>>>>>>>>>       stream.
>>>>>>>>>>>>       >>>>>>> With
>>>>>>>>>>>>       >>>>>>>>> the
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> fluent
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> solution, it ought to work the same
>>>>>>>>>>>> way all
>>>>>>>>>>> other
>>>>>>>>>>>>       >>>>>>> operations
>>>>>>>>>>>>       >>>>>>>>> do -
>>>>>>>>>>>>       >>>>>>>>>>>>> if
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> want to process off the original 
>>>>>>>>>>>> KStream
>>>>>>>>>>> multiple
>>>>>>>>>>>>       >>>>> times,
>>>>>>>>>>>>       >>>>>>> you
>>>>>>>>>>>>       >>>>>>>>> just
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> need the
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> stream as a variable so you can call
>>>>>>>>>>>> as many
>>>>>>>>>>>>       operations
>>>>>>>>>>>>       >>>>>> on
>>>>>>>>>>>>       >>>>>>> it
>>>>>>>>>>>>       >>>>>>>>> as
>>>>>>>>>>>>       >>>>>>>>>>>>> you
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> desire.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Thoughts?
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan
>>>>>>>>>>>> Ponomarev <
>>>>>>>>>>>>       >>>>>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> Hello Paul,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> I afraid this won't work because we
>>>>>>>>>>>> do not
>>>>>>>>>>>>       always need
>>>>>>>>>>>>       >>>>>> the
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> defaultBranch. And without a terminal
>>>>>>>>>>> operation we
>>>>>>>>>>>>       >>>>> don't
>>>>>>>>>>>>       >>>>>>>> know
>>>>>>>>>>>>       >>>>>>>>>>>>> when to
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> finalize and build the 'branch 
>>>>>>>>>>>> switch'.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its
>>>>>>>>>>>> argument,
>>>>>>>>>>>>       so we
>>>>>>>>>>>>       >>>>> can
>>>>>>>>>>>>       >>>>>> do
>>>>>>>>>>>>       >>>>>>>>>>>>> something
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> more with the original branch after
>>>>>>>>>>>> branching.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> I understand your point that the 
>>>>>>>>>>>> need of
>>>>>>>>>>> special
>>>>>>>>>>>>       >>>>> object
>>>>>>>>>>>>       >>>>>>>>>>>>> construction
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> contrasts the fluency of most KStream
>>>>>>>>>>>> methods.
>>>>>>>>>>> But
>>>>>>>>>>>>       >>>>> here
>>>>>>>>>>>>       >>>>>> we
>>>>>>>>>>>>       >>>>>>>>> have a
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> special case: we build the switch to
>>>>>>>>>>>> split the
>>>>>>>>>>>>       flow,
>>>>>>>>>>>>       >>>>> so
>>>>>>>>>>>>       >>>>>> I
>>>>>>>>>>>>       >>>>>>>>> think
>>>>>>>>>>>>       >>>>>>>>>>>>> this
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> still idiomatic.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen пишет:
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve
>>>>>>>>>>>> this
>>>>>>>>>>>>       API, but I
>>>>>>>>>>>>       >>>>>> find
>>>>>>>>>>>>       >>>>>>>> the
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> onTopOff()
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> mechanism a little confusing 
>>>>>>>>>>>> since it
>>>>>>>>>>>>       contrasts the
>>>>>>>>>>>>       >>>>>>> fluency
>>>>>>>>>>>>       >>>>>>>>> of
>>>>>>>>>>>>       >>>>>>>>>>>>> other
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> KStream method calls.  Ideally I'd
>>>>>>>>>>>> like to
>>>>>>>>>>>>       just call
>>>>>>>>>>>>       >>>>> a
>>>>>>>>>>>>       >>>>>>>>> method on
>>>>>>>>>>>>       >>>>>>>>>>>>> the
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> stream
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> so it still reads top to bottom if
>>>>>>>>>>>> the branch
>>>>>>>>>>>>       cases
>>>>>>>>>>>>       >>>>> are
>>>>>>>>>>>>       >>>>>>>>> defined
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> fluently.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> I think the addBranch(predicate,
>>>>>>>>>>>> handleCase)
>>>>>>>>>>>>       is very
>>>>>>>>>>>>       >>>>>> nice
>>>>>>>>>>>>       >>>>>>>>> and the
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> right
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> way
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> to do things, but what if we flipped
>>>>>>>>>>>> around
>>>>>>>>>>>>       how we
>>>>>>>>>>>>       >>>>>>> specify
>>>>>>>>>>>>       >>>>>>>>> the
>>>>>>>>>>>>       >>>>>>>>>>>>> source
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> stream.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Like:
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> stream.branch()
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate1,
>>>>>>>>>>> this::handle1)
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate2,
>>>>>>>>>>> this::handle2)
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>> .defaultBranch(this::handleDefault);
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Where branch() returns a
>>>>>>>>>>>> KBranchedStreams or
>>>>>>>>>>>>       >>>>>>>> KStreamBrancher
>>>>>>>>>>>>       >>>>>>>>> or
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> something,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> which is added to by addBranch() and
>>>>>>>>>>>>       terminated by
>>>>>>>>>>>>       >>>>>>>>>>>>> defaultBranch()
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> (which
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> returns void).  This is obviously
>>>>>>>>>>>>       incompatible with
>>>>>>>>>>>>       >>>>> the
>>>>>>>>>>>>       >>>>>>>>> current
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> API, so
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> new stream.branch() would have to
>>>>>>>>>>>> have a
>>>>>>>>>>>>       different
>>>>>>>>>>>>       >>>>>> name,
>>>>>>>>>>>>       >>>>>>>> but
>>>>>>>>>>>>       >>>>>>>>> that
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> seems
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> like a fairly small problem - we
>>>>>>>>>>>> could call it
>>>>>>>>>>>>       >>>>>> something
>>>>>>>>>>>>       >>>>>>>> like
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> branched()
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> branchedStreams() and deprecate the
>>>>>>>>>>>> old API.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of
>>>>>>>>>>>> your
>>>>>>>>>>>>       KIP?  It
>>>>>>>>>>>>       >>>>>> seems
>>>>>>>>>>>>       >>>>>>>>> like it
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> does to
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> me, allowing for clear in-line
>>>>>>>>>>>> branching
>>>>>>>>>>>>       while also
>>>>>>>>>>>>       >>>>>>>> allowing
>>>>>>>>>>>>       >>>>>>>>> you
>>>>>>>>>>>>       >>>>>>>>>>>>> to
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> dynamically build of branches off of
>>>>>>>>>>>>       KBranchedStreams
>>>>>>>>>>>>       >>>>>> if
>>>>>>>>>>>>       >>>>>>>>> desired.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan
>>>>>>>>>>>> Ponomarev
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> <iponoma...@mail.ru.invalid>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> void
>>>>>>>>>>>> handleFirstCase(KStream<String, String>
>>>>>>>>>>>>       ks){
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>> ks.filter(....).mapValues(...)
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> void 
>>>>>>>>>>>> handleSecondCase(KStream<String,
>>>>>>>>>>>>       String> ks){
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>> ks.selectKey(...).groupByKey()...
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> ......
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<String,
>>>>>>>>>>>> String>()
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate1,
>>>>>>>>>>>>       this::handleFirstCase)
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate2,
>>>>>>>>>>>>       this::handleSecondCase)
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>      .onTopOf(....)
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck пишет:
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> I have one question, the
>>>>>>>>>>> KafkaStreamsBrancher
>>>>>>>>>>>>       >>>>> takes a
>>>>>>>>>>>>       >>>>>>>>> Consumer
>>>>>>>>>>>>       >>>>>>>>>>>>> as a
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> second
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> argument which returns nothing,
>>>>>>>>>>>> and the
>>>>>>>>>>>>       example in
>>>>>>>>>>>>       >>>>>> the
>>>>>>>>>>>>       >>>>>>>> KIP
>>>>>>>>>>>>       >>>>>>>>>>>>> shows
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> each
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> stream from the branch using a
>>>>>>>>>>>> terminal node
>>>>>>>>>>>>       >>>>>>>>> (KafkaStreams#to()
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> in this
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> case).
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but
>>>>>>>>>>>> how would
>>>>>>>>>>> we
>>>>>>>>>>>>       >>>>> handle
>>>>>>>>>>>>       >>>>>>> the
>>>>>>>>>>>>       >>>>>>>>> case
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> where the
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> user has created a branch but
>>>>>>>>>>>> wants to
>>>>>>>>>>> continue
>>>>>>>>>>>>       >>>>>>>> processing
>>>>>>>>>>>>       >>>>>>>>> and
>>>>>>>>>>>>       >>>>>>>>>>>>> not
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> necessarily use a terminal node on
>>>>>>>>>>>> the
>>>>>>>>>>> branched
>>>>>>>>>>>>       >>>>>> stream
>>>>>>>>>>>>       >>>>>>>>>>>>> immediately?
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic
>>>>>>>>>>>> as is if
>>>>>>>>>>>>       we had
>>>>>>>>>>>>       >>>>>>>> something
>>>>>>>>>>>>       >>>>>>>>> like
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> this:
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> KStream<String, String>[] 
>>>>>>>>>>>> branches =
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> originalStream.branch(predicate1,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> predicate2);
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM
>>>>>>>>>>>> Bill Bejeck
>>>>>>>>>>> <
>>>>>>>>>>>>       >>>>>>>>> bbej...@gmail.com <mailto:bbej...@gmail.com>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> All,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the
>>>>>>>>>>>> discussion for
>>>>>>>>>>> KIP-
>>>>>>>>>>>>       >>>>> 418.
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion 
>>>>>>>>>>>> about
>>>>>>>>>>> KIP-418.
>>>>>>>>>>>>       >>>>> Please
>>>>>>>>>>>>       >>>>>>>> take
>>>>>>>>>>>>       >>>>>>>>> a
>>>>>>>>>>>>       >>>>>>>>>>>>> look
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> KIP if you can, I would
>>>>>>>>>>>> appreciate any
>>>>>>>>>>>>       feedback :)
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> KIP-418:
>>>>>>>>>>>>       >>>>>
>>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
>>>>>>>>>>>  
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488:
>>>>>>>>>>>>       >>>>>>>>>>>>> 
>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5488
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> PR#6164:
>>>>>>>>>>>>       >>>>> https://github.com/apache/kafka/pull/6164
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>       >>>>>>>>>
>>>>>>>>>>>>       >
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to