Hello John,

1.
---------------------------------------------

> Perhaps it would be better to stick with "as" for now
> and just file a Jira to switch them all at the same time [for compatibility with Kotlin]

Fully agree! BTW it's really not a big problem: in Kotlin they have a standard workaround (https://kotlinlang.org/docs/reference/java-interop.html#escaping-for-java-identifiers-that-are-keywords-in-kotlin). So actually this should be a very low priority issue, if an issue at all.

> I don't understand how your new proposed
> methods would work any differently than the ones you already
> had proposed in the KIP. It seems like you'd still have to provide
> the generic type parameters on the first static factory call. Can you
> explain how your new interface proposal differs from the existing KIP?

In the KIP, I didn't clarify what methods should be static. Now I propose the following methods:

non-static: withChain(Function), withName(String).

static: as(String), with(Function), with(Function, String).

The overloaded `with` version that takes both a Function and a name can be used without causing the type inference problem, since everything is passed in a single static call and no chained call is needed.
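
To illustrate the point, here is a minimal sketch. `StreamSketch` and `BranchedSketch` are hypothetical stand-ins for `KStream` and `Branched` (they are not the real Kafka Streams classes), and `applyBranch` is an invented helper that plays the role of `branch`:

```java
import java.util.function.Function;

// Hypothetical stand-in for KStream<K, V>; it only records which steps were applied.
final class StreamSketch<K, V> {
    final String description;

    StreamSketch(String description) {
        this.description = description;
    }

    // Appends a marker so that the effect of a chain is observable.
    StreamSketch<K, V> step(String marker) {
        return new StreamSketch<K, V>(description + "->" + marker);
    }

    // Invented helper playing the role of branch(): K and V are already fixed here.
    StreamSketch<K, V> applyBranch(BranchedSketch<K, V> branched) {
        return branched.chain.apply(this);
    }
}

// Hypothetical shape of Branched, following the method list above.
final class BranchedSketch<K, V> {
    final String name;
    final Function<? super StreamSketch<K, V>, ? extends StreamSketch<K, V>> chain;

    private BranchedSketch(
            String name,
            Function<? super StreamSketch<K, V>, ? extends StreamSketch<K, V>> chain) {
        this.name = name;
        this.chain = chain;
    }

    static <K, V> BranchedSketch<K, V> as(String name) {
        return new BranchedSketch<K, V>(name, s -> s);
    }

    static <K, V> BranchedSketch<K, V> with(
            Function<? super StreamSketch<K, V>, ? extends StreamSketch<K, V>> chain) {
        return new BranchedSketch<K, V>(null, chain);
    }

    static <K, V> BranchedSketch<K, V> with(
            Function<? super StreamSketch<K, V>, ? extends StreamSketch<K, V>> chain,
            String name) {
        return new BranchedSketch<K, V>(name, chain);
    }

    // Non-static: by the time this is called, K and V are already bound.
    BranchedSketch<K, V> withChain(
            Function<? super StreamSketch<K, V>, ? extends StreamSketch<K, V>> chain) {
        return new BranchedSketch<K, V>(name, chain);
    }

    BranchedSketch<K, V> withName(String name) {
        return new BranchedSketch<K, V>(name, chain);
    }
}

final class BranchedInferenceDemo {
    static String run() {
        StreamSketch<String, String> source = new StreamSketch<String, String>("source");
        // The single static call sits directly in the argument position, so K and V
        // are taken from the parameter type of applyBranch; no type witness is needed.
        StreamSketch<String, String> a =
                source.applyBranch(BranchedSketch.with(s -> s.step("chained"), "foo"));
        // The chained form still needs the explicit <String, String> witness.
        StreamSketch<String, String> b =
                source.applyBranch(BranchedSketch.<String, String>as("foo")
                        .withChain(s -> s.step("chained")));
        return a.description + " | " + b.description;
    }
}
```

Both calls produce the same result in this sketch; the difference is only in how much the caller has to spell out.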

2.
----------------------------

> Regarding making the K,V types covariant also, yes, that would indeed
> be nice, but I'm not sure it will actually work.

What I'm keeping in mind is the following example: imagine

static KStream<String, Integer> func(KStream<String, Number> s) {
        return s.mapValues(n -> (Integer) n + 1);
}

BranchedKStream<String, Number> b =
    s.split().branch((k, v) -> isInteger(v),
               //Won't compile!!
               Branched.with(Me::func));

The simple workaround here is to change `func`'s return type from KStream<...Integer> to KStream<...Number>.

[On the other hand, we already agreed to remove `withJavaConsumer` from `Branched`, so during code migration I will have to modify my functions' return types anyway -- I mean, from `void` to `KStream`!! ]

> the map you're returning is Map<K,V>, and of course a K is not the same as "? extends K", so it doesn't seem compatible.

I think what you actually meant here is that KStream<? extends K, ? extends V> does not fit as a value for Map<String, KStream<K, V>>. This in particular is not a problem, since KStream<? extends K, ? extends V> can be safely and explicitly cast to KStream<K, V> and put into the map.

BUT I am really afraid of the pitfalls of nested wildcard types. So maybe for now it's better to just admit that the API is not absolutely perfect and accept it as is, that is

Function<? super KStream<K, V>, ? extends KStream<K, V>>
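
To make the trade-off concrete, here is a small sketch of both signatures. `TypedStream` is a hypothetical stand-in for `KStream`, and `narrow`, `wide`, `increment` and `incrementAsNumber` are names invented for this illustration only:

```java
import java.util.function.Function;

// Hypothetical stand-in for KStream<K, V>; it carries one sample value.
final class TypedStream<K, V> {
    final V sample;

    TypedStream(V sample) {
        this.sample = sample;
    }
}

final class VarianceSketch {
    // The signature kept for now: the chain must return exactly TypedStream<K, V>.
    static <K, V> TypedStream<K, V> narrow(
            TypedStream<K, V> in,
            Function<? super TypedStream<K, V>, ? extends TypedStream<K, V>> chain) {
        return chain.apply(in);
    }

    // The wider alternative: the chain may return a stream of subtypes of K and V.
    @SuppressWarnings("unchecked")
    static <K, V> TypedStream<K, V> wide(
            TypedStream<K, V> in,
            Function<? super TypedStream<K, V>,
                     ? extends TypedStream<? extends K, ? extends V>> chain) {
        TypedStream<? extends K, ? extends V> result = chain.apply(in);
        // Unchecked cast; acceptable here only because the result is read, never written.
        return (TypedStream<K, V>) result;
    }

    // Returns a narrower value type than it accepts, like `func` above.
    static TypedStream<String, Integer> increment(TypedStream<String, Number> s) {
        return new TypedStream<String, Integer>(Integer.valueOf(s.sample.intValue() + 1));
    }

    // The simple workaround: same logic, declared with the wider return type.
    static TypedStream<String, Number> incrementAsNumber(TypedStream<String, Number> s) {
        return new TypedStream<String, Number>(Integer.valueOf(s.sample.intValue() + 1));
    }

    static int run() {
        TypedStream<String, Number> in =
                new TypedStream<String, Number>(Integer.valueOf(41));
        // narrow(in, VarianceSketch::increment) would not compile: the chain returns
        // TypedStream<String, Integer>, which is not a TypedStream<String, Number>.
        TypedStream<String, Number> viaWorkaround =
                narrow(in, VarianceSketch::incrementAsNumber);
        TypedStream<String, Number> viaWide = wide(in, VarianceSketch::increment);
        return viaWorkaround.sample.intValue() + viaWide.sample.intValue();
    }
}
```

The wide variant accepts more method references, at the price of a nested wildcard in the signature and an unchecked cast inside.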

Regards,

Ivan


21.05.2020 17:59, John Roesler wrote:
Hello Ivan,

Thanks for the refinement. Actually, I did not know that "as" would
clash with a Kotlin operator. Maybe we should depart from convention
and just avoid methods named "as" in the future.

The convention is that "as(String name)" is used for the static factory
method, whereas "withName(String name)" is an instance method
inherited from NamedOperation. If you wish to propose to avoid "as"
for compatibility with Kotlin, I might suggest "fromName(String name)",
although it's somewhat dubious, since all the other configuration
classes use "as". Perhaps it would be better to stick with "as" for now
and just file a Jira to switch them all at the same time.

Re. 3:
Regarding the type inference problem, yes, it's a blemish on all of our
configuration objects. The problem is that Java infers the type
based on the _first_ method in the chain. While it does consider what
the recipient of the method result wants, it only considers the _next_
recipient.

Thus, if you call as("foo") and immediately assign it to a
Branched<String,String> variable, Java infers the type correctly. But
when the "next recipient" is a chained method call, like "withChain",
then the chained method doesn't bound the type (by definition,
withChain is defined on Branched<Object, Object>), so Java will take
the broadest possible inference and bind the type to
Branched<Object, Object>, at which point it can't be revised anymore.

As a user of Java, this is exceedingly annoying, since it doesn't seem
that hard to recursively consider the entire context when inferring the
generic type parameters, but this is what we have to work with.
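
The same behaviour can be reproduced with nothing but the JDK. The sketch below uses `Collections.emptyList()` as the static factory; the class and method names are invented for the illustration:

```java
import java.util.Collections;
import java.util.List;

final class ChainedInferenceSketch {
    // T is inferred as String from the assignment target.
    static List<String> viaAssignment() {
        List<String> names = Collections.emptyList();
        return names;
    }

    // With a chained call, emptyList() has no String target, so T would be
    // inferred as Object and this line would not compile:
    //   List<String> names = Collections.emptyList().subList(0, 0);
    // An explicit type witness on the first call fixes it.
    static List<String> viaWitness() {
        List<String> names = Collections.<String>emptyList().subList(0, 0);
        return names;
    }
}
```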

To be honest, though, I don't understand how your new proposed
methods would work any differently than the ones you already
had proposed in the KIP. It seems like you'd still have to provide
the generic type parameters on the first static factory call. Can you
explain how your new interface proposal differs from the existing KIP?

Re. 4:
Regarding making the K,V types covariant also, yes, that would indeed
be nice, but I'm not sure it will actually work. You might want to give it a
try. In the past, we've run into some truly strange interactions between the
Java type inferencer and lambdas (and/or anonymous inner classes) in
combination with nested covariant types.

Another issue is that the value type of the map you're returning is
Map<K,V>, and of course a K is not the same as "? extends K", so it
doesn't seem compatible.

Thanks again,
-John

On Thu, May 21, 2020, at 04:20, Ivan Ponomarev wrote:
Hi,

Thanks Matthias for your suggestion: yes, I agree that getting rid of
`with[Java]Consumer` makes this thing 'as simple as possible, but not
simpler'.

I made some quick API mocking in my IDE and tried to implement examples
from KIP.

1. Having to return something from lambda is not a very big deal.

2. For a moment I thought that I won't be able to use method references
for already written stream consumers, but then I realized that I can
just change my methods from returning void to returning the input
parameter and use references to them. Not very convenient, but passable.

So, I'm ready to agree: 1) we use only functions, no consumer 2) when
function returns null, we don't insert it into the resulting map.
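
A minimal sketch of rule 2), assuming the branches are collected into an ordered map keyed by name. `NullSkippingBranches` and its methods are hypothetical and only mimic the behaviour described, with plain strings standing in for streams:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class NullSkippingBranches {
    // Applies each named chain to the input and keeps only the non-null results,
    // so a chain that returns null leaves no entry in the map.
    static <S> Map<String, S> collect(
            S input, Map<String, Function<? super S, ? extends S>> chains) {
        Map<String, S> result = new LinkedHashMap<>();
        for (Map.Entry<String, Function<? super S, ? extends S>> entry : chains.entrySet()) {
            S output = entry.getValue().apply(input);
            if (output != null) {
                result.put(entry.getKey(), output);
            }
        }
        return result;
    }

    static Map<String, String> demo() {
        Map<String, Function<? super String, ? extends String>> chains = new LinkedHashMap<>();
        chains.put("kept", s -> s + "-modified");
        chains.put("terminal", s -> null); // e.g. a chain that ends in a sink
        return collect("stream", chains);
    }
}
```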

Usually it's better to implement a non-perfect, but workable solution as
a first approximation. And later we can always add to `Branched`
anything we want.

3. Do we have any guidelines on how parameter classes like Branched
should be built? First of all, it seems that `as` now is more preferred
than `withName` (although as you probably know it clashes with Kotlin's
`as` operator).

Then, while trying to mock the APIs, I found out that my Java cannot
infer types in the following construction:

.branch((key, value) -> value == null,
     Branched.as("foo").withChain(s -> s.mapValues(...)))


so I have to write

.branch((key, value) -> value == null,
     Branched.<String, String>as("foo").withChain(s -> s.mapValues(...)))


This is not tolerable IMO, so this is the list of `Branched` methods
that I came to (will you please validate it):

static <K, V> Branched<K, V> as(String name);

static <K, V> Branched<K, V> with(
    Function<? super KStream<K, V>, ? extends KStream<K, V>> chain);

static <K, V> Branched<K, V> with(
    Function<? super KStream<K, V>, ? extends KStream<K, V>> chain, String name);

//non-static!
Branched<K, V> withChain(
    Function<? super KStream<K, V>, ? extends KStream<K, V>> chain);


4. And one more. What do you think, do we need that flexibility:

Function<? super KStream<K, V>, ? extends KStream<K, V>> chain

vs.

Function<? super KStream<? super K, ? super V>,
         ? extends KStream<? extends K, ? extends V>> chain

??

Regards,

Ivan


21.05.2020 6:54, John Roesler wrote:
Thanks for this thought, Matthias,

Your idea has a few aspects I find attractive:
1. There’s no ambiguity at all about what will be in the map, because there’s 
only one thing that could be there, which is whatever is returned from the 
chain function.
2. We keep the API smaller. Thanks to the extensible way this KIP is designed, 
it would be trivially easy to add the “terminal” chain later. As you say, fewer 
concepts leads to an API that is easier to learn.
3. We get to side-step the naming of this method. Although I didn’t complain 
about withJavaConsumer, it was only because I couldn’t think of a better name. 
Still, it’s somewhat unsatisfying to name a method after its argument type, 
since this provides no information at all about what the method does. I was 
willing to accept it because I didn’t have an alternative, but I would be happy 
to skip this method for now to avoid the problem until we have more inspiration.

The only con I see is that it makes the code a little less ergonomic to write 
when you don’t want to return the result of the chain (such as when the chain 
is terminal), since in your example, you have to declare a block with a return
statement at the end. It’s not ideal, but it doesn’t seem too bad to me.

Lastly, on the null question, I’d be fine with allowing a null result, which 
would just remove the branch from the returned map. It seems nicer than forcing 
people to pick a stream to return when their chain is terminal and they don’t 
want to use the result later.

Thanks again for sharing the idea,
John

On Wed, May 20, 2020, at 18:17, Matthias J. Sax wrote:
Thanks for updating the KIP!

I guess the only open question is about `Branched.withJavaConsumer` and
its relationship to the returned `Map`.

Originally, we discussed two main patterns:

   (1) split a stream and return the substreams for further processing
   (2) split a stream and modify the substreams with in-place method chaining

To combine both patterns we wanted to allow for

    -> split a stream, modify the substreams, and return the _modified_
substreams for further processing

But is it also an issue? With Kafka Streams, we can split the topology graph at 
any point. Technically, it's OK to do both: feed the KStream to a 
[Java]Consumer AND save it in resulting Map. If one doesn't need the stream in 
the Map, one simply does not extract it from there

That is of course possible. However, it introduces some "hidden" semantics:

   - using `withChain` I get the modified sub-stream
   - using `withJavaConsumer` I get the unmodified sub-stream

This seems to be quite subtle to me.



From my understanding the original idea of `withJavaConsumer` was to
model a terminal operation, ie, it should be similar to:

Branched.withChain(s -> {
    s.to();
    return null;
})

However, I am not sure if we should even allow `withChain()` to return
`null`? IMHO, we should throw an exception for this case to avoid a `key
-> null` entry in the returned Map.

Following this train of thought, and if we want to allow the "return
null" pattern in general, we need `withJavaConsumer` that does not add
an entry to the Map.

Following your proposal, the semantics of `withJavaConsumer` could also
be achieved with `withChain`:

Branched.withChain(s -> {
    s.to();
    return s;
})

Thus, for your proposal `withJavaConsumer` is purely syntactic sugar,
while for the first proposal it adds new functionality (if `return null`
is not allowed, using `withChain()` it is not possible to "hide" a
sub-stream in the result). Furthermore, we might need to allow `return
null` in your proposal to allow users to "hide" a sub-stream in the Map.



I guess I can be convinced either way. However, if we follow your
proposal, I am wondering if we need `withJavaConsumer` at all? Its
benefit seems to be small? Also, having a reduced API is usually
preferable as it's simpler to learn.



-Matthias




On 5/15/20 3:12 PM, Ivan Ponomarev wrote:
Hello, John, hello Matthias!

Thank you very much for your detailed feedback!

-----------------------------------------

John,

It looks like you missed my reply on Apr 23rd.

For some unknown reason it didn't reach my inbox; fortunately, we have
all the emails on the web.

1. Can you propose to deprecate (but not remove) the existing ‘branch’
method?

Done, in "Compatibility, Deprecation, and Migration Plan" section.

2. [Explain why 'branch' operator is superior to branching directly
off of the parent KStream for the needs of dynamic branching]

Done, see an ugly counterexample in 'Dynamic Branching' section.

3. [`withConsumer` is confusing with Kafka Consumer... maybe `withSink`?]

As Matthias noted, `withSink` can also be confusing. I renamed this
method to `withJavaConsumer` per Matthias' suggestion.

4. ...It seems like there are two disjoint use cases: EITHER using
chain and the result map OR using just the sink

This is discussed below.

----------------------------------------------

Matthias,

1. [We should rename `KBranchedStream` -> `BranchedKStream`]

Done.

2. [Ambiguous phrase about 'parameterless' version of the `branch`
method]

Fixed.


3. Overview of newly added methods/interfaces

Done in `Proposed Changes` section.


4. [Concerning John's note] > I don't think that using both
`withChain()` and `withConsumer()` is the
issue, as the KIP clearly states that the result of `withChain()` will
be given to the `Consumer`.

Yes, I agree!

The issue is really with the `Consumer` and the returned `Map` of
`defaultBranch()` and `noDefaultBranch()`. Maybe a reasonable
implementation would be to not add the "branch" to the result map if
`withConsumer` is used?

But is it also an issue? With Kafka Streams, we can split the topology
graph at any point. Technically, it's OK to do both: feed the KStream to
a [Java]Consumer AND save it in resulting Map. If one doesn't need the
stream in the Map, one simply does not extract it from there :-)

In the current version of KIP it is assumed that the returned map
contains ALL the branches, either tagged with IDs explicitly set by the
programmer, or with some default auto-generated ids. Dealing with this
map is the user's responsibility.

What seems to me to be an issue is introducing exclusions to this
general rule, like 'swallowing' some streams by provided
[Java]Consumers. This can make things complicated. What if a user
provides both the name of the branch and a [Java]Consumer? What do they
mean in this case? Should we 'swallow' the stream or save it to the map?
There's no point in 'saving the space' in this map, so maybe just leave
it as it is?

----

I rewrote the KIP and also fixed a couple of typos.

Looking forward to your feedback again!

Regards,

Ivan.



08.05.2020 22:55, Matthias J. Sax wrote:
Thanks for updating the KIP!

I also have some minor comments:



(1) We should rename `KBranchedStream` -> `BranchedKStream`

(Most classes follow this naming pattern now, eg, CoGroupedKStream,
TimeWindowedKStream etc -- there is just the "legacy" `KGroupedStream`
and `KGroupedKTable` that we cannot rename without a breaking change...
so we just keep them.)



(2) Quote:

Both branch and defaultBranch operations also have overloaded
parameterless alternatives.

I think `branch()` always needs to take a `Predicate`, and I assume you
meant that `Branched` is optional. Can you maybe rephrase it accordingly,
as `branch()` would not be "parameterless"?



(3) Can you maybe add an overview (in the "Public Interface" section) of
newly added and deprecated methods/classes (cf.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup#KIP-150-Kafka-StreamsCogroup-PublicInterfaces)?




(4) What is unclear from the KIP is the interaction of `withConsumer()`
and the finally returned `Map<String, KStream>`. This is related to John's
4th comment:

It seems like there are really two disjoint use cases: EITHER using
chain and the result map OR using just the sink.

I don't think that using both `withChain()` and `withConsumer()` is the
issue though, as the KIP clearly states that the result of `withChain()`
will be given to the `Consumer`. The issue is really with the `Consumer`
and the returned `Map` of `defaultBranch()` and `noDefaultBranch()`.

Maybe a reasonable implementation would be to not add the "branch" to
the result map if `withConsumer` is used? As long as we clearly document
it in the JavaDocs, this might be fine?



(5) Reply to John's comments:

3. Reading the KIP, I found ‘withConsumer’ confusing; I thought you
were talking about the kafka Consumer interface (which doesn’t make
sense, of course). I get that you were referring to the java Consumer
interface, but we should still probably try to avoid the ambiguity.
Just throwing out a suggestion, how about ‘withSink’?

IMHO, `withSink` has the issue that it might be confused with a "sink
node", ie., writing the KStream to a topic.

Maybe `withJavaConsumer` would make it less ambiguous?




-Matthias




On 5/8/20 7:13 AM, John Roesler wrote:
Hi Ivan,

It looks like you missed my reply on Apr 23rd. I think it’s close,
but I had a few last comments.

Thanks,
John

On Sun, May 3, 2020, at 15:39, Ivan Ponomarev wrote:
Hello everyone,

will someone please take a look at the reworked KIP?

I believe that now it follows design principles and takes into account
all the arguments discussed here.


Regards,

Ivan


23.04.2020 2:45, Ivan Ponomarev wrote:
Hi,

I have read John's "DSL design principles" and have completely
rewritten the KIP, see
https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream




This version includes all the previous discussion results and follows
the design principles, with one exception.

The exception is

branch(Predicate<K,V> predicate, Branched<K,V> branched)

which formally violates 'no more than one parameter' rule, but I think
here it is justified.

We must provide a predicate for a branch and don't need to provide one
for the default branch. Thus for both operations we may use a single
Branched parameter class, with an extra method parameter for `branch`.

Since predicate is a natural, necessary part of a branch, no
'proliferation of overloads, deprecations, etc.' is expected here, as it
is said in the rationale for the 'single parameter rule'.

WDYT, is this KIP mature enough to begin voting?

Regards,

Ivan

21.04.2020 2:09, Matthias J. Sax wrote:
Ivan,

no worries about getting side tracked. Glad to have you back!

The DSL improved further in the meantime and we already have a `Named`
config object to name operators. It seems reasonable to me to build on
this.

Furthermore, John did a writeup about "DSL design principles" that we
want to follow:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar


-- might be worth checking out.


-Matthias


On 4/17/20 4:30 PM, Ivan Ponomarev wrote:
Hi everyone!

Let me revive the discussion of this KIP.

I'm very sorry for stopping my participation in the discussion in June
2019. My project work was very intensive then and it didn't leave me
spare time. But I think I must finish this, because we invested
substantial effort into this discussion and I don't feel entitled to
propose other things before this one is finalized.

During these months I proceeded with writing and reviewing Kafka
Streams-related code. Every time I needed branching, Spring-Kafka's
KafkaStreamBrancher class of my invention (the original idea for this
KIP) worked for me -- that's another reason why I gave up pushing the
KIP forward. When I was coming across the problem with the scope of
branches, I worked around it this way:

AtomicReference<KStream<...>> result = new AtomicReference<>();
new KafkaStreamBrancher<....>()
        .branch(....)
        .defaultBranch(result::set)
        .onTopOf(someStream);
result.get()...


And yes, of course I don't feel very happy with this approach.

I think that Matthias came up with a bright solution in his post from
May 24th, 2019. Let me quote it:

KStream#split() -> KBranchedStream
// branch is not easily accessible in current scope
KBranchedStream#branch(Predicate, Consumer<KStream>)
      -> KBranchedStream
// assign a name to the branch and
// return the sub-stream to the current scope later
//
// can be simple as `#branch(p, s->s, "name")`
// or also complex as `#branch(p, s->s.filter(...), "name")`
KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
      -> KBranchedStream
// default branch is not easily accessible
// return map of all named sub-stream into current scope
KBranchedStream#default(Consumer<KStream>)
      -> Map<String,KStream>
// assign custom name to default-branch
// return map of all named sub-stream into current scope
KBranchedStream#default(Function<KStream,KStream>, String)
      -> Map<String,KStream>
// assign a default name for default
// return map of all named sub-stream into current scope
KBranchedStream#defaultBranch(Function<KStream,KStream>)
      -> Map<String,KStream>
// return map of all names sub-stream into current scope
KBranchedStream#noDefaultBranch()
      -> Map<String,KStream>

I believe this would satisfy everyone. Optional names seem to be a good
idea: when you don't need to have the branches in the same scope, you
just don't use names and you don't risk making your code brittle. Or,
you might want to add names just for debugging purposes. Or, finally,
you might use the returned Map to have the named branches in the
original scope.

There also was an input from John Roesler on June 4th, 2019, who
suggested using Named class. I can't comment on this. The idea seems
reasonable, but in this matter I'd rather trust people who are more
familiar with Streams API design principles than me.

Regards,

Ivan



08.10.2019 1:38, Matthias J. Sax wrote:
I am moving this KIP into "inactive status". Feel free to resume the KIP
at any point.

If anybody else is interested in picking up this KIP, feel free to
do so.



-Matthias

On 7/11/19 4:00 PM, Matthias J. Sax wrote:
Ivan,

did you see my last reply? What do you think about my proposal to mix
both approaches and try to get best-of-both worlds?


-Matthias

On 6/11/19 3:56 PM, Matthias J. Sax wrote:
Thanks for the input John!

under your suggestion, it seems that the name is required

If you want to get the `KStream` as part of the `Map` back using a
`Function`, yes. If you follow the "embedded chaining" pattern using a
`Consumer`, no.

Allowing for a default name via `split()` can of course be done.
Similarly, using `Named` instead of `String` is possible.

I wanted to sketch out a high level proposal to merge both patterns
only. Your suggestions to align the new API with the existing API make
total sense.



One follow up question: Would `Named` be optional or required in
`split()` and `branch()`? It's unclear from your example.

If both are mandatory, what do we gain by it? The returned `Map` only
contains the corresponding branches, so why should we prefix all of
them? If `Named` is mandatory only in `branch()`, but optional in
`split()`, the same question arises.

Requiring `Named` in `split()` seems only to make sense if `Named` is
optional in `branch()` and we generate a `-X` suffix using a counter for
different branch names. However, this might lead to the problem of
changing names if branches are added/removed. Also, how would the names
be generated if `Consumer` is mixed in (ie, not all branches are
returned in the `Map`)?

If `Named` is optional for both, it could happen that a user forgets to
specify a name for a branch, which would lead to runtime issues.


Hence, I am actually in favor of not allowing a default name but keeping
`split()` without a parameter and making `Named` in `branch()` required if a
`Function` is used. This makes it explicit to the user that specifying a
name is required if a `Function` is used.



About

KBranchedStream#branch(BranchConfig)

I don't think that the branching predicate is a configuration and hence
would not include it in a configuration object.

         withChain(...);

Similarly, `withChain()` (that would only take a `Consumer`?) does not
seem to be a configuration. We also cannot prevent a user from calling
`withName()` in combination with `withChain()`, which does not make sense
IMHO. We could of course throw an RTE, but not having a compile time check
seems less appealing. Also, it could happen that neither `withChain()`
nor `withName()` is called and the branch is missing in the returned
`Map`, which leads to runtime issues, too.

Hence, I don't think that we should add `BranchConfig`. A config object
is helpful if each configuration can be set independently of all others,
but this seems not to be the case here. If we add new configuration
later, we can also just move forward by deprecating the methods that
accept `Named` and add new methods that accept `BranchConfig` (that
would of course implement `Named`).


Thoughts?


@Ivan, what do you think about the general idea to blend the two main
approaches of returning a `Map` plus an "embedded chaining"?



-Matthias



On 6/4/19 10:33 AM, John Roesler wrote:
Thanks for the idea, Matthias, it does seem like this would satisfy
everyone. Returning the map from the terminal operations also solves
the problem of merging/joining the branched streams, if we want to add
support for the complement later on.

Under your suggestion, it seems that the name is required. Otherwise,
we wouldn't have keys for the map to return. I think this is actually
not too bad, since experience has taught us that, although names for
operations are not required to define stream processing logic, it does
significantly improve the operational experience when you can map the
topology, logs, metrics, etc. back to the source code. Since you
wouldn't (have to) reference the name to chain extra processing onto
the branch (thanks to the second argument), you can avoid the
"unchecked name" problem that Ivan pointed out.

In the current implementation of Branch, you can name the branch
operator itself, and then all the branches get index-suffixed names
built from the branch operator name. I guess under this proposal, we
could naturally append the branch name to the branching operator name,
like this:

        stream.split(Named.withName("mysplit")) // creates node "mysplit"
                   .branch(..., ..., "abranch") // creates node "mysplit-abranch"
                   .defaultBranch(...) // creates node "mysplit-default"

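The naming rule could be sketched roughly as follows. `BranchNameSketch` and `nodeName` are hypothetical names, and falling back to an index when no branch name is given is an assumption modelled on the index-suffixed names of the current implementation:

```java
final class BranchNameSketch {
    // Builds a node name from the split operator name and an optional branch name.
    // When branchName is null, the caller-supplied position index is used instead
    // (an assumption made for this sketch, not a confirmed behaviour).
    static String nodeName(String splitName, String branchName, int index) {
        String suffix = (branchName != null) ? branchName : String.valueOf(index);
        return splitName + "-" + suffix;
    }
}
```

For example, `nodeName("mysplit", "abranch", 1)` yields "mysplit-abranch", matching the comment in the snippet above.
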
It does make me wonder about the DSL syntax itself, though.

We don't have a defined grammar, so there's plenty of room to debate
the "best" syntax in the context of each operation, but in general,
the KStream DSL operators follow this pattern:

         operator(function, config_object?) OR operator(config_object)

where config_object is often just Named in the "function" variant.
Even when the config_object isn't a Named, but some other config
class, that config class _always_ implements NamedOperation.

Here, we're introducing a totally different pattern:

       operator(function, function, string)

where the string is the name.
My first question is whether the name should instead be specified with
the NamedOperation interface.

My second question is whether we should just roll all these arguments
up into a config object like:

        KBranchedStream#branch(BranchConfig)

        interface BranchConfig extends NamedOperation {
            withPredicate(...);
            withChain(...);
            withName(...);
        }

Although I guess we'd like to call BranchConfig something more like
"Branched", even if I don't particularly like that pattern.

This makes the source code a little noisier, but it also makes us more
future-proof, as we can deal with a wide range of alternatives purely
in the config interface, and never have to deal with adding overloads
to the KBranchedStream if/when we decide we want the name to be
optional, or the KStream->KStream to be optional.

WDYT?

Thanks,
-John

On Fri, May 24, 2019 at 5:25 PM Michael Drogalis
<michael.droga...@confluent.io> wrote:

Matthias: I think that's pretty reasonable from my point of view. Good
suggestion.

On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax <matth...@confluent.io> wrote:

Interesting discussion.

I am wondering if we can unify the advantages of both approaches:



KStream#split() -> KBranchedStream

// branch is not easily accessible in current scope
KBranchedStream#branch(Predicate, Consumer<KStream>)
       -> KBranchedStream

// assign a name to the branch and
// return the sub-stream to the current scope later
//
// can be simple as `#branch(p, s->s, "name")`
// or also complex as `#branch(p, s->s.filter(...), "name")`
KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
       -> KBranchedStream

// default branch is not easily accessible
// return map of all named sub-stream into current scope
KBranchedStream#default(Consumer<KStream>)
       -> Map<String,KStream>

// assign custom name to default-branch
// return map of all named sub-stream into current scope
KBranchedStream#default(Function<KStream,KStream>, String)
       -> Map<String,KStream>

// assign a default name for default
// return map of all named sub-stream into current scope
KBranchedStream#defaultBranch(Function<KStream,KStream>)
       -> Map<String,KStream>

// return map of all names sub-stream into current scope
KBranchedStream#noDefaultBranch()
       -> Map<String,KStream>



Hence, for each sub-stream, the user can pick to add a name and return
the branch "result" to the calling scope or not. The implementation can
also check at runtime that all returned names are unique. The returned
Map can be empty and it's also optional to use the Map.
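
As a rough sketch of that runtime check (the class `NamedBranchRegistry` is hypothetical and is not part of the proposal): unnamed branches are simply not returned, and a duplicate name is rejected when it is registered.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that collects the named branches in definition order.
final class NamedBranchRegistry<S> {
    private final Map<String, S> named = new LinkedHashMap<>();

    // A null name means the branch is not returned to the calling scope.
    NamedBranchRegistry<S> register(String name, S branch) {
        if (name == null) {
            return this;
        }
        if (named.containsKey(name)) {
            throw new IllegalArgumentException("Duplicate branch name: " + name);
        }
        named.put(name, branch);
        return this;
    }

    // May be empty if no branch was given a name.
    Map<String, S> result() {
        return named;
    }
}
```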

To me, it seems like a good way to get best of both worlds.

Thoughts?



-Matthias




On 5/6/19 5:15 PM, John Roesler wrote:
Ivan,

That's a very good point about the "start" operator in the dynamic case.
I had no problem with "split()"; I was just questioning the necessity.
Since you've provided a proof of necessity, I'm in favor of the
"split()" start operator. Thanks!

Separately, I'm interested to see where the present discussion leads.
I've written enough Javascript code in my life to be suspicious of
nested closures. You have a good point about using method references (or
indeed function literals also work). It should be validating that this
was also the JS community's first approach to flattening the logic when
their nested closure situation got out of hand. Unfortunately, it's
replacing nesting with redirection, both of which disrupt code
readability (but in different ways for different reasons). In other
words, I agree that function references are *the* first-order solution if
the nested code does indeed become a problem.

However, the history of JS also tells us that function references aren't
the end of the story either, and you can see that by observing that
there have been two follow-on eras, as they continue trying to cope with
the consequences of living in such a callback-heavy language. First, you
have Futures/Promises, which essentially let you convert nested code to
method-chained code (Observables/FP is a popular variation on this).
Most recently, you have async/await, which is an effort to apply language
(not just API) syntax to the problem, and offer the "flattest" possible
programming style to solve the problem (because you get back to just one
code block per functional unit).

Stream-processing is a different domain, and Java+KStreams is nowhere
near as callback heavy as JS, so I don't think we have to take the JS
story for granted, but then again, I think we can derive some valuable
lessons by looking sideways to adjacent domains. I'm just bringing this
up to inspire further/deeper discussion. At the same time, just like JS,
we can afford to take an iterative approach to the problem.

Separately again, I'm interested in the post-branch merge (and I'd also
add join) problem that Paul brought up. We can clearly punt on it, by
terminating the nested branches with sink operators. But is there a DSL
way to do it?

Thanks again for your driving this,
-John

On Thu, May 2, 2019 at 7:39 PM Paul Whalen <pgwha...@gmail.com> wrote:

         Ivan, I’ll definitely forfeit my point on the clumsiness of the
         branch(predicate, consumer) solution; I don’t see any real drawbacks
         for the dynamic case.

         IMO the one trade off to consider at this point is the scope
         question. I don’t know if I totally agree that “we rarely need them
         in the same scope” since merging the branches back together later
         seems like a perfectly plausible use case that can be a lot nicer
         when the branched streams are in the same scope. That being said,
         for the reasons Ivan listed, I think it is overall the better
         solution - working around the scope thing is easy enough if you need
         to.

         > On May 2, 2019, at 7:00 PM, Ivan Ponomarev
         <iponoma...@mail.ru.invalid> wrote:
         >
         > Hello everyone, thank you all for joining the discussion!
         >
         > Well, I don't think the idea of named branches, be it a
         > LinkedHashMap (no other Map will do, because order of definition
         > matters) or a `branch` method taking a name and a Consumer, has
         > more advantages than drawbacks.
         >
         > In my opinion, the only real positive outcome from Michael's
         > proposal is that all the returned branches are in the same scope.
         > But 1) we rarely need them in the same scope 2) there is a
         > workaround for the scope problem, described in the KIP.
         >
         > 'Inlining the complex logic' is not a problem, because we can use
         > method references instead of lambdas. In real world scenarios you
         > tend to split the complex logic into methods anyway, so the code
         > is going to be clean.
         >
         > The drawbacks are strong. The cohesion between predicates and
         > handlers is lost. We have to define predicates in one place, and
         > handlers in another. This opens the door for bugs:
         >
         > - what if we forget to define a handler for a name? or a name for
         >   a handler?
         > - what if we misspell a name?
         > - what if we copy-paste and duplicate a name?
         >
         > What Michael proposes would have been totally OK if we had been
         > writing the API in Lua, Ruby or Python. In those languages the
         > "dynamic naming" approach would have looked most concise and
         > beautiful. But in Java we expect all the problems related to
         > identifiers to be eliminated at compile time.
         >
         > Do we have to invent duck-typing for the Java API?
         >
         > And if we do, what advantage are we supposed to get besides having
         > all the branches in the same scope? Michael, maybe I'm missing
         > your point?
         >
         > ---
         >
         > Earlier in this discussion John Roesler also proposed to do
         > without the "start branching" operator, and later Paul mentioned
         > that in the case when we have to add a dynamic number of branches,
         > the current KIP is 'clumsier' compared to Michael's 'Map'
         > solution. Let me address both comments here.
         >
         > 1) The "start branching" operator (I think that *split* is a good
         > name for it indeed) is critical when we need to do dynamic
         > branching, see example below.
         >
         > 2) No, dynamic branching in the current KIP is not clumsy at all.
         > Imagine a real-world scenario when you need one branch per enum
         > value (say, RecordType). You can have something like this:
         >
         > /*John: if we had to start with stream.branch(...) here, it
         >   would have been much messier.*/
         > KBranchedStream branched = stream.split();
         >
         > /*Not clumsy at all :-)*/
         > for (RecordType recordType : RecordType.values())
         >     branched = branched.branch(
         >             (k, v) -> v.getRecType() == recordType,
         >             recordType::processRecords);
         >
         > Regards,
         >
         > Ivan
         >
         >
         > 02.05.2019 14:40, Matthias J. Sax wrote:
         >> I also agree with Michael's observation about the core problem
         >> of the current `branch()` implementation.
         >>
         >> However, I also don't like to pass in a clumsy Map object. My
         >> thinking was more aligned with Paul's proposal to just add a name
         >> to each `branch()` statement and return a `Map<String,KStream>`.
         >>
         >> It makes the code easier to read, and also makes the order of
         >> `Predicates` (that is essential) easier to grasp.
         >>
         >>>>>> Map<String, KStream<K, V>> branches = stream.split()
         >>>>>>    .branch("branchOne", Predicate<K, V>)
         >>>>>>    .branch( "branchTwo", Predicate<K, V>)
         >>>>>>    .defaultBranch("defaultBranch");
         >>
         >> An open question is the case for which no defaultBranch() should
         >> be specified. Atm, `split()` and `branch()` would return
         >> `BranchedKStream` and the call to `defaultBranch()` that returns
         >> the `Map` is mandatory (which is not the case atm). Or is this
         >> actually not a real problem, because users can just ignore the
         >> branch returned by `defaultBranch()` in the result `Map`?
         >>
         >>
         >> About "inlining": So far, it seems to be a matter of personal
         >> preference. I can see arguments for both, but no "killer
         >> argument" yet that clearly makes the case for one or the other.
         >>
         >>
         >> -Matthias
         >>
         >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
         >>> Perhaps inlining is the wrong terminology. It doesn’t require
         >>> that a lambda with the full downstream topology be defined
         >>> inline - it can be a method reference as with Ivan’s original
         >>> suggestion. The advantage of putting the predicate and its
         >>> downstream logic (Consumer) together in branch() is that they
         >>> are required to be near to each other.
         >>>
         >>> Ultimately the downstream code has to live somewhere, and deep
         >>> branch trees will be hard to read regardless.
         >>>
         >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis
         >>>> <michael.droga...@confluent.io> wrote:
         >>>>
         >>>> I'm less enthusiastic about inlining the branch logic with its
         >>>> downstream functionality. Programs that have deep branch trees
         >>>> will quickly become harder to read as a single unit.
         >>>>
         >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen
         >>>>> <pgwha...@gmail.com> wrote:
         >>>>>
         >>>>> Also +1 on the issues/goals as Michael outlined them, I think
         >>>>> that sets a great framework for the discussion.
         >>>>>
         >>>>> Regarding the SortedMap solution, my understanding is that the
         >>>>> current proposal in the KIP is what is in my PR which (pending
         >>>>> naming decisions) is roughly this:
         >>>>>
         >>>>> stream.split()
         >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
         >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
         >>>>>    .defaultBranch(Consumer<KStream<K, V>>);
         >>>>>
         >>>>> Obviously some ordering is necessary, since branching as a
         >>>>> construct doesn't work without it, but this solution seems like
         >>>>> it provides as much associativity as the SortedMap solution,
         >>>>> because each branch() call directly associates the "conditional"
         >>>>> with the "code block." The value the SortedMap solution provides
         >>>>> over the KIP solution is access to the streams in the same
         >>>>> scope.
         >>>>>
         >>>>> The KIP solution is less "dynamic" than the SortedMap solution
         >>>>> in the sense that it is slightly clumsier to add a dynamic
         >>>>> number of branches, but it is certainly possible.  It seems to
         >>>>> me like the API should favor the "static" case anyway, and
         >>>>> should make it simple and readable to fluently declare and
         >>>>> access your branches in-line.  It also makes it impossible to
         >>>>> ignore a branch, and it is possible to build an (almost)
         >>>>> identical SortedMap solution on top of it.
         >>>>>
         >>>>> I could also see a middle ground where instead of a raw
         >>>>> SortedMap being taken in, branch() takes a name and not a
         >>>>> Consumer. Something like this:
         >>>>>
         >>>>> Map<String, KStream<K, V>> branches = stream.split()
         >>>>>    .branch("branchOne", Predicate<K, V>)
         >>>>>    .branch( "branchTwo", Predicate<K, V>)
         >>>>>    .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
         >>>>>
         >>>>> Pros for that solution:
         >>>>> - accessing branched KStreams in same scope
         >>>>> - no double brace initialization, hopefully slightly more
         >>>>>   readable than SortedMap
         >>>>>
         >>>>> Cons
         >>>>> - downstream branch logic cannot be specified inline which
         >>>>>   makes it harder to read top to bottom (like existing API and
         >>>>>   SortedMap, but unlike the KIP)
         >>>>> - you can forget to "handle" one of the branched streams (like
         >>>>>   existing API and SortedMap, but unlike the KIP)
         >>>>>
         >>>>> (KBranchedStreams could even work *both* ways but perhaps
         >>>>> that's overdoing it).
         >>>>>
         >>>>> Overall I'm curious how important it is to be able to easily
         >>>>> access the branched KStream in the same scope as the original.
         >>>>> It's possible that it doesn't need to be handled directly by
         >>>>> the API, but instead left up to the user.  I'm sort of in the
         >>>>> middle on it.
         >>>>>
         >>>>> Paul
         >>>>>
         >>>>>
         >>>>>
         >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman
         >>>>> <sop...@confluent.io> wrote:
         >>>>>
         >>>>>> I'd like to +1 what Michael said about the issues with the
         >>>>>> existing branch method, I agree with what he's outlined and I
         >>>>>> think we should proceed by trying to alleviate these problems.
         >>>>>> Specifically it seems important to be able to cleanly access
         >>>>>> the individual branches (eg by mapping name->stream), which I
         >>>>>> thought was the original intention of this KIP.
         >>>>>>
         >>>>>> That said, I don't think we should so easily give in to the
         >>>>>> double brace anti-pattern or force our users into it if at all
         >>>>>> possible to avoid...just my two cents.
         >>>>>>
         >>>>>> Cheers,
         >>>>>> Sophie
         >>>>>>
         >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis
         >>>>>> <michael.droga...@confluent.io> wrote:
         >>>>>>
         >>>>>>> I’d like to propose a different way of thinking about this.
         >>>>>>> To me, there are three problems with the existing branch
         >>>>>>> signature:
         >>>>>>>
         >>>>>>> 1. If you use it the way most people do, Java raises unsafe
         >>>>>>>    type warnings.
         >>>>>>> 2. The way in which you use the stream branches is positionally
         >>>>>>>    coupled to the ordering of the conditionals.
         >>>>>>> 3. It is brittle to extend existing branch calls with
         >>>>>>>    additional code paths.
         >>>>>>>
         >>>>>>> Using associative constructs instead of relying on ordered
         >>>>>>> constructs would be a stronger approach. Consider a signature
         >>>>>>> that instead looks like this:
         >>>>>>>
         >>>>>>> Map<String, KStream<K,V>> KStream#branch(SortedMap<String,
         >>>>>>>     Predicate<? super K,? super V>>);
         >>>>>>>
         >>>>>>> Branches are given names in a map, and as a result, the API
         >>>>>>> returns a mapping of names to streams. The ordering of the
         >>>>>>> conditionals is maintained because it’s a sorted map. Insert
         >>>>>>> order determines the order of evaluation. This solves problem
         >>>>>>> 1 because there are no more varargs. It solves problem 2
         >>>>>>> because you no longer lean on ordering to access the branch
         >>>>>>> you’re interested in. It solves problem 3 because you can
         >>>>>>> introduce another conditional by simply attaching another name
         >>>>>>> to the structure, rather than messing with the existing
         >>>>>>> indices.
         >>>>>>>
         >>>>>>> One of the drawbacks is that creating the map inline is
         >>>>>>> historically awkward in Java. I know it’s an anti-pattern to
         >>>>>>> use voluminously, but double brace initialization would clean
         >>>>>>> up the aesthetics.
         >>>>>>>
         >>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler
         >>>>>>> <j...@confluent.io> wrote:
         >>>>>>>> Hi Ivan,
         >>>>>>>>
         >>>>>>>> Thanks for the update.
         >>>>>>>>
         >>>>>>>> FWIW, I agree with Matthias that the current "start branching"
         >>>>>>>> operator is confusing when named the same way as the actual
         >>>>>>>> branches. "Split" seems like a good name. Alternatively, we
         >>>>>>>> can do without a "start branching" operator at all, and just
         >>>>>>>> do:
         >>>>>>>>
         >>>>>>>> stream
         >>>>>>>>      .branch(Predicate)
         >>>>>>>>      .branch(Predicate)
         >>>>>>>>      .defaultBranch();
         >>>>>>>>
         >>>>>>>> Tentatively, I think that this branching operation should be
         >>>>>>>> terminal. That way, we don't create ambiguity about how to use
         >>>>>>>> it. That is, `branch` should return `KBranchedStream`, while
         >>>>>>>> `defaultBranch` is `void`, to enforce that it comes last, and
         >>>>>>>> that there is only one definition of the default branch.
         >>>>>>>> Potentially, we should log a warning if there's no default,
         >>>>>>>> and additionally log a warning (or throw an exception) if a
         >>>>>>>> record falls through with no default.
         >>>>>>>>
         >>>>>>>> Thoughts?
         >>>>>>>>
         >>>>>>>> Thanks,
         >>>>>>>> -John
         >>>>>>>>
         >>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax
         >>>>>>>> <matth...@confluent.io> wrote:
         >>>>>>>>
         >>>>>>>>> Thanks for updating the KIP and your answers.
         >>>>>>>>>
         >>>>>>>>>
         >>>>>>>>>> this is to make the name similar to String#split
         >>>>>>>>>> that also returns an array, right?
         >>>>>>>>> The intent was to avoid name duplication. The return type
         >>>>>>>>> should _not_ be an array.
         >>>>>>>>>
         >>>>>>>>> The current proposal is
         >>>>>>>>>
         >>>>>>>>> stream.branch()
         >>>>>>>>>      .branch(Predicate)
         >>>>>>>>>      .branch(Predicate)
         >>>>>>>>>      .defaultBranch();
         >>>>>>>>>
         >>>>>>>>> IMHO, this reads a little odd, because the first `branch()`
         >>>>>>>>> does not take any parameters and has different semantics than
         >>>>>>>>> the later `branch()` calls. Note that from the code snippet
         >>>>>>>>> above, it's hidden that the first call is `KStream#branch()`
         >>>>>>>>> while the others are `KBranchedStream#branch()`, which makes
         >>>>>>>>> reading the code harder.
         >>>>>>>>>
         >>>>>>>>> Because I suggested to rename `addBranch()` -> `branch()`, I
         >>>>>>>>> thought it might be better to also rename `KStream#branch()`
         >>>>>>>>> to avoid the naming overlap that seems to be confusing. The
         >>>>>>>>> following reads much cleaner to me:
         >>>>>>>>> stream.split()
         >>>>>>>>>      .branch(Predicate)
         >>>>>>>>>      .branch(Predicate)
         >>>>>>>>>      .defaultBranch();
         >>>>>>>>>
         >>>>>>>>> Maybe there is a better alternative to `split()` though to
         >>>>>>>>> avoid the naming overlap.
         >>>>>>>>>
         >>>>>>>>>
         >>>>>>>>>> 'default' is, however, a reserved word, so unfortunately we
         >>>>>>>>>> cannot have a method with such name :-)
         >>>>>>>>>
         >>>>>>>>> Bummer. Didn't consider this. Maybe we can still come up with
         >>>>>>>>> a short name?
         >>>>>>>>>
         >>>>>>>>> Can you add the interface `KBranchedStream` to the KIP with
         >>>>>>>>> all its methods? It will be part of public API and should be
         >>>>>>>>> contained in the KIP. For example, it's unclear atm, what the
         >>>>>>>>> return type of `defaultBranch()` is.
         >>>>>>>>>
         >>>>>>>>>
         >>>>>>>>> You did not comment on the idea to add a
         >>>>>>>>> `KBranchedStream#get(int index) -> KStream` method to get the
         >>>>>>>>> individually branched-KStreams. Would be nice to get your
         >>>>>>>>> feedback about it. It seems you suggest that users would need
         >>>>>>>>> to write custom utility code otherwise, to access them. We
         >>>>>>>>> should discuss the pros and cons of both approaches. It feels
         >>>>>>>>> "incomplete" to me atm, if the API has no built-in support to
         >>>>>>>>> get the branched-KStreams directly.
         >>>>>>>>>
         >>>>>>>>>
         >>>>>>>>>
         >>>>>>>>> -Matthias
         >>>>>>>>>
         >>>>>>>>>
         >>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
         >>>>>>>>>> Hi all!
         >>>>>>>>>>
         >>>>>>>>>> I have updated the KIP-418 according to the new vision.
         >>>>>>>>>>
         >>>>>>>>>> Matthias, thanks for your comment!
         >>>>>>>>>>
         >>>>>>>>>>> Renaming KStream#branch() -> #split()
         >>>>>>>>>> I can see your point: this is to make the name similar to
         >>>>>>>>>> String#split that also returns an array, right? But is it
         >>>>>>>>>> worth the loss of backwards compatibility? We can have
         >>>>>>>>>> overloaded branch() as well without affecting the existing
         >>>>>>>>>> code. Maybe the old array-based `branch` method should be
         >>>>>>>>>> deprecated, but this is a subject for discussion.
         >>>>>>>>>>
         >>>>>>>>>>> Renaming KBranchedStream#addBranch() ->
         >>>>>>>>>>> BranchingKStream#branch(),
         >>>>>>>>>>> KBranchedStream#defaultBranch() ->
         >>>>>>>>>>> BranchingKStream#default()
         >>>>>>>>>>
         >>>>>>>>>> Totally agree with 'addBranch->branch' rename. 'default' is,
         >>>>>>>>>> however, a reserved word, so unfortunately we cannot have a
         >>>>>>>>>> method with such name :-)
         >>>>>>>>>>> defaultBranch() does take an `Predicate` as argument, but I
         >>>>>>>>>>> think that is not required?
         >>>>>>>>>>
         >>>>>>>>>> Absolutely! I think that was just a copy-paste error or
         >>>>>>>>>> something.
         >>>>>>>>>>
         >>>>>>>>>> Dear colleagues,
         >>>>>>>>>>
         >>>>>>>>>> please review the new version of the KIP and Paul's PR
         >>>>>>>>>> (https://github.com/apache/kafka/pull/6512)
         >>>>>>>>>>
         >>>>>>>>>> Any new suggestions/objections?
         >>>>>>>>>>
         >>>>>>>>>> Regards,
         >>>>>>>>>>
         >>>>>>>>>> Ivan
         >>>>>>>>>>
         >>>>>>>>>>
         >>>>>>>>>> 11.04.2019 11:47, Matthias J. Sax wrote:
         >>>>>>>>>>> Thanks for driving the discussion of this KIP. It seems that
         >>>>>>>>>>> everybody agrees that the current branch() method using
         >>>>>>>>>>> arrays is not optimal.
         >>>>>>>>>>> I had a quick look into the PR and I like the overall
         >>>>>>>>>>> proposal. There are some minor things we need to consider. I
         >>>>>>>>>>> would recommend the following renaming:
         >>>>>>>>>>>
         >>>>>>>>>>> KStream#branch() -> #split()
         >>>>>>>>>>> KBranchedStream#addBranch() -> BranchingKStream#branch()
         >>>>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
         >>>>>>>>>>>
         >>>>>>>>>>> It's just a suggestion to get slightly shorter method names.
         >>>>>>>>>>>
         >>>>>>>>>>> In the current PR, defaultBranch() does take an `Predicate`
         >>>>>>>>>>> as argument, but I think that is not required?
         >>>>>>>>>>>
         >>>>>>>>>>> Also, we should consider KIP-307, that was recently accepted
         >>>>>>>>>>> and is currently implemented:
         >>>>>>>>>>>
         >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
         >>>>>>>>>>>
         >>>>>>>>>>> Ie, we should add overloads that accept a `Named` parameter.
         >>>>>>>>>>>
         >>>>>>>>>>>
         >>>>>>>>>>> For the issue that the created `KStream` objects are in
         >>>>>>>>>>> different scopes: could we extend `KBranchedStream` with a
         >>>>>>>>>>> `get(int index)` method that returns the corresponding
         >>>>>>>>>>> "branched" result `KStream` object? Maybe, the second
         >>>>>>>>>>> argument of `addBranch()` should not be a
         >>>>>>>>>>> `Consumer<KStream>` but a `Function<KStream,KStream>` and
         >>>>>>>>>>> `get()` could return whatever the `Function` returns?
         >>>>>>>>>>>
         >>>>>>>>>>>
         >>>>>>>>>>> Finally, I would also suggest to update the KIP with the
         >>>>>>>>>>> current proposal. That makes it easier to review.
         >>>>>>>>>>>
         >>>>>>>>>>>
         >>>>>>>>>>> -Matthias
         >>>>>>>>>>>
         >>>>>>>>>>>
         >>>>>>>>>>>
         >>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
         >>>>>>>>>>>> Ivan,
         >>>>>>>>>>>>
         >>>>>>>>>>>> I'm a bit of a novice here as well, but I think it makes
         >>>>>>>>>>>> sense for you to revise the KIP and continue the discussion.
         >>>>>>>>>>>> Obviously we'll need some buy-in from committers that have
         >>>>>>>>>>>> actual binding votes on whether the KIP could be adopted.
         >>>>>>>>>>>> It would be great to hear if they think this is a good idea
         >>>>>>>>>>>> overall.  I'm not sure if that happens just by starting a
         >>>>>>>>>>>> vote, or if there is generally some indication of interest
         >>>>>>>>>>>> beforehand.
         >>>>>>>>>>>>
         >>>>>>>>>>>> That being said, I'll continue the discussion a bit:
         >>>>>>>>>>>> assuming we do move forward with the solution of
         >>>>>>>>>>>> "stream.branch() returns KBranchedStream", do we deprecate
         >>>>>>>>>>>> "stream.branch(...) returns KStream[]"?  I would favor
         >>>>>>>>>>>> deprecating, since having two mutually exclusive APIs that
         >>>>>>>>>>>> accomplish the same thing is confusing, especially when
         >>>>>>>>>>>> they're fairly similar anyway.  We just need to be sure
         >>>>>>>>>>>> we're not making something impossible/difficult that is
         >>>>>>>>>>>> currently possible/easy.
         >>>>>>>>>>>>
         >>>>>>>>>>>> Regarding my PR - I think the general structure would work,
         >>>>>>>>>>>> it's just a little sloppy overall in terms of naming and
         >>>>>>>>>>>> clarity. In particular, passing in the "predicates" and
         >>>>>>>>>>>> "children" lists which get modified in KBranchedStream but
         >>>>>>>>>>>> read from all the way in KStreamLazyBranch is a bit
         >>>>>>>>>>>> complicated to follow.
         >>>>>>>>>>>>
         >>>>>>>>>>>> Thanks,
         >>>>>>>>>>>> Paul
         >>>>>>>>>>>>
         >>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev
         >>>>>>>>>>>> <iponoma...@mail.ru> wrote:
         >>>>>>>>>>>>> Hi Paul!
         >>>>>>>>>>>>>
         >>>>>>>>>>>>> I read your code carefully and now I am fully convinced:
         >>>>>>>>>>>>> your proposal looks better and should work. We just have to
         >>>>>>>>>>>>> document the crucial fact that KStream consumers are invoked
         >>>>>>>>>>>>> as they're added. And then it's all going to be very nice.
         >>>>>>>>>>>>>
         >>>>>>>>>>>>> What shall we do now? I should re-write the KIP and resume
         >>>>>>>>>>>>> the discussion here, right?
         >>>>>>>>>>>>>
         >>>>>>>>>>>>> Why are you saying that your PR 'should not be even a
         >>>>>>>>>>>>> starting point if we go in this direction'? To me it looks
         >>>>>>>>>>>>> like a good starting point. But as a novice in this project
         >>>>>>>>>>>>> I might miss some important details.
         >>>>>>>>>>>>> Regards,
         >>>>>>>>>>>>>
         >>>>>>>>>>>>> Ivan
         >>>>>>>>>>>>>
         >>>>>>>>>>>>>
         >>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen wrote:
         >>>>>>>>>>>>>> Ivan,
         >>>>>>>>>>>>>>
         >>>>>>>>>>>>>> Maybe I’m missing the point, but I believe the
         >>>>>>>>>>>>>> stream.branch() solution supports this. The
         >>>>>>>>>>>>>> couponIssuer::set* consumers will be invoked as they’re
         >>>>>>>>>>>>>> added, not during streamsBuilder.build(). So the user still
         >>>>>>>>>>>>>> ought to be able to call couponIssuer.coupons() afterward
         >>>>>>>>>>>>>> and depend on the branched streams having been set.
         >>>>>>>>>>>>>>
         >>>>>>>>>>>>>> The issue I mean to point out is that it is hard to access
         >>>>>>>>>>>>>> the branched streams in the same scope as the original
         >>>>>>>>>>>>>> stream (that is, not inside the couponIssuer), which is a
         >>>>>>>>>>>>>> problem with both proposed solutions. It can be worked
         >>>>>>>>>>>>>> around though.
         >>>>>>>>>>>>>>
         >>>>>>>>>>>>>> [Also, great to hear additional interest in 401, I’m
         >>>>>>>>>>>>>> excited to hear your thoughts!]
         >>>>>>>>>>>>>>
         >>>>>>>>>>>>>> Paul
         >>>>>>>>>>>>>>
         >>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev
         >>>>>>>>>>>>>>> <iponoma...@mail.ru> wrote:
         >>>>>>>>>>>>>>> Hi Paul!
         >>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>> The idea to postpone the wiring of branches to the
         >>>>>>>>>>>>>>> streamsBuilder.build() also looked great to me at first
         >>>>>>>>>>>>>>> glance, but ---
         >>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>> the newly branched streams are not available in the same
         >>>>>>>>>>>>>>>> scope as each other.  That is, if we wanted to merge them
         >>>>>>>>>>>>>>>> back together again I don't see a way to do that.
         >>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>> You just took the words right out of my mouth, I was just
         >>>>>>>>>>>>>>> going to write in detail about this issue.
         >>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>> Consider the example from Bill's book, p. 101: say we need
         >>>>>>>>>>>>>>> to identify customers who have bought coffee and made a
         >>>>>>>>>>>>>>> purchase in the electronics store to give them coupons.
         >>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>> This is the code I usually write under these circumstances
         >>>>>>>>>>>>>>> using my 'brancher' class:
         >>>>>>>>>>>>>>> @Setter
         >>>>>>>>>>>>>>> class CouponIssuer{
         >>>>>>>>>>>>>>>   private KStream<....> coffePurchases;
         >>>>>>>>>>>>>>>   private KStream<....> electronicsPurchases;
         >>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>   KStream<...> coupons(){
         >>>>>>>>>>>>>>>       return coffePurchases.join(electronicsPurchases...)...whatever
         >>>>>>>>>>>>>>>       /*In the real world the code here can be complex, so
         >>>>>>>>>>>>>>>         creation of a separate CouponIssuer class is fully
         >>>>>>>>>>>>>>>         justified, in order to separate classes'
         >>>>>>>>>>>>>>>         responsibilities.*/
         >>>>>>>>>>>>>>>  }
         >>>>>>>>>>>>>>> }
         >>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>> CouponIssuer couponIssuer = new CouponIssuer();
         >>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
         >>>>>>>>>>>>>>>     .branch(predicate1, couponIssuer::setCoffePurchases)
         >>>>>>>>>>>>>>>     .branch(predicate2, couponIssuer::setElectronicsPurchases)
         >>>>>>>>>>>>>>>     .onTopOf(transactionStream);
         >>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>> /*Alas, this won't work if we're going to wire up
         >>>>>>>>>>>>>>>   everything later, without the terminal operation!!!*/
         >>>>>>>>>>>>>>> couponIssuer.coupons()...
         >>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>> Does this make sense?  In order to properly initialize the
         >>>>>>>>>>>>>>> CouponIssuer we need the terminal operation to be called
         >>>>>>>>>>>>>>> before streamsBuilder.build() is called.
         >>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>> [BTW Paul, I just found out that your KIP-401 is
         >>>>>>>>>>>>>>> essentially the next KIP I was going to write here. I have
         >>>>>>>>>>>>>>> some thoughts based on my experience, so I will join the
         >>>>>>>>>>>>>>> discussion on KIP-401 soon.]
         >>>>>>>>>>>>>>> Regards,
         >>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>> Ivan
         >>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen wrote:
         >>>>>>>>>>>>>>>> Ivan,
         >>>>>>>>>>>>>>>> I tried to make a very rough proof of concept of a fluent
         >>>>>>>>>>>>>>>> API based off of KStream here
         >>>>>>>>>>>>>>>> (https://github.com/apache/kafka/pull/6512), and I think I
         >>>>>>>>>>>>>>>> succeeded at removing both cons.
         >>>>>>>>>>>>>>>>    - Compatibility: I was incorrect earlier about
         >>>>>>>>>>>>>>>>    compatibility issues, there aren't any direct ones.  I
         >>>>>>>>>>>>>>>>    was unaware that Java is smart enough to distinguish
         >>>>>>>>>>>>>>>>    between a branch(varargs...) returning one thing and
         >>>>>>>>>>>>>>>>    branch() with no arguments returning another thing.
         >>>>>>>>>>>>>>>>    - Requiring a terminal method: We don't actually need
         >>>>>>>>>>>>>>>>    it.  We can just build up the branches in the
         >>>>>>>>>>>>>>>>    KBranchedStream, which shares its state with the
         >>>>>>>>>>>>>>>>    ProcessorSupplier that will actually do the branching.
         >>>>>>>>>>>>>>>>    It's not terribly pretty in its current form, but I
         >>>>>>>>>>>>>>>>    think it demonstrates its feasibility.
         >>>>>>>>>>>>>>>> To be clear, I don't think that pull request should be
         >>>>>>>>>>>>>>>> final or even a starting point if we go in this direction,
         >>>>>>>>>>>>>>>> I just wanted to see how challenging it would be to get the
         >>>>>>>>>>>>>>>> API working.
         >>>>>>>>>>>>>>>> I will say though, that I'm not
sure the
existing
         solution
         >>>>>>> could
         >>>>>>>> be
         >>>>>>>>>>>>>>>> deprecated in favor of this, which
I had
originally
         >>>>> suggested
         >>>>>>>> was a
         >>>>>>>>>>>>>>>> possibility.  The reason is that
the newly
branched
         streams
         >>>>>> are
         >>>>>>>> not
         >>>>>>>>>>>>>>>> available in the same scope as each
other.  That
         is, if we
         >>>>>>> wanted
         >>>>>>>>> to
         >>>>>>>>>>>>> merge
         >>>>>>>>>>>>>>>> them back together again I don't
see a way
to do
         that.  The
         >>>>>> KIP
         >>>>>>>>>>>>> proposal
         >>>>>>>>>>>>>>>> has the same issue, though - all this
means is that
for
         >>>>>> either
         >>>>>>>>>>>>> solution,
         >>>>>>>>>>>>>>>> deprecating the existing
branch(...) is
not on the
         table.
         >>>>>>>>>>>>>>>> Thanks,
         >>>>>>>>>>>>>>>> Paul
         >>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru>
         >>>>>>>>>>>>>>>>> wrote:
         >>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>> OK, let me summarize what we have discussed up to this point.
         >>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>> First, it seems that it's commonly agreed that branch API needs
         >>>>>>>>>>>>>>>>> improvement. Motivation is given in the KIP.
         >>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>> There are two potential ways to do it:
         >>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>> 1. (as originally proposed)
         >>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
         >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->..)
         >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->..)
         >>>>>>>>>>>>>>>>>    .defaultBranch(ks->..) //optional
         >>>>>>>>>>>>>>>>>    .onTopOf(stream).mapValues(...).... //onTopOf returns its argument
         >>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2) The code won't make sense until
         >>>>>>>>>>>>>>>>> all the necessary ingredients are provided.
         >>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance contrasts with
         >>>>>>>>>>>>>>>>> the fluency of other KStream methods.
         >>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>> 2. (as Paul proposes)
         >>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>> stream
         >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->...)
         >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->...)
         >>>>>>>>>>>>>>>>>    .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and
         >>>>>>>>>>>>>>>>>                            //noDefault() return void
         >>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>> PROS: Generally follows the way KStreams interface is defined.
         >>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>> CONS: We need to define two terminal methods (defaultBranch(ks->) and
         >>>>>>>>>>>>>>>>> noDefault()). And for a user it is very easy to miss the fact that one
         >>>>>>>>>>>>>>>>> of the terminal methods should be called. If these methods are not
         >>>>>>>>>>>>>>>>> called, we can throw an exception at runtime.
         >>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can we do better?
         >>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>> Regards,
         >>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>> Ivan
         >>>>>>>>>>>>>>>>>
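The runtime check mentioned for option 2 (an exception when neither terminal method was called) could look roughly like the plain-Java sketch below. Everything in it is an assumption made for illustration: SketchTopologyBuilder and SketchBranchChain are hypothetical and are not the InternalTopologyBuilder or any other Kafka Streams class. It only demonstrates the "track dangling chains, fail at the build step" idea.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: none of these classes exist in Kafka Streams.
class SketchBranchChain {
    final String name;
    boolean terminated; // set by one of the two terminal methods
    int branchCount;

    SketchBranchChain(String name) {
        this.name = name;
    }

    // Non-terminal: registers one more branch and keeps the chain open.
    SketchBranchChain branch() {
        branchCount++;
        return this;
    }

    // Terminal methods: both return void, as in option 2 of the summary.
    void defaultBranch() { terminated = true; }
    void noDefault() { terminated = true; }
}

class SketchTopologyBuilder {
    private final List<SketchBranchChain> chains = new ArrayList<>();

    SketchBranchChain startBranching(String name) {
        SketchBranchChain chain = new SketchBranchChain(name);
        chains.add(chain);
        return chain;
    }

    // Analogue of the "build step": fails fast if a chain was left dangling.
    void build() {
        for (SketchBranchChain chain : chains) {
            if (!chain.terminated) {
                throw new IllegalStateException(
                    "Branch chain '" + chain.name
                        + "' has no terminal call (defaultBranch() or noDefault())");
            }
        }
    }
}

class DanglingBranchDemo {
    public static void main(String[] args) {
        SketchTopologyBuilder builder = new SketchTopologyBuilder();
        builder.startBranching("orders").branch().branch().noDefault(); // properly terminated
        builder.startBranching("payments").branch();                    // terminal call forgotten

        try {
            builder.build();
        } catch (IllegalStateException e) {
            System.out.println("Build rejected: " + e.getMessage());
        }
    }
}
```

Note that this only moves the failure to build time. The mistake is still invisible to the compiler, which is the trade-off between the two options being weighed in this thread.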
         >>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
         >>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
         >>>>>>>>>>>>>>>>>>> Paul,
         >>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>> I see your point when you are talking about
         >>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
         >>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>> Still, I believe that this cannot be implemented the easy way.
         >>>>>>>>>>>>>>>>>>> Maybe we all should think further.
         >>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
         >>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>> user could specify a terminal method that assumes nothing will reach
         >>>>>>>>>>>>>>>>>>>> the default branch, throwing an exception if such a case occurs.
         >>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be the only option besides
         >>>>>>>>>>>>>>>>>>> `default`, because there are scenarios when we want to just silently
         >>>>>>>>>>>>>>>>>>> drop the messages that didn't match any predicate. 2) Throwing an
         >>>>>>>>>>>>>>>>>>> exception in the middle of data flow processing looks like a bad idea.
         >>>>>>>>>>>>>>>>>>> In the stream processing paradigm, I would prefer to emit a special
         >>>>>>>>>>>>>>>>>>> message to a dedicated stream. This is exactly where `default` can be
         >>>>>>>>>>>>>>>>>>> used.
         >>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>> it would be fairly easy for the InternalTopologyBuilder to track
         >>>>>>>>>>>>>>>>>>>> dangling branches that haven't been terminated and raise a clear
         >>>>>>>>>>>>>>>>>>>> error before it becomes an issue.
         >>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>> You mean a runtime exception, when the program is compiled and run?
         >>>>>>>>>>>>>>>>>>> Well, I'd prefer an API that simply won't compile if used
         >>>>>>>>>>>>>>>>>>> incorrectly. Can we build such an API as a method chain starting from
         >>>>>>>>>>>>>>>>>>> KStream object? There is a huge cost difference between runtime and
         >>>>>>>>>>>>>>>>>>> compile-time errors. Even if a failure is uncovered instantly by unit
         >>>>>>>>>>>>>>>>>>> tests, it costs more for the project than a compilation failure.
         >>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>> Regards,
         >>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>> Ivan
         >>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen wrote:
         >>>>>>>>>>>>>>>>>>>> Ivan,
         >>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>> Good point about the terminal operation being required.  But is that
         >>>>>>>>>>>>>>>>>>>> really such a bad thing?  If the user doesn't want a defaultBranch they
         >>>>>>>>>>>>>>>>>>>> can call some other terminal method (noDefaultBranch()?) just as
         >>>>>>>>>>>>>>>>>>>> easily.  In fact I think it creates an opportunity for a nicer API - a
         >>>>>>>>>>>>>>>>>>>> user could specify a terminal method that assumes nothing will reach
         >>>>>>>>>>>>>>>>>>>> the default branch, throwing an exception if such a case occurs.  That
         >>>>>>>>>>>>>>>>>>>> seems like an improvement over the current branch() API, which allows
         >>>>>>>>>>>>>>>>>>>> for the more subtle behavior of records unexpectedly getting dropped.
         >>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>> The need for a terminal operation certainly has to be well documented,
         >>>>>>>>>>>>>>>>>>>> but it would be fairly easy for the InternalTopologyBuilder to track
         >>>>>>>>>>>>>>>>>>>> dangling branches that haven't been terminated and raise a clear error
         >>>>>>>>>>>>>>>>>>>> before it becomes an issue.  Especially now that there is a "build
         >>>>>>>>>>>>>>>>>>>> step" where the topology is actually wired up, when
         >>>>>>>>>>>>>>>>>>>> StreamsBuilder.build() is called.
         >>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its argument, I agree that it's critical
         >>>>>>>>>>>>>>>>>>>> to allow users to do other operations on the input stream.  With the
         >>>>>>>>>>>>>>>>>>>> fluent solution, it ought to work the same way all other operations do -
         >>>>>>>>>>>>>>>>>>>> if you want to process off the original KStream multiple times, you
         >>>>>>>>>>>>>>>>>>>> just need the stream as a variable so you can call as many operations
         >>>>>>>>>>>>>>>>>>>> on it as you desire.
         >>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>> Thoughts?
         >>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>> Best,
         >>>>>>>>>>>>>>>>>>>> Paul
         >>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru>
         >>>>>>>>>>>>>>>>>>>> wrote:
         >>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>> Hello Paul,
         >>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>> I'm afraid this won't work because we do not always need the
         >>>>>>>>>>>>>>>>>>>>> defaultBranch. And without a terminal operation we don't know when to
         >>>>>>>>>>>>>>>>>>>>> finalize and build the 'branch switch'.
         >>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its argument, so we can do something
         >>>>>>>>>>>>>>>>>>>>> more with the original branch after branching.
         >>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>> I understand your point that the need for special object construction
         >>>>>>>>>>>>>>>>>>>>> contrasts with the fluency of most KStream methods. But here we have a
         >>>>>>>>>>>>>>>>>>>>> special case: we build the switch to split the flow, so I think this
         >>>>>>>>>>>>>>>>>>>>> is still idiomatic.
         >>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>> Regards,
         >>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>> Ivan
         >>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
         >>>>>>>>>>>>>>>>>>>>>> Ivan,
         >>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve this API, but I find the
         >>>>>>>>>>>>>>>>>>>>>> onTopOf() mechanism a little confusing since it contrasts with the
         >>>>>>>>>>>>>>>>>>>>>> fluency of other KStream method calls.  Ideally I'd like to just call
         >>>>>>>>>>>>>>>>>>>>>> a method on the stream so it still reads top to bottom if the branch
         >>>>>>>>>>>>>>>>>>>>>> cases are defined fluently.  I think the addBranch(predicate,
         >>>>>>>>>>>>>>>>>>>>>> handleCase) is very nice and the right way to do things, but what if
         >>>>>>>>>>>>>>>>>>>>>> we flipped around how we specify the source stream.
         >>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>> Like:
         >>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>> stream.branch()
         >>>>>>>>>>>>>>>>>>>>>>         .addBranch(predicate1, this::handle1)
         >>>>>>>>>>>>>>>>>>>>>>         .addBranch(predicate2, this::handle2)
         >>>>>>>>>>>>>>>>>>>>>>         .defaultBranch(this::handleDefault);
         >>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>> Where branch() returns a KBranchedStreams or KStreamBrancher or
         >>>>>>>>>>>>>>>>>>>>>> something, which is added to by addBranch() and terminated by
         >>>>>>>>>>>>>>>>>>>>>> defaultBranch() (which returns void).  This is obviously incompatible
         >>>>>>>>>>>>>>>>>>>>>> with the current API, so the new stream.branch() would have to have a
         >>>>>>>>>>>>>>>>>>>>>> different name, but that seems like a fairly small problem - we could
         >>>>>>>>>>>>>>>>>>>>>> call it something like branched() or branchedStreams() and deprecate
         >>>>>>>>>>>>>>>>>>>>>> the old API.
         >>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of your KIP?  It seems like it does
         >>>>>>>>>>>>>>>>>>>>>> to me, allowing for clear in-line branching while also allowing you
         >>>>>>>>>>>>>>>>>>>>>> to dynamically build branches off of KBranchedStreams if desired.
         >>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>> Thanks,
         >>>>>>>>>>>>>>>>>>>>>> Paul
         >>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
         >>>>>>>>>>>>>>>>>>>>>> <iponoma...@mail.ru.invalid> wrote:
         >>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
         >>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
         >>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
         >>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){
         >>>>>>>>>>>>>>>>>>>>>>>         ks.filter(....).mapValues(...)
         >>>>>>>>>>>>>>>>>>>>>>> }
         >>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){
         >>>>>>>>>>>>>>>>>>>>>>>         ks.selectKey(...).groupByKey()...
         >>>>>>>>>>>>>>>>>>>>>>> }
         >>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>> ......
         >>>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
         >>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate1, this::handleFirstCase)
         >>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate2, this::handleSecondCase)
         >>>>>>>>>>>>>>>>>>>>>>>      .onTopOf(....)
         >>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>> Regards,
         >>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>> Ivan
         >>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
         >>>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
         >>>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
         >>>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>>> I have one question, the KafkaStreamsBrancher takes a Consumer as a
         >>>>>>>>>>>>>>>>>>>>>>>> second argument which returns nothing, and the example in the KIP
         >>>>>>>>>>>>>>>>>>>>>>>> shows each stream from the branch using a terminal node
         >>>>>>>>>>>>>>>>>>>>>>>> (KStream#to() in this case).
         >>>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but how would we handle the case where
         >>>>>>>>>>>>>>>>>>>>>>>> the user has created a branch but wants to continue processing and
         >>>>>>>>>>>>>>>>>>>>>>>> not necessarily use a terminal node on the branched stream
         >>>>>>>>>>>>>>>>>>>>>>>> immediately?
         >>>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic as is if we had something like
         >>>>>>>>>>>>>>>>>>>>>>>> this:
         >>>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>>> KStream<String, String>[] branches =
         >>>>>>>>>>>>>>>>>>>>>>>>         originalStream.branch(predicate1, predicate2);
         >>>>>>>>>>>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
         >>>>>>>>>>>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
         >>>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>>> Thanks!
         >>>>>>>>>>>>>>>>>>>>>>>> Bill
         >>>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com>
         >>>>>>>>>>>>>>>>>>>>>>>> wrote:
         >>>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>>>> All,
         >>>>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP-418.
         >>>>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
         >>>>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>>>> Hello,
         >>>>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418. Please take a look at
         >>>>>>>>>>>>>>>>>>>>>>>>> the KIP if you can, I would appreciate any feedback :)
         >>>>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>>>> KIP-418:
         >>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
         >>>>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
         >>>>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
         >>>>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>>>> Regards,
         >>>>>>>>>>>>>>>>>>>>>>>>>
         >>>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
         >>>>>>>>>
         >














