Hello everyone, thank you all for joining the discussion!

Well, I don't think that the idea of named branches, be it a LinkedHashMap (no other Map will do, because the order of definition matters) or a `branch` method taking a name and a Consumer, has more advantages than drawbacks.

In my opinion, the only real positive outcome from Michael's proposal is that all the returned branches are in the same scope. But 1) we rarely need them in the same scope, and 2) there is a workaround for the scope problem, described in the KIP.

'Inlining the complex logic' is not a problem, because we can use method references instead of lambdas. In real-world scenarios you tend to split the complex logic into methods anyway, so the code is going to be clean.

The drawbacks are strong. The cohesion between predicates and handlers is lost. We have to define predicates in one place, and handlers in another. This opens the door for bugs:

- what if we forget to define a handler for a name? or a name for a handler?
- what if we misspell a name?
- what if we copy-paste and duplicate a name?
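
To make these failure modes concrete, here is a minimal sketch in plain Java. It does not use any Kafka Streams classes; `NamedBranchPitfalls`, the handler map and the branch names are made up purely for illustration. A duplicated or misspelled name compiles without complaint and only shows up at run time:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical illustration only: plain Java stand-ins, not the Kafka Streams API.
class NamedBranchPitfalls {

    // Looks up a handler by branch name, the way a name-keyed API would have to.
    static String dispatch(Map<String, Consumer<StringBuilder>> handlers, String branchName) {
        Consumer<StringBuilder> handler = handlers.get(branchName);
        if (handler == null) {
            // The compiler cannot catch this: the mismatch is only visible at run time.
            return "no handler for '" + branchName + "'";
        }
        StringBuilder out = new StringBuilder();
        handler.accept(out);
        return out.toString();
    }

    static Map<String, Consumer<StringBuilder>> handlers() {
        Map<String, Consumer<StringBuilder>> handlers = new LinkedHashMap<>();
        handlers.put("coffee", sb -> sb.append("coffee handled"));
        // Copy-paste bug: the second put silently replaces the first handler.
        handlers.put("coffee", sb -> sb.append("electronics handled"));
        return handlers;
    }

    public static void main(String[] args) {
        Map<String, Consumer<StringBuilder>> handlers = handlers();
        System.out.println(handlers.size());              // one entry, not two
        System.out.println(dispatch(handlers, "coffee")); // the wrong handler runs
        System.out.println(dispatch(handlers, "cofee"));  // misspelling: no handler found
    }
}
```

With `branch(predicate, handler)` pairs there is no name to mistype, so none of these three cases can be written down in the first place.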

What Michael proposes would have been totally OK if we had been writing the API in Lua, Ruby or Python. In those languages the "dynamic naming" approach would have looked most concise and beautiful. But in Java we expect all the problems related to identifiers to be eliminated at compile time.

Do we have to invent duck-typing for the Java API?

And if we do, what advantage are we supposed to get besides having all the branches in the same scope? Michael, maybe I'm missing your point?

---

Earlier in this discussion John Roesler also proposed doing without the "start branching" operator, and later Paul mentioned that when we have to add a dynamic number of branches, the current KIP is 'clumsier' than Michael's 'Map' solution. Let me address both comments here.

1) The "start branching" operator (I think that *split* is indeed a good name for it) is critical when we need to do dynamic branching; see the example below.

2) No, dynamic branching in the current KIP is not clumsy at all. Imagine a real-world scenario where you need one branch per enum value (say, RecordType). You can have something like this:

/* John: if we had to start with stream.branch(...) here, it would have been much messier. */
KBranchedStream branched = stream.split();

/* Not clumsy at all :-) */
for (RecordType recordType : RecordType.values()) {
    branched = branched.branch((k, v) -> v.getRecType() == recordType,
            recordType::processRecords);
}
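
For anyone who wants to run the loop, here is a self-contained toy version. It is only a sketch: `ToyBranched` routes elements of a `List` and merely mimics the first-match semantics of the proposed `KBranchedStream`, and the `RecordType` values are invented for the example.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Toy stand-in for the proposed KBranchedStream: it routes list elements, not Kafka records.
class ToyBranched<T> {
    private final List<T> remaining;

    ToyBranched(List<T> source) {
        this.remaining = new ArrayList<>(source);
    }

    // Hands every still-unmatched element that satisfies the predicate to the handler,
    // then returns a new object holding the rest, so earlier branches win (first match).
    ToyBranched<T> branch(Predicate<T> predicate, Consumer<List<T>> handler) {
        List<T> matched = new ArrayList<>();
        List<T> rest = new ArrayList<>();
        for (T item : remaining) {
            (predicate.test(item) ? matched : rest).add(item);
        }
        handler.accept(matched);
        return new ToyBranched<>(rest);
    }
}

// Invented enum values, standing in for the RecordType mentioned above.
enum RecordType { ORDER, PAYMENT, REFUND }

class DynamicBranchingDemo {
    // Adds one branch per enum value in a loop and counts the records each branch receives.
    static Map<RecordType, Integer> countPerType(List<RecordType> records) {
        Map<RecordType, Integer> counts = new EnumMap<>(RecordType.class);
        ToyBranched<RecordType> branched = new ToyBranched<>(records); // plays the role of stream.split()
        for (RecordType type : RecordType.values()) {
            branched = branched.branch(r -> r == type, matched -> counts.put(type, matched.size()));
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countPerType(List.of(RecordType.ORDER, RecordType.REFUND, RecordType.ORDER)));
    }
}
```

The loop needs an object to start from before the first predicate is known, which is the role `split()` plays in the snippet above.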

Regards,

Ivan


02.05.2019 14:40, Matthias J. Sax wrote:
I also agree with Michael's observation about the core problem of the
current `branch()` implementation.

However, I also don't like to pass in a clumsy Map object. My thinking
was more aligned with Paul's proposal to just add a name to each
`branch()` statement and return a `Map<String,KStream>`.

It makes the code easier to read, and also makes the order of
`Predicates` (that is essential) easier to grasp.

Map<String, KStream<K, V>> branches = stream.split()
    .branch("branchOne", Predicate<K, V>)
    .branch( "branchTwo", Predicate<K, V>)
    .defaultBranch("defaultBranch");
An open question is the case for which no defaultBranch() should be
specified. Atm, `split()` and `branch()` would return `BranchedKStream`
and the call to `defaultBranch()` that returns the `Map` would be mandatory
(which is not the case atm). Or is this actually not a real problem,
because users can just ignore the branch returned by `defaultBranch()`
in the result `Map`?


About "inlining": So far, it seems to be a matter of personal
preference. I can see arguments for both, but no "killer argument" yet
that clearly makes the case for one or the other.


-Matthias

On 5/1/19 6:26 PM, Paul Whalen wrote:
Perhaps inlining is the wrong terminology. It doesn’t require that a lambda 
with the full downstream topology be defined inline - it can be a method 
reference as with Ivan’s original suggestion.  The advantage of putting the 
predicate and its downstream logic (Consumer) together in branch() is that they 
are required to be near to each other.

Ultimately the downstream code has to live somewhere, and deep branch trees 
will be hard to read regardless.

On May 1, 2019, at 1:07 PM, Michael Drogalis <michael.droga...@confluent.io> wrote:

I'm less enthusiastic about inlining the branch logic with its downstream
functionality. Programs that have deep branch trees will quickly become
harder to read as a single unit.

On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen <pgwha...@gmail.com> wrote:

Also +1 on the issues/goals as Michael outlined them, I think that sets a
great framework for the discussion.

Regarding the SortedMap solution, my understanding is that the current
proposal in the KIP is what is in my PR which (pending naming decisions) is
roughly this:

stream.split()
    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
    .defaultBranch(Consumer<KStream<K, V>>);

Obviously some ordering is necessary, since branching as a construct
doesn't work without it, but this solution seems like it provides as much
associativity as the SortedMap solution, because each branch() call
directly associates the "conditional" with the "code block."  The value it
provides over the KIP solution is the accessing of streams in the same
scope.

The KIP solution is less "dynamic" than the SortedMap solution in the sense
that it is slightly clumsier to add a dynamic number of branches, but it is
certainly possible.  It seems to me like the API should favor the "static"
case anyway, and should make it simple and readable to fluently declare and
access your branches in-line.  It also makes it impossible to ignore a
branch, and it is possible to build an (almost) identical SortedMap
solution on top of it.

I could also see a middle ground where instead of a raw SortedMap being
taken in, branch() takes a name and not a Consumer.  Something like this:

Map<String, KStream<K, V>> branches = stream.split()
    .branch("branchOne", Predicate<K, V>)
    .branch( "branchTwo", Predicate<K, V>)
    .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);

Pros for that solution:
- accessing branched KStreams in same scope
- no double brace initialization, hopefully slightly more readable than
SortedMap

Cons
- downstream branch logic cannot be specified inline which makes it harder
to read top to bottom (like existing API and SortedMap, but unlike the KIP)
- you can forget to "handle" one of the branched streams (like existing
API and SortedMap, but unlike the KIP)

(KBranchedStreams could even work *both* ways but perhaps that's overdoing
it).

Overall I'm curious how important it is to be able to easily access the
branched KStream in the same scope as the original.  It's possible that it
doesn't need to be handled directly by the API, but instead left up to the
user.  I'm sort of in the middle on it.

Paul



On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:

I'd like to +1 what Michael said about the issues with the existing branch
method. I agree with what he's outlined and I think we should proceed by
trying to alleviate these problems. Specifically, it seems important to be
able to cleanly access the individual branches (e.g. by mapping
name->stream), which I thought was the original intention of this KIP.

That said, I don't think we should so easily give in to the double brace
anti-pattern or force our users into it if at all possible to avoid... just
my two cents.

Cheers,
Sophie

On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <michael.droga...@confluent.io> wrote:

I’d like to propose a different way of thinking about this. To me, there
are three problems with the existing branch signature:

1. If you use it the way most people do, Java raises unsafe type warnings.
2. The way in which you use the stream branches is positionally coupled to
the ordering of the conditionals.
3. It is brittle to extend existing branch calls with additional code paths.

Using associative constructs instead of relying on ordered constructs would
be a stronger approach. Consider a signature that instead looks like this:

Map<String, KStream<K,V>> KStream#branch(SortedMap<String, Predicate<? super K,? super V>>);

Branches are given names in a map, and as a result, the API returns a
mapping of names to streams. The ordering of the conditionals is maintained
because it’s a sorted map. Insert order determines the order of evaluation.
This solves problem 1 because there are no more varargs. It solves problem
2 because you no longer lean on ordering to access the branch you’re
interested in. It solves problem 3 because you can introduce another
conditional by simply attaching another name to the structure, rather than
messing with the existing indices.
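
One detail about the map type is easy to check in plain Java: `java.util.SortedMap` implementations such as `TreeMap` iterate in key order, while `LinkedHashMap` iterates in insertion order. The sketch below is only an illustration (the class, the branch names and the first-match loop are invented, and it does not touch Kafka Streams); it shows that the same two registrations evaluate in a different order depending on the map passed in.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical illustration of how the map implementation decides evaluation order.
class BranchOrderCheck {

    // Registers a specific branch first and a catch-all branch second.
    static void register(Map<String, Predicate<Integer>> branches) {
        branches.put("small", v -> v < 10);
        branches.put("any", v -> true);
    }

    // Returns the name of the first branch, in the map's iteration order, that matches.
    static String firstMatchUsing(Map<String, Predicate<Integer>> emptyMap, int value) {
        register(emptyMap);
        for (Map.Entry<String, Predicate<Integer>> entry : emptyMap.entrySet()) {
            if (entry.getValue().test(value)) {
                return entry.getKey();
            }
        }
        return "none";
    }

    public static void main(String[] args) {
        // Insertion order: "small" is tested first and wins for the value 5.
        System.out.println(firstMatchUsing(new LinkedHashMap<>(), 5));
        // Key order: "any" sorts before "small", so the catch-all wins instead.
        System.out.println(firstMatchUsing(new TreeMap<>(), 5));
    }
}
```

So if the order of declaration is meant to be the order of evaluation, the parameter would have to be an insertion-ordered map rather than a `SortedMap`.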

One of the drawbacks is that creating the map inline is historically
awkward in Java. I know it’s an anti-pattern to use voluminously, but
double brace initialization would clean up the aesthetics.

On Tue, Apr 30, 2019 at 9:10 AM John Roesler <j...@confluent.io> wrote:

Hi Ivan,

Thanks for the update.

FWIW, I agree with Matthias that the current "start branching" operator is
confusing when named the same way as the actual branches. "Split" seems
like a good name. Alternatively, we can do without a "start branching"
operator at all, and just do:

stream
      .branch(Predicate)
      .branch(Predicate)
      .defaultBranch();

Tentatively, I think that this branching operation should be terminal. That
way, we don't create ambiguity about how to use it. That is, `branch`
should return `KBranchedStream`, while `defaultBranch` is `void`, to
enforce that it comes last, and that there is only one definition of the
default branch. Potentially, we should log a warning if there's no default,
and additionally log a warning (or throw an exception) if a record falls
through with no default.

Thoughts?

Thanks,
-John

On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matth...@confluent.io> wrote:

Thanks for updating the KIP and your answers.


> this is to make the name similar to String#split
> that also returns an array, right?

The intent was to avoid name duplication. The return type should _not_
be an array.

The current proposal is

stream.branch()
      .branch(Predicate)
      .branch(Predicate)
      .defaultBranch();

IMHO, this reads a little odd, because the first `branch()` does not
take any parameters and has different semantics than the later
`branch()` calls. Note that from the code snippet above, it's hidden
that the first call is `KStream#branch()` while the others are
`KBranchedStream#branch()`, which makes reading the code harder.

Because I suggested to rename `addBranch()` -> `branch()`, I thought it
might be better to also rename `KStream#branch()` to avoid the naming
overlap that seems to be confusing. The following reads much cleaner to
me:
stream.split()
      .branch(Predicate)
      .branch(Predicate)
      .defaultBranch();

Maybe there is a better alternative to `split()` though to avoid the
naming overlap.


> 'default' is, however, a reserved word, so unfortunately we cannot have
> a method with such a name :-)

Bummer. Didn't consider this. Maybe we can still come up with a short
name?

Can you add the interface `KBranchedStream` to the KIP with all its
methods? It will be part of the public API and should be contained in the
KIP. For example, it's unclear atm what the return type of
`defaultBranch()` is.


You did not comment on the idea to add a `KBranchedStream#get(int index)
-> KStream` method to get the individually branched-KStreams. Would be
nice to get your feedback about it. It seems you suggest that users
would need to write custom utility code otherwise, to access them. We
should discuss the pros and cons of both approaches. It feels
"incomplete" to me atm, if the API has no built-in support to get the
branched-KStreams directly.



-Matthias


On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
Hi all!

I have updated the KIP-418 according to the new vision.

Matthias, thanks for your comment!

> Renaming KStream#branch() -> #split()

I can see your point: this is to make the name similar to String#split
that also returns an array, right? But is it worth the loss of backwards
compatibility? We can have overloaded branch() as well without affecting
the existing code. Maybe the old array-based `branch` method should be
deprecated, but this is a subject for discussion.

> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
> KBranchedStream#defaultBranch() -> BranchingKStream#default()

Totally agree with the 'addBranch->branch' rename. 'default' is, however, a
reserved word, so unfortunately we cannot have a method with such a name
:-)

> defaultBranch() does take a `Predicate` as argument, but I think that
> is not required?

Absolutely! I think that was just a copy-paste error or something.

Dear colleagues,

please review the new version of the KIP and Paul's PR
(https://github.com/apache/kafka/pull/6512)

Any new suggestions/objections?

Regards,

Ivan


11.04.2019 11:47, Matthias J. Sax wrote:
Thanks for driving the discussion of this KIP. It seems that everybody
agrees that the current branch() method using arrays is not optimal.

I had a quick look into the PR and I like the overall proposal. There
are some minor things we need to consider. I would recommend the
following renaming:

KStream#branch() -> #split()
KBranchedStream#addBranch() -> BranchingKStream#branch()
KBranchedStream#defaultBranch() -> BranchingKStream#default()

It's just a suggestion to get slightly shorter method names.

In the current PR, defaultBranch() does take a `Predicate` as argument,
but I think that is not required?

Also, we should consider KIP-307, that was recently accepted and is
currently implemented:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL

I.e., we should add overloads that accept a `Named` parameter.


For the issue that the created `KStream` objects are in different scopes:
could we extend `KBranchedStream` with a `get(int index)` method that
returns the corresponding "branched" result `KStream` object? Maybe the
second argument of `addBranch()` should not be a `Consumer<KStream>` but
a `Function<KStream,KStream>` and `get()` could return whatever the
`Function` returns?


Finally, I would also suggest to update the KIP with the current
proposal. That makes it easier to review.


-Matthias



On 3/31/19 12:22 PM, Paul Whalen wrote:
Ivan,

I'm a bit of a novice here as well, but I think it makes sense for you to
revise the KIP and continue the discussion.  Obviously we'll need some
buy-in from committers that have actual binding votes on whether the KIP
could be adopted.  It would be great to hear if they think this is a good
idea overall.  I'm not sure if that happens just by starting a vote, or if
there is generally some indication of interest beforehand.

That being said, I'll continue the discussion a bit: assuming we do move
forward with the solution of "stream.branch() returns KBranchedStream", do
we deprecate "stream.branch(...) returns KStream[]"?  I would favor
deprecating, since having two mutually exclusive APIs that accomplish the
same thing is confusing, especially when they're fairly similar anyway.  We
just need to be sure we're not making something impossible/difficult that
is currently possible/easy.

Regarding my PR - I think the general structure would work, it's just a
little sloppy overall in terms of naming and clarity. In particular,
passing in the "predicates" and "children" lists which get modified in
KBranchedStream but read from all the way in KStreamLazyBranch is a bit
complicated to follow.

Thanks,
Paul

On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hi Paul!

I read your code carefully and now I am fully convinced: your proposal
looks better and should work. We just have to document the crucial fact
that KStream consumers are invoked as they're added. And then it's all
going to be very nice.

What shall we do now? I should re-write the KIP and resume the
discussion here, right?

Why are you saying that your PR 'should not be even a starting point if
we go in this direction'? To me it looks like a good starting point. But
as a novice in this project I might miss some important details.

Regards,

Ivan


28.03.2019 17:38, Paul Whalen wrote:
Ivan,

Maybe I’m missing the point, but I believe the stream.branch() solution
supports this. The couponIssuer::set* consumers will be invoked as they’re
added, not during streamsBuilder.build(). So the user still ought to be
able to call couponIssuer.coupons() afterward and depend on the branched
streams having been set.

The issue I mean to point out is that it is hard to access the branched
streams in the same scope as the original stream (that is, not inside the
couponIssuer), which is a problem with both proposed solutions. It can be
worked around though.

[Also, great to hear additional interest in 401, I’m excited to hear
your thoughts!]

Paul

On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hi Paul!

The idea to postpone the wiring of branches to the
streamsBuilder.build() also looked great to me at first glance, but ---

> the newly branched streams are not available in the same scope as each
> other.  That is, if we wanted to merge them back together again I don't
> see a way to do that.

You just took the words right out of my mouth, I was just going to
write in detail about this issue.

Consider the example from Bill's book, p. 101: say we need to identify
customers who have bought coffee and made a purchase in the electronics
store to give them coupons.

This is the code I usually write under these circumstances using my
'brancher' class:
@Setter
class CouponIssuer{
   private KStream<....> coffePurchases;
   private KStream<....> electronicsPurchases;

   KStream<...> coupons(){
       return coffePurchases.join(electronicsPurchases...)...whatever
       /*In the real world the code here can be complex, so creation of
         a separate CouponIssuer class is fully justified, in order to
         separate classes' responsibilities.*/
   }
}

CouponIssuer couponIssuer = new CouponIssuer();

new KafkaStreamsBrancher<....>()
     .branch(predicate1, couponIssuer::setCoffePurchases)
     .branch(predicate2, couponIssuer::setElectronicsPurchases)
     .onTopOf(transactionStream);

/*Alas, this won't work if we're going to wire up everything later,
  without the terminal operation!!!*/
couponIssuer.coupons()...

Does this make sense?  In order to properly initialize the CouponIssuer
we need the terminal operation to be called before streamsBuilder.build()
is called.
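
The difference between the two wiring strategies can be shown with a small runnable sketch. Everything in it is a stand-in invented for the example: a `String` plays the role of a branched `KStream`, and `EagerBrancher` / `DeferredBrancher` are toy classes, not the brancher class or the PR code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins: a String plays the role of a branched KStream.
class ToyCouponIssuer {
    private String coffeePurchases;

    void setCoffeePurchases(String stream) { this.coffeePurchases = stream; }

    // Fails fast if the branch has not been wired in yet.
    String coupons() {
        if (coffeePurchases == null) {
            throw new IllegalStateException("coffeePurchases not set yet");
        }
        return "coupons from " + coffeePurchases;
    }
}

// Variant 1: the handler runs as soon as branch() is called.
class EagerBrancher {
    EagerBrancher branch(String branchName, Consumer<String> handler) {
        handler.accept(branchName);
        return this;
    }
}

// Variant 2: handlers are only recorded and run later, in build().
class DeferredBrancher {
    private final List<Runnable> pending = new ArrayList<>();

    DeferredBrancher branch(String branchName, Consumer<String> handler) {
        pending.add(() -> handler.accept(branchName));
        return this;
    }

    void build() { pending.forEach(Runnable::run); }
}

class WiringOrderDemo {
    // Eager wiring: the issuer is usable right after the branch() call.
    static String eager() {
        ToyCouponIssuer issuer = new ToyCouponIssuer();
        new EagerBrancher().branch("coffee", issuer::setCoffeePurchases);
        return issuer.coupons();
    }

    // Deferred wiring: using the issuer before build() hits the unset field.
    static boolean deferredFailsBeforeBuild() {
        ToyCouponIssuer issuer = new ToyCouponIssuer();
        new DeferredBrancher().branch("coffee", issuer::setCoffeePurchases);
        try {
            issuer.coupons();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(eager());
        System.out.println(deferredFailsBeforeBuild());
    }
}
```

In the deferred variant the setters have not run by the time `coupons()` is called, which is the situation described above.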

[BTW Paul, I just found out that your KIP-401 is essentially the next
KIP I was going to write here. I have some thoughts based on my experience,
so I will join the discussion on KIP-401 soon.]

Regards,

Ivan

28.03.2019 6:29, Paul Whalen wrote:
Ivan,
I tried to make a very rough proof of concept of a fluent API based off of
KStream here (https://github.com/apache/kafka/pull/6512), and I think I
succeeded at removing both cons.

    - Compatibility: I was incorrect earlier about compatibility issues,
    there aren't any direct ones.  I was unaware that Java is smart enough to
    distinguish between a branch(varargs...) returning one thing and branch()
    with no arguments returning another thing.
    - Requiring a terminal method: We don't actually need it.  We can just
    build up the branches in the KBranchedStream, which shares its state with
    the ProcessorSupplier that will actually do the branching.  It's not
    terribly pretty in its current form, but I think it demonstrates its
    feasibility.

To be clear, I don't think that pull request should be final or even a
starting point if we go in this direction, I just wanted to see how
challenging it would be to get the API working.

I will say though, that I'm not sure the existing solution could be
deprecated in favor of this, which I had originally suggested was a
possibility.  The reason is that the newly branched streams are not
available in the same scope as each other.  That is, if we wanted to merge
them back together again I don't see a way to do that.  The KIP proposal
has the same issue, though - all this means is that for either solution,
deprecating the existing branch(...) is not on the table.

Thanks,
Paul

On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:
OK, let me summarize what we have discussed up to this point.

First, it seems that it's commonly agreed that the branch API needs
improvement. Motivation is given in the KIP.

There are two potential ways to do it:

1. (as originally proposed)

new KafkaStreamsBrancher<..>()
    .branch(predicate1, ks ->..)
    .branch(predicate2, ks->..)
    .defaultBranch(ks->..) //optional
    .onTopOf(stream).mapValues(...).... //onTopOf returns its argument

PROS: 1) Fully backwards compatible. 2) The code won't make sense until
all the necessary ingredients are provided.

CONS: The need to create a KafkaStreamsBrancher instance contrasts with the
fluency of other KStream methods.

2. (as Paul proposes)

stream
    .branch(predicate1, ks ->...)
    .branch(predicate2, ks->...)
    .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and noDefault() return void

PROS: Generally follows the way the KStreams interface is defined.

CONS: We need to define two terminal methods (defaultBranch(ks->) and
noDefault()). And for a user it is very easy to miss the fact that one
of the terminal methods should be called. If these methods are not
called, we can throw an exception at runtime.

Colleagues, what are your thoughts? Can we do better?

Regards,

Ivan

27.03.2019 18:46, Ivan Ponomarev wrote:
25.03.2019 17:43, Ivan Ponomarev wrote:
Paul,

I see your point when you are talking about
stream..branch..branch...default..

Still, I believe that this cannot be implemented the easy way.
Maybe we all should think further.

Let me comment on two of your ideas.

> user could specify a terminal method that assumes nothing will reach
> the default branch, throwing an exception if such a case occurs.

1) OK, apparently this should not be the only option besides
`default`, because there are scenarios when we want to just silently
drop the messages that didn't match any predicate. 2) Throwing an
exception in the middle of data flow processing looks like a bad idea.
In the stream processing paradigm, I would prefer to emit a special
message to a dedicated stream. This is exactly where `default` can be
used.

> it would be fairly easy for the InternalTopologyBuilder to track
> dangling branches that haven't been terminated and raise a clear error
> before it becomes an issue.

You mean a runtime exception, when the program is compiled and run?
Well, I'd prefer an API that simply won't compile if used
incorrectly. Can we build such an API as a method chain starting from
a KStream object? There is a huge cost difference between runtime and
compile-time errors. Even if a failure surfaces instantly in unit
tests, it costs more for the project than a compilation failure.

Regards,

Ivan


25.03.2019 0:38, Paul Whalen wrote:
Ivan,

Good point about the terminal operation being required.  But is that really
such a bad thing?  If the user doesn't want a defaultBranch they can call
some other terminal method (noDefaultBranch()?) just as easily.  In fact I
think it creates an opportunity for a nicer API - a user could specify a
terminal method that assumes nothing will reach the default branch,
throwing an exception if such a case occurs.  That seems like an
improvement over the current branch() API, which allows for the more subtle
behavior of records unexpectedly getting dropped.

The need for a terminal operation certainly has to be well documented, but
it would be fairly easy for the InternalTopologyBuilder to track dangling
branches that haven't been terminated and raise a clear error before it
becomes an issue.  Especially now that there is a "build step" where the
topology is actually wired up, when StreamsBuilder.build() is called.

Regarding onTopOf() returning its argument, I agree that it's critical to
allow users to do other operations on the input stream.  With the fluent
solution, it ought to work the same way all other operations do - if you
want to process off the original KStream multiple times, you just need the
stream as a variable so you can call as many operations on it as you
desire.

Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hello Paul,

I'm afraid this won't work because we do not always need the
defaultBranch. And without a terminal operation we don't know when to
finalize and build the 'branch switch'.

In my proposal, onTopOf returns its argument, so we can do something
more with the original branch after branching.

I understand your point that the need for special object construction
contrasts with the fluency of most KStream methods. But here we have a
special case: we build the switch to split the flow, so I think this is
still idiomatic.

Regards,

Ivan



24.03.2019 4:02, Paul Whalen wrote:
Ivan,

I think it's a great idea to improve this API, but I find the onTopOf()
mechanism a little confusing since it contrasts with the fluency of other
KStream method calls.  Ideally I'd like to just call a method on the stream
so it still reads top to bottom if the branch cases are defined fluently.
I think the addBranch(predicate, handleCase) is very nice and the right way
to do things, but what if we flipped around how we specify the source
stream.

Like:

stream.branch()
           .addBranch(predicate1, this::handle1)
           .addBranch(predicate2, this::handle2)
           .defaultBranch(this::handleDefault);

Where branch() returns a KBranchedStreams or KStreamBrancher or something,
which is added to by addBranch() and terminated by defaultBranch() (which
returns void).  This is obviously incompatible with the current API, so the
new stream.branch() would have to have a different name, but that seems
like a fairly small problem - we could call it something like branched() or
branchedStreams() and deprecate the old API.

Does this satisfy the motivations of your KIP?  It seems like it does to
me, allowing for clear in-line branching while also allowing you to
dynamically build branches off of KBranchedStreams if desired.

Thanks,
Paul



On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:

Hi Bill,

Thank you for your reply!

This is how I usually do it:

void handleFirstCase(KStream<String, String> ks){
           ks.filter(....).mapValues(...)
}


void handleSecondCase(KStream<String, String> ks){
           ks.selectKey(...).groupByKey()...
}

......
new KafkaStreamsBrancher<String, String>()
      .addBranch(predicate1, this::handleFirstCase)
      .addBranch(predicate2, this::handleSecondCase)
      .onTopOf(....)

Regards,

Ivan

22.03.2019 1:34, Bill Bejeck wrote:
Hi Ivan,

Thanks for the KIP.

I have one question, the KafkaStreamsBrancher takes a Consumer as a second
argument which returns nothing, and the example in the KIP shows each
stream from the branch using a terminal node (KStream#to() in this
case).

Maybe I've missed something, but how would we handle the case where the
user has created a branch but wants to continue processing and not
necessarily use a terminal node on the branched stream immediately?

For example, using today's logic as is if we had something like this:

KStream<String, String>[] branches = originalStream.branch(predicate1, predicate2);
branches[0].filter(....).mapValues(...)..
branches[1].selectKey(...).groupByKey().....
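
As an aside, the index-based access above couples each branch to its position in the predicate list. The following is a toy model of that contract in plain Java, offered only as a sketch: `PositionalBranching` and its list-based `branch` helper are invented for illustration and merely imitate first-match routing with unmatched items dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical toy model of positional branching; not the Kafka Streams API.
class PositionalBranching {

    // Result i holds the items whose first matching predicate is predicates.get(i);
    // items that match no predicate are dropped.
    static <T> List<List<T>> branch(List<T> items, List<Predicate<T>> predicates) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            result.add(new ArrayList<>());
        }
        for (T item : items) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(item)) {
                    result.get(i).add(item);
                    break;
                }
            }
        }
        return result;
    }

    // Index 0 is the "even numbers" branch.
    static List<Integer> indexZeroBefore() {
        List<Predicate<Integer>> predicates = List.of(v -> v % 2 == 0, v -> v > 2);
        return branch(List.of(1, 2, 3, 4), predicates).get(0);
    }

    // A new predicate is added in front: index 0 now means something else,
    // and every caller that reads get(0) has to be updated by hand.
    static List<Integer> indexZeroAfter() {
        List<Predicate<Integer>> predicates = List.of(v -> v > 3, v -> v % 2 == 0, v -> v > 2);
        return branch(List.of(1, 2, 3, 4), predicates).get(0);
    }

    public static void main(String[] args) {
        System.out.println(indexZeroBefore());
        System.out.println(indexZeroAfter());
    }
}
```

Nothing in the types ties a handler to its predicate here, so a shifted index compiles fine and simply processes different records.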


Thanks!
Bill



On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote:

All,

I'd like to jump-start the discussion for KIP-418.

Here's the original message:

Hello,

I'd like to start a discussion about KIP-418. Please take a look at the
KIP if you can, I would appreciate any feedback :)

KIP-418:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
JIRA KAFKA-5488:
https://issues.apache.org/jira/browse/KAFKA-5488
PR#6164:
https://github.com/apache/kafka/pull/6164

Regards,

Ivan Ponomarev



