Hi Ivan,

Welcome back! Thanks for the update. This KIP looks really nice. 

Thanks also for taking into account the grammar. I think you’ve done a good job 
of balancing the proposed grammar’s objectives with the current API’s idioms. 

I have just a few comments remaining:

1. Can you propose to deprecate (but not remove) the existing ‘branch’ method? 
Since the proposed API is presented as a superior alternative, we may as well 
prune so that the total surface area remains constant. 

2. There’s one example (the enum one) that says “this is why this API is 
superior to branching directly off of the parent KStream”, but it doesn’t show 
what it would look like to branch off the parent KStream. Can you add that 
counterexample? My one reservation to the idea of a branch operator at all is 
that you can implicitly branch off of any KTable or KStream. I think it would 
be nice to devote some text to explaining when either one or the other is 
preferable. 

3. Reading the KIP, I found ‘withConsumer’ confusing; I thought you were 
talking about the Kafka Consumer interface (which doesn’t make sense, of 
course). I get that you were referring to the Java Consumer interface, but we 
should still probably try to avoid the ambiguity. Just throwing out a 
suggestion, how about ‘withSink’?

4. More of an aside, it seems slightly convoluted to have withChain and 
withConsumer, since the sink operation could theoretically just go on the end 
of the chain. But I can see that you wanted to capture the chain and return it 
in the KStream-valued Map, which is incompatible with a terminal chain. It 
seems like there are really two disjoint use cases: EITHER using chain and the 
result map OR using just the sink. The API doesn’t codify this, though, which 
might be confusing. I really have no idea of what to do differently, though. 
Unless you’re feeling more creative than I am, maybe we’ll just plan to clarify 
this in the JavaDoc. 
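
To make the distinction in point 2 a bit more concrete, here is a rough sketch in 
plain Java with no Kafka Streams dependency. The names (BranchSemanticsSketch, 
filterEach, firstMatch) are made up for illustration, and a "stream" is modeled 
as a plain List. It assumes the existing branch semantics, where each record 
goes to the first matching predicate and unmatched records are dropped, and 
contrasts that with applying independent filters to the same parent:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model only: a "stream" is a List<Integer>; no Kafka Streams classes are used.
class BranchSemanticsSketch {

    // Implicit branching: every filter is applied to the parent independently,
    // so a record that satisfies several predicates shows up in several outputs.
    static List<List<Integer>> filterEach(List<Integer> parent,
                                          List<Predicate<Integer>> predicates) {
        List<List<Integer>> out = new ArrayList<>();
        for (Predicate<Integer> p : predicates) {
            List<Integer> branch = new ArrayList<>();
            for (Integer record : parent) {
                if (p.test(record)) {
                    branch.add(record);
                }
            }
            out.add(branch);
        }
        return out;
    }

    // Branch operator: each record goes to the first predicate that matches;
    // records that match no predicate are dropped.
    static List<List<Integer>> firstMatch(List<Integer> parent,
                                          List<Predicate<Integer>> predicates) {
        List<List<Integer>> out = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            out.add(new ArrayList<>());
        }
        for (Integer record : parent) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(record)) {
                    out.get(i).add(record);
                    break; // stop at the first match
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> parent = List.of(1, 2, 3, 4, 5, 6);
        List<Predicate<Integer>> predicates = List.of(
                v -> v % 2 == 0, // even values
                v -> v > 3);     // values greater than three
        System.out.println(filterEach(parent, predicates)); // prints [[2, 4, 6], [4, 5, 6]]
        System.out.println(firstMatch(parent, predicates)); // prints [[2, 4, 6], [5]]
    }
}
```

With overlapping predicates, the independent filters both see 4 and 6, whereas 
first-match routing hands each record to at most one branch. That difference is 
probably the core of the "when to use which" text.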

Thanks again!
-John

On Wed, Apr 22, 2020, at 18:45, Ivan Ponomarev wrote:
> Hi,
> 
> I have read John's "DSL design principles" and have completely 
> rewritten the KIP, see 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> 
> 
> This version includes all the previous discussion results and follows 
> the design principles, with one exception.
> 
> The exception is
> 
> branch(Predicate<K,V> predicate, Branched<K,V> branched)
> 
> which formally violates 'no more than one parameter' rule, but I think 
> here it is justified.
> 
> We must provide a predicate for a branch and don't need to provide one 
> for the default branch. Thus for both operations we may use a single 
> Branched parameter class, with an extra method parameter for `branch`.
> 
> Since predicate is a natural, necessary part of a branch, no 
> 'proliferation of overloads, deprecations, etc.' is expected here as it 
> is said in the rationale for the 'single parameter rule'.
> 
> WDYT, is this KIP mature enough to begin voting?
> 
> Regards,
> 
> Ivan
> 
> 21.04.2020 2:09, Matthias J. Sax wrote:
> > Ivan,
> > 
> > no worries about getting side tracked. Glad to have you back!
> > 
> > The DSL improved further in the meantime and we already have a `Named`
> > config object to name operators. It seems reasonable to me to build on this.
> > 
> > Furthermore, John did a writeup about "DSL design principles" that we
> > want to follow:
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
> > -- might be worth checking out.
> > 
> > 
> > -Matthias
> > 
> > 
> > On 4/17/20 4:30 PM, Ivan Ponomarev wrote:
> >> Hi everyone!
> >>
> >> Let me revive the discussion of this KIP.
> >>
> >> I'm very sorry for stopping my participation in the discussion in June
> >> 2019. My project work was very intensive then and it didn't leave me
> >> spare time. But I think I must finish this, because we invested
> >> substantial effort into this discussion and I don't feel entitled to
> >> propose other things before this one is finalized.
> >>
> >> During these months I proceeded with writing and reviewing Kafka
> >> Streams-related code. Every time I needed branching, Spring-Kafka's
> >> KafkaStreamBrancher class of my invention (the original idea for this
> >> KIP) worked for me -- that's another reason why I gave up pushing the
> >> KIP forward. When I was coming across the problem with the scope of
> >> branches, I worked around it this way:
> >>
> >> AtomicReference<KStream<...>> result = new AtomicReference<>();
> >> new KafkaStreamBrancher<....>()
> >>      .branch(....)
> >>      .defaultBranch(result::set)
> >>      .onTopOf(someStream);
> >> result.get()...
> >>
> >>
> >> And yes, of course I don't feel very happy with this approach.
> >>
> >> I think that Matthias came up with a bright solution in his post from
> >> May, 24th 2019. Let me quote it:
> >>
> >> KStream#split() -> KBranchedStream
> >> // branch is not easily accessible in current scope
> >> KBranchedStream#branch(Predicate, Consumer<KStream>)
> >>    -> KBranchedStream
> >> // assign a name to the branch and
> >> // return the sub-stream to the current scope later
> >> //
> >> // can be simple as `#branch(p, s->s, "name")`
> >> // or also complex as `#branch(p, s->s.filter(...), "name")`
> >> KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
> >>    -> KBranchedStream
> >> // default branch is not easily accessible
> >> // return map of all named sub-stream into current scope
> >> KBranchedStream#default(Consumer<KStream>)
> >>    -> Map<String,KStream>
> >> // assign custom name to default-branch
> >> // return map of all named sub-stream into current scope
> >> KBranchedStream#default(Function<KStream,KStream>, String)
> >>    -> Map<String,KStream>
> >> // assign a default name for default
> >> // return map of all named sub-stream into current scope
> >> KBranchedStream#defaultBranch(Function<KStream,KStream>)
> >>    -> Map<String,KStream>
> >> // return map of all named sub-stream into current scope
> >> KBranchedStream#noDefaultBranch()
> >>    -> Map<String,KStream>
> >>
> >> I believe this would satisfy everyone. Optional names seem to be a good
> >> idea: when you don't need to have the branches in the same scope, you
> >> just don't use names and you don't risk making your code brittle. Or,
> >> you might want to add names just for debugging purposes. Or, finally,
> >> you might use the returned Map to have the named branches in the
> >> original scope.
> >>
> >> There also was an input from John Roesler on June 4th, 2019, who
> >> suggested using Named class. I can't comment on this. The idea seems
> >> reasonable, but in this matter I'd rather trust people who are more
> >> familiar with Streams API design principles than me.
> >>
> >> Regards,
> >>
> >> Ivan
> >>
> >>
> >>
> >> 08.10.2019 1:38, Matthias J. Sax wrote:
> >>> I am moving this KIP into "inactive status". Feel free to resume the KIP
> >>> at any point.
> >>>
> >>> If anybody else is interested in picking up this KIP, feel free to do so.
> >>>
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 7/11/19 4:00 PM, Matthias J. Sax wrote:
> >>>> Ivan,
> >>>>
> >>>> did you see my last reply? What do you think about my proposal to mix
> >>>> both approaches and try to get best-of-both worlds?
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 6/11/19 3:56 PM, Matthias J. Sax wrote:
> >>>>> Thanks for the input John!
> >>>>>
> >>>>>> under your suggestion, it seems that the name is required
> >>>>>
> >>>>> If you want to get the `KStream` as part of the `Map` back using a
> >>>>> `Function`, yes. If you follow the "embedded chaining" pattern using a
> >>>>> `Consumer`, no.
> >>>>>
> >>>>> Allowing for a default name via `split()` can of course be done.
> >>>>> Similarly, using `Named` instead of `String` is possible.
> >>>>>
> >>>>> I wanted to sketch out a high level proposal to merge both patterns
> >>>>> only. Your suggestions to align the new API with the existing API make
> >>>>> total sense.
> >>>>>
> >>>>>
> >>>>>
> >>>>> One follow up question: Would `Named` be optional or required in
> >>>>> `split()` and `branch()`? It's unclear from your example.
> >>>>>
> >>>>> If both are mandatory, what do we gain by it? The returned `Map` only
> >>>>> contains the corresponding branches, so why should we prefix all of
> >>>>> them? If `Named` is mandatory only in `branch()`, but optional in
> >>>>> `split()`, the same question arises.
> >>>>>
> >>>>> Requiring `Named` in `split()` seems only to make sense, if `Named` is
> >>>>> optional in `branch()` and we generate `-X` suffix using a counter for
> >>>>> different branch name. However, this might lead to the problem of
> >>>>> changing names if branches are added/removed. Also, how would the names
> >>>>> be generated if `Consumer` is mixed in (ie, not all branches are
> >>>>> returned in the `Map`).
> >>>>>
> >>>>> If `Named` is optional for both, it could happen that a user forgets to
> >>>>> specify a name for a branch, which would lead to runtime issues.
> >>>>>
> >>>>>
> >>>>> Hence, I am actually in favor of not allowing a default name, but keeping
> >>>>> `split()` without parameter and making `Named` in `branch()` required
> >>>>> if a `Function` is used. This makes it explicit to the user that
> >>>>> specifying a name is required if a `Function` is used.
> >>>>>
> >>>>>
> >>>>>
> >>>>> About
> >>>>>
> >>>>>> KBranchedStream#branch(BranchConfig)
> >>>>>
> >>>>> I don't think that the branching predicate is a configuration and hence
> >>>>> would not include it in a configuration object.
> >>>>>
> >>>>>>       withChain(...);
> >>>>>
> >>>>> Similarly, `withChain()` (that would only take a `Consumer`?) does not
> >>>>> seem to be a configuration. We also cannot prevent a user from calling
> >>>>> `withName()` in combination with `withChain()`, which does not make sense
> >>>>> IMHO. We could of course throw an RTE, but not having a compile-time check
> >>>>> seems less appealing. Also, it could happen that neither `withChain()`
> >>>>> nor `withName()` is called and the branch is missing in the returned
> >>>>> `Map`, which leads to runtime issues, too.
> >>>>>
> >>>>> Hence, I don't think that we should add `BranchConfig`. A config object
> >>>>> is helpful if each configuration can be set independently of all
> >>>>> others, but this seems not to be the case here. If we add new
> >>>>> configuration later, we can also just move forward by deprecating the
> >>>>> methods that accept `Named` and adding new methods that accept
> >>>>> `BranchConfig` (that would of course implement `Named`).
> >>>>>
> >>>>>
> >>>>> Thoughts?
> >>>>>
> >>>>>
> >>>>> @Ivan, what do you think about the general idea to blend the two main
> >>>>> approaches of returning a `Map` plus an "embedded chaining"?
> >>>>>
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 6/4/19 10:33 AM, John Roesler wrote:
> >>>>>> Thanks for the idea, Matthias, it does seem like this would satisfy
> >>>>>> everyone. Returning the map from the terminal operations also solves
> >>>>>> the problem of merging/joining the branched streams, if we want to add
> >>>>>> support for the complement later on.
> >>>>>>
> >>>>>> Under your suggestion, it seems that the name is required. Otherwise,
> >>>>>> we wouldn't have keys for the map to return. I think this is actually
> >>>>>> not too bad, since experience has taught us that, although names for
> >>>>>> operations are not required to define stream processing logic, it does
> >>>>>> significantly improve the operational experience when you can map the
> >>>>>> topology, logs, metrics, etc. back to the source code. Since you
> >>>>>> wouldn't (have to) reference the name to chain extra processing onto
> >>>>>> the branch (thanks to the second argument), you can avoid the
> >>>>>> "unchecked name" problem that Ivan pointed out.
> >>>>>>
> >>>>>> In the current implementation of Branch, you can name the branch
> >>>>>> operator itself, and then all the branches get index-suffixed names
> >>>>>> built from the branch operator name. I guess under this proposal, we
> >>>>>> could naturally append the branch name to the branching operator name,
> >>>>>> like this:
> >>>>>>
> >>>>>>      stream.split(Named.withName("mysplit")) //creates node "mysplit"
> >>>>>>                 .branch(..., ..., "abranch") // creates node "mysplit-abranch"
> >>>>>>                 .defaultBranch(...) // creates node "mysplit-default"
> >>>>>>
> >>>>>> It does make me wonder about the DSL syntax itself, though.
> >>>>>>
> >>>>>> We don't have a defined grammar, so there's plenty of room to debate
> >>>>>> the "best" syntax in the context of each operation, but in general,
> >>>>>> the KStream DSL operators follow this pattern:
> >>>>>>
> >>>>>>       operator(function, config_object?) OR operator(config_object)
> >>>>>>
> >>>>>> where config_object is often just Named in the "function" variant.
> >>>>>> Even when the config_object isn't a Named, but some other config
> >>>>>> class, that config class _always_ implements NamedOperation.
> >>>>>>
> >>>>>> Here, we're introducing a totally different pattern:
> >>>>>>
> >>>>>>     operator(function, function, string)
> >>>>>>
> >>>>>> where the string is the name.
> >>>>>> My first question is whether the name should instead be specified with
> >>>>>> the NamedOperation interface.
> >>>>>>
> >>>>>> My second question is whether we should just roll all these arguments
> >>>>>> up into a config object like:
> >>>>>>
> >>>>>>      KBranchedStream#branch(BranchConfig)
> >>>>>>
> >>>>>>      interface BranchConfig extends NamedOperation {
> >>>>>>       withPredicate(...);
> >>>>>>       withChain(...);
> >>>>>>       withName(...);
> >>>>>>     }
> >>>>>>
> >>>>>> Although I guess we'd like to call BranchConfig something more like
> >>>>>> "Branched", even if I don't particularly like that pattern.
> >>>>>>
> >>>>>> This makes the source code a little noisier, but it also makes us more
> >>>>>> future-proof, as we can deal with a wide range of alternatives purely
> >>>>>> in the config interface, and never have to deal with adding overloads
> >>>>>> to the KBranchedStream if/when we decide we want the name to be
> >>>>>> optional, or the KStream->KStream to be optional.
> >>>>>>
> >>>>>> WDYT?
> >>>>>>
> >>>>>> Thanks,
> >>>>>> -John
> >>>>>>
> >>>>>> On Fri, May 24, 2019 at 5:25 PM Michael Drogalis
> >>>>>> <michael.droga...@confluent.io> wrote:
> >>>>>>>
> >>>>>>> Matthias: I think that's pretty reasonable from my point of view.
> >>>>>>> Good suggestion.
> >>>>>>>
> >>>>>>> On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax
> >>>>>>> <matth...@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Interesting discussion.
> >>>>>>>>
> >>>>>>>> I am wondering, if we cannot unify the advantage of both approaches:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> KStream#split() -> KBranchedStream
> >>>>>>>>
> >>>>>>>> // branch is not easily accessible in current scope
> >>>>>>>> KBranchedStream#branch(Predicate, Consumer<KStream>)
> >>>>>>>>     -> KBranchedStream
> >>>>>>>>
> >>>>>>>> // assign a name to the branch and
> >>>>>>>> // return the sub-stream to the current scope later
> >>>>>>>> //
> >>>>>>>> // can be simple as `#branch(p, s->s, "name")`
> >>>>>>>> // or also complex as `#branch(p, s->s.filter(...), "name")`
> >>>>>>>> KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
> >>>>>>>>     -> KBranchedStream
> >>>>>>>>
> >>>>>>>> // default branch is not easily accessible
> >>>>>>>> // return map of all named sub-stream into current scope
> >>>>>>>> KBranchedStream#default(Consumer<KStream>)
> >>>>>>>>     -> Map<String,KStream>
> >>>>>>>>
> >>>>>>>> // assign custom name to default-branch
> >>>>>>>> // return map of all named sub-stream into current scope
> >>>>>>>> KBranchedStream#default(Function<KStream,KStream>, String)
> >>>>>>>>     -> Map<String,KStream>
> >>>>>>>>
> >>>>>>>> // assign a default name for default
> >>>>>>>> // return map of all named sub-stream into current scope
> >>>>>>>> KBranchedStream#defaultBranch(Function<KStream,KStream>)
> >>>>>>>>     -> Map<String,KStream>
> >>>>>>>>
> >>>>>>>> // return map of all named sub-stream into current scope
> >>>>>>>> KBranchedStream#noDefaultBranch()
> >>>>>>>>     -> Map<String,KStream>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Hence, for each sub-stream, the user can pick to add a name and
> >>>>>>>> return the branch "result" to the calling scope or not. The
> >>>>>>>> implementation can also check at runtime that all returned names
> >>>>>>>> are unique. The returned Map can be empty and it's also optional
> >>>>>>>> to use the Map.
> >>>>>>>>
> >>>>>>>> To me, it seems like a good way to get best of both worlds.
> >>>>>>>>
> >>>>>>>> Thoughts?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 5/6/19 5:15 PM, John Roesler wrote:
> >>>>>>>>> Ivan,
> >>>>>>>>>
> >>>>>>>>> That's a very good point about the "start" operator in the
> >>>>>>>>> dynamic case. I had no problem with "split()"; I was just
> >>>>>>>>> questioning the necessity. Since you've provided a proof of
> >>>>>>>>> necessity, I'm in favor of the "split()" start operator. Thanks!
> >>>>>>>>>
> >>>>>>>>> Separately, I'm interested to see where the present discussion
> >>>>>>>>> leads. I've written enough JavaScript code in my life to be
> >>>>>>>>> suspicious of nested closures. You have a good point about using
> >>>>>>>>> method references (or indeed function literals also work). It
> >>>>>>>>> should be validating that this was also the JS community's first
> >>>>>>>>> approach to flattening the logic when their nested closure
> >>>>>>>>> situation got out of hand. Unfortunately, it's replacing nesting
> >>>>>>>>> with redirection, both of which disrupt code readability (but in
> >>>>>>>>> different ways for different reasons). In other words, I agree
> >>>>>>>>> that function references are *the* first-order solution if the
> >>>>>>>>> nested code does indeed become a problem.
> >>>>>>>>>
> >>>>>>>>> However, the history of JS also tells us that function references
> >>>>>>>>> aren't the end of the story either, and you can see that by
> >>>>>>>>> observing that there have been two follow-on eras, as they
> >>>>>>>>> continue trying to cope with the consequences of living in such a
> >>>>>>>>> callback-heavy language. First, you have Futures/Promises, which
> >>>>>>>>> essentially let you convert nested code to method-chained code
> >>>>>>>>> (Observables/FP is a popular variation on this). Most recently,
> >>>>>>>>> you have async/await, which is an effort to apply language (not
> >>>>>>>>> just API) syntax to the problem, and offer the "flattest" possible
> >>>>>>>>> programming style to solve the problem (because you get back to
> >>>>>>>>> just one code block per functional unit).
> >>>>>>>>>
> >>>>>>>>> Stream-processing is a different domain, and Java+KStreams is
> >>>>>>>>> nowhere near as callback heavy as JS, so I don't think we have to
> >>>>>>>>> take the JS story for granted, but then again, I think we can
> >>>>>>>>> derive some valuable lessons by looking sideways to adjacent
> >>>>>>>>> domains. I'm just bringing this up to inspire further/deeper
> >>>>>>>>> discussion. At the same time, just like JS, we can afford to take
> >>>>>>>>> an iterative approach to the problem.
> >>>>>>>>>
> >>>>>>>>> Separately again, I'm interested in the post-branch merge (and I'd
> >>>>>>>>> also add join) problem that Paul brought up. We can clearly punt
> >>>>>>>>> on it, by terminating the nested branches with sink operators. But
> >>>>>>>>> is there a DSL way to do it?
> >>>>>>>>>
> >>>>>>>>> Thanks again for your driving this,
> >>>>>>>>> -John
> >>>>>>>>>
> >>>>>>>>> On Thu, May 2, 2019 at 7:39 PM Paul Whalen <pgwha...@gmail.com
> >>>>>>>>> <mailto:pgwha...@gmail.com>> wrote:
> >>>>>>>>>
> >>>>>>>>>       Ivan, I’ll definitely forfeit my point on the clumsiness of the
> >>>>>>>>>       branch(predicate, consumer) solution, I don’t see any real
> >>>>>>>>>       drawbacks for the dynamic case.
> >>>>>>>>>
> >>>>>>>>>       IMO the one trade off to consider at this point is the scope
> >>>>>>>>>       question. I don’t know if I totally agree that “we rarely need
> >>>>>>>>>       them in the same scope” since merging the branches back together
> >>>>>>>>>       later seems like a perfectly plausible use case that can be a
> >>>>>>>>>       lot nicer when the branched streams are in the same scope. That
> >>>>>>>>>       being said, for the reasons Ivan listed, I think it is overall
> >>>>>>>>>       the better solution - working around the scope thing is easy
> >>>>>>>>>       enough if you need to.
> >>>>>>>>>
> >>>>>>>>>       > On May 2, 2019, at 7:00 PM, Ivan Ponomarev
> >>>>>>>>>       <iponoma...@mail.ru.invalid> wrote:
> >>>>>>>>>       >
> >>>>>>>>>       > Hello everyone, thank you all for joining the discussion!
> >>>>>>>>>       >
> >>>>>>>>>       > Well, I don't think the idea of named branches, be it a
> >>>>>>>>>       > LinkedHashMap (no other Map will do, because order of
> >>>>>>>>>       > definition matters) or a `branch` method taking a name and a
> >>>>>>>>>       > Consumer, has more advantages than drawbacks.
> >>>>>>>>>       >
> >>>>>>>>>       > In my opinion, the only real positive outcome from Michael's
> >>>>>>>>>       > proposal is that all the returned branches are in the same
> >>>>>>>>>       > scope. But 1) we rarely need them in the same scope 2) there
> >>>>>>>>>       > is a workaround for the scope problem, described in the KIP.
> >>>>>>>>>       >
> >>>>>>>>>       > 'Inlining the complex logic' is not a problem, because we can
> >>>>>>>>>       > use method references instead of lambdas. In real world
> >>>>>>>>>       > scenarios you tend to split the complex logic to methods
> >>>>>>>>>       > anyway, so the code is going to be clean.
> >>>>>>>>>       >
> >>>>>>>>>       > The drawbacks are strong. The cohesion between predicates and
> >>>>>>>>>       > handlers is lost. We have to define predicates in one place,
> >>>>>>>>>       > and handlers in another. This opens the door for bugs:
> >>>>>>>>>       >
> >>>>>>>>>       > - what if we forget to define a handler for a name? or a name
> >>>>>>>>>       >   for a handler?
> >>>>>>>>>       > - what if we misspell a name?
> >>>>>>>>>       > - what if we copy-paste and duplicate a name?
> >>>>>>>>>       >
> >>>>>>>>>       > What Michael proposes would have been totally OK if we had
> >>>>>>>>>       > been writing the API in Lua, Ruby or Python. In those
> >>>>>>>>>       > languages the "dynamic naming" approach would have looked most
> >>>>>>>>>       > concise and beautiful. But in Java we expect all the problems
> >>>>>>>>>       > related to identifiers to be eliminated at compile time.
> >>>>>>>>>       >
> >>>>>>>>>       > Do we have to invent duck-typing for the Java API?
> >>>>>>>>>       >
> >>>>>>>>>       > And if we do, what advantage are we supposed to get besides
> >>>>>>>>>       > having all the branches in the same scope? Michael, maybe I'm
> >>>>>>>>>       > missing your point?
> >>>>>>>>>       >
> >>>>>>>>>       > ---
> >>>>>>>>>       >
> >>>>>>>>>       > Earlier in this discussion John Roesler also proposed to do
> >>>>>>>>>       > without "start branching" operator, and later Paul mentioned
> >>>>>>>>>       > that in the case when we have to add a dynamic number of
> >>>>>>>>>       > branches, the current KIP is 'clumsier' compared to Michael's
> >>>>>>>>>       > 'Map' solution. Let me address both comments here.
> >>>>>>>>>       >
> >>>>>>>>>       > 1) "Start branching" operator (I think that *split* is a good
> >>>>>>>>>       > name for it indeed) is critical when we need to do a dynamic
> >>>>>>>>>       > branching, see example below.
> >>>>>>>>>       >
> >>>>>>>>>       > 2) No, dynamic branching in current KIP is not clumsy at all.
> >>>>>>>>>       > Imagine a real-world scenario when you need one branch per
> >>>>>>>>>       > enum value (say, RecordType). You can have something like
> >>>>>>>>>       > this:
> >>>>>>>>>       >
> >>>>>>>>>       > /* John: if we had to start with stream.branch(...) here,
> >>>>>>>>>       >    it would have been much messier. */
> >>>>>>>>>       > KBranchedStream branched = stream.split();
> >>>>>>>>>       >
> >>>>>>>>>       > /* Not clumsy at all :-) */
> >>>>>>>>>       > for (RecordType recordType : RecordType.values())
> >>>>>>>>>       >     branched = branched.branch(
> >>>>>>>>>       >             (k, v) -> v.getRecType() == recordType,
> >>>>>>>>>       >             recordType::processRecords);
> >>>>>>>>>       >
> >>>>>>>>>       > Regards,
> >>>>>>>>>       >
> >>>>>>>>>       > Ivan
> >>>>>>>>>       >
> >>>>>>>>>       >
> >>>>>>>>>       > 02.05.2019 14:40, Matthias J. Sax wrote:
> >>>>>>>>>       >> I also agree with Michael's observation about the core
> >>>>>>>>>       >> problem of current `branch()` implementation.
> >>>>>>>>>       >>
> >>>>>>>>>       >> However, I also don't like to pass in a clumsy Map object. My
> >>>>>>>>>       >> thinking was more aligned with Paul's proposal to just add a
> >>>>>>>>>       >> name to each `branch()` statement and return a
> >>>>>>>>>       >> `Map<String,KStream>`.
> >>>>>>>>>       >>
> >>>>>>>>>       >> It makes the code easier to read, and also makes the order of
> >>>>>>>>>       >> `Predicates` (that is essential) easier to grasp.
> >>>>>>>>>       >>
> >>>>>>>>>       >>>>>> Map<String, KStream<K, V>> branches = stream.split()
> >>>>>>>>>       >>>>>>    .branch("branchOne", Predicate<K, V>)
> >>>>>>>>>       >>>>>>    .branch( "branchTwo", Predicate<K, V>)
> >>>>>>>>>       >>>>>>    .defaultBranch("defaultBranch");
> >>>>>>>>>       >> An open question is the case for which no defaultBranch()
> >>>>>>>>>       >> should be specified. Atm, `split()` and `branch()` would
> >>>>>>>>>       >> return `BranchedKStream` and the call to `defaultBranch()`
> >>>>>>>>>       >> that returns the `Map` is mandatory (which is not the case
> >>>>>>>>>       >> atm). Or is this actually not a real problem, because users
> >>>>>>>>>       >> can just ignore the branch returned by `defaultBranch()` in
> >>>>>>>>>       >> the result `Map`?
> >>>>>>>>>       >>
> >>>>>>>>>       >>
> >>>>>>>>>       >> About "inlining": So far, it seems to be a matter of personal
> >>>>>>>>>       >> preference. I can see arguments for both, but no "killer
> >>>>>>>>>       >> argument" yet that clearly makes the case for one or the
> >>>>>>>>>       >> other.
> >>>>>>>>>       >>
> >>>>>>>>>       >>
> >>>>>>>>>       >> -Matthias
> >>>>>>>>>       >>
> >>>>>>>>>       >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
> >>>>>>>>>       >>> Perhaps inlining is the wrong terminology. It doesn’t
> >>>>>>>>>       >>> require that a lambda with the full downstream topology be
> >>>>>>>>>       >>> defined inline - it can be a method reference as with Ivan’s
> >>>>>>>>>       >>> original suggestion. The advantage of putting the predicate
> >>>>>>>>>       >>> and its downstream logic (Consumer) together in branch() is
> >>>>>>>>>       >>> that they are required to be near to each other.
> >>>>>>>>>       >>>
> >>>>>>>>>       >>> Ultimately the downstream code has to live somewhere, and
> >>>>>>>>>       >>> deep branch trees will be hard to read regardless.
> >>>>>>>>>       >>>
> >>>>>>>>>       >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis
> >>>>>>>>>       <michael.droga...@confluent.io
> >>>>>>>>>       <mailto:michael.droga...@confluent.io>> wrote:
> >>>>>>>>>       >>>>
> >>>>>>>>>       >>>> I'm less enthusiastic about inlining the branch logic with
> >>>>>>>>>       >>>> its downstream functionality. Programs that have deep
> >>>>>>>>>       >>>> branch trees will quickly become harder to read as a
> >>>>>>>>>       >>>> single unit.
> >>>>>>>>>       >>>>
> >>>>>>>>>       >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen
> >>>>>>>>>       <pgwha...@gmail.com <mailto:pgwha...@gmail.com>> wrote:
> >>>>>>>>>       >>>>>
> >>>>>>>>>       >>>>> Also +1 on the issues/goals as Michael outlined them, I
> >>>>>>>>>       >>>>> think that sets a great framework for the discussion.
> >>>>>>>>>       >>>>>
> >>>>>>>>>       >>>>> Regarding the SortedMap solution, my understanding is that
> >>>>>>>>>       >>>>> the current proposal in the KIP is what is in my PR which
> >>>>>>>>>       >>>>> (pending naming decisions) is roughly this:
> >>>>>>>>>       >>>>>
> >>>>>>>>>       >>>>> stream.split()
> >>>>>>>>>       >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
> >>>>>>>>>       >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
> >>>>>>>>>       >>>>>    .defaultBranch(Consumer<KStream<K, V>>);
> >>>>>>>>>       >>>>>
> >>>>>>>>>       >>>>> Obviously some ordering is necessary, since branching as a
> >>>>>>>>>       >>>>> construct doesn't work without it, but this solution seems
> >>>>>>>>>       >>>>> like it provides as much associativity as the SortedMap
> >>>>>>>>>       >>>>> solution, because each branch() call directly associates
> >>>>>>>>>       >>>>> the "conditional" with the "code block." The value it
> >>>>>>>>>       >>>>> provides over the KIP solution is the accessing of streams
> >>>>>>>>>       >>>>> in the same scope.
> >>>>>>>>>       >>>>>
> >>>>>>>>>       >>>>> The KIP solution is less "dynamic" than the SortedMap
> >>>>>>>>>       >>>>> solution in the sense that it is slightly clumsier to add
> >>>>>>>>>       >>>>> a dynamic number of branches, but it is certainly
> >>>>>>>>>       >>>>> possible.  It seems to me like the API should favor the
> >>>>>>>>>       >>>>> "static" case anyway, and should make it simple and
> >>>>>>>>>       >>>>> readable to fluently declare and access your branches
> >>>>>>>>>       >>>>> in-line.  It also makes it impossible to ignore a branch,
> >>>>>>>>>       >>>>> and it is possible to build an (almost) identical
> >>>>>>>>>       >>>>> SortedMap solution on top of it.
> >>>>>>>>>       >>>>>
> >>>>>>>>>       >>>>> I could also see a middle ground where instead of a raw
> >>>>>>>>>       >>>>> SortedMap being taken in, branch() takes a name and not a
> >>>>>>>>>       >>>>> Consumer. Something like this:
> >>>>>>>>>       >>>>>
> >>>>>>>>>       >>>>> Map<String, KStream<K, V>> branches = stream.split()
> >>>>>>>>>       >>>>>    .branch("branchOne", Predicate<K, V>)
> >>>>>>>>>       >>>>>    .branch( "branchTwo", Predicate<K, V>)
> >>>>>>>>>       >>>>>    .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
> >>>>>>>>>       >>>>>
> >>>>>>>>>       >>>>> Pros for that solution:
> >>>>>>>>>       >>>>> - accessing branched KStreams in same scope
> >>>>>>>>>       >>>>> - no double brace initialization, hopefully slightly more
> >>>>>>>>>       >>>>>   readable than SortedMap
> >>>>>>>>>       >>>>>
> >>>>>>>>>       >>>>> Cons
> >>>>>>>>>       >>>>> - downstream branch logic cannot be specified inline
> >>>>>>>>> which
> >>>>>>>>>       makes it harder
> >>>>>>>>>       >>>>> to read top to bottom (like existing API and
> >>>>>>>>> SortedMap, but
> >>>>>>>>>       unlike the KIP)
> >>>>>>>>>       >>>>> - you can forget to "handle" one of the branched
> >>>>>>>>> streams (like
> >>>>>>>>>       existing
> >>>>>>>>>       >>>>> API and SortedMap, but unlike the KIP)
> >>>>>>>>>       >>>>>
> >>>>>>>>>       >>>>> (KBranchedStreams could even work *both* ways but
> >>>>>>>>> perhaps
> >>>>>>>>>       that's overdoing
> >>>>>>>>>       >>>>> it).
> >>>>>>>>>       >>>>>
> >>>>>>>>>       >>>>> Overall I'm curious how important it is to be able to
> >>>>>>>>> easily
> >>>>>>>>>       access the
> >>>>>>>>>       >>>>> branched KStream in the same scope as the original.
> >>>>>>>>> It's
> >>>>>>>>>       possible that it
> >>>>>>>>>       >>>>> doesn't need to be handled directly by the API, but
> >>>>>>>>> instead
> >>>>>>>>>       left up to the
> >>>>>>>>>       >>>>> user.  I'm sort of in the middle on it.
> >>>>>>>>>       >>>>>
> >>>>>>>>>       >>>>> Paul
> >>>>>>>>>       >>>>>
> >>>>>>>>>       >>>>>
> >>>>>>>>>       >>>>>
> >>>>>>>>>       >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman
> >>>>>>>>>       <sop...@confluent.io>
> >>>>>>>>>       >>>>> wrote:
> >>>>>>>>>       >>>>>
> >>>>>>>>>       >>>>>> I'd like to +1 what Michael said about the issues
> >>>>>>>>> with the
> >>>>>>>>>       existing
> >>>>>>>>>       >>>>> branch
> >>>>>>>>>       >>>>>> method, I agree with what he's outlined and I think
> >>>>>>>>> we should
> >>>>>>>>>       proceed by
> >>>>>>>>>       >>>>>> trying to alleviate these problems. Specifically it
> >>>>>>>>> seems
> >>>>>>>>>       important to be
> >>>>>>>>>       >>>>>> able to cleanly access the individual branches (eg
> >>>>>>>>> by mapping
> >>>>>>>>>       >>>>>> name->stream), which I thought was the original
> >>>>>>>>> intention of
> >>>>>>>>>       this KIP.
> >>>>>>>>>       >>>>>>
> >>>>>>>>>       >>>>>> That said, I don't think we should so easily give in
> >>>>>>>>> to the
> >>>>>>>>>       double brace
> >>>>>>>>>       >>>>>> anti-pattern or force our users into it if at all
> >>>>>>>>> possible to
> >>>>>>>>>       >>>>> avoid...just
> >>>>>>>>>       >>>>>> my two cents.
> >>>>>>>>>       >>>>>>
> >>>>>>>>>       >>>>>> Cheers,
> >>>>>>>>>       >>>>>> Sophie
> >>>>>>>>>       >>>>>>
> >>>>>>>>>       >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
> >>>>>>>>>       >>>>>> michael.droga...@confluent.io> wrote:
> >>>>>>>>>       >>>>>>
> >>>>>>>>>       >>>>>>> I’d like to propose a different way of thinking
> >>>>>>>>> about this.
> >>>>>>>>>       To me,
> >>>>>>>>>       >>>>> there
> >>>>>>>>>       >>>>>>> are three problems with the existing branch signature:
> >>>>>>>>>       >>>>>>>
> >>>>>>>>>       >>>>>>> 1. If you use it the way most people do, Java
> >>>>>>>>> raises unsafe
> >>>>>>>> type
> >>>>>>>>>       >>>>>> warnings.
> >>>>>>>>>       >>>>>>> 2. The way in which you use the stream branches is
> >>>>>>>>>       positionally coupled
> >>>>>>>>>       >>>>>> to
> >>>>>>>>>       >>>>>>> the ordering of the conditionals.
> >>>>>>>>>       >>>>>>> 3. It is brittle to extend existing branch calls with
> >>>>>>>>>       additional code
> >>>>>>>>>       >>>>>>> paths.
> >>>>>>>>>       >>>>>>>
> >>>>>>>>>       >>>>>>> Using associative constructs instead of relying on
> >>>>>>>>> ordered
> >>>>>>>>>       constructs
> >>>>>>>>>       >>>>>> would
> >>>>>>>>>       >>>>>>> be a stronger approach. Consider a signature that
> >>>>>>>>> instead
> >>>>>>>>>       looks like
> >>>>>>>>>       >>>>>> this:
> >>>>>>>>>       >>>>>>> Map<String, KStream<K,V>>
> >>>>>>>>> KStream#branch(SortedMap<String,
> >>>>>>>>>       Predicate<?
> >>>>>>>>>       >>>>>>> super K,? super V>>);
> >>>>>>>>>       >>>>>>>
> >>>>>>>>>       >>>>>>> Branches are given names in a map, and as a result,
> >>>>>>>>> the API
> >>>>>>>>>       returns a
> >>>>>>>>>       >>>>>>> mapping of names to streams. The ordering of the
> >>>>>>>> conditionals is
> >>>>>>>>>       >>>>>> maintained
> >>>>>>>>>       >>>>>>> because it’s a sorted map. Insert order determines
> >>>>>>>>> the order
> >>>>>>>> of
> >>>>>>>>>       >>>>>> evaluation.
> >>>>>>>>>       >>>>>>> This solves problem 1 because there are no more
> >>>>>>>>> varargs. It
> >>>>>>>>>       solves
> >>>>>>>>>       >>>>>> problem
> >>>>>>>>>       >>>>>>> 2 because you no longer lean on ordering to access the
> >>>>>>>>>       branch you’re
> >>>>>>>>>       >>>>>>> interested in. It solves problem 3 because you can
> >>>>>>>>> introduce
> >>>>>>>>>       another
> >>>>>>>>>       >>>>>>> conditional by simply attaching another name to the
> >>>>>>>>>       structure, rather
> >>>>>>>>>       >>>>>> than
> >>>>>>>>>       >>>>>>> messing with the existing indices.
> >>>>>>>>>       >>>>>>>
> >>>>>>>>>       >>>>>>> One of the drawbacks is that creating the map
> >>>>>>>>> inline is
> >>>>>>>>>       historically
> >>>>>>>>>       >>>>>>> awkward in Java. I know it’s an anti-pattern to use
> >>>>>>>>>       voluminously, but
> >>>>>>>>>       >>>>>>> double brace initialization would clean up the
> >>>>>>>>> aesthetics.
> >>>>>>>>>       >>>>>>>
> >>>>>>>>>       >>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler
> >>>>>>>>>       <j...@confluent.io>
> >>>>>>>>>       >>>>> wrote:
> >>>>>>>>>       >>>>>>>> Hi Ivan,
> >>>>>>>>>       >>>>>>>>
> >>>>>>>>>       >>>>>>>> Thanks for the update.
> >>>>>>>>>       >>>>>>>>
> >>>>>>>>>       >>>>>>>> FWIW, I agree with Matthias that the current "start
> >>>>>>>> branching"
> >>>>>>>>>       >>>>> operator
> >>>>>>>>>       >>>>>>> is
> >>>>>>>>>       >>>>>>>> confusing when named the same way as the actual
> >>>>>>>>> branches.
> >>>>>>>>>       "Split"
> >>>>>>>>>       >>>>> seems
> >>>>>>>>>       >>>>>>>> like a good name. Alternatively, we can do without
> >>>>>>>>> a "start
> >>>>>>>>>       >>>>> branching"
> >>>>>>>>>       >>>>>>>> operator at all, and just do:
> >>>>>>>>>       >>>>>>>>
> >>>>>>>>>       >>>>>>>> stream
> >>>>>>>>>       >>>>>>>>      .branch(Predicate)
> >>>>>>>>>       >>>>>>>>      .branch(Predicate)
> >>>>>>>>>       >>>>>>>>      .defaultBranch();
> >>>>>>>>>       >>>>>>>>
> >>>>>>>>>       >>>>>>>> Tentatively, I think that this branching operation
> >>>>>>>>> should be
> >>>>>>>>>       >>>>> terminal.
> >>>>>>>>>       >>>>>>> That
> >>>>>>>>>       >>>>>>>> way, we don't create ambiguity about how to use
> >>>>>>>>> it. That
> >>>>>>>>>       is, `branch`
> >>>>>>>>>       >>>>>>>> should return `KBranchedStream`, while
> >>>>>>>>> `defaultBranch` is
> >>>>>>>>>       `void`, to
> >>>>>>>>>       >>>>>>>> enforce that it comes last, and that there is only
> >>>>>>>>> one
> >>>>>>>>>       definition of
> >>>>>>>>>       >>>>>> the
> >>>>>>>>>       >>>>>>>> default branch. Potentially, we should log a
> >>>>>>>>> warning if
> >>>>>>>>>       there's no
> >>>>>>>>>       >>>>>>> default,
> >>>>>>>>>       >>>>>>>> and additionally log a warning (or throw an
> >>>>>>>>> exception) if a
> >>>>>>>>>       record
> >>>>>>>>>       >>>>>> falls
> >>>>>>>>>       >>>>>>>> through with no default.
> >>>>>>>>>       >>>>>>>>
> >>>>>>>>>       >>>>>>>> Thoughts?
> >>>>>>>>>       >>>>>>>>
> >>>>>>>>>       >>>>>>>> Thanks,
> >>>>>>>>>       >>>>>>>> -John
> >>>>>>>>>       >>>>>>>>
> >>>>>>>>>       >>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <
> >>>>>>>>>       >>>>> matth...@confluent.io>
> >>>>>>>>>       >>>>>>>> wrote:
> >>>>>>>>>       >>>>>>>>
> >>>>>>>>>       >>>>>>>>> Thanks for updating the KIP and your answers.
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>>> this is to make the name similar to String#split
> >>>>>>>>>       >>>>>>>>>>> that also returns an array, right?
> >>>>>>>>>       >>>>>>>>> The intent was to avoid name duplication. The
> >>>>>>>>> return type
> >>>>>>>>>       should
> >>>>>>>>>       >>>>>> _not_
> >>>>>>>>>       >>>>>>>>> be an array.
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>> The current proposal is
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>> stream.branch()
> >>>>>>>>>       >>>>>>>>>      .branch(Predicate)
> >>>>>>>>>       >>>>>>>>>      .branch(Predicate)
> >>>>>>>>>       >>>>>>>>>      .defaultBranch();
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>> IMHO, this reads a little odd, because the first
> >>>>>>>>>       `branch()` does
> >>>>>>>>>       >>>>> not
> >>>>>>>>>       >>>>>>>>> take any parameters and has different semantics
> >>>>>>>>> than the
> >>>>>>>> later
> >>>>>>>>>       >>>>>>>>> `branch()` calls. Note, that from the code
> >>>>>>>>> snippet above,
> >>>>>>>> it's
> >>>>>>>>>       >>>>> hidden
> >>>>>>>>>       >>>>>>>>> that the first call is `KStream#branch()` while
> >>>>>>>>> the others
> >>>>>>>> are
> >>>>>>>>>       >>>>>>>>> `KBranchedStream#branch()`, which makes reading the
> >>>>>>>>> code
> >>>>>>>> harder.
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>> Because I suggested to rename `addBranch()` ->
> >>>>>>>>> `branch()`,
> >>>>>>>>>       I thought
> >>>>>>>>>       >>>>>> it
> >>>>>>>>>       >>>>>>>>> might be better to also rename `KStream#branch()`
> >>>>>>>>> to avoid
> >>>>>>>> the
> >>>>>>>>>       >>>>> naming
> >>>>>>>>>       >>>>>>>>> overlap that seems to be confusing. The following
> >>>>>>>>> reads
> >>>>>>>> much
> >>>>>>>>>       >>>>> cleaner
> >>>>>>>>>       >>>>>> to
> >>>>>>>>>       >>>>>>>> me:
> >>>>>>>>>       >>>>>>>>> stream.split()
> >>>>>>>>>       >>>>>>>>>      .branch(Predicate)
> >>>>>>>>>       >>>>>>>>>      .branch(Predicate)
> >>>>>>>>>       >>>>>>>>>      .defaultBranch();
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>> Maybe there is a better alternative to `split()`
> >>>>>>>>> though to
> >>>>>>>>>       avoid
> >>>>>>>>>       >>>>> the
> >>>>>>>>>       >>>>>>>>> naming overlap.
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>>> 'default' is, however, a reserved word, so
> >>>>>>>>> unfortunately
> >>>>>>>> we
> >>>>>>>>>       >>>>> cannot
> >>>>>>>>>       >>>>>>> have
> >>>>>>>>>       >>>>>>>>> a method with such name :-)
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>> Bummer. Didn't consider this. Maybe we can still
> >>>>>>>>> come up
> >>>>>>>>>       with a
> >>>>>>>>>       >>>>> short
> >>>>>>>>>       >>>>>>>> name?
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>> Can you add the interface `KBranchedStream` to
> >>>>>>>>> the KIP
> >>>>>>>>>       with all
> >>>>>>>>>       >>>>> its
> >>>>>>>>>       >>>>>>>>> methods? It will be part of public API and should be
> >>>>>>>>>       contained in
> >>>>>>>>>       >>>>> the
> >>>>>>>>>       >>>>>>>>> KIP. For example, it's unclear atm, what the
> >>>>>>>>> return type of
> >>>>>>>>>       >>>>>>>>> `defaultBranch()` is.
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>> You did not comment on the idea to add a
> >>>>>>>>>       `KBranchedStream#get(int
> >>>>>>>>>       >>>>>>> index)
> >>>>>>>>>       >>>>>>>>> -> KStream` method to get the individually
> >>>>>>>>>       branched-KStreams. Would
> >>>>>>>>>       >>>>>> be
> >>>>>>>>>       >>>>>>>>> nice to get your feedback about it. It seems you
> >>>>>>>>> suggest
> >>>>>>>>>       that users
> >>>>>>>>>       >>>>>>>>> would need to write custom utility code
> >>>>>>>>> otherwise, to
> >>>>>>>>>       access them.
> >>>>>>>>>       >>>>> We
> >>>>>>>>>       >>>>>>>>> should discuss the pros and cons of both
> >>>>>>>>> approaches. It
> >>>>>>>> feels
> >>>>>>>>>       >>>>>>>>> "incomplete" to me atm, if the API has no
> >>>>>>>>> built-in support
> >>>>>>>>>       to get
> >>>>>>>>>       >>>>> the
> >>>>>>>>>       >>>>>>>>> branched-KStreams directly.
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>> -Matthias
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> >>>>>>>>>       >>>>>>>>>> Hi all!
> >>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>> I have updated the KIP-418 according to the new
> >>>>>>>>> vision.
> >>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>> Matthias, thanks for your comment!
> >>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>> Renaming KStream#branch() -> #split()
> >>>>>>>>>       >>>>>>>>>> I can see your point: this is to make the name
> >>>>>>>>> similar to
> >>>>>>>>>       >>>>>>> String#split
> >>>>>>>>>       >>>>>>>>>> that also returns an array, right? But is it
> >>>>>>>>> worth the
> >>>>>>>>>       loss of
> >>>>>>>>>       >>>>>>>> backwards
> >>>>>>>>>       >>>>>>>>>> compatibility? We can have overloaded branch()
> >>>>>>>>> as well
> >>>>>>>>>       without
> >>>>>>>>>       >>>>>>>> affecting
> >>>>>>>>>       >>>>>>>>>> the existing code. Maybe the old array-based
> >>>>>>>>> `branch`
> >>>>>>>> method
> >>>>>>>>>       >>>>> should
> >>>>>>>>>       >>>>>>> be
> >>>>>>>>>       >>>>>>>>>> deprecated, but this is a subject for discussion.
> >>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>> Renaming KBranchedStream#addBranch() ->
> >>>>>>>>>       >>>>> BranchingKStream#branch(),
> >>>>>>>>>       >>>>>>>>>> KBranchedStream#defaultBranch() ->
> >>>>>>>> BranchingKStream#default()
> >>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>> Totally agree with 'addBranch->branch' rename.
> >>>>>>>>> 'default'
> >>>>>>>> is,
> >>>>>>>>>       >>>>>>> however, a
> >>>>>>>>>       >>>>>>>>>> reserved word, so unfortunately we cannot have a
> >>>>>>>>> method
> >>>>>>>>>       with such
> >>>>>>>>>       >>>>>>> name
> >>>>>>>>>       >>>>>>>>> :-)
> >>>>>>>>>       >>>>>>>>>>> defaultBranch() does take a `Predicate` as
> >>>>>>>>> argument,
> >>>>>>>> but I
> >>>>>>>>>       >>>>> think
> >>>>>>>>>       >>>>>>> that
> >>>>>>>>>       >>>>>>>>>> is not required?
> >>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>> Absolutely! I think that was just copy-paste
> >>>>>>>>> error or
> >>>>>>>>>       something.
> >>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>> Dear colleagues,
> >>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>> please revise the new version of the KIP and
> >>>>>>>>> Paul's PR
> >>>>>>>>>       >>>>>>>>>> (https://github.com/apache/kafka/pull/6512)
> >>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>> Any new suggestions/objections?
> >>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>> Regards,
> >>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>> Ivan
> >>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>> 11.04.2019 11:47, Matthias J. Sax wrote:
> >>>>>>>>>       >>>>>>>>>>> Thanks for driving the discussion of this KIP.
> >>>>>>>>> It seems
> >>>>>>>> that
> >>>>>>>>>       >>>>>>> everybody
> >>>>>>>>>       >>>>>>>>>>> agrees that the current branch() method using
> >>>>>>>>> arrays is
> >>>>>>>> not
> >>>>>>>>>       >>>>>> optimal.
> >>>>>>>>>       >>>>>>>>>>> I had a quick look into the PR and I like the
> >>>>>>>>> overall
> >>>>>>>>>       proposal.
> >>>>>>>>>       >>>>>>> There
> >>>>>>>>>       >>>>>>>>>>> are some minor things we need to consider. I would
> >>>>>>>>>       recommend the
> >>>>>>>>>       >>>>>>>>>>> following renaming:
> >>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>> KStream#branch() -> #split()
> >>>>>>>>>       >>>>>>>>>>> KBranchedStream#addBranch() ->
> >>>>>>>>> BranchingKStream#branch()
> >>>>>>>>>       >>>>>>>>>>> KBranchedStream#defaultBranch() ->
> >>>>>>>>>       BranchingKStream#default()
> >>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>> It's just a suggestion to get slightly shorter
> >>>>>>>>> method
> >>>>>>>> names.
> >>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>> In the current PR, defaultBranch() does take a
> >>>>>>>>>       `Predicate` as
> >>>>>>>>>       >>>>>>>> argument,
> >>>>>>>>>       >>>>>>>>>>> but I think that is not required?
> >>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>> Also, we should consider KIP-307, that was
> >>>>>>>>> recently
> >>>>>>>>>       accepted and
> >>>>>>>>>       >>>>>> is
> >>>>>>>>>       >>>>>>>>>>> currently implemented:
> >>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>       >>>>>
> >>>>>>>>>
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> >>>>>>>>
> >>>>>>>>>       >>>>>>>>>>> I.e., we should add overloads that accept a
> >>>>>>>>> `Named`
> >>>>>>>>>       parameter.
> >>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>> For the issue that the created `KStream` objects
> >>>>>>>>> are in
> >>>>>>>>>       different
> >>>>>>>>>       >>>>>>>> scopes:
> >>>>>>>>>       >>>>>>>>>>> could we extend `KBranchedStream` with a `get(int
> >>>>>>>>>       index)` method
> >>>>>>>>>       >>>>>>> that
> >>>>>>>>>       >>>>>>>>>>> returns the corresponding "branched" result
> >>>>>>>>> `KStream`
> >>>>>>>>>       object?
> >>>>>>>>>       >>>>>> Maybe,
> >>>>>>>>>       >>>>>>>> the
> >>>>>>>>>       >>>>>>>>>>> second argument of `addBranch()` should not be a
> >>>>>>>>>       >>>>>> `Consumer<KStream>`
> >>>>>>>>>       >>>>>>>> but
> >>>>>>>>>       >>>>>>>>>>> a `Function<KStream,KStream>` and `get()` could
> >>>>>>>>> return
> >>>>>>>>>       whatever
> >>>>>>>>>       >>>>>> the
> >>>>>>>>>       >>>>>>>>>>> `Function` returns?
> >>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>> Finally, I would also suggest to update the KIP
> >>>>>>>>> with the
> >>>>>>>>>       current
> >>>>>>>>>       >>>>>>>>>>> proposal. That makes it easier to review.
> >>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>> -Matthias
> >>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
> >>>>>>>>>       >>>>>>>>>>>> Ivan,
> >>>>>>>>>       >>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>> I'm a bit of a novice here as well, but I
> >>>>>>>>> think it
> >>>>>>>>>       makes sense
> >>>>>>>>>       >>>>>> for
> >>>>>>>>>       >>>>>>>> you
> >>>>>>>>>       >>>>>>>>> to
> >>>>>>>>>       >>>>>>>>>>>> revise the KIP and continue the discussion.
> >>>>>>>>> Obviously
> >>>>>>>>>       we'll
> >>>>>>>>>       >>>>> need
> >>>>>>>>>       >>>>>>>> some
> >>>>>>>>>       >>>>>>>>>>>> buy-in from committers that have actual
> >>>>>>>>> binding votes on
> >>>>>>>>>       >>>>> whether
> >>>>>>>>>       >>>>>>> the
> >>>>>>>>>       >>>>>>>>> KIP
> >>>>>>>>>       >>>>>>>>>>>> could be adopted.  It would be great to hear
> >>>>>>>>> if they
> >>>>>>>>>       think this
> >>>>>>>>>       >>>>>> is
> >>>>>>>>>       >>>>>>> a
> >>>>>>>>>       >>>>>>>>> good
> >>>>>>>>>       >>>>>>>>>>>> idea overall.  I'm not sure if that happens
> >>>>>>>>> just by
> >>>>>>>>>       starting a
> >>>>>>>>>       >>>>>>> vote,
> >>>>>>>>>       >>>>>>>>> or if
> >>>>>>>>>       >>>>>>>>>>>> there is generally some indication of interest
> >>>>>>>> beforehand.
> >>>>>>>>>       >>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>> That being said, I'll continue the discussion
> >>>>>>>>> a bit:
> >>>>>>>>>       assuming
> >>>>>>>>>       >>>>> we
> >>>>>>>>>       >>>>>> do
> >>>>>>>>>       >>>>>>>>> move
> >>>>>>>>>       >>>>>>>>>>>> forward the solution of "stream.branch() returns
> >>>>>>>>>       >>>>>> KBranchedStream",
> >>>>>>>>>       >>>>>>> do
> >>>>>>>>>       >>>>>>>>> we
> >>>>>>>>>       >>>>>>>>>>>> deprecate "stream.branch(...) returns
> >>>>>>>>> KStream[]"?  I
> >>>>>>>> would
> >>>>>>>>>       >>>>> favor
> >>>>>>>>>       >>>>>>>>>>>> deprecating, since having two mutually
> >>>>>>>>> exclusive APIs
> >>>>>>>> that
> >>>>>>>>>       >>>>>>> accomplish
> >>>>>>>>>       >>>>>>>>> the
> >>>>>>>>>       >>>>>>>>>>>> same thing is confusing, especially when
> >>>>>>>>> they're fairly
> >>>>>>>>>       similar
> >>>>>>>>>       >>>>>>>>> anyway.  We
> >>>>>>>>>       >>>>>>>>>>>> just need to be sure we're not making something
> >>>>>>>>>       >>>>>>> impossible/difficult
> >>>>>>>>>       >>>>>>>>> that
> >>>>>>>>>       >>>>>>>>>>>> is currently possible/easy.
> >>>>>>>>>       >>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>> Regarding my PR - I think the general
> >>>>>>>>> structure would
> >>>>>>>> work,
> >>>>>>>>>       >>>>> it's
> >>>>>>>>>       >>>>>>>> just a
> >>>>>>>>>       >>>>>>>>>>>> little sloppy overall in terms of naming and
> >>>>>>>>> clarity. In
> >>>>>>>>>       >>>>>>> particular,
> >>>>>>>>>       >>>>>>>>>>>> passing in the "predicates" and "children"
> >>>>>>>>> lists which
> >>>>>>>> get
> >>>>>>>>>       >>>>>> modified
> >>>>>>>>>       >>>>>>>> in
> >>>>>>>>>       >>>>>>>>>>>> KBranchedStream but read from all the way
> >>>>>>>>>       KStreamLazyBranch is
> >>>>>>>>>       >>>>> a
> >>>>>>>>>       >>>>>>> bit
> >>>>>>>>>       >>>>>>>>>>>> complicated to follow.
> >>>>>>>>>       >>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>> Thanks,
> >>>>>>>>>       >>>>>>>>>>>> Paul
> >>>>>>>>>       >>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <
> >>>>>>>>>       >>>>>> iponoma...@mail.ru>
> >>>>>>>>>       >>>>>>>>> wrote:
> >>>>>>>>>       >>>>>>>>>>>>> Hi Paul!
> >>>>>>>>>       >>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>> I read your code carefully and now I am fully
> >>>>>>>>>       convinced: your
> >>>>>>>>>       >>>>>>>> proposal
> >>>>>>>>>       >>>>>>>>>>>>> looks better and should work. We just have to
> >>>>>>>>> document
> >>>>>>>> the
> >>>>>>>>>       >>>>>> crucial
> >>>>>>>>>       >>>>>>>>> fact
> >>>>>>>>>       >>>>>>>>>>>>> that KStream consumers are invoked as they're
> >>>>>>>>> added.
> >>>>>>>>>       And then
> >>>>>>>>>       >>>>>> it's
> >>>>>>>>>       >>>>>>>> all
> >>>>>>>>>       >>>>>>>>>>>>> going to be very nice.
> >>>>>>>>>       >>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>> What shall we do now? I should re-write the
> >>>>>>>>> KIP and
> >>>>>>>>>       resume the
> >>>>>>>>>       >>>>>>>>>>>>> discussion here, right?
> >>>>>>>>>       >>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>> Why are you saying that your PR 'should not
> >>>>>>>>> be even a
> >>>>>>>>>       >>>>> starting
> >>>>>>>>>       >>>>>>>> point
> >>>>>>>>>       >>>>>>>>> if
> >>>>>>>>>       >>>>>>>>>>>>> we go in this direction'? To me it looks like
> >>>>>>>>> a good
> >>>>>>>>>       starting
> >>>>>>>>>       >>>>>>> point.
> >>>>>>>>>       >>>>>>>>> But
> >>>>>>>>>       >>>>>>>>>>>>> as a novice in this project I might miss some
> >>>>>>>>> important
> >>>>>>>>>       >>>>> details.
> >>>>>>>>>       >>>>>>>>>>>>> Regards,
> >>>>>>>>>       >>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>> Ivan
> >>>>>>>>>       >>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen wrote:
> >>>>>>>>>       >>>>>>>>>>>>>> Ivan,
> >>>>>>>>>       >>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>> Maybe I’m missing the point, but I believe the
> >>>>>>>>>       >>>>> stream.branch()
> >>>>>>>>>       >>>>>>>>> solution
> >>>>>>>>>       >>>>>>>>>>>>> supports this. The couponIssuer::set*
> >>>>>>>>> consumers will be
> >>>>>>>>>       >>>>> invoked
> >>>>>>>>>       >>>>>> as
> >>>>>>>>>       >>>>>>>>> they’re
> >>>>>>>>>       >>>>>>>>>>>>> added, not during streamsBuilder.build(). So
> >>>>>>>>> the user
> >>>>>>>>>       still
> >>>>>>>>>       >>>>>> ought
> >>>>>>>>>       >>>>>>> to
> >>>>>>>>>       >>>>>>>>> be
> >>>>>>>>>       >>>>>>>>>>>>> able to call couponIssuer.coupons() afterward
> >>>>>>>>> and
> >>>>>>>>>       depend on
> >>>>>>>>>       >>>>> the
> >>>>>>>>>       >>>>>>>>> branched
> >>>>>>>>>       >>>>>>>>>>>>> streams having been set.
> >>>>>>>>>       >>>>>>>>>>>>>> The issue I mean to point out is that it is
> >>>>>>>>> hard to
> >>>>>>>>>       access
> >>>>>>>>>       >>>>> the
> >>>>>>>>>       >>>>>>>>> branched
> >>>>>>>>>       >>>>>>>>>>>>> streams in the same scope as the original
> >>>>>>>>> stream (that
> >>>>>>>>>       is, not
> >>>>>>>>>       >>>>>>>> inside
> >>>>>>>>>       >>>>>>>>> the
> >>>>>>>>>       >>>>>>>>>>>>> couponIssuer), which is a problem with both
> >>>>>>>>> proposed
> >>>>>>>>>       >>>>> solutions.
> >>>>>>>>>       >>>>>> It
> >>>>>>>>>       >>>>>>>>> can be
> >>>>>>>>>       >>>>>>>>>>>>> worked around though.
> >>>>>>>>>       >>>>>>>>>>>>>> [Also, great to hear additional interest in
> >>>>>>>>> 401, I’m
> >>>>>>>>>       excited
> >>>>>>>>>       >>>>> to
> >>>>>>>>>       >>>>>>>> hear
> >>>>>>>>>       >>>>>>>>>>>>> your thoughts!]
> >>>>>>>>>       >>>>>>>>>>>>>> Paul
> >>>>>>>>>       >>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <
> >>>>>>>>>       >>>>>> iponoma...@mail.ru>
> >>>>>>>>>       >>>>>>>>> wrote:
> >>>>>>>>>       >>>>>>>>>>>>>>> Hi Paul!
> >>>>>>>>>       >>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>> The idea to postpone the wiring of branches
> >>>>>>>>> to the
> >>>>>>>>>       >>>>>>>>>>>>> streamsBuilder.build() also looked great to
> >>>>>>>>> me at
> >>>>>>>> first
> >>>>>>>>>       >>>>> glance,
> >>>>>>>>>       >>>>>>> but
> >>>>>>>>>       >>>>>>>>> ---
> >>>>>>>>>       >>>>>>>>>>>>>>>> the newly branched streams are not
> >>>>>>>>> available in the
> >>>>>>>>>       same
> >>>>>>>>>       >>>>>> scope
> >>>>>>>>>       >>>>>>> as
> >>>>>>>>>       >>>>>>>>> each
> >>>>>>>>>       >>>>>>>>>>>>> other.  That is, if we wanted to merge them back
> >>>>>>>> together
> >>>>>>>>>       >>>>> again
> >>>>>>>>>       >>>>>> I
> >>>>>>>>>       >>>>>>>>> don't see
> >>>>>>>>>       >>>>>>>>>>>>> a way to do that.
> >>>>>>>>>       >>>>>>>>>>>>>>> You just took the words right out of my
> >>>>>>>>> mouth, I was
> >>>>>>>>>       just
> >>>>>>>>>       >>>>>> going
> >>>>>>>>>       >>>>>>> to
> >>>>>>>>>       >>>>>>>>>>>>> write in detail about this issue.
> >>>>>>>>>       >>>>>>>>>>>>>>> Consider the example from Bill's book, p.
> >>>>>>>>> 101: say
> >>>>>>>>>       we need
> >>>>>>>>>       >>>>> to
> >>>>>>>>>       >>>>>>>>> identify
> >>>>>>>>>       >>>>>>>>>>>>> customers who have bought coffee and made a
> >>>>>>>>> purchase
> >>>>>>>>>       in the
> >>>>>>>>>       >>>>>>>>> electronics
> >>>>>>>>>       >>>>>>>>>>>>> store to give them coupons.
> >>>>>>>>>       >>>>>>>>>>>>>>> This is the code I usually write under these
> >>>>>>>>>       circumstances
> >>>>>>>>>       >>>>>> using
> >>>>>>>>>       >>>>>>>> my
> >>>>>>>>>       >>>>>>>>>>>>> 'brancher' class:
> >>>>>>>>>       >>>>>>>>>>>>>>> @Setter
> >>>>>>>>>       >>>>>>>>>>>>>>> class CouponIssuer{
> >>>>>>>>>       >>>>>>>>>>>>>>>   private KStream<....> coffePurchases;
> >>>>>>>>>       >>>>>>>>>>>>>>>   private KStream<....> electronicsPurchases;
> >>>>>>>>>       >>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>   KStream<...> coupons(){
> >>>>>>>>>       >>>>>>>>>>>>>>>       return
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>> coffePurchases.join(electronicsPurchases...)...whatever
> >>>>>>>>>       >>>>>>>>>>>>>>>       /*In the real world the code here can be
> >>>>>>>>>       complex, so
> >>>>>>>>>       >>>>>>>>> creation of
> >>>>>>>>>       >>>>>>>>>>>>> a separate CouponIssuer class is fully
> >>>>>>>>> justified, in
> >>>>>>>>>       order to
> >>>>>>>>>       >>>>>>>> separate
> >>>>>>>>>       >>>>>>>>>>>>> classes' responsibilities.*/
> >>>>>>>>>       >>>>>>>>>>>>>>>  }
> >>>>>>>>>       >>>>>>>>>>>>>>> }
> >>>>>>>>>       >>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>> CouponIssuer couponIssuer = new
> >>>>>>>>> CouponIssuer();
> >>>>>>>>>       >>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
> >>>>>>>>>       >>>>>>>>>>>>>>>     .branch(predicate1,
> >>>>>>>> couponIssuer::setCoffePurchases)
> >>>>>>>>>       >>>>>>>>>>>>>>>     .branch(predicate2,
> >>>>>>>>>       >>>>>> couponIssuer::setElectronicsPurchases)
> >>>>>>>>>       >>>>>>>>>>>>>>>     .onTopOf(transactionStream);
> >>>>>>>>>       >>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>> /*Alas, this won't work if we're going to
> >>>>>>>>> wire up
> >>>>>>>>>       everything
> >>>>>>>>>       >>>>>>>> later,
> >>>>>>>>>       >>>>>>>>>>>>> without the terminal operation!!!*/
> >>>>>>>>>       >>>>>>>>>>>>>>> couponIssuer.coupons()...
> >>>>>>>>>       >>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>> Does this make sense?  In order to properly
> >>>>>>>>>       initialize the
> >>>>>>>>>       >>>>>>>>> CouponIssuer
> >>>>>>>>>       >>>>>>>>>>>>> we need the terminal operation to be called
> >>>>>>>>> before
> >>>>>>>>>       >>>>>>>>> streamsBuilder.build()
> >>>>>>>>>       >>>>>>>>>>>>> is called.
> >>>>>>>>>       >>>>>>>>>>>>>>> [BTW Paul, I just found out that your
> >>>>>>>>> KIP-401 is
> >>>>>>>>>       essentially
> >>>>>>>>>       >>>>>> the
> >>>>>>>>>       >>>>>>>>> next
> >>>>>>>>>       >>>>>>>>>>>>> KIP I was going to write here. I have some
> >>>>>>>>> thoughts
> >>>>>>>>>       based on
> >>>>>>>>>       >>>>> my
> >>>>>>>>>       >>>>>>>>> experience,
> >>>>>>>>>       >>>>>>>>>>>>> so I will join the discussion on KIP-401 soon.]
> >>>>>>>>>       >>>>>>>>>>>>>>> Regards,
> >>>>>>>>>       >>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>> Ivan
> >>>>>>>>>       >>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen wrote:
> >>>>>>>>>       >>>>>>>>>>>>>>>> Ivan,
> >>>>>>>>>       >>>>>>>>>>>>>>>> I tried to make a very rough proof of
> >>>>>>>>> concept of a
> >>>>>>>>>       fluent
> >>>>>>>>>       >>>>> API
> >>>>>>>>>       >>>>>>>> based
> >>>>>>>>>       >>>>>>>>>>>>> off of
> >>>>>>>>>       >>>>>>>>>>>>>>>> KStream here
> >>>>>>>>>       (https://github.com/apache/kafka/pull/6512),
> >>>>>>>>>       >>>>>> and
> >>>>>>>>>       >>>>>>> I
> >>>>>>>>>       >>>>>>>>> think
> >>>>>>>>>       >>>>>>>>>>>>> I
> >>>>>>>>>       >>>>>>>>>>>>>>>> succeeded at removing both cons.
> >>>>>>>>>       >>>>>>>>>>>>>>>>    - Compatibility: I was incorrect
> >>>>>>>>> earlier about
> >>>>>>>>>       >>>>>>> compatibility
> >>>>>>>>>       >>>>>>>>>>>>> issues,
> >>>>>>>>>       >>>>>>>>>>>>>>>>    there aren't any direct ones.  I was
> >>>>>>>>> unaware
> >>>>>>>>>       that Java
> >>>>>>>>>       >>>>> is
> >>>>>>>>>       >>>>>>>> smart
> >>>>>>>>>       >>>>>>>>>>>>> enough to
> >>>>>>>>>       >>>>>>>>>>>>>>>>    distinguish between a branch(varargs...)
> >>>>>>>>>       returning one
> >>>>>>>>>       >>>>>>> thing
> >>>>>>>>>       >>>>>>>>> and
> >>>>>>>>>       >>>>>>>>>>>>> branch()
> >>>>>>>>>       >>>>>>>>>>>>>>>>    with no arguments returning another thing.
> >>>>>>>>>       >>>>>>>>>>>>>>>>    - Requiring a terminal method: We don't
> >>>>>>>>> actually
> >>>>>>>>>       need
> >>>>>>>>>       >>>>> it.
> >>>>>>>>>       >>>>>>> We
> >>>>>>>>>       >>>>>>>>> can
> >>>>>>>>>       >>>>>>>>>>>>> just
> >>>>>>>>>       >>>>>>>>>>>>>>>>    build up the branches in the
> >>>>>>>>> KBranchedStream, which
> >>>>>>>>>       shares
> >>>>>>>>>       >>>>>> its
> >>>>>>>>>       >>>>>>>>> state
> >>>>>>>>>       >>>>>>>>>>>>> with the
> >>>>>>>>>       >>>>>>>>>>>>>>>>    ProcessorSupplier that will actually do
> >>>>>>>>> the
> >>>>>>>>>       branching.
> >>>>>>>>>       >>>>>>> It's
> >>>>>>>>>       >>>>>>>>> not
> >>>>>>>>>       >>>>>>>>>>>>> terribly
> >>>>>>>>>       >>>>>>>>>>>>>>>>    pretty in its current form, but I think it
> >>>>>>>>>       demonstrates
> >>>>>>>>>       >>>>>> its
> >>>>>>>>>       >>>>>>>>>>>>> feasibility.
> >>>>>>>>>       >>>>>>>>>>>>>>>> To be clear, I don't think that pull
> >>>>>>>>> request should
> >>>>>>>> be
> >>>>>>>>>       >>>>> final
> >>>>>>>>>       >>>>>> or
> >>>>>>>>>       >>>>>>>>> even a
> >>>>>>>>>       >>>>>>>>>>>>>>>> starting point if we go in this direction,
> >>>>>>>>> I just
> >>>>>>>>>       wanted to
> >>>>>>>>>       >>>>>> see
> >>>>>>>>>       >>>>>>>> how
> >>>>>>>>>       >>>>>>>>>>>>>>>> challenging it would be to get the API
> >>>>>>>>> working.
> >>>>>>>>>       >>>>>>>>>>>>>>>> I will say though, that I'm not sure the
> >>>>>>>>> existing
> >>>>>>>>>       solution
> >>>>>>>>>       >>>>>>> could
> >>>>>>>>>       >>>>>>>> be
> >>>>>>>>>       >>>>>>>>>>>>>>>> deprecated in favor of this, which I had
> >>>>>>>>> originally
> >>>>>>>>>       >>>>> suggested
> >>>>>>>>>       >>>>>>>> was a
> >>>>>>>>>       >>>>>>>>>>>>>>>> possibility.  The reason is that the newly
> >>>>>>>>> branched
> >>>>>>>>>       streams
> >>>>>>>>>       >>>>>> are
> >>>>>>>>>       >>>>>>>> not
> >>>>>>>>>       >>>>>>>>>>>>>>>> available in the same scope as each
> >>>>>>>>> other.  That
> >>>>>>>>>       is, if we
> >>>>>>>>>       >>>>>>> wanted
> >>>>>>>>>       >>>>>>>>> to
> >>>>>>>>>       >>>>>>>>>>>>> merge
> >>>>>>>>>       >>>>>>>>>>>>>>>> them back together again I don't see a way
> >>>>>>>>> to do
> >>>>>>>>>       that.  The
> >>>>>>>>>       >>>>>> KIP
> >>>>>>>>>       >>>>>>>>>>>>> proposal
> >>>>>>>>>       >>>>>>>>>>>>>>>> has the same issue, though - all this
> >>>>>>>>> means is that
> >>>>>>>> for
> >>>>>>>>>       >>>>>> either
> >>>>>>>>>       >>>>>>>>>>>>> solution,
> >>>>>>>>>       >>>>>>>>>>>>>>>> deprecating the existing branch(...) is
> >>>>>>>>> not on the
> >>>>>>>>>       table.
> >>>>>>>>>       >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>       >>>>>>>>>>>>>>>> Paul
> >>>>>>>>>       >>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan
> >>>>>>>>> Ponomarev <
> >>>>>>>>>       >>>>>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>>
> >>>>>>>>>       >>>>>>>>>>>>> wrote:
> >>>>>>>>>       >>>>>>>>>>>>>>>>> OK, let me summarize what we have
> >>>>>>>>> discussed up to
> >>>>>>>> this
> >>>>>>>>>       >>>>>> point.
> >>>>>>>>>       >>>>>>>>>>>>>>>>> First, it seems that it's commonly agreed
> >>>>>>>>> that
> >>>>>>>>>       branch API
> >>>>>>>>>       >>>>>>> needs
> >>>>>>>>>       >>>>>>>>>>>>>>>>> improvement. Motivation is given in the KIP.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>> There are two potential ways to do it:
> >>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>> 1. (as originally proposed)
> >>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
> >>>>>>>>>       >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->..)
> >>>>>>>>>       >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->..)
> >>>>>>>>>       >>>>>>>>>>>>>>>>>    .defaultBranch(ks->..) //optional
> >>>>>>>>>       >>>>>>>>>>>>>>>>>    .onTopOf(stream).mapValues(...)....
> >>>>>>>>> //onTopOf
> >>>>>>>>>       returns
> >>>>>>>>>       >>>>>> its
> >>>>>>>>>       >>>>>>>>> argument
> >>>>>>>>>       >>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2)
> >>>>>>>>> The code
> >>>>>>>> won't
> >>>>>>>>>       >>>>> make
> >>>>>>>>>       >>>>>>>> sense
> >>>>>>>>>       >>>>>>>>>>>>> until
> >>>>>>>>>       >>>>>>>>>>>>>>>>> all the necessary ingredients are provided.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>> CONS: The need to create a
> >>>>>>>>> KafkaStreamsBrancher
> >>>>>>>>>       instance
> >>>>>>>>>       >>>>>>>>> contrasts the
> >>>>>>>>>       >>>>>>>>>>>>>>>>> fluency of other KStream methods.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>> 2. (as Paul proposes)
> >>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>> stream
> >>>>>>>>>       >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->...)
> >>>>>>>>>       >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->...)
> >>>>>>>>>       >>>>>>>>>>>>>>>>>    .defaultBranch(ks->...) //or
> >>>>>>>>> noDefault(). Both
> >>>>>>>>>       >>>>>>>>> defaultBranch(..)
> >>>>>>>>>       >>>>>>>>>>>>> and
> >>>>>>>>>       >>>>>>>>>>>>>>>>> noDefault() return void
> >>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>> PROS: Generally follows the way KStreams
> >>>>>>>>> interface
> >>>>>>>> is
> >>>>>>>>>       >>>>>> defined.
> >>>>>>>>>       >>>>>>>>>>>>>>>>> CONS: We need to define two terminal methods
> >>>>>>>>>       >>>>>>>> (defaultBranch(ks->)
> >>>>>>>>>       >>>>>>>>> and
> >>>>>>>>>       >>>>>>>>>>>>>>>>> noDefault()). And for a user it is very
> >>>>>>>>> easy to
> >>>>>>>>>       miss the
> >>>>>>>>>       >>>>>> fact
> >>>>>>>>>       >>>>>>>>> that one
> >>>>>>>>>       >>>>>>>>>>>>>>>>> of the terminal methods should be called.
> >>>>>>>>> If these
> >>>>>>>>>       methods
> >>>>>>>>>       >>>>>> are
> >>>>>>>>>       >>>>>>>> not
> >>>>>>>>>       >>>>>>>>>>>>>>>>> called, we can throw an exception at
> >>>>>>>>> runtime.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can
> >>>>>>>>> we do
> >>>>>>>> better?
> >>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>> Ivan
> >>>>>>>>>       >>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
> >>>>>>>>>       >>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> Paul,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> I see your point when you are talking
> >>>>>>>>> about
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> Still, I believe that this cannot be
> >>>>>>>>>       implemented the
> >>>>>>>>>       >>>>>>> easy
> >>>>>>>>>       >>>>>>>>> way.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> Maybe we all should think further.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> user could specify a terminal method that
> >>>>>>>> assumes
> >>>>>>>>>       >>>>> nothing
> >>>>>>>>>       >>>>>>>> will
> >>>>>>>>>       >>>>>>>>>>>>> reach
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> the default branch,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> throwing an exception if such a case
> >>>>>>>>> occurs.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be
> >>>>>>>>> the only
> >>>>>>>> option
> >>>>>>>>>       >>>>>> besides
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> `default`, because there are scenarios
> >>>>>>>>> when we
> >>>>>>>>>       want to
> >>>>>>>>>       >>>>>> just
> >>>>>>>>>       >>>>>>>>> silently
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> drop the messages that didn't match any
> >>>>>>>>>       predicate. 2)
> >>>>>>>>>       >>>>>>> Throwing
> >>>>>>>>>       >>>>>>>>> an
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> exception in the middle of data flow
> >>>>>>>>> processing
> >>>>>>>>>       looks
> >>>>>>>>>       >>>>>> like a
> >>>>>>>>>       >>>>>>>> bad
> >>>>>>>>>       >>>>>>>>>>>>> idea.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> In stream processing paradigm, I would
> >>>>>>>>> prefer to
> >>>>>>>>>       emit a
> >>>>>>>>>       >>>>>>>> special
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> message to a dedicated stream. This is
> >>>>>>>>> exactly
> >>>>>>>> where
> >>>>>>>>>       >>>>>>> `default`
> >>>>>>>>>       >>>>>>>>> can
> >>>>>>>>>       >>>>>>>>>>>>> be
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> used.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> it would be fairly easy for the
> >>>>>>>>>       >>>>> InternalTopologyBuilder
> >>>>>>>>>       >>>>>>> to
> >>>>>>>>>       >>>>>>>>> track
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> dangling
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> branches that haven't been terminated
> >>>>>>>>> and raise
> >>>>>>>>>       a clear
> >>>>>>>>>       >>>>>>> error
> >>>>>>>>>       >>>>>>>>>>>>> before it
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> becomes an issue.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> You mean a runtime exception, when the
> >>>>>>>>> program is
> >>>>>>>>>       >>>>> compiled
> >>>>>>>>>       >>>>>>> and
> >>>>>>>>>       >>>>>>>>> run?
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> Well,  I'd prefer an API that simply won't
> >>>>>>>>>       compile if
> >>>>>>>>>       >>>>> used
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> incorrectly. Can we build such an API as a
> >>>>>>>>>       method chain
> >>>>>>>>>       >>>>>>>> starting
> >>>>>>>>>       >>>>>>>>>>>>> from
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> KStream object? There is a huge cost
> >>>>>>>>> difference
> >>>>>>>>>       between
> >>>>>>>>>       >>>>>>>> runtime
> >>>>>>>>>       >>>>>>>>> and
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> compile-time errors. Even if a failure
> >>>>>>>>> uncovers
> >>>>>>>>>       >>>>> instantly
> >>>>>>>>>       >>>>>> on
> >>>>>>>>>       >>>>>>>>> unit
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> tests, it costs more for the project
> >>>>>>>>> than a
> >>>>>>>>>       compilation
> >>>>>>>>>       >>>>>>>> failure.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> Ivan
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen wrote:
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Ivan,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Good point about the terminal
> >>>>>>>>> operation being
> >>>>>>>>>       required.
> >>>>>>>>>       >>>>>>> But
> >>>>>>>>>       >>>>>>>> is
> >>>>>>>>>       >>>>>>>>>>>>> that
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> really
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> such a bad thing?  If the user doesn't
> >>>>>>>>> want a
> >>>>>>>>>       >>>>>> defaultBranch
> >>>>>>>>>       >>>>>>>>> they
> >>>>>>>>>       >>>>>>>>>>>>> can
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> call
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> some other terminal method
> >>>>>>>>> (noDefaultBranch()?)
> >>>>>>>>>       just as
> >>>>>>>>>       >>>>>>>>> easily.  In
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> fact I
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> think it creates an opportunity for a
> >>>>>>>>> nicer API
> >>>>>>>> - a
> >>>>>>>>>       >>>>> user
> >>>>>>>>>       >>>>>>>> could
> >>>>>>>>>       >>>>>>>>>>>>> specify
> >>>>>>>>>       >>>>>>>>>>>>>>>>> a
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> terminal method that assumes nothing
> >>>>>>>>> will reach
> >>>>>>>> the
> >>>>>>>>>       >>>>>> default
> >>>>>>>>>       >>>>>>>>> branch,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> throwing an exception if such a case
> >>>>>>>>> occurs.
> >>>>>>>> That
> >>>>>>>>>       >>>>> seems
> >>>>>>>>>       >>>>>>> like
> >>>>>>>>>       >>>>>>>>> an
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> improvement over the current branch()
> >>>>>>>>> API,
> >>>>>>>>>       which allows
> >>>>>>>>>       >>>>>> for
> >>>>>>>>>       >>>>>>>> the
> >>>>>>>>>       >>>>>>>>>>>>> more
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> subtle
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> behavior of records unexpectedly getting
> >>>>>>>> dropped.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> The need for a terminal operation
> >>>>>>>>> certainly has
> >>>>>>>>>       to be
> >>>>>>>>>       >>>>>> well
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> documented, but
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> it would be fairly easy for the
> >>>>>>>>>       >>>>> InternalTopologyBuilder
> >>>>>>>>>       >>>>>>> to
> >>>>>>>>>       >>>>>>>>> track
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> dangling
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> branches that haven't been terminated
> >>>>>>>>> and raise
> >>>>>>>>>       a clear
> >>>>>>>>>       >>>>>>> error
> >>>>>>>>>       >>>>>>>>>>>>> before it
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> becomes an issue.  Especially now that
> >>>>>>>>> there is
> >>>>>>>> a
> >>>>>>>>>       >>>>> "build
> >>>>>>>>>       >>>>>>>> step"
> >>>>>>>>>       >>>>>>>>>>>>> where
> >>>>>>>>>       >>>>>>>>>>>>>>>>> the
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> topology is actually wired up, when
> >>>>>>>>>       >>>>>> StreamsBuilder.build()
> >>>>>>>>>       >>>>>>> is
> >>>>>>>>>       >>>>>>>>>>>>> called.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its
> >>>>>>>>> argument, I
> >>>>>>>> agree
> >>>>>>>>>       >>>>> that
> >>>>>>>>>       >>>>>>> it's
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> critical to
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> allow users to do other operations on
> >>>>>>>>> the input
> >>>>>>>>>       stream.
> >>>>>>>>>       >>>>>>> With
> >>>>>>>>>       >>>>>>>>> the
> >>>>>>>>>       >>>>>>>>>>>>>>>>> fluent
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> solution, it ought to work the same
> >>>>>>>>> way all
> >>>>>>>> other
> >>>>>>>>>       >>>>>>> operations
> >>>>>>>>>       >>>>>>>>> do -
> >>>>>>>>>       >>>>>>>>>>>>> if
> >>>>>>>>>       >>>>>>>>>>>>>>>>> you
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> want to process off the original KStream
> >>>>>>>> multiple
> >>>>>>>>>       >>>>> times,
> >>>>>>>>>       >>>>>>> you
> >>>>>>>>>       >>>>>>>>> just
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> need the
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> stream as a variable so you can call
> >>>>>>>>> as many
> >>>>>>>>>       operations
> >>>>>>>>>       >>>>>> on
> >>>>>>>>>       >>>>>>> it
> >>>>>>>>>       >>>>>>>>> as
> >>>>>>>>>       >>>>>>>>>>>>> you
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> desire.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Thoughts?
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> Paul
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan
> >>>>>>>>> Ponomarev <
> >>>>>>>>>       >>>>>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> Hello Paul,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> I'm afraid this won't work because we
> >>>>>>>>> do not
> >>>>>>>>>       always need
> >>>>>>>>>       >>>>>> the
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> defaultBranch. And without a terminal
> >>>>>>>> operation we
> >>>>>>>>>       >>>>> don't
> >>>>>>>>>       >>>>>>>> know
> >>>>>>>>>       >>>>>>>>>>>>> when to
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> finalize and build the 'branch switch'.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its
> >>>>>>>>> argument,
> >>>>>>>>>       so we
> >>>>>>>>>       >>>>> can
> >>>>>>>>>       >>>>>> do
> >>>>>>>>>       >>>>>>>>>>>>> something
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> more with the original branch after
> >>>>>>>>> branching.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> I understand your point that the need for
> >>>>>>>> special
> >>>>>>>>>       >>>>> object
> >>>>>>>>>       >>>>>>>>>>>>> construction
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> contrasts the fluency of most KStream
> >>>>>>>>> methods.
> >>>>>>>> But
> >>>>>>>>>       >>>>> here
> >>>>>>>>>       >>>>>> we
> >>>>>>>>>       >>>>>>>>> have a
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> special case: we build the switch to
> >>>>>>>>> split the
> >>>>>>>>>       flow,
> >>>>>>>>>       >>>>> so
> >>>>>>>>>       >>>>>> I
> >>>>>>>>>       >>>>>>>>> think
> >>>>>>>>>       >>>>>>>>>>>>> this
> >>>>>>>>>       >>>>>>>>>>>>>>>>> is
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> still idiomatic.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> Ivan
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Ivan,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve
> >>>>>>>>> this
> >>>>>>>>>       API, but I
> >>>>>>>>>       >>>>>> find
> >>>>>>>>>       >>>>>>>> the
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> onTopOf()
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> mechanism a little confusing since it
> >>>>>>>>>       contrasts the
> >>>>>>>>>       >>>>>>> fluency
> >>>>>>>>>       >>>>>>>>> of
> >>>>>>>>>       >>>>>>>>>>>>> other
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> KStream method calls.  Ideally I'd
> >>>>>>>>> like to
> >>>>>>>>>       just call
> >>>>>>>>>       >>>>> a
> >>>>>>>>>       >>>>>>>>> method on
> >>>>>>>>>       >>>>>>>>>>>>> the
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> stream
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> so it still reads top to bottom if
> >>>>>>>>> the branch
> >>>>>>>>>       cases
> >>>>>>>>>       >>>>> are
> >>>>>>>>>       >>>>>>>>> defined
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> fluently.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> I think the addBranch(predicate,
> >>>>>>>>> handleCase)
> >>>>>>>>>       is very
> >>>>>>>>>       >>>>>> nice
> >>>>>>>>>       >>>>>>>>> and the
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> right
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> way
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> to do things, but what if we flipped
> >>>>>>>>> around
> >>>>>>>>>       how we
> >>>>>>>>>       >>>>>>> specify
> >>>>>>>>>       >>>>>>>>> the
> >>>>>>>>>       >>>>>>>>>>>>> source
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> stream.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Like:
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> stream.branch()
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate1,
> >>>>>>>> this::handle1)
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate2,
> >>>>>>>> this::handle2)
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>> .defaultBranch(this::handleDefault);
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Where branch() returns a
> >>>>>>>>> KBranchedStreams or
> >>>>>>>>>       >>>>>>>> KStreamBrancher
> >>>>>>>>>       >>>>>>>>> or
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> something,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> which is added to by addBranch() and
> >>>>>>>>>       terminated by
> >>>>>>>>>       >>>>>>>>>>>>> defaultBranch()
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> (which
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> returns void).  This is obviously
> >>>>>>>>>       incompatible with
> >>>>>>>>>       >>>>> the
> >>>>>>>>>       >>>>>>>>> current
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> API, so
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> new stream.branch() would have to
> >>>>>>>>> have a
> >>>>>>>>>       different
> >>>>>>>>>       >>>>>> name,
> >>>>>>>>>       >>>>>>>> but
> >>>>>>>>>       >>>>>>>>> that
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> seems
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> like a fairly small problem - we
> >>>>>>>>> could call it
> >>>>>>>>>       >>>>>> something
> >>>>>>>>>       >>>>>>>> like
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> branched()
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> or
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> branchedStreams() and deprecate the
> >>>>>>>>> old API.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of
> >>>>>>>>> your
> >>>>>>>>>       KIP?  It
> >>>>>>>>>       >>>>>> seems
> >>>>>>>>>       >>>>>>>>> like it
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> does to
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> me, allowing for clear in-line
> >>>>>>>>> branching
> >>>>>>>>>       while also
> >>>>>>>>>       >>>>>>>> allowing
> >>>>>>>>>       >>>>>>>>> you
> >>>>>>>>>       >>>>>>>>>>>>> to
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> dynamically build branches off of
> >>>>>>>>>       KBranchedStreams
> >>>>>>>>>       >>>>>> if
> >>>>>>>>>       >>>>>>>>> desired.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> Paul
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan
> >>>>>>>>> Ponomarev
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> <iponoma...@mail.ru.invalid>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> void
> >>>>>>>>> handleFirstCase(KStream<String, String>
> >>>>>>>>>       ks){
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>> ks.filter(....).mapValues(...)
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> void handleSecondCase(KStream<String,
> >>>>>>>>>       String> ks){
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>> ks.selectKey(...).groupByKey()...
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> ......
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<String,
> >>>>>>>>> String>()
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate1,
> >>>>>>>>>       this::handleFirstCase)
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate2,
> >>>>>>>>>       this::handleSecondCase)
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>      .onTopOf(....)
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> Ivan
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> I have one question, the
> >>>>>>>> KafkaStreamsBrancher
> >>>>>>>>>       >>>>> takes a
> >>>>>>>>>       >>>>>>>>> Consumer
> >>>>>>>>>       >>>>>>>>>>>>> as a
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>> second
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> argument which returns nothing,
> >>>>>>>>> and the
> >>>>>>>>>       example in
> >>>>>>>>>       >>>>>> the
> >>>>>>>>>       >>>>>>>> KIP
> >>>>>>>>>       >>>>>>>>>>>>> shows
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> each
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> stream from the branch using a
> >>>>>>>>> terminal node
> >>>>>>>>>       >>>>>>>>> (KStream#to()
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> in this
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> case).
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but
> >>>>>>>>> how would
> >>>>>>>> we
> >>>>>>>>>       >>>>> handle
> >>>>>>>>>       >>>>>>> the
> >>>>>>>>>       >>>>>>>>> case
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> where the
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> user has created a branch but
> >>>>>>>>> wants to
> >>>>>>>> continue
> >>>>>>>>>       >>>>>>>> processing
> >>>>>>>>>       >>>>>>>>> and
> >>>>>>>>>       >>>>>>>>>>>>> not
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> necessarily use a terminal node on
> >>>>>>>>> the
> >>>>>>>> branched
> >>>>>>>>>       >>>>>> stream
> >>>>>>>>>       >>>>>>>>>>>>> immediately?
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic
> >>>>>>>>> as is if
> >>>>>>>>>       we had
> >>>>>>>>>       >>>>>>>> something
> >>>>>>>>>       >>>>>>>>> like
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> this:
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> KStream<String, String>[] branches =
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> originalStream.branch(predicate1,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> predicate2);
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>> branches[0].filter(....).mapValues(...)..
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>> branches[1].selectKey(...).groupByKey().....
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> Bill
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM
> >>>>>>>>> Bill Bejeck
> >>>>>>>> <
> >>>>>>>>>       >>>>>>>>> bbej...@gmail.com <mailto:bbej...@gmail.com>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> All,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the
> >>>>>>>>> discussion for
> >>>>>>>> KIP-
> >>>>>>>>>       >>>>> 418.
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> Hello,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion about
> >>>>>>>> KIP-418.
> >>>>>>>>>       >>>>> Please
> >>>>>>>>>       >>>>>>>> take
> >>>>>>>>>       >>>>>>>>> a
> >>>>>>>>>       >>>>>>>>>>>>> look
> >>>>>>>>>       >>>>>>>>>>>>>>>>> at
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> KIP if you can, I would
> >>>>>>>>> appreciate any
> >>>>>>>>>       feedback :)
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> KIP-418:
> >>>>>>>>>       >>>>>
> >>>>>>>>>
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> >>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488:
> >>>>>>>>>       >>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5488
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> PR#6164:
> >>>>>>>>>       >>>>> https://github.com/apache/kafka/pull/6164
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>       >>>>>>>>>
> >>>>>>>>>       >
> >>>>>>>>>
> >>>>>>>>
> >>>>>
> >>>>
> >>>
> >>
> > 
> 
> 
>
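
A footnote to Paul's compatibility point quoted above (that Java can tell a
branch(varargs...) returning one thing from a no-argument branch() returning
another): the sketch below tries to show this in plain Java. FakeStream and
Brancher are hypothetical stand-ins invented for the illustration, not Kafka
Streams classes, and the method bodies are placeholders. It relies only on the
language rule that a fixed-arity method is preferred over a varargs one when
both could apply.

```java
import java.util.function.Predicate;

public class BranchOverloadSketch {

    // Hypothetical stand-in for the fluent builder type discussed in the thread.
    static final class Brancher {}

    // Hypothetical stand-in for a stream, reduced to the two overloads in question.
    static final class FakeStream {

        // Shape of the existing API: varargs predicates in, an array out.
        @SafeVarargs
        final String[] branch(Predicate<String>... predicates) {
            return new String[predicates.length];
        }

        // Shape of the fluent alternative: no arguments in, a builder out.
        Brancher branch() {
            return new Brancher();
        }
    }

    public static void main(String[] args) {
        FakeStream stream = new FakeStream();

        // No arguments: the fixed-arity overload is chosen, so this compiles
        // with a Brancher on the left-hand side.
        Brancher brancher = stream.branch();

        // One or more predicates: only the varargs overload applies.
        String[] branches = stream.branch(v -> v.isEmpty(), v -> true);

        System.out.println(brancher.getClass().getSimpleName() + " " + branches.length);
    }
}
```

Both calls compile side by side, which matches the claim that adding a
no-argument overload need not break existing callers of the varargs form.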
