Ivan, did you see my last reply? What do you think about my proposal to mix both approaches and try to get the best of both worlds?
-Matthias
On 6/11/19 3:56 PM, Matthias J. Sax wrote:
> Thanks for the input John!
>
>> under your suggestion, it seems that the name is required
>
> If you want to get the `KStream` as part of the `Map` back using a
> `Function`, yes. If you follow the "embedded chaining" pattern using a
> `Consumer`, no.
>
> Allowing for a default name via `split()` can of course be done.
> Similarly, using `Named` instead of `String` is possible.
>
> I only wanted to sketch out a high-level proposal to merge both
> patterns. Your suggestions to align the new API with the existing API
> make total sense.
>
>
>
> One follow up question: Would `Named` be optional or required in
> `split()` and `branch()`? It's unclear from your example.
>
> If both are mandatory, what do we gain by it? The returned `Map` only
> contains the corresponding branches, so why should we prefix all of
> them? If only `Named` is mandatory in `branch()`, but optional in
> `split()`, the same question arises.
>
> Requiring `Named` in `split()` seems to make sense only if `Named` is
> optional in `branch()` and we generate a `-X` suffix using a counter for
> the different branch names. However, this might lead to the problem of
> names changing if branches are added/removed. Also, how would the names
> be generated if `Consumer` is mixed in (ie, not all branches are
> returned in the `Map`)?
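To make the renaming concern concrete, here is a minimal plain-Java sketch (not Kafka Streams code) contrasting counter-suffixed names with explicit names. `BranchNaming.nodeNames` is a hypothetical helper that prefixes every branch with the split name, using a 1-based position counter when no name is given and the branch name otherwise (the `mysplit-abranch` style discussed in this thread). Inserting a new unnamed branch at the front shifts every counter-based name, while explicitly named branches keep theirs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical naming helper for illustration only; this is NOT how Kafka
// Streams actually generates processor names.
final class BranchNaming {

    // Prefixes every branch with the split name. A null entry means "no name
    // given", in which case a 1-based position counter is used as the suffix.
    static List<String> nodeNames(String splitName, List<String> branchNames) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < branchNames.size(); i++) {
            String name = branchNames.get(i);
            String suffix = (name != null) ? name : String.valueOf(i + 1);
            result.add(splitName + "-" + suffix);
        }
        return result;
    }

    public static void main(String[] args) {
        // Counter-based: branches A and B are unnamed.
        System.out.println(nodeNames("mysplit", Arrays.asList(null, null)));
        // prints [mysplit-1, mysplit-2]

        // A new unnamed branch is inserted before A: A becomes mysplit-2, B mysplit-3.
        System.out.println(nodeNames("mysplit", Arrays.asList(null, null, null)));
        // prints [mysplit-1, mysplit-2, mysplit-3]

        // Explicit names survive the same change.
        System.out.println(nodeNames("mysplit", Arrays.asList("abranch", "bbranch")));
        // prints [mysplit-abranch, mysplit-bbranch]
        System.out.println(nodeNames("mysplit", Arrays.asList("newbranch", "abranch", "bbranch")));
        // prints [mysplit-newbranch, mysplit-abranch, mysplit-bbranch]
    }
}
```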
>
> If `Named` is optional for both, a user could forget to specify a name
> for a branch, which would lead to runtime issues.
>
>
> Hence, I am actually in favor of not allowing a default name, but of
> keeping `split()` without a parameter and making `Named` in `branch()`
> required if a `Function` is used. This makes it explicit to the user
> that specifying a name is required if a `Function` is used.
>
>
>
> About
>
>> KBranchedStream#branch(BranchConfig)
>
> I don't think that the branching predicate is a configuration and hence
> would not include it in a configuration object.
>
>> withChain(...);
>
> Similarly, `withChain()` (that would only take a `Consumer`?) does not
> seem to be a configuration. We also cannot prevent a user from calling
> `withName()` in combination with `withChain()`, which does not make
> sense IMHO. We could of course throw an RTE, but not having a compile
> time check seems less appealing. Also, it could happen that neither
> `withChain()` nor `withName()` is called and the branch is missing from
> the returned `Map`, which leads to runtime issues, too.
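The missing compile-time check can be illustrated with a hypothetical `BranchConfig`-style builder. The class and method names below are modeled on John's sketch in this thread and do not exist in Kafka Streams; a "stream" is modeled as a `List<V>`. Because `withChain()` and `withName()` are independent setters, both the "both set" and the "neither set" mistakes compile, and can only be rejected at runtime, here in an assumed `validate()` step.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical config object modeled on the BranchConfig idea from this
// thread. It is NOT part of Kafka Streams.
final class BranchConfigSketch<V> {
    private final Predicate<V> predicate;
    private Consumer<List<V>> chain; // "embedded chaining": branch is not returned
    private String name;             // branch is returned in the Map under this name

    private BranchConfigSketch(Predicate<V> predicate) {
        this.predicate = predicate;
    }

    static <V> BranchConfigSketch<V> withPredicate(Predicate<V> predicate) {
        return new BranchConfigSketch<>(predicate);
    }

    BranchConfigSketch<V> withChain(Consumer<List<V>> chain) {
        this.chain = chain;
        return this;
    }

    BranchConfigSketch<V> withName(String name) {
        this.name = name;
        return this;
    }

    Predicate<V> predicate() {
        return predicate;
    }

    // Every combination of setters compiles; invalid ones are only caught here.
    void validate() {
        if (chain != null && name != null) {
            throw new IllegalStateException("withChain() and withName() are mutually exclusive");
        }
        if (chain == null && name == null) {
            throw new IllegalStateException("branch is neither chained nor returned in the Map");
        }
    }
}
```

Separate overloads (one taking a `Consumer`, one taking a `Function` plus a name) would turn both mistakes into compile errors instead.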
>
> Hence, I don't think that we should add `BranchConfig`. A config object
> is helpful if each configuration can be set independently of all others,
> but this seems not to be the case here. If we add new configurations
> later, we can also just move forward by deprecating the methods that
> accept `Named` and adding new methods that accept `BranchConfig` (that
> would of course implement `Named`).
>
>
> Thoughts?
>
>
> @Ivan, what do you think about the general idea to blend the two main
> approaches of returning a `Map` plus an "embedded chaining"?
>
>
>
> -Matthias
>
>
>
> On 6/4/19 10:33 AM, John Roesler wrote:
>> Thanks for the idea, Matthias, it does seem like this would satisfy
>> everyone. Returning the map from the terminal operations also solves
>> the problem of merging/joining the branched streams, if we want to add
>> support for the complement later on.
>>
>> Under your suggestion, it seems that the name is required. Otherwise,
>> we wouldn't have keys for the map to return. I think this is actually
>> not too bad, since experience has taught us that, although names for
>> operations are not required to define stream processing logic, it does
>> significantly improve the operational experience when you can map the
>> topology, logs, metrics, etc. back to the source code. Since you
>> wouldn't (have to) reference the name to chain extra processing onto
>> the branch (thanks to the second argument), you can avoid the
>> "unchecked name" problem that Ivan pointed out.
>>
>> In the current implementation of Branch, you can name the branch
>> operator itself, and then all the branches get index-suffixed names
>> built from the branch operator name. I guess under this proposal, we
>> could naturally append the branch name to the branching operator name,
>> like this:
>>
>> stream.split(Named.withName("mysplit")) //creates node "mysplit"
>> .branch(..., ..., "abranch") // creates node "mysplit-abranch"
>> .defaultBranch(...) // creates node "mysplit-default"
>>
>> It does make me wonder about the DSL syntax itself, though.
>>
>> We don't have a defined grammar, so there's plenty of room to debate
>> the "best" syntax in the context of each operation, but in general,
>> the KStream DSL operators follow this pattern:
>>
>> operator(function, config_object?) OR operator(config_object)
>>
>> where config_object is often just Named in the "function" variant.
>> Even when the config_object isn't a Named, but some other config
>> class, that config class _always_ implements NamedOperation.
>>
>> Here, we're introducing a totally different pattern:
>>
>> operator(function, function, string)
>>
>> where the string is the name.
>> My first question is whether the name should instead be specified with
>> the NamedOperation interface.
>>
>> My second question is whether we should just roll all these arguments
>> up into a config object like:
>>
>> KBranchedStream#branch(BranchConfig)
>>
>> interface BranchConfig extends NamedOperation {
>> withPredicate(...);
>> withChain(...);
>> withName(...);
>> }
>>
>> Although I guess we'd like to call BranchConfig something more like
>> "Branched", even if I don't particularly like that pattern.
>>
>> This makes the source code a little noisier, but it also makes us more
>> future-proof, as we can deal with a wide range of alternatives purely
>> in the config interface, and never have to deal with adding overloads
>> to the KBranchedStream if/when we decide we want the name to be
>> optional, or the KStream->KStream to be optional.
>>
>> WDYT?
>>
>> Thanks,
>> -John
>>
>> On Fri, May 24, 2019 at 5:25 PM Michael Drogalis
>> <[email protected]> wrote:
>>>
>>> Matthias: I think that's pretty reasonable from my point of view. Good
>>> suggestion.
>>>
>>> On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax <[email protected]>
>>> wrote:
>>>
>>>> Interesting discussion.
>>>>
>>>> I am wondering if we can unify the advantages of both approaches:
>>>>
>>>>
>>>>
>>>> KStream#split() -> KBranchedStream
>>>>
>>>> // branch is not easily accessible in current scope
>>>> KBranchedStream#branch(Predicate, Consumer<KStream>)
>>>> -> KBranchedStream
>>>>
>>>> // assign a name to the branch and
>>>> // return the sub-stream to the current scope later
>>>> //
>>>> // can be as simple as `#branch(p, s->s, "name")`
>>>> // or as complex as `#branch(p, s->s.filter(...), "name")`
>>>> KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
>>>> -> KBranchedStream
>>>>
>>>> // default branch is not easily accessible
>>>> // return map of all named sub-streams into current scope
>>>> KBranchedStream#defaultBranch(Consumer<KStream>)
>>>> -> Map<String,KStream>
>>>>
>>>> // assign custom name to default-branch
>>>> // return map of all named sub-streams into current scope
>>>> KBranchedStream#defaultBranch(Function<KStream,KStream>, String)
>>>> -> Map<String,KStream>
>>>>
>>>> // assign a default name to the default branch
>>>> // return map of all named sub-streams into current scope
>>>> KBranchedStream#defaultBranch(Function<KStream,KStream>)
>>>> -> Map<String,KStream>
>>>>
>>>> // return map of all named sub-streams into current scope
>>>> KBranchedStream#noDefaultBranch()
>>>> -> Map<String,KStream>
>>>>
>>>>
>>>>
>>>> Hence, for each sub-stream, the user can pick to add a name and return
>>>> the branch "result" to the calling scope or not. The implementation can
>>>> also check at runtime that all returned names are unique. The returned
>>>> Map can be empty and it's also optional to use the Map.
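The proposal can be modeled in plain Java to check that the pieces fit together. This is a hedged sketch, not Kafka Streams code: `SplitSketch` is a hypothetical class, a "stream" is just a `List<V>`, routing is eager, and each record goes to the first branch whose predicate matches. It shows the two `branch()` flavors side by side: the `Consumer` overload handles a branch in place and leaves it out of the result, while the `Function` overload requires a name and puts the branch result into the returned `Map`, with a runtime uniqueness check on names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// In-memory model of the proposed split()/branch()/defaultBranch() API.
// NOT Kafka Streams: a "stream" is a List<V>; first matching predicate wins.
final class SplitSketch<V> {
    private final List<V> input;
    private final List<Predicate<V>> predicates = new ArrayList<>();
    private final List<Function<List<V>, List<V>>> chains = new ArrayList<>();
    private final List<String> names = new ArrayList<>(); // null = not returned

    private SplitSketch(List<V> input) {
        this.input = input;
    }

    static <V> SplitSketch<V> split(List<V> input) {
        return new SplitSketch<>(input);
    }

    // "Embedded chaining": the sub-stream goes to the Consumer and is not returned.
    SplitSketch<V> branch(Predicate<V> predicate, Consumer<List<V>> chain) {
        return add(predicate, s -> { chain.accept(s); return s; }, null);
    }

    // Named branch: the Function result is returned in the Map under `name`.
    SplitSketch<V> branch(Predicate<V> predicate, Function<List<V>, List<V>> chain, String name) {
        return add(predicate, chain, name);
    }

    Map<String, List<V>> defaultBranch(Function<List<V>, List<V>> chain, String name) {
        if (names.contains(name)) {
            throw new IllegalArgumentException("duplicate branch name: " + name);
        }
        return route(chain, name);
    }

    Map<String, List<V>> noDefaultBranch() {
        return route(null, null);
    }

    private SplitSketch<V> add(Predicate<V> p, Function<List<V>, List<V>> chain, String name) {
        if (name != null && names.contains(name)) {
            throw new IllegalArgumentException("duplicate branch name: " + name);
        }
        predicates.add(p);
        chains.add(chain);
        names.add(name);
        return this;
    }

    private Map<String, List<V>> route(Function<List<V>, List<V>> defaultChain, String defaultName) {
        List<List<V>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<V> unmatched = new ArrayList<>();
        for (V record : input) {
            boolean matched = false;
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(record)) {
                    buckets.get(i).add(record);
                    matched = true;
                    break; // first matching predicate wins
                }
            }
            if (!matched) {
                unmatched.add(record);
            }
        }
        Map<String, List<V>> result = new LinkedHashMap<>();
        for (int i = 0; i < buckets.size(); i++) {
            List<V> out = chains.get(i).apply(buckets.get(i));
            if (names.get(i) != null) {
                result.put(names.get(i), out);
            }
        }
        if (defaultChain != null) {
            result.put(defaultName, defaultChain.apply(unmatched));
        }
        // With noDefaultBranch(), unmatched records are simply dropped.
        return result;
    }
}
```

For example, `split(List.of(1..6)).branch(even, consumer).branch(v -> v > 4, s -> s, "big").defaultBranch(s -> s, "rest")` hands `[2, 4, 6]` to the consumer and returns a map with only `big -> [5]` and `rest -> [1, 3]`. Using a `LinkedHashMap` for the result keeps branches in declaration order; whether a real API should guarantee that is a separate design choice.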
>>>>
>>>> To me, it seems like a good way to get best of both worlds.
>>>>
>>>> Thoughts?
>>>>
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>>
>>>>
>>>> On 5/6/19 5:15 PM, John Roesler wrote:
>>>>> Ivan,
>>>>>
>>>>> That's a very good point about the "start" operator in the dynamic case.
>>>>> I had no problem with "split()"; I was just questioning the necessity.
>>>>> Since you've provided a proof of necessity, I'm in favor of the
>>>>> "split()" start operator. Thanks!
>>>>>
>>>>> Separately, I'm interested to see where the present discussion leads.
>>>>> I've written enough Javascript code in my life to be suspicious of
>>>>> nested closures. You have a good point about using method references (or
>>>>> indeed function literals also work). It should be validating that this
>>>>> was also the JS community's first approach to flattening the logic when
>>>>> their nested closure situation got out of hand. Unfortunately, it's
>>>>> replacing nesting with redirection, both of which disrupt code
>>>>> readability (but in different ways for different reasons). In other
>>>>> words, I agree that function references are *the* first-order solution if
>>>>> the nested code does indeed become a problem.
>>>>>
>>>>> However, the history of JS also tells us that function references aren't
>>>>> the end of the story either, and you can see that by observing that
>>>>> there have been two follow-on eras, as they continue trying to cope with
>>>>> the consequences of living in such a callback-heavy language. First, you
>>>>> have Futures/Promises, which essentially let you convert nested code to
>>>>> method-chained code (Observables/FP is a popular variation on this).
>>>>> Most lately, you have async/await, which is an effort to apply language
>>>>> (not just API) syntax to the problem, and offer the "flattest" possible
>>>>> programming style to solve the problem (because you get back to just one
>>>>> code block per functional unit).
>>>>>
>>>>> Stream-processing is a different domain, and Java+KStreams is nowhere
>>>>> near as callback heavy as JS, so I don't think we have to take the JS
>>>>> story for granted, but then again, I think we can derive some valuable
>>>>> lessons by looking sideways to adjacent domains. I'm just bringing this
>>>>> up to inspire further/deeper discussion. At the same time, just like JS,
>>>>> we can afford to take an iterative approach to the problem.
>>>>>
>>>>> Separately again, I'm interested in the post-branch merge (and I'd also
>>>>> add join) problem that Paul brought up. We can clearly punt on it, by
>>>>> terminating the nested branches with sink operators. But is there a DSL
>>>>> way to do it?
>>>>>
>>>>> Thanks again for driving this,
>>>>> -John
>>>>>
>>>>> On Thu, May 2, 2019 at 7:39 PM Paul Whalen <[email protected]
>>>>> <mailto:[email protected]>> wrote:
>>>>>
>>>>> Ivan, I’ll definitely forfeit my point on the clumsiness of the
>>>>> branch(predicate, consumer) solution, I don’t see any real drawbacks
>>>>> for the dynamic case.
>>>>>
>>>>> IMO the one trade off to consider at this point is the scope
>>>>> question. I don’t know if I totally agree that “we rarely need them
>>>>> in the same scope” since merging the branches back together later
>>>>> seems like a perfectly plausible use case that can be a lot nicer
>>>>> when the branched streams are in the same scope. That being said,
>>>>> for the reasons Ivan listed, I think it is overall the better
>>>>> solution - working around the scope thing is easy enough if you need
>>>>> to.
>>>>>
>>>>> > On May 2, 2019, at 7:00 PM, Ivan Ponomarev
>>>>> <[email protected]> wrote:
>>>>> >
>>>>> > Hello everyone, thank you all for joining the discussion!
>>>>> >
>>>>> > Well, I don't think the idea of named branches, be it a
>>>>> > LinkedHashMap (no other Map will do, because order of definition
>>>>> > matters) or a `branch` method taking a name and a Consumer, has more
>>>>> > advantages than drawbacks.
>>>>> >
>>>>> > In my opinion, the only real positive outcome from Michael's
>>>>> > proposal is that all the returned branches are in the same scope.
>>>>> > But 1) we rarely need them in the same scope, and 2) there is a
>>>>> > workaround for the scope problem, described in the KIP.
>>>>> >
>>>>> > 'Inlining the complex logic' is not a problem, because we can use
>>>>> > method references instead of lambdas. In real-world scenarios you
>>>>> > tend to split the complex logic into methods anyway, so the code is
>>>>> > going to be clean.
>>>>> >
>>>>> > The drawbacks are strong. The cohesion between predicates and
>>>>> > handlers is lost. We have to define predicates in one place, and
>>>>> > handlers in another. This opens the door for bugs:
>>>>> >
>>>>> > - what if we forget to define a handler for a name? or a name for
>>>>> >   a handler?
>>>>> > - what if we misspell a name?
>>>>> > - what if we copy-paste and duplicate a name?
>>>>> >
>>>>> > What Michael proposes would have been totally OK if we had been
>>>>> > writing the API in Lua, Ruby or Python. In those languages the
>>>>> > "dynamic naming" approach would have looked most concise and
>>>>> > beautiful. But in Java we expect all the problems related to
>>>>> > identifiers to be eliminated at compile time.
>>>>> >
>>>>> > Do we have to invent duck-typing for the Java API?
>>>>> >
>>>>> > And if we do, what advantage are we supposed to get besides having
>>>>> > all the branches in the same scope? Michael, maybe I'm missing your
>>>>> > point?
>>>>> >
>>>>> > ---
>>>>> >
>>>>> > Earlier in this discussion John Roesler also proposed to do
>>>>> > without a "start branching" operator, and later Paul mentioned that
>>>>> > in the case when we have to add a dynamic number of branches, the
>>>>> > current KIP is 'clumsier' compared to Michael's 'Map' solution. Let
>>>>> > me address both comments here.
>>>>> >
>>>>> > 1) A "start branching" operator (I think that *split* is a good
>>>>> > name for it indeed) is critical when we need to do dynamic
>>>>> > branching; see the example below.
>>>>> >
>>>>> > 2) No, dynamic branching in the current KIP is not clumsy at all.
>>>>> > Imagine a real-world scenario where you need one branch per enum
>>>>> > value (say, RecordType). You can have something like this:
>>>>> >
>>>>> > /* John: if we had to start with stream.branch(...) here, it would
>>>>> >    have been much messier. */
>>>>> > KBranchedStream branched = stream.split();
>>>>> >
>>>>> > /* Not clumsy at all :-) */
>>>>> > for (RecordType recordType : RecordType.values())
>>>>> >     branched = branched.branch((k, v) -> v.getRecType() == recordType,
>>>>> >             recordType::processRecords);
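Below is a minimal, self-contained sketch of the same per-enum-value loop. It uses a hypothetical, `List`-backed `BranchedSketch` with the `branch(predicate, consumer)` shape instead of the real Kafka Streams types, and simplifies the `(k, v)` predicate to a single-argument predicate over a made-up `Rec` value. `Rec`, `RecordType`, `Sink` and `processRecords` are illustrative names only. The point is that reassigning the returned object inside a `for` loop adds one branch per enum constant, with no need to know the number of branches up front.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical record with a type tag (stands in for the KStream value).
final class Rec {
    final RecordType type;
    final String payload;

    Rec(RecordType type, String payload) {
        this.type = type;
        this.payload = payload;
    }

    RecordType getRecType() {
        return type;
    }
}

// Collects what each handler received, so the result can be inspected.
final class Sink {
    static final Map<RecordType, List<String>> RECEIVED = new EnumMap<>(RecordType.class);
}

enum RecordType {
    ORDER, PAYMENT, REFUND;

    // Hypothetical per-type handler, used via a method reference as in the email.
    void processRecords(List<Rec> records) {
        List<String> payloads = new ArrayList<>();
        for (Rec r : records) {
            payloads.add(r.payload);
        }
        Sink.RECEIVED.put(this, payloads);
    }
}

// List-backed stand-in for KBranchedStream; NOT the Kafka Streams API.
// Each branch() eagerly takes the records its predicate matches and passes
// the rest on, so the first matching branch wins.
final class BranchedSketch {
    private final List<Rec> remaining;

    private BranchedSketch(List<Rec> remaining) {
        this.remaining = remaining;
    }

    static BranchedSketch split(List<Rec> stream) {
        return new BranchedSketch(new ArrayList<>(stream));
    }

    BranchedSketch branch(Predicate<Rec> predicate, Consumer<List<Rec>> chain) {
        List<Rec> matched = new ArrayList<>();
        List<Rec> rest = new ArrayList<>();
        for (Rec r : remaining) {
            (predicate.test(r) ? matched : rest).add(r);
        }
        chain.accept(matched);
        return new BranchedSketch(rest);
    }
}

final class DynamicBranchingDemo {
    static Map<RecordType, List<String>> run(List<Rec> stream) {
        Sink.RECEIVED.clear();
        BranchedSketch branched = BranchedSketch.split(stream);
        // One branch per enum constant, added in a plain loop.
        for (RecordType recordType : RecordType.values()) {
            branched = branched.branch(r -> r.getRecType() == recordType,
                    recordType::processRecords);
        }
        return Sink.RECEIVED;
    }
}
```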
>>>>> >
>>>>> > Regards,
>>>>> >
>>>>> > Ivan
>>>>> >
>>>>> >
>>>>> > 02.05.2019 14:40, Matthias J. Sax wrote:
>>>>> >> I also agree with Michael's observation about the core problem of
>>>>> >> current `branch()` implementation.
>>>>> >>
>>>>> >> However, I also don't like passing in a clumsy Map object. My
>>>>> >> thinking was more aligned with Paul's proposal to just add a name
>>>>> >> to each `branch()` statement and return a `Map<String,KStream>`.
>>>>> >>
>>>>> >> It makes the code easier to read, and also makes the order of
>>>>> >> `Predicates` (which is essential) easier to grasp.
>>>>> >>
>>>>> >>>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>> >>>>>> .branch("branchOne", Predicate<K, V>)
>>>>> >>>>>> .branch( "branchTwo", Predicate<K, V>)
>>>>> >>>>>> .defaultBranch("defaultBranch");
>>>>> >> An open question is the case for which no defaultBranch() should
>>>>> >> be specified. Atm, `split()` and `branch()` would return
>>>>> >> `BranchedKStream`, and the call to `defaultBranch()` that returns
>>>>> >> the `Map` is mandatory (which is not the case atm). Or is this
>>>>> >> actually not a real problem, because users can just ignore the
>>>>> >> branch returned by `defaultBranch()` in the result `Map`?
>>>>> >>
>>>>> >>
>>>>> >> About "inlining": So far, it seems to be a matter of personal
>>>>> >> preference. I can see arguments for both, but no "killer argument"
>>>>> >> yet that clearly makes the case for one or the other.
>>>>> >>
>>>>> >>
>>>>> >> -Matthias
>>>>> >>
>>>>> >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
>>>>> >>> Perhaps inlining is the wrong terminology. It doesn’t require
>>>>> >>> that a lambda with the full downstream topology be defined inline -
>>>>> >>> it can be a method reference as with Ivan’s original suggestion.
>>>>> >>> The advantage of putting the predicate and its downstream logic
>>>>> >>> (Consumer) together in branch() is that they are required to be
>>>>> >>> near to each other.
>>>>> >>>
>>>>> >>> Ultimately the downstream code has to live somewhere, and deep
>>>>> >>> branch trees will be hard to read regardless.
>>>>> >>>
>>>>> >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis
>>>>> <[email protected]
>>>>> <mailto:[email protected]>> wrote:
>>>>> >>>>
>>>>> >>>> I'm less enthusiastic about inlining the branch logic with its
>>>>> >>>> downstream functionality. Programs that have deep branch trees
>>>>> >>>> will quickly become harder to read as a single unit.
>>>>> >>>>
>>>>> >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen
>>>>> <[email protected] <mailto:[email protected]>> wrote:
>>>>> >>>>>
>>>>> >>>>> Also +1 on the issues/goals as Michael outlined them; I think
>>>>> >>>>> that sets a great framework for the discussion.
>>>>> >>>>>
>>>>> >>>>> Regarding the SortedMap solution, my understanding is that the
>>>>> >>>>> current proposal in the KIP is what is in my PR which (pending
>>>>> >>>>> naming decisions) is roughly this:
>>>>> >>>>>
>>>>> >>>>> stream.split()
>>>>> >>>>> .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>>> >>>>> .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>>> >>>>> .defaultBranch(Consumer<KStream<K, V>>);
>>>>> >>>>>
>>>>> >>>>> Obviously some ordering is necessary, since branching as a
>>>>> >>>>> construct doesn't work without it, but this solution seems like
>>>>> >>>>> it provides as much associativity as the SortedMap solution,
>>>>> >>>>> because each branch() call directly associates the "conditional"
>>>>> >>>>> with the "code block." The value the SortedMap solution provides
>>>>> >>>>> over the KIP solution is access to the streams in the same scope.
>>>>> >>>>>
>>>>> >>>>> The KIP solution is less "dynamic" than the SortedMap solution
>>>>> >>>>> in the sense that it is slightly clumsier to add a dynamic
>>>>> >>>>> number of branches, but it is certainly possible. It seems to me
>>>>> >>>>> like the API should favor the "static" case anyway, and should
>>>>> >>>>> make it simple and readable to fluently declare and access your
>>>>> >>>>> branches in-line. It also makes it impossible to ignore a
>>>>> >>>>> branch, and it is possible to build an (almost) identical
>>>>> >>>>> SortedMap solution on top of it.
>>>>> >>>>>
>>>>> >>>>> I could also see a middle ground where instead of a raw
>>>>> >>>>> SortedMap being taken in, branch() takes a name and not a
>>>>> >>>>> Consumer. Something like this:
>>>>> >>>>>
>>>>> >>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>> >>>>> .branch("branchOne", Predicate<K, V>)
>>>>> >>>>> .branch( "branchTwo", Predicate<K, V>)
>>>>> >>>>> .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
>>>>> >>>>>
>>>>> >>>>> Pros for that solution:
>>>>> >>>>> - accessing branched KStreams in same scope
>>>>> >>>>> - no double brace initialization, hopefully slightly more
>>>>> >>>>>   readable than SortedMap
>>>>> >>>>>
>>>>> >>>>> Cons
>>>>> >>>>> - downstream branch logic cannot be specified inline, which
>>>>> >>>>>   makes it harder to read top to bottom (like existing API and
>>>>> >>>>>   SortedMap, but unlike the KIP)
>>>>> >>>>> - you can forget to "handle" one of the branched streams (like
>>>>> >>>>>   existing API and SortedMap, but unlike the KIP)
>>>>> >>>>>
>>>>> >>>>> (KBranchedStreams could even work *both* ways but perhaps
>>>>> >>>>> that's overdoing it).
>>>>> >>>>>
>>>>> >>>>> Overall I'm curious how important it is to be able to easily
>>>>> >>>>> access the branched KStream in the same scope as the original.
>>>>> >>>>> It's possible that it doesn't need to be handled directly by the
>>>>> >>>>> API, but instead left up to the user. I'm sort of in the middle
>>>>> >>>>> on it.
>>>>> >>>>>
>>>>> >>>>> Paul
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman
>>>>> <[email protected] <mailto:[email protected]>>
>>>>> >>>>> wrote:
>>>>> >>>>>
>>>>> >>>>>> I'd like to +1 what Michael said about the issues with the
>>>>> >>>>>> existing branch method; I agree with what he's outlined and I
>>>>> >>>>>> think we should proceed by trying to alleviate these problems.
>>>>> >>>>>> Specifically, it seems important to be able to cleanly access
>>>>> >>>>>> the individual branches (eg by mapping name->stream), which I
>>>>> >>>>>> thought was the original intention of this KIP.
>>>>> >>>>>>
>>>>> >>>>>> That said, I don't think we should so easily give in to the
>>>>> >>>>>> double brace anti-pattern or force our users into it if at all
>>>>> >>>>>> possible to avoid...just my two cents.
>>>>> >>>>>>
>>>>> >>>>>> Cheers,
>>>>> >>>>>> Sophie
>>>>> >>>>>>
>>>>> >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
>>>>> >>>>>> [email protected]
>>>>> <mailto:[email protected]>> wrote:
>>>>> >>>>>>
>>>>> >>>>>>> I’d like to propose a different way of thinking about this.
>>>>> >>>>>>> To me, there are three problems with the existing branch
>>>>> >>>>>>> signature:
>>>>> >>>>>>>
>>>>> >>>>>>> 1. If you use it the way most people do, Java raises unsafe
>>>>> >>>>>>>    type warnings.
>>>>> >>>>>>> 2. The way in which you use the stream branches is
>>>>> >>>>>>>    positionally coupled to the ordering of the conditionals.
>>>>> >>>>>>> 3. It is brittle to extend existing branch calls with
>>>>> >>>>>>>    additional code paths.
>>>>> >>>>>>>
>>>>> >>>>>>> Using associative constructs instead of relying on ordered
>>>>> >>>>>>> constructs would be a stronger approach. Consider a signature
>>>>> >>>>>>> that instead looks like this:
>>>>> >>>>>>> Map<String, KStream<K,V>> KStream#branch(SortedMap<String, Predicate<? super K,? super V>>);
>>>>> >>>>>>>
>>>>> >>>>>>> Branches are given names in a map, and as a result, the API
>>>>> >>>>>>> returns a mapping of names to streams. The ordering of the
>>>>> >>>>>>> conditionals is maintained because it’s a sorted map. Insert
>>>>> >>>>>>> order determines the order of evaluation.
>>>>> >>>>>>> This solves problem 1 because there are no more varargs. It
>>>>> >>>>>>> solves problem 2 because you no longer lean on ordering to
>>>>> >>>>>>> access the branch you’re interested in. It solves problem 3
>>>>> >>>>>>> because you can introduce another conditional by simply
>>>>> >>>>>>> attaching another name to the structure, rather than messing
>>>>> >>>>>>> with the existing indices.
>>>>> >>>>>>>
>>>>> >>>>>>> One of the drawbacks is that creating the map inline is
>>>>> >>>>>>> historically awkward in Java. I know it’s an anti-pattern to
>>>>> >>>>>>> use voluminously, but double brace initialization would clean
>>>>> >>>>>>> up the aesthetics.
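Two plain-JDK details matter for a map-based signature. First, a `SortedMap` such as `TreeMap` orders its entries by key (natural ordering or a `Comparator`), not by insertion, so with string names the predicates would be evaluated alphabetically; `LinkedHashMap` is the JDK map that preserves insertion order, as Ivan points out elsewhere in this thread. Second, double brace initialization creates an anonymous subclass per use site, which is one reason it is usually called an anti-pattern. The branch names and predicates below are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Plain-JDK illustration; branch names and predicates are hypothetical.
final class MapOrderingDemo {

    // Adds three named predicates in the intended evaluation order.
    static Map<String, Predicate<Integer>> fill(Map<String, Predicate<Integer>> m) {
        m.put("small", v -> v < 10);
        m.put("even", v -> v % 2 == 0);
        m.put("any", v -> true);
        return m;
    }

    // The order in which a map-based branch() would see the predicates.
    static List<String> evaluationOrder(Map<String, Predicate<Integer>> branches) {
        return new ArrayList<>(branches.keySet());
    }

    // Name of the first branch whose predicate matches (first match wins).
    static String firstMatch(Map<String, Predicate<Integer>> branches, int value) {
        for (Map.Entry<String, Predicate<Integer>> e : branches.entrySet()) {
            if (e.getValue().test(value)) {
                return e.getKey();
            }
        }
        return null;
    }

    // Double brace initialization: an anonymous LinkedHashMap subclass whose
    // instance initializer calls put(). Concise, but adds a class per use site.
    static Map<String, Predicate<Integer>> doubleBrace() {
        return new LinkedHashMap<String, Predicate<Integer>>() {{
            put("small", v -> v < 10);
            put("even", v -> v % 2 == 0);
        }};
    }

    public static void main(String[] args) {
        System.out.println(evaluationOrder(fill(new TreeMap<>())));       // [any, even, small]
        System.out.println(evaluationOrder(fill(new LinkedHashMap<>()))); // [small, even, any]
        System.out.println(firstMatch(fill(new TreeMap<>()), 4));        // any
        System.out.println(firstMatch(fill(new LinkedHashMap<>()), 4));  // small
        System.out.println(doubleBrace().getClass().isAnonymousClass()); // true
    }
}
```

With a `TreeMap`, the catch-all `any` branch sorts first and swallows every record, which is exactly the kind of silent reordering the ordered-evaluation semantics of branching cannot tolerate.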
>>>>> >>>>>>>
>>>>> >>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler
>>>>> <[email protected] <mailto:[email protected]>>
>>>>> >>>>> wrote:
>>>>> >>>>>>>> Hi Ivan,
>>>>> >>>>>>>>
>>>>> >>>>>>>> Thanks for the update.
>>>>> >>>>>>>>
>>>>> >>>>>>>> FWIW, I agree with Matthias that the current "start branching"
>>>>> >>>>>>>> operator is confusing when named the same way as the actual
>>>>> >>>>>>>> branches. "Split" seems like a good name. Alternatively, we can
>>>>> >>>>>>>> do without a "start branching" operator at all, and just do:
>>>>> >>>>>>>>
>>>>> >>>>>>>> stream
>>>>> >>>>>>>> .branch(Predicate)
>>>>> >>>>>>>> .branch(Predicate)
>>>>> >>>>>>>> .defaultBranch();
>>>>> >>>>>>>>
>>>>> >>>>>>>> Tentatively, I think that this branching operation should be
>>>>> >>>>>>>> terminal. That way, we don't create ambiguity about how to use
>>>>> >>>>>>>> it. That is, `branch` should return `KBranchedStream`, while
>>>>> >>>>>>>> `defaultBranch` is `void`, to enforce that it comes last, and
>>>>> >>>>>>>> that there is only one definition of the default branch.
>>>>> >>>>>>>> Potentially, we should log a warning if there's no default,
>>>>> >>>>>>>> and additionally log a warning (or throw an exception) if a
>>>>> >>>>>>>> record falls through with no default.
>>>>> >>>>>>>>
>>>>> >>>>>>>> Thoughts?
>>>>> >>>>>>>>
>>>>> >>>>>>>> Thanks,
>>>>> >>>>>>>> -John
>>>>> >>>>>>>>
>>>>> >>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <
>>>>> >>>>> [email protected] <mailto:[email protected]>
>>>>> >>>>>>>> wrote:
>>>>> >>>>>>>>
>>>>> >>>>>>>>> Thanks for updating the KIP and your answers.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>> this is to make the name similar to String#split
>>>>> >>>>>>>>>> that also returns an array, right?
>>>>> >>>>>>>>> The intent was to avoid name duplication. The return type
>>>>> >>>>>>>>> should _not_ be an array.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> The current proposal is
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> stream.branch()
>>>>> >>>>>>>>> .branch(Predicate)
>>>>> >>>>>>>>> .branch(Predicate)
>>>>> >>>>>>>>> .defaultBranch();
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> IMHO, this reads a little odd, because the first `branch()`
>>>>> >>>>>>>>> does not take any parameters and has different semantics than
>>>>> >>>>>>>>> the later `branch()` calls. Note that from the code snippet
>>>>> >>>>>>>>> above, it's hidden that the first call is `KStream#branch()`
>>>>> >>>>>>>>> while the others are `KBranchedStream#branch()`, which makes
>>>>> >>>>>>>>> reading the code harder.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> Because I suggested renaming `addBranch()` -> `branch()`, I
>>>>> >>>>>>>>> thought it might be better to also rename `KStream#branch()`
>>>>> >>>>>>>>> to avoid the naming overlap that seems to be confusing. The
>>>>> >>>>>>>>> following reads much cleaner to me:
>>>>> >>>>>>>>> stream.split()
>>>>> >>>>>>>>> .branch(Predicate)
>>>>> >>>>>>>>> .branch(Predicate)
>>>>> >>>>>>>>> .defaultBranch();
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> Maybe there is a better alternative to `split()` though, to
>>>>> >>>>>>>>> avoid the naming overlap.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>> 'default' is, however, a reserved word, so unfortunately we
>>>>> >>>>>>>>>> cannot have a method with such a name :-)
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> Bummer. Didn't consider this. Maybe we can still come up with
>>>>> >>>>>>>>> a short name?
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> Can you add the interface `KBranchedStream` to the KIP with
>>>>> >>>>>>>>> all its methods? It will be part of the public API and should
>>>>> >>>>>>>>> be contained in the KIP. For example, it's unclear atm what
>>>>> >>>>>>>>> the return type of `defaultBranch()` is.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> You did not comment on the idea to add a
>>>>> >>>>>>>>> `KBranchedStream#get(int index) -> KStream` method to get the
>>>>> >>>>>>>>> individually branched KStreams. It would be nice to get your
>>>>> >>>>>>>>> feedback about it. It seems you suggest that users would need
>>>>> >>>>>>>>> to write custom utility code otherwise, to access them. We
>>>>> >>>>>>>>> should discuss the pros and cons of both approaches. It feels
>>>>> >>>>>>>>> "incomplete" to me atm if the API has no built-in support to
>>>>> >>>>>>>>> get the branched KStreams directly.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> -Matthias
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
>>>>> >>>>>>>>>> Hi all!
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> I have updated the KIP-418 according to the new vision.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Matthias, thanks for your comment!
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>> Renaming KStream#branch() -> #split()
>>>>> >>>>>>>>>> I can see your point: this is to make the name similar to
>>>>> >>>>>>>>>> String#split that also returns an array, right? But is it
>>>>> >>>>>>>>>> worth the loss of backwards compatibility? We can have an
>>>>> >>>>>>>>>> overloaded branch() as well without affecting the existing
>>>>> >>>>>>>>>> code. Maybe the old array-based `branch` method should be
>>>>> >>>>>>>>>> deprecated, but this is a subject for discussion.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
>>>>> >>>>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Totally agree with 'addBranch->branch' rename. 'default' is,
>>>>> >>>>>>>>>> however, a reserved word, so unfortunately we cannot have a
>>>>> >>>>>>>>>> method with such a name :-)
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>> defaultBranch() does take a `Predicate` as argument, but I
>>>>> >>>>>>>>>>> think that is not required?
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Absolutely! I think that was just a copy-paste error or
>>>>> >>>>>>>>>> something.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Dear colleagues,
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> please revise the new version of the KIP and Paul's PR
>>>>> >>>>>>>>>> (https://github.com/apache/kafka/pull/6512)
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Any new suggestions/objections?
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Regards,
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Ivan
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> 11.04.2019 11:47, Matthias J. Sax wrote:
>>>>> >>>>>>>>>>> Thanks for driving the discussion of this KIP. It seems that
>>>>> >>>>>>>>>>> everybody agrees that the current branch() method using
>>>>> >>>>>>>>>>> arrays is not optimal.
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> I had a quick look into the PR and I like the overall
>>>>> >>>>>>>>>>> proposal. There are some minor things we need to consider. I
>>>>> >>>>>>>>>>> would recommend the following renaming:
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> KStream#branch() -> #split()
>>>>> >>>>>>>>>>> KBranchedStream#addBranch() -> BranchingKStream#branch()
>>>>> >>>>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> It's just a suggestion to get slightly shorter method names.
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> In the current PR, defaultBranch() does take a `Predicate` as
>>>>> >>>>>>>>>>> argument, but I think that is not required?
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> Also, we should consider KIP-307, which was recently accepted
>>>>> >>>>>>>>>>> and is currently being implemented:
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>> >>>>>>>>>>> Ie, we should add overloads that accept a `Named` parameter.
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> For the issue that the created `KStream` objects are in
>>>>> >>>>>>>>>>> different scopes: could we extend `KBranchedStream` with a
>>>>> >>>>>>>>>>> `get(int index)` method that returns the corresponding
>>>>> >>>>>>>>>>> "branched" result `KStream` object? Maybe the second argument
>>>>> >>>>>>>>>>> of `addBranch()` should not be a `Consumer<KStream>` but a
>>>>> >>>>>>>>>>> `Function<KStream,KStream>`, and `get()` could return
>>>>> >>>>>>>>>>> whatever the `Function` returns?
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> Finally, I would also suggest to update the KIP with the
>>>>> current
>>>>> >>>>>>>>>>> proposal. That makes it easier to review.
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> -Matthias
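Matthias's `get(int index)` idea (let the second argument be a `Function`, and have `get()` return whatever that `Function` returned) can be illustrated with a small plain-Java sketch. This is not Kafka Streams code: `ToyStream` and `ToyBranchingStream` are hypothetical in-memory stand-ins, and branching is evaluated eagerly with first-match semantics. It only shows how keeping each `Function` result makes every branch reachable from the caller's scope.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for a KStream: an immutable in-memory list of records.
final class ToyStream<T> {
    final List<T> records;

    ToyStream(List<T> records) {
        this.records = List.copyOf(records);
    }

    ToyStream<T> filter(Predicate<T> predicate) {
        List<T> out = new ArrayList<>();
        for (T r : records) {
            if (predicate.test(r)) {
                out.add(r);
            }
        }
        return new ToyStream<>(out);
    }
}

// Hypothetical brancher: branch() takes a Function, get(i) returns what that Function returned.
final class ToyBranchingStream<T> {
    private List<T> unclaimed;                         // records no earlier predicate matched
    private final List<ToyStream<T>> results = new ArrayList<>();

    ToyBranchingStream(ToyStream<T> source) {
        this.unclaimed = source.records;
    }

    ToyBranchingStream<T> branch(Predicate<T> predicate,
                                 Function<ToyStream<T>, ToyStream<T>> chain) {
        List<T> matched = new ArrayList<>();
        List<T> rest = new ArrayList<>();
        for (T r : unclaimed) {
            if (predicate.test(r)) {
                matched.add(r);                        // first matching predicate wins
            } else {
                rest.add(r);
            }
        }
        unclaimed = rest;
        results.add(chain.apply(new ToyStream<>(matched)));
        return this;
    }

    ToyStream<T> get(int index) {
        return results.get(index);
    }
}
```

For example, with source records `1..6`, `branch(x -> x % 2 == 0, s -> s)` followed by `branch(x -> x % 3 == 0, s -> s.filter(x -> x > 1))` leaves `[2, 4, 6]` in `get(0)` and `[3]` in `get(1)`, both available side by side (e.g. for a later merge).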
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
>>>>> >>>>>>>>>>>> Ivan,
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> I'm a bit of a novice here as well, but I think it makes sense
>>>>> >>>>>>>>>>>> for you to revise the KIP and continue the discussion.
>>>>> >>>>>>>>>>>> Obviously we'll need some buy-in from committers that have
>>>>> >>>>>>>>>>>> actual binding votes on whether the KIP could be adopted. It
>>>>> >>>>>>>>>>>> would be great to hear if they think this is a good idea
>>>>> >>>>>>>>>>>> overall. I'm not sure if that happens just by starting a vote,
>>>>> >>>>>>>>>>>> or if there is generally some indication of interest
>>>>> >>>>>>>>>>>> beforehand.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> That being said, I'll continue the discussion a bit: assuming
>>>>> >>>>>>>>>>>> we do move forward with the solution of "stream.branch()
>>>>> >>>>>>>>>>>> returns KBranchedStream", do we deprecate "stream.branch(...)
>>>>> >>>>>>>>>>>> returns KStream[]"? I would favor deprecating, since having two
>>>>> >>>>>>>>>>>> mutually exclusive APIs that accomplish the same thing is
>>>>> >>>>>>>>>>>> confusing, especially when they're fairly similar anyway. We
>>>>> >>>>>>>>>>>> just need to be sure we're not making something
>>>>> >>>>>>>>>>>> impossible/difficult that is currently possible/easy.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> Regarding my PR - I think the general structure would work,
>>>>> >>>>>>>>>>>> it's just a little sloppy overall in terms of naming and
>>>>> >>>>>>>>>>>> clarity. In particular, passing in the "predicates" and
>>>>> >>>>>>>>>>>> "children" lists, which get modified in KBranchedStream but
>>>>> >>>>>>>>>>>> are read all the way down in KStreamLazyBranch, is a bit
>>>>> >>>>>>>>>>>> complicated to follow.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> Thanks,
>>>>> >>>>>>>>>>>> Paul
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <[email protected] <mailto:[email protected]>> wrote:
>>>>> >>>>>>>>>>>>> Hi Paul!
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>> I read your code carefully and now I am fully convinced: your
>>>>> >>>>>>>>>>>>> proposal looks better and should work. We just have to
>>>>> >>>>>>>>>>>>> document the crucial fact that KStream consumers are invoked
>>>>> >>>>>>>>>>>>> as they're added. And then it's all going to be very nice.
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>> What shall we do now? I should rewrite the KIP and resume the
>>>>> >>>>>>>>>>>>> discussion here, right?
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>> Why do you say that your PR 'should not be even a starting
>>>>> >>>>>>>>>>>>> point if we go in this direction'? To me it looks like a good
>>>>> >>>>>>>>>>>>> starting point. But as a novice in this project I might miss
>>>>> >>>>>>>>>>>>> some important details.
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>> Regards,
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>> Ivan
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen wrote:
>>>>> >>>>>>>>>>>>>> Ivan,
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> Maybe I’m missing the point, but I believe the
>>>>> >>>>>>>>>>>>>> stream.branch() solution supports this. The couponIssuer::set*
>>>>> >>>>>>>>>>>>>> consumers will be invoked as they’re added, not during
>>>>> >>>>>>>>>>>>>> streamsBuilder.build(). So the user still ought to be able to
>>>>> >>>>>>>>>>>>>> call couponIssuer.coupons() afterward and depend on the
>>>>> >>>>>>>>>>>>>> branched streams having been set.
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> The issue I mean to point out is that it is hard to access
>>>>> >>>>>>>>>>>>>> the branched streams in the same scope as the original stream
>>>>> >>>>>>>>>>>>>> (that is, not inside the couponIssuer), which is a problem
>>>>> >>>>>>>>>>>>>> with both proposed solutions. It can be worked around, though.
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> [Also, great to hear additional interest in KIP-401, I’m
>>>>> >>>>>>>>>>>>>> excited to hear your thoughts!]
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> Paul
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <[email protected] <mailto:[email protected]>> wrote:
>>>>> >>>>>>>>>>>>>>>> Hi Paul!
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> The idea to postpone the wiring of branches to
>>>>> >>>>>>>>>>>>>>>> streamsBuilder.build() also looked great to me at first
>>>>> >>>>>>>>>>>>>>>> glance, but ---
>>>>> >>>>>>>>>>>>>>>>> the newly branched streams are not available in the same
>>>>> >>>>>>>>>>>>>>>>> scope as each other. That is, if we wanted to merge them
>>>>> >>>>>>>>>>>>>>>>> back together again I don't see a way to do that.
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> You just took the words right out of my mouth; I was just
>>>>> >>>>>>>>>>>>>>>> going to write in detail about this issue.
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> Consider the example from Bill's book, p. 101: say we need
>>>>> >>>>>>>>>>>>>>>> to identify customers who have bought coffee and made a
>>>>> >>>>>>>>>>>>>>>> purchase in the electronics store to give them coupons.
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> This is the code I usually write under these circumstances
>>>>> >>>>>>>>>>>>>>>> using my 'brancher' class:
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> @Setter
>>>>> >>>>>>>>>>>>>>>> class CouponIssuer {
>>>>> >>>>>>>>>>>>>>>>     private KStream<....> coffeePurchases;
>>>>> >>>>>>>>>>>>>>>>     private KStream<....> electronicsPurchases;
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>     KStream<...> coupons() {
>>>>> >>>>>>>>>>>>>>>>         return coffeePurchases.join(electronicsPurchases...)...whatever
>>>>> >>>>>>>>>>>>>>>>         /* In the real world the code here can be complex, so
>>>>> >>>>>>>>>>>>>>>>            the creation of a separate CouponIssuer class is
>>>>> >>>>>>>>>>>>>>>>            fully justified, in order to separate classes'
>>>>> >>>>>>>>>>>>>>>>            responsibilities. */
>>>>> >>>>>>>>>>>>>>>>     }
>>>>> >>>>>>>>>>>>>>>> }
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> CouponIssuer couponIssuer = new CouponIssuer();
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
>>>>> >>>>>>>>>>>>>>>>     .branch(predicate1, couponIssuer::setCoffeePurchases)
>>>>> >>>>>>>>>>>>>>>>     .branch(predicate2, couponIssuer::setElectronicsPurchases)
>>>>> >>>>>>>>>>>>>>>>     .onTopOf(transactionStream);
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> /* Alas, this won't work if we're going to wire up
>>>>> >>>>>>>>>>>>>>>>    everything later, without the terminal operation!!! */
>>>>> >>>>>>>>>>>>>>>> couponIssuer.coupons()...
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> Does this make sense? In order to properly initialize the
>>>>> >>>>>>>>>>>>>>>> CouponIssuer we need the terminal operation to be called
>>>>> >>>>>>>>>>>>>>>> before streamsBuilder.build() is called.
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> [BTW Paul, I just found out that your KIP-401 is
>>>>> >>>>>>>>>>>>>>>> essentially the next KIP I was going to write here. I have
>>>>> >>>>>>>>>>>>>>>> some thoughts based on my experience, so I will join the
>>>>> >>>>>>>>>>>>>>>> discussion on KIP-401 soon.]
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> Regards,
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> Ivan
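The timing question behind the CouponIssuer example (are the `couponIssuer::set*` consumers called as branches are added, or only at a later build step?) can be modeled in plain Java. In this hypothetical sketch, `ToyCouponIssuer` and `ToyBrancher` are not Kafka classes, and a "stream" is just a list of `customer:category` records, so it models only *when* each consumer runs, not how records flow at runtime. `eager = true` corresponds to the behavior Paul describes for his PR; `eager = false` corresponds to deferring all wiring until a build call, which is the case Ivan warns about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical analogue of CouponIssuer; records look like "customer:category".
final class ToyCouponIssuer {
    List<String> coffeePurchases;
    List<String> electronicsPurchases;

    void setCoffeePurchases(List<String> s) { coffeePurchases = s; }

    void setElectronicsPurchases(List<String> s) { electronicsPurchases = s; }

    // Stand-in for the join: customers who appear in both branches.
    List<String> coupons() {
        List<String> out = new ArrayList<>();
        for (String c : coffeePurchases) {
            String customer = c.substring(0, c.indexOf(':'));
            for (String e : electronicsPurchases) {
                if (e.startsWith(customer + ":") && !out.contains(customer)) {
                    out.add(customer);
                }
            }
        }
        return out;
    }
}

// Hypothetical brancher: eager=true hands each branch to its consumer immediately,
// eager=false postpones every consumer call until build().
final class ToyBrancher {
    private final List<String> source;
    private final boolean eager;
    private final List<Predicate<String>> predicates = new ArrayList<>();
    private final List<Consumer<List<String>>> consumers = new ArrayList<>();

    ToyBrancher(List<String> source, boolean eager) {
        this.source = source;
        this.eager = eager;
    }

    ToyBrancher branch(Predicate<String> predicate, Consumer<List<String>> consumer) {
        predicates.add(predicate);
        consumers.add(consumer);
        if (eager) {
            consumer.accept(select(predicates.size() - 1));
        }
        return this;
    }

    void build() {
        if (!eager) {
            for (int i = 0; i < consumers.size(); i++) {
                consumers.get(i).accept(select(i));
            }
        }
    }

    // Records whose first matching predicate is the one at index i.
    private List<String> select(int i) {
        List<String> out = new ArrayList<>();
        for (String r : source) {
            int first = -1;
            for (int j = 0; j < predicates.size() && first < 0; j++) {
                if (predicates.get(j).test(r)) {
                    first = j;
                }
            }
            if (first == i) {
                out.add(r);
            }
        }
        return out;
    }
}
```

With the eager variant, `coupons()` can be called right after the chain; with the deferred variant, the issuer's fields stay `null` until `build()` runs, which is the ordering problem raised in Ivan's message.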
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen wrote:
>>>>> >>>>>>>>>>>>>>>>> Ivan,
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>> I tried to make a very rough proof of concept of a fluent
>>>>> >>>>>>>>>>>>>>>>> API based off of KStream here
>>>>> >>>>>>>>>>>>>>>>> (https://github.com/apache/kafka/pull/6512), and I think I
>>>>> >>>>>>>>>>>>>>>>> succeeded at removing both cons.
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>> - Compatibility: I was incorrect earlier about
>>>>> >>>>>>>>>>>>>>>>>   compatibility issues; there aren't any direct ones. I was
>>>>> >>>>>>>>>>>>>>>>>   unaware that Java is smart enough to distinguish between
>>>>> >>>>>>>>>>>>>>>>>   a branch(varargs...) returning one thing and branch()
>>>>> >>>>>>>>>>>>>>>>>   with no arguments returning another thing.
>>>>> >>>>>>>>>>>>>>>>> - Requiring a terminal method: We don't actually need it.
>>>>> >>>>>>>>>>>>>>>>>   We can just build up the branches in the KBranchedStream,
>>>>> >>>>>>>>>>>>>>>>>   which shares its state with the ProcessorSupplier that
>>>>> >>>>>>>>>>>>>>>>>   will actually do the branching. It's not terribly pretty
>>>>> >>>>>>>>>>>>>>>>>   in its current form, but I think it demonstrates its
>>>>> >>>>>>>>>>>>>>>>>   feasibility.
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>> To be clear, I don't think that pull request should be
>>>>> >>>>>>>>>>>>>>>>> final or even a starting point if we go in this direction;
>>>>> >>>>>>>>>>>>>>>>> I just wanted to see how challenging it would be to get
>>>>> >>>>>>>>>>>>>>>>> the API working.
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>> I will say, though, that I'm not sure the existing
>>>>> >>>>>>>>>>>>>>>>> solution could be deprecated in favor of this, which I had
>>>>> >>>>>>>>>>>>>>>>> originally suggested was a possibility. The reason is that
>>>>> >>>>>>>>>>>>>>>>> the newly branched streams are not available in the same
>>>>> >>>>>>>>>>>>>>>>> scope as each other. That is, if we wanted to merge them
>>>>> >>>>>>>>>>>>>>>>> back together again I don't see a way to do that. The KIP
>>>>> >>>>>>>>>>>>>>>>> proposal has the same issue, though - all this means is
>>>>> >>>>>>>>>>>>>>>>> that for either solution, deprecating the existing
>>>>> >>>>>>>>>>>>>>>>> branch(...) is not on the table.
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>> Thanks,
>>>>> >>>>>>>>>>>>>>>> Paul
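Paul's compatibility point rests on ordinary Java overload resolution: a no-argument `branch()` and a varargs `branch(Predicate...)` may coexist with different return types because their parameter lists differ, and a call with no arguments selects the fixed-arity method before variable-arity methods are considered. A minimal sketch with hypothetical `ToyKStream`/`ToyBranched` types (not the real `KStream` interface):

```java
import java.util.function.Predicate;

// Hypothetical marker for the object a fluent branch() would return.
final class ToyBranched<T> {
}

// Hypothetical shape only: two overloads named branch() with different return types.
final class ToyKStream<T> {
    // Array-style entry point; here it just reports how many predicates it received.
    @SafeVarargs
    final String branch(Predicate<T>... predicates) {
        return "array API with " + predicates.length + " predicates";
    }

    // Fluent entry point: same name, no parameters, different return type.
    ToyBranched<T> branch() {
        return new ToyBranched<>();
    }
}
```

A call with one or more predicates, such as `s.branch(p1)`, still resolves to the varargs overload, so in this sketch existing-style call sites are unaffected by adding the no-argument method.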
>>>>> >>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <[email protected] <mailto:[email protected]>> wrote:
>>>>> >>>>>>>>>>>>>>>>>> OK, let me summarize what we have discussed up to this
>>>>> >>>>>>>>>>>>>>>>>> point.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> First, it seems to be commonly agreed that the branch API
>>>>> >>>>>>>>>>>>>>>>>> needs improvement. The motivation is given in the KIP.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> There are two potential ways to do it:
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> 1. (as originally proposed)
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
>>>>> >>>>>>>>>>>>>>>>>>     .branch(predicate1, ks -> ..)
>>>>> >>>>>>>>>>>>>>>>>>     .branch(predicate2, ks -> ..)
>>>>> >>>>>>>>>>>>>>>>>>     .defaultBranch(ks -> ..) // optional
>>>>> >>>>>>>>>>>>>>>>>>     .onTopOf(stream).mapValues(...).... // onTopOf returns its argument
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2) The code won't
>>>>> >>>>>>>>>>>>>>>>>> make sense until all the necessary ingredients are
>>>>> >>>>>>>>>>>>>>>>>> provided.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance
>>>>> >>>>>>>>>>>>>>>>>> contrasts with the fluency of other KStream methods.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> 2. (as Paul proposes)
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> stream
>>>>> >>>>>>>>>>>>>>>>>>     .branch(predicate1, ks -> ...)
>>>>> >>>>>>>>>>>>>>>>>>     .branch(predicate2, ks -> ...)
>>>>> >>>>>>>>>>>>>>>>>>     .defaultBranch(ks -> ...) // or noDefault(); both defaultBranch(..) and noDefault() return void
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> PROS: Generally follows the way the KStream interface is
>>>>> >>>>>>>>>>>>>>>>>> defined.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> CONS: We need to define two terminal methods
>>>>> >>>>>>>>>>>>>>>>>> (defaultBranch(ks->) and noDefault()). And it is very
>>>>> >>>>>>>>>>>>>>>>>> easy for a user to miss the fact that one of the terminal
>>>>> >>>>>>>>>>>>>>>>>> methods should be called. If these methods are not called,
>>>>> >>>>>>>>>>>>>>>>>> we can throw an exception at runtime.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can we do better?
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> Regards,
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>> Ivan
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
>>>>> >>>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
>>>>> >>>>>>>>>>>>>>>>>>>> Paul,
>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>> I see your point when you are talking about
>>>>> >>>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>> Still, I believe that this cannot be implemented the easy
>>>>> >>>>>>>>>>>>>>>>>>>> way. Maybe we all should think further.
>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>> user could specify a terminal method that assumes nothing
>>>>> >>>>>>>>>>>>>>>>>>>>> will reach the default branch, throwing an exception if
>>>>> >>>>>>>>>>>>>>>>>>>>> such a case occurs.
>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be the only option
>>>>> >>>>>>>>>>>>>>>>>>>> besides `default`, because there are scenarios when we
>>>>> >>>>>>>>>>>>>>>>>>>> want to just silently drop the messages that didn't match
>>>>> >>>>>>>>>>>>>>>>>>>> any predicate. 2) Throwing an exception in the middle of
>>>>> >>>>>>>>>>>>>>>>>>>> data flow processing looks like a bad idea. In the stream
>>>>> >>>>>>>>>>>>>>>>>>>> processing paradigm, I would prefer to emit a special
>>>>> >>>>>>>>>>>>>>>>>>>> message to a dedicated stream. This is exactly where
>>>>> >>>>>>>>>>>>>>>>>>>> `default` can be used.
>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>> it would be fairly easy for the InternalTopologyBuilder to
>>>>> >>>>>>>>>>>>>>>>>>>>> track dangling branches that haven't been terminated and
>>>>> >>>>>>>>>>>>>>>>>>>>> raise a clear error before it becomes an issue.
>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>> You mean a runtime exception, when the program is compiled
>>>>> >>>>>>>>>>>>>>>>>>>> and run? Well, I'd prefer an API that simply won't compile
>>>>> >>>>>>>>>>>>>>>>>>>> if used incorrectly. Can we build such an API as a method
>>>>> >>>>>>>>>>>>>>>>>>>> chain starting from a KStream object? There is a huge cost
>>>>> >>>>>>>>>>>>>>>>>>>> difference between runtime and compile-time errors. Even
>>>>> >>>>>>>>>>>>>>>>>>>> if a failure is uncovered instantly by unit tests, it
>>>>> >>>>>>>>>>>>>>>>>>>> costs the project more than a compilation failure.
>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>> Regards,
>>>>> >>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>> Ivan
>>>>> >>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen wrote:
>>>>> >>>>>>>>>>>>>>>>>>>>> Ivan,
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>> Good point about the terminal operation being required.
>>>>> >>>>>>>>>>>>>>>>>>>>> But is that really such a bad thing? If the user doesn't
>>>>> >>>>>>>>>>>>>>>>>>>>> want a defaultBranch they can call some other terminal
>>>>> >>>>>>>>>>>>>>>>>>>>> method (noDefaultBranch()?) just as easily. In fact I
>>>>> >>>>>>>>>>>>>>>>>>>>> think it creates an opportunity for a nicer API - a user
>>>>> >>>>>>>>>>>>>>>>>>>>> could specify a terminal method that assumes nothing will
>>>>> >>>>>>>>>>>>>>>>>>>>> reach the default branch, throwing an exception if such a
>>>>> >>>>>>>>>>>>>>>>>>>>> case occurs. That seems like an improvement over the
>>>>> >>>>>>>>>>>>>>>>>>>>> current branch() API, which allows for the more subtle
>>>>> >>>>>>>>>>>>>>>>>>>>> behavior of records unexpectedly getting dropped.
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>> The need for a terminal operation certainly has to be
>>>>> >>>>>>>>>>>>>>>>>>>>> well documented, but it would be fairly easy for the
>>>>> >>>>>>>>>>>>>>>>>>>>> InternalTopologyBuilder to track dangling branches that
>>>>> >>>>>>>>>>>>>>>>>>>>> haven't been terminated and raise a clear error before it
>>>>> >>>>>>>>>>>>>>>>>>>>> becomes an issue. Especially now that there is a "build
>>>>> >>>>>>>>>>>>>>>>>>>>> step" where the topology is actually wired up, when
>>>>> >>>>>>>>>>>>>>>>>>>>> StreamsBuilder.build() is called.
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its argument, I agree that
>>>>> >>>>>>>>>>>>>>>>>>>>> it's critical to allow users to do other operations on
>>>>> >>>>>>>>>>>>>>>>>>>>> the input stream. With the fluent solution, it ought to
>>>>> >>>>>>>>>>>>>>>>>>>>> work the same way all other operations do - if you want
>>>>> >>>>>>>>>>>>>>>>>>>>> to process off the original KStream multiple times, you
>>>>> >>>>>>>>>>>>>>>>>>>>> just need the stream as a variable so you can call as
>>>>> >>>>>>>>>>>>>>>>>>>>> many operations on it as you desire.
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>> Thoughts?
>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>> Best,
>>>>> >>>>>>>>>>>>>>>>>>>> Paul
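Paul's idea of tracking dangling branches at the build step could look roughly like the following sketch. `ToyTopologyBuilder`, `ToyBranchedStream`, and `noDefaultBranch()` are hypothetical names for illustration only and are not part of Kafka Streams. Because the check runs when the topology is built, it is a build-time error rather than the compile-time error Ivan asks for, but it fires before any record is processed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handle returned by branch(); it must be closed by a terminal call.
final class ToyBranchedStream {
    final String name;
    private final List<String> branches = new ArrayList<>();
    private boolean terminated;

    ToyBranchedStream(String name) {
        this.name = name;
    }

    ToyBranchedStream addBranch(String branchLabel) {
        if (terminated) {
            throw new IllegalStateException(name + " is already terminated");
        }
        branches.add(branchLabel);
        return this;
    }

    void defaultBranch() { terminated = true; }    // would route unmatched records to a handler

    void noDefaultBranch() { terminated = true; }  // would explicitly drop unmatched records

    boolean isTerminated() { return terminated; }
}

// Hypothetical builder that remembers every branching handle it hands out.
final class ToyTopologyBuilder {
    private final List<ToyBranchedStream> handles = new ArrayList<>();

    ToyBranchedStream branch(String name) {
        ToyBranchedStream handle = new ToyBranchedStream(name);
        handles.add(handle);
        return handle;
    }

    // The "build step": fail fast if any branching was left without a terminal call.
    void build() {
        List<String> dangling = new ArrayList<>();
        for (ToyBranchedStream h : handles) {
            if (!h.isTerminated()) {
                dangling.add(h.name);
            }
        }
        if (!dangling.isEmpty()) {
            throw new IllegalStateException("Unterminated branching: " + dangling
                    + "; call defaultBranch() or noDefaultBranch()");
        }
    }
}
```

The error message names the offending branching, which is the "clear error" Paul has in mind; a type-state API would be needed to move the same check to compile time.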
>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <[email protected] <mailto:[email protected]>> wrote:
>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>> Hello Paul,
>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>> I am afraid this won't work because we do not always need
>>>>> >>>>>>>>>>>>>>>>>>>>>> the defaultBranch. And without a terminal operation we
>>>>> >>>>>>>>>>>>>>>>>>>>>> don't know when to finalize and build the 'branch switch'.
>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its argument, so we can do
>>>>> >>>>>>>>>>>>>>>>>>>>>> something more with the original stream after branching.
>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>> I understand your point that the need for special object
>>>>> >>>>>>>>>>>>>>>>>>>>>> construction contrasts with the fluency of most KStream
>>>>> >>>>>>>>>>>>>>>>>>>>>> methods. But here we have a special case: we build the
>>>>> >>>>>>>>>>>>>>>>>>>>>> switch to split the flow, so I think this is still
>>>>> >>>>>>>>>>>>>>>>>>>>>> idiomatic.
>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>> Ivan
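Ivan's point that `onTopOf` returns its argument, so the source stream remains usable after branching, can be shown with a small in-memory model. `ToyStreamsBrancher` is a hypothetical analogue of the proposed `KafkaStreamsBrancher`, with lists in place of `KStream`s; unlike the real DSL it processes data immediately, so it illustrates only the shape of the API and the first-match routing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical analogue of KafkaStreamsBrancher over in-memory lists.
final class ToyStreamsBrancher<T> {
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();
    private Consumer<List<T>> defaultHandler;      // optional; unmatched items are dropped if null

    ToyStreamsBrancher<T> branch(Predicate<T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    ToyStreamsBrancher<T> defaultBranch(Consumer<List<T>> handler) {
        defaultHandler = handler;
        return this;
    }

    // Terminal step: route each item to its first matching branch, then return the source.
    List<T> onTopOf(List<T> source) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<T> unmatched = new ArrayList<>();
        for (T item : source) {
            int i = 0;
            while (i < predicates.size() && !predicates.get(i).test(item)) {
                i++;
            }
            if (i < predicates.size()) {
                buckets.get(i).add(item);
            } else {
                unmatched.add(item);
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        if (defaultHandler != null) {
            defaultHandler.accept(unmatched);
        }
        return source;   // lets the caller keep chaining on the original stream
    }
}
```

Returning `source` is what makes the `onTopOf(stream).mapValues(...)` style of chaining from the summary possible; the trade-off Paul raises is that the brancher object, not the stream, starts the chain.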
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
>>>>> >>>>>>>>>>>>>>>>>>>>>>> Ivan,
>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve this API, but I find
>>>>> >>>>>>>>>>>>>>>>>>>>>>> the onTopOf() mechanism a little confusing since it
>>>>> >>>>>>>>>>>>>>>>>>>>>>> contrasts with the fluency of other KStream method calls.
>>>>> >>>>>>>>>>>>>>>>>>>>>>> Ideally I'd like to just call a method on the stream so it
>>>>> >>>>>>>>>>>>>>>>>>>>>>> still reads top to bottom if the branch cases are defined
>>>>> >>>>>>>>>>>>>>>>>>>>>>> fluently. I think the addBranch(predicate, handleCase) is
>>>>> >>>>>>>>>>>>>>>>>>>>>>> very nice and the right way to do things, but what if we
>>>>> >>>>>>>>>>>>>>>>>>>>>>> flipped around how we specify the source stream?
>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>> Like:
>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>> stream.branch()
>>>>> >>>>>>>>>>>>>>>>>>>>>>>     .addBranch(predicate1, this::handle1)
>>>>> >>>>>>>>>>>>>>>>>>>>>>>     .addBranch(predicate2, this::handle2)
>>>>> >>>>>>>>>>>>>>>>>>>>>>>     .defaultBranch(this::handleDefault);
>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>> Where branch() returns a KBranchedStreams or
>>>>> >>>>>>>>>>>>>>>>>>>>>>> KStreamBrancher or something, which is added to by
>>>>> >>>>>>>>>>>>>>>>>>>>>>> addBranch() and terminated by defaultBranch() (which
>>>>> >>>>>>>>>>>>>>>>>>>>>>> returns void). This is obviously incompatible with the
>>>>> >>>>>>>>>>>>>>>>>>>>>>> current API, so the new stream.branch() would have to have
>>>>> >>>>>>>>>>>>>>>>>>>>>>> a different name, but that seems like a fairly small
>>>>> >>>>>>>>>>>>>>>>>>>>>>> problem - we could call it something like branched() or
>>>>> >>>>>>>>>>>>>>>>>>>>>>> branchedStreams() and deprecate the old API.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of your KIP? It seems
>>>>> >>>>>>>>>>>>>>>>>>>>>>> like it does to me, allowing for clear in-line branching
>>>>> >>>>>>>>>>>>>>>>>>>>>>> while also allowing you to dynamically build branches off
>>>>> >>>>>>>>>>>>>>>>>>>>>>> of KBranchedStreams if desired.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>> >>>>>>>>>>>>>>>>>>>>>> Paul
>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev <[email protected]> wrote:
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks) {
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>     ks.filter(....).mapValues(...)
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> }
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks) {
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>     ks.selectKey(...).groupByKey()...
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> }
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> ......
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>     .addBranch(predicate1, this::handleFirstCase)
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>     .addBranch(predicate2, this::handleSecondCase)
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>     .onTopOf(....)
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>> Ivan
>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> I have one question: the KafkaStreamsBrancher takes a
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Consumer as a second argument, which returns nothing, and
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> the example in the KIP shows each stream from the branch
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> using a terminal node (KStream#to() in this case).
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but how would we handle the
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> case where the user has created a branch but wants to
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> continue processing and not necessarily use a terminal
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> node on the branched stream immediately?
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic as is, if we had
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> something like this:
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> KStream<String, String>[] branches = originalStream.branch(predicate1, predicate2);
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Thanks!
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Bill
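For reference, the array-based `KStream#branch(Predicate...)` used in Bill's example assigns each record to the result stream of the *first* predicate that evaluates to true, and drops records that match no predicate. A minimal in-memory model of those semantics follows; the `ArrayBranching.branch` helper is hypothetical and works on lists, not `KStream`s.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class ArrayBranching {
    // Models first-match branching: result.get(i) holds the items whose first match is predicate i.
    static <T> List<List<T>> branch(List<T> source, List<Predicate<T>> predicates) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            result.add(new ArrayList<>());
        }
        for (T item : source) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(item)) {
                    result.get(i).add(item);
                    break;          // first match wins; later predicates are not consulted
                }
            }
            // an item that matched nothing is silently dropped
        }
        return result;
    }
}
```

With source `1..6` and predicates `x % 2 == 0` and `x % 3 == 0`, index 0 receives `[2, 4, 6]` (6 stops at the first predicate), index 1 receives `[3]`, and 1 and 5 disappear: the silent-drop behavior Paul contrasts with an explicit terminal method.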
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <[email protected] <mailto:[email protected]>> wrote:
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> All,
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP-418.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418. Please take
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> a look at the KIP if you can; I would appreciate any
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> feedback :)
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
