What is the status of this KIP?

-Matthias

On 7/11/19 4:00 PM, Matthias J. Sax wrote:
> Ivan,
>
> did you see my last reply? What do you think about my proposal to mix
> both approaches and try to get best-of-both worlds?
>
>
> -Matthias
>
> On 6/11/19 3:56 PM, Matthias J. Sax wrote:
>> Thanks for the input John!
>>
>>> under your suggestion, it seems that the name is required
>>
>> If you want to get the `KStream` as part of the `Map` back using a
>> `Function`, yes. If you follow the "embedded chaining" pattern using a
>> `Consumer`, no.
>>
>> Allowing for a default name via `split()` can of course be done.
>> Similarly, using `Named` instead of `String` is possible.
>>
>> I wanted to sketch out a high level proposal to merge both patterns
>> only. Your suggestions to align the new API with the existing API make
>> total sense.
>>
>>
>>
>> One follow up question: Would `Named` be optional or required in
>> `split()` and `branch()`? It's unclear from your example.
>>
>> If both are mandatory, what do we gain by it? The returned `Map` only
>> contains the corresponding branches, so why should we prefix all of
>> them? If only `Named` is mandatory in `branch()`, but optional in
>> `split()`, the same question arises.
>>
>> Requiring `Named` in `split()` seems only to make sense, if `Named` is
>> optional in `branch()` and we generate `-X` suffix using a counter for
>> different branch name. However, this might lead to the problem of
>> changing names if branches are added/removed. Also, how would the names
>> be generated if `Consumer` is mixed in (ie, not all branches are
>> returned in the `Map`).
>>
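To make the renaming concern concrete, here is a minimal sketch in plain Java. `BranchNaming` and `nodeNames` are hypothetical names invented for this illustration and are not part of Kafka Streams, and the counter-based `-X` suffix is only an assumption about how generated names might be built.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams: derives node names for
// branches from the split name, using a counter for branches without a name.
class BranchNaming {
    static List<String> nodeNames(String splitName, List<String> branchNames) {
        List<String> result = new ArrayList<>();
        int counter = 0;
        for (String name : branchNames) {
            counter++;
            // null stands for "no explicit name was given for this branch"
            String suffix = (name == null) ? String.valueOf(counter) : name;
            result.add(splitName + "-" + suffix);
        }
        return result;
    }
}
```

With the branches (unnamed, "abranch", unnamed) the last node is called "mysplit-3". If the first branch is removed, the same logical branch becomes "mysplit-2", which is the kind of silent rename that explicit names would avoid.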
>> If `Named` is optional for both, it could happen that a user forgets to
>> specify a name for a branch, which would lead to runtime issues.
>>
>>
>> Hence, I am actually in favor of not allowing a default name, keeping
>> `split()` without a parameter, and making `Named` in `branch()` required if a
>> `Function` is used. This makes it explicit to the user that specifying a
>> name is required if a `Function` is used.
>>
>>
>>
>> About
>>
>>> KBranchedStream#branch(BranchConfig)
>>
>> I don't think that the branching predicate is a configuration and hence
>> would not include it in a configuration object.
>>
>>> withChain(...);
>>
>> Similarly, `withChain()` (that would only take a `Consumer`?) does not
>> seem to be a configuration. We also cannot prevent a user from calling
>> `withName()` in combination with `withChain()`, which does not make sense
>> IMHO. We could of course throw an RTE, but not having a compile-time check
>> seems less appealing. Also, it could happen that neither `withChain()`
>> nor `withName()` is called and the branch is missing in the returned
>> `Map`, which would lead to runtime issues, too.
>>
>> Hence, I don't think that we should add `BranchConfig`. A config object
>> is helpful if each configuration can be set independently of all others,
>> but this seems not to be the case here. If we add new configuration
>> later, we can also just move forward by deprecating the methods that
>> accept `Named` and adding new methods that accept `BranchConfig` (that
>> would of course implement `Named`).
>>
>>
>> Thoughts?
>>
>>
>> @Ivan, what do you think about the general idea to blend the two main
>> approaches of returning a `Map` plus an "embedded chaining"?
>>
>>
>>
>> -Matthias
>>
>>
>>
>> On 6/4/19 10:33 AM, John Roesler wrote:
>>> Thanks for the idea, Matthias, it does seem like this would satisfy
>>> everyone. Returning the map from the terminal operations also solves
>>> the problem of merging/joining the branched streams, if we want to add
>>> support for the complement later on.
>>>
>>> Under your suggestion, it seems that the name is required. Otherwise,
>>> we wouldn't have keys for the map to return. I think this is actually
>>> not too bad, since experience has taught us that, although names for
>>> operations are not required to define stream processing logic, it does
>>> significantly improve the operational experience when you can map the
>>> topology, logs, metrics, etc. back to the source code. Since you
>>> wouldn't (have to) reference the name to chain extra processing onto
>>> the branch (thanks to the second argument), you can avoid the
>>> "unchecked name" problem that Ivan pointed out.
>>>
>>> In the current implementation of Branch, you can name the branch
>>> operator itself, and then all the branches get index-suffixed names
>>> built from the branch operator name. I guess under this proposal, we
>>> could naturally append the branch name to the branching operator name,
>>> like this:
>>>
>>> stream.split(Named.withName("mysplit")) //creates node "mysplit"
>>> .branch(..., ..., "abranch") // creates node "mysplit-abranch"
>>> .defaultBranch(...) // creates node "mysplit-default"
>>>
>>> It does make me wonder about the DSL syntax itself, though.
>>>
>>> We don't have a defined grammar, so there's plenty of room to debate
>>> the "best" syntax in the context of each operation, but in general,
>>> the KStream DSL operators follow this pattern:
>>>
>>> operator(function, config_object?) OR operator(config_object)
>>>
>>> where config_object is often just Named in the "function" variant.
>>> Even when the config_object isn't a Named, but some other config
>>> class, that config class _always_ implements NamedOperation.
>>>
>>> Here, we're introducing a totally different pattern:
>>>
>>> operator(function, function, string)
>>>
>>> where the string is the name.
>>> My first question is whether the name should instead be specified with
>>> the NamedOperation interface.
>>>
>>> My second question is whether we should just roll all these arguments
>>> up into a config object like:
>>>
>>> KBranchedStream#branch(BranchConfig)
>>>
>>> interface BranchConfig extends NamedOperation {
>>> withPredicate(...);
>>> withChain(...);
>>> withName(...);
>>> }
>>>
>>> Although I guess we'd like to call BranchConfig something more like
>>> "Branched", even if I don't particularly like that pattern.
>>>
>>> This makes the source code a little noisier, but it also makes us more
>>> future-proof, as we can deal with a wide range of alternatives purely
>>> in the config interface, and never have to deal with adding overloads
>>> to the KBranchedStream if/when we decide we want the name to be
>>> optional, or the KStream->KStream to be optional.
>>>
>>> WDYT?
>>>
>>> Thanks,
>>> -John
>>>
>>> On Fri, May 24, 2019 at 5:25 PM Michael Drogalis wrote:
>>>>
>>>> Matthias: I think that's pretty reasonable from my point of view. Good
>>>> suggestion.
>>>>
>>>> On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax wrote:
>>>>
>>>>> Interesting discussion.
>>>>>
>>>>> I am wondering, if we cannot unify the advantage of both approaches:
>>>>>
>>>>>
>>>>>
>>>>> KStream#split() -> KBranchedStream
>>>>>
>>>>> // branch is not easily accessible in current scope
>>>>> KBranchedStream#branch(Predicate, Consumer<KStream>)
>>>>> -> KBranchedStream
>>>>>
>>>>> // assign a name to the branch and
>>>>> // return the sub-stream to the current scope later
>>>>> //
>>>>> // can be simple as `#branch(p, s->s, "name")`
>>>>> // or also complex as `#branch(p, s->s.filter(...), "name")`
>>>>> KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
>>>>> -> KBranchedStream
>>>>>
>>>>> // default branch is not easily accessible
>>>>> // return map of all named sub-stream into current scope
>>>>> KBranchedStream#default(Consumer<KStream>)
>>>>> -> Map<String,KStream>
>>>>>
>>>>> // assign custom name to default-branch
>>>>> // return map of all named sub-stream into current scope
>>>>> KBranchedStream#default(Function<KStream,KStream>, String)
>>>>> -> Map<String,KStream>
>>>>>
>>>>> // assign a default name for default
>>>>> // return map of all named sub-stream into current scope
>>>>> KBranchedStream#defaultBranch(Function<KStream,KStream>)
>>>>> -> Map<String,KStream>
>>>>>
>>>>> // return map of all named sub-streams into current scope
>>>>> KBranchedStream#noDefaultBranch()
>>>>> -> Map<String,KStream>
>>>>>
>>>>>
>>>>>
>>>>> Hence, for each sub-stream, the user can pick to add a name and return
>>>>> the branch "result" to the calling scope or not. The implementation can
>>>>> also check at runtime that all returned names are unique. The returned
>>>>> Map can be empty and it's also optional to use the Map.
>>>>>
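A rough way to see how the two branch flavors could coexist is the toy model below. It is only a sketch under stated assumptions: a "stream" is modeled as a plain `List<V>`, `ToyBranched` is a made-up class and not the proposed `KBranchedStream`, and the single-argument `Function` variant of the default branch is left out to keep the overloads unambiguous in this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Toy stand-in for the proposed branched stream: a "stream" is just a List<V>.
class ToyBranched<V> {
    private final List<V> remaining;
    // LinkedHashMap keeps the named branches in definition order.
    private final Map<String, List<V>> named = new LinkedHashMap<>();

    ToyBranched(List<V> records) {
        this.remaining = new ArrayList<>(records);
    }

    // Moves the records matching p out of the remaining input (first match wins).
    private List<V> take(Predicate<V> p) {
        List<V> matched = new ArrayList<>();
        List<V> rest = new ArrayList<>();
        for (V v : remaining) {
            if (p.test(v)) {
                matched.add(v);
            } else {
                rest.add(v);
            }
        }
        remaining.clear();
        remaining.addAll(rest);
        return matched;
    }

    // Runtime check that all returned names are unique.
    private void put(String name, List<V> branch) {
        if (named.putIfAbsent(name, branch) != null) {
            throw new IllegalArgumentException("Duplicate branch name: " + name);
        }
    }

    // "Embedded chaining": the branch is consumed in place and not returned.
    ToyBranched<V> branch(Predicate<V> p, Consumer<List<V>> chain) {
        chain.accept(take(p));
        return this;
    }

    // Named variant: the result of the chain is returned later under `name`.
    ToyBranched<V> branch(Predicate<V> p, Function<List<V>, List<V>> chain, String name) {
        put(name, chain.apply(take(p)));
        return this;
    }

    // Terminal operations: each one returns the map of named branches.
    Map<String, List<V>> defaultBranch(Consumer<List<V>> chain) {
        chain.accept(new ArrayList<>(remaining));
        return named;
    }

    Map<String, List<V>> defaultBranch(Function<List<V>, List<V>> chain, String name) {
        put(name, chain.apply(new ArrayList<>(remaining)));
        return named;
    }

    Map<String, List<V>> noDefaultBranch() {
        return named;
    }
}
```

For example, `new ToyBranched<Integer>(input).branch(v -> v < 0, negatives::addAll).branch(v -> v % 2 == 0, s -> s, "even").defaultBranch(s -> s, "odd")` handles the negative records in place and returns a map with only the keys "even" and "odd". Reusing a name fails at runtime with an `IllegalArgumentException`.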
>>>>> To me, it seems like a good way to get best of both worlds.
>>>>>
>>>>> Thoughts?
>>>>>
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 5/6/19 5:15 PM, John Roesler wrote:
>>>>>> Ivan,
>>>>>>
>>>>>> That's a very good point about the "start" operator in the dynamic case.
>>>>>> I had no problem with "split()"; I was just questioning the necessity.
>>>>>> Since you've provided a proof of necessity, I'm in favor of the
>>>>>> "split()" start operator. Thanks!
>>>>>>
>>>>>> Separately, I'm interested to see where the present discussion leads.
>>>>>> I've written enough Javascript code in my life to be suspicious of
>>>>>> nested closures. You have a good point about using method references (or
>>>>>> indeed function literals also work). It should be validating that this
>>>>>> was also the JS community's first approach to flattening the logic when
>>>>>> their nested closure situation got out of hand. Unfortunately, it's
>>>>>> replacing nesting with redirection, both of which disrupt code
>>>>>> readability (but in different ways for different reasons). In other
>>>>>> words, I agree that function references are *the* first-order solution if
>>>>>> the nested code does indeed become a problem.
>>>>>>
>>>>>> However, the history of JS also tells us that function references aren't
>>>>>> the end of the story either, and you can see that by observing that
>>>>>> there have been two follow-on eras, as they continue trying to cope with
>>>>>> the consequences of living in such a callback-heavy language. First, you
>>>>>> have Futures/Promises, which essentially let you convert nested code to
>>>>>> method-chained code (Observables/FP is a popular variation on this).
>>>>>> Most lately, you have async/await, which is an effort to apply language
>>>>>> (not just API) syntax to the problem, and offer the "flattest" possible
>>>>>> programming style to solve the problem (because you get back to just one
>>>>>> code block per functional unit).
>>>>>>
>>>>>> Stream-processing is a different domain, and Java+KStreams is nowhere
>>>>>> near as callback heavy as JS, so I don't think we have to take the JS
>>>>>> story for granted, but then again, I think we can derive some valuable
>>>>>> lessons by looking sideways to adjacent domains. I'm just bringing this
>>>>>> up to inspire further/deeper discussion. At the same time, just like JS,
>>>>>> we can afford to take an iterative approach to the problem.
>>>>>>
>>>>>> Separately again, I'm interested in the post-branch merge (and I'd also
>>>>>> add join) problem that Paul brought up. We can clearly punt on it, by
>>>>>> terminating the nested branches with sink operators. But is there a DSL
>>>>>> way to do it?
>>>>>>
>>>>>> Thanks again for your driving this,
>>>>>> -John
>>>>>>
>>>>>> On Thu, May 2, 2019 at 7:39 PM Paul Whalen wrote:
>>>>>>
>>>>>> Ivan, I’ll definitely forfeit my point on the clumsiness of the
>>>>>> branch(predicate, consumer) solution, I don’t see any real drawbacks
>>>>>> for the dynamic case.
>>>>>>
>>>>>> IMO the one trade off to consider at this point is the scope
>>>>>> question. I don’t know if I totally agree that “we rarely need them
>>>>>> in the same scope” since merging the branches back together later
>>>>>> seems like a perfectly plausible use case that can be a lot nicer
>>>>>> when the branched streams are in the same scope. That being said,
>>>>>> for the reasons Ivan listed, I think it is overall the better
>>>>>> solution - working around the scope thing is easy enough if you need
>>>>>> to.
>>>>>>
>>>>>> > On May 2, 2019, at 7:00 PM, Ivan Ponomarev wrote:
>>>>>> >
>>>>>> > Hello everyone, thank you all for joining the discussion!
>>>>>> >
>>>>>> > Well, I don't think the idea of named branches, be it a
>>>>>> > LinkedHashMap (no other Map will do, because order of definition
>>>>>> > matters) or `branch` method taking name and Consumer has more
>>>>>> > advantages than drawbacks.
>>>>>> >
>>>>>> > In my opinion, the only real positive outcome from Michael's
>>>>>> > proposal is that all the returned branches are in the same scope.
>>>>>> > But 1) we rarely need them in the same scope 2) there is a
>>>>>> > workaround for the scope problem, described in the KIP.
>>>>>> >
>>>>>> > 'Inlining the complex logic' is not a problem, because we can use
>>>>>> > method references instead of lambdas. In real world scenarios you
>>>>>> > tend to split the complex logic to methods anyway, so the code is
>>>>>> > going to be clean.
>>>>>> >
>>>>>> > The drawbacks are strong. The cohesion between predicates and
>>>>>> > handlers is lost. We have to define predicates in one place, and
>>>>>> > handlers in another. This opens the door for bugs:
>>>>>> >
>>>>>> > - what if we forget to define a handler for a name? or a name for
>>>>>> >   a handler?
>>>>>> > - what if we misspell a name?
>>>>>> > - what if we copy-paste and duplicate a name?
>>>>>> >
>>>>>> > What Michael proposes would have been totally OK if we had been
>>>>>> > writing the API in Lua, Ruby or Python. In those languages the
>>>>>> > "dynamic naming" approach would have looked most concise and
>>>>>> > beautiful. But in Java we expect all the problems related to
>>>>>> > identifiers to be eliminated at compile time.
>>>>>> >
>>>>>> > Do we have to invent duck-typing for the Java API?
>>>>>> >
>>>>>> > And if we do, what advantage are we supposed to get besides having
>>>>>> > all the branches in the same scope? Michael, maybe I'm missing your
>>>>>> > point?
>>>>>> >
>>>>>> > ---
>>>>>> >
>>>>>> > Earlier in this discussion John Roesler also proposed to do
>>>>>> > without "start branching" operator, and later Paul mentioned that in
>>>>>> > the case when we have to add a dynamic number of branches, the
>>>>>> > current KIP is 'clumsier' compared to Michael's 'Map' solution. Let
>>>>>> > me address both comments here.
>>>>>> >
>>>>>> > 1) "Start branching" operator (I think that *split* is a good name
>>>>>> > for it indeed) is critical when we need to do a dynamic branching,
>>>>>> > see example below.
>>>>>> >
>>>>>> > 2) No, dynamic branching in current KIP is not clumsy at all.
>>>>>> > Imagine a real-world scenario when you need one branch per enum
>>>>>> > value (say, RecordType). You can have something like this:
>>>>>> >
>>>>>> > /* John: if we had to start with stream.branch(...) here, it would
>>>>>> >    have been much messier. */
>>>>>> > KBranchedStream branched = stream.split();
>>>>>> >
>>>>>> > /* Not clumsy at all :-) */
>>>>>> > for (RecordType recordType : RecordType.values())
>>>>>> >     branched = branched.branch((k, v) -> v.getRecType() == recordType,
>>>>>> >                                recordType::processRecords);
>>>>>> >
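The loop pattern above can be tried out without Kafka using the self-contained sketch below. Everything in it is an assumption made for illustration: `ToySplit` is a made-up stand-in for `KBranchedStream` over an in-memory list, `Rec` and the `RecordType` constants are invented, and a per-type sink map replaces `recordType::processRecords`.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Invented record types, one branch is created per constant.
enum RecordType { ORDER, PAYMENT, REFUND }

// Toy record: only the type matters for routing.
class Rec {
    final RecordType type;
    final String payload;

    Rec(RecordType type, String payload) {
        this.type = type;
        this.payload = payload;
    }

    RecordType getRecType() {
        return type;
    }
}

// Toy stand-in for KBranchedStream over an in-memory list.
class ToySplit {
    private final List<Rec> remaining;

    private ToySplit(List<Rec> remaining) {
        this.remaining = remaining;
    }

    // Plays the role of KStream#split(): a starting point that takes no predicate.
    static ToySplit split(List<Rec> records) {
        return new ToySplit(new ArrayList<>(records));
    }

    // Every call returns a new object, so a loop has to reassign its variable.
    ToySplit branch(Predicate<Rec> p, Consumer<List<Rec>> chain) {
        List<Rec> matched = new ArrayList<>();
        List<Rec> rest = new ArrayList<>();
        for (Rec r : remaining) {
            if (p.test(r)) {
                matched.add(r);
            } else {
                rest.add(r);
            }
        }
        chain.accept(matched);
        return new ToySplit(rest);
    }
}

class DynamicBranching {
    // Creates one branch per enum value and collects each branch in a sink map.
    static Map<RecordType, List<Rec>> route(List<Rec> records) {
        Map<RecordType, List<Rec>> sinks = new EnumMap<>(RecordType.class);
        ToySplit branched = ToySplit.split(records);
        for (RecordType recordType : RecordType.values()) {
            branched = branched.branch(r -> r.getRecType() == recordType,
                                       matched -> sinks.put(recordType, matched));
        }
        return sinks;
    }
}
```

The point of the sketch is the first line of `route`'s loop setup: the parameterless `split` gives the loop an initial object to reassign, so the first branch needs no special treatment.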
>>>>>> > Regards,
>>>>>> >
>>>>>> > Ivan
>>>>>> >
>>>>>> >
>>>>>> > On 02.05.2019 14:40, Matthias J. Sax wrote:
>>>>>> >> I also agree with Michael's observation about the core problem of
>>>>>> >> current `branch()` implementation.
>>>>>> >>
>>>>>> >> However, I also don't like to pass in a clumsy Map object. My thinking
>>>>>> >> was more aligned with Paul's proposal to just add a name to each
>>>>>> >> `branch()` statement and return a `Map<String,KStream>`.
>>>>>> >>
>>>>>> >> It makes the code easier to read, and also makes the order of
>>>>>> >> `Predicates` (which is essential) easier to grasp.
>>>>>> >>
>>>>>> >>>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>> >>>>>> .branch("branchOne", Predicate<K, V>)
>>>>>> >>>>>> .branch( "branchTwo", Predicate<K, V>)
>>>>>> >>>>>> .defaultBranch("defaultBranch");
>>>>>> >> An open question is the case for which no defaultBranch() should be
>>>>>> >> specified. Atm, `split()` and `branch()` would return `BranchedKStream`
>>>>>> >> and the call to `defaultBranch()` that returns the `Map` is mandatory
>>>>>> >> (which is not the case atm). Or is this actually not a real problem,
>>>>>> >> because users can just ignore the branch returned by `defaultBranch()`
>>>>>> >> in the result `Map`?
>>>>>> >>
>>>>>> >>
>>>>>> >> About "inlining": So far, it seems to be a matter of personal
>>>>>> >> preference. I can see arguments for both, but no "killer argument" yet
>>>>>> >> that clearly makes the case for one or the other.
>>>>>> >>
>>>>>> >>
>>>>>> >> -Matthias
>>>>>> >>
>>>>>> >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
>>>>>> >>> Perhaps inlining is the wrong terminology. It doesn’t require
>>>>>> >>> that a lambda with the full downstream topology be defined inline -
>>>>>> >>> it can be a method reference as with Ivan’s original suggestion.
>>>>>> >>> The advantage of putting the predicate and its downstream logic
>>>>>> >>> (Consumer) together in branch() is that they are required to be near
>>>>>> >>> to each other.
>>>>>> >>>
>>>>>> >>> Ultimately the downstream code has to live somewhere, and deep
>>>>>> >>> branch trees will be hard to read regardless.
>>>>>> >>>
>>>>>> >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis wrote:
>>>>>> >>>>
>>>>>> >>>> I'm less enthusiastic about inlining the branch logic with its downstream
>>>>>> >>>> functionality. Programs that have deep branch trees will quickly become
>>>>>> >>>> harder to read as a single unit.
>>>>>> >>>>
>>>>>> >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen wrote:
>>>>>> >>>>>
>>>>>> >>>>> Also +1 on the issues/goals as Michael outlined them, I think that sets a
>>>>>> >>>>> great framework for the discussion.
>>>>>> >>>>>
>>>>>> >>>>> Regarding the SortedMap solution, my understanding is that the current
>>>>>> >>>>> proposal in the KIP is what is in my PR which (pending naming decisions) is
>>>>>> >>>>> roughly this:
>>>>>> >>>>>
>>>>>> >>>>> stream.split()
>>>>>> >>>>> .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>>>> >>>>> .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>>>> >>>>> .defaultBranch(Consumer<KStream<K, V>>);
>>>>>> >>>>>
>>>>>> >>>>> Obviously some ordering is necessary, since branching as a construct
>>>>>> >>>>> doesn't work without it, but this solution seems like it provides as much
>>>>>> >>>>> associativity as the SortedMap solution, because each branch() call
>>>>>> >>>>> directly associates the "conditional" with the "code block." The value the
>>>>>> >>>>> SortedMap solution provides over the KIP solution is access to the streams
>>>>>> >>>>> in the same scope.
>>>>>> >>>>>
>>>>>> >>>>> The KIP solution is less "dynamic" than the SortedMap solution in the sense
>>>>>> >>>>> that it is slightly clumsier to add a dynamic number of branches, but it is
>>>>>> >>>>> certainly possible. It seems to me like the API should favor the "static"
>>>>>> >>>>> case anyway, and should make it simple and readable to fluently declare and
>>>>>> >>>>> access your branches in-line. It also makes it impossible to ignore a
>>>>>> >>>>> branch, and it is possible to build an (almost) identical SortedMap
>>>>>> >>>>> solution on top of it.
>>>>>> >>>>>
>>>>>> >>>>> I could also see a middle ground where instead of a raw SortedMap being
>>>>>> >>>>> taken in, branch() takes a name and not a Consumer. Something like this:
>>>>>> >>>>>
>>>>>> >>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>> >>>>> .branch("branchOne", Predicate<K, V>)
>>>>>> >>>>> .branch( "branchTwo", Predicate<K, V>)
>>>>>> >>>>> .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
>>>>>> >>>>>
>>>>>> >>>>> Pros for that solution:
>>>>>> >>>>> - accessing branched KStreams in same scope
>>>>>> >>>>> - no double brace initialization, hopefully slightly more readable than
>>>>>> >>>>>   SortedMap
>>>>>> >>>>>
>>>>>> >>>>> Cons
>>>>>> >>>>> - downstream branch logic cannot be specified inline which makes it harder
>>>>>> >>>>>   to read top to bottom (like existing API and SortedMap, but unlike the KIP)
>>>>>> >>>>> - you can forget to "handle" one of the branched streams (like existing
>>>>>> >>>>>   API and SortedMap, but unlike the KIP)
>>>>>> >>>>>
>>>>>> >>>>> (KBranchedStreams could even work *both* ways but perhaps that's overdoing
>>>>>> >>>>> it).
>>>>>> >>>>>
>>>>>> >>>>> Overall I'm curious how important it is to be able to easily access the
>>>>>> >>>>> branched KStream in the same scope as the original. It's possible that it
>>>>>> >>>>> doesn't need to be handled directly by the API, but instead left up to the
>>>>>> >>>>> user. I'm sort of in the middle on it.
>>>>>> >>>>>
>>>>>> >>>>> Paul
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman wrote:
>>>>>> >>>>>
>>>>>> >>>>>> I'd like to +1 what Michael said about the issues with the existing branch
>>>>>> >>>>>> method, I agree with what he's outlined and I think we should proceed by
>>>>>> >>>>>> trying to alleviate these problems. Specifically it seems important to be
>>>>>> >>>>>> able to cleanly access the individual branches (eg by mapping
>>>>>> >>>>>> name->stream), which I thought was the original intention of this KIP.
>>>>>> >>>>>>
>>>>>> >>>>>> That said, I don't think we should so easily give in to the double brace
>>>>>> >>>>>> anti-pattern or force our users into it if at all possible to avoid...just
>>>>>> >>>>>> my two cents.
>>>>>> >>>>>>
>>>>>> >>>>>> Cheers,
>>>>>> >>>>>> Sophie
>>>>>> >>>>>>
>>>>>> >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis wrote:
>>>>>> >>>>>>
>>>>>> >>>>>>> I’d like to propose a different way of thinking about this. To me, there
>>>>>> >>>>>>> are three problems with the existing branch signature:
>>>>>> >>>>>>>
>>>>>> >>>>>>> 1. If you use it the way most people do, Java raises unsafe type warnings.
>>>>>> >>>>>>> 2. The way in which you use the stream branches is positionally coupled to
>>>>>> >>>>>>>    the ordering of the conditionals.
>>>>>> >>>>>>> 3. It is brittle to extend existing branch calls with additional code
>>>>>> >>>>>>>    paths.
>>>>>> >>>>>>>
>>>>>> >>>>>>> Using associative constructs instead of relying on ordered constructs would
>>>>>> >>>>>>> be a stronger approach. Consider a signature that instead looks like this:
>>>>>> >>>>>>>
>>>>>> >>>>>>> Map<String, KStream<K,V>> KStream#branch(SortedMap<String, Predicate<? super K,? super V>>);
>>>>>> >>>>>>>
>>>>>> >>>>>>> Branches are given names in a map, and as a result, the API returns a
>>>>>> >>>>>>> mapping of names to streams. The ordering of the conditionals is maintained
>>>>>> >>>>>>> because it’s a sorted map. Insert order determines the order of evaluation.
>>>>>> >>>>>>> This solves problem 1 because there are no more varargs. It solves problem
>>>>>> >>>>>>> 2 because you no longer lean on ordering to access the branch you’re
>>>>>> >>>>>>> interested in. It solves problem 3 because you can introduce another
>>>>>> >>>>>>> conditional by simply attaching another name to the structure, rather than
>>>>>> >>>>>>> messing with the existing indices.
>>>>>> >>>>>>>
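One JDK detail is worth checking here: `java.util.SortedMap` implementations such as `TreeMap` iterate in key order, not insertion order, whereas `LinkedHashMap` iterates in insertion order. The sketch below shows how that changes which branch wins under first-match semantics. `MapOrderDemo`, its methods, and the branch names are made up for illustration, and a `Predicate<Integer>` stands in for Kafka's two-argument `Predicate<K,V>`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Illustration only: compares iteration order of TreeMap and LinkedHashMap
// when the map holds named, overlapping branch predicates.
class MapOrderDemo {
    // Returns the name of the first predicate that matches, in the map's iteration order.
    static String firstMatch(Map<String, Predicate<Integer>> branches, int value) {
        for (Map.Entry<String, Predicate<Integer>> e : branches.entrySet()) {
            if (e.getValue().test(value)) {
                return e.getKey();
            }
        }
        return "default";
    }

    // Inserts overlapping predicates, most specific first.
    static Map<String, Predicate<Integer>> fill(Map<String, Predicate<Integer>> m) {
        m.put("huge", v -> v > 1000);
        m.put("big", v -> v > 100);
        m.put("any", v -> true);
        return m;
    }

    // Key-ordered map: iterates "any", "big", "huge" regardless of insertion order.
    static Map<String, Predicate<Integer>> sorted() {
        return fill(new TreeMap<String, Predicate<Integer>>());
    }

    // Insertion-ordered map: iterates "huge", "big", "any".
    static Map<String, Predicate<Integer>> insertionOrdered() {
        return fill(new LinkedHashMap<String, Predicate<Integer>>());
    }
}
```

For the value 5000, `firstMatch(sorted(), 5000)` returns "any" because the catch-all predicate sorts first alphabetically, while `firstMatch(insertionOrdered(), 5000)` returns "huge". An API that relies on definition order would therefore need an insertion-ordered map, or names chosen so that key order matches the intended evaluation order.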
>>>>>> >>>>>>> One of the drawbacks is that creating the map inline is historically
>>>>>> >>>>>>> awkward in Java. I know it’s an anti-pattern to use voluminously, but
>>>>>> >>>>>>> double brace initialization would clean up the aesthetics.
>>>>>> >>>>>>>
>>>>>> >>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler wrote:
>>>>>> >>>>>>>> Hi Ivan,
>>>>>> >>>>>>>>
>>>>>> >>>>>>>> Thanks for the update.
>>>>>> >>>>>>>>
>>>>>> >>>>>>>> FWIW, I agree with Matthias that the current "start branching" operator is
>>>>>> >>>>>>>> confusing when named the same way as the actual branches. "Split" seems
>>>>>> >>>>>>>> like a good name. Alternatively, we can do without a "start branching"
>>>>>> >>>>>>>> operator at all, and just do:
>>>>>> >>>>>>>>
>>>>>> >>>>>>>> stream
>>>>>> >>>>>>>> .branch(Predicate)
>>>>>> >>>>>>>> .branch(Predicate)
>>>>>> >>>>>>>> .defaultBranch();
>>>>>> >>>>>>>>
>>>>>> >>>>>>>> Tentatively, I think that this branching operation should be terminal. That
>>>>>> >>>>>>>> way, we don't create ambiguity about how to use it. That is, `branch`
>>>>>> >>>>>>>> should return `KBranchedStream`, while `defaultBranch` is `void`, to
>>>>>> >>>>>>>> enforce that it comes last, and that there is only one definition of the
>>>>>> >>>>>>>> default branch. Potentially, we should log a warning if there's no default,
>>>>>> >>>>>>>> and additionally log a warning (or throw an exception) if a record falls
>>>>>> >>>>>>>> through with no default.
>>>>>> >>>>>>>>
>>>>>> >>>>>>>> Thoughts?
>>>>>> >>>>>>>>
>>>>>> >>>>>>>> Thanks,
>>>>>> >>>>>>>> -John
>>>>>> >>>>>>>>
>>>>>> >>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax wrote:
>>>>>> >>>>>>>>
>>>>>> >>>>>>>>> Thanks for updating the KIP and your answers.
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>>> this is to make the name similar to String#split
>>>>>> >>>>>>>>>>> that also returns an array, right?
>>>>>> >>>>>>>>> The intent was to avoid name duplication. The return type should _not_
>>>>>> >>>>>>>>> be an array.
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> The current proposal is
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> stream.branch()
>>>>>> >>>>>>>>> .branch(Predicate)
>>>>>> >>>>>>>>> .branch(Predicate)
>>>>>> >>>>>>>>> .defaultBranch();
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> IMHO, this reads a little odd, because the first `branch()` does not
>>>>>> >>>>>>>>> take any parameters and has different semantics than the later
>>>>>> >>>>>>>>> `branch()` calls. Note that from the code snippet above, it's hidden
>>>>>> >>>>>>>>> that the first call is `KStream#branch()` while the others are
>>>>>> >>>>>>>>> `KBranchedStream#branch()`, which makes reading the code harder.
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> Because I suggested to rename `addBranch()` -> `branch()`, I thought it
>>>>>> >>>>>>>>> might be better to also rename `KStream#branch()` to avoid the naming
>>>>>> >>>>>>>>> overlap that seems to be confusing. The following reads much cleaner to me:
>>>>>> >>>>>>>>> stream.split()
>>>>>> >>>>>>>>> .branch(Predicate)
>>>>>> >>>>>>>>> .branch(Predicate)
>>>>>> >>>>>>>>> .defaultBranch();
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> Maybe there is a better alternative to `split()` though to avoid the
>>>>>> >>>>>>>>> naming overlap.
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>>> 'default' is, however, a reserved word, so unfortunately we cannot have
>>>>>> >>>>>>>>>> a method with such name :-)
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> Bummer. Didn't consider this. Maybe we can still come up with a short name?
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> Can you add the interface `KBranchedStream` to the KIP with all its
>>>>>> >>>>>>>>> methods? It will be part of public API and should be contained in the
>>>>>> >>>>>>>>> KIP. For example, it's unclear atm, what the return type of
>>>>>> >>>>>>>>> `defaultBranch()` is.
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> You did not comment on the idea to add a `KBranchedStream#get(int index)
>>>>>> >>>>>>>>> -> KStream` method to get the individually branched-KStreams. Would be
>>>>>> >>>>>>>>> nice to get your feedback about it. It seems you suggest that users
>>>>>> >>>>>>>>> would need to write custom utility code otherwise, to access them. We
>>>>>> >>>>>>>>> should discuss the pros and cons of both approaches. It feels
>>>>>> >>>>>>>>> "incomplete" to me atm, if the API has no built-in support to get the
>>>>>> >>>>>>>>> branched-KStreams directly.
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> -Matthias
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
>>>>>> >>>>>>>>>> Hi all!
>>>>>> >>>>>>>>>>
>>>>>> >>>>>>>>>> I have updated the KIP-418 according to the new vision.
>>>>>> >>>>>>>>>>
>>>>>> >>>>>>>>>> Matthias, thanks for your comment!
>>>>>> >>>>>>>>>>
>>>>>> >>>>>>>>>>> Renaming KStream#branch() -> #split()
>>>>>> >>>>>>>>>> I can see your point: this is to make the name similar to String#split
>>>>>> >>>>>>>>>> that also returns an array, right? But is it worth the loss of backwards
>>>>>> >>>>>>>>>> compatibility? We can have overloaded branch() as well without affecting
>>>>>> >>>>>>>>>> the existing code. Maybe the old array-based `branch` method should be
>>>>>> >>>>>>>>>> deprecated, but this is a subject for discussion.
>>>>>> >>>>>>>>>>
>>>>>> >>>>>>>>>>> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
>>>>>> >>>>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
>>>>>> >>>>>>>>>>
>>>>>> >>>>>>>>>> Totally agree with 'addBranch->branch' rename. 'default' is, however, a
>>>>>> >>>>>>>>>> reserved word, so unfortunately we cannot have a method with such name :-)
>>>>>> >>>>>>>>>>
>>>>>> >>>>>>>>>>> defaultBranch() does take a `Predicate` as argument, but I think that
>>>>>> >>>>>>>>>>> is not required?
>>>>>> >>>>>>>>>>
>>>>>> >>>>>>>>>> Absolutely! I think that was just a copy-paste error or something.
>>>>>> >>>>>>>>>>
>>>>>> >>>>>>>>>> Dear colleagues,
>>>>>> >>>>>>>>>>
>>>>>> >>>>>>>>>> please review the new version of the KIP and Paul's PR
>>>>>> >>>>>>>>>> (https://github.com/apache/kafka/pull/6512)
>>>>>> >>>>>>>>>>
>>>>>> >>>>>>>>>> Any new suggestions/objections?
>>>>>> >>>>>>>>>>
>>>>>> >>>>>>>>>> Regards,
>>>>>> >>>>>>>>>>
>>>>>> >>>>>>>>>> Ivan
>>>>>> >>>>>>>>>>
>>>>>> >>>>>>>>>>
>>>>>> >>>>>>>>>> On 11.04.2019 11:47, Matthias J. Sax wrote:
>>>>>> >>>>>>>>>>> Thanks for driving the discussion of this KIP. It seems that everybody
>>>>>> >>>>>>>>>>> agrees that the current branch() method using arrays is not optimal.
>>>>>> >>>>>>>>>>> I had a quick look into the PR and I like the overall proposal. There
>>>>>> >>>>>>>>>>> are some minor things we need to consider. I would recommend the
>>>>>> >>>>>>>>>>> following renaming:
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>> KStream#branch() -> #split()
>>>>>> >>>>>>>>>>> KBranchedStream#addBranch() -> BranchingKStream#branch()
>>>>>> >>>>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>> It's just a suggestion to get slightly shorter method names.
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>> In the current PR, defaultBranch() does take a `Predicate` as argument,
>>>>>> >>>>>>>>>>> but I think that is not required?
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>> Also, we should consider KIP-307, that was recently
>>>>>> accepted and
>>>>>> >>>>>> is
>>>>>> >>>>>>>>>>> currently implemented:
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>
>>>>>>
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>>> >>>>>>>>>>> I.e., we should add overloads that accept a `Named`
>>>>>> parameter.
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>> For the issue that the created `KStream` objects are in
>>>>>> different
>>>>>> >>>>>>>> scopes:
>>>>>> >>>>>>>>>>> could we extend `KBranchedStream` with a `get(int
>>>>>> index)` method
>>>>>> >>>>>>> that
>>>>>> >>>>>>>>>>> returns the corresponding "branched" result `KStream`
>>>>>> object?
>>>>>> >>>>>> Maybe,
>>>>>> >>>>>>>> the
>>>>>> >>>>>>>>>>> second argument of `addBranch()` should not be a
>>>>>> >>>>>> `Consumer<KStream>`
>>>>>> >>>>>>>> but
>>>>>> >>>>>>>>>>> a `Function<KStream,KStream>` and `get()` could return
>>>>>> whatever
>>>>>> >>>>>> the
>>>>>> >>>>>>>>>>> `Function` returns?
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>> Finally, I would also suggest updating the KIP with the
>>>>>> current
>>>>>> >>>>>>>>>>> proposal. That makes it easier to review.
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>> -Matthias
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>>
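The `get(int index)` idea in the message above can be sketched as follows. This is only a rough illustration under stated assumptions: `HandleSketch` and `IndexedBrancherSketch` are hypothetical stand-ins invented here, not classes from Kafka Streams or from the PR, and a "stream" is reduced to a named handle so the example runs without any Kafka dependency. The point it shows is that when the second argument is a `Function` rather than a `Consumer`, the brancher can remember what each branch returned and hand it back in the enclosing scope.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical stand-in for a stream handle; it is NOT Kafka's KStream. */
final class HandleSketch {
    final String name;

    HandleSketch(String name) {
        this.name = name;
    }

    /** Mimics a chained DSL step by deriving a new, renamed handle. */
    HandleSketch then(String step) {
        return new HandleSketch(name + "." + step);
    }
}

/** Hypothetical brancher that stores whatever each Function returns. */
final class IndexedBrancherSketch {
    private final HandleSketch source;
    private final List<Predicate<String>> predicates = new ArrayList<>();
    private final List<HandleSketch> results = new ArrayList<>();

    IndexedBrancherSketch(HandleSketch source) {
        this.source = source;
    }

    IndexedBrancherSketch branch(Predicate<String> predicate,
                                 Function<HandleSketch, HandleSketch> chain) {
        // The predicate is only recorded; routing of records is not modeled here.
        predicates.add(predicate);
        HandleSketch child = new HandleSketch(source.name + "-" + results.size());
        // Remember the Function's return value so get(index) can expose it.
        results.add(chain.apply(child));
        return this;
    }

    /** Returns the result of the index-th branch, in the order branches were added. */
    HandleSketch get(int index) {
        return results.get(index);
    }
}
```

With this shape, a caller could write `brancher.branch(p1, h -> h.then("filter")).branch(p2, h -> h)` and later fetch `get(0)` and `get(1)` side by side, for example to merge them. Index-based access has the same fragility as the array returned by the existing API if branches are reordered.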
>>>>>> >>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
>>>>>> >>>>>>>>>>>> Ivan,
>>>>>> >>>>>>>>>>>>
>>>>>> >>>>>>>>>>>> I'm a bit of a novice here as well, but I think it
>>>>>> makes sense
>>>>>> >>>>>> for
>>>>>> >>>>>>>> you
>>>>>> >>>>>>>>> to
>>>>>> >>>>>>>>>>>> revise the KIP and continue the discussion. Obviously
>>>>>> we'll
>>>>>> >>>>> need
>>>>>> >>>>>>>> some
>>>>>> >>>>>>>>>>>> buy-in from committers that have actual binding votes on
>>>>>> >>>>> whether
>>>>>> >>>>>>> the
>>>>>> >>>>>>>>> KIP
>>>>>> >>>>>>>>>>>> could be adopted. It would be great to hear if they
>>>>>> think this
>>>>>> >>>>>> is
>>>>>> >>>>>>> a
>>>>>> >>>>>>>>> good
>>>>>> >>>>>>>>>>>> idea overall. I'm not sure if that happens just by
>>>>>> starting a
>>>>>> >>>>>>> vote,
>>>>>> >>>>>>>>> or if
>>>>>> >>>>>>>>>>>> there is generally some indication of interest
>>>>> beforehand.
>>>>>> >>>>>>>>>>>>
>>>>>> >>>>>>>>>>>> That being said, I'll continue the discussion a bit:
>>>>>> assuming
>>>>>> >>>>> we
>>>>>> >>>>>> do
>>>>>> >>>>>>>>> move
>>>>>> >>>>>>>>>>>> forward the solution of "stream.branch() returns
>>>>>> >>>>>> KBranchedStream",
>>>>>> >>>>>>> do
>>>>>> >>>>>>>>> we
>>>>>> >>>>>>>>>>>> deprecate "stream.branch(...) returns KStream[]"? I
>>>>> would
>>>>>> >>>>> favor
>>>>>> >>>>>>>>>>>> deprecating, since having two mutually exclusive APIs
>>>>> that
>>>>>> >>>>>>> accomplish
>>>>>> >>>>>>>>> the
>>>>>> >>>>>>>>>>>> same thing is confusing, especially when they're fairly
>>>>>> similar
>>>>>> >>>>>>>>> anyway. We
>>>>>> >>>>>>>>>>>> just need to be sure we're not making something
>>>>>> >>>>>>> impossible/difficult
>>>>>> >>>>>>>>> that
>>>>>> >>>>>>>>>>>> is currently possible/easy.
>>>>>> >>>>>>>>>>>>
>>>>>> >>>>>>>>>>>> Regarding my PR - I think the general structure would
>>>>> work,
>>>>>> >>>>> it's
>>>>>> >>>>>>>> just a
>>>>>> >>>>>>>>>>>> little sloppy overall in terms of naming and clarity. In
>>>>>> >>>>>>> particular,
>>>>>> >>>>>>>>>>>> passing in the "predicates" and "children" lists which
>>>>> get
>>>>>> >>>>>> modified
>>>>>> >>>>>>>> in
>>>>>> >>>>>>>>>>>> KBranchedStream but read from all the way in
>>>>>> KStreamLazyBranch is
>>>>>> >>>>> a
>>>>>> >>>>>>> bit
>>>>>> >>>>>>>>>>>> complicated to follow.
>>>>>> >>>>>>>>>>>>
>>>>>> >>>>>>>>>>>> Thanks,
>>>>>> >>>>>>>>>>>> Paul
>>>>>> >>>>>>>>>>>>
>>>>>> >>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <
>>>>>> >>>>>> [email protected] <mailto:[email protected]>
>>>>>> >>>>>>>>> wrote:
>>>>>> >>>>>>>>>>>>> Hi Paul!
>>>>>> >>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>> I read your code carefully and now I am fully
>>>>>> convinced: your
>>>>>> >>>>>>>> proposal
>>>>>> >>>>>>>>>>>>> looks better and should work. We just have to document
>>>>> the
>>>>>> >>>>>> crucial
>>>>>> >>>>>>>>> fact
>>>>>> >>>>>>>>>>>>> that KStream consumers are invoked as they're added.
>>>>>> And then
>>>>>> >>>>>> it's
>>>>>> >>>>>>>> all
>>>>>> >>>>>>>>>>>>> going to be very nice.
>>>>>> >>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>> What shall we do now? I should re-write the KIP and
>>>>>> resume the
>>>>>> >>>>>>>>>>>>> discussion here, right?
>>>>>> >>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>> Why are you saying that your PR 'should not be even a
>>>>>> >>>>> starting
>>>>>> >>>>>>>> point
>>>>>> >>>>>>>>> if
>>>>>> >>>>>>>>>>>>> we go in this direction'? To me it looks like a good
>>>>>> starting
>>>>>> >>>>>>> point.
>>>>>> >>>>>>>>> But
>>>>>> >>>>>>>>>>>>> as a novice in this project I might miss some important
>>>>>> >>>>> details.
>>>>>> >>>>>>>>>>>>> Regards,
>>>>>> >>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>> Ivan
>>>>>> >>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen wrote:
>>>>>> >>>>>>>>>>>>>> Ivan,
>>>>>> >>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>> Maybe I’m missing the point, but I believe the
>>>>>> >>>>> stream.branch()
>>>>>> >>>>>>>>> solution
>>>>>> >>>>>>>>>>>>> supports this. The couponIssuer::set* consumers will be
>>>>>> >>>>> invoked
>>>>>> >>>>>> as
>>>>>> >>>>>>>>> they’re
>>>>>> >>>>>>>>>>>>> added, not during streamsBuilder.build(). So the user
>>>>>> still
>>>>>> >>>>>> ought
>>>>>> >>>>>>> to
>>>>>> >>>>>>>>> be
>>>>>> >>>>>>>>>>>>> able to call couponIssuer.coupons() afterward and
>>>>>> depend on
>>>>>> >>>>> the
>>>>>> >>>>>>>>> branched
>>>>>> >>>>>>>>>>>>> streams having been set.
>>>>>> >>>>>>>>>>>>>> The issue I mean to point out is that it is hard to
>>>>>> access
>>>>>> >>>>> the
>>>>>> >>>>>>>>> branched
>>>>>> >>>>>>>>>>>>> streams in the same scope as the original stream (that
>>>>>> is, not
>>>>>> >>>>>>>> inside
>>>>>> >>>>>>>>> the
>>>>>> >>>>>>>>>>>>> couponIssuer), which is a problem with both proposed
>>>>>> >>>>> solutions.
>>>>>> >>>>>> It
>>>>>> >>>>>>>>> can be
>>>>>> >>>>>>>>>>>>> worked around though.
>>>>>> >>>>>>>>>>>>>> [Also, great to hear additional interest in 401, I’m
>>>>>> excited
>>>>>> >>>>> to
>>>>>> >>>>>>>> hear
>>>>>> >>>>>>>>>>>>> your thoughts!]
>>>>>> >>>>>>>>>>>>>> Paul
>>>>>> >>>>>>>>>>>>>>
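The claim that the consumers are invoked as they are added, rather than during `streamsBuilder.build()`, can be illustrated with a small sketch. Everything here is an assumption made for illustration: `TopologyNodeSketch`, `EagerBrancherSketch` and `CouponIssuerSketch` are hypothetical stand-ins, not the classes from the PR, and a stream is modeled as a named node rather than a real `KStream`. It shows only the ordering guarantee the discussion relies on: once the chain has been written, the holder object already has both branches set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical stand-in for a node in the topology; it is NOT Kafka's KStream. */
final class TopologyNodeSketch {
    final String name;

    TopologyNodeSketch(String name) {
        this.name = name;
    }
}

/** Hypothetical brancher that calls each consumer as soon as the branch is added. */
final class EagerBrancherSketch {
    private final TopologyNodeSketch parent;
    private final List<Predicate<String>> predicates = new ArrayList<>();

    EagerBrancherSketch(TopologyNodeSketch parent) {
        this.parent = parent;
    }

    EagerBrancherSketch branch(Predicate<String> predicate,
                               Consumer<TopologyNodeSketch> consumer) {
        // The predicate is recorded for later routing, which is not modeled here.
        predicates.add(predicate);
        TopologyNodeSketch child =
            new TopologyNodeSketch(parent.name + "-branch-" + predicates.size());
        // Invoked right now, so no terminal operation is needed to wire it up.
        consumer.accept(child);
        return this;
    }
}

/** Hypothetical holder in the spirit of the CouponIssuer example. */
final class CouponIssuerSketch {
    private TopologyNodeSketch coffeePurchases;
    private TopologyNodeSketch electronicsPurchases;

    void setCoffeePurchases(TopologyNodeSketch node) {
        this.coffeePurchases = node;
    }

    void setElectronicsPurchases(TopologyNodeSketch node) {
        this.electronicsPurchases = node;
    }

    /** Would throw NullPointerException if either branch had not been set yet. */
    String coupons() {
        return coffeePurchases.name + " JOIN " + electronicsPurchases.name;
    }
}
```

Calling `coupons()` directly after the two `branch(...)` calls works in this sketch because both setters have already run. If the wiring were deferred to a later build step, the same call would fail on the unset fields.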
>>>>>> >>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <
>>>>>> >>>>>> [email protected] <mailto:[email protected]>
>>>>>> >>>>>>>>> wrote:
>>>>>> >>>>>>>>>>>>>>> Hi Paul!
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> The idea to postpone the wiring of branches to the
>>>>>> >>>>>>>>>>>>> streamsBuilder.build() also looked great to me at
>>>>> first
>>>>>> >>>>> glance,
>>>>>> >>>>>>> but
>>>>>> >>>>>>>>> ---
>>>>>> >>>>>>>>>>>>>>>> the newly branched streams are not available in the
>>>>>> same
>>>>>> >>>>>> scope
>>>>>> >>>>>>> as
>>>>>> >>>>>>>>> each
>>>>>> >>>>>>>>>>>>> other. That is, if we wanted to merge them back
>>>>> together
>>>>>> >>>>> again
>>>>>> >>>>>> I
>>>>>> >>>>>>>>> don't see
>>>>>> >>>>>>>>>>>>> a way to do that.
>>>>>> >>>>>>>>>>>>>>> You just took the words right out of my mouth, I was
>>>>>> just
>>>>>> >>>>>> going
>>>>>> >>>>>>> to
>>>>>> >>>>>>>>>>>>> write in detail about this issue.
>>>>>> >>>>>>>>>>>>>>> Consider the example from Bill's book, p. 101: say
>>>>>> we need
>>>>>> >>>>> to
>>>>>> >>>>>>>>> identify
>>>>>> >>>>>>>>>>>>> customers who have bought coffee and made a purchase
>>>>>> in the
>>>>>> >>>>>>>>> electronics
>>>>>> >>>>>>>>>>>>> store to give them coupons.
>>>>>> >>>>>>>>>>>>>>> This is the code I usually write under these
>>>>>> circumstances
>>>>>> >>>>>> using
>>>>>> >>>>>>>> my
>>>>>> >>>>>>>>>>>>> 'brancher' class:
>>>>>> >>>>>>>>>>>>>>> @Setter
>>>>>> >>>>>>>>>>>>>>> class CouponIssuer{
>>>>>> >>>>>>>>>>>>>>> private KStream<....> coffePurchases;
>>>>>> >>>>>>>>>>>>>>> private KStream<....> electronicsPurchases;
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> KStream<...> coupons(){
>>>>>> >>>>>>>>>>>>>>> return
>>>>>> >>>>>>>>> coffePurchases.join(electronicsPurchases...)...whatever
>>>>>> >>>>>>>>>>>>>>> /*In the real world the code here can be
>>>>>> complex, so
>>>>>> >>>>>>>>> creation of
>>>>>> >>>>>>>>>>>>> a separate CouponIssuer class is fully justified, in
>>>>>> order to
>>>>>> >>>>>>>> separate
>>>>>> >>>>>>>>>>>>> classes' responsibilities.*/
>>>>>> >>>>>>>>>>>>>>> }
>>>>>> >>>>>>>>>>>>>>> }
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> CouponIssuer couponIssuer = new CouponIssuer();
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
>>>>>> >>>>>>>>>>>>>>> .branch(predicate1,
>>>>> couponIssuer::setCoffePurchases)
>>>>>> >>>>>>>>>>>>>>> .branch(predicate2,
>>>>>> >>>>>> couponIssuer::setElectronicsPurchases)
>>>>>> >>>>>>>>>>>>>>> .onTopOf(transactionStream);
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> /*Alas, this won't work if we're going to wire up
>>>>>> everything
>>>>>> >>>>>>>> later,
>>>>>> >>>>>>>>>>>>> without the terminal operation!!!*/
>>>>>> >>>>>>>>>>>>>>> couponIssuer.coupons()...
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> Does this make sense? In order to properly
>>>>>> initialize the
>>>>>> >>>>>>>>> CouponIssuer
>>>>>> >>>>>>>>>>>>> we need the terminal operation to be called before
>>>>>> >>>>>>>>> streamsBuilder.build()
>>>>>> >>>>>>>>>>>>> is called.
>>>>>> >>>>>>>>>>>>>>> [BTW Paul, I just found out that your KIP-401 is
>>>>>> essentially
>>>>>> >>>>>> the
>>>>>> >>>>>>>>> next
>>>>>> >>>>>>>>>>>>> KIP I was going to write here. I have some thoughts
>>>>>> based on
>>>>>> >>>>> my
>>>>>> >>>>>>>>> experience,
>>>>>> >>>>>>>>>>>>> so I will join the discussion on KIP-401 soon.]
>>>>>> >>>>>>>>>>>>>>> Regards,
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> Ivan
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen wrote:
>>>>>> >>>>>>>>>>>>>>>> Ivan,
>>>>>> >>>>>>>>>>>>>>>> I tried to make a very rough proof of concept of a
>>>>>> fluent
>>>>>> >>>>> API
>>>>>> >>>>>>>> based
>>>>>> >>>>>>>>>>>>> off of
>>>>>> >>>>>>>>>>>>>>>> KStream here
>>>>>> (https://github.com/apache/kafka/pull/6512),
>>>>>> >>>>>> and
>>>>>> >>>>>>> I
>>>>>> >>>>>>>>> think
>>>>>> >>>>>>>>>>>>> I
>>>>>> >>>>>>>>>>>>>>>> succeeded at removing both cons.
>>>>>> >>>>>>>>>>>>>>>> - Compatibility: I was incorrect earlier about
>>>>>> >>>>>>> compatibility
>>>>>> >>>>>>>>>>>>> issues,
>>>>>> >>>>>>>>>>>>>>>> there aren't any direct ones. I was unaware
>>>>>> that Java
>>>>>> >>>>> is
>>>>>> >>>>>>>> smart
>>>>>> >>>>>>>>>>>>> enough to
>>>>>> >>>>>>>>>>>>>>>> distinguish between a branch(varargs...)
>>>>>> returning one
>>>>>> >>>>>>> thing
>>>>>> >>>>>>>>> and
>>>>>> >>>>>>>>>>>>> branch()
>>>>>> >>>>>>>>>>>>>>>> with no arguments returning another thing.
>>>>>> >>>>>>>>>>>>>>>> - Requiring a terminal method: We don't actually
>>>>>> need
>>>>>> >>>>> it.
>>>>>> >>>>>>> We
>>>>>> >>>>>>>>> can
>>>>>> >>>>>>>>>>>>> just
>>>>>> >>>>>>>>>>>>>>>> build up the branches in the KBranchedStream which
>>>>>> shares
>>>>>> >>>>>> its
>>>>>> >>>>>>>>> state
>>>>>> >>>>>>>>>>>>> with the
>>>>>> >>>>>>>>>>>>>>>> ProcessorSupplier that will actually do the
>>>>>> branching.
>>>>>> >>>>>>> It's
>>>>>> >>>>>>>>> not
>>>>>> >>>>>>>>>>>>> terribly
>>>>>> >>>>>>>>>>>>>>>> pretty in its current form, but I think it
>>>>>> demonstrates
>>>>>> >>>>>> its
>>>>>> >>>>>>>>>>>>> feasibility.
>>>>>> >>>>>>>>>>>>>>>> To be clear, I don't think that pull request should
>>>>> be
>>>>>> >>>>> final
>>>>>> >>>>>> or
>>>>>> >>>>>>>>> even a
>>>>>> >>>>>>>>>>>>>>>> starting point if we go in this direction, I just
>>>>>> wanted to
>>>>>> >>>>>> see
>>>>>> >>>>>>>> how
>>>>>> >>>>>>>>>>>>>>>> challenging it would be to get the API working.
>>>>>> >>>>>>>>>>>>>>>> I will say though, that I'm not sure the existing
>>>>>> solution
>>>>>> >>>>>>> could
>>>>>> >>>>>>>> be
>>>>>> >>>>>>>>>>>>>>>> deprecated in favor of this, which I had originally
>>>>>> >>>>> suggested
>>>>>> >>>>>>>> was a
>>>>>> >>>>>>>>>>>>>>>> possibility. The reason is that the newly branched
>>>>>> streams
>>>>>> >>>>>> are
>>>>>> >>>>>>>> not
>>>>>> >>>>>>>>>>>>>>>> available in the same scope as each other. That
>>>>>> is, if we
>>>>>> >>>>>>> wanted
>>>>>> >>>>>>>>> to
>>>>>> >>>>>>>>>>>>> merge
>>>>>> >>>>>>>>>>>>>>>> them back together again I don't see a way to do
>>>>>> that. The
>>>>>> >>>>>> KIP
>>>>>> >>>>>>>>>>>>> proposal
>>>>>> >>>>>>>>>>>>>>>> has the same issue, though - all this means is that
>>>>> for
>>>>>> >>>>>> either
>>>>>> >>>>>>>>>>>>> solution,
>>>>>> >>>>>>>>>>>>>>>> deprecating the existing branch(...) is not on the
>>>>>> table.
>>>>>> >>>>>>>>>>>>>>>> Thanks,
>>>>>> >>>>>>>>>>>>>>>> Paul
>>>>>> >>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <
>>>>>> >>>>>>>>> [email protected] <mailto:[email protected]>>
>>>>>> >>>>>>>>>>>>> wrote:
>>>>>> >>>>>>>>>>>>>>>>> OK, let me summarize what we have discussed up to
>>>>> this
>>>>>> >>>>>> point.
>>>>>> >>>>>>>>>>>>>>>>> First, it seems that it's commonly agreed that
>>>>>> branch API
>>>>>> >>>>>>> needs
>>>>>> >>>>>>>>>>>>>>>>> improvement. Motivation is given in the KIP.
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> There are two potential ways to do it:
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> 1. (as originally proposed)
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
>>>>>> >>>>>>>>>>>>>>>>> .branch(predicate1, ks ->..)
>>>>>> >>>>>>>>>>>>>>>>> .branch(predicate2, ks->..)
>>>>>> >>>>>>>>>>>>>>>>> .defaultBranch(ks->..) //optional
>>>>>> >>>>>>>>>>>>>>>>> .onTopOf(stream).mapValues(...).... //onTopOf
>>>>>> returns
>>>>>> >>>>>> its
>>>>>> >>>>>>>>> argument
>>>>>> >>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2) The code
>>>>> won't
>>>>>> >>>>> make
>>>>>> >>>>>>>> sense
>>>>>> >>>>>>>>>>>>> until
>>>>>> >>>>>>>>>>>>>>>>> all the necessary ingredients are provided.
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> CONS: The need to create a KafkaStreamsBrancher
>>>>>> instance
>>>>>> >>>>>>>>> contrasts the
>>>>>> >>>>>>>>>>>>>>>>> fluency of other KStream methods.
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> 2. (as Paul proposes)
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> stream
>>>>>> >>>>>>>>>>>>>>>>> .branch(predicate1, ks ->...)
>>>>>> >>>>>>>>>>>>>>>>> .branch(predicate2, ks->...)
>>>>>> >>>>>>>>>>>>>>>>> .defaultBranch(ks->...) //or noDefault(). Both
>>>>>> >>>>>>>>> defaultBranch(..)
>>>>>> >>>>>>>>>>>>> and
>>>>>> >>>>>>>>>>>>>>>>> noDefault() return void
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> PROS: Generally follows the way KStreams interface
>>>>> is
>>>>>> >>>>>> defined.
>>>>>> >>>>>>>>>>>>>>>>> CONS: We need to define two terminal methods
>>>>>> >>>>>>>> (defaultBranch(ks->)
>>>>>> >>>>>>>>> and
>>>>>> >>>>>>>>>>>>>>>>> noDefault()). And for a user it is very easy to
>>>>>> miss the
>>>>>> >>>>>> fact
>>>>>> >>>>>>>>> that one
>>>>>> >>>>>>>>>>>>>>>>> of the terminal methods should be called. If these
>>>>>> methods
>>>>>> >>>>>> are
>>>>>> >>>>>>>> not
>>>>>> >>>>>>>>>>>>>>>>> called, we can throw an exception in runtime.
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can we do
>>>>> better?
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> Regards,
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> Ivan
>>>>>> >>>>>>>>>>>>>>>>>
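The drawback listed for option 2 can be made concrete with a small sketch. Assumptions are stated plainly: `TerminalBrancherSketch` is a hypothetical class written for this illustration, not part of Kafka Streams or the PR; a stream is modeled as an in-memory `List`; and each record goes to the first branch whose predicate matches, mirroring how the existing array-based `branch` routes records. In this deferred design nothing is wired until one of the two terminal methods is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical fluent brancher that only wires branches in a terminal method. */
final class TerminalBrancherSketch<T> {
    private final List<T> source;
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();

    TerminalBrancherSketch(List<T> source) {
        this.source = source;
    }

    /** Records a branch; no handler runs yet. */
    TerminalBrancherSketch<T> branch(Predicate<T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    /** Terminal method: unmatched records go to the given handler. */
    void defaultBranch(Consumer<List<T>> handler) {
        route(handler);
    }

    /** Terminal method: unmatched records are silently dropped. */
    void noDefault() {
        route(unmatched -> { });
    }

    private void route(Consumer<List<T>> defaultHandler) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<T> unmatched = new ArrayList<>();
        for (T record : source) {
            int hit = -1;
            // First matching predicate wins; later predicates are not consulted.
            for (int i = 0; i < predicates.size() && hit < 0; i++) {
                if (predicates.get(i).test(record)) {
                    hit = i;
                }
            }
            (hit >= 0 ? buckets.get(hit) : unmatched).add(record);
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        defaultHandler.accept(unmatched);
    }
}
```

If a caller writes only the `branch(...)` calls and forgets both `defaultBranch(...)` and `noDefault()`, this sketch compiles and runs but no handler is ever invoked. That is the easy-to-miss failure mode described above, and it can only be reported at runtime.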
>>>>>> >>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
>>>>>> >>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
>>>>>> >>>>>>>>>>>>>>>>>>> Paul,
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>> I see your point when you are talking about
>>>>>> >>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>> Still, I believe that this cannot be
>>>>>> implemented the
>>>>>> >>>>>>> easy
>>>>>> >>>>>>>>> way.
>>>>>> >>>>>>>>>>>>>>>>>>> Maybe we all should think further.
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>> user could specify a terminal method that
>>>>> assumes
>>>>>> >>>>> nothing
>>>>>> >>>>>>>> will
>>>>>> >>>>>>>>>>>>> reach
>>>>>> >>>>>>>>>>>>>>>>>>>> the default branch,
>>>>>> >>>>>>>>>>>>>>>>>>> throwing an exception if such a case occurs.
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be the only
>>>>> option
>>>>>> >>>>>> besides
>>>>>> >>>>>>>>>>>>>>>>>>> `default`, because there are scenarios when we
>>>>>> want to
>>>>>> >>>>>> just
>>>>>> >>>>>>>>> silently
>>>>>> >>>>>>>>>>>>>>>>>>> drop the messages that didn't match any
>>>>>> predicate. 2)
>>>>>> >>>>>>> Throwing
>>>>>> >>>>>>>>> an
>>>>>> >>>>>>>>>>>>>>>>>>> exception in the middle of data flow processing
>>>>>> looks
>>>>>> >>>>>> like a
>>>>>> >>>>>>>> bad
>>>>>> >>>>>>>>>>>>> idea.
>>>>>> >>>>>>>>>>>>>>>>>>> In stream processing paradigm, I would prefer to
>>>>>> emit a
>>>>>> >>>>>>>> special
>>>>>> >>>>>>>>>>>>>>>>>>> message to a dedicated stream. This is exactly
>>>>> where
>>>>>> >>>>>>> `default`
>>>>>> >>>>>>>>> can
>>>>>> >>>>>>>>>>>>> be
>>>>>> >>>>>>>>>>>>>>>>>>> used.
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>> it would be fairly easy for the
>>>>>> >>>>> InternalTopologyBuilder
>>>>>> >>>>>>> to
>>>>>> >>>>>>>>> track
>>>>>> >>>>>>>>>>>>>>>>>>>> dangling
>>>>>> >>>>>>>>>>>>>>>>>>> branches that haven't been terminated and raise
>>>>>> a clear
>>>>>> >>>>>>> error
>>>>>> >>>>>>>>>>>>> before it
>>>>>> >>>>>>>>>>>>>>>>>>> becomes an issue.
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>> You mean a runtime exception, when the program is
>>>>>> >>>>> compiled
>>>>>> >>>>>>> and
>>>>>> >>>>>>>>> run?
>>>>>> >>>>>>>>>>>>>>>>>>> Well, I'd prefer an API that simply won't
>>>>>> compile if
>>>>>> >>>>> used
>>>>>> >>>>>>>>>>>>>>>>>>> incorrectly. Can we build such an API as a
>>>>>> method chain
>>>>>> >>>>>>>> starting
>>>>>> >>>>>>>>>>>>> from
>>>>>> >>>>>>>>>>>>>>>>>>> KStream object? There is a huge cost difference
>>>>>> between
>>>>>> >>>>>>>> runtime
>>>>>> >>>>>>>>> and
>>>>>> >>>>>>>>>>>>>>>>>>> compile-time errors. Even if a failure is uncovered
>>>>>> >>>>> instantly
>>>>>> >>>>>> on
>>>>>> >>>>>>>>> unit
>>>>>> >>>>>>>>>>>>>>>>>>> tests, it costs more for the project than a
>>>>>> compilation
>>>>>> >>>>>>>> failure.
>>>>>> >>>>>>>>>>>>>>>>>>> Regards,
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>> Ivan
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen wrote:
>>>>>> >>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>> Good point about the terminal operation being
>>>>>> required.
>>>>>> >>>>>>> But
>>>>>> >>>>>>>> is
>>>>>> >>>>>>>>>>>>> that
>>>>>> >>>>>>>>>>>>>>>>>>>> really
>>>>>> >>>>>>>>>>>>>>>>>>>> such a bad thing? If the user doesn't want a
>>>>>> >>>>>> defaultBranch
>>>>>> >>>>>>>>> they
>>>>>> >>>>>>>>>>>>> can
>>>>>> >>>>>>>>>>>>>>>>>>>> call
>>>>>> >>>>>>>>>>>>>>>>>>>> some other terminal method (noDefaultBranch()?)
>>>>>> just as
>>>>>> >>>>>>>>> easily. In
>>>>>> >>>>>>>>>>>>>>>>>>>> fact I
>>>>>> >>>>>>>>>>>>>>>>>>>> think it creates an opportunity for a nicer API
>>>>> - a
>>>>>> >>>>> user
>>>>>> >>>>>>>> could
>>>>>> >>>>>>>>>>>>> specify
>>>>>> >>>>>>>>>>>>>>>>> a
>>>>>> >>>>>>>>>>>>>>>>>>>> terminal method that assumes nothing will reach
>>>>> the
>>>>>> >>>>>> default
>>>>>> >>>>>>>>> branch,
>>>>>> >>>>>>>>>>>>>>>>>>>> throwing an exception if such a case occurs.
>>>>> That
>>>>>> >>>>> seems
>>>>>> >>>>>>> like
>>>>>> >>>>>>>>> an
>>>>>> >>>>>>>>>>>>>>>>>>>> improvement over the current branch() API,
>>>>>> which allows
>>>>>> >>>>>> for
>>>>>> >>>>>>>> the
>>>>>> >>>>>>>>>>>>> more
>>>>>> >>>>>>>>>>>>>>>>>>>> subtle
>>>>>> >>>>>>>>>>>>>>>>>>>> behavior of records unexpectedly getting
>>>>> dropped.
>>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>> The need for a terminal operation certainly has
>>>>>> to be
>>>>>> >>>>>> well
>>>>>> >>>>>>>>>>>>>>>>>>>> documented, but
>>>>>> >>>>>>>>>>>>>>>>>>>> it would be fairly easy for the
>>>>>> >>>>> InternalTopologyBuilder
>>>>>> >>>>>>> to
>>>>>> >>>>>>>>> track
>>>>>> >>>>>>>>>>>>>>>>>>>> dangling
>>>>>> >>>>>>>>>>>>>>>>>>>> branches that haven't been terminated and raise
>>>>>> a clear
>>>>>> >>>>>>> error
>>>>>> >>>>>>>>>>>>> before it
>>>>>> >>>>>>>>>>>>>>>>>>>> becomes an issue. Especially now that there is
>>>>> a
>>>>>> >>>>> "build
>>>>>> >>>>>>>> step"
>>>>>> >>>>>>>>>>>>> where
>>>>>> >>>>>>>>>>>>>>>>> the
>>>>>> >>>>>>>>>>>>>>>>>>>> topology is actually wired up, when
>>>>>> >>>>>> StreamsBuilder.build()
>>>>>> >>>>>>> is
>>>>>> >>>>>>>>>>>>> called.
>>>>>> >>>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its argument, I
>>>>> agree
>>>>>> >>>>> that
>>>>>> >>>>>>> it's
>>>>>> >>>>>>>>>>>>>>>>>>>> critical to
>>>>>> >>>>>>>>>>>>>>>>>>>> allow users to do other operations on the input
>>>>>> stream.
>>>>>> >>>>>>> With
>>>>>> >>>>>>>>> the
>>>>>> >>>>>>>>>>>>>>>>> fluent
>>>>>> >>>>>>>>>>>>>>>>>>>> solution, it ought to work the same way all
>>>>> other
>>>>>> >>>>>>> operations
>>>>>> >>>>>>>>> do -
>>>>>> >>>>>>>>>>>>> if
>>>>>> >>>>>>>>>>>>>>>>> you
>>>>>> >>>>>>>>>>>>>>>>>>>> want to process off the original KStream
>>>>> multiple
>>>>>> >>>>> times,
>>>>>> >>>>>>> you
>>>>>> >>>>>>>>> just
>>>>>> >>>>>>>>>>>>>>>>>>>> need the
>>>>>> >>>>>>>>>>>>>>>>>>>> stream as a variable so you can call as many
>>>>>> operations
>>>>>> >>>>>> on
>>>>>> >>>>>>> it
>>>>>> >>>>>>>>> as
>>>>>> >>>>>>>>>>>>> you
>>>>>> >>>>>>>>>>>>>>>>>>>> desire.
>>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>> Thoughts?
>>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>> Best,
>>>>>> >>>>>>>>>>>>>>>>>>>> Paul
>>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <
>>>>>> >>>>>>>>> [email protected] <mailto:[email protected]>
>>>>>> >>>>>>>>>>>>>>>>>>>> wrote:
>>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>> Hello Paul,
>>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>> I'm afraid this won't work because we do not
>>>>>> always need
>>>>>> >>>>>> the
>>>>>> >>>>>>>>>>>>>>>>>>>>> defaultBranch. And without a terminal
>>>>> operation we
>>>>>> >>>>> don't
>>>>>> >>>>>>>> know
>>>>>> >>>>>>>>>>>>> when to
>>>>>> >>>>>>>>>>>>>>>>>>>>> finalize and build the 'branch switch'.
>>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its argument,
>>>>>> so we
>>>>>> >>>>> can
>>>>>> >>>>>> do
>>>>>> >>>>>>>>>>>>> something
>>>>>> >>>>>>>>>>>>>>>>>>>>> more with the original branch after branching.
>>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>> I understand your point that the need for
>>>>> special
>>>>>> >>>>> object
>>>>>> >>>>>>>>>>>>> construction
>>>>>> >>>>>>>>>>>>>>>>>>>>> contrasts the fluency of most KStream methods.
>>>>> But
>>>>>> >>>>> here
>>>>>> >>>>>> we
>>>>>> >>>>>>>>> have a
>>>>>> >>>>>>>>>>>>>>>>>>>>> special case: we build the switch to split the
>>>>>> flow,
>>>>>> >>>>> so
>>>>>> >>>>>> I
>>>>>> >>>>>>>>> think
>>>>>> >>>>>>>>>>>>> this
>>>>>> >>>>>>>>>>>>>>>>> is
>>>>>> >>>>>>>>>>>>>>>>>>>>> still idiomatic.
>>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>> Ivan
>>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
>>>>>> >>>>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve this
>>>>>> API, but I
>>>>>> >>>>>> find
>>>>>> >>>>>>>> the
>>>>>> >>>>>>>>>>>>>>>>>>>>>> onTopOf()
>>>>>> >>>>>>>>>>>>>>>>>>>>>> mechanism a little confusing since it
>>>>>> contrasts the
>>>>>> >>>>>>> fluency
>>>>>> >>>>>>>>> of
>>>>>> >>>>>>>>>>>>> other
>>>>>> >>>>>>>>>>>>>>>>>>>>>> KStream method calls. Ideally I'd like to
>>>>>> just call
>>>>>> >>>>> a
>>>>>> >>>>>>>>> method on
>>>>>> >>>>>>>>>>>>> the
>>>>>> >>>>>>>>>>>>>>>>>>>>> stream
>>>>>> >>>>>>>>>>>>>>>>>>>>>> so it still reads top to bottom if the branch
>>>>>> cases
>>>>>> >>>>> are
>>>>>> >>>>>>>>> defined
>>>>>> >>>>>>>>>>>>>>>>>>>>>> fluently.
>>>>>> >>>>>>>>>>>>>>>>>>>>>> I think the addBranch(predicate, handleCase)
>>>>>> is very
>>>>>> >>>>>> nice
>>>>>> >>>>>>>>> and the
>>>>>> >>>>>>>>>>>>>>>>>>>>>> right
>>>>>> >>>>>>>>>>>>>>>>>>>>> way
>>>>>> >>>>>>>>>>>>>>>>>>>>>> to do things, but what if we flipped around
>>>>>> how we
>>>>>> >>>>>>> specify
>>>>>> >>>>>>>>> the
>>>>>> >>>>>>>>>>>>> source
>>>>>> >>>>>>>>>>>>>>>>>>>>>> stream.
>>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>> Like:
>>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>> stream.branch()
>>>>>> >>>>>>>>>>>>>>>>>>>>>> .addBranch(predicate1,
>>>>> this::handle1)
>>>>>> >>>>>>>>>>>>>>>>>>>>>> .addBranch(predicate2,
>>>>> this::handle2)
>>>>>> >>>>>>>>>>>>>>>>>>>>>> .defaultBranch(this::handleDefault);
>>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>> Where branch() returns a KBranchedStreams or
>>>>>> >>>>>>>> KStreamBrancher
>>>>>> >>>>>>>>> or
>>>>>> >>>>>>>>>>>>>>>>>>>>> something,
>>>>>> >>>>>>>>>>>>>>>>>>>>>> which is added to by addBranch() and
>>>>>> terminated by
>>>>>> >>>>>>>>>>>>> defaultBranch()
>>>>>> >>>>>>>>>>>>>>>>>>>>>> (which
>>>>>> >>>>>>>>>>>>>>>>>>>>>> returns void). This is obviously
>>>>>> incompatible with
>>>>>> >>>>> the
>>>>>> >>>>>>>>> current
>>>>>> >>>>>>>>>>>>>>>>>>>>>> API, so
>>>>>> >>>>>>>>>>>>>>>>>>>>> the
>>>>>> >>>>>>>>>>>>>>>>>>>>>> new stream.branch() would have to have a
>>>>>> different
>>>>>> >>>>>> name,
>>>>>> >>>>>>>> but
>>>>>> >>>>>>>>> that
>>>>>> >>>>>>>>>>>>>>>>>>>>>> seems
>>>>>> >>>>>>>>>>>>>>>>>>>>>> like a fairly small problem - we could call it
>>>>>> >>>>>> something
>>>>>> >>>>>>>> like
>>>>>> >>>>>>>>>>>>>>>>>>>>>> branched()
>>>>>> >>>>>>>>>>>>>>>>>>>>> or
>>>>>> >>>>>>>>>>>>>>>>>>>>>> branchedStreams() and deprecate the old API.
>>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of your
>>>>>> KIP? It
>>>>>> >>>>>> seems
>>>>>> >>>>>>>>> like it
>>>>>> >>>>>>>>>>>>>>>>>>>>>> does to
>>>>>> >>>>>>>>>>>>>>>>>>>>>> me, allowing for clear in-line branching
>>>>>> while also
>>>>>> >>>>>>>> allowing
>>>>>> >>>>>>>>> you
>>>>>> >>>>>>>>>>>>> to
>>>>>> >>>>>>>>>>>>>>>>>>>>>> dynamically build branches off of
>>>>>> KBranchedStreams
>>>>>> >>>>>> if
>>>>>> >>>>>>>>> desired.
>>>>>> >>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>> >>>>>>>>>>>>>>>>>>>>>> Paul
>>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
>>>>>> >>>>>>>>>>>>>>>>>>>>> <[email protected]>
>>>>>> >>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> void handleFirstCase(KStream<String, String>
>>>>>> ks){
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> ks.filter(....).mapValues(...)
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> }
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> void handleSecondCase(KStream<String,
>>>>>> String> ks){
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> ks.selectKey(...).groupByKey()...
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> }
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> ......
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> .addBranch(predicate1,
>>>>>> this::handleFirstCase)
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> .addBranch(predicate2,
>>>>>> this::handleSecondCase)
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> .onTopOf(....)
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Ivan
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> I have one question, the
>>>>> KafkaStreamsBrancher
>>>>>> >>>>> takes a
>>>>>> >>>>>>>>> Consumer
>>>>>> >>>>>>>>>>>>> as a
>>>>>> >>>>>>>>>>>>>>>>>>>>>>> second
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> argument which returns nothing, and the
>>>>>> example in
>>>>>> >>>>>> the
>>>>>> >>>>>>>> KIP
>>>>>> >>>>>>>>>>>>> shows
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> each
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> stream from the branch using a terminal node
>>>>>> >>>>>>>>> (KStream#to()
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> in this
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> case).
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but how would
>>>>> we
>>>>>> >>>>> handle
>>>>>> >>>>>>> the
>>>>>> >>>>>>>>> case
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> where the
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> user has created a branch but wants to
>>>>> continue
>>>>>> >>>>>>>> processing
>>>>>> >>>>>>>>> and
>>>>>> >>>>>>>>>>>>> not
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> necessarily use a terminal node on the
>>>>> branched
>>>>>> >>>>>> stream
>>>>>> >>>>>>>>>>>>> immediately?
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic as is if
>>>>>> we had
>>>>>> >>>>>>>> something
>>>>>> >>>>>>>>> like
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> this:
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> KStream<String, String>[] branches =
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> originalStream.branch(predicate1,
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> predicate2);
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Thanks!
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Bill
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck
>>>>> <
>>>>>> >>>>>>>>> [email protected] <mailto:[email protected]>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> All,
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the discussion for
>>>>> KIP-
>>>>>> >>>>> 418.
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion about
>>>>> KIP-418.
>>>>>> >>>>> Please
>>>>>> >>>>>>>> take
>>>>>> >>>>>>>>> a
>>>>>> >>>>>>>>>>>>> look
>>>>>> >>>>>>>>>>>>>>>>> at
>>>>>> >>>>>>>>>>>>>>>>>>>>> the
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> KIP if you can, I would appreciate any
>>>>>> feedback :)
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> KIP-418:
>>>>>> >>>>>
>>>>>>
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488:
>>>>>> >>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5488
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> PR#6164:
>>>>>> >>>>> https://github.com/apache/kafka/pull/6164
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>
>>>>>> >
>>>>>>
>>>>>
>>
>
