Ivan,

That's a very good point about the "start" operator in the dynamic case. I
had no problem with "split()"; I was just questioning the necessity. Since
you've provided a proof of necessity, I'm in favor of the "split()" start
operator. Thanks!

Separately, I'm interested to see where the present discussion leads. I've
written enough Javascript code in my life to be suspicious of nested
closures. You have a good point about using method references (function
literals work too). It should be validating that this was also the JS
community's first approach to flattening the logic when their nested
closure situation got out of hand. Unfortunately, it replaces nesting
with indirection, and both disrupt code readability (in different ways,
for different reasons). In other words, I agree that function references
are *the* first-order solution if the nested code does indeed become a
problem.

However, the history of JS also tells us that function references aren't
the end of the story either, and you can see that by observing that there
have been two follow-on eras, as they continue trying to cope with the
consequences of living in such a callback-heavy language. First, you have
Futures/Promises, which essentially let you convert nested code to
method-chained code (Observables/FP is a popular variation on this). Most
recently, you have async/await, which is an effort to apply language (not
just API) syntax to the problem, and offer the "flattest" possible
programming style to solve the problem (because you get back to just one
code block per functional unit).
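
To make that progression concrete, here is a minimal sketch in Java rather
than JS. Everything in it is made up for illustration (fetchUser,
fetchOrders and the Handler class are hypothetical, not any real API), and
since Java has no async/await, only the first three styles are shown:
nested closures, method references, and a chained CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class CallbackStyles {
    // Hypothetical callback-style API: each step hands its result to a callback.
    static void fetchUser(String id, Consumer<String> cb) { cb.accept("user-" + id); }
    static void fetchOrders(String user, Consumer<String> cb) { cb.accept(user + "/orders"); }

    // Style 1: nested closures. Every dependent step adds a level of nesting.
    static String nested(String id) {
        StringBuilder out = new StringBuilder();
        fetchUser(id, user ->
            fetchOrders(user, orders ->
                out.append(orders.toUpperCase())));
        return out.toString();
    }

    // Style 2: method references. Flat, but the reader has to jump to Handler.
    static final class Handler {
        private final StringBuilder out;
        Handler(StringBuilder out) { this.out = out; }
        void onUser(String user) { fetchOrders(user, this::onOrders); }
        void onOrders(String orders) { out.append(orders.toUpperCase()); }
    }

    static String withReferences(String id) {
        StringBuilder out = new StringBuilder();
        fetchUser(id, new Handler(out)::onUser);
        return out.toString();
    }

    // Style 3: future chaining. The steps read top to bottom in one expression.
    static String chained(String id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id)
            .thenApply(user -> user + "/orders")
            .thenApply(String::toUpperCase)
            .join();
    }

    public static void main(String[] args) {
        System.out.println(nested("42"));
        System.out.println(withReferences("42"));
        System.out.println(chained("42"));
    }
}
```

All three variants compute the same string; the only difference is where
the reader's eye has to travel to follow the flow.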

Stream-processing is a different domain, and Java+KStreams is nowhere near
as callback heavy as JS, so I don't think we have to take the JS story for
granted, but then again, I think we can derive some valuable lessons by
looking sideways to adjacent domains. I'm just bringing this up to inspire
further/deeper discussion. At the same time, just like JS, we can afford to
take an iterative approach to the problem.

Separately again, I'm interested in the post-branch merge (and I'd also add
join) problem that Paul brought up. We can clearly punt on it, by
terminating the nested branches with sink operators. But is there a DSL way
to do it?
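
For reference, the non-DSL workaround looks roughly like the toy sketch
below. It uses plain Java lists as stand-ins for streams, and ToyBrancher
and Branches are made-up names, not the KIP or Kafka Streams API. The only
point is the shape: a holder object receives each branch through a
Consumer as the branch is added, which puts the branches back into one
scope where they can be merged.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class BranchMergeSketch {
    /** Toy stand-in for a branched-stream builder; NOT the Kafka Streams API. */
    static final class ToyBrancher<T> {
        private final List<T> remaining;

        ToyBrancher(List<T> source) { this.remaining = new ArrayList<>(source); }

        /** Hands records matching the predicate to the consumer; first match wins. */
        ToyBrancher<T> branch(Predicate<T> predicate, Consumer<List<T>> consumer) {
            List<T> matched = new ArrayList<>();
            for (Iterator<T> it = remaining.iterator(); it.hasNext(); ) {
                T record = it.next();
                if (predicate.test(record)) {
                    matched.add(record);
                    it.remove();
                }
            }
            consumer.accept(matched); // invoked immediately, as the branch is added
            return this;
        }

        /** Hands whatever no earlier branch claimed to the consumer. */
        void defaultBranch(Consumer<List<T>> consumer) {
            consumer.accept(new ArrayList<>(remaining));
        }
    }

    /** Holder that collects the branches so they end up in the same scope. */
    static final class Branches {
        List<Integer> evens;
        List<Integer> big;
        List<Integer> rest;

        void setEvens(List<Integer> evens) { this.evens = evens; }
        void setBig(List<Integer> big) { this.big = big; }
        void setRest(List<Integer> rest) { this.rest = rest; }

        /** The "merge" step: possible only because both branches live here. */
        List<Integer> merged() {
            List<Integer> all = new ArrayList<>(evens);
            all.addAll(big);
            return all;
        }
    }

    static Branches run(List<Integer> source) {
        Branches branches = new Branches();
        new ToyBrancher<Integer>(source)
            .branch(n -> n % 2 == 0, branches::setEvens)
            .branch(n -> n > 3, branches::setBig)
            .defaultBranch(branches::setRest);
        return branches;
    }
}
```

It works, but the merge lives outside the fluent chain, which is exactly
the part I'd like a DSL answer for.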

Thanks again for driving this,
-John

On Thu, May 2, 2019 at 7:39 PM Paul Whalen <pgwha...@gmail.com> wrote:

> Ivan, I’ll definitely forfeit my point on the clumsiness of the
> branch(predicate, consumer) solution, I don’t see any real drawbacks for
> the dynamic case.
>
> IMO the one trade off to consider at this point is the scope question. I
> don’t know if I totally agree that “we rarely need them in the same scope”
> since merging the branches back together later seems like a perfectly
> plausible use case that can be a lot nicer when the branched streams are in
> the same scope. That being said, for the reasons Ivan listed, I think it is
> overall the better solution - working around the scope thing is easy enough
> if you need to.
>
> > On May 2, 2019, at 7:00 PM, Ivan Ponomarev <iponoma...@mail.ru.invalid>
> wrote:
> >
> > Hello everyone, thank you all for joining the discussion!
> >
> > Well, I don't think the idea of named branches, be it a LinkedHashMap
> (no other Map will do, because order of definition matters) or `branch`
> method  taking name and Consumer has more advantages than drawbacks.
> >
> > In my opinion, the only real positive outcome from Michael's proposal is
> that all the returned branches are in the same scope. But 1) we rarely need
> them in the same scope 2) there is a workaround for the scope problem,
> described in the KIP.
> >
> > 'Inlining the complex logic' is not a problem, because we can use method
> references instead of lambdas. In real world scenarios you tend to split
> the complex logic to methods anyway, so the code is going to be clean.
> >
> > The drawbacks are strong. The cohesion between predicates and handlers
> is lost. We have to define predicates in one place, and handlers in
> another. This opens the door for bugs:
> >
> > - what if we forget to define a handler for a name? or a name for a
> handler?
> > - what if we misspell a name?
> > - what if we copy-paste and duplicate a name?
> >
> > What Michael proposes would have been totally OK if we had been writing
> the API in Lua, Ruby or Python. In those languages the "dynamic naming"
> approach would have looked most concise and beautiful. But in Java we
> expect all the problems related to identifiers to be eliminated at compile
> time.
> >
> > Do we have to invent duck-typing for the Java API?
> >
> > And if we do, what advantage are we supposed to get besides having all
> the branches in the same scope? Michael, maybe I'm missing your point?
> >
> > ---
> >
> > Earlier in this discussion John Roesler also proposed to do without
> "start branching" operator, and later Paul mentioned that in the case when
> we have to add a dynamic number of branches, the current KIP is 'clumsier'
> compared to Michael's 'Map' solution. Let me address both comments here.
> >
> > 1) "Start branching" operator (I think that *split* is a good name for
> it indeed) is critical when we need to do a dynamic branching, see example
> below.
> >
> > 2) No, dynamic branching in current KIP is not clumsy at all. Imagine a
> real-world scenario when you need one branch per enum value (say,
> RecordType). You can have something like this:
> >
> > /* John: if we had to start with stream.branch(...) here, it would have been much messier. */
> > KBranchedStream branched = stream.split();
> >
> > /* Not clumsy at all :-) */
> > for (RecordType recordType : RecordType.values())
> >     branched = branched.branch((k, v) -> v.getRecType() == recordType,
> >             recordType::processRecords);
> >
> > Regards,
> >
> > Ivan
> >
> >
> > 02.05.2019 14:40, Matthias J. Sax wrote:
> >> I also agree with Michael's observation about the core problem of
> >> current `branch()` implementation.
> >>
> >> However, I also don't like to pass in a clumsy Map object. My thinking
> >> was more aligned with Paul's proposal to just add a name to each
> >> `branch()` statement and return a `Map<String,KStream>`.
> >>
> >> It makes the code easier to read, and also makes the order of
> >> `Predicates` (that is essential) easier to grasp.
> >>
> >>>>>> Map<String, KStream<K, V>> branches = stream.split()
> >>>>>>    .branch("branchOne", Predicate<K, V>)
> >>>>>>    .branch( "branchTwo", Predicate<K, V>)
> >>>>>>    .defaultBranch("defaultBranch");
> >> An open question is the case for which no defaultBranch() should be
> >> specified. Atm, `split()` and `branch()` would return `BranchedKStream`
> >> and the call to `defaultBranch()` that returns the `Map` is mandatory
> >> (which is not the case atm). Or is this actually not a real problem,
> >> because users can just ignore the branch returned by `defaultBranch()`
> >> in the result `Map` ?
> >>
> >>
> >> About "inlining": So far, it seems to be a matter of personal
> >> preference. I can see arguments for both, but no "killer argument" yet
> >> that clearly makes the case for one or the other.
> >>
> >>
> >> -Matthias
> >>
> >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
> >>> Perhaps inlining is the wrong terminology. It doesn’t require that a
> lambda with the full downstream topology be defined inline - it can be a
> method reference as with Ivan’s original suggestion.  The advantage of
> putting the predicate and its downstream logic (Consumer) together in
> branch() is that they are required to be near to each other.
> >>>
> >>> Ultimately the downstream code has to live somewhere, and deep branch
> trees will be hard to read regardless.
> >>>
> >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis <
> michael.droga...@confluent.io> wrote:
> >>>>
> >>>> I'm less enthusiastic about inlining the branch logic with its
> downstream
> >>>> functionality. Programs that have deep branch trees will quickly
> become
> >>>> harder to read as a single unit.
> >>>>
> >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen <pgwha...@gmail.com>
> wrote:
> >>>>>
> >>>>> Also +1 on the issues/goals as Michael outlined them, I think that
> sets a
> >>>>> great framework for the discussion.
> >>>>>
> >>>>> Regarding the SortedMap solution, my understanding is that the
> current
> >>>>> proposal in the KIP is what is in my PR which (pending naming
> decisions) is
> >>>>> roughly this:
> >>>>>
> >>>>> stream.split()
> >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
> >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
> >>>>>    .defaultBranch(Consumer<KStream<K, V>>);
> >>>>>
> >>>>> Obviously some ordering is necessary, since branching as a construct
> >>>>> doesn't work without it, but this solution seems like it provides as
> much
> >>>>> associativity as the SortedMap solution, because each branch() call
> >>>>> directly associates the "conditional" with the "code block."  The
> value it
> >>>>> provides over the KIP solution is the accessing of streams in the
> same
> >>>>> scope.
> >>>>>
> >>>>> The KIP solution is less "dynamic" than the SortedMap solution in
> the sense
> >>>>> that it is slightly clumsier to add a dynamic number of branches,
> but it is
> >>>>> certainly possible.  It seems to me like the API should favor the
> "static"
> >>>>> case anyway, and should make it simple and readable to fluently
> declare and
> >>>>> access your branches in-line.  It also makes it impossible to ignore
> a
> >>>>> branch, and it is possible to build an (almost) identical SortedMap
> >>>>> solution on top of it.
> >>>>>
> >>>>> I could also see a middle ground where instead of a raw SortedMap
> being
> >>>>> taken in, branch() takes a name and not a Consumer.  Something like
> this:
> >>>>>
> >>>>> Map<String, KStream<K, V>> branches = stream.split()
> >>>>>    .branch("branchOne", Predicate<K, V>)
> >>>>>    .branch( "branchTwo", Predicate<K, V>)
> >>>>>    .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
> >>>>>
> >>>>> Pros for that solution:
> >>>>> - accessing branched KStreams in same scope
> >>>>> - no double brace initialization, hopefully slightly more readable
> than
> >>>>> SortedMap
> >>>>>
> >>>>> Cons
> >>>>> - downstream branch logic cannot be specified inline which makes it
> harder
> >>>>> to read top to bottom (like existing API and SortedMap, but unlike
> the KIP)
> >>>>> - you can forget to "handle" one of the branched streams (like
> existing
> >>>>> API and SortedMap, but unlike the KIP)
> >>>>>
> >>>>> (KBranchedStreams could even work *both* ways but perhaps that's
> overdoing
> >>>>> it).
> >>>>>
> >>>>> Overall I'm curious how important it is to be able to easily access
> the
> >>>>> branched KStream in the same scope as the original.  It's possible
> that it
> >>>>> doesn't need to be handled directly by the API, but instead left up
> to the
> >>>>> user.  I'm sort of in the middle on it.
> >>>>>
> >>>>> Paul
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman <
> sop...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> I'd like to +1 what Michael said about the issues with the existing
> >>>>> branch
> >>>>>> method, I agree with what he's outlined and I think we should
> proceed by
> >>>>>> trying to alleviate these problems. Specifically it seems important
> to be
> >>>>>> able to cleanly access the individual branches (eg by mapping
> >>>>>> name->stream), which I thought was the original intention of this
> KIP.
> >>>>>>
> >>>>>> That said, I don't think we should so easily give in to the double
> brace
> >>>>>> anti-pattern or force our users into it if at all possible to
> >>>>> avoid...just
> >>>>>> my two cents.
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Sophie
> >>>>>>
> >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
> >>>>>> michael.droga...@confluent.io> wrote:
> >>>>>>
> >>>>>>> I’d like to propose a different way of thinking about this. To me,
> >>>>> there
> >>>>>>> are three problems with the existing branch signature:
> >>>>>>>
> >>>>>>> 1. If you use it the way most people do, Java raises unsafe type
> >>>>>> warnings.
> >>>>>>> 2. The way in which you use the stream branches is positionally
> coupled
> >>>>>> to
> >>>>>>> the ordering of the conditionals.
> >>>>>>> 3. It is brittle to extend existing branch calls with additional
> code
> >>>>>>> paths.
> >>>>>>>
> >>>>>>> Using associative constructs instead of relying on ordered
> constructs
> >>>>>> would
> >>>>>>> be a stronger approach. Consider a signature that instead looks
> like
> >>>>>> this:
> >>>>>>> Map<String, KStream<K,V>> KStream#branch(SortedMap<String,
> Predicate<?
> >>>>>>> super K,? super V>>);
> >>>>>>>
> >>>>>>> Branches are given names in a map, and as a result, the API
> returns a
> >>>>>>> mapping of names to streams. The ordering of the conditionals is
> >>>>>> maintained
> >>>>>>> because it’s a sorted map. Insert order determines the order of
> >>>>>> evaluation.
> >>>>>>> This solves problem 1 because there are no more varargs. It solves
> >>>>>> problem
> >>>>>>> 2 because you no longer lean on ordering to access the branch
> you’re
> >>>>>>> interested in. It solves problem 3 because you can introduce
> another
> >>>>>>> conditional by simply attaching another name to the structure,
> rather
> >>>>>> than
> >>>>>>> messing with the existing indices.
> >>>>>>>
> >>>>>>> One of the drawbacks is that creating the map inline is
> historically
> >>>>>>> awkward in Java. I know it’s an anti-pattern to use voluminously,
> but
> >>>>>>> double brace initialization would clean up the aesthetics.
> >>>>>>>
> >>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler <j...@confluent.io>
> >>>>> wrote:
> >>>>>>>> Hi Ivan,
> >>>>>>>>
> >>>>>>>> Thanks for the update.
> >>>>>>>>
> >>>>>>>> FWIW, I agree with Matthias that the current "start branching"
> >>>>> operator
> >>>>>>> is
> >>>>>>>> confusing when named the same way as the actual branches. "Split"
> >>>>> seems
> >>>>>>>> like a good name. Alternatively, we can do without a "start
> >>>>> branching"
> >>>>>>>> operator at all, and just do:
> >>>>>>>>
> >>>>>>>> stream
> >>>>>>>>      .branch(Predicate)
> >>>>>>>>      .branch(Predicate)
> >>>>>>>>      .defaultBranch();
> >>>>>>>>
> >>>>>>>> Tentatively, I think that this branching operation should be
> >>>>> terminal.
> >>>>>>> That
> >>>>>>>> way, we don't create ambiguity about how to use it. That is,
> `branch`
> >>>>>>>> should return `KBranchedStream`, while `defaultBranch` is `void`,
> to
> >>>>>>>> enforce that it comes last, and that there is only one definition
> of
> >>>>>> the
> >>>>>>>> default branch. Potentially, we should log a warning if there's no
> >>>>>>> default,
> >>>>>>>> and additionally log a warning (or throw an exception) if a record
> >>>>>> falls
> >>>>>>>> through with no default.
> >>>>>>>>
> >>>>>>>> Thoughts?
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> -John
> >>>>>>>>
> >>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <
> >>>>> matth...@confluent.io
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks for updating the KIP and your answers.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> this is to make the name similar to String#split
> >>>>>>>>>>> that also returns an array, right?
> >>>>>>>>> The intent was to avoid name duplication. The return type should
> >>>>>> _not_
> >>>>>>>>> be an array.
> >>>>>>>>>
> >>>>>>>>> The current proposal is
> >>>>>>>>>
> >>>>>>>>> stream.branch()
> >>>>>>>>>      .branch(Predicate)
> >>>>>>>>>      .branch(Predicate)
> >>>>>>>>>      .defaultBranch();
> >>>>>>>>>
> >>>>>>>>> IMHO, this reads a little odd, because the first `branch()` does
> >>>>> not
> >>>>>>>>> take any parameters and has different semantics than the later
> >>>>>>>>> `branch()` calls. Note, that from the code snippet above, it's
> >>>>> hidden
> >>>>>>>>> that the first call is `KStream#branch()` while the others are
> >>>>>>>>> `KBranchedStream#branch()`, which makes reading the code harder.
> >>>>>>>>>
> >>>>>>>>> Because I suggested to rename `addBranch()` -> `branch()`, I
> thought
> >>>>>> it
> >>>>>>>>> might be better to also rename `KStream#branch()` to avoid the
> >>>>> naming
> >>>>>>>>> overlap that seems to be confusing. The following reads much
> >>>>> cleaner
> >>>>>> to
> >>>>>>>> me:
> >>>>>>>>> stream.split()
> >>>>>>>>>      .branch(Predicate)
> >>>>>>>>>      .branch(Predicate)
> >>>>>>>>>      .defaultBranch();
> >>>>>>>>>
> >>>>>>>>> Maybe there is a better alternative to `split()` though to avoid
> >>>>> the
> >>>>>>>>> naming overlap.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> 'default' is, however, a reserved word, so unfortunately we
> >>>>> cannot
> >>>>>>> have
> >>>>>>>>> a method with such name :-)
> >>>>>>>>>
> >>>>>>>>> Bummer. Didn't consider this. Maybe we can still come up with a
> >>>>> short
> >>>>>>>> name?
> >>>>>>>>>
> >>>>>>>>> Can you add the interface `KBranchedStream` to the KIP with all
> >>>>> its
> >>>>>>>>> methods? It will be part of public API and should be contained in
> >>>>> the
> >>>>>>>>> KIP. For example, it's unclear atm, what the return type of
> >>>>>>>>> `defaultBranch()` is.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> You did not comment on the idea to add a `KBranchedStream#get(int
> >>>>>>> index)
> >>>>>>>>> -> KStream` method to get the individually branched-KStreams.
> Would
> >>>>>> be
> >>>>>>>>> nice to get your feedback about it. It seems you suggest that
> users
> >>>>>>>>> would need to write custom utility code otherwise, to access
> them.
> >>>>> We
> >>>>>>>>> should discuss the pros and cons of both approaches. It feels
> >>>>>>>>> "incomplete" to me atm, if the API has no built-in support to get
> >>>>> the
> >>>>>>>>> branched-KStreams directly.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> >>>>>>>>>> Hi all!
> >>>>>>>>>>
> >>>>>>>>>> I have updated the KIP-418 according to the new vision.
> >>>>>>>>>>
> >>>>>>>>>> Matthias, thanks for your comment!
> >>>>>>>>>>
> >>>>>>>>>>> Renaming KStream#branch() -> #split()
> >>>>>>>>>> I can see your point: this is to make the name similar to
> >>>>>>> String#split
> >>>>>>>>>> that also returns an array, right? But is it worth the loss of
> >>>>>>>> backwards
> >>>>>>>>>> compatibility? We can have overloaded branch() as well without
> >>>>>>>> affecting
> >>>>>>>>>> the existing code. Maybe the old array-based `branch` method
> >>>>> should
> >>>>>>> be
> >>>>>>>>>> deprecated, but this is a subject for discussion.
> >>>>>>>>>>
> >>>>>>>>>>> Renaming KBranchedStream#addBranch() ->
> >>>>> BranchingKStream#branch(),
> >>>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> >>>>>>>>>>
> >>>>>>>>>> Totally agree with 'addBranch->branch' rename. 'default' is,
> >>>>>>> however, a
> >>>>>>>>>> reserved word, so unfortunately we cannot have a method with
> such
> >>>>>>> name
> >>>>>>>>> :-)
> >>>>>>>>>>> defaultBranch() does take an `Predicate` as argument, but I
> >>>>> think
> >>>>>>> that
> >>>>>>>>>> is not required?
> >>>>>>>>>>
> >>>>>>>>>> Absolutely! I think that was just copy-paste error or something.
> >>>>>>>>>>
> >>>>>>>>>> Dear colleagues,
> >>>>>>>>>>
> >>>>>>>>>> please revise the new version of the KIP and Paul's PR
> >>>>>>>>>> (https://github.com/apache/kafka/pull/6512)
> >>>>>>>>>>
> >>>>>>>>>> Any new suggestions/objections?
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>>
> >>>>>>>>>> Ivan
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 11.04.2019 11:47, Matthias J. Sax wrote:
> >>>>>>>>>>> Thanks for driving the discussion of this KIP. It seems that
> >>>>>>> everybody
> >>>>>>>>>>> agrees that the current branch() method using arrays is not
> >>>>>> optimal.
> >>>>>>>>>>> I had a quick look into the PR and I like the overall proposal.
> >>>>>>> There
> >>>>>>>>>>> are some minor things we need to consider. I would recommend
> the
> >>>>>>>>>>> following renaming:
> >>>>>>>>>>>
> >>>>>>>>>>> KStream#branch() -> #split()
> >>>>>>>>>>> KBranchedStream#addBranch() -> BranchingKStream#branch()
> >>>>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> >>>>>>>>>>>
> >>>>>>>>>>> It's just a suggestion to get slightly shorter method names.
> >>>>>>>>>>>
> >>>>>>>>>>> In the current PR, defaultBranch() does take an `Predicate` as
> >>>>>>>> argument,
> >>>>>>>>>>> but I think that is not required?
> >>>>>>>>>>>
> >>>>>>>>>>> Also, we should consider KIP-307, that was recently accepted
> and
> >>>>>> is
> >>>>>>>>>>> currently implemented:
> >>>>>>>>>>>
> >>>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> >>>>>>>>>>> Ie, we should add overloads that accepted a `Named` parameter.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> For the issue that the created `KStream` object are in
> different
> >>>>>>>> scopes:
> >>>>>>>>>>> could we extend `KBranchedStream` with a `get(int index)`
> method
> >>>>>>> that
> >>>>>>>>>>> returns the corresponding "branched" result `KStream` object?
> >>>>>> Maybe,
> >>>>>>>> the
> >>>>>>>>>>> second argument of `addBranch()` should not be a
> >>>>>> `Consumer<KStream>`
> >>>>>>>> but
> >>>>>>>>>>> a `Function<KStream,KStream>` and `get()` could return whatever
> >>>>>> the
> >>>>>>>>>>> `Function` returns?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Finally, I would also suggest to update the KIP with the
> current
> >>>>>>>>>>> proposal. That makes it easier to review.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
> >>>>>>>>>>>> Ivan,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm a bit of a novice here as well, but I think it makes sense
> >>>>>> for
> >>>>>>>> you
> >>>>>>>>> to
> >>>>>>>>>>>> revise the KIP and continue the discussion.  Obviously we'll
> >>>>> need
> >>>>>>>> some
> >>>>>>>>>>>> buy-in from committers that have actual binding votes on
> >>>>> whether
> >>>>>>> the
> >>>>>>>>> KIP
> >>>>>>>>>>>> could be adopted.  It would be great to hear if they think
> this
> >>>>>> is
> >>>>>>> a
> >>>>>>>>> good
> >>>>>>>>>>>> idea overall.  I'm not sure if that happens just by starting a
> >>>>>>> vote,
> >>>>>>>>> or if
> >>>>>>>>>>>> there is generally some indication of interest beforehand.
> >>>>>>>>>>>>
> >>>>>>>>>>>> That being said, I'll continue the discussion a bit: assuming
> >>>>> we
> >>>>>> do
> >>>>>>>>> move
> >>>>>>>>>>>> forward the solution of "stream.branch() returns
> >>>>>> KBranchedStream",
> >>>>>>> do
> >>>>>>>>> we
> >>>>>>>>>>>> deprecate "stream.branch(...) returns KStream[]"?  I would
> >>>>> favor
> >>>>>>>>>>>> deprecating, since having two mutually exclusive APIs that
> >>>>>>> accomplish
> >>>>>>>>> the
> >>>>>>>>>>>> same thing is confusing, especially when they're fairly
> similar
> >>>>>>>>> anyway.  We
> >>>>>>>>>>>> just need to be sure we're not making something
> >>>>>>> impossible/difficult
> >>>>>>>>> that
> >>>>>>>>>>>> is currently possible/easy.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regarding my PR - I think the general structure would work,
> >>>>> it's
> >>>>>>>> just a
> >>>>>>>>>>>> little sloppy overall in terms of naming and clarity. In
> >>>>>>> particular,
> >>>>>>>>>>>> passing in the "predicates" and "children" lists which get
> >>>>>> modified
> >>>>>>>> in
> >>>>>>>>>>>> KBranchedStream but read from all the way in KStreamLazyBranch is
> >>>>> a
> >>>>>>> bit
> >>>>>>>>>>>> complicated to follow.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Paul
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <
> >>>>>> iponoma...@mail.ru
> >>>>>>>>> wrote:
> >>>>>>>>>>>>> Hi Paul!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I read your code carefully and now I am fully convinced: your
> >>>>>>>> proposal
> >>>>>>>>>>>>> looks better and should work. We just have to document the
> >>>>>> crucial
> >>>>>>>>> fact
> >>>>>>>>>>>>> that KStream consumers are invoked as they're added. And then
> >>>>>> it's
> >>>>>>>> all
> >>>>>>>>>>>>> going to be very nice.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> What shall we do now? I should re-write the KIP and resume
> the
> >>>>>>>>>>>>> discussion here, right?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Why are you saying that your PR 'should not be even a
> >>>>> starting
> >>>>>>>> point
> >>>>>>>>> if
> >>>>>>>>>>>>> we go in this direction'? To me it looks like a good starting
> >>>>>>> point.
> >>>>>>>>> But
> >>>>>>>>>>>>> as a novice in this project I might miss some important
> >>>>> details.
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Ivan
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen wrote:
> >>>>>>>>>>>>>> Ivan,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Maybe I’m missing the point, but I believe the
> >>>>> stream.branch()
> >>>>>>>>> solution
> >>>>>>>>>>>>> supports this. The couponIssuer::set* consumers will be
> >>>>> invoked
> >>>>>> as
> >>>>>>>>> they’re
> >>>>>>>>>>>>> added, not during streamsBuilder.build(). So the user still
> >>>>>> ought
> >>>>>>> to
> >>>>>>>>> be
> >>>>>>>>>>>>> able to call couponIssuer.coupons() afterward and depend on
> >>>>> the
> >>>>>>>>> branched
> >>>>>>>>>>>>> streams having been set.
> >>>>>>>>>>>>>> The issue I mean to point out is that it is hard to access
> >>>>> the
> >>>>>>>>> branched
> >>>>>>>>>>>>> streams in the same scope as the original stream (that is,
> not
> >>>>>>>> inside
> >>>>>>>>> the
> >>>>>>>>>>>>> couponIssuer), which is a problem with both proposed
> >>>>> solutions.
> >>>>>> It
> >>>>>>>>> can be
> >>>>>>>>>>>>> worked around though.
> >>>>>>>>>>>>>> [Also, great to hear additional interest in 401, I’m excited
> >>>>> to
> >>>>>>>> hear
> >>>>>>>>>>>>> your thoughts!]
> >>>>>>>>>>>>>> Paul
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <
> >>>>>> iponoma...@mail.ru
> >>>>>>>>> wrote:
> >>>>>>>>>>>>>>> Hi Paul!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> The idea to postpone the wiring of branches to the
> >>>>>>>>>>>>> streamsBuilder.build() also looked great to me at first
> >>>>> glance,
> >>>>>>> but
> >>>>>>>>> ---
> >>>>>>>>>>>>>>>> the newly branched streams are not available in the same
> >>>>>> scope
> >>>>>>> as
> >>>>>>>>> each
> >>>>>>>>>>>>> other.  That is, if we wanted to merge them back together
> >>>>> again
> >>>>>> I
> >>>>>>>>> don't see
> >>>>>>>>>>>>> a way to do that.
> >>>>>>>>>>>>>>> You just took the words right out of my mouth, I was just
> >>>>>> going
> >>>>>>> to
> >>>>>>>>>>>>> write in details about this issue.
> >>>>>>>>>>>>>>> Consider the example from Bill's book, p. 101: say we need
> >>>>> to
> >>>>>>>>> identify
> >>>>>>>>>>>>> customers who have bought coffee and made a purchase in the
> >>>>>>>>> electronics
> >>>>>>>>>>>>> store to give them coupons.
> >>>>>>>>>>>>>>> This is the code I usually write under these circumstances
> >>>>>> using
> >>>>>>>> my
> >>>>>>>>>>>>> 'brancher' class:
> >>>>>>>>>>>>>>> @Setter
> >>>>>>>>>>>>>>> class CouponIssuer{
> >>>>>>>>>>>>>>>   private KStream<....> coffePurchases;
> >>>>>>>>>>>>>>>   private KStream<....> electronicsPurchases;
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>   KStream<...> coupons(){
> >>>>>>>>>>>>>>>       return coffePurchases.join(electronicsPurchases...)...whatever
> >>>>>>>>>>>>>>>       /*In the real world the code here can be complex, so creation of a separate CouponIssuer class is fully justified, in order to separate classes' responsibilities.*/
> >>>>>>>>>>>>>>>  }
> >>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> CouponIssuer couponIssuer = new CouponIssuer();
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
> >>>>>>>>>>>>>>>     .branch(predicate1, couponIssuer::setCoffePurchases)
> >>>>>>>>>>>>>>>     .branch(predicate2, couponIssuer::setElectronicsPurchases)
> >>>>>>>>>>>>>>>     .onTopOf(transactionStream);
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> /*Alas, this won't work if we're going to wire up everything later, without the terminal operation!!!*/
> >>>>>>>>>>>>>>> couponIssuer.coupons()...
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Does this make sense?  In order to properly initialize the
> >>>>>>>>> CouponIssuer
> >>>>>>>>>>>>> we need the terminal operation to be called before
> >>>>>>>>> streamsBuilder.build()
> >>>>>>>>>>>>> is called.
> >>>>>>>>>>>>>>> [BTW Paul, I just found out that your KIP-401 is
> essentially
> >>>>>> the
> >>>>>>>>> next
> >>>>>>>>>>>>> KIP I was going to write here. I have some thoughts based on
> >>>>> my
> >>>>>>>>> experience,
> >>>>>>>>>>>>> so I will join the discussion on KIP-401 soon.]
> >>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Ivan
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen wrote:
> >>>>>>>>>>>>>>>> Ivan,
> >>>>>>>>>>>>>>>> I tried to make a very rough proof of concept of a fluent
> >>>>> API
> >>>>>>>> based
> >>>>>>>>>>>>> off of
> >>>>>>>>>>>>>>>> KStream here (https://github.com/apache/kafka/pull/6512),
> >>>>>> and
> >>>>>>> I
> >>>>>>>>> think
> >>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>> succeeded at removing both cons.
> >>>>>>>>>>>>>>>>    - Compatibility: I was incorrect earlier about
> >>>>>>> compatibility
> >>>>>>>>>>>>> issues,
> >>>>>>>>>>>>>>>>    there aren't any direct ones.  I was unaware that Java
> >>>>> is
> >>>>>>>> smart
> >>>>>>>>>>>>> enough to
> >>>>>>>>>>>>>>>>    distinguish between a branch(varargs...) returning one
> >>>>>>> thing
> >>>>>>>>> and
> >>>>>>>>>>>>> branch()
> >>>>>>>>>>>>>>>>    with no arguments returning another thing.
> >>>>>>>>>>>>>>>>    - Requiring a terminal method: We don't actually need
> >>>>> it.
> >>>>>>> We
> >>>>>>>>> can
> >>>>>>>>>>>>> just
> >>>>>>>>>>>>>>>>    build up the branches in the KBranchedStream who shares
> >>>>>> its
> >>>>>>>>> state
> >>>>>>>>>>>>> with the
> >>>>>>>>>>>>>>>>    ProcessorSupplier that will actually do the branching.
> >>>>>>> It's
> >>>>>>>>> not
> >>>>>>>>>>>>> terribly
> >>>>>>>>>>>>>>>>    pretty in its current form, but I think it demonstrates
> >>>>>> its
> >>>>>>>>>>>>> feasibility.
> >>>>>>>>>>>>>>>> To be clear, I don't think that pull request should be
> >>>>> final
> >>>>>> or
> >>>>>>>>> even a
> >>>>>>>>>>>>>>>> starting point if we go in this direction, I just wanted
> to
> >>>>>> see
> >>>>>>>> how
> >>>>>>>>>>>>>>>> challenging it would be to get the API working.
> >>>>>>>>>>>>>>>> I will say though, that I'm not sure the existing solution
> >>>>>>> could
> >>>>>>>> be
> >>>>>>>>>>>>>>>> deprecated in favor of this, which I had originally
> >>>>> suggested
> >>>>>>>> was a
> >>>>>>>>>>>>>>>> possibility.  The reason is that the newly branched
> streams
> >>>>>> are
> >>>>>>>> not
> >>>>>>>>>>>>>>>> available in the same scope as each other.  That is, if we
> >>>>>>> wanted
> >>>>>>>>> to
> >>>>>>>>>>>>> merge
> >>>>>>>>>>>>>>>> them back together again I don't see a way to do that.
> The
> >>>>>> KIP
> >>>>>>>>>>>>> proposal
> >>>>>>>>>>>>>>>> has the same issue, though - all this means is that for
> >>>>>> either
> >>>>>>>>>>>>> solution,
> >>>>>>>>>>>>>>>> deprecating the existing branch(...) is not on the table.
> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>> Paul
> >>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <
> >>>>>>>>> iponoma...@mail.ru>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>> OK, let me summarize what we have discussed up to this
> >>>>>> point.
> >>>>>>>>>>>>>>>>> First, it seems that it's commonly agreed that branch API
> >>>>>>> needs
> >>>>>>>>>>>>>>>>> improvement. Motivation is given in the KIP.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> There are two potential ways to do it:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 1. (as originally proposed)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
> >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->..)
> >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->..)
> >>>>>>>>>>>>>>>>>    .defaultBranch(ks->..) //optional
> >>>>>>>>>>>>>>>>>    .onTopOf(stream).mapValues(...).... //onTopOf returns
> >>>>>> its
> >>>>>>>>> argument
> >>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2) The code won't
> >>>>> make
> >>>>>>>> sense
> >>>>>>>>>>>>> until
> >>>>>>>>>>>>>>>>> all the necessary ingredients are provided.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance
> >>>>>>>>> contrasts with the
> >>>>>>>>>>>>>>>>> fluency of other KStream methods.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 2. (as Paul proposes)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> stream
> >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->...)
> >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->...)
> >>>>>>>>>>>>>>>>>    .defaultBranch(ks->...) //or noDefault(). Both
> >>>>>>>>> defaultBranch(..)
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> noDefault() return void
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> PROS: Generally follows the way KStreams interface is
> >>>>>> defined.
> >>>>>>>>>>>>>>>>> CONS: We need to define two terminal methods
> >>>>>>>> (defaultBranch(ks->)
> >>>>>>>>> and
> >>>>>>>>>>>>>>>>> noDefault()). And for a user it is very easy to miss the
> >>>>>> fact
> >>>>>>>>> that one
> >>>>>>>>>>>>>>>>> of the terminal methods should be called. If these
> methods
> >>>>>> are
> >>>>>>>> not
> >>>>>>>>>>>>>>>>> called, we can throw an exception at runtime.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can we do better?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Ivan
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
> >>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
> >>>>>>>>>>>>>>>>>>> Paul,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I see your point when you are talking about
> >>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Still, I believe that this cannot be implemented
> the
> >>>>>>> easy
> >>>>>>>>> way.
> >>>>>>>>>>>>>>>>>>> Maybe we all should think further.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> user could specify a terminal method that assumes
> >>>>> nothing
> >>>>>>>> will
> >>>>>>>>>>>>> reach
> >>>>>>>>>>>>>>>>>>>> the default branch,
> >>>>>>>>>>>>>>>>>>> throwing an exception if such a case occurs.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be the only option
> >>>>>> besides
> >>>>>>>>>>>>>>>>>>> `default`, because there are scenarios when we want to
> >>>>>> just
> >>>>>>>>> silently
> >>>>>>>>>>>>>>>>>>> drop the messages that didn't match any predicate. 2)
> >>>>>>> Throwing
> >>>>>>>>> an
> >>>>>>>>>>>>>>>>>>> exception in the middle of data flow processing looks
> >>>>>> like a
> >>>>>>>> bad
> >>>>>>>>>>>>> idea.
> >>>>>>>>>>>>>>>>>>> In the stream processing paradigm, I would prefer to emit a
> >>>>>>>> special
> >>>>>>>>>>>>>>>>>>> message to a dedicated stream. This is exactly where
> >>>>>>> `default`
> >>>>>>>>> can
> >>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>> used.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> it would be fairly easy for the
> >>>>> InternalTopologyBuilder
> >>>>>>> to
> >>>>>>>>> track
> >>>>>>>>>>>>>>>>>>>> dangling
> >>>>>>>>>>>>>>>>>>> branches that haven't been terminated and raise a clear
> >>>>>>> error
> >>>>>>>>>>>>> before it
> >>>>>>>>>>>>>>>>>>> becomes an issue.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> You mean a runtime exception, when the program is
> >>>>> compiled
> >>>>>>> and
> >>>>>>>>> run?
> >>>>>>>>>>>>>>>>>>> Well,  I'd prefer an API that simply won't compile if
> >>>>> used
> >>>>>>>>>>>>>>>>>>> incorrectly. Can we build such an API as a method chain
> >>>>>>>> starting
> >>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>> KStream object? There is a huge cost difference between
> >>>>>>>> runtime
> >>>>>>>>> and
> >>>>>>>>>>>>>>>>>>> compile-time errors. Even if a failure is uncovered
> >>>>> instantly
> >>>>>> on
> >>>>>>>>> unit
> >>>>>>>>>>>>>>>>>>> tests, it costs more for the project than a compilation
> >>>>>>>> failure.
> >>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Ivan
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen wrote:
> >>>>>>>>>>>>>>>>>>>> Ivan,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Good point about the terminal operation being
> required.
> >>>>>>> But
> >>>>>>>> is
> >>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>> really
> >>>>>>>>>>>>>>>>>>>> such a bad thing?  If the user doesn't want a
> >>>>>> defaultBranch
> >>>>>>>>> they
> >>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>> call
> >>>>>>>>>>>>>>>>>>>> some other terminal method (noDefaultBranch()?) just
> as
> >>>>>>>>> easily.  In
> >>>>>>>>>>>>>>>>>>>> fact I
> >>>>>>>>>>>>>>>>>>>> think it creates an opportunity for a nicer API - a
> >>>>> user
> >>>>>>>> could
> >>>>>>>>>>>>> specify
> >>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>> terminal method that assumes nothing will reach the
> >>>>>> default
> >>>>>>>>> branch,
> >>>>>>>>>>>>>>>>>>>> throwing an exception if such a case occurs.  That
> >>>>> seems
> >>>>>>> like
> >>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>> improvement over the current branch() API, which
> allows
> >>>>>> for
> >>>>>>>> the
> >>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>>> subtle
> >>>>>>>>>>>>>>>>>>>> behavior of records unexpectedly getting dropped.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The need for a terminal operation certainly has to be
> >>>>>> well
> >>>>>>>>>>>>>>>>>>>> documented, but
> >>>>>>>>>>>>>>>>>>>> it would be fairly easy for the
> >>>>> InternalTopologyBuilder
> >>>>>>> to
> >>>>>>>>> track
> >>>>>>>>>>>>>>>>>>>> dangling
> >>>>>>>>>>>>>>>>>>>> branches that haven't been terminated and raise a
> clear
> >>>>>>> error
> >>>>>>>>>>>>> before it
> >>>>>>>>>>>>>>>>>>>> becomes an issue.  Especially now that there is a
> >>>>> "build
> >>>>>>>> step"
> >>>>>>>>>>>>> where
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> topology is actually wired up, when
> >>>>>> StreamsBuilder.build()
> >>>>>>> is
> >>>>>>>>>>>>> called.
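
A minimal sketch of the build-time check described above, under loud
assumptions: BuilderSketch and BranchChain are hypothetical names, not the
real StreamsBuilder or InternalTopologyBuilder, and branch chains are tracked
by a plain string name purely for illustration. It only shows the idea that a
chain left without a terminal call can be reported when build() runs.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for a topology builder; not a Kafka Streams class.
final class BuilderSketch {
    // Names of branch chains that were started but not yet terminated.
    private final Set<String> openBranchChains = new LinkedHashSet<>();

    // Starts a branch chain and records it as "dangling" until terminated.
    BranchChain branch(String chainName) {
        openBranchChains.add(chainName);
        return new BranchChain(chainName);
    }

    // Fails fast if any chain never reached a terminal method.
    void build() {
        if (!openBranchChains.isEmpty()) {
            throw new IllegalStateException(
                "Unterminated branch chains: " + openBranchChains);
        }
    }

    final class BranchChain {
        private final String chainName;

        BranchChain(String chainName) {
            this.chainName = chainName;
        }

        // Adds one more case; the case itself is ignored in this sketch.
        BranchChain branch(String caseName) {
            return this;
        }

        // Terminal methods: both mark the chain as properly closed.
        void defaultBranch() {
            openBranchChains.remove(chainName);
        }

        void noDefault() {
            openBranchChains.remove(chainName);
        }
    }
}
```

With this shape, forgetting defaultBranch() or noDefault() still compiles, and
the mistake only surfaces when build() is called, which is the trade-off
against a compile-time guarantee raised elsewhere in this thread.
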
> >>>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its argument, I agree
> >>>>> that
> >>>>>>> it's
> >>>>>>>>>>>>>>>>>>>> critical to
> >>>>>>>>>>>>>>>>>>>> allow users to do other operations on the input
> stream.
> >>>>>>> With
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>> fluent
> >>>>>>>>>>>>>>>>>>>> solution, it ought to work the same way all other
> >>>>>>> operations
> >>>>>>>>> do -
> >>>>>>>>>>>>> if
> >>>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>> want to process off the original KStream multiple
> >>>>> times,
> >>>>>>> you
> >>>>>>>>> just
> >>>>>>>>>>>>>>>>>>>> need the
> >>>>>>>>>>>>>>>>>>>> stream as a variable so you can call as many
> operations
> >>>>>> on
> >>>>>>> it
> >>>>>>>>> as
> >>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>> desire.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thoughts?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>> Paul
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <
> >>>>>>>>> iponoma...@mail.ru
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hello Paul,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I'm afraid this won't work because we do not always
> need
> >>>>>> the
> >>>>>>>>>>>>>>>>>>>>> defaultBranch. And without a terminal operation we
> >>>>> don't
> >>>>>>>> know
> >>>>>>>>>>>>> when to
> >>>>>>>>>>>>>>>>>>>>> finalize and build the 'branch switch'.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its argument, so we
> >>>>> can
> >>>>>> do
> >>>>>>>>>>>>> something
> >>>>>>>>>>>>>>>>>>>>> more with the original branch after branching.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I understand your point that the need for special
> >>>>> object
> >>>>>>>>>>>>> construction
> >>>>>>>>>>>>>>>>>>>>> contrasts the fluency of most KStream methods. But
> >>>>> here
> >>>>>> we
> >>>>>>>>> have a
> >>>>>>>>>>>>>>>>>>>>> special case: we build the switch to split the flow,
> >>>>> so
> >>>>>> I
> >>>>>>>>> think
> >>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>> still idiomatic.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Ivan
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
> >>>>>>>>>>>>>>>>>>>>>> Ivan,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve this API, but I
> >>>>>> find
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> onTopOf()
> >>>>>>>>>>>>>>>>>>>>>> mechanism a little confusing since it contrasts the
> >>>>>>> fluency
> >>>>>>>>> of
> >>>>>>>>>>>>> other
> >>>>>>>>>>>>>>>>>>>>>> KStream method calls.  Ideally I'd like to just call
> >>>>> a
> >>>>>>>>> method on
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> stream
> >>>>>>>>>>>>>>>>>>>>>> so it still reads top to bottom if the branch cases
> >>>>> are
> >>>>>>>>> defined
> >>>>>>>>>>>>>>>>>>>>>> fluently.
> >>>>>>>>>>>>>>>>>>>>>> I think the addBranch(predicate, handleCase) is very
> >>>>>> nice
> >>>>>>>>> and the
> >>>>>>>>>>>>>>>>>>>>>> right
> >>>>>>>>>>>>>>>>>>>>> way
> >>>>>>>>>>>>>>>>>>>>>> to do things, but what if we flipped around how we
> >>>>>>> specify
> >>>>>>>>> the
> >>>>>>>>>>>>> source
> >>>>>>>>>>>>>>>>>>>>>> stream.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Like:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> stream.branch()
> >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate1, this::handle1)
> >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate2, this::handle2)
> >>>>>>>>>>>>>>>>>>>>>>           .defaultBranch(this::handleDefault);
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Where branch() returns a KBranchedStreams or
> >>>>>>>> KStreamBrancher
> >>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>>> something,
> >>>>>>>>>>>>>>>>>>>>>> which is added to by addBranch() and terminated by
> >>>>>>>>>>>>> defaultBranch()
> >>>>>>>>>>>>>>>>>>>>>> (which
> >>>>>>>>>>>>>>>>>>>>>> returns void).  This is obviously incompatible with
> >>>>> the
> >>>>>>>>> current
> >>>>>>>>>>>>>>>>>>>>>> API, so
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> new stream.branch() would have to have a different
> >>>>>> name,
> >>>>>>>> but
> >>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>> seems
> >>>>>>>>>>>>>>>>>>>>>> like a fairly small problem - we could call it
> >>>>>> something
> >>>>>>>> like
> >>>>>>>>>>>>>>>>>>>>>> branched()
> >>>>>>>>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>>>> branchedStreams() and deprecate the old API.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of your KIP?  It
> >>>>>> seems
> >>>>>>>>> like it
> >>>>>>>>>>>>>>>>>>>>>> does to
> >>>>>>>>>>>>>>>>>>>>>> me, allowing for clear in-line branching while also
> >>>>>>>> allowing
> >>>>>>>>> you
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> dynamically build branches off of
> KBranchedStreams
> >>>>>> if
> >>>>>>>>> desired.
> >>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>> Paul
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
> >>>>>>>>>>>>>>>>>>>>> <iponoma...@mail.ru.invalid>
> >>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){
> >>>>>>>>>>>>>>>>>>>>>>>           ks.filter(....).mapValues(...)
> >>>>>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){
> >>>>>>>>>>>>>>>>>>>>>>>           ks.selectKey(...).groupByKey()...
> >>>>>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> ......
> >>>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
> >>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate1, this::handleFirstCase)
> >>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate2, this::handleSecondCase)
> >>>>>>>>>>>>>>>>>>>>>>>      .onTopOf(....)
> >>>>>>>>>>>>>>>>>>>>>>>
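
A minimal sketch of the brancher pattern used above. It is not the code from
the KIP or the PR: ListBrancher is a hypothetical stand-in that models a
stream as a plain java.util.List so it runs without Kafka, and the
first-match-wins routing is an assumption borrowed from the existing
branch(...) semantics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in: a "stream" is modelled as a List<T>.
final class ListBrancher<T> {
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();
    // When null, records that match no predicate are silently dropped.
    private Consumer<List<T>> defaultHandler;

    ListBrancher<T> addBranch(Predicate<T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    ListBrancher<T> defaultBranch(Consumer<List<T>> handler) {
        this.defaultHandler = handler;
        return this;
    }

    // Routes each record to the first matching branch, hands every branch to
    // its handler, and returns the source so the caller can keep chaining.
    List<T> onTopOf(List<T> source) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<T> unmatched = new ArrayList<>();
        for (T record : source) {
            boolean matched = false;
            for (int i = 0; i < predicates.size() && !matched; i++) {
                if (predicates.get(i).test(record)) {
                    buckets.get(i).add(record);
                    matched = true;
                }
            }
            if (!matched) {
                unmatched.add(record);
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        if (defaultHandler != null) {
            defaultHandler.accept(unmatched);
        }
        return source;
    }
}
```

Because nothing happens until onTopOf(...) is called, each branch is only
visible inside its own handler, which is why the branches cannot easily be
merged back together in the enclosing scope.
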
> >>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Ivan
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
> >>>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I have one question: the KafkaStreamsBrancher
> >>>>> takes a
> >>>>>>>>> Consumer
> >>>>>>>>>>>>> as a
> >>>>>>>>>>>>>>>>>>>>>>> second
> >>>>>>>>>>>>>>>>>>>>>>>> argument which returns nothing, and the example in
> >>>>>> the
> >>>>>>>> KIP
> >>>>>>>>>>>>> shows
> >>>>>>>>>>>>>>>>>>>>>>>> each
> >>>>>>>>>>>>>>>>>>>>>>>> stream from the branch using a terminal node
> >>>>>>>>> (KStream#to()
> >>>>>>>>>>>>>>>>>>>>>>>> in this
> >>>>>>>>>>>>>>>>>>>>>>>> case).
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but how would we
> >>>>> handle
> >>>>>>> the
> >>>>>>>>> case
> >>>>>>>>>>>>>>>>>>>>>>>> where the
> >>>>>>>>>>>>>>>>>>>>>>>> user has created a branch but wants to continue
> >>>>>>>> processing
> >>>>>>>>> and
> >>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>>>> necessarily use a terminal node on the branched
> >>>>>> stream
> >>>>>>>>>>>>> immediately?
> >>>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic as is if we had
> >>>>>>>> something
> >>>>>>>>> like
> >>>>>>>>>>>>>>>>>>>>>>>> this:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> KStream<String, String>[] branches =
> >>>>>>>>>>>>>>>>>>>>>>>> originalStream.branch(predicate1,
> >>>>>>>>>>>>>>>>>>>>>>>> predicate2);
> >>>>>>>>>>>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
> >>>>>>>>>>>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>>>>>>>>>>> Bill
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <
> >>>>>>>>> bbej...@gmail.com
> >>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> All,
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP-418.
> >>>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Hello,
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418.
> >>>>> Please
> >>>>>>>> take
> >>>>>>>>> a
> >>>>>>>>>>>>> look
> >>>>>>>>>>>>>>>>> at
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> KIP if you can, I would appreciate any feedback
> :)
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> KIP-418:
> >>>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> >>>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488:
> >>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5488
> >>>>>>>>>>>>>>>>>>>>>>>>> PR#6164:
> >>>>> https://github.com/apache/kafka/pull/6164
> >>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>
> >
>
>
