Matthias: I think that's pretty reasonable from my point of view. Good
suggestion.

On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Interesting discussion.
>
> I am wondering, if we cannot unify the advantage of both approaches:
>
>
>
> KStream#split() -> KBranchedStream
>
> // branch is not easily accessible in current scope
> KBranchedStream#branch(Predicate, Consumer<KStream>)
>   -> KBranchedStream
>
> // assign a name to the branch and
> // return the sub-stream to the current scope later
> //
> // can be simple as `#branch(p, s->s, "name")`
> // or also complex as `#branch(p, s->s.filter(...), "name")`
> KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
>   -> KBranchedStream
>
> // default branch is not easily accessible
> // return map of all named sub-stream into current scope
> KBranchedStream#default(Cosumer<KStream>)
>   -> Map<String,KStream>
>
> // assign custom name to default-branch
> // return map of all named sub-stream into current scope
> KBranchedStream#default(Function<KStream,KStream>, String)
>   -> Map<String,KStream>
>
> // assign a default name for default
> // return map of all named sub-stream into current scope
> KBranchedStream#defaultBranch(Function<KStream,KStream>)
>   -> Map<String,KStream>
>
> // return map of all names sub-stream into current scope
> KBranchedStream#noDefaultBranch()
>   -> Map<String,KStream>
>
>
>
> Hence, for each sub-stream, the user can pick to add a name and return
> the branch "result" to the calling scope or not. The implementation can
> also check at runtime that all returned names are unique. The returned
> Map can be empty and it's also optional to use the Map.
>
> To me, it seems like a good way to get best of both worlds.
>
> Thoughts?
>
>
>
> -Matthias
>
>
>
>
> On 5/6/19 5:15 PM, John Roesler wrote:
> > Ivan,
> >
> > That's a very good point about the "start" operator in the dynamic case.
> > I had no problem with "split()"; I was just questioning the necessity.
> > Since you've provided a proof of necessity, I'm in favor of the
> > "split()" start operator. Thanks!
> >
> > Separately, I'm interested to see where the present discussion leads.
> > I've written enough Javascript code in my life to be suspicious of
> > nested closures. You have a good point about using method references (or
> > indeed function literals also work). It should be validating that this
> > was also the JS community's first approach to flattening the logic when
> > their nested closure situation got out of hand. Unfortunately, it's
> > replacing nesting with redirection, both of which disrupt code
> > readability (but in different ways for different reasons). In other
> > words, I agree that function references is *the* first-order solution if
> > the nested code does indeed become a problem.
> >
> > However, the history of JS also tells us that function references aren't
> > the end of the story either, and you can see that by observing that
> > there have been two follow-on eras, as they continue trying to cope with
> > the consequences of living in such a callback-heavy language. First, you
> > have Futures/Promises, which essentially let you convert nested code to
> > method-chained code (Observables/FP is a popular variation on this).
> > Most lately, you have async/await, which is an effort to apply language
> > (not just API) syntax to the problem, and offer the "flattest" possible
> > programming style to solve the problem (because you get back to just one
> > code block per functional unit).
> >
> > Stream-processing is a different domain, and Java+KStreams is nowhere
> > near as callback heavy as JS, so I don't think we have to take the JS
> > story for granted, but then again, I think we can derive some valuable
> > lessons by looking sideways to adjacent domains. I'm just bringing this
> > up to inspire further/deeper discussion. At the same time, just like JS,
> > we can afford to take an iterative approach to the problem.
> >
> > Separately again, I'm interested in the post-branch merge (and I'd also
> > add join) problem that Paul brought up. We can clearly punt on it, by
> > terminating the nested branches with sink operators. But is there a DSL
> > way to do it?
> >
> > Thanks again for your driving this,
> > -John
> >
> > On Thu, May 2, 2019 at 7:39 PM Paul Whalen <pgwha...@gmail.com
> > <mailto:pgwha...@gmail.com>> wrote:
> >
> >     Ivan, I’ll definitely forfeit my point on the clumsiness of the
> >     branch(predicate, consumer) solution, I don’t see any real drawbacks
> >     for the dynamic case.
> >
> >     IMO the one trade off to consider at this point is the scope
> >     question. I don’t know if I totally agree that “we rarely need them
> >     in the same scope” since merging the branches back together later
> >     seems like a perfectly plausible use case that can be a lot nicer
> >     when the branched streams are in the same scope. That being said,
> >     for the reasons Ivan listed, I think it is overall the better
> >     solution - working around the scope thing is easy enough if you need
> >     to.
> >
> >     > On May 2, 2019, at 7:00 PM, Ivan Ponomarev
> >     <iponoma...@mail.ru.invalid> wrote:
> >     >
> >     > Hello everyone, thank you all for joining the discussion!
> >     >
> >     > Well, I don't think the idea of named branches, be it a
> >     LinkedHashMap (no other Map will do, because order of definition
> >     matters) or `branch` method  taking name and Consumer has more
> >     advantages than drawbacks.
> >     >
> >     > In my opinion, the only real positive outcome from Michael's
> >     proposal is that all the returned branches are in the same scope.
> >     But 1) we rarely need them in the same scope 2) there is a
> >     workaround for the scope problem, described in the KIP.
> >     >
> >     > 'Inlining the complex logic' is not a problem, because we can use
> >     method references instead of lambdas. In real world scenarios you
> >     tend to split the complex logic to methods anyway, so the code is
> >     going to be clean.
> >     >
> >     > The drawbacks are strong. The cohesion between predicates and
> >     handlers is lost. We have to define predicates in one place, and
> >     handlers in another. This opens the door for bugs:
> >     >
> >     > - what if we forget to define a handler for a name? or a name for
> >     a handler?
> >     > - what if we misspell a name?
> >     > - what if we copy-paste and duplicate a name?
> >     >
> >     > What Michael propose would have been totally OK if we had been
> >     writing the API in Lua, Ruby or Python. In those languages the
> >     "dynamic naming" approach would have looked most concise and
> >     beautiful. But in Java we expect all the problems related to
> >     identifiers to be eliminated in compile time.
> >     >
> >     > Do we have to invent duck-typing for the Java API?
> >     >
> >     > And if we do, what advantage are we supposed to get besides having
> >     all the branches in the same scope? Michael, maybe I'm missing your
> >     point?
> >     >
> >     > ---
> >     >
> >     > Earlier in this discussion John Roesler also proposed to do
> >     without "start branching" operator, and later Paul mentioned that in
> >     the case when we have to add a dynamic number of branches, the
> >     current KIP is 'clumsier' compared to Michael's 'Map' solution. Let
> >     me address both comments here.
> >     >
> >     > 1) "Start branching" operator (I think that *split* is a good name
> >     for it indeed) is critical when we need to do a dynamic branching,
> >     see example below.
> >     >
> >     > 2) No, dynamic branching in current KIP is not clumsy at all.
> >     Imagine a real-world scenario when you need one branch per enum
> >     value (say, RecordType). You can have something like this:
> >     >
> >     > /*John:if we had to start with stream.branch(...) here, it would
> >     have been much messier.*/
> >     > KBranchedStream branched = stream.split();
> >     >
> >     > /*Not clumsy at all :-)*/
> >     > for (RecordType recordType : RecordType.values())
> >     >             branched = branched.branch((k, v) -> v.getRecType() ==
> >     recordType,
> >     >                     recordType::processRecords);
> >     >
> >     > Regards,
> >     >
> >     > Ivan
> >     >
> >     >
> >     > 02.05.2019 14:40, Matthias J. Sax пишет:
> >     >> I also agree with Michael's observation about the core problem of
> >     >> current `branch()` implementation.
> >     >>
> >     >> However, I also don't like to pass in a clumsy Map object. My
> >     thinking
> >     >> was more aligned with Paul's proposal to just add a name to each
> >     >> `branch()` statement and return a `Map<String,KStream>`.
> >     >>
> >     >> It makes the code easier to read, and also make the order of
> >     >> `Predicates` (that is essential) easier to grasp.
> >     >>
> >     >>>>>> Map<String, KStream<K, V>> branches = stream.split()
> >     >>>>>>    .branch("branchOne", Predicate<K, V>)
> >     >>>>>>    .branch( "branchTwo", Predicate<K, V>)
> >     >>>>>>    .defaultBranch("defaultBranch");
> >     >> An open question is the case for which no defaultBranch() should
> be
> >     >> specified. Atm, `split()` and `branch()` would return
> >     `BranchedKStream`
> >     >> and the call to `defaultBranch()` that returns the `Map` is
> mandatory
> >     >> (what is not the case atm). Or is this actually not a real
> problem,
> >     >> because users can just ignore the branch returned by
> >     `defaultBranch()`
> >     >> in the result `Map` ?
> >     >>
> >     >>
> >     >> About "inlining": So far, it seems to be a matter of personal
> >     >> preference. I can see arguments for both, but no "killer
> >     argument" yet
> >     >> that clearly make the case for one or the other.
> >     >>
> >     >>
> >     >> -Matthias
> >     >>
> >     >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
> >     >>> Perhaps inlining is the wrong terminology. It doesn’t require
> >     that a lambda with the full downstream topology be defined inline -
> >     it can be a method reference as with Ivan’s original suggestion.
> >     The advantage of putting the predicate and its downstream logic
> >     (Consumer) together in branch() is that they are required to be near
> >     to each other.
> >     >>>
> >     >>> Ultimately the downstream code has to live somewhere, and deep
> >     branch trees will be hard to read regardless.
> >     >>>
> >     >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis
> >     <michael.droga...@confluent.io
> >     <mailto:michael.droga...@confluent.io>> wrote:
> >     >>>>
> >     >>>> I'm less enthusiastic about inlining the branch logic with its
> >     downstream
> >     >>>> functionality. Programs that have deep branch trees will
> >     quickly become
> >     >>>> harder to read as a single unit.
> >     >>>>
> >     >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen
> >     <pgwha...@gmail.com <mailto:pgwha...@gmail.com>> wrote:
> >     >>>>>
> >     >>>>> Also +1 on the issues/goals as Michael outlined them, I think
> >     that sets a
> >     >>>>> great framework for the discussion.
> >     >>>>>
> >     >>>>> Regarding the SortedMap solution, my understanding is that the
> >     current
> >     >>>>> proposal in the KIP is what is in my PR which (pending naming
> >     decisions) is
> >     >>>>> roughly this:
> >     >>>>>
> >     >>>>> stream.split()
> >     >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
> >     >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
> >     >>>>>    .defaultBranch(Consumer<KStream<K, V>>);
> >     >>>>>
> >     >>>>> Obviously some ordering is necessary, since branching as a
> >     construct
> >     >>>>> doesn't work without it, but this solution seems like it
> >     provides as much
> >     >>>>> associativity as the SortedMap solution, because each branch()
> >     call
> >     >>>>> directly associates the "conditional" with the "code block."
> >     The value it
> >     >>>>> provides over the KIP solution is the accessing of streams in
> >     the same
> >     >>>>> scope.
> >     >>>>>
> >     >>>>> The KIP solution is less "dynamic" than the SortedMap solution
> >     in the sense
> >     >>>>> that it is slightly clumsier to add a dynamic number of
> >     branches, but it is
> >     >>>>> certainly possible.  It seems to me like the API should favor
> >     the "static"
> >     >>>>> case anyway, and should make it simple and readable to
> >     fluently declare and
> >     >>>>> access your branches in-line.  It also makes it impossible to
> >     ignore a
> >     >>>>> branch, and it is possible to build an (almost) identical
> >     SortedMap
> >     >>>>> solution on top of it.
> >     >>>>>
> >     >>>>> I could also see a middle ground where instead of a raw
> >     SortedMap being
> >     >>>>> taken in, branch() takes a name and not a Consumer.  Something
> >     like this:
> >     >>>>>
> >     >>>>> Map<String, KStream<K, V>> branches = stream.split()
> >     >>>>>    .branch("branchOne", Predicate<K, V>)
> >     >>>>>    .branch( "branchTwo", Predicate<K, V>)
> >     >>>>>    .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
> >     >>>>>
> >     >>>>> Pros for that solution:
> >     >>>>> - accessing branched KStreams in same scope
> >     >>>>> - no double brace initialization, hopefully slightly more
> >     readable than
> >     >>>>> SortedMap
> >     >>>>>
> >     >>>>> Cons
> >     >>>>> - downstream branch logic cannot be specified inline which
> >     makes it harder
> >     >>>>> to read top to bottom (like existing API and SortedMap, but
> >     unlike the KIP)
> >     >>>>> - you can forget to "handle" one of the branched streams (like
> >     existing
> >     >>>>> API and SortedMap, but unlike the KIP)
> >     >>>>>
> >     >>>>> (KBranchedStreams could even work *both* ways but perhaps
> >     that's overdoing
> >     >>>>> it).
> >     >>>>>
> >     >>>>> Overall I'm curious how important it is to be able to easily
> >     access the
> >     >>>>> branched KStream in the same scope as the original.  It's
> >     possible that it
> >     >>>>> doesn't need to be handled directly by the API, but instead
> >     left up to the
> >     >>>>> user.  I'm sort of in the middle on it.
> >     >>>>>
> >     >>>>> Paul
> >     >>>>>
> >     >>>>>
> >     >>>>>
> >     >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman
> >     <sop...@confluent.io <mailto:sop...@confluent.io>>
> >     >>>>> wrote:
> >     >>>>>
> >     >>>>>> I'd like to +1 what Michael said about the issues with the
> >     existing
> >     >>>>> branch
> >     >>>>>> method, I agree with what he's outlined and I think we should
> >     proceed by
> >     >>>>>> trying to alleviate these problems. Specifically it seems
> >     important to be
> >     >>>>>> able to cleanly access the individual branches (eg by mapping
> >     >>>>>> name->stream), which I thought was the original intention of
> >     this KIP.
> >     >>>>>>
> >     >>>>>> That said, I don't think we should so easily give in to the
> >     double brace
> >     >>>>>> anti-pattern or force ours users into it if at all possible to
> >     >>>>> avoid...just
> >     >>>>>> my two cents.
> >     >>>>>>
> >     >>>>>> Cheers,
> >     >>>>>> Sophie
> >     >>>>>>
> >     >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
> >     >>>>>> michael.droga...@confluent.io
> >     <mailto:michael.droga...@confluent.io>> wrote:
> >     >>>>>>
> >     >>>>>>> I’d like to propose a different way of thinking about this.
> >     To me,
> >     >>>>> there
> >     >>>>>>> are three problems with the existing branch signature:
> >     >>>>>>>
> >     >>>>>>> 1. If you use it the way most people do, Java raises unsafe
> type
> >     >>>>>> warnings.
> >     >>>>>>> 2. The way in which you use the stream branches is
> >     positionally coupled
> >     >>>>>> to
> >     >>>>>>> the ordering of the conditionals.
> >     >>>>>>> 3. It is brittle to extend existing branch calls with
> >     additional code
> >     >>>>>>> paths.
> >     >>>>>>>
> >     >>>>>>> Using associative constructs instead of relying on ordered
> >     constructs
> >     >>>>>> would
> >     >>>>>>> be a stronger approach. Consider a signature that instead
> >     looks like
> >     >>>>>> this:
> >     >>>>>>> Map<String, KStream<K,V>> KStream#branch(SortedMap<String,
> >     Predicate<?
> >     >>>>>>> super K,? super V>>);
> >     >>>>>>>
> >     >>>>>>> Branches are given names in a map, and as a result, the API
> >     returns a
> >     >>>>>>> mapping of names to streams. The ordering of the
> conditionals is
> >     >>>>>> maintained
> >     >>>>>>> because it’s a sorted map. Insert order determines the order
> of
> >     >>>>>> evaluation.
> >     >>>>>>> This solves problem 1 because there are no more varargs. It
> >     solves
> >     >>>>>> problem
> >     >>>>>>> 2 because you no longer lean on ordering to access the
> >     branch you’re
> >     >>>>>>> interested in. It solves problem 3 because you can introduce
> >     another
> >     >>>>>>> conditional by simply attaching another name to the
> >     structure, rather
> >     >>>>>> than
> >     >>>>>>> messing with the existing indices.
> >     >>>>>>>
> >     >>>>>>> One of the drawbacks is that creating the map inline is
> >     historically
> >     >>>>>>> awkward in Java. I know it’s an anti-pattern to use
> >     voluminously, but
> >     >>>>>>> double brace initialization would clean up the aesthetics.
> >     >>>>>>>
> >     >>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler
> >     <j...@confluent.io <mailto:j...@confluent.io>>
> >     >>>>> wrote:
> >     >>>>>>>> Hi Ivan,
> >     >>>>>>>>
> >     >>>>>>>> Thanks for the update.
> >     >>>>>>>>
> >     >>>>>>>> FWIW, I agree with Matthias that the current "start
> branching"
> >     >>>>> operator
> >     >>>>>>> is
> >     >>>>>>>> confusing when named the same way as the actual branches.
> >     "Split"
> >     >>>>> seems
> >     >>>>>>>> like a good name. Alternatively, we can do without a "start
> >     >>>>> branching"
> >     >>>>>>>> operator at all, and just do:
> >     >>>>>>>>
> >     >>>>>>>> stream
> >     >>>>>>>>      .branch(Predicate)
> >     >>>>>>>>      .branch(Predicate)
> >     >>>>>>>>      .defaultBranch();
> >     >>>>>>>>
> >     >>>>>>>> Tentatively, I think that this branching operation should be
> >     >>>>> terminal.
> >     >>>>>>> That
> >     >>>>>>>> way, we don't create ambiguity about how to use it. That
> >     is, `branch`
> >     >>>>>>>> should return `KBranchedStream`, while `defaultBranch` is
> >     `void`, to
> >     >>>>>>>> enforce that it comes last, and that there is only one
> >     definition of
> >     >>>>>> the
> >     >>>>>>>> default branch. Potentially, we should log a warning if
> >     there's no
> >     >>>>>>> default,
> >     >>>>>>>> and additionally log a warning (or throw an exception) if a
> >     record
> >     >>>>>> falls
> >     >>>>>>>> though with no default.
> >     >>>>>>>>
> >     >>>>>>>> Thoughts?
> >     >>>>>>>>
> >     >>>>>>>> Thanks,
> >     >>>>>>>> -John
> >     >>>>>>>>
> >     >>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <
> >     >>>>> matth...@confluent.io <mailto:matth...@confluent.io>
> >     >>>>>>>> wrote:
> >     >>>>>>>>
> >     >>>>>>>>> Thanks for updating the KIP and your answers.
> >     >>>>>>>>>
> >     >>>>>>>>>
> >     >>>>>>>>>> this is to make the name similar to String#split
> >     >>>>>>>>>>> that also returns an array, right?
> >     >>>>>>>>> The intend was to avoid name duplication. The return type
> >     should
> >     >>>>>> _not_
> >     >>>>>>>>> be an array.
> >     >>>>>>>>>
> >     >>>>>>>>> The current proposal is
> >     >>>>>>>>>
> >     >>>>>>>>> stream.branch()
> >     >>>>>>>>>      .branch(Predicate)
> >     >>>>>>>>>      .branch(Predicate)
> >     >>>>>>>>>      .defaultBranch();
> >     >>>>>>>>>
> >     >>>>>>>>> IMHO, this reads a little odd, because the first
> >     `branch()` does
> >     >>>>> not
> >     >>>>>>>>> take any parameters and has different semantics than the
> later
> >     >>>>>>>>> `branch()` calls. Note, that from the code snippet above,
> it's
> >     >>>>> hidden
> >     >>>>>>>>> that the first call is `KStream#branch()` while the others
> are
> >     >>>>>>>>> `KBranchedStream#branch()` what makes reading the code
> harder.
> >     >>>>>>>>>
> >     >>>>>>>>> Because I suggested to rename `addBranch()` -> `branch()`,
> >     I though
> >     >>>>>> it
> >     >>>>>>>>> might be better to also rename `KStream#branch()` to avoid
> the
> >     >>>>> naming
> >     >>>>>>>>> overlap that seems to be confusing. The following reads
> much
> >     >>>>> cleaner
> >     >>>>>> to
> >     >>>>>>>> me:
> >     >>>>>>>>> stream.split()
> >     >>>>>>>>>      .branch(Predicate)
> >     >>>>>>>>>      .branch(Predicate)
> >     >>>>>>>>>      .defaultBranch();
> >     >>>>>>>>>
> >     >>>>>>>>> Maybe there is a better alternative to `split()` though to
> >     avoid
> >     >>>>> the
> >     >>>>>>>>> naming overlap.
> >     >>>>>>>>>
> >     >>>>>>>>>
> >     >>>>>>>>>> 'default' is, however, a reserved word, so unfortunately
> we
> >     >>>>> cannot
> >     >>>>>>> have
> >     >>>>>>>>> a method with such name :-)
> >     >>>>>>>>>
> >     >>>>>>>>> Bummer. Didn't consider this. Maybe we can still come up
> >     with a
> >     >>>>> short
> >     >>>>>>>> name?
> >     >>>>>>>>>
> >     >>>>>>>>> Can you add the interface `KBranchedStream` to the KIP
> >     with all
> >     >>>>> it's
> >     >>>>>>>>> methods? It will be part of public API and should be
> >     contained in
> >     >>>>> the
> >     >>>>>>>>> KIP. For example, it's unclear atm, what the return type of
> >     >>>>>>>>> `defaultBranch()` is.
> >     >>>>>>>>>
> >     >>>>>>>>>
> >     >>>>>>>>> You did not comment on the idea to add a
> >     `KBranchedStream#get(int
> >     >>>>>>> index)
> >     >>>>>>>>> -> KStream` method to get the individually
> >     branched-KStreams. Would
> >     >>>>>> be
> >     >>>>>>>>> nice to get your feedback about it. It seems you suggest
> >     that users
> >     >>>>>>>>> would need to write custom utility code otherwise, to
> >     access them.
> >     >>>>> We
> >     >>>>>>>>> should discuss the pros and cons of both approaches. It
> feels
> >     >>>>>>>>> "incomplete" to me atm, if the API has no built-in support
> >     to get
> >     >>>>> the
> >     >>>>>>>>> branched-KStreams directly.
> >     >>>>>>>>>
> >     >>>>>>>>>
> >     >>>>>>>>>
> >     >>>>>>>>> -Matthias
> >     >>>>>>>>>
> >     >>>>>>>>>
> >     >>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> >     >>>>>>>>>> Hi all!
> >     >>>>>>>>>>
> >     >>>>>>>>>> I have updated the KIP-418 according to the new vision.
> >     >>>>>>>>>>
> >     >>>>>>>>>> Matthias, thanks for your comment!
> >     >>>>>>>>>>
> >     >>>>>>>>>>> Renaming KStream#branch() -> #split()
> >     >>>>>>>>>> I can see your point: this is to make the name similar to
> >     >>>>>>> String#split
> >     >>>>>>>>>> that also returns an array, right? But is it worth the
> >     loss of
> >     >>>>>>>> backwards
> >     >>>>>>>>>> compatibility? We can have overloaded branch() as well
> >     without
> >     >>>>>>>> affecting
> >     >>>>>>>>>> the existing code. Maybe the old array-based `branch`
> method
> >     >>>>> should
> >     >>>>>>> be
> >     >>>>>>>>>> deprecated, but this is a subject for discussion.
> >     >>>>>>>>>>
> >     >>>>>>>>>>> Renaming KBranchedStream#addBranch() ->
> >     >>>>> BranchingKStream#branch(),
> >     >>>>>>>>>> KBranchedStream#defaultBranch() ->
> BranchingKStream#default()
> >     >>>>>>>>>>
> >     >>>>>>>>>> Totally agree with 'addBranch->branch' rename. 'default'
> is,
> >     >>>>>>> however, a
> >     >>>>>>>>>> reserved word, so unfortunately we cannot have a method
> >     with such
> >     >>>>>>> name
> >     >>>>>>>>> :-)
> >     >>>>>>>>>>> defaultBranch() does take an `Predicate` as argument,
> but I
> >     >>>>> think
> >     >>>>>>> that
> >     >>>>>>>>>> is not required?
> >     >>>>>>>>>>
> >     >>>>>>>>>> Absolutely! I think that was just copy-paste error or
> >     something.
> >     >>>>>>>>>>
> >     >>>>>>>>>> Dear colleagues,
> >     >>>>>>>>>>
> >     >>>>>>>>>> please revise the new version of the KIP and Paul's PR
> >     >>>>>>>>>> (https://github.com/apache/kafka/pull/6512)
> >     >>>>>>>>>>
> >     >>>>>>>>>> Any new suggestions/objections?
> >     >>>>>>>>>>
> >     >>>>>>>>>> Regards,
> >     >>>>>>>>>>
> >     >>>>>>>>>> Ivan
> >     >>>>>>>>>>
> >     >>>>>>>>>>
> >     >>>>>>>>>> 11.04.2019 11:47, Matthias J. Sax пишет:
> >     >>>>>>>>>>> Thanks for driving the discussion of this KIP. It seems
> that
> >     >>>>>>> everybody
> >     >>>>>>>>>>> agrees that the current branch() method using arrays is
> not
> >     >>>>>> optimal.
> >     >>>>>>>>>>> I had a quick look into the PR and I like the overall
> >     proposal.
> >     >>>>>>> There
> >     >>>>>>>>>>> are some minor things we need to consider. I would
> >     recommend the
> >     >>>>>>>>>>> following renaming:
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> KStream#branch() -> #split()
> >     >>>>>>>>>>> KBranchedStream#addBranch() -> BranchingKStream#branch()
> >     >>>>>>>>>>> KBranchedStream#defaultBranch() ->
> >     BranchingKStream#default()
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> It's just a suggestion to get slightly shorter method
> names.
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> In the current PR, defaultBranch() does take an
> >     `Predicate` as
> >     >>>>>>>> argument,
> >     >>>>>>>>>>> but I think that is not required?
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> Also, we should consider KIP-307, that was recently
> >     accepted and
> >     >>>>>> is
> >     >>>>>>>>>>> currently implemented:
> >     >>>>>>>>>>>
> >     >>>>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> >     >>>>>>>>>>> Ie, we should add overloads that accepted a `Named`
> >     parameter.
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> For the issue that the created `KStream` object are in
> >     different
> >     >>>>>>>> scopes:
> >     >>>>>>>>>>> could we extend `KBranchedStream` with a `get(int
> >     index)` method
> >     >>>>>>> that
> >     >>>>>>>>>>> returns the corresponding "branched" result `KStream`
> >     object?
> >     >>>>>> Maybe,
> >     >>>>>>>> the
> >     >>>>>>>>>>> second argument of `addBranch()` should not be a
> >     >>>>>> `Consumer<KStream>`
> >     >>>>>>>> but
> >     >>>>>>>>>>> a `Function<KStream,KStream>` and `get()` could return
> >     whatever
> >     >>>>>> the
> >     >>>>>>>>>>> `Function` returns?
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> Finally, I would also suggest to update the KIP with the
> >     current
> >     >>>>>>>>>>> proposal. That makes it easier to review.
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>> -Matthias
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>
> >     >>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
> >     >>>>>>>>>>>> Ivan,
> >     >>>>>>>>>>>>
> >     >>>>>>>>>>>> I'm a bit of a novice here as well, but I think it
> >     makes sense
> >     >>>>>> for
> >     >>>>>>>> you
> >     >>>>>>>>> to
> >     >>>>>>>>>>>> revise the KIP and continue the discussion.  Obviously
> >     we'll
> >     >>>>> need
> >     >>>>>>>> some
> >     >>>>>>>>>>>> buy-in from committers that have actual binding votes on
> >     >>>>> whether
> >     >>>>>>> the
> >     >>>>>>>>> KIP
> >     >>>>>>>>>>>> could be adopted.  It would be great to hear if they
> >     think this
> >     >>>>>> is
> >     >>>>>>> a
> >     >>>>>>>>> good
> >     >>>>>>>>>>>> idea overall.  I'm not sure if that happens just by
> >     starting a
> >     >>>>>>> vote,
> >     >>>>>>>>> or if
> >     >>>>>>>>>>>> there is generally some indication of interest
> beforehand.
> >     >>>>>>>>>>>>
> >     >>>>>>>>>>>> That being said, I'll continue the discussion a bit:
> >     assuming
> >     >>>>> we
> >     >>>>>> do
> >     >>>>>>>>> move
> >     >>>>>>>>>>>> forward the solution of "stream.branch() returns
> >     >>>>>> KBranchedStream",
> >     >>>>>>> do
> >     >>>>>>>>> we
> >     >>>>>>>>>>>> deprecate "stream.branch(...) returns KStream[]"?  I
> would
> >     >>>>> favor
> >     >>>>>>>>>>>> deprecating, since having two mutually exclusive APIs
> that
> >     >>>>>>> accomplish
> >     >>>>>>>>> the
> >     >>>>>>>>>>>> same thing is confusing, especially when they're fairly
> >     similar
> >     >>>>>>>>> anyway.  We
> >     >>>>>>>>>>>> just need to be sure we're not making something
> >     >>>>>>> impossible/difficult
> >     >>>>>>>>> that
> >     >>>>>>>>>>>> is currently possible/easy.
> >     >>>>>>>>>>>>
> >     >>>>>>>>>>>> Regarding my PR - I think the general structure would
> work,
> >     >>>>> it's
> >     >>>>>>>> just a
> >     >>>>>>>>>>>> little sloppy overall in terms of naming and clarity. In
> >     >>>>>>> particular,
> >     >>>>>>>>>>>> passing in the "predicates" and "children" lists which
> get
> >     >>>>>> modified
> >     >>>>>>>> in
> >     >>>>>>>>>>>> KBranchedStream but read from all the way
> >     KStreamLazyBranch is
> >     >>>>> a
> >     >>>>>>> bit
> >     >>>>>>>>>>>> complicated to follow.
> >     >>>>>>>>>>>>
> >     >>>>>>>>>>>> Thanks,
> >     >>>>>>>>>>>> Paul
> >     >>>>>>>>>>>>
> >     >>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <
> >     >>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>
> >     >>>>>>>>> wrote:
> >     >>>>>>>>>>>>> Hi Paul!
> >     >>>>>>>>>>>>>
> >     >>>>>>>>>>>>> I read your code carefully and now I am fully
> >     convinced: your
> >     >>>>>>>> proposal
> >     >>>>>>>>>>>>> looks better and should work. We just have to document
> the
> >     >>>>>> crucial
> >     >>>>>>>>> fact
> >     >>>>>>>>>>>>> that KStream consumers are invoked as they're added.
> >     And then
> >     >>>>>> it's
> >     >>>>>>>> all
> >     >>>>>>>>>>>>> going to be very nice.
> >     >>>>>>>>>>>>>
> >     >>>>>>>>>>>>> What shall we do now? I should re-write the KIP and
> >     resume the
> >     >>>>>>>>>>>>> discussion here, right?
> >     >>>>>>>>>>>>>
> >     >>>>>>>>>>>>> Why are you telling that your PR 'should not be even a
> >     >>>>> starting
> >     >>>>>>>> point
> >     >>>>>>>>> if
> >     >>>>>>>>>>>>> we go in this direction'? To me it looks like a good
> >     starting
> >     >>>>>>> point.
> >     >>>>>>>>> But
> >     >>>>>>>>>>>>> as a novice in this project I might miss some important
> >     >>>>> details.
> >     >>>>>>>>>>>>> Regards,
> >     >>>>>>>>>>>>>
> >     >>>>>>>>>>>>> Ivan
> >     >>>>>>>>>>>>>
> >     >>>>>>>>>>>>>
> >     >>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen пишет:
> >     >>>>>>>>>>>>>> Ivan,
> >     >>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>> Maybe I’m missing the point, but I believe the
> >     >>>>> stream.branch()
> >     >>>>>>>>> solution
> >     >>>>>>>>>>>>> supports this. The couponIssuer::set* consumers will be
> >     >>>>> invoked
> >     >>>>>> as
> >     >>>>>>>>> they’re
> >     >>>>>>>>>>>>> added, not during streamsBuilder.build(). So the user
> >     still
> >     >>>>>> ought
> >     >>>>>>> to
> >     >>>>>>>>> be
> >     >>>>>>>>>>>>> able to call couponIssuer.coupons() afterward and
> >     depend on
> >     >>>>> the
> >     >>>>>>>>> branched
> >     >>>>>>>>>>>>> streams having been set.
> >     >>>>>>>>>>>>>> The issue I mean to point out is that it is hard to
> >     access
> >     >>>>> the
> >     >>>>>>>>> branched
> >     >>>>>>>>>>>>> streams in the same scope as the original stream (that
> >     is, not
> >     >>>>>>>> inside
> >     >>>>>>>>> the
> >     >>>>>>>>>>>>> couponIssuer), which is a problem with both proposed
> >     >>>>> solutions.
> >     >>>>>> It
> >     >>>>>>>>> can be
> >     >>>>>>>>>>>>> worked around though.
> >     >>>>>>>>>>>>>> [Also, great to hear additional interest in 401, I’m
> >     excited
> >     >>>>> to
> >     >>>>>>>> hear
> >     >>>>>>>>>>>>> your thoughts!]
> >     >>>>>>>>>>>>>> Paul
> >     >>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <
> >     >>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>
> >     >>>>>>>>> wrote:
> >     >>>>>>>>>>>>>>> Hi Paul!
> >     >>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>> The idea to postpone the wiring of branches to the
> >     >>>>>>>>>>>>> streamsBuilder.build() also looked great for me at
> first
> >     >>>>> glance,
> >     >>>>>>> but
> >     >>>>>>>>> ---
> >     >>>>>>>>>>>>>>>> the newly branched streams are not available in the
> >     same
> >     >>>>>> scope
> >     >>>>>>> as
> >     >>>>>>>>> each
> >     >>>>>>>>>>>>> other.  That is, if we wanted to merge them back
> together
> >     >>>>> again
> >     >>>>>> I
> >     >>>>>>>>> don't see
> >     >>>>>>>>>>>>> a way to do that.
> >     >>>>>>>>>>>>>>> You just took the words right out of my mouth, I was
> >     just
> >     >>>>>> going
> >     >>>>>>> to
> >     >>>>>>>>>>>>> write in details about this issue.
> >     >>>>>>>>>>>>>>> Consider the example from Bill's book, p. 101: say
> >     we need
> >     >>>>> to
> >     >>>>>>>>> identify
> >     >>>>>>>>>>>>> customers who have bought coffee and made a purchase
> >     in the
> >     >>>>>>>>> electronics
> >     >>>>>>>>>>>>> store to give them coupons.
> >     >>>>>>>>>>>>>>> This is the code I usually write under these
> >     circumstances
> >     >>>>>> using
> >     >>>>>>>> my
> >     >>>>>>>>>>>>> 'brancher' class:
> >     >>>>>>>>>>>>>>> @Setter
> >     >>>>>>>>>>>>>>> class CouponIssuer{
> >     >>>>>>>>>>>>>>>   private KStream<....> coffePurchases;
> >     >>>>>>>>>>>>>>>   private KStream<....> electronicsPurchases;
> >     >>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>   KStream<...> coupons(){
> >     >>>>>>>>>>>>>>>       return
> >     >>>>>>>>> coffePurchases.join(electronicsPurchases...)...whatever
> >     >>>>>>>>>>>>>>>       /*In the real world the code here can be
> >     complex, so
> >     >>>>>>>>> creation of
> >     >>>>>>>>>>>>> a separate CouponIssuer class is fully justified, in
> >     order to
> >     >>>>>>>> separate
> >     >>>>>>>>>>>>> classes' responsibilities.*/
> >     >>>>>>>>>>>>>>>  }
> >     >>>>>>>>>>>>>>> }
> >     >>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>> CouponIssuer couponIssuer = new CouponIssuer();
> >     >>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
> >     >>>>>>>>>>>>>>>     .branch(predicate1,
> couponIssuer::setCoffePurchases)
> >     >>>>>>>>>>>>>>>     .branch(predicate2,
> >     >>>>>> couponIssuer::setElectronicsPurchases)
> >     >>>>>>>>>>>>>>>     .onTopOf(transactionStream);
> >     >>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>> /*Alas, this won't work if we're going to wire up
> >     everything
> >     >>>>>>>> later,
> >     >>>>>>>>>>>>> without the terminal operation!!!*/
> >     >>>>>>>>>>>>>>> couponIssuer.coupons()...
> >     >>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>> Does this make sense?  In order to properly
> >     initialize the
> >     >>>>>>>>> CouponIssuer
> >     >>>>>>>>>>>>> we need the terminal operation to be called before
> >     >>>>>>>>> streamsBuilder.build()
> >     >>>>>>>>>>>>> is called.
> >     >>>>>>>>>>>>>>> [BTW Paul, I just found out that your KIP-401 is
> >     essentially
> >     >>>>>> the
> >     >>>>>>>>> next
> >     >>>>>>>>>>>>> KIP I was going to write here. I have some thoughts
> >     based on
> >     >>>>> my
> >     >>>>>>>>> experience,
> >     >>>>>>>>>>>>> so I will join the discussion on KIP-401 soon.]
> >     >>>>>>>>>>>>>>> Regards,
> >     >>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>> Ivan
> >     >>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen пишет:
> >     >>>>>>>>>>>>>>>> Ivan,
> >     >>>>>>>>>>>>>>>> I tried to make a very rough proof of concept of a
> >     fluent
> >     >>>>> API
> >     >>>>>>>> based
> >     >>>>>>>>>>>>> off of
> >     >>>>>>>>>>>>>>>> KStream here
> >     (https://github.com/apache/kafka/pull/6512),
> >     >>>>>> and
> >     >>>>>>> I
> >     >>>>>>>>> think
> >     >>>>>>>>>>>>> I
> >     >>>>>>>>>>>>>>>> succeeded at removing both cons.
> >     >>>>>>>>>>>>>>>>    - Compatibility: I was incorrect earlier about
> >     >>>>>>> compatibility
> >     >>>>>>>>>>>>> issues,
> >     >>>>>>>>>>>>>>>>    there aren't any direct ones.  I was unaware
> >     that Java
> >     >>>>> is
> >     >>>>>>>> smart
> >     >>>>>>>>>>>>> enough to
> >     >>>>>>>>>>>>>>>>    distinguish between a branch(varargs...)
> >     returning one
> >     >>>>>>> thing
> >     >>>>>>>>> and
> >     >>>>>>>>>>>>> branch()
> >     >>>>>>>>>>>>>>>>    with no arguments returning another thing.
> >     >>>>>>>>>>>>>>>>    - Requiring a terminal method: We don't actually
> >     need
> >     >>>>> it.
> >     >>>>>>> We
> >     >>>>>>>>> can
> >     >>>>>>>>>>>>> just
> >     >>>>>>>>>>>>>>>>    build up the branches in the KBranchedStream who
> >     shares
> >     >>>>>> its
> >     >>>>>>>>> state
> >     >>>>>>>>>>>>> with the
> >     >>>>>>>>>>>>>>>>    ProcessorSupplier that will actually do the
> >     branching.
> >     >>>>>>> It's
> >     >>>>>>>>> not
> >     >>>>>>>>>>>>> terribly
> >     >>>>>>>>>>>>>>>>    pretty in its current form, but I think it
> >     demonstrates
> >     >>>>>> its
> >     >>>>>>>>>>>>> feasibility.
> >     >>>>>>>>>>>>>>>> To be clear, I don't think that pull request should
> be
> >     >>>>> final
> >     >>>>>> or
> >     >>>>>>>>> even a
> >     >>>>>>>>>>>>>>>> starting point if we go in this direction, I just
> >     wanted to
> >     >>>>>> see
> >     >>>>>>>> how
> >     >>>>>>>>>>>>>>>> challenging it would be to get the API working.
> >     >>>>>>>>>>>>>>>> I will say though, that I'm not sure the existing
> >     solution
> >     >>>>>>> could
> >     >>>>>>>> be
> >     >>>>>>>>>>>>>>>> deprecated in favor of this, which I had originally
> >     >>>>> suggested
> >     >>>>>>>> was a
> >     >>>>>>>>>>>>>>>> possibility.  The reason is that the newly branched
> >     streams
> >     >>>>>> are
> >     >>>>>>>> not
> >     >>>>>>>>>>>>>>>> available in the same scope as each other.  That
> >     is, if we
> >     >>>>>>> wanted
> >     >>>>>>>>> to
> >     >>>>>>>>>>>>> merge
> >     >>>>>>>>>>>>>>>> them back together again I don't see a way to do
> >     that.  The
> >     >>>>>> KIP
> >     >>>>>>>>>>>>> proposal
> >     >>>>>>>>>>>>>>>> has the same issue, though - all this means is that
> for
> >     >>>>>> either
> >     >>>>>>>>>>>>> solution,
> >     >>>>>>>>>>>>>>>> deprecating the existing branch(...) is not on the
> >     table.
> >     >>>>>>>>>>>>>>>> Thanks,
> >     >>>>>>>>>>>>>>>> Paul
> >     >>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <
> >     >>>>>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>>
> >     >>>>>>>>>>>>> wrote:
> >     >>>>>>>>>>>>>>>>> OK, let me summarize what we have discussed up to
> this
> >     >>>>>> point.
> >     >>>>>>>>>>>>>>>>> First, it seems that it's commonly agreed that
> >     branch API
> >     >>>>>>> needs
> >     >>>>>>>>>>>>>>>>> improvement. Motivation is given in the KIP.
> >     >>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>> There are two potential ways to do it:
> >     >>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>> 1. (as origianlly proposed)
> >     >>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
> >     >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->..)
> >     >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->..)
> >     >>>>>>>>>>>>>>>>>    .defaultBranch(ks->..) //optional
> >     >>>>>>>>>>>>>>>>>    .onTopOf(stream).mapValues(...).... //onTopOf
> >     returns
> >     >>>>>> its
> >     >>>>>>>>> argument
> >     >>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2) The code
> won't
> >     >>>>> make
> >     >>>>>>>> sense
> >     >>>>>>>>>>>>> until
> >     >>>>>>>>>>>>>>>>> all the necessary ingredients are provided.
> >     >>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>> CONS: The need to create a KafkaStreamsBrancher
> >     instance
> >     >>>>>>>>> contrasts the
> >     >>>>>>>>>>>>>>>>> fluency of other KStream methods.
> >     >>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>> 2. (as Paul proposes)
> >     >>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>> stream
> >     >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->...)
> >     >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->...)
> >     >>>>>>>>>>>>>>>>>    .defaultBranch(ks->...) //or noDefault(). Both
> >     >>>>>>>>> defaultBranch(..)
> >     >>>>>>>>>>>>> and
> >     >>>>>>>>>>>>>>>>> noDefault() return void
> >     >>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>> PROS: Generally follows the way KStreams interface
> is
> >     >>>>>> defined.
> >     >>>>>>>>>>>>>>>>> CONS: We need to define two terminal methods
> >     >>>>>>>> (defaultBranch(ks->)
> >     >>>>>>>>> and
> >     >>>>>>>>>>>>>>>>> noDefault()). And for a user it is very easy to
> >     miss the
> >     >>>>>> fact
> >     >>>>>>>>> that one
> >     >>>>>>>>>>>>>>>>> of the terminal methods should be called. If these
> >     methods
> >     >>>>>> are
> >     >>>>>>>> not
> >     >>>>>>>>>>>>>>>>> called, we can throw an exception in runtime.
> >     >>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can we do
> better?
> >     >>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>> Regards,
> >     >>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>> Ivan
> >     >>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev пишет:
> >     >>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev пишет:
> >     >>>>>>>>>>>>>>>>>>> Paul,
> >     >>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>> I see your point when you are talking about
> >     >>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
> >     >>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>> Still, I believe that this cannot not be
> >     implemented the
> >     >>>>>>> easy
> >     >>>>>>>>> way.
> >     >>>>>>>>>>>>>>>>>>> Maybe we all should think further.
> >     >>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
> >     >>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>> user could specify a terminal method that
> assumes
> >     >>>>> nothing
> >     >>>>>>>> will
> >     >>>>>>>>>>>>> reach
> >     >>>>>>>>>>>>>>>>>>>> the default branch,
> >     >>>>>>>>>>>>>>>>>>> throwing an exception if such a case occurs.
> >     >>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be the only
> option
> >     >>>>>> besides
> >     >>>>>>>>>>>>>>>>>>> `default`, because there are scenarios when we
> >     want to
> >     >>>>>> just
> >     >>>>>>>>> silently
> >     >>>>>>>>>>>>>>>>>>> drop the messages that didn't match any
> >     predicate. 2)
> >     >>>>>>> Throwing
> >     >>>>>>>>> an
> >     >>>>>>>>>>>>>>>>>>> exception in the middle of data flow processing
> >     looks
> >     >>>>>> like a
> >     >>>>>>>> bad
> >     >>>>>>>>>>>>> idea.
> >     >>>>>>>>>>>>>>>>>>> In stream processing paradigm, I would prefer to
> >     emit a
> >     >>>>>>>> special
> >     >>>>>>>>>>>>>>>>>>> message to a dedicated stream. This is exactly
> where
> >     >>>>>>> `default`
> >     >>>>>>>>> can
> >     >>>>>>>>>>>>> be
> >     >>>>>>>>>>>>>>>>>>> used.
> >     >>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>> it would be fairly easily for the
> >     >>>>> InternalTopologyBuilder
> >     >>>>>>> to
> >     >>>>>>>>> track
> >     >>>>>>>>>>>>>>>>>>>> dangling
> >     >>>>>>>>>>>>>>>>>>> branches that haven't been terminated and raise
> >     a clear
> >     >>>>>>> error
> >     >>>>>>>>>>>>> before it
> >     >>>>>>>>>>>>>>>>>>> becomes an issue.
> >     >>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>> You mean a runtime exception, when the program is
> >     >>>>> compiled
> >     >>>>>>> and
> >     >>>>>>>>> run?
> >     >>>>>>>>>>>>>>>>>>> Well,  I'd prefer an API that simply won't
> >     compile if
> >     >>>>> used
> >     >>>>>>>>>>>>>>>>>>> incorrectly. Can we build such an API as a
> >     method chain
> >     >>>>>>>> starting
> >     >>>>>>>>>>>>> from
> >     >>>>>>>>>>>>>>>>>>> KStream object? There is a huge cost difference
> >     between
> >     >>>>>>>> runtime
> >     >>>>>>>>> and
> >     >>>>>>>>>>>>>>>>>>> compile-time errors. Even if a failure uncovers
> >     >>>>> instantly
> >     >>>>>> on
> >     >>>>>>>>> unit
> >     >>>>>>>>>>>>>>>>>>> tests, it costs more for the project than a
> >     compilation
> >     >>>>>>>> failure.
> >     >>>>>>>>>>>>>>>>>>> Regards,
> >     >>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>> Ivan
> >     >>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen пишет:
> >     >>>>>>>>>>>>>>>>>>>> Ivan,
> >     >>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>> Good point about the terminal operation being
> >     required.
> >     >>>>>>> But
> >     >>>>>>>> is
> >     >>>>>>>>>>>>> that
> >     >>>>>>>>>>>>>>>>>>>> really
> >     >>>>>>>>>>>>>>>>>>>> such a bad thing?  If the user doesn't want a
> >     >>>>>> defaultBranch
> >     >>>>>>>>> they
> >     >>>>>>>>>>>>> can
> >     >>>>>>>>>>>>>>>>>>>> call
> >     >>>>>>>>>>>>>>>>>>>> some other terminal method (noDefaultBranch()?)
> >     just as
> >     >>>>>>>>> easily.  In
> >     >>>>>>>>>>>>>>>>>>>> fact I
> >     >>>>>>>>>>>>>>>>>>>> think it creates an opportunity for a nicer API
> - a
> >     >>>>> user
> >     >>>>>>>> could
> >     >>>>>>>>>>>>> specify
> >     >>>>>>>>>>>>>>>>> a
> >     >>>>>>>>>>>>>>>>>>>> terminal method that assumes nothing will reach
> the
> >     >>>>>> default
> >     >>>>>>>>> branch,
> >     >>>>>>>>>>>>>>>>>>>> throwing an exception if such a case occurs.
> That
> >     >>>>> seems
> >     >>>>>>> like
> >     >>>>>>>>> an
> >     >>>>>>>>>>>>>>>>>>>> improvement over the current branch() API,
> >     which allows
> >     >>>>>> for
> >     >>>>>>>> the
> >     >>>>>>>>>>>>> more
> >     >>>>>>>>>>>>>>>>>>>> subtle
> >     >>>>>>>>>>>>>>>>>>>> behavior of records unexpectedly getting
> dropped.
> >     >>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>> The need for a terminal operation certainly has
> >     to be
> >     >>>>>> well
> >     >>>>>>>>>>>>>>>>>>>> documented, but
> >     >>>>>>>>>>>>>>>>>>>> it would be fairly easily for the
> >     >>>>> InternalTopologyBuilder
> >     >>>>>>> to
> >     >>>>>>>>> track
> >     >>>>>>>>>>>>>>>>>>>> dangling
> >     >>>>>>>>>>>>>>>>>>>> branches that haven't been terminated and raise
> >     a clear
> >     >>>>>>> error
> >     >>>>>>>>>>>>> before it
> >     >>>>>>>>>>>>>>>>>>>> becomes an issue.  Especially now that there is
> a
> >     >>>>> "build
> >     >>>>>>>> step"
> >     >>>>>>>>>>>>> where
> >     >>>>>>>>>>>>>>>>> the
> >     >>>>>>>>>>>>>>>>>>>> topology is actually wired up, when
> >     >>>>>> StreamsBuilder.build()
> >     >>>>>>> is
> >     >>>>>>>>>>>>> called.
> >     >>>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its argument, I
> agree
> >     >>>>> that
> >     >>>>>>> it's
> >     >>>>>>>>>>>>>>>>>>>> critical to
> >     >>>>>>>>>>>>>>>>>>>> allow users to do other operations on the input
> >     stream.
> >     >>>>>>> With
> >     >>>>>>>>> the
> >     >>>>>>>>>>>>>>>>> fluent
> >     >>>>>>>>>>>>>>>>>>>> solution, it ought to work the same way all
> other
> >     >>>>>>> operations
> >     >>>>>>>>> do -
> >     >>>>>>>>>>>>> if
> >     >>>>>>>>>>>>>>>>> you
> >     >>>>>>>>>>>>>>>>>>>> want to process off the original KStream
> multiple
> >     >>>>> times,
> >     >>>>>>> you
> >     >>>>>>>>> just
> >     >>>>>>>>>>>>>>>>>>>> need the
> >     >>>>>>>>>>>>>>>>>>>> stream as a variable so you can call as many
> >     operations
> >     >>>>>> on
> >     >>>>>>> it
> >     >>>>>>>>> as
> >     >>>>>>>>>>>>> you
> >     >>>>>>>>>>>>>>>>>>>> desire.
> >     >>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>> Thoughts?
> >     >>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>> Best,
> >     >>>>>>>>>>>>>>>>>>>> Paul
> >     >>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <
> >     >>>>>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>
> >     >>>>>>>>>>>>>>>>>>>> wrote:
> >     >>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>> Hello Paul,
> >     >>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>> I afraid this won't work because we do not
> >     always need
> >     >>>>>> the
> >     >>>>>>>>>>>>>>>>>>>>> defaultBranch. And without a terminal
> operation we
> >     >>>>> don't
> >     >>>>>>>> know
> >     >>>>>>>>>>>>> when to
> >     >>>>>>>>>>>>>>>>>>>>> finalize and build the 'branch switch'.
> >     >>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its argument,
> >     so we
> >     >>>>> can
> >     >>>>>> do
> >     >>>>>>>>>>>>> something
> >     >>>>>>>>>>>>>>>>>>>>> more with the original branch after branching.
> >     >>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>> I understand your point that the need of
> special
> >     >>>>> object
> >     >>>>>>>>>>>>> construction
> >     >>>>>>>>>>>>>>>>>>>>> contrasts the fluency of most KStream methods.
> But
> >     >>>>> here
> >     >>>>>> we
> >     >>>>>>>>> have a
> >     >>>>>>>>>>>>>>>>>>>>> special case: we build the switch to split the
> >     flow,
> >     >>>>> so
> >     >>>>>> I
> >     >>>>>>>>> think
> >     >>>>>>>>>>>>> this
> >     >>>>>>>>>>>>>>>>> is
> >     >>>>>>>>>>>>>>>>>>>>> still idiomatic.
> >     >>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>> Regards,
> >     >>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>> Ivan
> >     >>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen пишет:
> >     >>>>>>>>>>>>>>>>>>>>>> Ivan,
> >     >>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve this
> >     API, but I
> >     >>>>>> find
> >     >>>>>>>> the
> >     >>>>>>>>>>>>>>>>>>>>>> onTopOff()
> >     >>>>>>>>>>>>>>>>>>>>>> mechanism a little confusing since it
> >     contrasts the
> >     >>>>>>> fluency
> >     >>>>>>>>> of
> >     >>>>>>>>>>>>> other
> >     >>>>>>>>>>>>>>>>>>>>>> KStream method calls.  Ideally I'd like to
> >     just call
> >     >>>>> a
> >     >>>>>>>>> method on
> >     >>>>>>>>>>>>> the
> >     >>>>>>>>>>>>>>>>>>>>> stream
> >     >>>>>>>>>>>>>>>>>>>>>> so it still reads top to bottom if the branch
> >     cases
> >     >>>>> are
> >     >>>>>>>>> defined
> >     >>>>>>>>>>>>>>>>>>>>>> fluently.
> >     >>>>>>>>>>>>>>>>>>>>>> I think the addBranch(predicate, handleCase)
> >     is very
> >     >>>>>> nice
> >     >>>>>>>>> and the
> >     >>>>>>>>>>>>>>>>>>>>>> right
> >     >>>>>>>>>>>>>>>>>>>>> way
> >     >>>>>>>>>>>>>>>>>>>>>> to do things, but what if we flipped around
> >     how we
> >     >>>>>>> specify
> >     >>>>>>>>> the
> >     >>>>>>>>>>>>> source
> >     >>>>>>>>>>>>>>>>>>>>>> stream.
> >     >>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>> Like:
> >     >>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>> stream.branch()
> >     >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate1,
> this::handle1)
> >     >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate2,
> this::handle2)
> >     >>>>>>>>>>>>>>>>>>>>>>           .defaultBranch(this::handleDefault);
> >     >>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>> Where branch() returns a KBranchedStreams or
> >     >>>>>>>> KStreamBrancher
> >     >>>>>>>>> or
> >     >>>>>>>>>>>>>>>>>>>>> something,
> >     >>>>>>>>>>>>>>>>>>>>>> which is added to by addBranch() and
> >     terminated by
> >     >>>>>>>>>>>>> defaultBranch()
> >     >>>>>>>>>>>>>>>>>>>>>> (which
> >     >>>>>>>>>>>>>>>>>>>>>> returns void).  This is obviously
> >     incompatible with
> >     >>>>> the
> >     >>>>>>>>> current
> >     >>>>>>>>>>>>>>>>>>>>>> API, so
> >     >>>>>>>>>>>>>>>>>>>>> the
> >     >>>>>>>>>>>>>>>>>>>>>> new stream.branch() would have to have a
> >     different
> >     >>>>>> name,
> >     >>>>>>>> but
> >     >>>>>>>>> that
> >     >>>>>>>>>>>>>>>>>>>>>> seems
> >     >>>>>>>>>>>>>>>>>>>>>> like a fairly small problem - we could call it
> >     >>>>>> something
> >     >>>>>>>> like
> >     >>>>>>>>>>>>>>>>>>>>>> branched()
> >     >>>>>>>>>>>>>>>>>>>>> or
> >     >>>>>>>>>>>>>>>>>>>>>> branchedStreams() and deprecate the old API.
> >     >>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of your
> >     KIP?  It
> >     >>>>>> seems
> >     >>>>>>>>> like it
> >     >>>>>>>>>>>>>>>>>>>>>> does to
> >     >>>>>>>>>>>>>>>>>>>>>> me, allowing for clear in-line branching
> >     while also
> >     >>>>>>>> allowing
> >     >>>>>>>>> you
> >     >>>>>>>>>>>>> to
> >     >>>>>>>>>>>>>>>>>>>>>> dynamically build of branches off of
> >     KBranchedStreams
> >     >>>>>> if
> >     >>>>>>>>> desired.
> >     >>>>>>>>>>>>>>>>>>>>>> Thanks,
> >     >>>>>>>>>>>>>>>>>>>>>> Paul
> >     >>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
> >     >>>>>>>>>>>>>>>>>>>>> <iponoma...@mail.ru.invalid>
> >     >>>>>>>>>>>>>>>>>>>>>> wrote:
> >     >>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
> >     >>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
> >     >>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
> >     >>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>> void handleFirstCase(KStream<String, String>
> >     ks){
> >     >>>>>>>>>>>>>>>>>>>>>>>           ks.filter(....).mapValues(...)
> >     >>>>>>>>>>>>>>>>>>>>>>> }
> >     >>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>> void handleSecondCase(KStream<String,
> >     String> ks){
> >     >>>>>>>>>>>>>>>>>>>>>>>           ks.selectKey(...).groupByKey()...
> >     >>>>>>>>>>>>>>>>>>>>>>> }
> >     >>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>> ......
> >     >>>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
> >     >>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate1,
> >     this::handleFirstCase)
> >     >>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate2,
> >     this::handleSecondCase)
> >     >>>>>>>>>>>>>>>>>>>>>>>      .onTopOf(....)
> >     >>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>> Regards,
> >     >>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>> Ivan
> >     >>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck пишет:
> >     >>>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
> >     >>>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> >     >>>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>>> I have one question, the
> KafkaStreamsBrancher
> >     >>>>> takes a
> >     >>>>>>>>> Consumer
> >     >>>>>>>>>>>>> as a
> >     >>>>>>>>>>>>>>>>>>>>>>> second
> >     >>>>>>>>>>>>>>>>>>>>>>>> argument which returns nothing, and the
> >     example in
> >     >>>>>> the
> >     >>>>>>>> KIP
> >     >>>>>>>>>>>>> shows
> >     >>>>>>>>>>>>>>>>>>>>>>>> each
> >     >>>>>>>>>>>>>>>>>>>>>>>> stream from the branch using a terminal node
> >     >>>>>>>>> (KafkaStreams#to()
> >     >>>>>>>>>>>>>>>>>>>>>>>> in this
> >     >>>>>>>>>>>>>>>>>>>>>>>> case).
> >     >>>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but how would
> we
> >     >>>>> handle
> >     >>>>>>> the
> >     >>>>>>>>> case
> >     >>>>>>>>>>>>>>>>>>>>>>>> where the
> >     >>>>>>>>>>>>>>>>>>>>>>>> user has created a branch but wants to
> continue
> >     >>>>>>>> processing
> >     >>>>>>>>> and
> >     >>>>>>>>>>>>> not
> >     >>>>>>>>>>>>>>>>>>>>>>>> necessarily use a terminal node on the
> branched
> >     >>>>>> stream
> >     >>>>>>>>>>>>> immediately?
> >     >>>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic as is if
> >     we had
> >     >>>>>>>> something
> >     >>>>>>>>> like
> >     >>>>>>>>>>>>>>>>>>>>>>>> this:
> >     >>>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>>> KStream<String, String>[] branches =
> >     >>>>>>>>>>>>>>>>>>>>>>>> originalStream.branch(predicate1,
> >     >>>>>>>>>>>>>>>>>>>>>>>> predicate2);
> >     >>>>>>>>>>>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
> >     >>>>>>>>>>>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
> >     >>>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>>> Thanks!
> >     >>>>>>>>>>>>>>>>>>>>>>>> Bill
> >     >>>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck
> <
> >     >>>>>>>>> bbej...@gmail.com <mailto:bbej...@gmail.com>
> >     >>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >     >>>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>>>> All,
> >     >>>>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the discussion for
> KIP-
> >     >>>>> 418.
> >     >>>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
> >     >>>>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>>>> Hello,
> >     >>>>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion about
> KIP-418.
> >     >>>>> Please
> >     >>>>>>>> take
> >     >>>>>>>>> a
> >     >>>>>>>>>>>>> look
> >     >>>>>>>>>>>>>>>>> at
> >     >>>>>>>>>>>>>>>>>>>>> the
> >     >>>>>>>>>>>>>>>>>>>>>>>>> KIP if you can, I would appreciate any
> >     feedback :)
> >     >>>>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>>>> KIP-418:
> >     >>>>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> >     >>>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488:
> >     >>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5488
> >     >>>>>>>>>>>>>>>>>>>>>>>>> PR#6164:
> >     >>>>> https://github.com/apache/kafka/pull/6164
> >     >>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >     >>>>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
> >     >>>>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>>>>>>>>>>>>>>>>>
> >     >>>>>>>>>
> >     >
> >
>

Reply via email to