Ivan,

no worries about getting sidetracked. Glad to have you back!

The DSL improved further in the meantime and we already have a `Named`
config object to name operators. It seems reasonable to me to build on this.
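
To make the idea a bit more concrete, here is a rough sketch of how a
`Named`-based `branch()` could look. It is only an illustration under my own
assumptions: a plain `List<T>` stands in for `KStream`, the nested `Named` is
a toy class (not the real `org.apache.kafka.streams.kstream.Named`), and
`BranchedStream`, `demo()`, and the branch names are made up for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch only: List<T> stands in for KStream, and this Named is
// a toy class, not org.apache.kafka.streams.kstream.Named.
class BranchSketch {

    static final class Named {
        final String name;
        private Named(String name) { this.name = name; }
        static Named as(String name) { return new Named(name); }
    }

    static final class BranchedStream<T> {
        private List<T> remaining; // records not yet claimed by any branch
        private final Map<String, List<T>> named = new LinkedHashMap<>();

        BranchedStream(List<T> source) { this.remaining = new ArrayList<>(source); }

        // "Embedded chaining": the branch is consumed in place, no name needed.
        BranchedStream<T> branch(Predicate<T> predicate, Consumer<List<T>> chain) {
            chain.accept(take(predicate));
            return this;
        }

        // Named variant: the transformed branch ends up in the returned map.
        BranchedStream<T> branch(Predicate<T> predicate,
                                 Function<List<T>, List<T>> chain, Named name) {
            put(name.name, chain.apply(take(predicate)));
            return this;
        }

        // Terminal: everything that matched no predicate goes to the default.
        Map<String, List<T>> defaultBranch(Function<List<T>, List<T>> chain, Named name) {
            put(name.name, chain.apply(remaining));
            return named;
        }

        // Terminal: unmatched records are dropped.
        Map<String, List<T>> noDefaultBranch() { return named; }

        // Predicates are evaluated in definition order; a record goes to the
        // first branch whose predicate matches.
        private List<T> take(Predicate<T> predicate) {
            List<T> matched = new ArrayList<>();
            List<T> rest = new ArrayList<>();
            for (T t : remaining) (predicate.test(t) ? matched : rest).add(t);
            remaining = rest;
            return matched;
        }

        // Runtime check that branch names are unique.
        private void put(String name, List<T> branch) {
            if (named.containsKey(name))
                throw new IllegalArgumentException("duplicate branch name: " + name);
            named.put(name, branch);
        }
    }

    static Map<String, List<Integer>> demo(List<Integer> sink) {
        return new BranchedStream<>(List.of(1, 2, 3, 4, 5, 6))
            .branch(v -> v % 2 == 0, s -> s, Named.as("even")) // returned in the map
            .branch(v -> v > 4, sink::addAll)                  // handled in place
            .defaultBranch(s -> s, Named.as("rest"));
    }

    public static void main(String[] args) {
        List<Integer> sink = new ArrayList<>();
        System.out.println(demo(sink)); // {even=[2, 4, 6], rest=[1, 3]}
        System.out.println(sink);       // [5]
    }
}
```

Note that 6 lands in "even" and not in the `v > 4` branch, because the
predicates are checked in the order the branches were defined. Only the
branches that were given a name show up in the returned map.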

Furthermore, John did a writeup about "DSL design principles" that we
want to follow:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
-- might be worth checking out.


-Matthias


On 4/17/20 4:30 PM, Ivan Ponomarev wrote:
> Hi everyone!
> 
> Let me revive the discussion of this KIP.
> 
> I'm very sorry for stopping my participation in the discussion in June
> 2019. My project work was very intensive then and it didn't leave me
> spare time. But I think I must finish this, because we invested
> substantial effort into this discussion and I don't feel entitled to
> propose other things before this one is finalized.
> 
> During these months I proceeded with writing and reviewing Kafka
> Streams-related code. Every time I needed branching, Spring-Kafka's
> KafkaStreamBrancher class of my invention (the original idea for this
> KIP) worked for me -- that's another reason why I gave up pushing the
> KIP forward. When I was coming across the problem with the scope of
> branches, I worked around it this way:
> 
> AtomicReference<KStream<...>> result = new AtomicReference<>();
> new KafkaStreamBrancher<....>()
>     .branch(....)
>     .defaultBranch(result::set)
>     .onTopOf(someStream);
> result.get()...
> 
> 
> And yes, of course I don't feel very happy with this approach.
> 
> I think that Matthias came up with a bright solution in his post from
> May 24th, 2019. Let me quote it:
> 
> KStream#split() -> KBranchedStream
> // branch is not easily accessible in current scope
> KBranchedStream#branch(Predicate, Consumer<KStream>)
>   -> KBranchedStream
> // assign a name to the branch and
> // return the sub-stream to the current scope later
> //
> // can be simple as `#branch(p, s->s, "name")`
> // or also complex as `#branch(p, s->s.filter(...), "name")`
> KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
>   -> KBranchedStream
> // default branch is not easily accessible
> // return map of all named sub-stream into current scope
> KBranchedStream#default(Consumer<KStream>)
>   -> Map<String,KStream>
> // assign custom name to default-branch
> // return map of all named sub-stream into current scope
> KBranchedStream#default(Function<KStream,KStream>, String)
>   -> Map<String,KStream>
> // assign a default name for default
> // return map of all named sub-stream into current scope
> KBranchedStream#defaultBranch(Function<KStream,KStream>)
>   -> Map<String,KStream>
> // return map of all named sub-streams into current scope
> KBranchedStream#noDefaultBranch()
>   -> Map<String,KStream>
> 
> I believe this would satisfy everyone. Optional names seem to be a good
> idea: when you don't need to have the branches in the same scope, you
> just don't use names and you don't risk making your code brittle. Or,
> you might want to add names just for debugging purposes. Or, finally,
> you might use the returned Map to have the named branches in the
> original scope.
> 
> There was also input from John Roesler on June 4th, 2019, who
> suggested using the `Named` class. I can't comment on this. The idea seems
> reasonable, but in this matter I'd rather trust people who are more
> familiar with Streams API design principles than me.
> 
> Regards,
> 
> Ivan
> 
> 
> 
> 08.10.2019 1:38, Matthias J. Sax wrote:
>> I am moving this KIP into "inactive status". Feel free to resume the KIP
>> at any point.
>>
>> If anybody else is interested in picking up this KIP, feel free to do so.
>>
>>
>>
>> -Matthias
>>
>> On 7/11/19 4:00 PM, Matthias J. Sax wrote:
>>> Ivan,
>>>
>>> did you see my last reply? What do you think about my proposal to mix
>>> both approaches and try to get best-of-both worlds?
>>>
>>>
>>> -Matthias
>>>
>>> On 6/11/19 3:56 PM, Matthias J. Sax wrote:
>>>> Thanks for the input John!
>>>>
>>>>> under your suggestion, it seems that the name is required
>>>>
>>>> If you want to get the `KStream` as part of the `Map` back using a
>>>> `Function`, yes. If you follow the "embedded chaining" pattern using a
>>>> `Consumer`, no.
>>>>
>>>> Allowing for a default name via `split()` can of course be done.
>>>> Similarly, using `Named` instead of `String` is possible.
>>>>
>>>> I wanted to sketch out a high level proposal to merge both patterns
>>>> only. Your suggestions to align the new API with the existing API make
>>>> total sense.
>>>>
>>>>
>>>>
>>>> One follow up question: Would `Named` be optional or required in
>>>> `split()` and `branch()`? It's unclear from your example.
>>>>
>>>> If both are mandatory, what do we gain by it? The returned `Map` only
>>>> contains the corresponding branches, so why should we prefix all of
>>>> them? If `Named` is mandatory only in `branch()`, but optional in
>>>> `split()`, the same question arises.
>>>>
>>>> Requiring `Named` in `split()` only seems to make sense if `Named` is
>>>> optional in `branch()` and we generate a `-X` suffix using a counter for
>>>> each branch name. However, this might lead to the problem of
>>>> changing names if branches are added/removed. Also, how would the names
>>>> be generated if `Consumer` is mixed in (i.e., not all branches are
>>>> returned in the `Map`)?
>>>>
>>>> If `Named` is optional for both, it could happen that a user forgets to
>>>> specify a name for a branch, which would lead to runtime issues.
>>>>
>>>>
>>>> Hence, I am actually in favor of not allowing a default name but keeping
>>>> `split()` without a parameter and making `Named` in `branch()` required
>>>> if a `Function` is used. This makes it explicit to the user that
>>>> specifying a name is required if a `Function` is used.
>>>>
>>>>
>>>>
>>>> About
>>>>
>>>>> KBranchedStream#branch(BranchConfig)
>>>>
>>>> I don't think that the branching predicate is a configuration and hence
>>>> would not include it in a configuration object.
>>>>
>>>>>      withChain(...);
>>>>
>>>> Similarly, `withChain()` (that would only take a `Consumer`?) does not
>>>> seem to be a configuration. We also cannot prevent a user from calling
>>>> `withName()` in combination with `withChain()`, which does not make sense
>>>> IMHO. We could of course throw an RTE, but not having a compile-time check
>>>> seems less appealing. Also, it could happen that neither `withChain()`
>>>> nor `withName()` is called and the branch is missing in the returned
>>>> `Map`, which would lead to runtime issues, too.
>>>>
>>>> Hence, I don't think that we should add `BranchConfig`. A config object
>>>> is helpful if each configuration can be set independently of all
>>>> others, but this seems not to be the case here. If we add new
>>>> configuration later, we can also just move forward by deprecating the
>>>> methods that accept `Named` and add new methods that accept
>>>> `BranchConfig` (that would of course implement `Named`).
>>>>
>>>>
>>>> Thoughts?
>>>>
>>>>
>>>> @Ivan, what do you think about the general idea to blend the two main
>>>> approaches of returning a `Map` plus an "embedded chaining"?
>>>>
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>>
>>>> On 6/4/19 10:33 AM, John Roesler wrote:
>>>>> Thanks for the idea, Matthias, it does seem like this would satisfy
>>>>> everyone. Returning the map from the terminal operations also solves
>>>>> the problem of merging/joining the branched streams, if we want to add
>>>>> support for the complement later on.
>>>>>
>>>>> Under your suggestion, it seems that the name is required. Otherwise,
>>>>> we wouldn't have keys for the map to return. I think this is actually
>>>>> not too bad, since experience has taught us that, although names for
>>>>> operations are not required to define stream processing logic, it does
>>>>> significantly improve the operational experience when you can map the
>>>>> topology, logs, metrics, etc. back to the source code. Since you
>>>>> wouldn't (have to) reference the name to chain extra processing onto
>>>>> the branch (thanks to the second argument), you can avoid the
>>>>> "unchecked name" problem that Ivan pointed out.
>>>>>
>>>>> In the current implementation of Branch, you can name the branch
>>>>> operator itself, and then all the branches get index-suffixed names
>>>>> built from the branch operator name. I guess under this proposal, we
>>>>> could naturally append the branch name to the branching operator name,
>>>>> like this:
>>>>>
>>>>>     stream.split(Named.withName("mysplit")) //creates node "mysplit"
>>>>>                .branch(..., ..., "abranch") // creates node "mysplit-abranch"
>>>>>                .defaultBranch(...) // creates node "mysplit-default"
>>>>>
>>>>> It does make me wonder about the DSL syntax itself, though.
>>>>>
>>>>> We don't have a defined grammar, so there's plenty of room to debate
>>>>> the "best" syntax in the context of each operation, but in general,
>>>>> the KStream DSL operators follow this pattern:
>>>>>
>>>>>      operator(function, config_object?) OR operator(config_object)
>>>>>
>>>>> where config_object is often just Named in the "function" variant.
>>>>> Even when the config_object isn't a Named, but some other config
>>>>> class, that config class _always_ implements NamedOperation.
>>>>>
>>>>> Here, we're introducing a totally different pattern:
>>>>>
>>>>>    operator(function, function, string)
>>>>>
>>>>> where the string is the name.
>>>>> My first question is whether the name should instead be specified with
>>>>> the NamedOperation interface.
>>>>>
>>>>> My second question is whether we should just roll all these arguments
>>>>> up into a config object like:
>>>>>
>>>>>     KBranchedStream#branch(BranchConfig)
>>>>>
>>>>>     interface BranchConfig extends NamedOperation {
>>>>>      withPredicate(...);
>>>>>      withChain(...);
>>>>>      withName(...);
>>>>>    }
>>>>>
>>>>> Although I guess we'd like to call BranchConfig something more like
>>>>> "Branched", even if I don't particularly like that pattern.
>>>>>
>>>>> This makes the source code a little noisier, but it also makes us more
>>>>> future-proof, as we can deal with a wide range of alternatives purely
>>>>> in the config interface, and never have to deal with adding overloads
>>>>> to the KBranchedStream if/when we decide we want the name to be
>>>>> optional, or the KStream->KStream to be optional.
>>>>>
>>>>> WDYT?
>>>>>
>>>>> Thanks,
>>>>> -John
>>>>>
>>>>> On Fri, May 24, 2019 at 5:25 PM Michael Drogalis
>>>>> <michael.droga...@confluent.io> wrote:
>>>>>>
>>>>>> Matthias: I think that's pretty reasonable from my point of view.
>>>>>> Good suggestion.
>>>>>>
>>>>>> On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax
>>>>>> <matth...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Interesting discussion.
>>>>>>>
>>>>>>> I am wondering, if we cannot unify the advantage of both approaches:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> KStream#split() -> KBranchedStream
>>>>>>>
>>>>>>> // branch is not easily accessible in current scope
>>>>>>> KBranchedStream#branch(Predicate, Consumer<KStream>)
>>>>>>>    -> KBranchedStream
>>>>>>>
>>>>>>> // assign a name to the branch and
>>>>>>> // return the sub-stream to the current scope later
>>>>>>> //
>>>>>>> // can be simple as `#branch(p, s->s, "name")`
>>>>>>> // or also complex as `#branch(p, s->s.filter(...), "name")`
>>>>>>> KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
>>>>>>>    -> KBranchedStream
>>>>>>>
>>>>>>> // default branch is not easily accessible
>>>>>>> // return map of all named sub-stream into current scope
>>>>>>> KBranchedStream#default(Consumer<KStream>)
>>>>>>>    -> Map<String,KStream>
>>>>>>>
>>>>>>> // assign custom name to default-branch
>>>>>>> // return map of all named sub-stream into current scope
>>>>>>> KBranchedStream#default(Function<KStream,KStream>, String)
>>>>>>>    -> Map<String,KStream>
>>>>>>>
>>>>>>> // assign a default name for default
>>>>>>> // return map of all named sub-stream into current scope
>>>>>>> KBranchedStream#defaultBranch(Function<KStream,KStream>)
>>>>>>>    -> Map<String,KStream>
>>>>>>>
>>>>>>> // return map of all named sub-streams into current scope
>>>>>>> KBranchedStream#noDefaultBranch()
>>>>>>>    -> Map<String,KStream>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Hence, for each sub-stream, the user can choose whether to add a name
>>>>>>> and return the branch "result" to the calling scope or not. The
>>>>>>> implementation can also check at runtime that all returned names are
>>>>>>> unique. The returned Map can be empty and it's also optional to use
>>>>>>> the Map.
>>>>>>>
>>>>>>> To me, it seems like a good way to get best of both worlds.
>>>>>>>
>>>>>>> Thoughts?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 5/6/19 5:15 PM, John Roesler wrote:
>>>>>>>> Ivan,
>>>>>>>>
>>>>>>>> That's a very good point about the "start" operator in the dynamic case.
>>>>>>>> I had no problem with "split()"; I was just questioning the necessity.
>>>>>>>> Since you've provided a proof of necessity, I'm in favor of the
>>>>>>>> "split()" start operator. Thanks!
>>>>>>>>
>>>>>>>> Separately, I'm interested to see where the present discussion leads.
>>>>>>>> I've written enough Javascript code in my life to be suspicious of
>>>>>>>> nested closures. You have a good point about using method references (or
>>>>>>>> indeed function literals also work). It should be validating that this
>>>>>>>> was also the JS community's first approach to flattening the logic when
>>>>>>>> their nested closure situation got out of hand. Unfortunately, it's
>>>>>>>> replacing nesting with redirection, both of which disrupt code
>>>>>>>> readability (but in different ways for different reasons). In other
>>>>>>>> words, I agree that function references are *the* first-order solution if
>>>>>>>> the nested code does indeed become a problem.
>>>>>>>>
>>>>>>>> However, the history of JS also tells us that function references aren't
>>>>>>>> the end of the story either, and you can see that by observing that
>>>>>>>> there have been two follow-on eras, as they continue trying to cope with
>>>>>>>> the consequences of living in such a callback-heavy language. First, you
>>>>>>>> have Futures/Promises, which essentially let you convert nested code to
>>>>>>>> method-chained code (Observables/FP is a popular variation on this).
>>>>>>>> Most lately, you have async/await, which is an effort to apply language
>>>>>>>> (not just API) syntax to the problem, and offer the "flattest" possible
>>>>>>>> programming style to solve the problem (because you get back to just one
>>>>>>>> code block per functional unit).
>>>>>>>>
>>>>>>>> Stream-processing is a different domain, and Java+KStreams is nowhere
>>>>>>>> near as callback heavy as JS, so I don't think we have to take the JS
>>>>>>>> story for granted, but then again, I think we can derive some valuable
>>>>>>>> lessons by looking sideways to adjacent domains. I'm just bringing this
>>>>>>>> up to inspire further/deeper discussion. At the same time, just like JS,
>>>>>>>> we can afford to take an iterative approach to the problem.
>>>>>>>>
>>>>>>>> Separately again, I'm interested in the post-branch merge (and I'd also
>>>>>>>> add join) problem that Paul brought up. We can clearly punt on it, by
>>>>>>>> terminating the nested branches with sink operators. But is there a DSL
>>>>>>>> way to do it?
>>>>>>>>
>>>>>>>> Thanks again for driving this,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Thu, May 2, 2019 at 7:39 PM Paul Whalen <pgwha...@gmail.com
>>>>>>>> <mailto:pgwha...@gmail.com>> wrote:
>>>>>>>>
>>>>>>>>      Ivan, I’ll definitely forfeit my point on the clumsiness of the
>>>>>>>>      branch(predicate, consumer) solution, I don’t see any real drawbacks
>>>>>>>>      for the dynamic case.
>>>>>>>>
>>>>>>>>      IMO the one trade off to consider at this point is the scope
>>>>>>>>      question. I don’t know if I totally agree that “we rarely need them
>>>>>>>>      in the same scope” since merging the branches back together later
>>>>>>>>      seems like a perfectly plausible use case that can be a lot nicer
>>>>>>>>      when the branched streams are in the same scope. That being said,
>>>>>>>>      for the reasons Ivan listed, I think it is overall the better
>>>>>>>>      solution - working around the scope thing is easy enough if you need
>>>>>>>>      to.
>>>>>>>>
>>>>>>>>      > On May 2, 2019, at 7:00 PM, Ivan Ponomarev
>>>>>>>>      <iponoma...@mail.ru.invalid> wrote:
>>>>>>>>      >
>>>>>>>>      > Hello everyone, thank you all for joining the discussion!
>>>>>>>>      >
>>>>>>>>      > Well, I don't think the idea of named branches, be it a
>>>>>>>>      > LinkedHashMap (no other Map will do, because order of definition
>>>>>>>>      > matters) or a `branch` method taking a name and a Consumer, has
>>>>>>>>      > more advantages than drawbacks.
>>>>>>>>      >
>>>>>>>>      > In my opinion, the only real positive outcome from Michael's
>>>>>>>>      > proposal is that all the returned branches are in the same scope.
>>>>>>>>      > But 1) we rarely need them in the same scope 2) there is a
>>>>>>>>      > workaround for the scope problem, described in the KIP.
>>>>>>>>      >
>>>>>>>>      > 'Inlining the complex logic' is not a problem, because we can use
>>>>>>>>      > method references instead of lambdas. In real world scenarios you
>>>>>>>>      > tend to split the complex logic to methods anyway, so the code is
>>>>>>>>      > going to be clean.
>>>>>>>>      >
>>>>>>>>      > The drawbacks are strong. The cohesion between predicates and
>>>>>>>>      > handlers is lost. We have to define predicates in one place, and
>>>>>>>>      > handlers in another. This opens the door for bugs:
>>>>>>>>      >
>>>>>>>>      > - what if we forget to define a handler for a name? or a name for
>>>>>>>>      >   a handler?
>>>>>>>>      > - what if we misspell a name?
>>>>>>>>      > - what if we copy-paste and duplicate a name?
>>>>>>>>      >
>>>>>>>>      > What Michael proposes would have been totally OK if we had been
>>>>>>>>      > writing the API in Lua, Ruby or Python. In those languages the
>>>>>>>>      > "dynamic naming" approach would have looked most concise and
>>>>>>>>      > beautiful. But in Java we expect all the problems related to
>>>>>>>>      > identifiers to be eliminated at compile time.
>>>>>>>>      >
>>>>>>>>      > Do we have to invent duck-typing for the Java API?
>>>>>>>>      >
>>>>>>>>      > And if we do, what advantage are we supposed to get besides having
>>>>>>>>      > all the branches in the same scope? Michael, maybe I'm missing your
>>>>>>>>      > point?
>>>>>>>>      >
>>>>>>>>      > ---
>>>>>>>>      >
>>>>>>>>      > Earlier in this discussion John Roesler also proposed to do
>>>>>>>>      > without a "start branching" operator, and later Paul mentioned that
>>>>>>>>      > in the case when we have to add a dynamic number of branches, the
>>>>>>>>      > current KIP is 'clumsier' compared to Michael's 'Map' solution. Let
>>>>>>>>      > me address both comments here.
>>>>>>>>      >
>>>>>>>>      > 1) A "start branching" operator (I think that *split* is a good
>>>>>>>>      > name for it indeed) is critical when we need to do dynamic
>>>>>>>>      > branching, see example below.
>>>>>>>>      >
>>>>>>>>      > 2) No, dynamic branching in the current KIP is not clumsy at all.
>>>>>>>>      > Imagine a real-world scenario when you need one branch per enum
>>>>>>>>      > value (say, RecordType). You can have something like this:
>>>>>>>>      >
>>>>>>>>      > /* John: if we had to start with stream.branch(...) here,
>>>>>>>>      >    it would have been much messier. */
>>>>>>>>      > KBranchedStream branched = stream.split();
>>>>>>>>      >
>>>>>>>>      > /* Not clumsy at all :-) */
>>>>>>>>      > for (RecordType recordType : RecordType.values())
>>>>>>>>      >     branched = branched.branch(
>>>>>>>>      >             (k, v) -> v.getRecType() == recordType,
>>>>>>>>      >             recordType::processRecords);
>>>>>>>>      >
>>>>>>>>      > Regards,
>>>>>>>>      >
>>>>>>>>      > Ivan
>>>>>>>>      >
>>>>>>>>      >
>>>>>>>>      > 02.05.2019 14:40, Matthias J. Sax wrote:
>>>>>>>>      >> I also agree with Michael's observation about the core problem of
>>>>>>>>      >> the current `branch()` implementation.
>>>>>>>>      >>
>>>>>>>>      >> However, I also don't like to pass in a clumsy Map object. My
>>>>>>>>      >> thinking was more aligned with Paul's proposal to just add a name
>>>>>>>>      >> to each `branch()` statement and return a `Map<String,KStream>`.
>>>>>>>>      >>
>>>>>>>>      >> It makes the code easier to read, and also makes the order of
>>>>>>>>      >> `Predicates` (that is essential) easier to grasp.
>>>>>>>>      >>
>>>>>>>>      >>>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>>>>      >>>>>>    .branch("branchOne", Predicate<K, V>)
>>>>>>>>      >>>>>>    .branch( "branchTwo", Predicate<K, V>)
>>>>>>>>      >>>>>>    .defaultBranch("defaultBranch");
>>>>>>>>      >> An open question is the case for which no defaultBranch() should be
>>>>>>>>      >> specified. Atm, `split()` and `branch()` would return
>>>>>>>>      >> `BranchedKStream` and the call to `defaultBranch()` that returns
>>>>>>>>      >> the `Map` is mandatory (which is not the case atm). Or is this
>>>>>>>>      >> actually not a real problem, because users can just ignore the
>>>>>>>>      >> branch returned by `defaultBranch()` in the result `Map`?
>>>>>>>>      >>
>>>>>>>>      >>
>>>>>>>>      >> About "inlining": So far, it seems to be a matter of personal
>>>>>>>>      >> preference. I can see arguments for both, but no "killer argument"
>>>>>>>>      >> yet that clearly makes the case for one or the other.
>>>>>>>>      >>
>>>>>>>>      >>
>>>>>>>>      >> -Matthias
>>>>>>>>      >>
>>>>>>>>      >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
>>>>>>>>      >>> Perhaps inlining is the wrong terminology. It doesn’t require
>>>>>>>>      >>> that a lambda with the full downstream topology be defined inline -
>>>>>>>>      >>> it can be a method reference as with Ivan’s original suggestion.
>>>>>>>>      >>> The advantage of putting the predicate and its downstream logic
>>>>>>>>      >>> (Consumer) together in branch() is that they are required to be near
>>>>>>>>      >>> to each other.
>>>>>>>>      >>>
>>>>>>>>      >>> Ultimately the downstream code has to live somewhere, and deep
>>>>>>>>      >>> branch trees will be hard to read regardless.
>>>>>>>>      >>>
>>>>>>>>      >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis
>>>>>>>>      <michael.droga...@confluent.io
>>>>>>>>      <mailto:michael.droga...@confluent.io>> wrote:
>>>>>>>>      >>>>
>>>>>>>>      >>>> I'm less enthusiastic about inlining the branch logic with its
>>>>>>>>      >>>> downstream functionality. Programs that have deep branch trees will
>>>>>>>>      >>>> quickly become harder to read as a single unit.
>>>>>>>>      >>>>
>>>>>>>>      >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen
>>>>>>>>      <pgwha...@gmail.com <mailto:pgwha...@gmail.com>> wrote:
>>>>>>>>      >>>>>
>>>>>>>>      >>>>> Also +1 on the issues/goals as Michael outlined them, I think
>>>>>>>>      >>>>> that sets a great framework for the discussion.
>>>>>>>>      >>>>>
>>>>>>>>      >>>>> Regarding the SortedMap solution, my understanding is that the
>>>>>>>>      >>>>> current proposal in the KIP is what is in my PR which (pending
>>>>>>>>      >>>>> naming decisions) is roughly this:
>>>>>>>>      >>>>>
>>>>>>>>      >>>>> stream.split()
>>>>>>>>      >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>>>>>>      >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>>>>>>      >>>>>    .defaultBranch(Consumer<KStream<K, V>>);
>>>>>>>>      >>>>>
>>>>>>>>      >>>>> Obviously some ordering is necessary, since branching as a
>>>>>>>>      >>>>> construct doesn't work without it, but this solution seems like it
>>>>>>>>      >>>>> provides as much associativity as the SortedMap solution, because
>>>>>>>>      >>>>> each branch() call directly associates the "conditional" with the
>>>>>>>>      >>>>> "code block." The value it provides over the KIP solution is the
>>>>>>>>      >>>>> accessing of streams in the same scope.
>>>>>>>>      >>>>>
>>>>>>>>      >>>>> The KIP solution is less "dynamic" than the SortedMap solution in
>>>>>>>>      >>>>> the sense that it is slightly clumsier to add a dynamic number of
>>>>>>>>      >>>>> branches, but it is certainly possible. It seems to me like the API
>>>>>>>>      >>>>> should favor the "static" case anyway, and should make it simple
>>>>>>>>      >>>>> and readable to fluently declare and access your branches in-line.
>>>>>>>>      >>>>> It also makes it impossible to ignore a branch, and it is possible
>>>>>>>>      >>>>> to build an (almost) identical SortedMap solution on top of it.
>>>>>>>>      >>>>>
>>>>>>>>      >>>>> I could also see a middle ground where instead of a raw SortedMap
>>>>>>>>      >>>>> being taken in, branch() takes a name and not a Consumer. Something
>>>>>>>>      >>>>> like this:
>>>>>>>>      >>>>>
>>>>>>>>      >>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>>>>      >>>>>    .branch("branchOne", Predicate<K, V>)
>>>>>>>>      >>>>>    .branch("branchTwo", Predicate<K, V>)
>>>>>>>>      >>>>>    .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
>>>>>>>>      >>>>>
>>>>>>>>      >>>>> Pros for that solution:
>>>>>>>>      >>>>> - accessing branched KStreams in same scope
>>>>>>>>      >>>>> - no double brace initialization, hopefully slightly more readable
>>>>>>>>      >>>>>   than SortedMap
>>>>>>>>      >>>>>
>>>>>>>>      >>>>> Cons
>>>>>>>>      >>>>> - downstream branch logic cannot be specified inline which makes it
>>>>>>>>      >>>>>   harder to read top to bottom (like existing API and SortedMap, but
>>>>>>>>      >>>>>   unlike the KIP)
>>>>>>>>      >>>>> - you can forget to "handle" one of the branched streams (like
>>>>>>>>      >>>>>   existing API and SortedMap, but unlike the KIP)
>>>>>>>>      >>>>>
>>>>>>>>      >>>>> (KBranchedStreams could even work *both* ways but perhaps that's
>>>>>>>>      >>>>> overdoing it).
>>>>>>>>      >>>>>
>>>>>>>>      >>>>> Overall I'm curious how important it is to be able to easily access
>>>>>>>>      >>>>> the branched KStream in the same scope as the original. It's
>>>>>>>>      >>>>> possible that it doesn't need to be handled directly by the API,
>>>>>>>>      >>>>> but instead left up to the user. I'm sort of in the middle on it.
>>>>>>>>      >>>>>
>>>>>>>>      >>>>> Paul
>>>>>>>>      >>>>>
>>>>>>>>      >>>>>
>>>>>>>>      >>>>>
>>>>>>>>      >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman
>>>>>>>>      <sop...@confluent.io <mailto:sop...@confluent.io>>
>>>>>>>>      >>>>> wrote:
>>>>>>>>      >>>>>
>>>>>>>>      >>>>>> I'd like to +1 what Michael said about the issues with the existing
>>>>>>>>      >>>>>> branch method, I agree with what he's outlined and I think we
>>>>>>>>      >>>>>> should proceed by trying to alleviate these problems. Specifically
>>>>>>>>      >>>>>> it seems important to be able to cleanly access the individual
>>>>>>>>      >>>>>> branches (eg by mapping name->stream), which I thought was the
>>>>>>>>      >>>>>> original intention of this KIP.
>>>>>>>>      >>>>>>
>>>>>>>>      >>>>>> That said, I don't think we should so easily give in to the double
>>>>>>>>      >>>>>> brace anti-pattern or force our users into it if at all possible to
>>>>>>>>      >>>>>> avoid...just my two cents.
>>>>>>>>      >>>>>>
>>>>>>>>      >>>>>> Cheers,
>>>>>>>>      >>>>>> Sophie
>>>>>>>>      >>>>>>
>>>>>>>>      >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
>>>>>>>>      >>>>>> michael.droga...@confluent.io
>>>>>>>>      <mailto:michael.droga...@confluent.io>> wrote:
>>>>>>>>      >>>>>>
>>>>>>>>      >>>>>>> I’d like to propose a different way of thinking about this. To me,
>>>>>>>>      >>>>>>> there are three problems with the existing branch signature:
>>>>>>>>      >>>>>>>
>>>>>>>>      >>>>>>> 1. If you use it the way most people do, Java raises unsafe type
>>>>>>>>      >>>>>>>    warnings.
>>>>>>>>      >>>>>>> 2. The way in which you use the stream branches is positionally
>>>>>>>>      >>>>>>>    coupled to the ordering of the conditionals.
>>>>>>>>      >>>>>>> 3. It is brittle to extend existing branch calls with additional
>>>>>>>>      >>>>>>>    code paths.
>>>>>>>>      >>>>>>>
>>>>>>>>      >>>>>>> Using associative constructs instead of relying on ordered
>>>>>>>>      >>>>>>> constructs would be a stronger approach. Consider a signature that
>>>>>>>>      >>>>>>> instead looks like this:
>>>>>>>>      >>>>>>>
>>>>>>>>      >>>>>>> Map<String, KStream<K,V>> KStream#branch(
>>>>>>>>      >>>>>>>     SortedMap<String, Predicate<? super K,? super V>>);
>>>>>>>>      >>>>>>>
>>>>>>>>      >>>>>>> Branches are given names in a map, and as a result, the API returns
>>>>>>>>      >>>>>>> a mapping of names to streams. The ordering of the conditionals is
>>>>>>>>      >>>>>>> maintained because it’s a sorted map. Insert order determines the
>>>>>>>>      >>>>>>> order of evaluation.
>>>>>>>>      >>>>>>>
>>>>>>>>      >>>>>>> This solves problem 1 because there are no more varargs. It solves
>>>>>>>>      >>>>>>> problem 2 because you no longer lean on ordering to access the
>>>>>>>>      >>>>>>> branch you’re interested in. It solves problem 3 because you can
>>>>>>>>      >>>>>>> introduce another conditional by simply attaching another name to
>>>>>>>>      >>>>>>> the structure, rather than messing with the existing indices.
>>>>>>>>      >>>>>>>
>>>>>>>>      >>>>>>> One of the drawbacks is that creating the map inline is
>>>>>>>>      >>>>>>> historically awkward in Java. I know it’s an anti-pattern to use
>>>>>>>>      >>>>>>> voluminously, but double brace initialization would clean up the
>>>>>>>>      >>>>>>> aesthetics.
>>>>>>>>      >>>>>>>
>>>>>>>>      >>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler
>>>>>>>>      <j...@confluent.io <mailto:j...@confluent.io>>
>>>>>>>>      >>>>> wrote:
>>>>>>>>      >>>>>>>> Hi Ivan,
>>>>>>>>      >>>>>>>>
>>>>>>>>      >>>>>>>> Thanks for the update.
>>>>>>>>      >>>>>>>>
>>>>>>>>      >>>>>>>> FWIW, I agree with Matthias that the current "start branching"
>>>>>>>>      >>>>>>>> operator is confusing when named the same way as the actual
>>>>>>>>      >>>>>>>> branches. "Split" seems like a good name. Alternatively, we can do
>>>>>>>>      >>>>>>>> without a "start branching" operator at all, and just do:
>>>>>>>>      >>>>>>>>
>>>>>>>>      >>>>>>>> stream
>>>>>>>>      >>>>>>>>      .branch(Predicate)
>>>>>>>>      >>>>>>>>      .branch(Predicate)
>>>>>>>>      >>>>>>>>      .defaultBranch();
>>>>>>>>      >>>>>>>>
>>>>>>>>      >>>>>>>> Tentatively, I think that this branching operation
>>>>>>>> should be
>>>>>>>>      >>>>> terminal.
>>>>>>>>      >>>>>>> That
>>>>>>>>      >>>>>>>> way, we don't create ambiguity about how to use
>>>>>>>> it. That
>>>>>>>>      is, `branch`
>>>>>>>>      >>>>>>>> should return `KBranchedStream`, while
>>>>>>>> `defaultBranch` is
>>>>>>>>      `void`, to
>>>>>>>>      >>>>>>>> enforce that it comes last, and that there is only
>>>>>>>> one
>>>>>>>>      definition of
>>>>>>>>      >>>>>> the
>>>>>>>>      >>>>>>>> default branch. Potentially, we should log a
>>>>>>>> warning if
>>>>>>>>      there's no
>>>>>>>>      >>>>>>> default,
>>>>>>>>      >>>>>>>> and additionally log a warning (or throw an
>>>>>>>> exception) if a
>>>>>>>>      record
>>>>>>>>      >>>>>> falls
>>>>>>>>      >>>>>>>> through with no default.
>>>>>>>>      >>>>>>>>
>>>>>>>>      >>>>>>>> Thoughts?
>>>>>>>>      >>>>>>>>
>>>>>>>>      >>>>>>>> Thanks,
>>>>>>>>      >>>>>>>> -John
>>>>>>>>      >>>>>>>>
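The "terminal" idea quoted above can be sketched in plain Java: `branch` returns the chain object so calls can continue, while `defaultBranch` returns `void`, so the compiler rejects anything chained after it. `BranchChain`, and the use of a `List` in place of a stream, are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class TerminalBranchSketch {

    // Hypothetical stand-in for a KBranchedStream over an in-memory list.
    static final class BranchChain<T> {
        private final List<T> remaining;

        BranchChain(List<T> records) {
            this.remaining = new ArrayList<>(records);
        }

        // Hands matching records to the consumer and keeps the rest for later branches.
        BranchChain<T> branch(Predicate<? super T> predicate, Consumer<List<T>> consumer) {
            List<T> matched = new ArrayList<>();
            remaining.removeIf(r -> {
                boolean hit = predicate.test(r);
                if (hit) {
                    matched.add(r);
                }
                return hit;
            });
            consumer.accept(matched);
            return this;
        }

        // Terminal: returns void, so `.defaultBranch(...).branch(...)` does not compile.
        void defaultBranch(Consumer<List<T>> consumer) {
            consumer.accept(new ArrayList<>(remaining));
            remaining.clear();
        }
    }

    static List<List<String>> demo() {
        List<List<String>> out = new ArrayList<>();
        new BranchChain<>(List.of("apple", "avocado", "banana", "cherry"))
            .branch(s -> s.startsWith("a"), out::add)
            .branch(s -> s.startsWith("b"), out::add)
            .defaultBranch(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

This enforces ordering only when `defaultBranch` is actually called; a chain that simply stops after `branch` still compiles, which is the gap the suggested warning is meant to cover.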
>>>>>>>>      >>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <
>>>>>>>>      >>>>> matth...@confluent.io>
>>>>>>>>      >>>>>>>> wrote:
>>>>>>>>      >>>>>>>>
>>>>>>>>      >>>>>>>>> Thanks for updating the KIP and your answers.
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>>> this is to make the name similar to String#split
>>>>>>>>      >>>>>>>>>>> that also returns an array, right?
>>>>>>>>      >>>>>>>>> The intent was to avoid name duplication. The
>>>>>>>> return type
>>>>>>>>      should
>>>>>>>>      >>>>>> _not_
>>>>>>>>      >>>>>>>>> be an array.
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>> The current proposal is
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>> stream.branch()
>>>>>>>>      >>>>>>>>>      .branch(Predicate)
>>>>>>>>      >>>>>>>>>      .branch(Predicate)
>>>>>>>>      >>>>>>>>>      .defaultBranch();
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>> IMHO, this reads a little odd, because the first
>>>>>>>>      `branch()` does
>>>>>>>>      >>>>> not
>>>>>>>>      >>>>>>>>> take any parameters and has different semantics
>>>>>>>> than the
>>>>>>> later
>>>>>>>>      >>>>>>>>> `branch()` calls. Note, that from the code
>>>>>>>> snippet above,
>>>>>>> it's
>>>>>>>>      >>>>> hidden
>>>>>>>>      >>>>>>>>> that the first call is `KStream#branch()` while
>>>>>>>> the others
>>>>>>> are
>>>>>>>>      >>>>>>>>> `KBranchedStream#branch()`, which makes reading the
>>>>>>>> code
>>>>>>> harder.
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>> Because I suggested to rename `addBranch()` ->
>>>>>>>> `branch()`,
>>>>>>>>      I thought
>>>>>>>>      >>>>>> it
>>>>>>>>      >>>>>>>>> might be better to also rename `KStream#branch()`
>>>>>>>> to avoid
>>>>>>> the
>>>>>>>>      >>>>> naming
>>>>>>>>      >>>>>>>>> overlap that seems to be confusing. The following
>>>>>>>> reads
>>>>>>> much
>>>>>>>>      >>>>> cleaner
>>>>>>>>      >>>>>> to
>>>>>>>>      >>>>>>>> me:
>>>>>>>>      >>>>>>>>> stream.split()
>>>>>>>>      >>>>>>>>>      .branch(Predicate)
>>>>>>>>      >>>>>>>>>      .branch(Predicate)
>>>>>>>>      >>>>>>>>>      .defaultBranch();
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>> Maybe there is a better alternative to `split()`
>>>>>>>> though to
>>>>>>>>      avoid
>>>>>>>>      >>>>> the
>>>>>>>>      >>>>>>>>> naming overlap.
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>>> 'default' is, however, a reserved word, so
>>>>>>>> unfortunately
>>>>>>> we
>>>>>>>>      >>>>> cannot
>>>>>>>>      >>>>>>> have
>>>>>>>>      >>>>>>>>> a method with such name :-)
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>> Bummer. Didn't consider this. Maybe we can still
>>>>>>>> come up
>>>>>>>>      with a
>>>>>>>>      >>>>> short
>>>>>>>>      >>>>>>>> name?
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>> Can you add the interface `KBranchedStream` to
>>>>>>>> the KIP
>>>>>>>>      with all
>>>>>>>>      >>>>> its
>>>>>>>>      >>>>>>>>> methods? It will be part of public API and should be
>>>>>>>>      contained in
>>>>>>>>      >>>>> the
>>>>>>>>      >>>>>>>>> KIP. For example, it's unclear atm, what the
>>>>>>>> return type of
>>>>>>>>      >>>>>>>>> `defaultBranch()` is.
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>> You did not comment on the idea to add a
>>>>>>>>      `KBranchedStream#get(int
>>>>>>>>      >>>>>>> index)
>>>>>>>>      >>>>>>>>> -> KStream` method to get the individually
>>>>>>>>      branched-KStreams. Would
>>>>>>>>      >>>>>> be
>>>>>>>>      >>>>>>>>> nice to get your feedback about it. It seems you
>>>>>>>> suggest
>>>>>>>>      that users
>>>>>>>>      >>>>>>>>> would need to write custom utility code
>>>>>>>> otherwise, to
>>>>>>>>      access them.
>>>>>>>>      >>>>> We
>>>>>>>>      >>>>>>>>> should discuss the pros and cons of both
>>>>>>>> approaches. It
>>>>>>> feels
>>>>>>>>      >>>>>>>>> "incomplete" to me atm, if the API has no
>>>>>>>> built-in support
>>>>>>>>      to get
>>>>>>>>      >>>>> the
>>>>>>>>      >>>>>>>>> branched-KStreams directly.
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>> -Matthias
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
>>>>>>>>      >>>>>>>>>> Hi all!
>>>>>>>>      >>>>>>>>>>
>>>>>>>>      >>>>>>>>>> I have updated the KIP-418 according to the new
>>>>>>>> vision.
>>>>>>>>      >>>>>>>>>>
>>>>>>>>      >>>>>>>>>> Matthias, thanks for your comment!
>>>>>>>>      >>>>>>>>>>
>>>>>>>>      >>>>>>>>>>> Renaming KStream#branch() -> #split()
>>>>>>>>      >>>>>>>>>> I can see your point: this is to make the name
>>>>>>>> similar to
>>>>>>>>      >>>>>>> String#split
>>>>>>>>      >>>>>>>>>> that also returns an array, right? But is it
>>>>>>>> worth the
>>>>>>>>      loss of
>>>>>>>>      >>>>>>>> backwards
>>>>>>>>      >>>>>>>>>> compatibility? We can have overloaded branch()
>>>>>>>> as well
>>>>>>>>      without
>>>>>>>>      >>>>>>>> affecting
>>>>>>>>      >>>>>>>>>> the existing code. Maybe the old array-based
>>>>>>>> `branch`
>>>>>>> method
>>>>>>>>      >>>>> should
>>>>>>>>      >>>>>>> be
>>>>>>>>      >>>>>>>>>> deprecated, but this is a subject for discussion.
>>>>>>>>      >>>>>>>>>>
>>>>>>>>      >>>>>>>>>>> Renaming KBranchedStream#addBranch() ->
>>>>>>>>      >>>>> BranchingKStream#branch(),
>>>>>>>>      >>>>>>>>>> KBranchedStream#defaultBranch() ->
>>>>>>> BranchingKStream#default()
>>>>>>>>      >>>>>>>>>>
>>>>>>>>      >>>>>>>>>> Totally agree with 'addBranch->branch' rename.
>>>>>>>> 'default'
>>>>>>> is,
>>>>>>>>      >>>>>>> however, a
>>>>>>>>      >>>>>>>>>> reserved word, so unfortunately we cannot have a
>>>>>>>> method
>>>>>>>>      with such
>>>>>>>>      >>>>>>> name
>>>>>>>>      >>>>>>>>> :-)
>>>>>>>>      >>>>>>>>>>> defaultBranch() does take a `Predicate` as
>>>>>>>> argument,
>>>>>>> but I
>>>>>>>>      >>>>> think
>>>>>>>>      >>>>>>> that
>>>>>>>>      >>>>>>>>>> is not required?
>>>>>>>>      >>>>>>>>>>
>>>>>>>>      >>>>>>>>>> Absolutely! I think that was just copy-paste
>>>>>>>> error or
>>>>>>>>      something.
>>>>>>>>      >>>>>>>>>>
>>>>>>>>      >>>>>>>>>> Dear colleagues,
>>>>>>>>      >>>>>>>>>>
>>>>>>>>      >>>>>>>>>> please revise the new version of the KIP and
>>>>>>>> Paul's PR
>>>>>>>>      >>>>>>>>>> (https://github.com/apache/kafka/pull/6512)
>>>>>>>>      >>>>>>>>>>
>>>>>>>>      >>>>>>>>>> Any new suggestions/objections?
>>>>>>>>      >>>>>>>>>>
>>>>>>>>      >>>>>>>>>> Regards,
>>>>>>>>      >>>>>>>>>>
>>>>>>>>      >>>>>>>>>> Ivan
>>>>>>>>      >>>>>>>>>>
>>>>>>>>      >>>>>>>>>>
>>>>>>>>      >>>>>>>>>> 11.04.2019 11:47, Matthias J. Sax wrote:
>>>>>>>>      >>>>>>>>>>> Thanks for driving the discussion of this KIP.
>>>>>>>> It seems
>>>>>>> that
>>>>>>>>      >>>>>>> everybody
>>>>>>>>      >>>>>>>>>>> agrees that the current branch() method using
>>>>>>>> arrays is
>>>>>>> not
>>>>>>>>      >>>>>> optimal.
>>>>>>>>      >>>>>>>>>>> I had a quick look into the PR and I like the
>>>>>>>> overall
>>>>>>>>      proposal.
>>>>>>>>      >>>>>>> There
>>>>>>>>      >>>>>>>>>>> are some minor things we need to consider. I would
>>>>>>>>      recommend the
>>>>>>>>      >>>>>>>>>>> following renaming:
>>>>>>>>      >>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>> KStream#branch() -> #split()
>>>>>>>>      >>>>>>>>>>> KBranchedStream#addBranch() ->
>>>>>>>> BranchingKStream#branch()
>>>>>>>>      >>>>>>>>>>> KBranchedStream#defaultBranch() ->
>>>>>>>>      BranchingKStream#default()
>>>>>>>>      >>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>> It's just a suggestion to get slightly shorter
>>>>>>>> method
>>>>>>> names.
>>>>>>>>      >>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>> In the current PR, defaultBranch() does take a
>>>>>>>>      `Predicate` as
>>>>>>>>      >>>>>>>> argument,
>>>>>>>>      >>>>>>>>>>> but I think that is not required?
>>>>>>>>      >>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>> Also, we should consider KIP-307, that was
>>>>>>>> recently
>>>>>>>>      accepted and
>>>>>>>>      >>>>>> is
>>>>>>>>      >>>>>>>>>>> currently implemented:
>>>>>>>>      >>>>>>>>>>>
>>>>>>>>      >>>>>
>>>>>>>>
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>>>>
>>>>>>>>      >>>>>>>>>>> I.e., we should add overloads that accept a
>>>>>>>> `Named`
>>>>>>>>      parameter.
>>>>>>>>      >>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>> For the issue that the created `KStream` objects
>>>>>>>> are in
>>>>>>>>      different
>>>>>>>>      >>>>>>>> scopes:
>>>>>>>>      >>>>>>>>>>> could we extend `KBranchedStream` with a `get(int
>>>>>>>>      index)` method
>>>>>>>>      >>>>>>> that
>>>>>>>>      >>>>>>>>>>> returns the corresponding "branched" result
>>>>>>>> `KStream`
>>>>>>>>      object?
>>>>>>>>      >>>>>> Maybe,
>>>>>>>>      >>>>>>>> the
>>>>>>>>      >>>>>>>>>>> second argument of `addBranch()` should not be a
>>>>>>>>      >>>>>> `Consumer<KStream>`
>>>>>>>>      >>>>>>>> but
>>>>>>>>      >>>>>>>>>>> a `Function<KStream,KStream>` and `get()` could
>>>>>>>> return
>>>>>>>>      whatever
>>>>>>>>      >>>>>> the
>>>>>>>>      >>>>>>>>>>> `Function` returns?
>>>>>>>>      >>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>> Finally, I would also suggest to update the KIP
>>>>>>>> with the
>>>>>>>>      current
>>>>>>>>      >>>>>>>>>>> proposal. That makes it easier to review.
>>>>>>>>      >>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>> -Matthias
>>>>>>>>      >>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
>>>>>>>>      >>>>>>>>>>>> Ivan,
>>>>>>>>      >>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>> I'm a bit of a novice here as well, but I
>>>>>>>> think it
>>>>>>>>      makes sense
>>>>>>>>      >>>>>> for
>>>>>>>>      >>>>>>>> you
>>>>>>>>      >>>>>>>>> to
>>>>>>>>      >>>>>>>>>>>> revise the KIP and continue the discussion. 
>>>>>>>> Obviously
>>>>>>>>      we'll
>>>>>>>>      >>>>> need
>>>>>>>>      >>>>>>>> some
>>>>>>>>      >>>>>>>>>>>> buy-in from committers that have actual
>>>>>>>> binding votes on
>>>>>>>>      >>>>> whether
>>>>>>>>      >>>>>>> the
>>>>>>>>      >>>>>>>>> KIP
>>>>>>>>      >>>>>>>>>>>> could be adopted.  It would be great to hear
>>>>>>>> if they
>>>>>>>>      think this
>>>>>>>>      >>>>>> is
>>>>>>>>      >>>>>>> a
>>>>>>>>      >>>>>>>>> good
>>>>>>>>      >>>>>>>>>>>> idea overall.  I'm not sure if that happens
>>>>>>>> just by
>>>>>>>>      starting a
>>>>>>>>      >>>>>>> vote,
>>>>>>>>      >>>>>>>>> or if
>>>>>>>>      >>>>>>>>>>>> there is generally some indication of interest
>>>>>>> beforehand.
>>>>>>>>      >>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>> That being said, I'll continue the discussion
>>>>>>>> a bit:
>>>>>>>>      assuming
>>>>>>>>      >>>>> we
>>>>>>>>      >>>>>> do
>>>>>>>>      >>>>>>>>> move
>>>>>>>>      >>>>>>>>>>>> forward with the solution of "stream.branch() returns
>>>>>>>>      >>>>>> KBranchedStream",
>>>>>>>>      >>>>>>> do
>>>>>>>>      >>>>>>>>> we
>>>>>>>>      >>>>>>>>>>>> deprecate "stream.branch(...) returns
>>>>>>>> KStream[]"?  I
>>>>>>> would
>>>>>>>>      >>>>> favor
>>>>>>>>      >>>>>>>>>>>> deprecating, since having two mutually
>>>>>>>> exclusive APIs
>>>>>>> that
>>>>>>>>      >>>>>>> accomplish
>>>>>>>>      >>>>>>>>> the
>>>>>>>>      >>>>>>>>>>>> same thing is confusing, especially when
>>>>>>>> they're fairly
>>>>>>>>      similar
>>>>>>>>      >>>>>>>>> anyway.  We
>>>>>>>>      >>>>>>>>>>>> just need to be sure we're not making something
>>>>>>>>      >>>>>>> impossible/difficult
>>>>>>>>      >>>>>>>>> that
>>>>>>>>      >>>>>>>>>>>> is currently possible/easy.
>>>>>>>>      >>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>> Regarding my PR - I think the general
>>>>>>>> structure would
>>>>>>> work,
>>>>>>>>      >>>>> it's
>>>>>>>>      >>>>>>>> just a
>>>>>>>>      >>>>>>>>>>>> little sloppy overall in terms of naming and
>>>>>>>> clarity. In
>>>>>>>>      >>>>>>> particular,
>>>>>>>>      >>>>>>>>>>>> passing in the "predicates" and "children"
>>>>>>>> lists which
>>>>>>> get
>>>>>>>>      >>>>>> modified
>>>>>>>>      >>>>>>>> in
>>>>>>>>      >>>>>>>>>>>> KBranchedStream but read from all the way
>>>>>>>>      in KStreamLazyBranch is
>>>>>>>>      >>>>> a
>>>>>>>>      >>>>>>> bit
>>>>>>>>      >>>>>>>>>>>> complicated to follow.
>>>>>>>>      >>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>> Thanks,
>>>>>>>>      >>>>>>>>>>>> Paul
>>>>>>>>      >>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <
>>>>>>>>      >>>>>> iponoma...@mail.ru>
>>>>>>>>      >>>>>>>>> wrote:
>>>>>>>>      >>>>>>>>>>>>> Hi Paul!
>>>>>>>>      >>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>> I read your code carefully and now I am fully
>>>>>>>>      convinced: your
>>>>>>>>      >>>>>>>> proposal
>>>>>>>>      >>>>>>>>>>>>> looks better and should work. We just have to
>>>>>>>> document
>>>>>>> the
>>>>>>>>      >>>>>> crucial
>>>>>>>>      >>>>>>>>> fact
>>>>>>>>      >>>>>>>>>>>>> that KStream consumers are invoked as they're
>>>>>>>> added.
>>>>>>>>      And then
>>>>>>>>      >>>>>> it's
>>>>>>>>      >>>>>>>> all
>>>>>>>>      >>>>>>>>>>>>> going to be very nice.
>>>>>>>>      >>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>> What shall we do now? I should re-write the
>>>>>>>> KIP and
>>>>>>>>      resume the
>>>>>>>>      >>>>>>>>>>>>> discussion here, right?
>>>>>>>>      >>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>> Why are you saying that your PR 'should not
>>>>>>>> be even a
>>>>>>>>      >>>>> starting
>>>>>>>>      >>>>>>>> point
>>>>>>>>      >>>>>>>>> if
>>>>>>>>      >>>>>>>>>>>>> we go in this direction'? To me it looks like
>>>>>>>> a good
>>>>>>>>      starting
>>>>>>>>      >>>>>>> point.
>>>>>>>>      >>>>>>>>> But
>>>>>>>>      >>>>>>>>>>>>> as a novice in this project I might miss some
>>>>>>>> important
>>>>>>>>      >>>>> details.
>>>>>>>>      >>>>>>>>>>>>> Regards,
>>>>>>>>      >>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>> Ivan
>>>>>>>>      >>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen wrote:
>>>>>>>>      >>>>>>>>>>>>>> Ivan,
>>>>>>>>      >>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>> Maybe I’m missing the point, but I believe the
>>>>>>>>      >>>>> stream.branch()
>>>>>>>>      >>>>>>>>> solution
>>>>>>>>      >>>>>>>>>>>>> supports this. The couponIssuer::set*
>>>>>>>> consumers will be
>>>>>>>>      >>>>> invoked
>>>>>>>>      >>>>>> as
>>>>>>>>      >>>>>>>>> they’re
>>>>>>>>      >>>>>>>>>>>>> added, not during streamsBuilder.build(). So
>>>>>>>> the user
>>>>>>>>      still
>>>>>>>>      >>>>>> ought
>>>>>>>>      >>>>>>> to
>>>>>>>>      >>>>>>>>> be
>>>>>>>>      >>>>>>>>>>>>> able to call couponIssuer.coupons() afterward
>>>>>>>> and
>>>>>>>>      depend on
>>>>>>>>      >>>>> the
>>>>>>>>      >>>>>>>>> branched
>>>>>>>>      >>>>>>>>>>>>> streams having been set.
>>>>>>>>      >>>>>>>>>>>>>> The issue I mean to point out is that it is
>>>>>>>> hard to
>>>>>>>>      access
>>>>>>>>      >>>>> the
>>>>>>>>      >>>>>>>>> branched
>>>>>>>>      >>>>>>>>>>>>> streams in the same scope as the original
>>>>>>>> stream (that
>>>>>>>>      is, not
>>>>>>>>      >>>>>>>> inside
>>>>>>>>      >>>>>>>>> the
>>>>>>>>      >>>>>>>>>>>>> couponIssuer), which is a problem with both
>>>>>>>> proposed
>>>>>>>>      >>>>> solutions.
>>>>>>>>      >>>>>> It
>>>>>>>>      >>>>>>>>> can be
>>>>>>>>      >>>>>>>>>>>>> worked around though.
>>>>>>>>      >>>>>>>>>>>>>> [Also, great to hear additional interest in
>>>>>>>> 401, I’m
>>>>>>>>      excited
>>>>>>>>      >>>>> to
>>>>>>>>      >>>>>>>> hear
>>>>>>>>      >>>>>>>>>>>>> your thoughts!]
>>>>>>>>      >>>>>>>>>>>>>> Paul
>>>>>>>>      >>>>>>>>>>>>>>
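To make the timing argument above concrete, here is a small sketch in plain Java in which the consumer passed to `branch` runs immediately, at wiring time, rather than when records later flow through. `Brancher`, `Branch`, and this simplified `CouponIssuer` are hypothetical stand-ins, not the classes from the PR or from Kafka Streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class EagerWiringSketch {

    // Hypothetical stand-in for a branched stream: a sink that is filled later.
    static final class Branch<T> {
        final List<T> received = new ArrayList<>();
    }

    // Hypothetical stand-in for KBranchedStream.
    static final class Brancher<T> {
        private final List<Predicate<? super T>> predicates = new ArrayList<>();
        private final List<Branch<T>> branches = new ArrayList<>();

        // The consumer is invoked here, as the branch is added.
        Brancher<T> branch(Predicate<? super T> predicate, Consumer<Branch<T>> consumer) {
            Branch<T> b = new Branch<>();
            predicates.add(predicate);
            branches.add(b);
            consumer.accept(b);
            return this;
        }

        // Called later, playing the role of the running topology.
        void process(T record) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(record)) {
                    branches.get(i).received.add(record);
                    return;
                }
            }
        }
    }

    // Simplified holder: it only needs both fields set before it is used.
    static final class CouponIssuer {
        Branch<String> coffeePurchases;
        Branch<String> electronicsPurchases;

        boolean isWired() {
            return coffeePurchases != null && electronicsPurchases != null;
        }
    }

    public static void main(String[] args) {
        CouponIssuer issuer = new CouponIssuer();
        Brancher<String> brancher = new Brancher<String>()
            .branch(p -> p.startsWith("coffee"), b -> issuer.coffeePurchases = b)
            .branch(p -> p.startsWith("tv"), b -> issuer.electronicsPurchases = b);

        // Both fields are already set, before any record has been processed.
        System.out.println(issuer.isWired());

        brancher.process("coffee:latte");
        System.out.println(issuer.coffeePurchases.received);
    }
}
```

Because the holder is fully populated as soon as the chain returns, no terminal call is needed for this wiring step in the sketch.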
>>>>>>>>      >>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <
>>>>>>>>      >>>>>> iponoma...@mail.ru>
>>>>>>>>      >>>>>>>>> wrote:
>>>>>>>>      >>>>>>>>>>>>>>> Hi Paul!
>>>>>>>>      >>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>> The idea to postpone the wiring of branches
>>>>>>>> to the
>>>>>>>>      >>>>>>>>>>>>> streamsBuilder.build() also looked great to
>>>>>>>> me at
>>>>>>> first
>>>>>>>>      >>>>> glance,
>>>>>>>>      >>>>>>> but
>>>>>>>>      >>>>>>>>> ---
>>>>>>>>      >>>>>>>>>>>>>>>> the newly branched streams are not
>>>>>>>> available in the
>>>>>>>>      same
>>>>>>>>      >>>>>> scope
>>>>>>>>      >>>>>>> as
>>>>>>>>      >>>>>>>>> each
>>>>>>>>      >>>>>>>>>>>>> other.  That is, if we wanted to merge them back
>>>>>>> together
>>>>>>>>      >>>>> again
>>>>>>>>      >>>>>> I
>>>>>>>>      >>>>>>>>> don't see
>>>>>>>>      >>>>>>>>>>>>> a way to do that.
>>>>>>>>      >>>>>>>>>>>>>>> You just took the words right out of my
>>>>>>>> mouth, I was
>>>>>>>>      just
>>>>>>>>      >>>>>> going
>>>>>>>>      >>>>>>> to
>>>>>>>>      >>>>>>>>>>>>> write in detail about this issue.
>>>>>>>>      >>>>>>>>>>>>>>> Consider the example from Bill's book, p.
>>>>>>>> 101: say
>>>>>>>>      we need
>>>>>>>>      >>>>> to
>>>>>>>>      >>>>>>>>> identify
>>>>>>>>      >>>>>>>>>>>>> customers who have bought coffee and made a
>>>>>>>> purchase
>>>>>>>>      in the
>>>>>>>>      >>>>>>>>> electronics
>>>>>>>>      >>>>>>>>>>>>> store to give them coupons.
>>>>>>>>      >>>>>>>>>>>>>>> This is the code I usually write under these
>>>>>>>>      circumstances
>>>>>>>>      >>>>>> using
>>>>>>>>      >>>>>>>> my
>>>>>>>>      >>>>>>>>>>>>> 'brancher' class:
>>>>>>>>      >>>>>>>>>>>>>>> @Setter
>>>>>>>>      >>>>>>>>>>>>>>> class CouponIssuer{
>>>>>>>>      >>>>>>>>>>>>>>>   private KStream<....> coffePurchases;
>>>>>>>>      >>>>>>>>>>>>>>>   private KStream<....> electronicsPurchases;
>>>>>>>>      >>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>   KStream<...> coupons(){
>>>>>>>>      >>>>>>>>>>>>>>>       return
>>>>>>>>      >>>>>>>>>
>>>>>>>> coffePurchases.join(electronicsPurchases...)...whatever
>>>>>>>>      >>>>>>>>>>>>>>>       /*In the real world the code here can be
>>>>>>>>      complex, so
>>>>>>>>      >>>>>>>>> creation of
>>>>>>>>      >>>>>>>>>>>>> a separate CouponIssuer class is fully
>>>>>>>> justified, in
>>>>>>>>      order to
>>>>>>>>      >>>>>>>> separate
>>>>>>>>      >>>>>>>>>>>>> classes' responsibilities.*/
>>>>>>>>      >>>>>>>>>>>>>>>  }
>>>>>>>>      >>>>>>>>>>>>>>> }
>>>>>>>>      >>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>> CouponIssuer couponIssuer = new
>>>>>>>> CouponIssuer();
>>>>>>>>      >>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
>>>>>>>>      >>>>>>>>>>>>>>>     .branch(predicate1,
>>>>>>> couponIssuer::setCoffePurchases)
>>>>>>>>      >>>>>>>>>>>>>>>     .branch(predicate2,
>>>>>>>>      >>>>>> couponIssuer::setElectronicsPurchases)
>>>>>>>>      >>>>>>>>>>>>>>>     .onTopOf(transactionStream);
>>>>>>>>      >>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>> /*Alas, this won't work if we're going to
>>>>>>>> wire up
>>>>>>>>      everything
>>>>>>>>      >>>>>>>> later,
>>>>>>>>      >>>>>>>>>>>>> without the terminal operation!!!*/
>>>>>>>>      >>>>>>>>>>>>>>> couponIssuer.coupons()...
>>>>>>>>      >>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>> Does this make sense?  In order to properly
>>>>>>>>      initialize the
>>>>>>>>      >>>>>>>>> CouponIssuer
>>>>>>>>      >>>>>>>>>>>>> we need the terminal operation to be called
>>>>>>>> before
>>>>>>>>      >>>>>>>>> streamsBuilder.build()
>>>>>>>>      >>>>>>>>>>>>> is called.
>>>>>>>>      >>>>>>>>>>>>>>> [BTW Paul, I just found out that your
>>>>>>>> KIP-401 is
>>>>>>>>      essentially
>>>>>>>>      >>>>>> the
>>>>>>>>      >>>>>>>>> next
>>>>>>>>      >>>>>>>>>>>>> KIP I was going to write here. I have some
>>>>>>>> thoughts
>>>>>>>>      based on
>>>>>>>>      >>>>> my
>>>>>>>>      >>>>>>>>> experience,
>>>>>>>>      >>>>>>>>>>>>> so I will join the discussion on KIP-401 soon.]
>>>>>>>>      >>>>>>>>>>>>>>> Regards,
>>>>>>>>      >>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>> Ivan
>>>>>>>>      >>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen wrote:
>>>>>>>>      >>>>>>>>>>>>>>>> Ivan,
>>>>>>>>      >>>>>>>>>>>>>>>> I tried to make a very rough proof of
>>>>>>>> concept of a
>>>>>>>>      fluent
>>>>>>>>      >>>>> API
>>>>>>>>      >>>>>>>> based
>>>>>>>>      >>>>>>>>>>>>> off of
>>>>>>>>      >>>>>>>>>>>>>>>> KStream here
>>>>>>>>      (https://github.com/apache/kafka/pull/6512),
>>>>>>>>      >>>>>> and
>>>>>>>>      >>>>>>> I
>>>>>>>>      >>>>>>>>> think
>>>>>>>>      >>>>>>>>>>>>> I
>>>>>>>>      >>>>>>>>>>>>>>>> succeeded at removing both cons.
>>>>>>>>      >>>>>>>>>>>>>>>>    - Compatibility: I was incorrect
>>>>>>>> earlier about
>>>>>>>>      >>>>>>> compatibility
>>>>>>>>      >>>>>>>>>>>>> issues,
>>>>>>>>      >>>>>>>>>>>>>>>>    there aren't any direct ones.  I was
>>>>>>>> unaware
>>>>>>>>      that Java
>>>>>>>>      >>>>> is
>>>>>>>>      >>>>>>>> smart
>>>>>>>>      >>>>>>>>>>>>> enough to
>>>>>>>>      >>>>>>>>>>>>>>>>    distinguish between a branch(varargs...)
>>>>>>>>      returning one
>>>>>>>>      >>>>>>> thing
>>>>>>>>      >>>>>>>>> and
>>>>>>>>      >>>>>>>>>>>>> branch()
>>>>>>>>      >>>>>>>>>>>>>>>>    with no arguments returning another thing.
>>>>>>>>      >>>>>>>>>>>>>>>>    - Requiring a terminal method: We don't
>>>>>>>> actually
>>>>>>>>      need
>>>>>>>>      >>>>> it.
>>>>>>>>      >>>>>>> We
>>>>>>>>      >>>>>>>>> can
>>>>>>>>      >>>>>>>>>>>>> just
>>>>>>>>      >>>>>>>>>>>>>>>>    build up the branches in the
>>>>>>>> KBranchedStream which
>>>>>>>>      shares
>>>>>>>>      >>>>>> its
>>>>>>>>      >>>>>>>>> state
>>>>>>>>      >>>>>>>>>>>>> with the
>>>>>>>>      >>>>>>>>>>>>>>>>    ProcessorSupplier that will actually do
>>>>>>>> the
>>>>>>>>      branching.
>>>>>>>>      >>>>>>> It's
>>>>>>>>      >>>>>>>>> not
>>>>>>>>      >>>>>>>>>>>>> terribly
>>>>>>>>      >>>>>>>>>>>>>>>>    pretty in its current form, but I think it
>>>>>>>>      demonstrates
>>>>>>>>      >>>>>> its
>>>>>>>>      >>>>>>>>>>>>> feasibility.
>>>>>>>>      >>>>>>>>>>>>>>>> To be clear, I don't think that pull
>>>>>>>> request should
>>>>>>> be
>>>>>>>>      >>>>> final
>>>>>>>>      >>>>>> or
>>>>>>>>      >>>>>>>>> even a
>>>>>>>>      >>>>>>>>>>>>>>>> starting point if we go in this direction,
>>>>>>>> I just
>>>>>>>>      wanted to
>>>>>>>>      >>>>>> see
>>>>>>>>      >>>>>>>> how
>>>>>>>>      >>>>>>>>>>>>>>>> challenging it would be to get the API
>>>>>>>> working.
>>>>>>>>      >>>>>>>>>>>>>>>> I will say though, that I'm not sure the
>>>>>>>> existing
>>>>>>>>      solution
>>>>>>>>      >>>>>>> could
>>>>>>>>      >>>>>>>> be
>>>>>>>>      >>>>>>>>>>>>>>>> deprecated in favor of this, which I had
>>>>>>>> originally
>>>>>>>>      >>>>> suggested
>>>>>>>>      >>>>>>>> was a
>>>>>>>>      >>>>>>>>>>>>>>>> possibility.  The reason is that the newly
>>>>>>>> branched
>>>>>>>>      streams
>>>>>>>>      >>>>>> are
>>>>>>>>      >>>>>>>> not
>>>>>>>>      >>>>>>>>>>>>>>>> available in the same scope as each
>>>>>>>> other.  That
>>>>>>>>      is, if we
>>>>>>>>      >>>>>>> wanted
>>>>>>>>      >>>>>>>>> to
>>>>>>>>      >>>>>>>>>>>>> merge
>>>>>>>>      >>>>>>>>>>>>>>>> them back together again I don't see a way
>>>>>>>> to do
>>>>>>>>      that.  The
>>>>>>>>      >>>>>> KIP
>>>>>>>>      >>>>>>>>>>>>> proposal
>>>>>>>>      >>>>>>>>>>>>>>>> has the same issue, though - all this
>>>>>>>> means is that
>>>>>>> for
>>>>>>>>      >>>>>> either
>>>>>>>>      >>>>>>>>>>>>> solution,
>>>>>>>>      >>>>>>>>>>>>>>>> deprecating the existing branch(...) is
>>>>>>>> not on the
>>>>>>>>      table.
>>>>>>>>      >>>>>>>>>>>>>>>> Thanks,
>>>>>>>>      >>>>>>>>>>>>>>>> Paul
>>>>>>>>      >>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan
>>>>>>>> Ponomarev <
>>>>>>>>      >>>>>>>>> iponoma...@mail.ru>
>>>>>>>>      >>>>>>>>>>>>> wrote:
>>>>>>>>      >>>>>>>>>>>>>>>>> OK, let me summarize what we have
>>>>>>>> discussed up to
>>>>>>> this
>>>>>>>>      >>>>>> point.
>>>>>>>>      >>>>>>>>>>>>>>>>> First, it seems that it's commonly agreed
>>>>>>>> that
>>>>>>>>      branch API
>>>>>>>>      >>>>>>> needs
>>>>>>>>      >>>>>>>>>>>>>>>>> improvement. Motivation is given in the KIP.
>>>>>>>>      >>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>> There are two potential ways to do it:
>>>>>>>>      >>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>> 1. (as originally proposed)
>>>>>>>>      >>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
>>>>>>>>      >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->..)
>>>>>>>>      >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->..)
>>>>>>>>      >>>>>>>>>>>>>>>>>    .defaultBranch(ks->..) //optional
>>>>>>>>      >>>>>>>>>>>>>>>>>    .onTopOf(stream).mapValues(...)....
>>>>>>>> //onTopOf
>>>>>>>>      returns
>>>>>>>>      >>>>>> its
>>>>>>>>      >>>>>>>>> argument
>>>>>>>>      >>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2)
>>>>>>>> The code
>>>>>>> won't
>>>>>>>>      >>>>> make
>>>>>>>>      >>>>>>>> sense
>>>>>>>>      >>>>>>>>>>>>> until
>>>>>>>>      >>>>>>>>>>>>>>>>> all the necessary ingredients are provided.
>>>>>>>>      >>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>> CONS: The need to create a
>>>>>>>> KafkaStreamsBrancher
>>>>>>>>      instance
>>>>>>>>      >>>>>>>>> contrasts with the
>>>>>>>>      >>>>>>>>>>>>>>>>> fluency of other KStream methods.
>>>>>>>>      >>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>> 2. (as Paul proposes)
>>>>>>>>      >>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>> stream
>>>>>>>>      >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->...)
>>>>>>>>      >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->...)
>>>>>>>>      >>>>>>>>>>>>>>>>>    .defaultBranch(ks->...) //or
>>>>>>>> noDefault(). Both
>>>>>>>>      >>>>>>>>> defaultBranch(..)
>>>>>>>>      >>>>>>>>>>>>> and
>>>>>>>>      >>>>>>>>>>>>>>>>> noDefault() return void
>>>>>>>>      >>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>> PROS: Generally follows the way KStreams
>>>>>>>> interface
>>>>>>> is
>>>>>>>>      >>>>>> defined.
>>>>>>>>      >>>>>>>>>>>>>>>>> CONS: We need to define two terminal methods
>>>>>>>>      >>>>>>>> (defaultBranch(ks->)
>>>>>>>>      >>>>>>>>> and
>>>>>>>>      >>>>>>>>>>>>>>>>> noDefault()). And for a user it is very
>>>>>>>> easy to
>>>>>>>>      miss the
>>>>>>>>      >>>>>> fact
>>>>>>>>      >>>>>>>>> that one
>>>>>>>>      >>>>>>>>>>>>>>>>> of the terminal methods should be called.
>>>>>>>> If these
>>>>>>>>      methods
>>>>>>>>      >>>>>> are
>>>>>>>>      >>>>>>>> not
>>>>>>>>      >>>>>>>>>>>>>>>>> called, we can throw an exception at
>>>>>>>> runtime.
>>>>>>>>      >>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can
>>>>>>>> we do
>>>>>>> better?
>>>>>>>>      >>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>> Regards,
>>>>>>>>      >>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>> Ivan
>>>>>>>>      >>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
>>>>>>>>      >>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
>>>>>>>>      >>>>>>>>>>>>>>>>>>> Paul,
>>>>>>>>      >>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>> I see your point when you are talking
>>>>>>>> about
>>>>>>>>      >>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
>>>>>>>>      >>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>> Still, I believe that this cannot be
>>>>>>>>      implemented the
>>>>>>>>      >>>>>>> easy
>>>>>>>>      >>>>>>>>> way.
>>>>>>>>      >>>>>>>>>>>>>>>>>>> Maybe we all should think further.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> user could specify a terminal method that
>>>>>>> assumes
>>>>>>>>      >>>>> nothing
>>>>>>>>      >>>>>>>> will
>>>>>>>>      >>>>>>>>>>>>> reach
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> the default branch,
>>>>>>>>      >>>>>>>>>>>>>>>>>>> throwing an exception if such a case
>>>>>>>> occurs.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be
>>>>>>>> the only
>>>>>>> option
>>>>>>>>      >>>>>> besides
>>>>>>>>      >>>>>>>>>>>>>>>>>>> `default`, because there are scenarios
>>>>>>>> when we
>>>>>>>>      want to
>>>>>>>>      >>>>>> just
>>>>>>>>      >>>>>>>>> silently
>>>>>>>>      >>>>>>>>>>>>>>>>>>> drop the messages that didn't match any
>>>>>>>>      predicate. 2)
>>>>>>>>      >>>>>>> Throwing
>>>>>>>>      >>>>>>>>> an
>>>>>>>>      >>>>>>>>>>>>>>>>>>> exception in the middle of data flow
>>>>>>>> processing
>>>>>>>>      looks
>>>>>>>>      >>>>>> like a
>>>>>>>>      >>>>>>>> bad
>>>>>>>>      >>>>>>>>>>>>> idea.
>>>>>>>>      >>>>>>>>>>>>>>>>>>> In the stream processing paradigm, I would
>>>>>>>> prefer to
>>>>>>>>      emit a
>>>>>>>>      >>>>>>>> special
>>>>>>>>      >>>>>>>>>>>>>>>>>>> message to a dedicated stream. This is
>>>>>>>> exactly
>>>>>>> where
>>>>>>>>      >>>>>>> `default`
>>>>>>>>      >>>>>>>>> can
>>>>>>>>      >>>>>>>>>>>>> be
>>>>>>>>      >>>>>>>>>>>>>>>>>>> used.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> it would be fairly easy for the
>>>>>>>>      >>>>> InternalTopologyBuilder
>>>>>>>>      >>>>>>> to
>>>>>>>>      >>>>>>>>> track
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> dangling
>>>>>>>>      >>>>>>>>>>>>>>>>>>> branches that haven't been terminated
>>>>>>>> and raise
>>>>>>>>      a clear
>>>>>>>>      >>>>>>> error
>>>>>>>>      >>>>>>>>>>>>> before it
>>>>>>>>      >>>>>>>>>>>>>>>>>>> becomes an issue.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>> You mean a runtime exception, when the program is compiled and run?
>>>>>>>>      >>>>>>>>>>>>>>>>>>> Well, I'd prefer an API that simply won't compile if used
>>>>>>>>      >>>>>>>>>>>>>>>>>>> incorrectly. Can we build such an API as a method chain starting
>>>>>>>>      >>>>>>>>>>>>>>>>>>> from a KStream object? There is a huge cost difference between
>>>>>>>>      >>>>>>>>>>>>>>>>>>> runtime and compile-time errors. Even if a failure surfaces
>>>>>>>>      >>>>>>>>>>>>>>>>>>> immediately in unit tests, it costs the project more than a
>>>>>>>>      >>>>>>>>>>>>>>>>>>> compilation failure.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>      >>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>      >>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen wrote:
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> Good point about the terminal operation being required. But is that
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> really such a bad thing?  If the user doesn't want a defaultBranch
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> they can call some other terminal method (noDefaultBranch()?) just
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> as easily.  In fact I think it creates an opportunity for a nicer
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> API - a user could specify a terminal method that assumes nothing
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> will reach the default branch, throwing an exception if such a case
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> occurs. That seems like an improvement over the current branch()
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> API, which allows for the more subtle behavior of records
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> unexpectedly getting dropped.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> The need for a terminal operation certainly has to be well
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> documented, but it would be fairly easy for the
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> InternalTopologyBuilder to track dangling branches that haven't
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> been terminated and raise a clear error before it becomes an issue.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> Especially now that there is a "build step" where the topology is
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> actually wired up, when StreamsBuilder.build() is called.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its argument, I agree that it's
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> critical to allow users to do other operations on the input stream.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> With the fluent solution, it ought to work the same way all other
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> operations do - if you want to process off the original KStream
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> multiple times, you just need the stream as a variable so you can
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> call as many operations on it as you desire.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> Thoughts?
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> Paul
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>> Hello Paul,
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>> I'm afraid this won't work because we do not always need the
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>> defaultBranch. And without a terminal operation we don't know
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>> when to finalize and build the 'branch switch'.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its argument, so we can do
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>> something more with the original branch after branching.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>> I understand your point that the need for special object
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>> construction contrasts with the fluency of most KStream methods.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>> But here we have a special case: we build the switch to split
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>> the flow, so I think this is still idiomatic.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve this API, but I find the
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> onTopOf() mechanism a little confusing since it contrasts with
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> the fluency of other KStream method calls.  Ideally I'd like to
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> just call a method on the stream so it still reads top to
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> bottom if the branch cases are defined fluently.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> I think the addBranch(predicate, handleCase) is very nice and
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> the right way to do things, but what if we flipped around how
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> we specify the source stream?
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> Like:
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> stream.branch()
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate1, this::handle1)
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate2, this::handle2)
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>           .defaultBranch(this::handleDefault);
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> Where branch() returns a KBranchedStreams or KStreamBrancher or
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> something, which is added to by addBranch() and terminated by
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> defaultBranch() (which returns void).  This is obviously
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> incompatible with the current API, so the new stream.branch()
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> would have to have a different name, but that seems like a
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> fairly small problem - we could call it something like
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> branched() or branchedStreams() and deprecate the old API.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of your KIP?  It seems like
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> it does to me, allowing for clear in-line branching while also
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> allowing you to dynamically build branches off of
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> KBranchedStreams if desired.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> Paul
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>> <iponoma...@mail.ru.invalid> wrote:
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>     ks.filter(....).mapValues(...)
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>     ks.selectKey(...).groupByKey()...
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>> ......
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>     .addBranch(predicate1, this::handleFirstCase)
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>     .addBranch(predicate2, this::handleSecondCase)
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>     .onTopOf(....)
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> I have one question: the KafkaStreamsBrancher takes a Consumer
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> as a second argument which returns nothing, and the example
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> in the KIP shows each stream from the branch using a terminal
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> node (KStream#to() in this case).
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but how would we handle the case
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> where the user has created a branch but wants to continue
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> processing and not necessarily use a terminal node on the
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> branched stream immediately?
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic as is if we had something
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> like this:
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> KStream<String, String>[] branches =
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>     originalStream.branch(predicate1, predicate2);
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>> All,
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP-418.
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418. Please take a
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>> look at the KIP if you can, I would appreciate any feedback :)
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>> KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>      >>>>>>>>>
>>>>>>>>      >
>>>>>>>>
>>>>>>>
>>>>
>>>
>>
> 
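
For anyone skimming the quoted thread: below is a toy sketch of the fluent shape Paul proposed (a split call, addBranch(predicate, handler), and a required terminal method). It is only an illustration under loud assumptions: a plain List stands in for a KStream, and the names BrancherSketch, split and noDefaultBranch are hypothetical, not Kafka Streams API. Routing follows first-match-wins, as in the existing branch().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Toy model only: a List<T> stands in for KStream<K, V>, and every name here
// (BrancherSketch, split, noDefaultBranch) is hypothetical, not Kafka Streams API.
final class BrancherSketch<T> {
    private final List<T> source;
    private final List<Predicate<? super T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();

    private BrancherSketch(List<T> source) {
        this.source = source;
    }

    // Entry point, playing the role of the renamed stream.branch().
    static <T> BrancherSketch<T> split(List<T> source) {
        return new BrancherSketch<>(source);
    }

    // Registers a branch; nothing is routed until a terminal method is called.
    BrancherSketch<T> addBranch(Predicate<? super T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Terminal operation: unmatched items go to the given handler.
    void defaultBranch(Consumer<List<T>> handler) {
        route(handler);
    }

    // Terminal operation: unmatched items are dropped on purpose.
    void noDefaultBranch() {
        route(unmatched -> { });
    }

    // Each item goes to the first branch whose predicate accepts it.
    private void route(Consumer<List<T>> defaultHandler) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<T> unmatched = new ArrayList<>();
        for (T item : source) {
            int i = 0;
            while (i < predicates.size() && !predicates.get(i).test(item)) {
                i++;
            }
            if (i < predicates.size()) {
                buckets.get(i).add(item);
            } else {
                unmatched.add(item);
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        defaultHandler.accept(unmatched);
    }

    public static void main(String[] args) {
        BrancherSketch.split(List.of(1, 2, 3, 4, 5, 6))
            .addBranch(n -> n % 2 == 0, evens -> System.out.println("even: " + evens))
            .addBranch(n -> n > 3, big -> System.out.println("odd and > 3: " + big))
            .defaultBranch(rest -> System.out.println("default: " + rest));
    }
}
```

Because both terminal methods return void, the chain cannot be extended after them, which is one way to read the "terminal operation" point; forgetting the terminal call is still not caught at compile time in this sketch, which is the concern Ivan raises.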
