I'm less enthusiastic about inlining the branch logic with its downstream
functionality. Programs that have deep branch trees will quickly become
harder to read as a single unit.

On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen <pgwha...@gmail.com> wrote:

> Also +1 on the issues/goals as Michael outlined them, I think that sets a
> great framework for the discussion.
>
> Regarding the SortedMap solution, my understanding is that the current
> proposal in the KIP is what is in my PR which (pending naming decisions) is
> roughly this:
>
> stream.split()
>     .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>     .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>     .defaultBranch(Consumer<KStream<K, V>>);
>
> Obviously some ordering is necessary, since branching as a construct
> doesn't work without it, but this solution seems like it provides as much
> associativity as the SortedMap solution, because each branch() call
> directly associates the "conditional" with the "code block."  The value the
> SortedMap solution provides over the KIP solution is access to the branched
> streams in the same scope.
>
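To make the ordering and scoping points concrete, here is a minimal in-memory sketch of the consumer-based `split().branch(predicate, consumer)...defaultBranch(consumer)` shape. It is not Kafka Streams code: `ToySplitter` is a hypothetical class, and a `List` stands in for a `KStream`. It assumes first-match semantics (a record goes to the first branch whose predicate accepts it) and that each consumer runs as soon as its branch is declared.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical, in-memory model of the proposed
//   stream.split().branch(predicate, consumer)...defaultBranch(consumer)
// shape. A List<V> stands in for a KStream; this is NOT Kafka Streams code.
final class ToySplitter<V> {
    // Records that no earlier branch has claimed yet.
    private final List<V> remaining;

    ToySplitter(List<V> source) {
        this.remaining = new ArrayList<>(source);
    }

    // Claims every remaining record that matches, then hands the branch to
    // its consumer right away. Declaration order matters: first match wins.
    ToySplitter<V> branch(Predicate<V> predicate, Consumer<List<V>> chain) {
        List<V> matched = new ArrayList<>();
        Iterator<V> it = remaining.iterator();
        while (it.hasNext()) {
            V value = it.next();
            if (predicate.test(value)) {
                matched.add(value);
                it.remove();
            }
        }
        chain.accept(matched);
        return this;
    }

    // Terminal: returns void, so nothing can be chained after the default.
    void defaultBranch(Consumer<List<V>> chain) {
        chain.accept(new ArrayList<>(remaining));
        remaining.clear();
    }
}
```

Because `defaultBranch` returns `void`, nothing can follow it, and each branch's downstream logic sits right next to its predicate.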
> The KIP solution is less "dynamic" than the SortedMap solution in the sense
> that it is slightly clumsier to add a dynamic number of branches, but it is
> certainly possible.  It seems to me like the API should favor the "static"
> case anyway, and should make it simple and readable to fluently declare and
> access your branches in-line.  It also makes it impossible to ignore a
> branch, and it is possible to build an (almost) identical SortedMap
> solution on top of it.
>
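The claim that a name-to-branch map can be layered on top of the consumer-based API can be sketched as follows. `ConsumerSplitter` and `NamedBranches.split` are hypothetical helpers over in-memory lists, not part of any Kafka API. The loop over an insertion-ordered `LinkedHashMap` also shows one way to declare a dynamic number of branches.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical consumer-based splitter; a List stands in for a KStream.
final class ConsumerSplitter<V> {
    private final List<V> remaining;

    ConsumerSplitter(List<V> source) {
        this.remaining = new ArrayList<>(source);
    }

    ConsumerSplitter<V> branch(Predicate<V> predicate, Consumer<List<V>> chain) {
        List<V> matched = new ArrayList<>();
        for (Iterator<V> it = remaining.iterator(); it.hasNext(); ) {
            V value = it.next();
            if (predicate.test(value)) { // first matching branch wins
                matched.add(value);
                it.remove();
            }
        }
        chain.accept(matched);
        return this;
    }

    void defaultBranch(Consumer<List<V>> chain) {
        chain.accept(new ArrayList<>(remaining));
        remaining.clear();
    }
}

final class NamedBranches {
    // Builds a name -> branch map on top of the consumer-based API. The number
    // of branches comes from the map, so it can be decided at runtime.
    static <V> Map<String, List<V>> split(List<V> source,
                                          LinkedHashMap<String, Predicate<V>> predicates,
                                          String defaultName) {
        Map<String, List<V>> result = new LinkedHashMap<>();
        ConsumerSplitter<V> splitter = new ConsumerSplitter<>(source);
        for (Map.Entry<String, Predicate<V>> entry : predicates.entrySet()) {
            String name = entry.getKey();
            splitter.branch(entry.getValue(), branch -> result.put(name, branch));
        }
        splitter.defaultBranch(branch -> result.put(defaultName, branch));
        return result;
    }
}
```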
> I could also see a middle ground where instead of a raw SortedMap being
> taken in, branch() takes a name and not a Consumer.  Something like this:
>
> Map<String, KStream<K, V>> branches = stream.split()
>     .branch("branchOne", Predicate<K, V>)
>     .branch("branchTwo", Predicate<K, V>)
>     .defaultBranch("defaultBranch");
>
> Pros for that solution:
>  - accessing branched KStreams in same scope
>  - no double brace initialization, hopefully slightly more readable than
> SortedMap
>
> Cons:
>  - downstream branch logic cannot be specified inline which makes it harder
> to read top to bottom (like existing API and SortedMap, but unlike the KIP)
>  - you can forget to "handle" one of the branched streams (like existing
> API and SortedMap, but unlike the KIP)
>
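Here is a rough sketch of the named-branch middle ground, again with lists standing in for streams. `NamedSplit` is hypothetical. This version assumes the terminal `defaultBranch` takes only a name and returns the full name-to-branch map.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical named-branch splitter; a List stands in for a KStream.
final class NamedSplit<V> {
    private final List<V> source;
    // LinkedHashMap keeps predicates in declaration order.
    private final Map<String, Predicate<V>> predicates = new LinkedHashMap<>();

    NamedSplit(List<V> source) {
        this.source = source;
    }

    NamedSplit<V> branch(String name, Predicate<V> predicate) {
        if (predicates.putIfAbsent(name, predicate) != null) {
            throw new IllegalArgumentException("duplicate branch name: " + name);
        }
        return this;
    }

    // Terminal: routes every record (first matching predicate wins) and
    // returns all branches, including the default one, keyed by name.
    Map<String, List<V>> defaultBranch(String defaultName) {
        if (predicates.containsKey(defaultName)) {
            throw new IllegalArgumentException("duplicate branch name: " + defaultName);
        }
        Map<String, List<V>> result = new LinkedHashMap<>();
        for (String name : predicates.keySet()) {
            result.put(name, new ArrayList<>());
        }
        result.put(defaultName, new ArrayList<>());
        for (V value : source) {
            String target = defaultName;
            for (Map.Entry<String, Predicate<V>> entry : predicates.entrySet()) {
                if (entry.getValue().test(value)) {
                    target = entry.getKey();
                    break;
                }
            }
            result.get(target).add(value);
        }
        return result;
    }
}
```

Nothing forces the caller to read every entry of the returned map, which matches the second con listed above.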
> (KBranchedStreams could even work *both* ways but perhaps that's overdoing
> it).
>
> Overall I'm curious how important it is to be able to easily access the
> branched KStream in the same scope as the original.  It's possible that it
> doesn't need to be handled directly by the API, but instead left up to the
> user.  I'm sort of in the middle on it.
>
> Paul
>
>
>
> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
>
> > I'd like to +1 what Michael said about the issues with the existing branch
> > method. I agree with what he's outlined, and I think we should proceed by
> > trying to alleviate these problems. Specifically, it seems important to be
> > able to cleanly access the individual branches (e.g. by mapping
> > name->stream), which I thought was the original intention of this KIP.
> >
> > That said, I don't think we should so easily give in to the double brace
> > anti-pattern or force our users into it if at all possible to avoid... just
> > my two cents.
> >
> > Cheers,
> > Sophie
> >
> > On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <michael.droga...@confluent.io> wrote:
> >
> > > I’d like to propose a different way of thinking about this. To me, there
> > > are three problems with the existing branch signature:
> > >
> > > 1. If you use it the way most people do, Java raises unsafe type warnings.
> > > 2. The way in which you use the stream branches is positionally coupled to
> > > the ordering of the conditionals.
> > > 3. It is brittle to extend existing branch calls with additional code
> > > paths.
> > >
> > > Using associative constructs instead of relying on ordered constructs would
> > > be a stronger approach. Consider a signature that instead looks like this:
> > >
> > > Map<String, KStream<K,V>> KStream#branch(SortedMap<String, Predicate<? super K,? super V>>);
> > >
> > > Branches are given names in a map, and as a result, the API returns a
> > > mapping of names to streams. The ordering of the conditionals is maintained
> > > because it’s a sorted map. Insert order determines the order of evaluation.
> > >
> > > This solves problem 1 because there are no more varargs. It solves problem
> > > 2 because you no longer lean on ordering to access the branch you’re
> > > interested in. It solves problem 3 because you can introduce another
> > > conditional by simply attaching another name to the structure, rather than
> > > messing with the existing indices.
> > >
> > > One of the drawbacks is that creating the map inline is historically
> > > awkward in Java. I know it’s an anti-pattern to use voluminously, but
> > > double brace initialization would clean up the aesthetics.
> > >
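One detail worth checking in the `SortedMap` signature: a `SortedMap` such as `TreeMap` iterates in key order, not insertion order. Insertion order is what `LinkedHashMap` preserves. The hypothetical `MapOrderDemo` below evaluates the same two predicates under both map types. It fills the map with plain `put` calls rather than double brace initialization, which creates an anonymous subclass at every use site.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical demo: same predicates, two map types, different evaluation order.
final class MapOrderDemo {
    // Returns the name of the first predicate, in the map's iteration order,
    // that accepts the value (or the fallback name if none does).
    static <V> String firstMatch(Map<String, Predicate<V>> branches, V value, String fallback) {
        for (Map.Entry<String, Predicate<V>> entry : branches.entrySet()) {
            if (entry.getValue().test(value)) {
                return entry.getKey();
            }
        }
        return fallback;
    }

    // Plain put() calls instead of double brace initialization.
    private static Map<String, Predicate<Integer>> fill(Map<String, Predicate<Integer>> target) {
        target.put("zLarge", v -> v > 10);    // inserted first
        target.put("aEven", v -> v % 2 == 0); // inserted second, but sorts first
        return target;
    }

    static Map<String, Predicate<Integer>> insertionOrdered() {
        return fill(new LinkedHashMap<>());
    }

    static Map<String, Predicate<Integer>> keyOrdered() {
        return fill(new TreeMap<>()); // TreeMap is a SortedMap: iterates by key
    }
}
```

For a value such as 12, which satisfies both predicates, the insertion-ordered map picks `zLarge` while the `TreeMap` picks `aEven`. With a key-sorted map, renaming a branch can therefore change which overlapping condition wins.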
> > > On Tue, Apr 30, 2019 at 9:10 AM John Roesler <j...@confluent.io> wrote:
> > >
> > > > Hi Ivan,
> > > >
> > > > Thanks for the update.
> > > >
> > > > FWIW, I agree with Matthias that the current "start branching" operator is
> > > > confusing when named the same way as the actual branches. "Split" seems
> > > > like a good name. Alternatively, we can do without a "start branching"
> > > > operator at all, and just do:
> > > >
> > > > stream
> > > >       .branch(Predicate)
> > > >       .branch(Predicate)
> > > >       .defaultBranch();
> > > >
> > > > Tentatively, I think that this branching operation should be terminal. That
> > > > way, we don't create ambiguity about how to use it. That is, `branch`
> > > > should return `KBranchedStream`, while `defaultBranch` is `void`, to
> > > > enforce that it comes last, and that there is only one definition of the
> > > > default branch. Potentially, we should log a warning if there's no default,
> > > > and additionally log a warning (or throw an exception) if a record falls
> > > > through with no default.
> > > >
> > > > Thoughts?
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matth...@confluent.io> wrote:
> > > >
> > > > > Thanks for updating the KIP and your answers.
> > > > >
> > > > >
> > > > > > this is to make the name similar to String#split
> > > > > > that also returns an array, right?
> > > > >
> > > > > The intent was to avoid name duplication. The return type should _not_
> > > > > be an array.
> > > > >
> > > > > The current proposal is
> > > > >
> > > > > stream.branch()
> > > > >       .branch(Predicate)
> > > > >       .branch(Predicate)
> > > > >       .defaultBranch();
> > > > >
> > > > > IMHO, this reads a little odd, because the first `branch()` does not
> > > > > take any parameters and has different semantics than the later
> > > > > `branch()` calls. Note that from the code snippet above, it's hidden
> > > > > that the first call is `KStream#branch()` while the others are
> > > > > `KBranchedStream#branch()`, which makes reading the code harder.
> > > > >
> > > > > Because I suggested to rename `addBranch()` -> `branch()`, I thought it
> > > > > might be better to also rename `KStream#branch()` to avoid the naming
> > > > > overlap that seems to be confusing. The following reads much cleaner to
> > > > > me:
> > > > >
> > > > > stream.split()
> > > > >       .branch(Predicate)
> > > > >       .branch(Predicate)
> > > > >       .defaultBranch();
> > > > >
> > > > > Maybe there is a better alternative to `split()` though to avoid the
> > > > > naming overlap.
> > > > >
> > > > >
> > > > > > 'default' is, however, a reserved word, so unfortunately we cannot have
> > > > > > a method with such name :-)
> > > > >
> > > > > Bummer. Didn't consider this. Maybe we can still come up with a short
> > > > > name?
> > > > >
> > > > >
> > > > > Can you add the interface `KBranchedStream` to the KIP with all its
> > > > > methods? It will be part of public API and should be contained in the
> > > > > KIP. For example, it's unclear atm what the return type of
> > > > > `defaultBranch()` is.
> > > > >
> > > > >
> > > > > You did not comment on the idea to add a `KBranchedStream#get(int index)
> > > > > -> KStream` method to get the individually branched-KStreams. Would be
> > > > > nice to get your feedback about it. It seems you suggest that users
> > > > > would need to write custom utility code otherwise, to access them. We
> > > > > should discuss the pros and cons of both approaches. It feels
> > > > > "incomplete" to me atm, if the API has no built-in support to get the
> > > > > branched-KStreams directly.
> > > > >
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > > On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> > > > > > Hi all!
> > > > > >
> > > > > > I have updated the KIP-418 according to the new vision.
> > > > > >
> > > > > > Matthias, thanks for your comment!
> > > > > >
> > > > > >> Renaming KStream#branch() -> #split()
> > > > > >
> > > > > > I can see your point: this is to make the name similar to String#split
> > > > > > that also returns an array, right? But is it worth the loss of backwards
> > > > > > compatibility? We can have overloaded branch() as well without affecting
> > > > > > the existing code. Maybe the old array-based `branch` method should be
> > > > > > deprecated, but this is a subject for discussion.
> > > > > >
> > > > > >> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
> > > > > >> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> > > > > >
> > > > > > Totally agree with 'addBranch->branch' rename. 'default' is, however, a
> > > > > > reserved word, so unfortunately we cannot have a method with such name
> > > > > > :-)
> > > > > >
> > > > > >> defaultBranch() does take a `Predicate` as argument, but I think that
> > > > > >> is not required?
> > > > > >
> > > > > > Absolutely! I think that was just a copy-paste error or something.
> > > > > >
> > > > > > Dear colleagues,
> > > > > >
> > > > > > please review the new version of the KIP and Paul's PR
> > > > > > (https://github.com/apache/kafka/pull/6512)
> > > > > >
> > > > > > Any new suggestions/objections?
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Ivan
> > > > > >
> > > > > >
> > > > > > On 11.04.2019 11:47, Matthias J. Sax wrote:
> > > > > >> Thanks for driving the discussion of this KIP. It seems that everybody
> > > > > >> agrees that the current branch() method using arrays is not optimal.
> > > > > >>
> > > > > >> I had a quick look into the PR and I like the overall proposal. There
> > > > > >> are some minor things we need to consider. I would recommend the
> > > > > >> following renaming:
> > > > > >>
> > > > > >> KStream#branch() -> #split()
> > > > > >> KBranchedStream#addBranch() -> BranchingKStream#branch()
> > > > > >> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> > > > > >>
> > > > > >> It's just a suggestion to get slightly shorter method names.
> > > > > >>
> > > > > >> In the current PR, defaultBranch() does take a `Predicate` as argument,
> > > > > >> but I think that is not required?
> > > > > >>
> > > > > >> Also, we should consider KIP-307, which was recently accepted and is
> > > > > >> currently being implemented:
> > > > > >>
> > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > > > > >>
> > > > > >> I.e., we should add overloads that accept a `Named` parameter.
> > > > > >>
> > > > > >>
> > > > > >> For the issue that the created `KStream` objects are in different scopes:
> > > > > >> could we extend `KBranchedStream` with a `get(int index)` method that
> > > > > >> returns the corresponding "branched" result `KStream` object? Maybe the
> > > > > >> second argument of `addBranch()` should not be a `Consumer<KStream>` but
> > > > > >> a `Function<KStream,KStream>` and `get()` could return whatever the
> > > > > >> `Function` returns?
> > > > > >>
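A minimal sketch of the `Function`-plus-`get(int index)` idea follows. It assumes each branch's function result is kept in declaration order and that lists stand in for `KStream`s. `FunctionBrancher` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical brancher whose branch functions return a result that can be
// fetched later by position; a List stands in for a KStream.
final class FunctionBrancher<V, R> {
    private final List<V> remaining;
    private final List<R> results = new ArrayList<>();

    FunctionBrancher(List<V> source) {
        this.remaining = new ArrayList<>(source);
    }

    FunctionBrancher<V, R> branch(Predicate<V> predicate, Function<List<V>, R> chain) {
        List<V> matched = new ArrayList<>();
        for (Iterator<V> it = remaining.iterator(); it.hasNext(); ) {
            V value = it.next();
            if (predicate.test(value)) { // first matching branch wins
                matched.add(value);
                it.remove();
            }
        }
        results.add(chain.apply(matched)); // keep whatever the function returns
        return this;
    }

    // Index follows declaration order, like the existing array-based branch().
    R get(int index) {
        return results.get(index);
    }
}
```

The trade-off is that `get(int)` reintroduces the positional coupling between branch order and branch access that the name-based proposals try to remove.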
> > > > > >>
> > > > > >> Finally, I would also suggest to update the KIP with the current
> > > > > >> proposal. That makes it easier to review.
> > > > > >>
> > > > > >>
> > > > > >> -Matthias
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> On 3/31/19 12:22 PM, Paul Whalen wrote:
> > > > > >>> Ivan,
> > > > > >>>
> > > > > >>> I'm a bit of a novice here as well, but I think it makes sense for you to
> > > > > >>> revise the KIP and continue the discussion.  Obviously we'll need some
> > > > > >>> buy-in from committers that have actual binding votes on whether the KIP
> > > > > >>> could be adopted.  It would be great to hear if they think this is a good
> > > > > >>> idea overall.  I'm not sure if that happens just by starting a vote, or if
> > > > > >>> there is generally some indication of interest beforehand.
> > > > > >>>
> > > > > >>> That being said, I'll continue the discussion a bit: assuming we do move
> > > > > >>> forward with the solution of "stream.branch() returns KBranchedStream", do
> > > > > >>> we deprecate "stream.branch(...) returns KStream[]"?  I would favor
> > > > > >>> deprecating, since having two mutually exclusive APIs that accomplish the
> > > > > >>> same thing is confusing, especially when they're fairly similar anyway.  We
> > > > > >>> just need to be sure we're not making something impossible/difficult that
> > > > > >>> is currently possible/easy.
> > > > > >>>
> > > > > >>> Regarding my PR - I think the general structure would work, it's just a
> > > > > >>> little sloppy overall in terms of naming and clarity. In particular,
> > > > > >>> passing in the "predicates" and "children" lists which get modified in
> > > > > >>> KBranchedStream but read from all the way in KStreamLazyBranch is a bit
> > > > > >>> complicated to follow.
> > > > > >>>
> > > > > >>> Thanks,
> > > > > >>> Paul
> > > > > >>>
> > > > > >>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:
> > > > > >>>
> > > > > >>>> Hi Paul!
> > > > > >>>>
> > > > > >>>> I read your code carefully and now I am fully convinced: your proposal
> > > > > >>>> looks better and should work. We just have to document the crucial fact
> > > > > >>>> that KStream consumers are invoked as they're added. And then it's all
> > > > > >>>> going to be very nice.
> > > > > >>>>
> > > > > >>>> What shall we do now? I should re-write the KIP and resume the
> > > > > >>>> discussion here, right?
> > > > > >>>>
> > > > > >>>> Why do you say that your PR 'should not be even a starting point if
> > > > > >>>> we go in this direction'? To me it looks like a good starting point. But
> > > > > >>>> as a novice in this project I might be missing some important details.
> > > > > >>>>
> > > > > >>>> Regards,
> > > > > >>>>
> > > > > >>>> Ivan
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> On 28.03.2019 17:38, Paul Whalen wrote:
> > > > > >>>>> Ivan,
> > > > > >>>>>
> > > > > >>>>> Maybe I’m missing the point, but I believe the stream.branch() solution
> > > > > >>>>> supports this. The couponIssuer::set* consumers will be invoked as
> > > > > >>>>> they’re added, not during streamsBuilder.build(). So the user still ought
> > > > > >>>>> to be able to call couponIssuer.coupons() afterward and depend on the
> > > > > >>>>> branched streams having been set.
> > > > > >>>>>
> > > > > >>>>> The issue I mean to point out is that it is hard to access the branched
> > > > > >>>>> streams in the same scope as the original stream (that is, not inside
> > > > > >>>>> the couponIssuer), which is a problem with both proposed solutions. It
> > > > > >>>>> can be worked around though.
> > > > > >>>>>
> > > > > >>>>> [Also, great to hear additional interest in 401, I’m excited to hear
> > > > > >>>>> your thoughts!]
> > > > > >>>>>
> > > > > >>>>> Paul
> > > > > >>>>>
> > > > > >>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru> wrote:
> > > > > >>>>>>
> > > > > >>>>>> Hi Paul!
> > > > > >>>>>>
> > > > > >>>>>> The idea to postpone the wiring of branches to the
> > > > > >>>>>> streamsBuilder.build() also looked great to me at first glance, but ---
> > > > > >>>>>>> the newly branched streams are not available in the same scope as each
> > > > > >>>>>>> other.  That is, if we wanted to merge them back together again I don't
> > > > > >>>>>>> see a way to do that.
> > > > > >>>>>>
> > > > > >>>>>> You just took the words right out of my mouth, I was just going to
> > > > > >>>>>> write in detail about this issue.
> > > > > >>>>>>
> > > > > >>>>>> Consider the example from Bill's book, p. 101: say we need to identify
> > > > > >>>>>> customers who have bought coffee and made a purchase in the electronics
> > > > > >>>>>> store to give them coupons.
> > > > > >>>>>>
> > > > > >>>>>> This is the code I usually write under these circumstances using my
> > > > > >>>>>> 'brancher' class:
> > > > > >>>>>>
> > > > > >>>>>> @Setter
> > > > > >>>>>> class CouponIssuer {
> > > > > >>>>>>    private KStream<....> coffeePurchases;
> > > > > >>>>>>    private KStream<....> electronicsPurchases;
> > > > > >>>>>>
> > > > > >>>>>>    KStream<...> coupons() {
> > > > > >>>>>>        return coffeePurchases.join(electronicsPurchases...)...whatever
> > > > > >>>>>>
> > > > > >>>>>>        /* In the real world the code here can be complex, so creating a
> > > > > >>>>>>           separate CouponIssuer class is fully justified, in order to
> > > > > >>>>>>           separate classes' responsibilities. */
> > > > > >>>>>>    }
> > > > > >>>>>> }
> > > > > >>>>>>
> > > > > >>>>>> CouponIssuer couponIssuer = new CouponIssuer();
> > > > > >>>>>>
> > > > > >>>>>> new KafkaStreamsBrancher<....>()
> > > > > >>>>>>      .branch(predicate1, couponIssuer::setCoffeePurchases)
> > > > > >>>>>>      .branch(predicate2, couponIssuer::setElectronicsPurchases)
> > > > > >>>>>>      .onTopOf(transactionStream);
> > > > > >>>>>>
> > > > > >>>>>> /* Alas, this won't work if we're going to wire up everything later,
> > > > > >>>>>>    without the terminal operation!!! */
> > > > > >>>>>> couponIssuer.coupons()...
> > > > > >>>>>>
> > > > > >>>>>> Does this make sense?  In order to properly initialize the CouponIssuer
> > > > > >>>>>> we need the terminal operation to be called before streamsBuilder.build()
> > > > > >>>>>> is called.
> > > > > >>>>>>
> > > > > >>>>>> [BTW Paul, I just found out that your KIP-401 is essentially the next
> > > > > >>>>>> KIP I was going to write here. I have some thoughts based on my
> > > > > >>>>>> experience, so I will join the discussion on KIP-401 soon.]
> > > > > >>>>>>
> > > > > >>>>>> Regards,
> > > > > >>>>>>
> > > > > >>>>>> Ivan
> > > > > >>>>>>
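The difference between invoking the consumers eagerly (as described for the PR) and deferring them to a build step can be sketched without Kafka at all. In this hypothetical model, a branch is represented only by a `String` handle, predicates are omitted for brevity, and `CouponHolder` plays the role of `CouponIssuer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model: a branch "stream" is represented only by a String handle.
final class CouponHolder {
    private String coffee;
    private String electronics;

    void setCoffeePurchases(String branch) { coffee = branch; }
    void setElectronicsPurchases(String branch) { electronics = branch; }

    // Stand-in for CouponIssuer.coupons(): needs both branches to be wired.
    String coupons() {
        if (coffee == null || electronics == null) {
            throw new IllegalStateException("branches have not been wired yet");
        }
        return "join(" + coffee + ", " + electronics + ")";
    }
}

// Eager wiring: each consumer runs inside branch().
final class EagerBrancher {
    private int count = 0;

    EagerBrancher branch(Consumer<String> chain) {
        chain.accept("branch-" + count++);
        return this;
    }
}

// Deferred wiring: consumers only run when build() is called.
final class DeferredBrancher {
    private final List<Consumer<String>> pending = new ArrayList<>();

    DeferredBrancher branch(Consumer<String> chain) {
        pending.add(chain);
        return this;
    }

    void build() {
        for (int i = 0; i < pending.size(); i++) {
            pending.get(i).accept("branch-" + i);
        }
    }
}
```

With eager wiring, `coupons()` can be called right after the branch declarations. With deferred wiring, it fails until `build()` has run, which is the problem described in the CouponIssuer example.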
> > > > > >>>>>> On 28.03.2019 6:29, Paul Whalen wrote:
> > > > > >>>>>>> Ivan,
> > > > > >>>>>>> I tried to make a very rough proof of concept of a fluent API based off
> > > > > >>>>>>> of KStream here (https://github.com/apache/kafka/pull/6512), and I think
> > > > > >>>>>>> I succeeded at removing both cons.
> > > > > >>>>>>>
> > > > > >>>>>>>     - Compatibility: I was incorrect earlier about compatibility issues,
> > > > > >>>>>>>     there aren't any direct ones.  I was unaware that Java is smart
> > > > > >>>>>>>     enough to distinguish between a branch(varargs...) returning one
> > > > > >>>>>>>     thing and branch() with no arguments returning another thing.
> > > > > >>>>>>>     - Requiring a terminal method: We don't actually need it.  We can
> > > > > >>>>>>>     just build up the branches in the KBranchedStream, which shares its
> > > > > >>>>>>>     state with the ProcessorSupplier that will actually do the
> > > > > >>>>>>>     branching.  It's not terribly pretty in its current form, but I
> > > > > >>>>>>>     think it demonstrates its feasibility.
> > > > > >>>>>>>
> > > > > >>>>>>> To be clear, I don't think that pull request should be final or even a
> > > > > >>>>>>> starting point if we go in this direction, I just wanted to see how
> > > > > >>>>>>> challenging it would be to get the API working.
> > > > > >>>>>>>
> > > > > >>>>>>> I will say though, that I'm not sure the existing solution could be
> > > > > >>>>>>> deprecated in favor of this, which I had originally suggested was a
> > > > > >>>>>>> possibility.  The reason is that the newly branched streams are not
> > > > > >>>>>>> available in the same scope as each other.  That is, if we wanted to
> > > > > >>>>>>> merge them back together again I don't see a way to do that.  The KIP
> > > > > >>>>>>> proposal has the same issue, though - all this means is that for
> > > > > >>>>>>> either solution, deprecating the existing branch(...) is not on the
> > > > > >>>>>>> table.
> > > > > >>>>>>>
> > > > > >>>>>>> Thanks,
> > > > > >>>>>>> Paul
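The compatibility point about overloads can be checked in isolation. In the hypothetical `OverloadDemo` below, Java's overload resolution picks the no-argument `branch()` for a call with no arguments, because overloads that don't need varargs expansion are considered first. The varargs overload is chosen only when predicates are passed. The return values are placeholder strings, not the real `KStream[]` or `KBranchedStream` types.

```java
import java.util.function.Predicate;

// Hypothetical signatures only: both overloads can coexist, and the call
// site decides which one is used.
final class OverloadDemo<V> {
    // Stand-in for the existing array-returning branch(Predicate...).
    @SafeVarargs
    final String branch(Predicate<V>... predicates) {
        return "array-based, " + predicates.length + " predicates";
    }

    // Stand-in for a new no-arg branch() that starts a fluent brancher.
    String branch() {
        return "fluent brancher";
    }
}
```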
> > > > > >>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:
> > > > > >>>>>>>> OK, let me summarize what we have discussed up to this
> > point.
> > > > > >>>>>>>>
> > > > > >>>>>>>> First, it seems to be commonly agreed that the branch API needs
> > > > > >>>>>>>> improvement. Motivation is given in the KIP.
> > > > > >>>>>>>>
> > > > > >>>>>>>> There are two potential ways to do it:
> > > > > >>>>>>>>
> > > > > >>>>>>>> 1. (as originally proposed)
> > > > > >>>>>>>>
> > > > > >>>>>>>> new KafkaStreamsBrancher<..>()
> > > > > >>>>>>>>     .branch(predicate1, ks ->..)
> > > > > >>>>>>>>     .branch(predicate2, ks->..)
> > > > > >>>>>>>>     .defaultBranch(ks->..) //optional
> > > > > >>>>>>>>     .onTopOf(stream).mapValues(...).... //onTopOf returns its argument
> > > > > >>>>>>>>
> > > > > >>>>>>>> PROS: 1) Fully backwards compatible. 2) The code won't make sense until
> > > > > >>>>>>>> all the necessary ingredients are provided.
> > > > > >>>>>>>>
> > > > > >>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance contrasts with
> > > > > >>>>>>>> the fluency of other KStream methods.
> > > > > >>>>>>>>
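For comparison, here is a minimal in-memory sketch of approach 1, where the brancher is configured first and `onTopOf` applies it and returns its argument. `ToyBrancher` is hypothetical and only mirrors the method names used in this thread. Records that match no predicate go to the default consumer, or are dropped when none is set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical model of approach 1; a List stands in for a KStream.
final class ToyBrancher<V> {
    private final List<Predicate<V>> predicates = new ArrayList<>();
    private final List<Consumer<List<V>>> chains = new ArrayList<>();
    // Without a default branch, unmatched records are simply dropped.
    private Consumer<List<V>> defaultChain = unmatched -> { };

    ToyBrancher<V> branch(Predicate<V> predicate, Consumer<List<V>> chain) {
        predicates.add(predicate);
        chains.add(chain);
        return this;
    }

    ToyBrancher<V> defaultBranch(Consumer<List<V>> chain) {
        defaultChain = chain;
        return this;
    }

    // Routes each record to the first matching branch, runs the consumers,
    // and returns the source unchanged so the caller can keep using it.
    List<V> onTopOf(List<V> source) {
        List<List<V>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            outputs.add(new ArrayList<>());
        }
        List<V> unmatched = new ArrayList<>();
        for (V value : source) {
            int target = -1;
            for (int i = 0; i < predicates.size() && target < 0; i++) {
                if (predicates.get(i).test(value)) {
                    target = i;
                }
            }
            if (target >= 0) {
                outputs.get(target).add(value);
            } else {
                unmatched.add(value);
            }
        }
        for (int i = 0; i < chains.size(); i++) {
            chains.get(i).accept(outputs.get(i));
        }
        defaultChain.accept(unmatched);
        return source;
    }
}
```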
> > > > > >>>>>>>> 2. (as Paul proposes)
> > > > > >>>>>>>>
> > > > > >>>>>>>> stream
> > > > > >>>>>>>>     .branch(predicate1, ks ->...)
> > > > > >>>>>>>>     .branch(predicate2, ks->...)
> > > > > >>>>>>>>     .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and
> > > > > >>>>>>>>                             //noDefault() return void
> > > > > >>>>>>>>
> > > > > >>>>>>>> PROS: Generally follows the way the KStream interface is defined.
> > > > > >>>>>>>>
> > > > > >>>>>>>> CONS: We need to define two terminal methods (defaultBranch(ks->) and
> > > > > >>>>>>>> noDefault()). And for a user it is very easy to miss the fact that one
> > > > > >>>>>>>> of the terminal methods should be called. If these methods are not
> > > > > >>>>>>>> called, we can throw an exception at runtime.
> > > > > >>>>>>>>
> > > > > >>>>>>>> Colleagues, what are your thoughts? Can we do better?
> > > > > >>>>>>>>
> > > > > >>>>>>>> Regards,
> > > > > >>>>>>>>
> > > > > >>>>>>>> Ivan
> > > > > >>>>>>>>
> > > > > >>>>>>>> On 27.03.2019 18:46, Ivan Ponomarev wrote:
> > > > > >>>>>>>>> On 25.03.2019 17:43, Ivan Ponomarev wrote:
> > > > > >>>>>>>>>> Paul,
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> I see your point when you are talking about
> > > > > >>>>>>>>>> stream..branch..branch...default..
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Still, I believe that this cannot be implemented the easy way.
> > > > > >>>>>>>>>> Maybe we all should think further.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Let me comment on two of your ideas.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> user could specify a terminal method that assumes nothing will reach
> > > > > >>>>>>>>>>> the default branch, throwing an exception if such a case occurs.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> 1) OK, apparently this should not be the only option besides
> > > > > >>>>>>>>>> `default`, because there are scenarios when we want to just silently
> > > > > >>>>>>>>>> drop the messages that didn't match any predicate. 2) Throwing an
> > > > > >>>>>>>>>> exception in the middle of data flow processing looks like a bad idea.
> > > > > >>>>>>>>>> In the stream processing paradigm, I would prefer to emit a special
> > > > > >>>>>>>>>> message to a dedicated stream. This is exactly where `default` can be
> > > > > >>>>>>>>>> used.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> it would be fairly easy for the InternalTopologyBuilder to track
> > > > > >>>>>>>>>>> dangling branches that haven't been terminated and raise a clear error
> > > > > >>>>>>>>>>> before it becomes an issue.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> You mean a runtime exception, when the program is compiled and run?
> > > > > >>>>>>>>>> Well, I'd prefer an API that simply won't compile if used
> > > > > >>>>>>>>>> incorrectly. Can we build such an API as a method chain starting from
> > > > > >>>>>>>>>> a KStream object? There is a huge cost difference between runtime and
> > > > > >>>>>>>>>> compile-time errors. Even if a failure surfaces instantly in unit
> > > > > >>>>>>>>>> tests, it costs the project more than a compilation failure.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Regards,
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Ivan
> > > > > >>>>>>>>>>
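On the question of an API that won't compile if the terminal call is forgotten: with a plain fluent chain, Java can make `defaultBranch` the last call (by returning `void`) but cannot force anyone to make it. One possible pattern takes the branch definitions as a lambda that must return a token only the terminal methods can create. It is sketched here with hypothetical names and lists in place of streams.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch; a List stands in for a KStream.
final class Branches<V> {
    // Proof that a terminal method ran. Only Branches can create one.
    static final class Terminated {
        private Terminated() {
        }
    }

    private final List<V> remaining;
    private final List<V> dropped = new ArrayList<>();

    private Branches(List<V> source) {
        this.remaining = new ArrayList<>(source);
    }

    // The body must return a Terminated, so a definition such as
    //   b -> b.branch(isEven, sink)
    // is rejected by the compiler: Branches<V> is not a Terminated.
    // Returns the records dropped by noDefault() (empty otherwise).
    static <V> List<V> split(List<V> source, Function<Branches<V>, Terminated> body) {
        Branches<V> branches = new Branches<>(source);
        Objects.requireNonNull(body.apply(branches), "terminal method not called");
        return branches.dropped;
    }

    Branches<V> branch(Predicate<V> predicate, Consumer<List<V>> chain) {
        List<V> matched = new ArrayList<>();
        for (Iterator<V> it = remaining.iterator(); it.hasNext(); ) {
            V value = it.next();
            if (predicate.test(value)) { // first matching branch wins
                matched.add(value);
                it.remove();
            }
        }
        chain.accept(matched);
        return this;
    }

    Terminated defaultBranch(Consumer<List<V>> chain) {
        chain.accept(new ArrayList<>(remaining));
        remaining.clear();
        return new Terminated();
    }

    Terminated noDefault() {
        dropped.addAll(remaining); // records that matched no predicate
        remaining.clear();
        return new Terminated();
    }
}
```

This changes the API shape, since the branches move inside a lambda. A lambda can also still return `null`, which the sketch only rejects at runtime.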
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> On 25.03.2019 0:38, Paul Whalen wrote:
> > > > > >>>>>>>>>>> Ivan,
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Good point about the terminal operation being required.  But is that
> > > > > >>>>>>>>>>> really such a bad thing?  If the user doesn't want a defaultBranch they
> > > > > >>>>>>>>>>> can call some other terminal method (noDefaultBranch()?) just as
> > > > > >>>>>>>>>>> easily.  In fact I think it creates an opportunity for a nicer API - a
> > > > > >>>>>>>>>>> user could specify a terminal method that assumes nothing will reach the
> > > > > >>>>>>>>>>> default branch, throwing an exception if such a case occurs.  That seems
> > > > > >>>>>>>>>>> like an improvement over the current branch() API, which allows for the
> > > > > >>>>>>>>>>> more subtle behavior of records unexpectedly getting dropped.
> > > > > >>>>>>>>>>>
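The options discussed in this exchange for records that match no predicate (route them to a default branch, drop them silently, or fail) can be sketched as alternative terminal methods. `StrictRouter` and `failOnUnmatched()` are hypothetical names. Processing happens one record at a time, as a processor would do it, so the fail-fast option surfaces as an exception during processing. That is exactly the behavior Ivan questions in his reply.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical per-record router; Lists stand in for downstream streams.
final class StrictRouter<V> {
    private final List<Predicate<V>> predicates = new ArrayList<>();
    private final List<List<V>> sinks = new ArrayList<>();
    private Consumer<V> onUnmatched; // set by one of the terminal methods

    StrictRouter<V> branch(Predicate<V> predicate, List<V> sink) {
        predicates.add(predicate);
        sinks.add(sink);
        return this;
    }

    // Terminal options. This sketch does not stop a caller from invoking
    // more than one of them; the last call wins.
    void defaultBranch(List<V> sink) {
        onUnmatched = sink::add;             // route to a dedicated stream
    }

    void noDefault() {
        onUnmatched = value -> { };          // drop silently
    }

    void failOnUnmatched() {
        onUnmatched = value -> {             // fail fast during processing
            throw new IllegalStateException("no branch matched: " + value);
        };
    }

    void process(V value) {
        if (onUnmatched == null) {
            throw new IllegalStateException("no terminal method was called");
        }
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(value)) {
                sinks.get(i).add(value);     // first matching branch wins
                return;
            }
        }
        onUnmatched.accept(value);
    }
}
```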
> > > > > >>>>>>>>>>> The need for a terminal operation certainly has to be well documented,
> > > > > >>>>>>>>>>> but it would be fairly easy for the InternalTopologyBuilder to track
> > > > > >>>>>>>>>>> dangling branches that haven't been terminated and raise a clear error
> > > > > >>>>>>>>>>> before it becomes an issue.  Especially now that there is a "build step"
> > > > > >>>>>>>>>>> where the topology is actually wired up, when StreamsBuilder.build() is
> > > > > >>>>>>>>>>> called.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Regarding onTopOf() returning its argument, I agree that it's critical to
> > > > > >>>>>>>>>>> allow users to do other operations on the input stream.  With the fluent
> > > > > >>>>>>>>>>> solution, it ought to work the same way all other operations do - if you
> > > > > >>>>>>>>>>> want to process off the original KStream multiple times, you just need
> > > > > >>>>>>>>>>> the stream as a variable so you can call as many operations on it as you
> > > > > >>>>>>>>>>> desire.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Thoughts?
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Best,
> > > > > >>>>>>>>>>> Paul
> > > > > >>>>>>>>>>>
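The build-time check for dangling branches could look roughly like this. `ToyTopologyBuilder` and `ToyBranched` are hypothetical stand-ins, not `InternalTopologyBuilder`. Every `split()` registers itself as open, a terminal call removes it, and `build()` fails if anything is still open.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical builder that remembers every split() until it is terminated.
final class ToyTopologyBuilder {
    private final Set<ToyBranched> open = new LinkedHashSet<>();
    private int nextId = 0;

    ToyBranched split() {
        ToyBranched branched = new ToyBranched(this, "split-" + nextId++);
        open.add(branched);
        return branched;
    }

    void markTerminated(ToyBranched branched) {
        open.remove(branched);
    }

    // Fails at build time, before any record is processed.
    String build() {
        if (!open.isEmpty()) {
            List<String> names = new ArrayList<>();
            for (ToyBranched branched : open) {
                names.add(branched.name());
            }
            throw new IllegalStateException("unterminated branches: " + names);
        }
        return "topology";
    }
}

final class ToyBranched {
    private final ToyTopologyBuilder builder;
    private final String name;
    private final List<Predicate<?>> predicates = new ArrayList<>();

    ToyBranched(ToyTopologyBuilder builder, String name) {
        this.builder = builder;
        this.name = name;
    }

    String name() {
        return name;
    }

    ToyBranched branch(Predicate<?> predicate) {
        predicates.add(predicate);
        return this;
    }

    void defaultBranch() {
        builder.markTerminated(this);
    }

    void noDefaultBranch() {
        builder.markTerminated(this);
    }
}
```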
> > > > > >>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>> Hello Paul,
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> I'm afraid this won't work because we do not always need the
> > > > > >>>>>>>>>>>> defaultBranch. And without a terminal operation we don't know when to
> > > > > >>>>>>>>>>>> finalize and build the 'branch switch'.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> In my proposal, onTopOf returns its argument, so we can do something
> > > > > >>>>>>>>>>>> more with the original stream after branching.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> I understand your point that the need for special object construction
> > > > > >>>>>>>>>>>> contrasts with the fluency of most KStream methods. But here we have a
> > > > > >>>>>>>>>>>> special case: we build the switch to split the flow, so I think this is
> > > > > >>>>>>>>>>>> still idiomatic.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Regards,
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Ivan
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> On 24.03.2019 4:02, Paul Whalen wrote:
> > > > > >>>>>>>>>>>>> Ivan,
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> I think it's a great idea to improve this API, but I find the onTopOf()
> > > > > >>>>>>>>>>>>> mechanism a little confusing since it contrasts with the fluency of other
> > > > > >>>>>>>>>>>>> KStream method calls.  Ideally I'd like to just call a method on the
> > > > > >>>>>>>>>>>>> stream so it still reads top to bottom if the branch cases are defined
> > > > > >>>>>>>>>>>>> fluently.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> I think the addBranch(predicate, handleCase) is very nice and the right
> > > > > >>>>>>>>>>>>> way to do things, but what if we flipped around how we specify the source
> > > > > >>>>>>>>>>>>> stream?
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Like:
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> stream.branch()
> > > > > >>>>>>>>>>>>>            .addBranch(predicate1, this::handle1)
> > > > > >>>>>>>>>>>>>            .addBranch(predicate2, this::handle2)
> > > > > >>>>>>>>>>>>>            .defaultBranch(this::handleDefault);
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Where branch() returns a KBranchedStreams or KStreamBrancher or
> > > > > >>>>>>>>>>>>> something, which is added to by addBranch() and terminated by
> > > > > >>>>>>>>>>>>> defaultBranch() (which returns void).  This is obviously incompatible with
> > > > > >>>>>>>>>>>>> the current API, so the new stream.branch() would have to have a different
> > > > > >>>>>>>>>>>>> name, but that seems like a fairly small problem - we could call it
> > > > > >>>>>>>>>>>>> something like branched() or branchedStreams() and deprecate the old API.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Does this satisfy the motivations of your KIP? It seems like
> > > > > >>>>>>>>>>>>> it does to me, allowing for clear in-line branching while also
> > > > > >>>>>>>>>>>>> allowing you to dynamically build branches off of
> > > > > >>>>>>>>>>>>> KBranchedStreams if desired.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Thanks,
> > > > > >>>>>>>>>>>>> Paul
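The fluent shape proposed above (a builder obtained from the stream, extended with addBranch() and terminated by a void defaultBranch()) can be modeled without Kafka at all. The following is a minimal sketch under stated assumptions, not the Kafka Streams API: FluentBrancher and FluentBrancherDemo are hypothetical names, an in-memory List stands in for the KStream, each handler receives the records routed to it as a List, and routing is assumed to be first-match-wins, as with the existing KStream#branch(). It shows both a static, in-line declaration and a branch set built in a loop.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Demo class first so the single-file source launcher finds main().
class FluentBrancherDemo {
    // Handler output keyed by branch name, so the routing can be inspected.
    static final Map<String, List<Integer>> RESULTS = new LinkedHashMap<>();

    // Static case: branches declared in-line and read top to bottom.
    static void run(List<Integer> input) {
        RESULTS.clear();
        new FluentBrancher<>(input)
            .addBranch(n -> n < 0, b -> RESULTS.put("negative", b))
            .addBranch(n -> n % 2 == 0, b -> RESULTS.put("even", b))
            .defaultBranch(b -> RESULTS.put("other", b));
    }

    // Dynamic case: the number of branches is only known at runtime.
    static void runDynamic(List<Integer> input, List<Integer> divisors) {
        RESULTS.clear();
        FluentBrancher<Integer> brancher = new FluentBrancher<>(input);
        for (int d : divisors) {
            brancher.addBranch(n -> n % d == 0, b -> RESULTS.put("div" + d, b));
        }
        brancher.defaultBranch(b -> RESULTS.put("other", b));
    }

    public static void main(String[] args) {
        run(List.of(-3, 2, 5, -4, 8, 7));
        System.out.println(RESULTS); // {negative=[-3, -4], even=[2, 8], other=[5, 7]}
    }
}

// Hypothetical builder obtained from the source; not a Kafka Streams class.
class FluentBrancher<T> {
    private final List<T> source;
    // Registration order matters: an item goes to the first branch whose predicate matches.
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();

    FluentBrancher(List<T> source) {
        this.source = source;
    }

    // Adds a branch and returns this builder so calls can be chained.
    FluentBrancher<T> addBranch(Predicate<T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Terminal call (returns void): routes every item, then invokes each handler once.
    void defaultBranch(Consumer<List<T>> defaultHandler) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<T> unmatched = new ArrayList<>();
        for (T item : source) {
            boolean matched = false;
            for (int i = 0; i < predicates.size() && !matched; i++) {
                if (predicates.get(i).test(item)) {
                    buckets.get(i).add(item);
                    matched = true;
                }
            }
            if (!matched) {
                unmatched.add(item);
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        defaultHandler.accept(unmatched);
    }
}
```

Because defaultBranch() is the only terminal call in this sketch, every record is accounted for: it either matches exactly one registered predicate or lands in the default handler.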
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
> > > > > >>>>>>>>>>>>> <iponoma...@mail.ru.invalid> wrote:
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Hi Bill,
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Thank you for your reply!
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> This is how I usually do it:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){
> > > > > >>>>>>>>>>>>>>            ks.filter(....).mapValues(...)
> > > > > >>>>>>>>>>>>>> }
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){
> > > > > >>>>>>>>>>>>>>            ks.selectKey(...).groupByKey()...
> > > > > >>>>>>>>>>>>>> }
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> ......
> > > > > >>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
> > > > > >>>>>>>>>>>>>>       .addBranch(predicate1, this::handleFirstCase)
> > > > > >>>>>>>>>>>>>>       .addBranch(predicate2, this::handleSecondCase)
> > > > > >>>>>>>>>>>>>>       .onTopOf(....)
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Regards,
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Ivan
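Ivan's usage configures the predicate/handler pairs first and passes the source stream only in the final onTopOf(...) call. The sketch below models that ordering in plain Java. ListBrancher and OnTopOfDemo are hypothetical names, a List replaces the KStream, onTopOf() is assumed to return nothing, and records matching no predicate are assumed to be dropped, as the array-returning KStream#branch() does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Demo class first so the single-file source launcher finds main().
class OnTopOfDemo {
    static final List<String> LOG = new ArrayList<>();

    // Hypothetical handlers playing the role of handleFirstCase/handleSecondCase.
    static void handleShort(List<String> items) {
        LOG.add("short:" + items);
    }

    static void handleLong(List<String> items) {
        LOG.add("long:" + items);
    }

    static void run(List<String> input) {
        LOG.clear();
        // Branches are configured first; the source is supplied last.
        new ListBrancher<String>()
            .addBranch(s -> s.length() <= 3, OnTopOfDemo::handleShort)
            .addBranch(s -> s.length() > 6, OnTopOfDemo::handleLong)
            .onTopOf(input);
    }

    public static void main(String[] args) {
        run(List.of("abc", "kafka", "streams!", "ok"));
        System.out.println(LOG); // [short:[abc, ok], long:[streams!]]
    }
}

// Hypothetical stand-in for KafkaStreamsBrancher; a List replaces the KStream.
class ListBrancher<T> {
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();

    ListBrancher<T> addBranch(Predicate<T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Routes each item to the first matching branch only; items matching no
    // predicate are dropped (an assumption mirroring KStream#branch()).
    void onTopOf(List<T> source) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        for (T item : source) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(item)) {
                    buckets.get(i).add(item);
                    break;
                }
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
    }
}
```

In this sketch, supplying the source last means the chain no longer starts from the stream, which is the readability concern Paul raises.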
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
> > > > > >>>>>>>>>>>>>>> Hi Ivan,
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Thanks for the KIP.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> I have one question: the KafkaStreamsBrancher takes a Consumer
> > > > > >>>>>>>>>>>>>>> as a second argument, which returns nothing, and the example in
> > > > > >>>>>>>>>>>>>>> the KIP shows each stream from the branch using a terminal node
> > > > > >>>>>>>>>>>>>>> (KStream#to() in this case).
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Maybe I've missed something, but how would we handle the case
> > > > > >>>>>>>>>>>>>>> where the user has created a branch but wants to continue
> > > > > >>>>>>>>>>>>>>> processing and not necessarily use a terminal node on the
> > > > > >>>>>>>>>>>>>>> branched stream immediately? For example, using today's logic
> > > > > >>>>>>>>>>>>>>> as is, if we had something like this:
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> KStream<String, String>[] branches =
> > > > > >>>>>>>>>>>>>>>     originalStream.branch(predicate1, predicate2);
> > > > > >>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
> > > > > >>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Thanks!
> > > > > >>>>>>>>>>>>>>> Bill
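Bill's example relies on the existing array-returning KStream#branch(), which sends each record to the first predicate it matches and drops records that match none. The sketch below models that contract on an in-memory List. ArrayBranchDemo.branch is a hypothetical helper, and it returns a List of Lists rather than a KStream array to avoid generic array creation. It illustrates the point of the question: the caller receives every branch and can keep transforming each one after the split, instead of having to finish it inside a handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class ArrayBranchDemo {
    // Models the existing contract: the first matching predicate wins and
    // items matching no predicate are dropped.
    @SafeVarargs
    static <T> List<List<T>> branch(List<T> source, Predicate<T>... predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (T item : source) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(item)) {
                    branches.get(i).add(item);
                    break;
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        List<List<String>> branches = branch(
            List.of("a:1", "b:2", "a:3", "c:4"),
            s -> s.startsWith("a"),
            s -> s.startsWith("b"));
        // Processing continues on each branch after the split,
        // analogous to branches[0].filter(...).mapValues(...).
        List<String> first = branches.get(0).stream()
            .filter(s -> !s.endsWith("3"))
            .map(String::toUpperCase)
            .collect(Collectors.toList());
        List<String> second = branches.get(1).stream()
            .map(s -> s.substring(2))
            .collect(Collectors.toList());
        System.out.println(first + " " + second); // [A:1] [2]
    }
}
```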
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com>
> > > > > >>>>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> All,
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP-418.
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Here's the original message:
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Hello,
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418. Please take a look
> > > > > >>>>>>>>>>>>>>>> at the KIP if you can; I would appreciate any feedback :)
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> > > > > >>>>>>>>>>>>>>>> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
> > > > > >>>>>>>>>>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Regards,
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Ivan Ponomarev
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>
