What is the status of this KIP?


On 7/11/19 4:00 PM, Matthias J. Sax wrote:
> Ivan,
> did you see my last reply? What do you think about my proposal to mix
> both approaches and try to get the best of both worlds?
> -Matthias
> On 6/11/19 3:56 PM, Matthias J. Sax wrote:
>> Thanks for the input John!
>>> under your suggestion, it seems that the name is required
>> If you want to get the `KStream` as part of the `Map` back using a
>> `Function`, yes. If you follow the "embedded chaining" pattern using a
>> `Consumer`, no.
>> Allowing for a default name via `split()` can of course be done.
>> Similarly, using `Named` instead of `String` is possible.
>> I only wanted to sketch out a high-level proposal to merge both
>> patterns. Your suggestions to align the new API with the existing API make
>> total sense.
>> One follow up question: Would `Named` be optional or required in
>> `split()` and `branch()`? It's unclear from your example.
>> If both are mandatory, what do we gain by it? The returned `Map` only
>> contains the corresponding branches, so why should we prefix all of
>> them? If `Named` is mandatory only in `branch()`, but optional in
>> `split()`, the same question arises.
>> Requiring `Named` in `split()` only seems to make sense if `Named` is
>> optional in `branch()` and we generate a `-X` suffix using a counter for
>> each branch name. However, this might lead to the problem of
>> names changing if branches are added/removed. Also, how would the names
>> be generated if a `Consumer` is mixed in (ie, not all branches are
>> returned in the `Map`)?
>> If `Named` is optional for both, it could happen that a user forgets to
>> specify a name for a branch, which would lead to runtime issues.
>> Hence, I am actually in favor of not allowing a default name, keeping
>> `split()` without parameters, and making `Named` in `branch()` required if a
>> `Function` is used. This makes it explicit to the user that specifying a
>> name is required if a `Function` is used.
>> About
>>> KBranchedStream#branch(BranchConfig)
>> I don't think that the branching predicate is a configuration and hence
>> would not include it in a configuration object.
>>>     withChain(...);
>> Similarly, `withChain()` (that would only take a `Consumer`?) does not
>> seem to be a configuration. We also cannot prevent a user from calling
>> `withName()` in combination with `withChain()`, which does not make sense
>> IMHO. We could of course throw an RTE, but not having a compile-time check
>> seems less appealing. Also, it could happen that neither `withChain()`
>> nor `withName()` is called and the branch is missing in the returned
>> `Map`, which would lead to runtime issues, too.
>> Hence, I don't think that we should add `BranchConfig`. A config object
>> is helpful if each configuration can be set independently of all others,
>> but this does not seem to be the case here. If we add new configuration
>> later, we can also just move forward by deprecating the methods that
>> accept `Named` and adding new methods that accept `BranchConfig` (that
>> would of course implement `Named`).
>> Thoughts?
>> @Ivan, what do you think about the general idea to blend the two main
>> approaches of returning a `Map` plus an "embedded chaining"?
>> -Matthias
>> On 6/4/19 10:33 AM, John Roesler wrote:
>>> Thanks for the idea, Matthias, it does seem like this would satisfy
>>> everyone. Returning the map from the terminal operations also solves
>>> the problem of merging/joining the branched streams, if we want to add
>>> support for the complement later on.
>>> Under your suggestion, it seems that the name is required. Otherwise,
>>> we wouldn't have keys for the map to return. I think this is actually
>>> not too bad, since experience has taught us that, although names for
>>> operations are not required to define stream processing logic, it does
>>> significantly improve the operational experience when you can map the
>>> topology, logs, metrics, etc. back to the source code. Since you
>>> wouldn't (have to) reference the name to chain extra processing onto
>>> the branch (thanks to the second argument), you can avoid the
>>> "unchecked name" problem that Ivan pointed out.
>>> In the current implementation of Branch, you can name the branch
>>> operator itself, and then all the branches get index-suffixed names
>>> built from the branch operator name. I guess under this proposal, we
>>> could naturally append the branch name to the branching operator name,
>>> like this:
>>>    stream.split(Named.withName("mysplit")) //creates node "mysplit"
>>>               .branch(..., ..., "abranch") // creates node "mysplit-abranch"
>>>               .defaultBranch(...) // creates node "mysplit-default"
>>> It does make me wonder about the DSL syntax itself, though.
>>> We don't have a defined grammar, so there's plenty of room to debate
>>> the "best" syntax in the context of each operation, but in general,
>>> the KStream DSL operators follow this pattern:
>>>     operator(function, config_object?) OR operator(config_object)
>>> where config_object is often just Named in the "function" variant.
>>> Even when the config_object isn't a Named, but some other config
>>> class, that config class _always_ implements NamedOperation.
>>> Here, we're introducing a totally different pattern:
>>>   operator(function, function, string)
>>> where the string is the name.
>>> My first question is whether the name should instead be specified with
>>> the NamedOperation interface.
>>> My second question is whether we should just roll all these arguments
>>> up into a config object like:
>>>    KBranchedStream#branch(BranchConfig)
>>>    interface BranchConfig extends NamedOperation {
>>>     withPredicate(...);
>>>     withChain(...);
>>>     withName(...);
>>>   }
>>> Although I guess we'd like to call BranchConfig something more like
>>> "Branched", even if I don't particularly like that pattern.
>>> This makes the source code a little noisier, but it also makes us more
>>> future-proof, as we can deal with a wide range of alternatives purely
>>> in the config interface, and never have to deal with adding overloads
>>> to the KBranchedStream if/when we decide we want the name to be
>>> optional, or the KStream->KStream to be optional.
>>> WDYT?
>>> Thanks,
>>> -John
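The naming scheme John describes (branch name appended to the split operator's name) can be illustrated with a minimal sketch. This is a toy model of the naming only, not Kafka Streams code: the class `NamedSplit`, its methods, and the `-default` suffix handling are hypothetical stand-ins that mirror the comments in the snippet above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the node-naming scheme; no records are processed.
final class NamedSplit {
    private final String splitName;
    private final List<String> nodes = new ArrayList<>();

    private NamedSplit(String splitName) {
        this.splitName = splitName;
        nodes.add(splitName);                     // the split operator's own node
    }

    static NamedSplit split(String name) {
        return new NamedSplit(name);
    }

    // Each branch node is named "<split name>-<branch name>".
    NamedSplit branch(String branchName) {
        nodes.add(splitName + "-" + branchName);
        return this;
    }

    // Terminal operation: adds the default node and returns all node names.
    List<String> defaultBranch() {
        nodes.add(splitName + "-default");
        return nodes;
    }
}

final class NamedSplitDemo {
    public static void main(String[] args) {
        System.out.println(NamedSplit.split("mysplit").branch("abranch").defaultBranch());
    }
}
```

Because the branch name is supplied by the user rather than derived from a counter, adding or removing a branch does not rename the remaining nodes.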
>>> On Fri, May 24, 2019 at 5:25 PM Michael Drogalis
>>> <michael.droga...@confluent.io> wrote:
>>>> Matthias: I think that's pretty reasonable from my point of view. Good
>>>> suggestion.
>>>> On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>> Interesting discussion.
>>>>> I am wondering, if we cannot unify the advantage of both approaches:
>>>>> KStream#split() -> KBranchedStream
>>>>> // branch is not easily accessible in current scope
>>>>> KBranchedStream#branch(Predicate, Consumer<KStream>)
>>>>>   -> KBranchedStream
>>>>> // assign a name to the branch and
>>>>> // return the sub-stream to the current scope later
>>>>> //
>>>>> // can be as simple as `#branch(p, s->s, "name")`
>>>>> // or as complex as `#branch(p, s->s.filter(...), "name")`
>>>>> KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
>>>>>   -> KBranchedStream
>>>>> // default branch is not easily accessible
>>>>> // return map of all named sub-streams into current scope
>>>>> KBranchedStream#default(Consumer<KStream>)
>>>>>   -> Map<String,KStream>
>>>>> // assign custom name to default-branch
>>>>> // return map of all named sub-streams into current scope
>>>>> KBranchedStream#default(Function<KStream,KStream>, String)
>>>>>   -> Map<String,KStream>
>>>>> // assign a default name for default
>>>>> // return map of all named sub-streams into current scope
>>>>> KBranchedStream#defaultBranch(Function<KStream,KStream>)
>>>>>   -> Map<String,KStream>
>>>>> // return map of all named sub-streams into current scope
>>>>> KBranchedStream#noDefaultBranch()
>>>>>   -> Map<String,KStream>
>>>>> Hence, for each sub-stream, the user can pick to add a name and return
>>>>> the branch "result" to the calling scope or not. The implementation can
>>>>> also check at runtime that all returned names are unique. The returned
>>>>> Map can be empty and it's also optional to use the Map.
>>>>> To me, it seems like a good way to get the best of both worlds.
>>>>> Thoughts?
>>>>> -Matthias
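The mixed proposal above can be sketched in plain Java to show how the two `branch()` variants would interact. This is only an illustration under loud assumptions: a `List<T>` stands in for a `KStream`, evaluation is eager, and `BranchedSketch` with all of its methods is a hypothetical toy, not the KIP's interface. It does reproduce three points from the message: a `Consumer` branch is handled in place and not returned, a `Function` branch needs a name and ends up in the `Map`, and duplicate names are rejected at runtime.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical toy stand-in for the proposed KBranchedStream; a List<T> plays the KStream.
final class BranchedSketch<T> {
    private List<T> remaining;  // records not yet claimed by an earlier branch
    private final Map<String, List<T>> named = new LinkedHashMap<>();

    private BranchedSketch(List<T> records) {
        this.remaining = new ArrayList<>(records);
    }

    static <T> BranchedSketch<T> split(List<T> records) {
        return new BranchedSketch<>(records);
    }

    // Removes and returns the records matching p, so the first matching branch wins.
    private List<T> take(Predicate<T> p) {
        List<T> hit = new ArrayList<>();
        List<T> rest = new ArrayList<>();
        for (T r : remaining) {
            (p.test(r) ? hit : rest).add(r);
        }
        remaining = rest;
        return hit;
    }

    // Runtime uniqueness check for branch names.
    private void register(String name, List<T> branch) {
        if (named.putIfAbsent(name, branch) != null) {
            throw new IllegalArgumentException("duplicate branch name: " + name);
        }
    }

    // "Embedded chaining": the branch is consumed in place and never returned.
    BranchedSketch<T> branch(Predicate<T> p, Consumer<List<T>> chain) {
        chain.accept(take(p));
        return this;
    }

    // Named variant: the (possibly transformed) branch is returned in the Map.
    BranchedSketch<T> branch(Predicate<T> p, Function<List<T>, List<T>> chain, String name) {
        register(name, chain.apply(take(p)));
        return this;
    }

    Map<String, List<T>> defaultBranch(Function<List<T>, List<T>> chain, String name) {
        register(name, chain.apply(remaining));
        return named;
    }

    Map<String, List<T>> noDefaultBranch() {
        return named;
    }
}

final class BranchedSketchDemo {
    static Map<String, List<Integer>> run(List<Integer> sink) {
        return BranchedSketch.split(List.of(1, 2, 3, 4, 5, 6, 15))
                .branch(n -> n % 2 == 0, evens -> sink.addAll(evens))  // handled in place
                .branch(n -> n % 3 == 0, s -> s, "threes")             // returned by name
                .defaultBranch(s -> s, "rest");
    }

    public static void main(String[] args) {
        List<Integer> sink = new ArrayList<>();
        System.out.println(run(sink) + " sink=" + sink);
    }
}
```

In this toy run the even numbers go to `sink` and never appear in the returned map, while `threes` and `rest` are available in the calling scope. Note that 6 lands in the first branch only, since predicates are evaluated in definition order.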
>>>>> On 5/6/19 5:15 PM, John Roesler wrote:
>>>>>> Ivan,
>>>>>> That's a very good point about the "start" operator in the dynamic case.
>>>>>> I had no problem with "split()"; I was just questioning the necessity.
>>>>>> Since you've provided a proof of necessity, I'm in favor of the
>>>>>> "split()" start operator. Thanks!
>>>>>> Separately, I'm interested to see where the present discussion leads.
>>>>>> I've written enough Javascript code in my life to be suspicious of
>>>>>> nested closures. You have a good point about using method references (or
>>>>>> indeed function literals also work). It should be validating that this
>>>>>> was also the JS community's first approach to flattening the logic when
>>>>>> their nested closure situation got out of hand. Unfortunately, it's
>>>>>> replacing nesting with redirection, both of which disrupt code
>>>>>> readability (but in different ways for different reasons). In other
>>>>>> words, I agree that function references are *the* first-order solution if
>>>>>> the nested code does indeed become a problem.
>>>>>> However, the history of JS also tells us that function references aren't
>>>>>> the end of the story either, and you can see that by observing that
>>>>>> there have been two follow-on eras, as they continue trying to cope with
>>>>>> the consequences of living in such a callback-heavy language. First, you
>>>>>> have Futures/Promises, which essentially let you convert nested code to
>>>>>> method-chained code (Observables/FP is a popular variation on this).
>>>>>> Most lately, you have async/await, which is an effort to apply language
>>>>>> (not just API) syntax to the problem, and offer the "flattest" possible
>>>>>> programming style to solve the problem (because you get back to just one
>>>>>> code block per functional unit).
>>>>>> Stream-processing is a different domain, and Java+KStreams is nowhere
>>>>>> near as callback heavy as JS, so I don't think we have to take the JS
>>>>>> story for granted, but then again, I think we can derive some valuable
>>>>>> lessons by looking sideways to adjacent domains. I'm just bringing this
>>>>>> up to inspire further/deeper discussion. At the same time, just like JS,
>>>>>> we can afford to take an iterative approach to the problem.
>>>>>> Separately again, I'm interested in the post-branch merge (and I'd also
>>>>>> add join) problem that Paul brought up. We can clearly punt on it, by
>>>>>> terminating the nested branches with sink operators. But is there a DSL
>>>>>> way to do it?
>>>>>> Thanks again for your driving this,
>>>>>> -John
>>>>>> On Thu, May 2, 2019 at 7:39 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>>>     Ivan, I’ll definitely forfeit my point on the clumsiness of the
>>>>>>     branch(predicate, consumer) solution, I don’t see any real drawbacks
>>>>>>     for the dynamic case.
>>>>>>     IMO the one trade off to consider at this point is the scope
>>>>>>     question. I don’t know if I totally agree that “we rarely need them
>>>>>>     in the same scope” since merging the branches back together later
>>>>>>     seems like a perfectly plausible use case that can be a lot nicer
>>>>>>     when the branched streams are in the same scope. That being said,
>>>>>>     for the reasons Ivan listed, I think it is overall the better
>>>>>>     solution - working around the scope thing is easy enough if you need
>>>>>>     to.
>>>>>>     > On May 2, 2019, at 7:00 PM, Ivan Ponomarev
>>>>>>     <iponoma...@mail.ru.invalid> wrote:
>>>>>>     >
>>>>>>     > Hello everyone, thank you all for joining the discussion!
>>>>>>     >
>>>>>>     > Well, I don't think the idea of named branches, be it a
>>>>>>     LinkedHashMap (no other Map will do, because order of definition
>>>>>>     matters) or `branch` method  taking name and Consumer has more
>>>>>>     advantages than drawbacks.
>>>>>>     >
>>>>>>     > In my opinion, the only real positive outcome from Michael's
>>>>>>     proposal is that all the returned branches are in the same scope.
>>>>>>     But 1) we rarely need them in the same scope 2) there is a
>>>>>>     workaround for the scope problem, described in the KIP.
>>>>>>     >
>>>>>>     > 'Inlining the complex logic' is not a problem, because we can use
>>>>>>     method references instead of lambdas. In real world scenarios you
>>>>>>     tend to split the complex logic to methods anyway, so the code is
>>>>>>     going to be clean.
>>>>>>     >
>>>>>>     > The drawbacks are strong. The cohesion between predicates and
>>>>>>     handlers is lost. We have to define predicates in one place, and
>>>>>>     handlers in another. This opens the door for bugs:
>>>>>>     >
>>>>>>     > - what if we forget to define a handler for a name? or a name for
>>>>>>     a handler?
>>>>>>     > - what if we misspell a name?
>>>>>>     > - what if we copy-paste and duplicate a name?
>>>>>>     >
>>>>>>     > What Michael proposes would have been totally OK if we had been
>>>>>>     writing the API in Lua, Ruby or Python. In those languages the
>>>>>>     "dynamic naming" approach would have looked most concise and
>>>>>>     beautiful. But in Java we expect all the problems related to
>>>>>>     identifiers to be eliminated at compile time.
>>>>>>     >
>>>>>>     > Do we have to invent duck-typing for the Java API?
>>>>>>     >
>>>>>>     > And if we do, what advantage are we supposed to get besides having
>>>>>>     all the branches in the same scope? Michael, maybe I'm missing your
>>>>>>     point?
>>>>>>     >
>>>>>>     > ---
>>>>>>     >
>>>>>>     > Earlier in this discussion John Roesler also proposed to do
>>>>>>     without "start branching" operator, and later Paul mentioned that in
>>>>>>     the case when we have to add a dynamic number of branches, the
>>>>>>     current KIP is 'clumsier' compared to Michael's 'Map' solution. Let
>>>>>>     me address both comments here.
>>>>>>     >
>>>>>>     > 1) "Start branching" operator (I think that *split* is a good name
>>>>>>     for it indeed) is critical when we need to do a dynamic branching,
>>>>>>     see example below.
>>>>>>     >
>>>>>>     > 2) No, dynamic branching in current KIP is not clumsy at all.
>>>>>>     Imagine a real-world scenario when you need one branch per enum
>>>>>>     value (say, RecordType). You can have something like this:
>>>>>>     >
>>>>>>     > /*John:if we had to start with stream.branch(...) here, it would
>>>>>>     have been much messier.*/
>>>>>>     > KBranchedStream branched = stream.split();
>>>>>>     >
>>>>>>     > /*Not clumsy at all :-)*/
>>>>>>     > for (RecordType recordType : RecordType.values())
>>>>>>     >             branched = branched.branch((k, v) -> v.getRecType() ==
>>>>>>     recordType,
>>>>>>     >                     recordType::processRecords);
>>>>>>     >
>>>>>>     > Regards,
>>>>>>     >
>>>>>>     > Ivan
>>>>>>     >
>>>>>>     >
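Ivan's loop can be tried out with a small self-contained sketch. Everything here is a hypothetical stand-in rather than the KIP's API: `Brancher` replaces `KBranchedStream`, handlers receive individual records instead of a sub-stream, and the `RecordType` values and the `Rec` class are invented for illustration (the message only names `RecordType` and `getRecType()`). The point it shows is the same: a separate "start" object lets branches be attached in a loop, one per enum value.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical enum values; the thread does not list them.
enum RecordType { ORDER, PAYMENT, REFUND }

// Hypothetical record class exposing the getRecType() accessor used in the thread.
final class Rec {
    final RecordType type;
    final String payload;

    Rec(RecordType type, String payload) {
        this.type = type;
        this.payload = payload;
    }

    RecordType getRecType() {
        return type;
    }
}

// Toy stand-in for KBranchedStream: keeps (predicate, handler) pairs in definition order.
final class Brancher<T> {
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<T>> handlers = new ArrayList<>();

    Brancher<T> branch(Predicate<T> p, Consumer<T> handler) {
        predicates.add(p);
        handlers.add(handler);
        return this;
    }

    // Routes each record to the first branch whose predicate matches; others are dropped.
    void process(List<T> records) {
        for (T r : records) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(r)) {
                    handlers.get(i).accept(r);
                    break;
                }
            }
        }
    }
}

final class DynamicBranchDemo {
    static Map<RecordType, List<String>> run(List<Rec> input) {
        Map<RecordType, List<String>> out = new EnumMap<>(RecordType.class);
        Brancher<Rec> branched = new Brancher<>();  // plays the role of stream.split()
        for (RecordType recordType : RecordType.values()) {
            out.put(recordType, new ArrayList<>());
            // recordType is effectively final per iteration, so the lambdas may capture it
            branched = branched.branch(r -> r.getRecType() == recordType,
                                       r -> out.get(recordType).add(r.payload));
        }
        branched.process(input);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(new Rec(RecordType.PAYMENT, "p1"),
                                       new Rec(RecordType.ORDER, "o1"))));
    }
}
```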
>>>>>>     > 02.05.2019 14:40, Matthias J. Sax wrote:
>>>>>>     >> I also agree with Michael's observation about the core problem of
>>>>>>     >> current `branch()` implementation.
>>>>>>     >>
>>>>>>     >> However, I also don't like to pass in a clumsy Map object. My
>>>>>>     thinking
>>>>>>     >> was more aligned with Paul's proposal to just add a name to each
>>>>>>     >> `branch()` statement and return a `Map<String,KStream>`.
>>>>>>     >>
>>>>>>     >> It makes the code easier to read, and also makes the order of
>>>>>>     >> `Predicates` (that is essential) easier to grasp.
>>>>>>     >>
>>>>>>     >>>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>>     >>>>>>    .branch("branchOne", Predicate<K, V>)
>>>>>>     >>>>>>    .branch( "branchTwo", Predicate<K, V>)
>>>>>>     >>>>>>    .defaultBranch("defaultBranch");
>>>>>>     >> An open question is the case for which no defaultBranch() should be
>>>>>>     >> specified. Atm, `split()` and `branch()` would return `BranchedKStream`
>>>>>>     >> and the call to `defaultBranch()` that returns the `Map` is mandatory
>>>>>>     >> (which is not the case atm). Or is this actually not a real problem,
>>>>>>     >> because users can just ignore the branch returned by `defaultBranch()`
>>>>>>     >> in the result `Map`?
>>>>>>     >>
>>>>>>     >>
>>>>>>     >> About "inlining": So far, it seems to be a matter of personal
>>>>>>     >> preference. I can see arguments for both, but no "killer argument" yet
>>>>>>     >> that clearly makes the case for one or the other.
>>>>>>     >>
>>>>>>     >>
>>>>>>     >> -Matthias
>>>>>>     >>
>>>>>>     >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
>>>>>>     >>> Perhaps inlining is the wrong terminology. It doesn’t require
>>>>>>     that a lambda with the full downstream topology be defined inline -
>>>>>>     it can be a method reference as with Ivan’s original suggestion.
>>>>>>     The advantage of putting the predicate and its downstream logic
>>>>>>     (Consumer) together in branch() is that they are required to be near
>>>>>>     to each other.
>>>>>>     >>>
>>>>>>     >>> Ultimately the downstream code has to live somewhere, and deep
>>>>>>     branch trees will be hard to read regardless.
>>>>>>     >>>
>>>>>>     >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis <michael.droga...@confluent.io> wrote:
>>>>>>     >>>>
>>>>>>     >>>> I'm less enthusiastic about inlining the branch logic with its
>>>>>>     downstream
>>>>>>     >>>> functionality. Programs that have deep branch trees will
>>>>>>     quickly become
>>>>>>     >>>> harder to read as a single unit.
>>>>>>     >>>>
>>>>>>     >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>>>     >>>>>
>>>>>>     >>>>> Also +1 on the issues/goals as Michael outlined them, I think
>>>>>>     that sets a
>>>>>>     >>>>> great framework for the discussion.
>>>>>>     >>>>>
>>>>>>     >>>>> Regarding the SortedMap solution, my understanding is that the
>>>>>>     current
>>>>>>     >>>>> proposal in the KIP is what is in my PR which (pending naming
>>>>>>     decisions) is
>>>>>>     >>>>> roughly this:
>>>>>>     >>>>>
>>>>>>     >>>>> stream.split()
>>>>>>     >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>>>>     >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>>>>     >>>>>    .defaultBranch(Consumer<KStream<K, V>>);
>>>>>>     >>>>>
>>>>>>     >>>>> Obviously some ordering is necessary, since branching as a
>>>>>>     construct
>>>>>>     >>>>> doesn't work without it, but this solution seems like it
>>>>>>     provides as much
>>>>>>     >>>>> associativity as the SortedMap solution, because each branch()
>>>>>>     call
>>>>>>     >>>>> directly associates the "conditional" with the "code block."
>>>>>>     The value it
>>>>>>     >>>>> provides over the KIP solution is the accessing of streams in
>>>>>>     the same
>>>>>>     >>>>> scope.
>>>>>>     >>>>>
>>>>>>     >>>>> The KIP solution is less "dynamic" than the SortedMap solution
>>>>>>     in the sense
>>>>>>     >>>>> that it is slightly clumsier to add a dynamic number of
>>>>>>     branches, but it is
>>>>>>     >>>>> certainly possible.  It seems to me like the API should favor
>>>>>>     the "static"
>>>>>>     >>>>> case anyway, and should make it simple and readable to
>>>>>>     fluently declare and
>>>>>>     >>>>> access your branches in-line.  It also makes it impossible to
>>>>>>     ignore a
>>>>>>     >>>>> branch, and it is possible to build an (almost) identical
>>>>>>     SortedMap
>>>>>>     >>>>> solution on top of it.
>>>>>>     >>>>>
>>>>>>     >>>>> I could also see a middle ground where instead of a raw
>>>>>>     SortedMap being
>>>>>>     >>>>> taken in, branch() takes a name and not a Consumer.  Something
>>>>>>     like this:
>>>>>>     >>>>>
>>>>>>     >>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>>     >>>>>    .branch("branchOne", Predicate<K, V>)
>>>>>>     >>>>>    .branch( "branchTwo", Predicate<K, V>)
>>>>>>     >>>>>    .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
>>>>>>     >>>>>
>>>>>>     >>>>> Pros for that solution:
>>>>>>     >>>>> - accessing branched KStreams in same scope
>>>>>>     >>>>> - no double brace initialization, hopefully slightly more
>>>>>>     readable than
>>>>>>     >>>>> SortedMap
>>>>>>     >>>>>
>>>>>>     >>>>> Cons
>>>>>>     >>>>> - downstream branch logic cannot be specified inline which
>>>>>>     makes it harder
>>>>>>     >>>>> to read top to bottom (like existing API and SortedMap, but
>>>>>>     unlike the KIP)
>>>>>>     >>>>> - you can forget to "handle" one of the branched streams (like
>>>>>>     existing
>>>>>>     >>>>> API and SortedMap, but unlike the KIP)
>>>>>>     >>>>>
>>>>>>     >>>>> (KBranchedStreams could even work *both* ways but perhaps
>>>>>>     that's overdoing
>>>>>>     >>>>> it).
>>>>>>     >>>>>
>>>>>>     >>>>> Overall I'm curious how important it is to be able to easily
>>>>>>     access the
>>>>>>     >>>>> branched KStream in the same scope as the original.  It's
>>>>>>     possible that it
>>>>>>     >>>>> doesn't need to be handled directly by the API, but instead
>>>>>>     left up to the
>>>>>>     >>>>> user.  I'm sort of in the middle on it.
>>>>>>     >>>>>
>>>>>>     >>>>> Paul
>>>>>>     >>>>>
>>>>>>     >>>>>
>>>>>>     >>>>>
>>>>>>     >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
>>>>>>     >>>>>
>>>>>>     >>>>>> I'd like to +1 what Michael said about the issues with the
>>>>>>     existing
>>>>>>     >>>>> branch
>>>>>>     >>>>>> method, I agree with what he's outlined and I think we should
>>>>>>     proceed by
>>>>>>     >>>>>> trying to alleviate these problems. Specifically it seems
>>>>>>     important to be
>>>>>>     >>>>>> able to cleanly access the individual branches (eg by mapping
>>>>>>     >>>>>> name->stream), which I thought was the original intention of
>>>>>>     this KIP.
>>>>>>     >>>>>>
>>>>>>     >>>>>> That said, I don't think we should so easily give in to the double brace
>>>>>>     >>>>>> anti-pattern or force our users into it if at all possible to avoid...just
>>>>>>     >>>>>> my two cents.
>>>>>>     >>>>>>
>>>>>>     >>>>>> Cheers,
>>>>>>     >>>>>> Sophie
>>>>>>     >>>>>>
>>>>>>     >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <michael.droga...@confluent.io> wrote:
>>>>>>     >>>>>>
>>>>>>     >>>>>>> I’d like to propose a different way of thinking about this.
>>>>>>     To me,
>>>>>>     >>>>> there
>>>>>>     >>>>>>> are three problems with the existing branch signature:
>>>>>>     >>>>>>>
>>>>>>     >>>>>>> 1. If you use it the way most people do, Java raises unsafe type warnings.
>>>>>>     >>>>>>> 2. The way in which you use the stream branches is positionally coupled to
>>>>>>     >>>>>>> the ordering of the conditionals.
>>>>>>     >>>>>>> 3. It is brittle to extend existing branch calls with additional code
>>>>>>     >>>>>>> paths.
>>>>>>     >>>>>>>
>>>>>>     >>>>>>> Using associative constructs instead of relying on ordered
>>>>>>     constructs
>>>>>>     >>>>>> would
>>>>>>     >>>>>>> be a stronger approach. Consider a signature that instead
>>>>>>     looks like
>>>>>>     >>>>>> this:
>>>>>>     >>>>>>> Map<String, KStream<K,V>> KStream#branch(SortedMap<String,
>>>>>>     Predicate<?
>>>>>>     >>>>>>> super K,? super V>>);
>>>>>>     >>>>>>>
>>>>>>     >>>>>>> Branches are given names in a map, and as a result, the API
>>>>>>     returns a
>>>>>>     >>>>>>> mapping of names to streams. The ordering of the
>>>>> conditionals is
>>>>>>     >>>>>> maintained
>>>>>>     >>>>>>> because it’s a sorted map. Insert order determines the order
>>>>> of
>>>>>>     >>>>>> evaluation.
>>>>>>     >>>>>>> This solves problem 1 because there are no more varargs. It
>>>>>>     solves
>>>>>>     >>>>>> problem
>>>>>>     >>>>>>> 2 because you no longer lean on ordering to access the
>>>>>>     branch you’re
>>>>>>     >>>>>>> interested in. It solves problem 3 because you can introduce
>>>>>>     another
>>>>>>     >>>>>>> conditional by simply attaching another name to the
>>>>>>     structure, rather
>>>>>>     >>>>>> than
>>>>>>     >>>>>>> messing with the existing indices.
>>>>>>     >>>>>>>
>>>>>>     >>>>>>> One of the drawbacks is that creating the map inline is
>>>>>>     historically
>>>>>>     >>>>>>> awkward in Java. I know it’s an anti-pattern to use
>>>>>>     voluminously, but
>>>>>>     >>>>>>> double brace initialization would clean up the aesthetics.
>>>>>>     >>>>>>>
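One detail of the map-based signature is worth checking in code: a `java.util.SortedMap` such as `TreeMap` iterates in key order, not insertion order, whereas `LinkedHashMap` iterates in insertion order (the point Ivan raises in his reply). The sketch below is plain JDK code with hypothetical branch names and a hypothetical `firstMatch` helper; it shows how the winning branch changes with the map type when predicates overlap.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

final class PredicateOrderDemo {
    // Returns the name of the first predicate, in the map's iteration order, that matches.
    static String firstMatch(Map<String, Predicate<Integer>> branches, int value) {
        for (Map.Entry<String, Predicate<Integer>> e : branches.entrySet()) {
            if (e.getValue().test(value)) {
                return e.getKey();
            }
        }
        return "unmatched";
    }

    // Inserts a specific branch first and a catch-all branch last.
    static void fill(Map<String, Predicate<Integer>> m) {
        m.put("small", v -> v < 10);
        m.put("any", v -> true);
    }

    public static void main(String[] args) {
        Map<String, Predicate<Integer>> sorted = new TreeMap<>();
        Map<String, Predicate<Integer>> linked = new LinkedHashMap<>();
        fill(sorted);
        fill(linked);
        System.out.println(firstMatch(sorted, 5));  // key order puts "any" before "small"
        System.out.println(firstMatch(linked, 5));  // insertion order puts "small" first
    }
}
```

With the `TreeMap`, the catch-all `any` branch wins for the value 5 because its key sorts first; with the `LinkedHashMap`, `small` wins as declared.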
>>>>>>     >>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler <j...@confluent.io> wrote:
>>>>>>     >>>>>>>> Hi Ivan,
>>>>>>     >>>>>>>>
>>>>>>     >>>>>>>> Thanks for the update.
>>>>>>     >>>>>>>>
>>>>>>     >>>>>>>> FWIW, I agree with Matthias that the current "start
>>>>> branching"
>>>>>>     >>>>> operator
>>>>>>     >>>>>>> is
>>>>>>     >>>>>>>> confusing when named the same way as the actual branches.
>>>>>>     "Split"
>>>>>>     >>>>> seems
>>>>>>     >>>>>>>> like a good name. Alternatively, we can do without a "start
>>>>>>     >>>>> branching"
>>>>>>     >>>>>>>> operator at all, and just do:
>>>>>>     >>>>>>>>
>>>>>>     >>>>>>>> stream
>>>>>>     >>>>>>>>      .branch(Predicate)
>>>>>>     >>>>>>>>      .branch(Predicate)
>>>>>>     >>>>>>>>      .defaultBranch();
>>>>>>     >>>>>>>>
>>>>>>     >>>>>>>> Tentatively, I think that this branching operation should be
>>>>>>     >>>>> terminal.
>>>>>>     >>>>>>> That
>>>>>>     >>>>>>>> way, we don't create ambiguity about how to use it. That
>>>>>>     is, `branch`
>>>>>>     >>>>>>>> should return `KBranchedStream`, while `defaultBranch` is
>>>>>>     `void`, to
>>>>>>     >>>>>>>> enforce that it comes last, and that there is only one
>>>>>>     definition of
>>>>>>     >>>>>> the
>>>>>>     >>>>>>>> default branch. Potentially, we should log a warning if
>>>>>>     there's no
>>>>>>     >>>>>>> default,
>>>>>>     >>>>>>>> and additionally log a warning (or throw an exception) if a record
>>>>>>     >>>>>>>> falls through with no default.
>>>>>>     >>>>>>>>
>>>>>>     >>>>>>>> Thoughts?
>>>>>>     >>>>>>>>
>>>>>>     >>>>>>>> Thanks,
>>>>>>     >>>>>>>> -John
>>>>>>     >>>>>>>>
>>>>>>     >>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>     >>>>>>>>
>>>>>>     >>>>>>>>> Thanks for updating the KIP and your answers.
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>>> this is to make the name similar to String#split
>>>>>>     >>>>>>>>>>> that also returns an array, right?
>>>>>>     >>>>>>>>> The intent was to avoid name duplication. The return type should _not_
>>>>>>     >>>>>>>>> be an array.
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>> The current proposal is
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>> stream.branch()
>>>>>>     >>>>>>>>>      .branch(Predicate)
>>>>>>     >>>>>>>>>      .branch(Predicate)
>>>>>>     >>>>>>>>>      .defaultBranch();
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>> IMHO, this reads a little odd, because the first `branch()` does not
>>>>>>     >>>>>>>>> take any parameters and has different semantics than the later
>>>>>>     >>>>>>>>> `branch()` calls. Note that the code snippet above hides
>>>>>>     >>>>>>>>> that the first call is `KStream#branch()` while the others are
>>>>>>     >>>>>>>>> `KBranchedStream#branch()`, which makes reading the code harder.
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>> Because I suggested renaming `addBranch()` -> `branch()`, I thought it
>>>>>>     >>>>>>>>> might be better to also rename `KStream#branch()` to avoid the naming
>>>>>>     >>>>>>>>> overlap that seems to be confusing. The following reads much cleaner to
>>>>>>     >>>>>>>>> me:
>>>>>>     >>>>>>>>> stream.split()
>>>>>>     >>>>>>>>>      .branch(Predicate)
>>>>>>     >>>>>>>>>      .branch(Predicate)
>>>>>>     >>>>>>>>>      .defaultBranch();
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>> Maybe there is a better alternative to `split()` though to
>>>>>>     avoid
>>>>>>     >>>>> the
>>>>>>     >>>>>>>>> naming overlap.
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>>> 'default' is, however, a reserved word, so unfortunately
>>>>> we
>>>>>>     >>>>> cannot
>>>>>>     >>>>>>> have
>>>>>>     >>>>>>>>> a method with such name :-)
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>> Bummer. Didn't consider this. Maybe we can still come up
>>>>>>     with a
>>>>>>     >>>>> short
>>>>>>     >>>>>>>> name?
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>> Can you add the interface `KBranchedStream` to the KIP with all its
>>>>>>     >>>>>>>>> methods? It will be part of the public API and should be contained in the
>>>>>>     >>>>>>>>> KIP. For example, it's unclear atm what the return type of
>>>>>>     >>>>>>>>> `defaultBranch()` is.
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>> You did not comment on the idea to add a
>>>>>>     `KBranchedStream#get(int
>>>>>>     >>>>>>> index)
>>>>>>     >>>>>>>>> -> KStream` method to get the individually
>>>>>>     branched-KStreams. Would
>>>>>>     >>>>>> be
>>>>>>     >>>>>>>>> nice to get your feedback about it. It seems you suggest
>>>>>>     that users
>>>>>>     >>>>>>>>> would need to write custom utility code otherwise, to
>>>>>>     access them.
>>>>>>     >>>>> We
>>>>>>     >>>>>>>>> should discuss the pros and cons of both approaches. It
>>>>> feels
>>>>>>     >>>>>>>>> "incomplete" to me atm, if the API has no built-in support
>>>>>>     to get
>>>>>>     >>>>> the
>>>>>>     >>>>>>>>> branched-KStreams directly.
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>> -Matthias
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>>
>>>>>>     >>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
>>>>>>     >>>>>>>>>> Hi all!
>>>>>>     >>>>>>>>>>
>>>>>>     >>>>>>>>>> I have updated the KIP-418 according to the new vision.
>>>>>>     >>>>>>>>>>
>>>>>>     >>>>>>>>>> Matthias, thanks for your comment!
>>>>>>     >>>>>>>>>>
>>>>>>     >>>>>>>>>>> Renaming KStream#branch() -> #split()
>>>>>>     >>>>>>>>>> I can see your point: this is to make the name similar to
>>>>>>     >>>>>>> String#split
>>>>>>     >>>>>>>>>> that also returns an array, right? But is it worth the
>>>>>>     loss of
>>>>>>     >>>>>>>> backwards
>>>>>>     >>>>>>>>>> compatibility? We can have overloaded branch() as well
>>>>>>     without
>>>>>>     >>>>>>>> affecting
>>>>>>     >>>>>>>>>> the existing code. Maybe the old array-based `branch`
>>>>> method
>>>>>>     >>>>> should
>>>>>>     >>>>>>> be
>>>>>>     >>>>>>>>>> deprecated, but this is a subject for discussion.
>>>>>>     >>>>>>>>>>
>>>>>>     >>>>>>>>>>> Renaming KBranchedStream#addBranch() ->
>>>>>>     >>>>> BranchingKStream#branch(),
>>>>>>     >>>>>>>>>> KBranchedStream#defaultBranch() ->
>>>>> BranchingKStream#default()
>>>>>>     >>>>>>>>>>
>>>>>>     >>>>>>>>>> Totally agree with 'addBranch->branch' rename. 'default'
>>>>> is,
>>>>>>     >>>>>>> however, a
>>>>>>     >>>>>>>>>> reserved word, so unfortunately we cannot have a method
>>>>>>     with such
>>>>>>     >>>>>>> name
>>>>>>     >>>>>>>>> :-)
>>>>>>     >>>>>>>>>>> defaultBranch() does take a `Predicate` as argument,
>>>>> but I
>>>>>>     >>>>> think
>>>>>>     >>>>>>> that
>>>>>>     >>>>>>>>>> is not required?
>>>>>>     >>>>>>>>>>
>>>>>>     >>>>>>>>>> Absolutely! I think that was just copy-paste error or
>>>>>>     something.
>>>>>>     >>>>>>>>>>
>>>>>>     >>>>>>>>>> Dear colleagues,
>>>>>>     >>>>>>>>>>
>>>>>>     >>>>>>>>>> please review the new version of the KIP and Paul's PR
>>>>>>     >>>>>>>>>> (https://github.com/apache/kafka/pull/6512)
>>>>>>     >>>>>>>>>>
>>>>>>     >>>>>>>>>> Any new suggestions/objections?
>>>>>>     >>>>>>>>>>
>>>>>>     >>>>>>>>>> Regards,
>>>>>>     >>>>>>>>>>
>>>>>>     >>>>>>>>>> Ivan
>>>>>>     >>>>>>>>>>
>>>>>>     >>>>>>>>>>
>>>>>>     >>>>>>>>>> 11.04.2019 11:47, Matthias J. Sax wrote:
>>>>>>     >>>>>>>>>>> Thanks for driving the discussion of this KIP. It seems
>>>>> that
>>>>>>     >>>>>>> everybody
>>>>>>     >>>>>>>>>>> agrees that the current branch() method using arrays is
>>>>> not
>>>>>>     >>>>>> optimal.
>>>>>>     >>>>>>>>>>> I had a quick look into the PR and I like the overall
>>>>>>     proposal.
>>>>>>     >>>>>>> There
>>>>>>     >>>>>>>>>>> are some minor things we need to consider. I would
>>>>>>     recommend the
>>>>>>     >>>>>>>>>>> following renaming:
>>>>>>     >>>>>>>>>>>
>>>>>>     >>>>>>>>>>> KStream#branch() -> #split()
>>>>>>     >>>>>>>>>>> KBranchedStream#addBranch() -> BranchingKStream#branch()
>>>>>>     >>>>>>>>>>> KBranchedStream#defaultBranch() ->
>>>>>>     BranchingKStream#default()
>>>>>>     >>>>>>>>>>>
>>>>>>     >>>>>>>>>>> It's just a suggestion to get slightly shorter method
>>>>> names.
>>>>>>     >>>>>>>>>>>
>>>>>>     >>>>>>>>>>> In the current PR, defaultBranch() does take a
>>>>>>     `Predicate` as
>>>>>>     >>>>>>>> argument,
>>>>>>     >>>>>>>>>>> but I think that is not required?
>>>>>>     >>>>>>>>>>>
>>>>>>     >>>>>>>>>>> Also, we should consider KIP-307, that was recently
>>>>>>     accepted and
>>>>>>     >>>>>> is
>>>>>>     >>>>>>>>>>> currently implemented:
>>>>>>     >>>>>>>>>>>
>>>>>>     >>>>>
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>>>     >>>>>>>>>>> I.e., we should add overloads that accept a `Named`
>>>>>>     parameter.
>>>>>>     >>>>>>>>>>>
>>>>>>     >>>>>>>>>>>
>>>>>>     >>>>>>>>>>> For the issue that the created `KStream` objects are in
>>>>>>     different
>>>>>>     >>>>>>>> scopes:
>>>>>>     >>>>>>>>>>> could we extend `KBranchedStream` with a `get(int
>>>>>>     index)` method
>>>>>>     >>>>>>> that
>>>>>>     >>>>>>>>>>> returns the corresponding "branched" result `KStream`
>>>>>>     object?
>>>>>>     >>>>>> Maybe,
>>>>>>     >>>>>>>> the
>>>>>>     >>>>>>>>>>> second argument of `addBranch()` should not be a
>>>>>>     >>>>>> `Consumer<KStream>`
>>>>>>     >>>>>>>> but
>>>>>>     >>>>>>>>>>> a `Function<KStream,KStream>` and `get()` could return
>>>>>>     whatever
>>>>>>     >>>>>> the
>>>>>>     >>>>>>>>>>> `Function` returns?
>>>>>>     >>>>>>>>>>>
>>>>>>     >>>>>>>>>>>
>>>>>>     >>>>>>>>>>> Finally, I would also suggest to update the KIP with the
>>>>>>     current
>>>>>>     >>>>>>>>>>> proposal. That makes it easier to review.
>>>>>>     >>>>>>>>>>>
>>>>>>     >>>>>>>>>>>
>>>>>>     >>>>>>>>>>> -Matthias
>>>>>>     >>>>>>>>>>>
>>>>>>     >>>>>>>>>>>
>>>>>>     >>>>>>>>>>>
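To make the `Function` plus `get(int index)` idea above a bit more concrete, here is a minimal, self-contained sketch. It is only an illustration under stated assumptions: `IndexedBrancher` is a hypothetical name, a plain `List<String>` stands in for a `KStream`, and first-match semantics are assumed. It is not the code from the PR or the KIP.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch: a List<String> stands in for a KStream.
final class IndexedBrancher {
    private List<String> remaining;
    private final List<List<String>> results = new ArrayList<>();

    IndexedBrancher(List<String> source) {
        this.remaining = new ArrayList<>(source);
    }

    // Each branch takes the records matching its predicate (first match wins),
    // applies the function, and remembers whatever the function returned.
    IndexedBrancher branch(Predicate<String> predicate,
                           Function<List<String>, List<String>> fn) {
        List<String> matched =
            remaining.stream().filter(predicate).collect(Collectors.toList());
        remaining =
            remaining.stream().filter(predicate.negate()).collect(Collectors.toList());
        results.add(fn.apply(matched));
        return this;
    }

    // Returns the result of the branch added at the given position.
    List<String> get(int index) {
        return results.get(index);
    }

    static String demo() {
        IndexedBrancher b = new IndexedBrancher(Arrays.asList("a1", "b2", "a3", "c4"))
            .branch(s -> s.startsWith("a"), xs -> xs)
            .branch(s -> s.startsWith("b"),
                    xs -> xs.stream().map(String::toUpperCase).collect(Collectors.toList()));
        return b.get(0) + " " + b.get(1);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this shape all branch results are reachable from the same scope as the original stream, which is the scoping concern raised earlier in the thread.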
>>>>>>     >>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
>>>>>>     >>>>>>>>>>>> Ivan,
>>>>>>     >>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>> I'm a bit of a novice here as well, but I think it
>>>>>>     makes sense
>>>>>>     >>>>>> for
>>>>>>     >>>>>>>> you
>>>>>>     >>>>>>>>> to
>>>>>>     >>>>>>>>>>>> revise the KIP and continue the discussion.  Obviously
>>>>>>     we'll
>>>>>>     >>>>> need
>>>>>>     >>>>>>>> some
>>>>>>     >>>>>>>>>>>> buy-in from committers that have actual binding votes on
>>>>>>     >>>>> whether
>>>>>>     >>>>>>> the
>>>>>>     >>>>>>>>> KIP
>>>>>>     >>>>>>>>>>>> could be adopted.  It would be great to hear if they
>>>>>>     think this
>>>>>>     >>>>>> is
>>>>>>     >>>>>>> a
>>>>>>     >>>>>>>>> good
>>>>>>     >>>>>>>>>>>> idea overall.  I'm not sure if that happens just by
>>>>>>     starting a
>>>>>>     >>>>>>> vote,
>>>>>>     >>>>>>>>> or if
>>>>>>     >>>>>>>>>>>> there is generally some indication of interest
>>>>> beforehand.
>>>>>>     >>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>> That being said, I'll continue the discussion a bit:
>>>>>>     assuming
>>>>>>     >>>>> we
>>>>>>     >>>>>> do
>>>>>>     >>>>>>>>> move
>>>>>>     >>>>>>>>>>>> forward with the solution of "stream.branch() returns
>>>>>>     >>>>>> KBranchedStream",
>>>>>>     >>>>>>> do
>>>>>>     >>>>>>>>> we
>>>>>>     >>>>>>>>>>>> deprecate "stream.branch(...) returns KStream[]"?  I
>>>>> would
>>>>>>     >>>>> favor
>>>>>>     >>>>>>>>>>>> deprecating, since having two mutually exclusive APIs
>>>>> that
>>>>>>     >>>>>>> accomplish
>>>>>>     >>>>>>>>> the
>>>>>>     >>>>>>>>>>>> same thing is confusing, especially when they're fairly
>>>>>>     similar
>>>>>>     >>>>>>>>> anyway.  We
>>>>>>     >>>>>>>>>>>> just need to be sure we're not making something
>>>>>>     >>>>>>> impossible/difficult
>>>>>>     >>>>>>>>> that
>>>>>>     >>>>>>>>>>>> is currently possible/easy.
>>>>>>     >>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>> Regarding my PR - I think the general structure would
>>>>> work,
>>>>>>     >>>>> it's
>>>>>>     >>>>>>>> just a
>>>>>>     >>>>>>>>>>>> little sloppy overall in terms of naming and clarity. In
>>>>>>     >>>>>>> particular,
>>>>>>     >>>>>>>>>>>> passing in the "predicates" and "children" lists which
>>>>> get
>>>>>>     >>>>>> modified
>>>>>>     >>>>>>>> in
>>>>>>     >>>>>>>>>>>> KBranchedStream but read from all the way over in KStreamLazyBranch is a bit
>>>>>>     >>>>>>>>>>>> complicated to follow.
>>>>>>     >>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>> Thanks,
>>>>>>     >>>>>>>>>>>> Paul
>>>>>>     >>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>>     >>>>>>>>>>>>> Hi Paul!
>>>>>>     >>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>> I read your code carefully and now I am fully
>>>>>>     convinced: your
>>>>>>     >>>>>>>> proposal
>>>>>>     >>>>>>>>>>>>> looks better and should work. We just have to document
>>>>> the
>>>>>>     >>>>>> crucial
>>>>>>     >>>>>>>>> fact
>>>>>>     >>>>>>>>>>>>> that KStream consumers are invoked as they're added.
>>>>>>     And then
>>>>>>     >>>>>> it's
>>>>>>     >>>>>>>> all
>>>>>>     >>>>>>>>>>>>> going to be very nice.
>>>>>>     >>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>> What shall we do now? I should re-write the KIP and
>>>>>>     resume the
>>>>>>     >>>>>>>>>>>>> discussion here, right?
>>>>>>     >>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>> Why do you say that your PR 'should not be even a
>>>>>>     >>>>> starting
>>>>>>     >>>>>>>> point
>>>>>>     >>>>>>>>> if
>>>>>>     >>>>>>>>>>>>> we go in this direction'? To me it looks like a good
>>>>>>     starting
>>>>>>     >>>>>>> point.
>>>>>>     >>>>>>>>> But
>>>>>>     >>>>>>>>>>>>> as a novice in this project I might miss some important
>>>>>>     >>>>> details.
>>>>>>     >>>>>>>>>>>>> Regards,
>>>>>>     >>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>> Ivan
>>>>>>     >>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen wrote:
>>>>>>     >>>>>>>>>>>>>> Ivan,
>>>>>>     >>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>> Maybe I’m missing the point, but I believe the
>>>>>>     >>>>> stream.branch()
>>>>>>     >>>>>>>>> solution
>>>>>>     >>>>>>>>>>>>> supports this. The couponIssuer::set* consumers will be
>>>>>>     >>>>> invoked
>>>>>>     >>>>>> as
>>>>>>     >>>>>>>>> they’re
>>>>>>     >>>>>>>>>>>>> added, not during streamsBuilder.build(). So the user
>>>>>>     still
>>>>>>     >>>>>> ought
>>>>>>     >>>>>>> to
>>>>>>     >>>>>>>>> be
>>>>>>     >>>>>>>>>>>>> able to call couponIssuer.coupons() afterward and
>>>>>>     depend on
>>>>>>     >>>>> the
>>>>>>     >>>>>>>>> branched
>>>>>>     >>>>>>>>>>>>> streams having been set.
>>>>>>     >>>>>>>>>>>>>> The issue I mean to point out is that it is hard to
>>>>>>     access
>>>>>>     >>>>> the
>>>>>>     >>>>>>>>> branched
>>>>>>     >>>>>>>>>>>>> streams in the same scope as the original stream (that
>>>>>>     is, not
>>>>>>     >>>>>>>> inside
>>>>>>     >>>>>>>>> the
>>>>>>     >>>>>>>>>>>>> couponIssuer), which is a problem with both proposed
>>>>>>     >>>>> solutions.
>>>>>>     >>>>>> It
>>>>>>     >>>>>>>>> can be
>>>>>>     >>>>>>>>>>>>> worked around though.
>>>>>>     >>>>>>>>>>>>>> [Also, great to hear additional interest in 401, I’m
>>>>>>     excited
>>>>>>     >>>>> to
>>>>>>     >>>>>>>> hear
>>>>>>     >>>>>>>>>>>>> your thoughts!]
>>>>>>     >>>>>>>>>>>>>> Paul
>>>>>>     >>>>>>>>>>>>>>
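The point above, that the setter consumers run as each branch is added rather than at build time, can be sketched roughly as follows. This is a hedged illustration only: `FakeStream`, `EagerBrancher`, `CouponIssuerSketch` and `EagerBranchDemo` are hypothetical stand-ins, not Kafka Streams classes and not the code in the PR, and the predicates are ignored because no records flow in this sketch.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for a stream; not the Kafka Streams KStream type.
final class FakeStream {
    final String name;
    FakeStream(String name) { this.name = name; }
}

// Hypothetical brancher that invokes each consumer as soon as the branch is added.
final class EagerBrancher {
    private final FakeStream source;
    private int count = 0;

    EagerBrancher(FakeStream source) { this.source = source; }

    EagerBrancher branch(Predicate<String> predicate, Consumer<FakeStream> consumer) {
        // The predicate is unused here; the child stream is handed over immediately,
        // not deferred to a later build step.
        consumer.accept(new FakeStream(source.name + "-branch-" + count++));
        return this;
    }
}

// Holder that needs both branches in one scope, as in the coupon example.
final class CouponIssuerSketch {
    private FakeStream coffeePurchases;
    private FakeStream electronicsPurchases;

    void setCoffeePurchases(FakeStream s) { coffeePurchases = s; }
    void setElectronicsPurchases(FakeStream s) { electronicsPurchases = s; }

    String coupons() {
        if (coffeePurchases == null || electronicsPurchases == null) {
            throw new IllegalStateException("branches have not been set yet");
        }
        return coffeePurchases.name + " join " + electronicsPurchases.name;
    }
}

final class EagerBranchDemo {
    static String run() {
        CouponIssuerSketch issuer = new CouponIssuerSketch();
        new EagerBrancher(new FakeStream("transactions"))
            .branch(v -> v.startsWith("coffee"), issuer::setCoffeePurchases)
            .branch(v -> v.startsWith("electronics"), issuer::setElectronicsPurchases);
        // Both setters have already run, so coupons() can be called right away.
        return issuer.coupons();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

If the consumers were deferred to a later build step instead, `coupons()` in this sketch would throw, which is the failure mode described in the quoted message below.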
>>>>>>     >>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>>     >>>>>>>>>>>>>>> Hi Paul!
>>>>>>     >>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>> The idea to postpone the wiring of branches to
>>>>>>     >>>>>>>>>>>>>>> streamsBuilder.build() also looked great to me at first glance, but ---
>>>>>>     >>>>>>>>>>>>>>>> the newly branched streams are not available in the
>>>>>>     same
>>>>>>     >>>>>> scope
>>>>>>     >>>>>>> as
>>>>>>     >>>>>>>>> each
>>>>>>     >>>>>>>>>>>>> other.  That is, if we wanted to merge them back
>>>>> together
>>>>>>     >>>>> again
>>>>>>     >>>>>> I
>>>>>>     >>>>>>>>> don't see
>>>>>>     >>>>>>>>>>>>> a way to do that.
>>>>>>     >>>>>>>>>>>>>>> You took the words right out of my mouth; I was just going to
>>>>>>     >>>>>>>>>>>>>>> write in detail about this issue.
>>>>>>     >>>>>>>>>>>>>>> Consider the example from Bill's book, p. 101: say
>>>>>>     we need
>>>>>>     >>>>> to
>>>>>>     >>>>>>>>> identify
>>>>>>     >>>>>>>>>>>>> customers who have bought coffee and made a purchase
>>>>>>     in the
>>>>>>     >>>>>>>>> electronics
>>>>>>     >>>>>>>>>>>>> store to give them coupons.
>>>>>>     >>>>>>>>>>>>>>> This is the code I usually write under these
>>>>>>     circumstances
>>>>>>     >>>>>> using
>>>>>>     >>>>>>>> my
>>>>>>     >>>>>>>>>>>>> 'brancher' class:
>>>>>>     >>>>>>>>>>>>>>> @Setter
>>>>>>     >>>>>>>>>>>>>>> class CouponIssuer{
>>>>>>     >>>>>>>>>>>>>>>   private KStream<....> coffePurchases;
>>>>>>     >>>>>>>>>>>>>>>   private KStream<....> electronicsPurchases;
>>>>>>     >>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>   KStream<...> coupons(){
>>>>>>     >>>>>>>>>>>>>>>       return coffePurchases.join(electronicsPurchases...)...whatever
>>>>>>     >>>>>>>>>>>>>>>       /* In the real world the code here can be complex, so creating
>>>>>>     >>>>>>>>>>>>>>>          a separate CouponIssuer class is fully justified in order to
>>>>>>     >>>>>>>>>>>>>>>          separate the classes' responsibilities. */
>>>>>>     >>>>>>>>>>>>>>>  }
>>>>>>     >>>>>>>>>>>>>>> }
>>>>>>     >>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>> CouponIssuer couponIssuer = new CouponIssuer();
>>>>>>     >>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
>>>>>>     >>>>>>>>>>>>>>>     .branch(predicate1, couponIssuer::setCoffePurchases)
>>>>>>     >>>>>>>>>>>>>>>     .branch(predicate2, couponIssuer::setElectronicsPurchases)
>>>>>>     >>>>>>>>>>>>>>>     .onTopOf(transactionStream);
>>>>>>     >>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>> /* Alas, this won't work if we're going to wire up everything later,
>>>>>>     >>>>>>>>>>>>>>>    without the terminal operation! */
>>>>>>     >>>>>>>>>>>>>>> couponIssuer.coupons()...
>>>>>>     >>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>> Does this make sense?  In order to properly
>>>>>>     initialize the
>>>>>>     >>>>>>>>> CouponIssuer
>>>>>>     >>>>>>>>>>>>> we need the terminal operation to be called before
>>>>>>     >>>>>>>>> streamsBuilder.build()
>>>>>>     >>>>>>>>>>>>> is called.
>>>>>>     >>>>>>>>>>>>>>> [BTW Paul, I just found out that your KIP-401 is
>>>>>>     essentially
>>>>>>     >>>>>> the
>>>>>>     >>>>>>>>> next
>>>>>>     >>>>>>>>>>>>> KIP I was going to write here. I have some thoughts
>>>>>>     based on
>>>>>>     >>>>> my
>>>>>>     >>>>>>>>> experience,
>>>>>>     >>>>>>>>>>>>> so I will join the discussion on KIP-401 soon.]
>>>>>>     >>>>>>>>>>>>>>> Regards,
>>>>>>     >>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>> Ivan
>>>>>>     >>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen wrote:
>>>>>>     >>>>>>>>>>>>>>>> Ivan,
>>>>>>     >>>>>>>>>>>>>>>> I tried to make a very rough proof of concept of a
>>>>>>     fluent
>>>>>>     >>>>> API
>>>>>>     >>>>>>>> based
>>>>>>     >>>>>>>>>>>>> off of
>>>>>>     >>>>>>>>>>>>>>>> KStream here
>>>>>>     (https://github.com/apache/kafka/pull/6512),
>>>>>>     >>>>>> and
>>>>>>     >>>>>>> I
>>>>>>     >>>>>>>>> think
>>>>>>     >>>>>>>>>>>>> I
>>>>>>     >>>>>>>>>>>>>>>> succeeded at removing both cons.
>>>>>>     >>>>>>>>>>>>>>>>    - Compatibility: I was incorrect earlier about
>>>>>>     >>>>>>> compatibility
>>>>>>     >>>>>>>>>>>>> issues,
>>>>>>     >>>>>>>>>>>>>>>>    there aren't any direct ones.  I was unaware
>>>>>>     that Java
>>>>>>     >>>>> is
>>>>>>     >>>>>>>> smart
>>>>>>     >>>>>>>>>>>>> enough to
>>>>>>     >>>>>>>>>>>>>>>>    distinguish between a branch(varargs...)
>>>>>>     returning one
>>>>>>     >>>>>>> thing
>>>>>>     >>>>>>>>> and
>>>>>>     >>>>>>>>>>>>> branch()
>>>>>>     >>>>>>>>>>>>>>>>    with no arguments returning another thing.
>>>>>>     >>>>>>>>>>>>>>>>    - Requiring a terminal method: We don't actually
>>>>>>     need
>>>>>>     >>>>> it.
>>>>>>     >>>>>>> We
>>>>>>     >>>>>>>>> can
>>>>>>     >>>>>>>>>>>>> just
>>>>>>     >>>>>>>>>>>>>>>>    build up the branches in the KBranchedStream, which shares its state with the
>>>>>>     >>>>>>>>>>>>>>>>    ProcessorSupplier that will actually do the
>>>>>>     branching.
>>>>>>     >>>>>>> It's
>>>>>>     >>>>>>>>> not
>>>>>>     >>>>>>>>>>>>> terribly
>>>>>>     >>>>>>>>>>>>>>>>    pretty in its current form, but I think it
>>>>>>     demonstrates
>>>>>>     >>>>>> its
>>>>>>     >>>>>>>>>>>>> feasibility.
>>>>>>     >>>>>>>>>>>>>>>> To be clear, I don't think that pull request should
>>>>> be
>>>>>>     >>>>> final
>>>>>>     >>>>>> or
>>>>>>     >>>>>>>>> even a
>>>>>>     >>>>>>>>>>>>>>>> starting point if we go in this direction, I just
>>>>>>     wanted to
>>>>>>     >>>>>> see
>>>>>>     >>>>>>>> how
>>>>>>     >>>>>>>>>>>>>>>> challenging it would be to get the API working.
>>>>>>     >>>>>>>>>>>>>>>> I will say though, that I'm not sure the existing
>>>>>>     solution
>>>>>>     >>>>>>> could
>>>>>>     >>>>>>>> be
>>>>>>     >>>>>>>>>>>>>>>> deprecated in favor of this, which I had originally
>>>>>>     >>>>> suggested
>>>>>>     >>>>>>>> was a
>>>>>>     >>>>>>>>>>>>>>>> possibility.  The reason is that the newly branched
>>>>>>     streams
>>>>>>     >>>>>> are
>>>>>>     >>>>>>>> not
>>>>>>     >>>>>>>>>>>>>>>> available in the same scope as each other.  That
>>>>>>     is, if we
>>>>>>     >>>>>>> wanted
>>>>>>     >>>>>>>>> to
>>>>>>     >>>>>>>>>>>>> merge
>>>>>>     >>>>>>>>>>>>>>>> them back together again I don't see a way to do
>>>>>>     that.  The
>>>>>>     >>>>>> KIP
>>>>>>     >>>>>>>>>>>>> proposal
>>>>>>     >>>>>>>>>>>>>>>> has the same issue, though - all this means is that
>>>>> for
>>>>>>     >>>>>> either
>>>>>>     >>>>>>>>>>>>> solution,
>>>>>>     >>>>>>>>>>>>>>>> deprecating the existing branch(...) is not on the
>>>>>>     table.
>>>>>>     >>>>>>>>>>>>>>>> Thanks,
>>>>>>     >>>>>>>>>>>>>>>> Paul
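The compatibility point above, that Java can tell a varargs `branch(...)` apart from a no-argument `branch()` with a different return type, can be checked with a small self-contained sketch. `OverloadSketch` and its return types are hypothetical stand-ins chosen for illustration; they are not the KStream signatures.

```java
import java.util.function.Predicate;

final class OverloadSketch {
    // Stand-in for the existing array-returning varargs method.
    @SafeVarargs
    static String[] branch(Predicate<String>... predicates) {
        return new String[predicates.length];
    }

    // Stand-in for the proposed no-argument variant with a different return type.
    static StringBuilder branch() {
        return new StringBuilder("fluent");
    }

    static String demo() {
        // No arguments: the compiler picks the no-arg overload before it
        // considers variable-arity invocation.
        StringBuilder fluent = branch();
        // With arguments: only the varargs overload is applicable.
        String[] legacy = branch(s -> s.isEmpty(), s -> true);
        return fluent + ":" + legacy.length;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Existing call sites that pass at least one predicate keep compiling against the varargs method, so adding the no-arg overload does not break them in this sketch.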
>>>>>>     >>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>>     >>>>>>>>>>>>>>>>> OK, let me summarize what we have discussed up to
>>>>> this
>>>>>>     >>>>>> point.
>>>>>>     >>>>>>>>>>>>>>>>> First, it seems that it's commonly agreed that
>>>>>>     branch API
>>>>>>     >>>>>>> needs
>>>>>>     >>>>>>>>>>>>>>>>> improvement. Motivation is given in the KIP.
>>>>>>     >>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>> There are two potential ways to do it:
>>>>>>     >>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>> 1. (as originally proposed)
>>>>>>     >>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
>>>>>>     >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->..)
>>>>>>     >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->..)
>>>>>>     >>>>>>>>>>>>>>>>>    .defaultBranch(ks->..) //optional
>>>>>>     >>>>>>>>>>>>>>>>>    .onTopOf(stream).mapValues(...).... // onTopOf returns its argument
>>>>>>     >>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2) The code
>>>>> won't
>>>>>>     >>>>> make
>>>>>>     >>>>>>>> sense
>>>>>>     >>>>>>>>>>>>> until
>>>>>>     >>>>>>>>>>>>>>>>> all the necessary ingredients are provided.
>>>>>>     >>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance contrasts with the
>>>>>>     >>>>>>>>>>>>>>>>> fluency of other KStream methods.
>>>>>>     >>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>> 2. (as Paul proposes)
>>>>>>     >>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>> stream
>>>>>>     >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->...)
>>>>>>     >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->...)
>>>>>>     >>>>>>>>>>>>>>>>>    .defaultBranch(ks->...) // or noDefault(); both defaultBranch(..)
>>>>>>     >>>>>>>>>>>>>>>>>                            // and noDefault() return void
>>>>>>     >>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>> PROS: Generally follows the way KStreams interface
>>>>> is
>>>>>>     >>>>>> defined.
>>>>>>     >>>>>>>>>>>>>>>>> CONS: We need to define two terminal methods
>>>>>>     >>>>>>>> (defaultBranch(ks->)
>>>>>>     >>>>>>>>> and
>>>>>>     >>>>>>>>>>>>>>>>> noDefault()). And for a user it is very easy to
>>>>>>     miss the
>>>>>>     >>>>>> fact
>>>>>>     >>>>>>>>> that one
>>>>>>     >>>>>>>>>>>>>>>>> of the terminal methods should be called. If these
>>>>>>     methods
>>>>>>     >>>>>> are
>>>>>>     >>>>>>>> not
>>>>>>     >>>>>>>>>>>>>>>>> called, we can throw an exception at runtime.
>>>>>>     >>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can we do
>>>>> better?
>>>>>>     >>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>> Regards,
>>>>>>     >>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>> Ivan
>>>>>>     >>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
>>>>>>     >>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
>>>>>>     >>>>>>>>>>>>>>>>>>> Paul,
>>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>> I see your point when you are talking about
>>>>>>     >>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
>>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>> Still, I believe that this cannot be implemented the easy way.
>>>>>>     >>>>>>>>>>>>>>>>>>> Maybe we all should think further.
>>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
>>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>> user could specify a terminal method that
>>>>> assumes
>>>>>>     >>>>> nothing
>>>>>>     >>>>>>>> will
>>>>>>     >>>>>>>>>>>>> reach
>>>>>>     >>>>>>>>>>>>>>>>>>>> the default branch,
>>>>>>     >>>>>>>>>>>>>>>>>>> throwing an exception if such a case occurs.
>>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be the only
>>>>> option
>>>>>>     >>>>>> besides
>>>>>>     >>>>>>>>>>>>>>>>>>> `default`, because there are scenarios when we
>>>>>>     want to
>>>>>>     >>>>>> just
>>>>>>     >>>>>>>>> silently
>>>>>>     >>>>>>>>>>>>>>>>>>> drop the messages that didn't match any
>>>>>>     predicate. 2)
>>>>>>     >>>>>>> Throwing
>>>>>>     >>>>>>>>> an
>>>>>>     >>>>>>>>>>>>>>>>>>> exception in the middle of data flow processing
>>>>>>     looks
>>>>>>     >>>>>> like a
>>>>>>     >>>>>>>> bad
>>>>>>     >>>>>>>>>>>>> idea.
>>>>>>     >>>>>>>>>>>>>>>>>>> In stream processing paradigm, I would prefer to
>>>>>>     emit a
>>>>>>     >>>>>>>> special
>>>>>>     >>>>>>>>>>>>>>>>>>> message to a dedicated stream. This is exactly
>>>>> where
>>>>>>     >>>>>>> `default`
>>>>>>     >>>>>>>>> can
>>>>>>     >>>>>>>>>>>>> be
>>>>>>     >>>>>>>>>>>>>>>>>>> used.
>>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>> it would be fairly easy for the
>>>>>>     >>>>> InternalTopologyBuilder
>>>>>>     >>>>>>> to
>>>>>>     >>>>>>>>> track
>>>>>>     >>>>>>>>>>>>>>>>>>>> dangling
>>>>>>     >>>>>>>>>>>>>>>>>>> branches that haven't been terminated and raise
>>>>>>     a clear
>>>>>>     >>>>>>> error
>>>>>>     >>>>>>>>>>>>> before it
>>>>>>     >>>>>>>>>>>>>>>>>>> becomes an issue.
>>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>> You mean a runtime exception, when the program is
>>>>>>     >>>>> compiled
>>>>>>     >>>>>>> and
>>>>>>     >>>>>>>>> run?
>>>>>>     >>>>>>>>>>>>>>>>>>> Well,  I'd prefer an API that simply won't
>>>>>>     compile if
>>>>>>     >>>>> used
>>>>>>     >>>>>>>>>>>>>>>>>>> incorrectly. Can we build such an API as a
>>>>>>     method chain
>>>>>>     >>>>>>>> starting
>>>>>>     >>>>>>>>>>>>> from
>>>>>>     >>>>>>>>>>>>>>>>>>> KStream object? There is a huge cost difference
>>>>>>     between
>>>>>>     >>>>>>>> runtime
>>>>>>     >>>>>>>>> and
>>>>>>     >>>>>>>>>>>>>>>>>>> compile-time errors. Even if a failure is uncovered instantly by unit
>>>>>>     >>>>>>>>>>>>>>>>>>> tests, it costs more for the project than a
>>>>>>     compilation
>>>>>>     >>>>>>>> failure.
>>>>>>     >>>>>>>>>>>>>>>>>>> Regards,
>>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>> Ivan
>>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen wrote:
>>>>>>     >>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>>     >>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>> Good point about the terminal operation being
>>>>>>     required.
>>>>>>     >>>>>>> But
>>>>>>     >>>>>>>> is
>>>>>>     >>>>>>>>>>>>> that
>>>>>>     >>>>>>>>>>>>>>>>>>>> really
>>>>>>     >>>>>>>>>>>>>>>>>>>> such a bad thing?  If the user doesn't want a
>>>>>>     >>>>>> defaultBranch
>>>>>>     >>>>>>>>> they
>>>>>>     >>>>>>>>>>>>> can
>>>>>>     >>>>>>>>>>>>>>>>>>>> call
>>>>>>     >>>>>>>>>>>>>>>>>>>> some other terminal method (noDefaultBranch()?)
>>>>>>     just as
>>>>>>     >>>>>>>>> easily.  In
>>>>>>     >>>>>>>>>>>>>>>>>>>> fact I
>>>>>>     >>>>>>>>>>>>>>>>>>>> think it creates an opportunity for a nicer API
>>>>> - a
>>>>>>     >>>>> user
>>>>>>     >>>>>>>> could
>>>>>>     >>>>>>>>>>>>> specify
>>>>>>     >>>>>>>>>>>>>>>>> a
>>>>>>     >>>>>>>>>>>>>>>>>>>> terminal method that assumes nothing will reach
>>>>> the
>>>>>>     >>>>>> default
>>>>>>     >>>>>>>>> branch,
>>>>>>     >>>>>>>>>>>>>>>>>>>> throwing an exception if such a case occurs.
>>>>> That
>>>>>>     >>>>> seems
>>>>>>     >>>>>>> like
>>>>>>     >>>>>>>>> an
>>>>>>     >>>>>>>>>>>>>>>>>>>> improvement over the current branch() API,
>>>>>>     which allows
>>>>>>     >>>>>> for
>>>>>>     >>>>>>>> the
>>>>>>     >>>>>>>>>>>>> more
>>>>>>     >>>>>>>>>>>>>>>>>>>> subtle
>>>>>>     >>>>>>>>>>>>>>>>>>>> behavior of records unexpectedly getting
>>>>> dropped.
>>>>>>     >>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>> The need for a terminal operation certainly has
>>>>>>     to be
>>>>>>     >>>>>> well
>>>>>>     >>>>>>>>>>>>>>>>>>>> documented, but
>>>>>>     >>>>>>>>>>>>>>>>>>>> it would be fairly easy for the
>>>>>>     >>>>> InternalTopologyBuilder
>>>>>>     >>>>>>> to
>>>>>>     >>>>>>>>> track
>>>>>>     >>>>>>>>>>>>>>>>>>>> dangling
>>>>>>     >>>>>>>>>>>>>>>>>>>> branches that haven't been terminated and raise
>>>>>>     a clear
>>>>>>     >>>>>>> error
>>>>>>     >>>>>>>>>>>>> before it
>>>>>>     >>>>>>>>>>>>>>>>>>>> becomes an issue.  Especially now that there is
>>>>> a
>>>>>>     >>>>> "build
>>>>>>     >>>>>>>> step"
>>>>>>     >>>>>>>>>>>>> where
>>>>>>     >>>>>>>>>>>>>>>>> the
>>>>>>     >>>>>>>>>>>>>>>>>>>> topology is actually wired up, when
>>>>>>     >>>>>> StreamsBuilder.build()
>>>>>>     >>>>>>> is
>>>>>>     >>>>>>>>>>>>> called.
>>>>>>     >>>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its argument, I
>>>>> agree
>>>>>>     >>>>> that
>>>>>>     >>>>>>> it's
>>>>>>     >>>>>>>>>>>>>>>>>>>> critical to
>>>>>>     >>>>>>>>>>>>>>>>>>>> allow users to do other operations on the input
>>>>>>     stream.
>>>>>>     >>>>>>> With
>>>>>>     >>>>>>>>> the
>>>>>>     >>>>>>>>>>>>>>>>> fluent
>>>>>>     >>>>>>>>>>>>>>>>>>>> solution, it ought to work the same way all
>>>>> other
>>>>>>     >>>>>>> operations
>>>>>>     >>>>>>>>> do -
>>>>>>     >>>>>>>>>>>>> if
>>>>>>     >>>>>>>>>>>>>>>>> you
>>>>>>     >>>>>>>>>>>>>>>>>>>> want to process off the original KStream
>>>>> multiple
>>>>>>     >>>>> times,
>>>>>>     >>>>>>> you
>>>>>>     >>>>>>>>> just
>>>>>>     >>>>>>>>>>>>>>>>>>>> need the
>>>>>>     >>>>>>>>>>>>>>>>>>>> stream as a variable so you can call as many
>>>>>>     operations
>>>>>>     >>>>>> on
>>>>>>     >>>>>>> it
>>>>>>     >>>>>>>>> as
>>>>>>     >>>>>>>>>>>>> you
>>>>>>     >>>>>>>>>>>>>>>>>>>> desire.
>>>>>>     >>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>> Thoughts?
>>>>>>     >>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>> Best,
>>>>>>     >>>>>>>>>>>>>>>>>>>> Paul
>>>>>>     >>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>>     >>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>> Hello Paul,
>>>>>>     >>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>> I'm afraid this won't work because we do not always need the
>>>>>>     >>>>>>>>>>>>>>>>>>>>> defaultBranch. And without a terminal
>>>>> operation we
>>>>>>     >>>>> don't
>>>>>>     >>>>>>>> know
>>>>>>     >>>>>>>>>>>>> when to
>>>>>>     >>>>>>>>>>>>>>>>>>>>> finalize and build the 'branch switch'.
>>>>>>     >>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its argument,
>>>>>>     so we
>>>>>>     >>>>> can
>>>>>>     >>>>>> do
>>>>>>     >>>>>>>>>>>>> something
>>>>>>     >>>>>>>>>>>>>>>>>>>>> more with the original branch after branching.
>>>>>>     >>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>> I understand your point that the need for special object construction
>>>>>>     >>>>>>>>>>>>>>>>>>>>> contrasts with the fluency of most KStream methods. But here we have a
>>>>>>     >>>>>>>>>>>>>>>>>>>>> special case: we build the switch to split the
>>>>>>     flow,
>>>>>>     >>>>> so
>>>>>>     >>>>>> I
>>>>>>     >>>>>>>>> think
>>>>>>     >>>>>>>>>>>>> this
>>>>>>     >>>>>>>>>>>>>>>>> is
>>>>>>     >>>>>>>>>>>>>>>>>>>>> still idiomatic.
>>>>>>     >>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>     >>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>> Ivan
>>>>>>     >>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve this API, but I find the onTopOf()
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> mechanism a little confusing since it contrasts with the fluency of other
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> KStream method calls.  Ideally I'd like to
>>>>>>     just call
>>>>>>     >>>>> a
>>>>>>     >>>>>>>>> method on
>>>>>>     >>>>>>>>>>>>> the
>>>>>>     >>>>>>>>>>>>>>>>>>>>> stream
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> so it still reads top to bottom if the branch
>>>>>>     cases
>>>>>>     >>>>> are
>>>>>>     >>>>>>>>> defined
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> fluently.
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> I think the addBranch(predicate, handleCase)
>>>>>>     is very
>>>>>>     >>>>>> nice
>>>>>>     >>>>>>>>> and the
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> right
>>>>>>     >>>>>>>>>>>>>>>>>>>>> way
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> to do things, but what if we flipped around
>>>>>>     how we
>>>>>>     >>>>>>> specify
>>>>>>     >>>>>>>>> the
>>>>>>     >>>>>>>>>>>>> source
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> stream.
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> Like:
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> stream.branch()
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate1, this::handle1)
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate2, this::handle2)
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>           .defaultBranch(this::handleDefault);
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> Where branch() returns a KBranchedStreams or
>>>>>>     >>>>>>>> KStreamBrancher
>>>>>>     >>>>>>>>> or
>>>>>>     >>>>>>>>>>>>>>>>>>>>> something,
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> which is added to by addBranch() and
>>>>>>     terminated by
>>>>>>     >>>>>>>>>>>>> defaultBranch()
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> (which
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> returns void).  This is obviously
>>>>>>     incompatible with
>>>>>>     >>>>> the
>>>>>>     >>>>>>>>> current
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> API, so
>>>>>>     >>>>>>>>>>>>>>>>>>>>> the
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> new stream.branch() would have to have a
>>>>>>     different
>>>>>>     >>>>>> name,
>>>>>>     >>>>>>>> but
>>>>>>     >>>>>>>>> that
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> seems
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> like a fairly small problem - we could call it
>>>>>>     >>>>>> something
>>>>>>     >>>>>>>> like
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> branched()
>>>>>>     >>>>>>>>>>>>>>>>>>>>> or
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> branchedStreams() and deprecate the old API.
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of your
>>>>>>     KIP?  It
>>>>>>     >>>>>> seems
>>>>>>     >>>>>>>>> like it
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> does to
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> me, allowing for clear in-line branching
>>>>>>     while also
>>>>>>     >>>>>>>> allowing
>>>>>>     >>>>>>>>> you
>>>>>>     >>>>>>>>>>>>> to
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> dynamically build branches off of KBranchedStreams if desired.
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> Paul
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
>>>>>>     >>>>>>>>>>>>>>>>>>>>>> <iponoma...@mail.ru.invalid> wrote:
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks) {
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>     ks.filter(....).mapValues(...)
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks) {
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>     ks.selectKey(...).groupByKey()...
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>> ......
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>     .addBranch(predicate1, this::handleFirstCase)
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>     .addBranch(predicate2, this::handleSecondCase)
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>     .onTopOf(....)
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>> Ivan
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> I have one question: the KafkaStreamsBrancher takes a Consumer as
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> its second argument, which returns nothing, and the example in
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> the KIP shows each stream from the branch using a terminal node
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> (KStream#to() in this case).
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but how would we handle the case
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> where the user has created a branch but wants to continue
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> processing and not necessarily use a terminal node on the
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> branched stream immediately?
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic as is, if we had something like
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> this:
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> KStream<String, String>[] branches =
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>     originalStream.branch(predicate1, predicate2);
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> Bill
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> All,
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP-418.
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418. Please take a look
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> at the KIP if you can, I would appreciate any feedback :)
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>     >>>>>>>>>
>>>>>>     >
