Ivan,

Good point about the terminal operation being required.  But is that really
such a bad thing?  If the user doesn't want a defaultBranch they can call
some other terminal method (noDefaultBranch()?) just as easily.  In fact I
think it creates an opportunity for a nicer API - a user could specify a
terminal method that assumes nothing will reach the default branch,
throwing an exception if such a case occurs.  That seems like an
improvement over the current branch() API, which allows for the more subtle
behavior of records unexpectedly getting dropped.
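
As a rough sketch of what I mean, here is a toy model over plain Lists
rather than KStream. ToyBrancher and all of its methods, including
noDefaultBranch(), are names I'm making up for illustration; none of this
is existing Kafka Streams API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Toy model only: a "stream" is a List<T>, and every name here is hypothetical.
final class ToyBrancher<T> {
    private final List<T> source;
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();

    ToyBrancher(List<T> source) {
        this.source = source;
    }

    // Registers a case; nothing is routed until a terminal method is called.
    ToyBrancher<T> addBranch(Predicate<T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Terminal: records that match no predicate go to the default handler.
    void defaultBranch(Consumer<List<T>> handler) {
        handler.accept(route());
    }

    // Terminal: a record that matches no predicate is an error, not a silent drop.
    void noDefaultBranch() {
        List<T> unmatched = route();
        if (!unmatched.isEmpty()) {
            throw new IllegalStateException("No branch matched: " + unmatched);
        }
    }

    // Sends each record to the first matching branch, invokes the branch
    // handlers, and returns the records that matched nothing.
    private List<T> route() {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<T> unmatched = new ArrayList<>();
        for (T record : source) {
            int i = 0;
            while (i < predicates.size() && !predicates.get(i).test(record)) {
                i++;
            }
            if (i < predicates.size()) {
                buckets.get(i).add(record);
            } else {
                unmatched.add(record);
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        return unmatched;
    }
}
```

With something along these lines, a chain ending in noDefaultBranch() fails
loudly if any record matches no predicate, whereas one ending in
defaultBranch(handler) hands those records to the handler.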

The need for a terminal operation certainly has to be well documented, but
it would be fairly easy for the InternalTopologyBuilder to track dangling
branches that haven't been terminated and raise a clear error before it
becomes an issue.  Especially now that there is a "build step" where the
topology is actually wired up, when StreamsBuilder.build() is called.
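
The bookkeeping I have in mind is roughly the following. This is a
standalone toy, not a patch against InternalTopologyBuilder, and
ToyTopologyBuilder, branchOpened() and branchTerminated() are hypothetical
names; I'm only assuming that the builder can be told when a branch chain
starts and when it is terminated:

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Toy stand-in for a topology builder; every name here is hypothetical.
final class ToyTopologyBuilder {
    // Branch chains that were started but not yet terminated.
    private final Set<String> openBranches = new LinkedHashSet<>();

    // Called when a branch chain is started on some stream.
    void branchOpened(String name) {
        openBranches.add(name);
    }

    // Called by any terminal method, e.g. a defaultBranch()-style call.
    void branchTerminated(String name) {
        openBranches.remove(name);
    }

    // The build step fails fast if any branch chain was left dangling.
    String build() {
        if (!openBranches.isEmpty()) {
            throw new IllegalStateException(
                "Branches never terminated: " + openBranches
                    + "; call a terminal method on each of them");
        }
        return "topology";
    }
}
```

The point is just that the error surfaces at build time with the name of
the offending branch, instead of showing up later as missing records.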

Regarding onTopOf() returning its argument, I agree that it's critical to
allow users to do other operations on the input stream.  With the fluent
solution, it ought to work the same way all other operations do - if you
want to process off the original KStream multiple times, you just need the
stream as a variable so you can call as many operations on it as you desire.

Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:

> Hello Paul,
>
> I'm afraid this won't work because we do not always need the
> defaultBranch. And without a terminal operation we don't know when to
> finalize and build the 'branch switch'.
>
> In my proposal, onTopOf returns its argument, so we can do something
> more with the original branch after branching.
>
> I understand your point that the need for special object construction
> contrasts with the fluency of most KStream methods. But here we have a
> special case: we build the switch to split the flow, so I think this is
> still idiomatic.
>
> Regards,
>
> Ivan
>
>
>
> 24.03.2019 4:02, Paul Whalen wrote:
> > Ivan,
> >
> > I think it's a great idea to improve this API, but I find the onTopOf()
> > mechanism a little confusing since it contrasts with the fluency of other
> > KStream method calls.  Ideally I'd like to just call a method on the stream
> > so it still reads top to bottom if the branch cases are defined fluently.
> > I think the addBranch(predicate, handleCase) is very nice and the right way
> > to do things, but what if we flipped around how we specify the source
> > stream?
> >
> > Like:
> >
> > stream.branch()
> >          .addBranch(predicate1, this::handle1)
> >          .addBranch(predicate2, this::handle2)
> >          .defaultBranch(this::handleDefault);
> >
> > Where branch() returns a KBranchedStreams or KStreamBrancher or something,
> > which is added to by addBranch() and terminated by defaultBranch() (which
> > returns void).  This is obviously incompatible with the current API, so the
> > new stream.branch() would have to have a different name, but that seems
> > like a fairly small problem - we could call it something like branched() or
> > branchedStreams() and deprecate the old API.
> >
> > Does this satisfy the motivations of your KIP?  It seems like it does to
> > me, allowing for clear in-line branching while also allowing you to
> > dynamically build branches off of KBranchedStreams if desired.
> >
> > Thanks,
> > Paul
> >
> >
> >
> > On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:
> >
> >> Hi Bill,
> >>
> >> Thank you for your reply!
> >>
> >> This is how I usually do it:
> >>
> >> void handleFirstCase(KStream<String, String> ks){
> >>          ks.filter(....).mapValues(...)
> >> }
> >>
> >>
> >> void handleSecondCase(KStream<String, String> ks){
> >>          ks.selectKey(...).groupByKey()...
> >> }
> >>
> >> ......
> >> new KafkaStreamsBrancher<String, String>()
> >>     .addBranch(predicate1, this::handleFirstCase)
> >>     .addBranch(predicate2, this::handleSecondCase)
> >>     .onTopOf(....)
> >>
> >> Regards,
> >>
> >> Ivan
> >>
> >> 22.03.2019 1:34, Bill Bejeck wrote:
> >>> Hi Ivan,
> >>>
> >>> Thanks for the KIP.
> >>>
> >>> I have one question: the KafkaStreamsBrancher takes a Consumer as a second
> >>> argument, which returns nothing, and the example in the KIP shows each
> >>> stream from the branch using a terminal node (KStream#to() in this
> >>> case).
> >>>
> >>> Maybe I've missed something, but how would we handle the case where the
> >>> user has created a branch but wants to continue processing and not
> >>> necessarily use a terminal node on the branched stream immediately?
> >>>
> >>> For example, using today's logic as is, if we had something like this:
> >>>
> >>> KStream<String, String>[] branches = originalStream.branch(predicate1,
> >>> predicate2);
> >>> branches[0].filter(....).mapValues(...)..
> >>> branches[1].selectKey(...).groupByKey().....
> >>>
> >>>
> >>> Thanks!
> >>> Bill
> >>>
> >>>
> >>>
> >>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote:
> >>>
> >>>> All,
> >>>>
> >>>> I'd like to jump-start the discussion for KIP-418.
> >>>>
> >>>> Here's the original message:
> >>>>
> >>>> Hello,
> >>>>
> >>>> I'd like to start a discussion about KIP-418. Please take a look at the
> >>>> KIP if you can, I would appreciate any feedback :)
> >>>>
> >>>> KIP-418:
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> >>>> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
> >>>>
> >>>> PR#6164: https://github.com/apache/kafka/pull/6164
> >>>>
> >>>> Regards,
> >>>>
> >>>> Ivan Ponomarev
> >>>>
> >>>>
> >>
>
>
