I also agree with Michael's observation about the core problem of
current `branch()` implementation.

However, I also don't like to pass in a clumsy Map object. My thinking
was more aligned with Paul's proposal to just add a name to each
`branch()` statement and return a `Map<String,KStream>`.

It makes the code easier to read, and also make the order of
`Predicates` (that is essential) easier to grasp.

>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>    .branch("branchOne", Predicate<K, V>)
>>>>    .branch( "branchTwo", Predicate<K, V>)
>>>>    .defaultBranch("defaultBranch");

An open question is the case for which no defaultBranch() should be
specified. Atm, `split()` and `branch()` would return `BranchedKStream`
and the call to `defaultBranch()` that returns the `Map` is mandatory
(what is not the case atm). Or is this actually not a real problem,
because users can just ignore the branch returned by `defaultBranch()`
in the result `Map` ?


About "inlining": So far, it seems to be a matter of personal
preference. I can see arguments for both, but no "killer argument" yet
that clearly make the case for one or the other.


-Matthias

On 5/1/19 6:26 PM, Paul Whalen wrote:
> Perhaps inlining is the wrong terminology. It doesn’t require that a lambda 
> with the full downstream topology be defined inline - it can be a method 
> reference as with Ivan’s original suggestion.  The advantage of putting the 
> predicate and its downstream logic (Consumer) together in branch() is that 
> they are required to be near to each other. 
> 
> Ultimately the downstream code has to live somewhere, and deep branch trees 
> will be hard to read regardless.
> 
>> On May 1, 2019, at 1:07 PM, Michael Drogalis <michael.droga...@confluent.io> 
>> wrote:
>>
>> I'm less enthusiastic about inlining the branch logic with its downstream
>> functionality. Programs that have deep branch trees will quickly become
>> harder to read as a single unit.
>>
>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>
>>> Also +1 on the issues/goals as Michael outlined them, I think that sets a
>>> great framework for the discussion.
>>>
>>> Regarding the SortedMap solution, my understanding is that the current
>>> proposal in the KIP is what is in my PR which (pending naming decisions) is
>>> roughly this:
>>>
>>> stream.split()
>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>    .defaultBranch(Consumer<KStream<K, V>>);
>>>
>>> Obviously some ordering is necessary, since branching as a construct
>>> doesn't work without it, but this solution seems like it provides as much
>>> associativity as the SortedMap solution, because each branch() call
>>> directly associates the "conditional" with the "code block."  The value it
>>> provides over the KIP solution is the accessing of streams in the same
>>> scope.
>>>
>>> The KIP solution is less "dynamic" than the SortedMap solution in the sense
>>> that it is slightly clumsier to add a dynamic number of branches, but it is
>>> certainly possible.  It seems to me like the API should favor the "static"
>>> case anyway, and should make it simple and readable to fluently declare and
>>> access your branches in-line.  It also makes it impossible to ignore a
>>> branch, and it is possible to build an (almost) identical SortedMap
>>> solution on top of it.
>>>
>>> I could also see a middle ground where instead of a raw SortedMap being
>>> taken in, branch() takes a name and not a Consumer.  Something like this:
>>>
>>> Map<String, KStream<K, V>> branches = stream.split()
>>>    .branch("branchOne", Predicate<K, V>)
>>>    .branch( "branchTwo", Predicate<K, V>)
>>>    .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
>>>
>>> Pros for that solution:
>>> - accessing branched KStreams in same scope
>>> - no double brace initialization, hopefully slightly more readable than
>>> SortedMap
>>>
>>> Cons
>>> - downstream branch logic cannot be specified inline which makes it harder
>>> to read top to bottom (like existing API and SortedMap, but unlike the KIP)
>>> - you can forget to "handle" one of the branched streams (like existing
>>> API and SortedMap, but unlike the KIP)
>>>
>>> (KBranchedStreams could even work *both* ways but perhaps that's overdoing
>>> it).
>>>
>>> Overall I'm curious how important it is to be able to easily access the
>>> branched KStream in the same scope as the original.  It's possible that it
>>> doesn't need to be handled directly by the API, but instead left up to the
>>> user.  I'm sort of in the middle on it.
>>>
>>> Paul
>>>
>>>
>>>
>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman <sop...@confluent.io>
>>> wrote:
>>>
>>>> I'd like to +1 what Michael said about the issues with the existing
>>> branch
>>>> method, I agree with what he's outlined and I think we should proceed by
>>>> trying to alleviate these problems. Specifically it seems important to be
>>>> able to cleanly access the individual branches (eg by mapping
>>>> name->stream), which I thought was the original intention of this KIP.
>>>>
>>>> That said, I don't think we should so easily give in to the double brace
>>>> anti-pattern or force ours users into it if at all possible to
>>> avoid...just
>>>> my two cents.
>>>>
>>>> Cheers,
>>>> Sophie
>>>>
>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
>>>> michael.droga...@confluent.io> wrote:
>>>>
>>>>> I’d like to propose a different way of thinking about this. To me,
>>> there
>>>>> are three problems with the existing branch signature:
>>>>>
>>>>> 1. If you use it the way most people do, Java raises unsafe type
>>>> warnings.
>>>>> 2. The way in which you use the stream branches is positionally coupled
>>>> to
>>>>> the ordering of the conditionals.
>>>>> 3. It is brittle to extend existing branch calls with additional code
>>>>> paths.
>>>>>
>>>>> Using associative constructs instead of relying on ordered constructs
>>>> would
>>>>> be a stronger approach. Consider a signature that instead looks like
>>>> this:
>>>>>
>>>>> Map<String, KStream<K,V>> KStream#branch(SortedMap<String, Predicate<?
>>>>> super K,? super V>>);
>>>>>
>>>>> Branches are given names in a map, and as a result, the API returns a
>>>>> mapping of names to streams. The ordering of the conditionals is
>>>> maintained
>>>>> because it’s a sorted map. Insert order determines the order of
>>>> evaluation.
>>>>>
>>>>> This solves problem 1 because there are no more varargs. It solves
>>>> problem
>>>>> 2 because you no longer lean on ordering to access the branch you’re
>>>>> interested in. It solves problem 3 because you can introduce another
>>>>> conditional by simply attaching another name to the structure, rather
>>>> than
>>>>> messing with the existing indices.
>>>>>
>>>>> One of the drawbacks is that creating the map inline is historically
>>>>> awkward in Java. I know it’s an anti-pattern to use voluminously, but
>>>>> double brace initialization would clean up the aesthetics.
>>>>>
>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler <j...@confluent.io>
>>> wrote:
>>>>>
>>>>>> Hi Ivan,
>>>>>>
>>>>>> Thanks for the update.
>>>>>>
>>>>>> FWIW, I agree with Matthias that the current "start branching"
>>> operator
>>>>> is
>>>>>> confusing when named the same way as the actual branches. "Split"
>>> seems
>>>>>> like a good name. Alternatively, we can do without a "start
>>> branching"
>>>>>> operator at all, and just do:
>>>>>>
>>>>>> stream
>>>>>>      .branch(Predicate)
>>>>>>      .branch(Predicate)
>>>>>>      .defaultBranch();
>>>>>>
>>>>>> Tentatively, I think that this branching operation should be
>>> terminal.
>>>>> That
>>>>>> way, we don't create ambiguity about how to use it. That is, `branch`
>>>>>> should return `KBranchedStream`, while `defaultBranch` is `void`, to
>>>>>> enforce that it comes last, and that there is only one definition of
>>>> the
>>>>>> default branch. Potentially, we should log a warning if there's no
>>>>> default,
>>>>>> and additionally log a warning (or throw an exception) if a record
>>>> falls
>>>>>> though with no default.
>>>>>>
>>>>>> Thoughts?
>>>>>>
>>>>>> Thanks,
>>>>>> -John
>>>>>>
>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <
>>> matth...@confluent.io
>>>>>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for updating the KIP and your answers.
>>>>>>>
>>>>>>>
>>>>>>>> this is to make the name similar to String#split
>>>>>>>>> that also returns an array, right?
>>>>>>>
>>>>>>> The intend was to avoid name duplication. The return type should
>>>> _not_
>>>>>>> be an array.
>>>>>>>
>>>>>>> The current proposal is
>>>>>>>
>>>>>>> stream.branch()
>>>>>>>      .branch(Predicate)
>>>>>>>      .branch(Predicate)
>>>>>>>      .defaultBranch();
>>>>>>>
>>>>>>> IMHO, this reads a little odd, because the first `branch()` does
>>> not
>>>>>>> take any parameters and has different semantics than the later
>>>>>>> `branch()` calls. Note, that from the code snippet above, it's
>>> hidden
>>>>>>> that the first call is `KStream#branch()` while the others are
>>>>>>> `KBranchedStream#branch()` what makes reading the code harder.
>>>>>>>
>>>>>>> Because I suggested to rename `addBranch()` -> `branch()`, I though
>>>> it
>>>>>>> might be better to also rename `KStream#branch()` to avoid the
>>> naming
>>>>>>> overlap that seems to be confusing. The following reads much
>>> cleaner
>>>> to
>>>>>> me:
>>>>>>>
>>>>>>> stream.split()
>>>>>>>      .branch(Predicate)
>>>>>>>      .branch(Predicate)
>>>>>>>      .defaultBranch();
>>>>>>>
>>>>>>> Maybe there is a better alternative to `split()` though to avoid
>>> the
>>>>>>> naming overlap.
>>>>>>>
>>>>>>>
>>>>>>>> 'default' is, however, a reserved word, so unfortunately we
>>> cannot
>>>>> have
>>>>>>> a method with such name :-)
>>>>>>>
>>>>>>> Bummer. Didn't consider this. Maybe we can still come up with a
>>> short
>>>>>> name?
>>>>>>>
>>>>>>>
>>>>>>> Can you add the interface `KBranchedStream` to the KIP with all
>>> it's
>>>>>>> methods? It will be part of public API and should be contained in
>>> the
>>>>>>> KIP. For example, it's unclear atm, what the return type of
>>>>>>> `defaultBranch()` is.
>>>>>>>
>>>>>>>
>>>>>>> You did not comment on the idea to add a `KBranchedStream#get(int
>>>>> index)
>>>>>>> -> KStream` method to get the individually branched-KStreams. Would
>>>> be
>>>>>>> nice to get your feedback about it. It seems you suggest that users
>>>>>>> would need to write custom utility code otherwise, to access them.
>>> We
>>>>>>> should discuss the pros and cons of both approaches. It feels
>>>>>>> "incomplete" to me atm, if the API has no built-in support to get
>>> the
>>>>>>> branched-KStreams directly.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
>>>>>>>> Hi all!
>>>>>>>>
>>>>>>>> I have updated the KIP-418 according to the new vision.
>>>>>>>>
>>>>>>>> Matthias, thanks for your comment!
>>>>>>>>
>>>>>>>>> Renaming KStream#branch() -> #split()
>>>>>>>>
>>>>>>>> I can see your point: this is to make the name similar to
>>>>> String#split
>>>>>>>> that also returns an array, right? But is it worth the loss of
>>>>>> backwards
>>>>>>>> compatibility? We can have overloaded branch() as well without
>>>>>> affecting
>>>>>>>> the existing code. Maybe the old array-based `branch` method
>>> should
>>>>> be
>>>>>>>> deprecated, but this is a subject for discussion.
>>>>>>>>
>>>>>>>>> Renaming KBranchedStream#addBranch() ->
>>> BranchingKStream#branch(),
>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
>>>>>>>>
>>>>>>>> Totally agree with 'addBranch->branch' rename. 'default' is,
>>>>> however, a
>>>>>>>> reserved word, so unfortunately we cannot have a method with such
>>>>> name
>>>>>>> :-)
>>>>>>>>
>>>>>>>>> defaultBranch() does take an `Predicate` as argument, but I
>>> think
>>>>> that
>>>>>>>> is not required?
>>>>>>>>
>>>>>>>> Absolutely! I think that was just copy-paste error or something.
>>>>>>>>
>>>>>>>> Dear colleagues,
>>>>>>>>
>>>>>>>> please revise the new version of the KIP and Paul's PR
>>>>>>>> (https://github.com/apache/kafka/pull/6512)
>>>>>>>>
>>>>>>>> Any new suggestions/objections?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Ivan
>>>>>>>>
>>>>>>>>
>>>>>>>> 11.04.2019 11:47, Matthias J. Sax пишет:
>>>>>>>>> Thanks for driving the discussion of this KIP. It seems that
>>>>> everybody
>>>>>>>>> agrees that the current branch() method using arrays is not
>>>> optimal.
>>>>>>>>>
>>>>>>>>> I had a quick look into the PR and I like the overall proposal.
>>>>> There
>>>>>>>>> are some minor things we need to consider. I would recommend the
>>>>>>>>> following renaming:
>>>>>>>>>
>>>>>>>>> KStream#branch() -> #split()
>>>>>>>>> KBranchedStream#addBranch() -> BranchingKStream#branch()
>>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
>>>>>>>>>
>>>>>>>>> It's just a suggestion to get slightly shorter method names.
>>>>>>>>>
>>>>>>>>> In the current PR, defaultBranch() does take an `Predicate` as
>>>>>> argument,
>>>>>>>>> but I think that is not required?
>>>>>>>>>
>>>>>>>>> Also, we should consider KIP-307, that was recently accepted and
>>>> is
>>>>>>>>> currently implemented:
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>>>>>>
>>>>>>>>> Ie, we should add overloads that accepted a `Named` parameter.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> For the issue that the created `KStream` object are in different
>>>>>> scopes:
>>>>>>>>> could we extend `KBranchedStream` with a `get(int index)` method
>>>>> that
>>>>>>>>> returns the corresponding "branched" result `KStream` object?
>>>> Maybe,
>>>>>> the
>>>>>>>>> second argument of `addBranch()` should not be a
>>>> `Consumer<KStream>`
>>>>>> but
>>>>>>>>> a `Function<KStream,KStream>` and `get()` could return whatever
>>>> the
>>>>>>>>> `Function` returns?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Finally, I would also suggest to update the KIP with the current
>>>>>>>>> proposal. That makes it easier to review.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
>>>>>>>>>> Ivan,
>>>>>>>>>>
>>>>>>>>>> I'm a bit of a novice here as well, but I think it makes sense
>>>> for
>>>>>> you
>>>>>>> to
>>>>>>>>>> revise the KIP and continue the discussion.  Obviously we'll
>>> need
>>>>>> some
>>>>>>>>>> buy-in from committers that have actual binding votes on
>>> whether
>>>>> the
>>>>>>> KIP
>>>>>>>>>> could be adopted.  It would be great to hear if they think this
>>>> is
>>>>> a
>>>>>>> good
>>>>>>>>>> idea overall.  I'm not sure if that happens just by starting a
>>>>> vote,
>>>>>>> or if
>>>>>>>>>> there is generally some indication of interest beforehand.
>>>>>>>>>>
>>>>>>>>>> That being said, I'll continue the discussion a bit: assuming
>>> we
>>>> do
>>>>>>> move
>>>>>>>>>> forward the solution of "stream.branch() returns
>>>> KBranchedStream",
>>>>> do
>>>>>>> we
>>>>>>>>>> deprecate "stream.branch(...) returns KStream[]"?  I would
>>> favor
>>>>>>>>>> deprecating, since having two mutually exclusive APIs that
>>>>> accomplish
>>>>>>> the
>>>>>>>>>> same thing is confusing, especially when they're fairly similar
>>>>>>> anyway.  We
>>>>>>>>>> just need to be sure we're not making something
>>>>> impossible/difficult
>>>>>>> that
>>>>>>>>>> is currently possible/easy.
>>>>>>>>>>
>>>>>>>>>> Regarding my PR - I think the general structure would work,
>>> it's
>>>>>> just a
>>>>>>>>>> little sloppy overall in terms of naming and clarity. In
>>>>> particular,
>>>>>>>>>> passing in the "predicates" and "children" lists which get
>>>> modified
>>>>>> in
>>>>>>>>>> KBranchedStream but read from all the way KStreamLazyBranch is
>>> a
>>>>> bit
>>>>>>>>>> complicated to follow.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Paul
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <
>>>> iponoma...@mail.ru
>>>>>>
>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Paul!
>>>>>>>>>>>
>>>>>>>>>>> I read your code carefully and now I am fully convinced: your
>>>>>> proposal
>>>>>>>>>>> looks better and should work. We just have to document the
>>>> crucial
>>>>>>> fact
>>>>>>>>>>> that KStream consumers are invoked as they're added. And then
>>>> it's
>>>>>> all
>>>>>>>>>>> going to be very nice.
>>>>>>>>>>>
>>>>>>>>>>> What shall we do now? I should re-write the KIP and resume the
>>>>>>>>>>> discussion here, right?
>>>>>>>>>>>
>>>>>>>>>>> Why are you telling that your PR 'should not be even a
>>> starting
>>>>>> point
>>>>>>> if
>>>>>>>>>>> we go in this direction'? To me it looks like a good starting
>>>>> point.
>>>>>>> But
>>>>>>>>>>> as a novice in this project I might miss some important
>>> details.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>>
>>>>>>>>>>> Ivan
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen пишет:
>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>
>>>>>>>>>>>> Maybe I’m missing the point, but I believe the
>>> stream.branch()
>>>>>>> solution
>>>>>>>>>>> supports this. The couponIssuer::set* consumers will be
>>> invoked
>>>> as
>>>>>>> they’re
>>>>>>>>>>> added, not during streamsBuilder.build(). So the user still
>>>> ought
>>>>> to
>>>>>>> be
>>>>>>>>>>> able to call couponIssuer.coupons() afterward and depend on
>>> the
>>>>>>> branched
>>>>>>>>>>> streams having been set.
>>>>>>>>>>>> The issue I mean to point out is that it is hard to access
>>> the
>>>>>>> branched
>>>>>>>>>>> streams in the same scope as the original stream (that is, not
>>>>>> inside
>>>>>>> the
>>>>>>>>>>> couponIssuer), which is a problem with both proposed
>>> solutions.
>>>> It
>>>>>>> can be
>>>>>>>>>>> worked around though.
>>>>>>>>>>>> [Also, great to hear additional interest in 401, I’m excited
>>> to
>>>>>> hear
>>>>>>>>>>> your thoughts!]
>>>>>>>>>>>> Paul
>>>>>>>>>>>>
>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <
>>>> iponoma...@mail.ru
>>>>>>
>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Paul!
>>>>>>>>>>>>>
>>>>>>>>>>>>> The idea to postpone the wiring of branches to the
>>>>>>>>>>> streamsBuilder.build() also looked great for me at first
>>> glance,
>>>>> but
>>>>>>> ---
>>>>>>>>>>>>>> the newly branched streams are not available in the same
>>>> scope
>>>>> as
>>>>>>> each
>>>>>>>>>>> other.  That is, if we wanted to merge them back together
>>> again
>>>> I
>>>>>>> don't see
>>>>>>>>>>> a way to do that.
>>>>>>>>>>>>> You just took the words right out of my mouth, I was just
>>>> going
>>>>> to
>>>>>>>>>>> write in details about this issue.
>>>>>>>>>>>>> Consider the example from Bill's book, p. 101: say we need
>>> to
>>>>>>> identify
>>>>>>>>>>> customers who have bought coffee and made a purchase in the
>>>>>>> electronics
>>>>>>>>>>> store to give them coupons.
>>>>>>>>>>>>> This is the code I usually write under these circumstances
>>>> using
>>>>>> my
>>>>>>>>>>> 'brancher' class:
>>>>>>>>>>>>> @Setter
>>>>>>>>>>>>> class CouponIssuer{
>>>>>>>>>>>>>   private KStream<....> coffePurchases;
>>>>>>>>>>>>>   private KStream<....> electronicsPurchases;
>>>>>>>>>>>>>
>>>>>>>>>>>>>   KStream<...> coupons(){
>>>>>>>>>>>>>       return
>>>>>>> coffePurchases.join(electronicsPurchases...)...whatever
>>>>>>>>>>>>>
>>>>>>>>>>>>>       /*In the real world the code here can be complex, so
>>>>>>> creation of
>>>>>>>>>>> a separate CouponIssuer class is fully justified, in order to
>>>>>> separate
>>>>>>>>>>> classes' responsibilities.*/
>>>>>>>>>>>>>  }
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> CouponIssuer couponIssuer = new CouponIssuer();
>>>>>>>>>>>>>
>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
>>>>>>>>>>>>>     .branch(predicate1, couponIssuer::setCoffePurchases)
>>>>>>>>>>>>>     .branch(predicate2,
>>>> couponIssuer::setElectronicsPurchases)
>>>>>>>>>>>>>     .onTopOf(transactionStream);
>>>>>>>>>>>>>
>>>>>>>>>>>>> /*Alas, this won't work if we're going to wire up everything
>>>>>> later,
>>>>>>>>>>> without the terminal operation!!!*/
>>>>>>>>>>>>> couponIssuer.coupons()...
>>>>>>>>>>>>>
>>>>>>>>>>>>> Does this make sense?  In order to properly initialize the
>>>>>>> CouponIssuer
>>>>>>>>>>> we need the terminal operation to be called before
>>>>>>> streamsBuilder.build()
>>>>>>>>>>> is called.
>>>>>>>>>>>>> [BTW Paul, I just found out that your KIP-401 is essentially
>>>> the
>>>>>>> next
>>>>>>>>>>> KIP I was going to write here. I have some thoughts based on
>>> my
>>>>>>> experience,
>>>>>>>>>>> so I will join the discussion on KIP-401 soon.]
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>
>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen пишет:
>>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>>> I tried to make a very rough proof of concept of a fluent
>>> API
>>>>>> based
>>>>>>>>>>> off of
>>>>>>>>>>>>>> KStream here (https://github.com/apache/kafka/pull/6512),
>>>> and
>>>>> I
>>>>>>> think
>>>>>>>>>>> I
>>>>>>>>>>>>>> succeeded at removing both cons.
>>>>>>>>>>>>>>    - Compatibility: I was incorrect earlier about
>>>>> compatibility
>>>>>>>>>>> issues,
>>>>>>>>>>>>>>    there aren't any direct ones.  I was unaware that Java
>>> is
>>>>>> smart
>>>>>>>>>>> enough to
>>>>>>>>>>>>>>    distinguish between a branch(varargs...) returning one
>>>>> thing
>>>>>>> and
>>>>>>>>>>> branch()
>>>>>>>>>>>>>>    with no arguments returning another thing.
>>>>>>>>>>>>>>    - Requiring a terminal method: We don't actually need
>>> it.
>>>>> We
>>>>>>> can
>>>>>>>>>>> just
>>>>>>>>>>>>>>    build up the branches in the KBranchedStream who shares
>>>> its
>>>>>>> state
>>>>>>>>>>> with the
>>>>>>>>>>>>>>    ProcessorSupplier that will actually do the branching.
>>>>> It's
>>>>>>> not
>>>>>>>>>>> terribly
>>>>>>>>>>>>>>    pretty in its current form, but I think it demonstrates
>>>> its
>>>>>>>>>>> feasibility.
>>>>>>>>>>>>>> To be clear, I don't think that pull request should be
>>> final
>>>> or
>>>>>>> even a
>>>>>>>>>>>>>> starting point if we go in this direction, I just wanted to
>>>> see
>>>>>> how
>>>>>>>>>>>>>> challenging it would be to get the API working.
>>>>>>>>>>>>>> I will say though, that I'm not sure the existing solution
>>>>> could
>>>>>> be
>>>>>>>>>>>>>> deprecated in favor of this, which I had originally
>>> suggested
>>>>>> was a
>>>>>>>>>>>>>> possibility.  The reason is that the newly branched streams
>>>> are
>>>>>> not
>>>>>>>>>>>>>> available in the same scope as each other.  That is, if we
>>>>> wanted
>>>>>>> to
>>>>>>>>>>> merge
>>>>>>>>>>>>>> them back together again I don't see a way to do that.  The
>>>> KIP
>>>>>>>>>>> proposal
>>>>>>>>>>>>>> has the same issue, though - all this means is that for
>>>> either
>>>>>>>>>>> solution,
>>>>>>>>>>>>>> deprecating the existing branch(...) is not on the table.
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <
>>>>>>> iponoma...@mail.ru>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> OK, let me summarize what we have discussed up to this
>>>> point.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> First, it seems that it's commonly agreed that branch API
>>>>> needs
>>>>>>>>>>>>>>> improvement. Motivation is given in the KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> There are two potential ways to do it:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1. (as origianlly proposed)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
>>>>>>>>>>>>>>>    .branch(predicate1, ks ->..)
>>>>>>>>>>>>>>>    .branch(predicate2, ks->..)
>>>>>>>>>>>>>>>    .defaultBranch(ks->..) //optional
>>>>>>>>>>>>>>>    .onTopOf(stream).mapValues(...).... //onTopOf returns
>>>> its
>>>>>>> argument
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2) The code won't
>>> make
>>>>>> sense
>>>>>>>>>>> until
>>>>>>>>>>>>>>> all the necessary ingredients are provided.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance
>>>>>>> contrasts the
>>>>>>>>>>>>>>> fluency of other KStream methods.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2. (as Paul proposes)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> stream
>>>>>>>>>>>>>>>    .branch(predicate1, ks ->...)
>>>>>>>>>>>>>>>    .branch(predicate2, ks->...)
>>>>>>>>>>>>>>>    .defaultBranch(ks->...) //or noDefault(). Both
>>>>>>> defaultBranch(..)
>>>>>>>>>>> and
>>>>>>>>>>>>>>> noDefault() return void
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> PROS: Generally follows the way KStreams interface is
>>>> defined.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> CONS: We need to define two terminal methods
>>>>>> (defaultBranch(ks->)
>>>>>>> and
>>>>>>>>>>>>>>> noDefault()). And for a user it is very easy to miss the
>>>> fact
>>>>>>> that one
>>>>>>>>>>>>>>> of the terminal methods should be called. If these methods
>>>> are
>>>>>> not
>>>>>>>>>>>>>>> called, we can throw an exception in runtime.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can we do better?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev пишет:
>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev пишет:
>>>>>>>>>>>>>>>>> Paul,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I see your point when you are talking about
>>>>>>>>>>>>>>>>> stream..branch..branch...default..
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Still, I believe that this cannot not be implemented the
>>>>> easy
>>>>>>> way.
>>>>>>>>>>>>>>>>> Maybe we all should think further.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> user could specify a terminal method that assumes
>>> nothing
>>>>>> will
>>>>>>>>>>> reach
>>>>>>>>>>>>>>>>>> the default branch,
>>>>>>>>>>>>>>>>> throwing an exception if such a case occurs.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be the only option
>>>> besides
>>>>>>>>>>>>>>>>> `default`, because there are scenarios when we want to
>>>> just
>>>>>>> silently
>>>>>>>>>>>>>>>>> drop the messages that didn't match any predicate. 2)
>>>>> Throwing
>>>>>>> an
>>>>>>>>>>>>>>>>> exception in the middle of data flow processing looks
>>>> like a
>>>>>> bad
>>>>>>>>>>> idea.
>>>>>>>>>>>>>>>>> In stream processing paradigm, I would prefer to emit a
>>>>>> special
>>>>>>>>>>>>>>>>> message to a dedicated stream. This is exactly where
>>>>> `default`
>>>>>>> can
>>>>>>>>>>> be
>>>>>>>>>>>>>>>>> used.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> it would be fairly easily for the
>>> InternalTopologyBuilder
>>>>> to
>>>>>>> track
>>>>>>>>>>>>>>>>>> dangling
>>>>>>>>>>>>>>>>> branches that haven't been terminated and raise a clear
>>>>> error
>>>>>>>>>>> before it
>>>>>>>>>>>>>>>>> becomes an issue.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> You mean a runtime exception, when the program is
>>> compiled
>>>>> and
>>>>>>> run?
>>>>>>>>>>>>>>>>> Well,  I'd prefer an API that simply won't compile if
>>> used
>>>>>>>>>>>>>>>>> incorrectly. Can we build such an API as a method chain
>>>>>> starting
>>>>>>>>>>> from
>>>>>>>>>>>>>>>>> KStream object? There is a huge cost difference between
>>>>>> runtime
>>>>>>> and
>>>>>>>>>>>>>>>>> compile-time errors. Even if a failure uncovers
>>> instantly
>>>> on
>>>>>>> unit
>>>>>>>>>>>>>>>>> tests, it costs more for the project than a compilation
>>>>>> failure.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen пишет:
>>>>>>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Good point about the terminal operation being required.
>>>>> But
>>>>>> is
>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>> really
>>>>>>>>>>>>>>>>>> such a bad thing?  If the user doesn't want a
>>>> defaultBranch
>>>>>>> they
>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>> call
>>>>>>>>>>>>>>>>>> some other terminal method (noDefaultBranch()?) just as
>>>>>>> easily.  In
>>>>>>>>>>>>>>>>>> fact I
>>>>>>>>>>>>>>>>>> think it creates an opportunity for a nicer API - a
>>> user
>>>>>> could
>>>>>>>>>>> specify
>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>> terminal method that assumes nothing will reach the
>>>> default
>>>>>>> branch,
>>>>>>>>>>>>>>>>>> throwing an exception if such a case occurs.  That
>>> seems
>>>>> like
>>>>>>> an
>>>>>>>>>>>>>>>>>> improvement over the current branch() API, which allows
>>>> for
>>>>>> the
>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>> subtle
>>>>>>>>>>>>>>>>>> behavior of records unexpectedly getting dropped.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The need for a terminal operation certainly has to be
>>>> well
>>>>>>>>>>>>>>>>>> documented, but
>>>>>>>>>>>>>>>>>> it would be fairly easily for the
>>> InternalTopologyBuilder
>>>>> to
>>>>>>> track
>>>>>>>>>>>>>>>>>> dangling
>>>>>>>>>>>>>>>>>> branches that haven't been terminated and raise a clear
>>>>> error
>>>>>>>>>>> before it
>>>>>>>>>>>>>>>>>> becomes an issue.  Especially now that there is a
>>> "build
>>>>>> step"
>>>>>>>>>>> where
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> topology is actually wired up, when
>>>> StreamsBuilder.build()
>>>>> is
>>>>>>>>>>> called.
>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its argument, I agree
>>> that
>>>>> it's
>>>>>>>>>>>>>>>>>> critical to
>>>>>>>>>>>>>>>>>> allow users to do other operations on the input stream.
>>>>> With
>>>>>>> the
>>>>>>>>>>>>>>> fluent
>>>>>>>>>>>>>>>>>> solution, it ought to work the same way all other
>>>>> operations
>>>>>>> do -
>>>>>>>>>>> if
>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>> want to process off the original KStream multiple
>>> times,
>>>>> you
>>>>>>> just
>>>>>>>>>>>>>>>>>> need the
>>>>>>>>>>>>>>>>>> stream as a variable so you can call as many operations
>>>> on
>>>>> it
>>>>>>> as
>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>> desire.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thoughts?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <
>>>>>>> iponoma...@mail.ru
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hello Paul,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I afraid this won't work because we do not always need
>>>> the
>>>>>>>>>>>>>>>>>>> defaultBranch. And without a terminal operation we
>>> don't
>>>>>> know
>>>>>>>>>>> when to
>>>>>>>>>>>>>>>>>>> finalize and build the 'branch switch'.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its argument, so we
>>> can
>>>> do
>>>>>>>>>>> something
>>>>>>>>>>>>>>>>>>> more with the original branch after branching.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I understand your point that the need of special
>>> object
>>>>>>>>>>> construction
>>>>>>>>>>>>>>>>>>> contrasts the fluency of most KStream methods. But
>>> here
>>>> we
>>>>>>> have a
>>>>>>>>>>>>>>>>>>> special case: we build the switch to split the flow,
>>> so
>>>> I
>>>>>>> think
>>>>>>>>>>> this
>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>> still idiomatic.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen пишет:
>>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve this API, but I
>>>> find
>>>>>> the
>>>>>>>>>>>>>>>>>>>> onTopOff()
>>>>>>>>>>>>>>>>>>>> mechanism a little confusing since it contrasts the
>>>>> fluency
>>>>>>> of
>>>>>>>>>>> other
>>>>>>>>>>>>>>>>>>>> KStream method calls.  Ideally I'd like to just call
>>> a
>>>>>>> method on
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> stream
>>>>>>>>>>>>>>>>>>>> so it still reads top to bottom if the branch cases
>>> are
>>>>>>> defined
>>>>>>>>>>>>>>>>>>>> fluently.
>>>>>>>>>>>>>>>>>>>> I think the addBranch(predicate, handleCase) is very
>>>> nice
>>>>>>> and the
>>>>>>>>>>>>>>>>>>>> right
>>>>>>>>>>>>>>>>>>> way
>>>>>>>>>>>>>>>>>>>> to do things, but what if we flipped around how we
>>>>> specify
>>>>>>> the
>>>>>>>>>>> source
>>>>>>>>>>>>>>>>>>>> stream.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Like:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> stream.branch()
>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate1, this::handle1)
>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate2, this::handle2)
>>>>>>>>>>>>>>>>>>>>           .defaultBranch(this::handleDefault);
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Where branch() returns a KBranchedStreams or
>>>>>> KStreamBrancher
>>>>>>> or
>>>>>>>>>>>>>>>>>>> something,
>>>>>>>>>>>>>>>>>>>> which is added to by addBranch() and terminated by
>>>>>>>>>>> defaultBranch()
>>>>>>>>>>>>>>>>>>>> (which
>>>>>>>>>>>>>>>>>>>> returns void).  This is obviously incompatible with
>>> the
>>>>>>> current
>>>>>>>>>>>>>>>>>>>> API, so
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> new stream.branch() would have to have a different
>>>> name,
>>>>>> but
>>>>>>> that
>>>>>>>>>>>>>>>>>>>> seems
>>>>>>>>>>>>>>>>>>>> like a fairly small problem - we could call it
>>>> something
>>>>>> like
>>>>>>>>>>>>>>>>>>>> branched()
>>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>> branchedStreams() and deprecate the old API.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of your KIP?  It
>>>> seems
>>>>>>> like it
>>>>>>>>>>>>>>>>>>>> does to
>>>>>>>>>>>>>>>>>>>> me, allowing for clear in-line branching while also
>>>>>> allowing
>>>>>>> you
>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> dynamically build of branches off of KBranchedStreams
>>>> if
>>>>>>> desired.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
>>>>>>>>>>>>>>>>>>> <iponoma...@mail.ru.invalid>
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Bill,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){
>>>>>>>>>>>>>>>>>>>>>           ks.filter(....).mapValues(...)
>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){
>>>>>>>>>>>>>>>>>>>>>           ks.selectKey(...).groupByKey()...
>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> ......
>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate1, this::handleFirstCase)
>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate2, this::handleSecondCase)
>>>>>>>>>>>>>>>>>>>>>      .onTopOf(....)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck пишет:
>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I have one question, the KafkaStreamsBrancher
>>> takes a
>>>>>>> Consumer
>>>>>>>>>>> as a
>>>>>>>>>>>>>>>>>>>>> second
>>>>>>>>>>>>>>>>>>>>>> argument which returns nothing, and the example in
>>>> the
>>>>>> KIP
>>>>>>>>>>> shows
>>>>>>>>>>>>>>>>>>>>>> each
>>>>>>>>>>>>>>>>>>>>>> stream from the branch using a terminal node
>>>>>>> (KafkaStreams#to()
>>>>>>>>>>>>>>>>>>>>>> in this
>>>>>>>>>>>>>>>>>>>>>> case).
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but how would we
>>> handle
>>>>> the
>>>>>>> case
>>>>>>>>>>>>>>>>>>>>>> where the
>>>>>>>>>>>>>>>>>>>>>> user has created a branch but wants to continue
>>>>>> processing
>>>>>>> and
>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>> necessarily use a terminal node on the branched
>>>> stream
>>>>>>>>>>> immediately?
>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic as is if we had
>>>>>> something
>>>>>>> like
>>>>>>>>>>>>>>>>>>>>>> this:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> KStream<String, String>[] branches =
>>>>>>>>>>>>>>>>>>>>>> originalStream.branch(predicate1,
>>>>>>>>>>>>>>>>>>>>>> predicate2);
>>>>>>>>>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
>>>>>>>>>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <
>>>>>>> bbej...@gmail.com
>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> All,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP-
>>> 418.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418.
>>> Please
>>>>>> take
>>>>>>> a
>>>>>>>>>>> look
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> KIP if you can, I would appreciate any feedback :)
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> KIP-418:
>>>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488:
>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5488
>>>>>>>>>>>>>>>>>>>>>>> PR#6164:
>>> https://github.com/apache/kafka/pull/6164
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to