Hello Nick,

Thanks for the re-written KIP! I read through it again, and so far have
just one quick question on top of my head regarding repartitioning: it
seems to me that when there's an intermediate topic inside the recursion
step, then using this new API would basically give us the same behavior as
using the existing `to` APIs. Of course, with the new API the user can make
it more explicit that it is supposed to be recursive, but efficiency wise
it provides no further optimizations. Is my understanding correct? If yes,
I'm wondering if it's worthy the complexity to allow repartitioning inside
the unary operator, or should we just restrict the recursion inside a
single sub-topology.


Guozhang

On Tue, Sep 6, 2022 at 9:05 AM Nick Telford <nick.telf...@gmail.com> wrote:

> Hi everyone,
>
> I've re-written the KIP, with a new design that I think resolves the issues
> you highlighted, and also simplifies usage.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams
>
> Note: I'm still working out the "automatic repartitioning" in my head, as I
> don't think it's quite right. It may turn out that the additional overload
> (with the Produced argument) is not necessary.
>
> Thanks for all your feedback so far. Let me know what you think!
>
> Regards,
>
> Nick
>
> On Thu, 25 Aug 2022 at 17:46, Nick Telford <nick.telf...@gmail.com> wrote:
>
> > Hi Sophie,
> >
> > The reason I chose to add a new overload of "to", instead of creating a
> > new method, is simply because I felt that "to" was about sending records
> > "to" somewhere, and that "somewhere" just happens to currently be
> > exclusively topics. By re-using "to", we can send records *to other
> > KStreams*, including a KStream from an earlier point in the current
> > KStreams' pipeline, which would facilitate recursion. Sending records to
> a
> > completely different KStream would be essentially a merge.
> >
> > However, I'm happy to reduce the scope of this method to focus
> exclusively
> > on recursion: we'd simply need to add a check in to the method that
> ensures
> > the target is an ancestor node of the current KStream node.
> >
> > Which brings me to your first query...
> >
> > My argument is simply that a 0-ary method isn't enough to facilitate
> > recursive streaming, because you need to be able to communicate which
> point
> > in the process graph you want to feed your records back in to.
> >
> > Consider my example from the KIP, but re-written with a 0-ary
> > "recursively" method:
> >
> > updates
> >     .join(parents, (count, parent) -> { KeyValue(parent, count) })
> >     .recursively()
> >
> > Where does the join output get fed to?
> >
> >    1. The "updates" (source) node?
> >    2. The "join" node itself?
> >
> > It would probably be most intuitive if it simply caused the last step to
> > be recursive, but that won't always be what you want. Consider if we add
> > some more steps in to the above:
> >
> > updates
> >     .map((parent, count) -> KeyValue(parent, count + 1)) // doesn't make
> > sense in this algorithm, but let's pretend it does
> >     .join(parents, (count, parent) -> { KeyValue(parent, count) })
> >     .recursively()
> >
> > If "recursively" just feeds records back into the "join", it misses out
> on
> > potentially important steps in our recursive algorithm. It also gets even
> > worse if the step you're making recursive doesn't contain your terminal
> > condition:
> >
> > foo
> >     .filter((key, value) -> value <= 0) // <-- terminal condition
> >     .mapValues((value) -> value - 1)
> >     .recursively()
> >
> > If "recursively" feeds records back to the "mapValues" stage in our
> > pipeline, and not in to "filter" or "foo", then the terminal condition in
> > "filter" won't be evaluated for any values with a starting value greater
> > than 0, *causing an infinite loop*.
> >
> > There's an argument to be had to always feed the values back to the first
> > ancestor "source node", in the process-graph, but that might not be
> > particularly intuitive, and is likely going to limit some of the
> recursive
> > algorithms that some may want to implement. For example, in the previous
> > example, there's no guarantee that "foo" is a source node; it could be
> the
> > result of a "mapValues", for example.
> >
> > Ultimately, the solution here is to make this method take a parameter,
> > explicitly specifying the KStream that records are fed back in to, making
> > the above two examples:
> >
> > updates
> >     .map((parent, count) -> KeyValue(parent, count + 1))
> >     .join(parents, (count, parent) -> { KeyValue(parent, count) })
> >     .recursively(updates)
> >
> > and:
> >
> > foo
> >     .filter((key, value) -> value <= 0)
> >     .mapValues((value) -> value - 1)
> >     .recursively(foo)
> >
> > We could *also* support a 0-ary version of the method that defaults to
> > recursively executing the previous node, but I'm worried that users may
> not
> > fully understand the consequences of this, inadvertently creating
> infinite
> > loops that are difficult to debug.
> >
> > Finally, I'm not convinced that "recursively" is the best name for the
> > method. Perhaps "recursivelyVia" or "recursivelyTo"? Ideas welcome!
> >
> > If we want to prevent this method being "abused" to merge different
> > streams together, it should be trivial to ensure that the provided
> argument
> > is an ancestor of the current node, by recursively traversing up the
> > process graph.
> >
> > I hope this clarifies your questions. It's clear that the KIP needs more
> > work to better elaborate on these points. I haven't had a chance to
> revise
> > it yet, due to more pressing issues with EOS stability that I've been
> > looking into.
> >
> > Regards,
> >
> > Nick
> >
> > On Tue, 23 Aug 2022 at 23:50, Sophie Blee-Goldman
> > <sop...@confluent.io.invalid> wrote:
> >
> >> Hey Nick,
> >>
> >> Sounds like an interesting KIP, and I agree the current way of achieving
> >> this in Streams
> >> seems wildly overcomplicated. So I'm definitely +1 on adding a smooth
> API
> >> that abstracts
> >> away a lot of the complexity and unnecessary topic management.
> >>
> >> That said, I've found much of the discussion so far on the API itself to
> >> be
> >> very confusing -- for example, I don't understand this point:
> >>
> >>  I actually considered a "recursion" API, something
> >> > like you suggested, however it won't work, because to do the recursion
> >> you
> >> > need to know both the end of the KStream that you want to recurse, AND
> >> the
> >> > beginning of the stream you want to feed it back into.
> >>
> >>
> >> As I see it, the internal implementation should be, and is, essentially
> >> independent from the
> >> design of the API itself -- in other words, why does calling this
> >> operator/method `recursion`
> >> not work, or have anything at all to do with what Streams "knows" or how
> >> it
> >> does the actual
> >> recursion? And why would calling it recursion be any different from
> >> calling
> >> it/reusing the existing
> >> `to` operator method?
> >>
> >> On that note, the proposal to reuse the `to` operator for this purpose
> is
> >> the other thing I've found
> >> to be very confusing. Can you expand on why you think `to` would be
> >> appropriate here vs a
> >> dedicated recursion operator? I actually think it would be fairly
> >> misleading to have the `to` operator
> >> do something pretty wildly different depending on what you passed in, I
> >> mean stream recursion seems
> >> quite far removed from its current semantics -- I just don't really see
> >> the
> >> connection.
> >>
> >> so tl;dr why not give this operation its own dedicated operator/method
> >> name, vs reusing an existing operator that does something else?
> >>
> >> Overall though this sounds great, thanks for the KIP!
> >>
> >> Cheers,
> >> Sophie
> >>
> >> On Thu, Aug 18, 2022 at 4:48 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>
> >> > Hello Nick,
> >> >
> >> > Thanks for the replies! They are very thoughtful. I think I agree with
> >> you
> >> > that requiring the output stream to a source stream is not sufficient
> >> for a
> >> > valid recursion, and even without the proposed API users today can
> still
> >> > create a broken recursive topology.
> >> >
> >> > Just want to clarify another question:
> >> >
> >> > In our current examples, the linked output stream and input stream are
> >> on
> >> > the same sub-topology, in which case this API allows us to avoid
> >> creating
> >> > unnecessary intermediate topics; when the linked output/input streams
> >> are
> >> > not on the same sub-topology, then using this API would not buy us
> >> > anything, right? E.g.
> >> >
> >> > ```
> >> > stream1 = builder.stream("topic1");
> >> > stream2 = stream1.repartition("topic2");
> >> > stream2.to(stream1)
> >> > ```
> >> >
> >> > Then this API would not buy us anything compared with
> >> >
> >> > ```
> >> > stream1 = builder.stream("topic1");
> >> > stream2 = stream1.repartition("topic2");
> >> > stream2.to("topic1")
> >> > ```
> >> >
> >> > Is that right?
> >> >
> >> > Guozhang
> >> >
> >> >
> >> >
> >> >
> >> > On Wed, Aug 10, 2022 at 11:10 AM Nick Telford <nick.telf...@gmail.com
> >
> >> > wrote:
> >> >
> >> > > Hi everyone,
> >> > >
> >> > > On Guozhang's point 1): I actually considered a "recursion" API,
> >> > something
> >> > > like you suggested, however it won't work, because to do the
> recursion
> >> > you
> >> > > need to know both the end of the KStream that you want to recurse,
> AND
> >> > the
> >> > > beginning of the stream you want to feed it back into. Your proposed
> >> > > "recursive(stream1.join(table))" (which is equivalent to
> >> > > "stream1.join(table).recursively()" etc.) won't work, because the
> >> > > "recursive" function only receives the tail of the stream to feed
> >> back,
> >> > but
> >> > > not the point that it needs to feed back in to. This is the reason
> for
> >> > > using the "to" API overload, as it allows you to instruct Kafka
> >> Streams
> >> > to
> >> > > take the end of a KStream and feed it back into *a specific point*
> in
> >> the
> >> > > process graph. It just so happens that the API has no restriction as
> >> to
> >> > > whether you feed the stream back into one of its own ancestor nodes,
> >> or a
> >> > > completely separate processor node, which is why I kept the
> >> "recursive"
> >> > > terminology out of the method name.
> >> > >
> >> > > I don't think it ultimately matters whether you feed it into a
> sourced
> >> > > stream or not. In your example, the expression
> >> "stream2.mapValues(...)"
> >> > > would loop recursively. Obviously since "mapValues" can't omit
> records
> >> > from
> >> > > its output, this would produce an infinite loop, but other, similar
> >> > > programs would be perfectly valid:
> >> > >
> >> > > stream2 = stream1.mapValues(...)
> >> > > stream3 = stream2.flatMapValues(...)
> >> > > stream3.to(stream2)
> >> > >
> >> > > Provided that the function passed to "flatMapValues" had a terminal
> >> > > condition.
> >> > >
> >> > > While you may worry about users creating infinite recursion loops,
> >> it's
> >> > > worth noting that the same can be said of (most) programming
> >> languages,
> >> > > including Java, but we don't generally consider it a big problem. If
> >> you
> >> > > have any ideas on how we can protect against infinite recursion
> loops,
> >> > that
> >> > > could definitely help. I don't think requiring a "sourced node" at
> the
> >> > > point of recursion would help, as it's ultimately the presence of a
> >> > > terminal condition in the recursive process graph that determines
> >> whether
> >> > > or not it loops infinitely.
> >> > >
> >> > > I'm happy to rewrite the KIP to orient it more around the new
> methods
> >> > > itself, and I'm happy to change the methods being added if you can
> >> come
> >> > up
> >> > > with a better solution :-)
> >> > >
> >> > > Re: 2) thanks for spotting my error, I've already corrected it in
> the
> >> > KIP.
> >> > >
> >> > > Thank you both for your feedback so far. Keep it coming!
> >> > >
> >> > > Regards,
> >> > >
> >> > > Nick
> >> > >
> >> > > On Wed, 10 Aug 2022 at 00:50, Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >> > >
> >> > > > Hello Nick,
> >> > > >
> >> > > > Thanks for bringing this KIP! Just a few thoughts:
> >> > > >
> >> > > > 1) I agree with Sagar that, we'd probably think about two routes
> to
> >> > > > rephrase / restructure the proposal:
> >> > > >
> >> > > > * we can propose a couple of new APIs, and just list "more
> >> > > > convenient recursion" as one of its benefits. Then we'd need to be
> >> > > careful
> >> > > > and consider all possible use scenarios, e.g. what if "other" is
> >> not a
> >> > > > sourced stream, e.g.:
> >> > > >
> >> > > > stream2 = stream1.mapValues(...)
> >> > > > stream3 = stream2.mapValues(...)
> >> > > > stream3.to(stream2)
> >> > > >
> >> > > > Would that be allowed? If yes what's the implementation semantics
> of
> >> > this
> >> > > > code.
> >> > > >
> >> > > > * OR, we propose sth just for more convenient recursion, but then
> we
> >> > > would
> >> > > > need to consider having a more restrictive expressiveness in the
> new
> >> > DSL,
> >> > > > e.g. we'd need to enforce that "other" is a source stream, and
> that
> >> > > "other"
> >> > > > is one of the ancestor of "this", programmatically. Or we can
> think
> >> > > about a
> >> > > > totally different set of new DSL e.g. (I'm just making it up on
> top
> >> of
> >> > my
> >> > > > head for illustration, not really advocating it :P):
> >> > > >
> >> > > > stream1 = stream2.mapValues(...);
> >> > > > stream1 = recursive(stream1.join(table));
> >> > > >
> >> > > > 2) Just a nit comment, it seems in your example, the topic name
> >> should
> >> > > be:
> >> > > >
> >> > > > ```
> >> > > > nodes
> >> > > >     .map((node, parent) -> { KeyValue(parent, 1L) })
> >> > > >     .to("node-updates")
> >> > > >
> >> > > > updates
> >> > > >     .join(parents, (count, parent) -> { KeyValue(parent, count) })
> >> //
> >> > the
> >> > > > root node has no parent, so recursion halts at the root
> >> > > >     .to("node-updates")
> >> > > > ```
> >> > > >
> >> > > > Right?
> >> > > >
> >> > > >
> >> > > > On Sun, Aug 7, 2022 at 7:52 PM Sagar <sagarmeansoc...@gmail.com>
> >> > wrote:
> >> > > >
> >> > > > > Hey Nick,
> >> > > > >
> >> > > > > Since we are adding a new method to the public interface, we
> >> should
> >> > > > > probably decide the necessity of doing so, more so when you say
> >> that
> >> > > it's
> >> > > > > an alternative to something already existing. My suggestion
> would
> >> be
> >> > to
> >> > > > > still modify the KIP around the new API, highlight how it's an
> >> > > > alternative
> >> > > > > to something already existing and why we should add the new API.
> >> You
> >> > > have
> >> > > > > already explained streaming recursion, so that's one added
> >> benefit we
> >> > > get
> >> > > > > as part of the new API. So, try to expand a little bit around
> >> those
> >> > > > points.
> >> > > > > Graph traversal should be fine as an example. You could make it
> >> > > slightly
> >> > > > > more clear.
> >> > > > >
> >> > > > > Let me know if it makes sense.
> >> > > > >
> >> > > > > Thank you for your work on this!
> >> > > > >
> >> > > > > Thanks!
> >> > > > > Sagar.
> >> > > > >
> >> > > > >
> >> > > > > On Fri, Aug 5, 2022 at 8:43 PM Nick Telford <
> >> nick.telf...@gmail.com>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Hi Sagar,
> >> > > > > >
> >> > > > > > Thanks for reading through my proposal.
> >> > > > > >
> >> > > > > > While the 2 new methods were originally intended for the
> >> recursive
> >> > > > > > use-case, they could also be used as an alternative means of
> >> wiring
> >> > > two
> >> > > > > > different KStreams together. The main reason I didn't document
> >> this
> >> > > in
> >> > > > > the
> >> > > > > > KIP is that using the API for this doesn't bring anything new
> to
> >> > the
> >> > > > > table:
> >> > > > > > it's just an alternative form of something that already
> exists.
> >> If
> >> > > you
> >> > > > > > believe it would be helpful, I can document this in more
> >> detail. I
> >> > > can
> >> > > > > > re-orient the KIP around the new methods themselves, but I
> felt
> >> > there
> >> > > > was
> >> > > > > > more value in the KIP emphasizing the new functionality and
> >> > > algorithms
> >> > > > > that
> >> > > > > > they enable.
> >> > > > > >
> >> > > > > > What additional context would you like to see in the KIP? Some
> >> more
> >> > > > > > examples of recursive algorithms that would benefit? A more
> >> > concrete
> >> > > > > > example than generic graph traversal? Something else?
> >> > > > > >
> >> > > > > > Regards,
> >> > > > > >
> >> > > > > > Nick Telford
> >> > > > > >
> >> > > > > > On Fri, 5 Aug 2022 at 11:02, Sagar <sagarmeansoc...@gmail.com
> >
> >> > > wrote:
> >> > > > > >
> >> > > > > > > Hey Nick,
> >> > > > > > >
> >> > > > > > > Thanks for the KIP. This seems like a great addition.
> However,
> >> > just
> >> > > > > > > wondering if the 2 new methods that you plan to add are
> meant
> >> > only
> >> > > > for
> >> > > > > > > streaming recursion? I would imagine they could be
> repurposed
> >> for
> >> > > > other
> >> > > > > > use
> >> > > > > > > cases as well? If yes, then probably the KIP should revolve
> >> > around
> >> > > > the
> >> > > > > > > addition of adding these methods which would btw also
> support
> >> > > > streaming
> >> > > > > > > recursion. IMHO adding 2 new methods just for streaming
> >> recursion
> >> > > > seems
> >> > > > > > > slightly odd to me.
> >> > > > > > >
> >> > > > > > > Also, pardon my ignorance here, but I don't have much
> insight
> >> > into
> >> > > > > > > streaming recursion. You can add some more context to it.
> >> > > > > > >
> >> > > > > > > Thanks!
> >> > > > > > > Sagar.
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Tue, Jul 26, 2022 at 8:46 PM Nick Telford <
> >> > > nick.telf...@gmail.com
> >> > > > >
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > Hi everyone,
> >> > > > > > > >
> >> > > > > > > > URL:
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams
> >> > > > > > > >
> >> > > > > > > > Here's a KIP for extending the Streams DSL API to support
> >> > > > "streaming
> >> > > > > > > > recursion". See the Motivation section for details on
> what I
> >> > mean
> >> > > > by
> >> > > > > > > this,
> >> > > > > > > > along with an example of recursively counting nodes in a
> >> graph.
> >> > > > > > > >
> >> > > > > > > > I haven't included changes for the PAPI, mostly because I
> >> don't
> >> > > use
> >> > > > > it,
> >> > > > > > > so
> >> > > > > > > > I'm not as familiar with the idioms there. If you can
> think
> >> of
> >> > a
> >> > > > good
> >> > > > > > > > analogue for a new PAPI method, I'm happy to include it in
> >> the
> >> > > KIP.
> >> > > > > > > >
> >> > > > > > > > Regards,
> >> > > > > > > >
> >> > > > > > > > Nick Telford
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > > >
> >> > > > --
> >> > > > -- Guozhang
> >> > > >
> >> > >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >>
> >
>


-- 
-- Guozhang

Reply via email to