Paul,

I see your point when you talk about the stream..branch..branch...default chain.

Still, I believe that this cannot be implemented the easy way. Maybe we should all think it through further.

Let me comment on two of your ideas.

a user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs.

1) OK, but apparently this should not be the only option besides `default`, because there are scenarios where we just want to silently drop the messages that didn't match any predicate.

2) Throwing an exception in the middle of data flow processing looks like a bad idea. In the stream processing paradigm, I would prefer to emit a special message to a dedicated stream. This is exactly where `default` can be used.

it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue.

You mean a runtime exception, raised once the program is compiled and run? Well, I'd prefer an API that simply won't compile if used incorrectly. Can we build such an API as a method chain starting from a KStream object? There is a huge cost difference between runtime and compile-time errors. Even if a failure surfaces instantly in unit tests, it costs the project more than a compilation failure.

Regards,

Ivan


25.03.2019 0:38, Paul Whalen wrote:
Ivan,

Good point about the terminal operation being required.  But is that really
such a bad thing?  If the user doesn't want a defaultBranch they can call
some other terminal method (noDefaultBranch()?) just as easily.  In fact I
think it creates an opportunity for a nicer API - a user could specify a
terminal method that assumes nothing will reach the default branch,
throwing an exception if such a case occurs.  That seems like an
improvement over the current branch() API, which allows for the more subtle
behavior of records unexpectedly getting dropped.
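To make the terminal-method idea concrete, here is a toy Java sketch that models streams as in-memory lists (an assumption; it does not use the Kafka Streams API). ToyStream and ToyBranchedStreams are hypothetical stand-ins for KStream and the proposed KBranchedStreams. The three terminal methods show different treatments of unmatched records: defaultBranch() hands them to a handler, noDefaultBranch() drops them (silent-drop semantics are assumed here, since the thread leaves this open), and failOnUnmatched() is an invented name for the variant that throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class FluentBranchDemo {

    // Hypothetical stand-in for KStream: it only wraps an in-memory list.
    static final class ToyStream<T> {
        private final List<T> records;

        ToyStream(List<T> records) { this.records = records; }

        // Starts a branch chain over this stream's records.
        ToyBranchedStreams<T> branch() { return new ToyBranchedStreams<>(records); }
    }

    // Hypothetical stand-in for the proposed KBranchedStreams: it collects
    // (predicate, handler) pairs until a terminal method is called.
    static final class ToyBranchedStreams<T> {
        private final List<T> records;
        private final List<Predicate<T>> predicates = new ArrayList<>();
        private final List<Consumer<List<T>>> handlers = new ArrayList<>();

        ToyBranchedStreams(List<T> records) { this.records = records; }

        ToyBranchedStreams<T> addBranch(Predicate<T> predicate, Consumer<List<T>> handler) {
            predicates.add(predicate);
            handlers.add(handler);
            return this;
        }

        // Terminal: records that matched no predicate go to a default handler.
        void defaultBranch(Consumer<List<T>> handler) {
            handler.accept(route());
        }

        // Terminal: unmatched records are silently dropped.
        void noDefaultBranch() {
            route();
        }

        // Terminal: assumes every record matches some predicate and fails otherwise.
        void failOnUnmatched() {
            List<T> unmatched = route();
            if (!unmatched.isEmpty()) {
                throw new IllegalStateException("Records matched no branch: " + unmatched);
            }
        }

        // Sends each record to the first matching branch, runs the branch
        // handlers, and returns the records that matched nothing.
        private List<T> route() {
            List<List<T>> branches = new ArrayList<>();
            for (int i = 0; i < predicates.size(); i++) {
                branches.add(new ArrayList<>());
            }
            List<T> unmatched = new ArrayList<>();
            for (T item : records) {
                int target = -1;
                for (int i = 0; i < predicates.size() && target < 0; i++) {
                    if (predicates.get(i).test(item)) {
                        target = i;
                    }
                }
                if (target >= 0) {
                    branches.get(target).add(item);
                } else {
                    unmatched.add(item);
                }
            }
            for (int i = 0; i < handlers.size(); i++) {
                handlers.get(i).accept(branches.get(i));
            }
            return unmatched;
        }
    }

    public static void main(String[] args) {
        ToyStream<Integer> stream = new ToyStream<>(List.of(1, 2, 3, 10));
        stream.branch()
              .addBranch(n -> n % 2 == 0, evens -> System.out.println("evens: " + evens))
              .addBranch(n -> n < 3, small -> System.out.println("small odds: " + small))
              .defaultBranch(rest -> System.out.println("default: " + rest));
    }
}
```

Because every terminal method returns void, nothing can be chained after it. Forgetting to call one still compiles, though; in this sketch the branch handlers would simply never run.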

The need for a terminal operation certainly has to be well documented, but
it would be fairly easy for the InternalTopologyBuilder to track dangling
branches that haven't been terminated and raise a clear error before it
becomes an issue.  Especially now that there is a "build step" where the
topology is actually wired up, when StreamsBuilder.build() is called.
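One way such a build-time check could work, sketched with hypothetical classes: ToyTopologyBuilder is not Kafka's InternalTopologyBuilder, and all of its methods are invented for illustration. The builder remembers every brancher it hands out, a terminal call unregisters it, and build() refuses to proceed while any brancher is still open.

```java
import java.util.ArrayList;
import java.util.List;

public class DanglingBranchDemo {

    // Hypothetical builder that tracks branchers which have not been terminated.
    static final class ToyTopologyBuilder {
        private final List<ToyBrancher> open = new ArrayList<>();

        ToyBrancher branch(String sourceName) {
            ToyBrancher brancher = new ToyBrancher(this, sourceName);
            open.add(brancher);
            return brancher;
        }

        void markTerminated(ToyBrancher brancher) {
            open.remove(brancher);
        }

        // Fails with a clear message if any branch chain lacks its terminal call.
        String build() {
            if (!open.isEmpty()) {
                List<String> names = new ArrayList<>();
                for (ToyBrancher b : open) {
                    names.add(b.sourceName);
                }
                throw new IllegalStateException("Unterminated branch() on: " + names);
            }
            return "topology built";
        }
    }

    // Hypothetical brancher: only records branch names and reports termination.
    static final class ToyBrancher {
        private final ToyTopologyBuilder builder;
        private final List<String> branchNames = new ArrayList<>();
        final String sourceName;

        ToyBrancher(ToyTopologyBuilder builder, String sourceName) {
            this.builder = builder;
            this.sourceName = sourceName;
        }

        ToyBrancher addBranch(String name) {
            branchNames.add(name);
            return this;
        }

        // Terminal call: tells the builder this chain is complete.
        void defaultBranch() {
            builder.markTerminated(this);
        }
    }

    public static void main(String[] args) {
        ToyTopologyBuilder builder = new ToyTopologyBuilder();
        builder.branch("orders").addBranch("big").defaultBranch(); // terminated
        builder.branch("payments").addBranch("refunds");           // terminal call forgotten
        try {
            builder.build();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Note that the error is raised when build() runs, i.e. at runtime rather than at compile time.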

Regarding onTopOf() returning its argument, I agree that it's critical to
allow users to do other operations on the input stream.  With the fluent
solution, it ought to work the same way all other operations do - if you
want to process off the original KStream multiple times, you just need the
stream as a variable so you can call as many operations on it as you desire.

Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hello Paul,

I'm afraid this won't work because we do not always need the
defaultBranch. And without a terminal operation we don't know when to
finalize and build the 'branch switch'.

In my proposal, onTopOf returns its argument, so we can do something
more with the original branch after branching.

I understand your point that the need for special object construction
contrasts with the fluency of most KStream methods. But here we have a
special case: we build the switch to split the flow, so I think this is
still idiomatic.

Regards,

Ivan



24.03.2019 4:02, Paul Whalen wrote:
Ivan,

I think it's a great idea to improve this API, but I find the onTopOf()
mechanism a little confusing since it contrasts with the fluency of other
KStream method calls. Ideally I'd like to just call a method on the stream
so it still reads top to bottom if the branch cases are defined fluently.
I think the addBranch(predicate, handleCase) is very nice and the right way
to do things, but what if we flipped around how we specify the source
stream?

Like:

stream.branch()
          .addBranch(predicate1, this::handle1)
          .addBranch(predicate2, this::handle2)
          .defaultBranch(this::handleDefault);

Where branch() returns a KBranchedStreams or KStreamBrancher or something,
which is added to by addBranch() and terminated by defaultBranch() (which
returns void). This is obviously incompatible with the current API, so the
new stream.branch() would have to have a different name, but that seems
like a fairly small problem - we could call it something like branched() or
branchedStreams() and deprecate the old API.

Does this satisfy the motivations of your KIP? It seems like it does to
me, allowing for clear in-line branching while also allowing you to
dynamically build branches off of KBranchedStreams if desired.

Thanks,
Paul



On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:

Hi Bill,

Thank you for your reply!

This is how I usually do it:

void handleFirstCase(KStream<String, String> ks){
          ks.filter(....).mapValues(...)
}


void handleSecondCase(KStream<String, String> ks){
          ks.selectKey(...).groupByKey()...
}

......
new KafkaStreamsBrancher<String, String>()
     .addBranch(predicate1, this::handleFirstCase)
     .addBranch(predicate2, this::handleSecondCase)
     .onTopOf(....)
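A toy model of the builder style above, assuming a "stream" is just an in-memory list and a branch handler is a Consumer of that list. ToyBrancher is a hypothetical stand-in, not the KafkaStreamsBrancher from PR #6164. It illustrates the two properties discussed in this thread: the branch switch is finalized only when onTopOf() is called, and onTopOf() returns its argument so the source can still be used afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class BrancherDemo {

    // Hypothetical toy version of the brancher builder; streams are lists here.
    static final class ToyBrancher<T> {
        private final List<Predicate<T>> predicates = new ArrayList<>();
        private final List<Consumer<List<T>>> consumers = new ArrayList<>();

        ToyBrancher<T> addBranch(Predicate<T> predicate, Consumer<List<T>> consumer) {
            predicates.add(predicate);
            consumers.add(consumer);
            return this;
        }

        // Routes each item to the first matching branch, hands every branch to
        // its consumer, and returns the source unchanged for further use.
        List<T> onTopOf(List<T> source) {
            List<List<T>> branches = new ArrayList<>();
            for (int i = 0; i < predicates.size(); i++) {
                branches.add(new ArrayList<>());
            }
            for (T item : source) {
                for (int i = 0; i < predicates.size(); i++) {
                    if (predicates.get(i).test(item)) {
                        branches.get(i).add(item);
                        break; // first match wins; unmatched items are dropped
                    }
                }
            }
            for (int i = 0; i < consumers.size(); i++) {
                consumers.get(i).accept(branches.get(i));
            }
            return source;
        }
    }

    static void handleFirstCase(List<String> branch) {
        System.out.println("first: " + branch);
    }

    static void handleSecondCase(List<String> branch) {
        System.out.println("second: " + branch);
    }

    public static void main(String[] args) {
        List<String> source = List.of("a1", "b1", "a2", "c1");
        List<String> returned = new ToyBrancher<String>()
                .addBranch(s -> s.startsWith("a"), BrancherDemo::handleFirstCase)
                .addBranch(s -> s.startsWith("b"), BrancherDemo::handleSecondCase)
                .onTopOf(source);
        // The original input is still available for other operations.
        System.out.println(returned == source);
    }
}
```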

Regards,

Ivan

22.03.2019 1:34, Bill Bejeck wrote:
Hi Ivan,

Thanks for the KIP.

I have one question: the KafkaStreamsBrancher takes a Consumer as its second
argument, which returns nothing, and the example in the KIP shows each
stream from the branch using a terminal node (KStream#to() in this case).

Maybe I've missed something, but how would we handle the case where the
user has created a branch but wants to continue processing and not
necessarily use a terminal node on the branched stream immediately?

For example, using today's logic as is if we had something like this:

KStream<String, String>[] branches = originalStream.branch(predicate1, predicate2);
branches[0].filter(....).mapValues(...)..
branches[1].selectKey(...).groupByKey().....
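For reference, the existing array-returning branch() sends each record to the first predicate it matches and drops records that match none. A plain-Java model of those semantics follows; it uses lists instead of KStreams and is an illustration, not Kafka code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class ArrayBranchDemo {

    // Toy model of the array-returning branch(): each record goes to the first
    // predicate that matches it; records matching none are dropped.
    @SafeVarargs
    static <T> List<List<T>> branch(List<T> records, Predicate<T>... predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (T item : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(item)) {
                    branches.get(i).add(item);
                    break; // first match wins
                }
            }
            // No match: the record silently disappears.
        }
        return branches;
    }

    public static void main(String[] args) {
        List<String> input = List.of("apple", "avocado", "banana", "cherry");
        List<List<String>> branches = branch(input,
                s -> s.startsWith("a"),
                s -> s.startsWith("b"));
        System.out.println(branches.get(0)); // [apple, avocado]
        System.out.println(branches.get(1)); // [banana]
        // "cherry" matched neither predicate and was dropped.
    }
}
```

Each branch can then be processed independently, which is the continued, non-terminal processing Bill asks about.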


Thanks!
Bill



On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote:

All,

I'd like to jump-start the discussion for KIP-418.

Here's the original message:

Hello,

I'd like to start a discussion about KIP-418. Please take a look at the
KIP if you can; I would appreciate any feedback :)

KIP-418:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488

PR#6164: https://github.com/apache/kafka/pull/6164

Regards,

Ivan Ponomarev



