Thanks for the summary, that looks good to me.

Guozhang
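
For anyone skimming the thread, here is a rough sketch of what the listener could look like after the three changes in Colt's summary below. It is an illustration under assumptions, not the KIP's final API: the parameter lists are guessed, the changelog partition is modelled as a plain topic name plus partition number rather than a Kafka class, and `onBatchUpdated` is just one of the names still being discussed (`onBatchLoaded` and `onStandbyUpdated` were also proposed). `RecordingStandbyListener` and `StandbyListenerDemo` are hypothetical helpers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: method and enum names follow the summary in this
// thread; every parameter list here is an assumption.
interface StandbyUpdateListener {
    // Point 2 of the summary: keep PROMOTED and MIGRATED.
    enum SuspendReason { PROMOTED, MIGRATED }

    // Point 1: "Standby" dropped from the method name.
    // Point 3: no earliestOffset parameter.
    void onUpdateStart(String changelogTopic, int partition,
                       String storeName, long startingOffset);

    // Placeholder name; the thread had not settled on one.
    void onBatchUpdated(String changelogTopic, int partition,
                        String storeName, long batchEndOffset, long batchSize);

    void onUpdateSuspended(String changelogTopic, int partition,
                           String storeName, long storeOffset,
                           SuspendReason reason);
}

// Toy implementation that records each callback as a readable string.
class RecordingStandbyListener implements StandbyUpdateListener {
    final List<String> events = new ArrayList<>();

    @Override
    public void onUpdateStart(String changelogTopic, int partition,
                              String storeName, long startingOffset) {
        events.add("START " + storeName + " at offset " + startingOffset);
    }

    @Override
    public void onBatchUpdated(String changelogTopic, int partition,
                               String storeName, long batchEndOffset,
                               long batchSize) {
        events.add("BATCH " + storeName + " +" + batchSize
                + " records, now at " + batchEndOffset);
    }

    @Override
    public void onUpdateSuspended(String changelogTopic, int partition,
                                  String storeName, long storeOffset,
                                  SuspendReason reason) {
        events.add("SUSPENDED " + storeName + " at offset " + storeOffset
                + " reason=" + reason);
    }
}

class StandbyListenerDemo {
    public static void main(String[] args) {
        RecordingStandbyListener listener = new RecordingStandbyListener();
        // Simulate the callbacks a standby store might see before promotion.
        listener.onUpdateStart("app-store-changelog", 0, "store", 100L);
        listener.onBatchUpdated("app-store-changelog", 0, "store", 150L, 50L);
        listener.onUpdateSuspended("app-store-changelog", 0, "store", 150L,
                StandbyUpdateListener.SuspendReason.PROMOTED);
        listener.events.forEach(System.out::println);
    }
}
```

The callbacks are keyed by store and changelog partition rather than by task, which matches Sophie's point that the listener is scoped to the state store and not to the task that owns it.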

On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy <c...@littlehorse.io> wrote:
>
> Hello there!
>
> Thanks everyone for the comments. There's a lot of back-and-forth going on,
> so I'll do my best to summarize what everyone's said in TLDR format:
>
> 1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`,  and do similarly
> for the other methods.
> 2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
> 3. Remove the `earliestOffset` parameter for performance reasons.
>
> If that's all fine with everyone, I'll update the KIP and we—well, mostly
> Edu (:  —will open a PR.
>
> Cheers,
> Colt McNealy
>
> *Founder, LittleHorse.dev*
>
>
> On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro <edu...@littlehorse.io>
> wrote:
>
> > Hello everyone,
> >
> > Thanks for all your feedback for this KIP!
> >
> > I think that the key to choosing proper names for this API is understanding
> > the terms used inside the StoreChangelogReader. Currently, this class has
> > two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my opinion,
> > using StandbyUpdateListener for the interface fits better on these terms.
> > Same applies for onUpdateStart/Suspended.
> >
> > StoreChangelogReader uses "the same mechanism" for active task restoration
> > and standby task updates, but this is an implementation detail. Under
> > normal circumstances (no rebalances or task migrations), the changelog
> > reader will be in STANDBY_UPDATING, which means it will be updating standby
> > tasks as long as there are new records in the changelog topic. That's why I
> > prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't 100%
> > align with StateRestoreListener, but either one is fine.
> >
> > Edu
> >
> > On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <guozhang.wang...@gmail.com>
> > wrote:
> >
> > > Hello Colt,
> > >
> > > Thanks for writing the KIP! I have read through the updated KIP and
> > > overall it looks great. I only have minor naming comments (well,
> > > isn't naming the least boring stuff to discuss, and the stuff that
> > > takes most of the time for KIPs :P):
> > >
> > > 1. I tend to agree with Sophie regarding whether or not to include
> > > "Standby" in the functions of "onStandbyUpdateStart/Suspended", since
> > > it is also more consistent with the functions of
> > > "StateRestoreListener" where we do not name it as
> > > "onStateRestoreStart" etc.
> > >
> > > 2. I know in community discussions we sometimes say "a standby is
> > > promoted to active", but in the official code / java docs we do not
> > > have the term "promotion", since what the code really does is recycle
> > > the task (while keeping its state stores open) and create a new
> > > active task that takes in the recycled state stores, just changing
> > > the other fields like task type etc. After thinking about this for a
> > > bit, I tend to feel that "promoted" is indeed a better name for user
> > > facing purposes while "recycle" is more of a technical detail inside
> > > the code and could be abstracted away from users. So I feel keeping
> > > the name "PROMOTED" is fine.
> > >
> > > 3. Regarding "earliestOffset", it does feel like we cannot always
> > > avoid another call to the Kafka API. And on the other hand, I also
> > > tend to think that such bookkeeping may be better done at the app
> > > level than from the Streams' public API level. I.e. the app could keep
> > > a "first ever starting offset" per "topic-partition-store" key, and
> > > when we have a rolling restart and hence some standby task keeps
> > > "jumping" from one client to another via task assignment, the app
> > > would update this value just once, when it finds the
> > > "topic-partition-store" was never triggered before. What do you
> > > think?
> > >
> > > 4. I do not have a strong opinion either, but what about "onBatchUpdated"?
> > >
> > >
> > > Guozhang
> > >
> > > On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy <c...@littlehorse.io>
> > wrote:
> > > >
> > > > Sophie,
> > > >
> > > > Thank you very much for such a detailed review of the KIP. It might
> > > > actually be longer than the original KIP in the first place!
> > > >
> > > > 1. Ack'ed and fixed.
> > > >
> > > > 2. Correct, this is a confusing passage and requires context:
> > > >
> > > > One thing on our list of TODO's regarding reliability is to determine
> > how
> > > > to configure `session.timeout.ms`. In our Kubernetes Environment, an
> > > > instance of our Streams App can be terminated, restarted, and get back
> > > into
> > > > the "RUNNING" Streams state in about 20 seconds. We have two options
> > > here:
> > > > a) set session.timeout.ms to 30 seconds or so, and deal with 20
> > seconds
> > > of
> > > > unavailability for affected partitions, but avoid shuffling Tasks; or
> > b)
> > > > set session.timeout.ms to a low value, such as 6 seconds (
> > > > heartbeat.interval.ms of 2000), and reduce the unavailability window
> > > during
> > > > a rolling bounce but incur an "extra" rebalance. There are several
> > > > different costs to a rebalance, including the shuffling of standby
> > tasks.
> > > > JMX metrics are not fine-grained enough to give us an accurate picture
> > of
> > > > what's going on with the whole Standby Task Shuffle Dance. I
> > hypothesize
> > > > that the Standby Update Listener might help us clarify just how the
> > > > shuffling actually (not theoretically) works, which will help us make a
> > > > more informed decision about the session timeout config.
> > > >
> > > > If you think this is worth putting in the KIP, I'll polish it and do
> > so;
> > > > else, I'll remove the current half-baked explanation.
> > > >
> > > > 3. Overall, I agree with this. In our app, each Task has only one Store
> > > to
> > > > reduce the number of changelog partitions, so I sometimes forget the
> > > > distinction between the two concepts, as reflected in the KIP (:
> > > >
> > > > 3a. I don't like the word "Restore" here, since Restoration refers to
> > an
> > > > Active Task getting caught up in preparation to resume processing.
> > > > `StandbyUpdateListener` is fine by me; I have updated the KIP. I am a
> > > > native Python speaker so I do prefer shorter names anyways (:
> > > >
> > > > 3b1. +1 to removing the word 'Task'.
> > > >
> > > > 3b2. I like `onUpdateStart()`, but with your permission I'd prefer
> > > > `onStandbyUpdateStart()` which matches the name of the Interface
> > > > "StandbyUpdateListener". (the python part of me hates this, however)
> > > >
> > > > 3b3. Going back to question 2), `earliestOffset` was intended to allow
> > us
> > > > to more easily calculate the amount of state _already loaded_ in the
> > > store
> > > > by subtracting (startingOffset - earliestOffset). This would help us
> > see
> > > > how much inefficiency is introduced in a rolling restart—if we end up
> > > going
> > > > from a situation with an up-to-date standby before the restart, and
> > then
> > > > after the whole restart, the Task is shuffled onto an instance where
> > > there
> > > > is no previous state, then that is expensive. However, if the final
> > > > shuffling results in the Task back on an instance with a lot of
> > pre-built
> > > > state, it's not expensive.
> > > >
> > > > If a call over the network is required to determine the earliestOffset,
> > > > then this is a "hard no-go" for me, and we will remove it (I'll have to
> > > > check with Eduwer as he is close to having a working implementation). I
> > > > think we can probably determine what we wanted to see in a different
> > > > way, but it will take more thinking. If `earliestOffset` is confusing,
> > > > perhaps rename it to `earliestChangelogOffset`?
> > > >
> > > > `startingOffset` is easy to remove as it can be determined from the
> > first
> > > > call to `onBatch{Restored/Updated/Processed/Loaded}()`.
> > > >
> > > > Anyways, I've updated the JavaDoc in the interface; hopefully it's more
> > > > clear. Awaiting further instructions here.
> > > >
> > > > 3c. Good point; after thinking, my preference is `onBatchLoaded()`  ->
> > > > `onBatchUpdated()` -> `onBatchProcessed()` -> `onBatchRestored()`. I am
> > > > less fond of "processed" because when I was first learning Streams I
> > > > mistakenly thought that standby tasks actually processed the input
> > topic
> > > > rather than loaded from the changelog. I'll defer to you here.
> > > >
> > > > 3d. +1 to `onUpdateSuspended()`, or better yet
> > > > `onStandbyUpdateSuspended()`. Will check about the implementation of
> > > > keeping track of the number of records loaded.
> > > >
> > > > 4a. I think this might be best in a separate KIP, especially given that
> > > > this is my and Eduwer's first time contributing to Kafka (so we want to
> > > > minimize the blast radius).
> > > >
> > > > 4b. I might respectfully (and timidly) push back here: RECYCLED for an
> > > > Active Task is a bit confusing to me. DEMOTED and MIGRATED make sense
> > > from
> > > > the standpoint of an Active Task; recycling to me sounds like throwing
> > > > stuff away, such that the resources (i.e. disk space) can be used by a
> > > > separate Task. As an alternative rather than trying to reuse the same
> > > enum,
> > > > maybe rename it to `StandbySuspendReason` to avoid naming conflicts
> > with
> > > > `ActiveSuspendReason`? However, I could be convinced to rename PROMOTED
> > > ->
> > > > RECYCLED, especially if Eduwer agrees.
> > > >
> > > > TLDR:
> > > >
> > > > T1. Agreed, will remove the word "Task" as it's incorrect.
> > > > T2. Will update to `onStandbyUpdateStart()`
> > > > T3. Awaiting further instructions on earliestOffset and startingOffset.
> > > > T4. I don't like `onBatchProcessed()` too much, perhaps
> > > `onBatchLoaded()`?
> > > > T5. Will update to `onStandbyUpdateSuspended()`
> > > > T6. Thoughts on renaming SuspendReason to StandbySuspendReason, rather
> > > than
> > > > renaming PROMOTED to RECYCLED? @Eduwer?
> > > >
> > > > Long Live the Otter,
> > > > Colt McNealy
> > > >
> > > > *Founder, LittleHorse.dev*
> > > >
> > > >
> > > > On Wed, Oct 11, 2023 at 9:32 AM Sophie Blee-Goldman <
> > > sop...@responsive.dev>
> > > > wrote:
> > > >
> > > > > Hey Colt! Thanks for the KIP -- this will be a great addition to
> > > Streams, I
> > > > > can't believe we've gone so long without this.
> > > > >
> > > > > Overall the proposal makes sense, but I had a handful of fairly minor
> > > > > questions and suggestions/requests
> > > > >
> > > > > 1. Seems like the last sentence in the 2nd paragraph of the
> > Motivation
> > > > > section is cut off and incomplete -- "want to be able to know " what
> > > > > exactly?
> > > > >
> > > > > 2. This isn't that important since the motivation as a whole is clear
> > > to me
> > > > > and convincing enough, but I'm not quite sure I understand the
> > example
> > > at
> > > > > the end of the Motivation section. How are standby tasks (and the
> > > ability
> > > > > to hook into and monitor their status) related to the
> > > session.timeout.ms
> > > > > config?
> > > > >
> > > > > 3. To help both old and new users of Kafka Streams understand this
> > new
> > > > > restore listener and its purpose/semantics, can we try to name the
> > > class
> > > > > and
> > > > >  callbacks in a way that's more consistent with the active task
> > restore
> > > > > listener?
> > > > >
> > > > > 3a. StandbyTaskUpdateListener:
> > > > > The existing restore listener is called StateRestoreListener, so the
> > > new
> > > > > one could be called something like StandbyStateRestoreListener.
> > > Although
> > > > > we typically refer to standby tasks as "processing" rather than
> > > "restoring"
> > > > > records -- ie restoration is a term for active task state
> > > specifically. I
> > > > > actually
> > > > > like the original suggestion if we just drop the "Task" part of the
> > > name,
> > > > > ie StandbyUpdateListener. I think either that or
> > StandbyRestoreListener
> > > > > would be fine and probably the two best options.
> > > > > Also, this probably goes without saying but any change to the name of
> > > this
> > > > > class should of course be reflected in the KafkaStreams#setXXX API as
> > > well
> > > > >
> > > > > 3b. #onTaskCreated
> > > > >  I know the "start" callback feels a bit different for the standby
> > task
> > > > > updater vs an active task beginning restoration, but I think we
> > should
> > > try
> > > > > to
> > > > > keep the various callbacks aligned to their active restore listener
> > > > > counterpart. We can/should just replace the term "restore" with
> > > "update"
> > > > > for the
> > > > > callback method names the same way we do for the class name, which in
> > > this
> > > > > case would give us #onUpdateStart. Personally I like this better,
> > > > > but it's ultimately up to you. However, I would push back against
> > > anything
> > > > > that includes the word "Task" (eg #onTaskCreated) as the listener
> > > > >  is actually not scoped to the task itself but instead to the
> > > individual
> > > > > state store(s). This is the main reason I would prefer calling it
> > > something
> > > > > like #onUpdateStart, which keeps the focus on the store being updated
> > > > > rather than the task that just happens to own this store
> > > > > One last thing on this callback -- do we really need both the
> > > > > `earliestOffset` and `startingOffset`? I feel like this might be more
> > > > > confusing than it
> > > > > is helpful (tbh even I'm not completely sure I know what the
> > > earliestOffset
> > > > > is supposed to represent). More importantly, is this all information
> > > > > that is already available and able to be passed in to the callback by
> > > > > Streams? I haven't checked on this but it feels like the
> > > earliestOffset is
> > > > > likely to require a remote call, either by the embedded consumer or
> > > via the
> > > > > admin client. If so, the ROI on including this parameter seems
> > > > > quite low (if not outright negative)
> > > > >
> > > > > 3c. #onBatchRestored
> > > > > If we opt to use the term "update" in place of "restore" elsewhere,
> > > then we
> > > > > should consider doing so here as well. What do you think about
> > > > > #onBatchUpdated, or even #onBatchProcessed?
> > > > > I'm actually not super concerned about this particular API, and
> > > honestly I
> > > > > think we can use restore or update interchangeably here, so if you
> > > > >  don't like any of the suggested names (and no one can think of
> > > anything
> > > > > better), I would just stick with #onBatchRestored. In this case,
> > > > > it kind of makes the most sense.
> > > > >
> > > > > 3d. #onTaskSuspended
> > > > > Along the same lines as 3b above, #onUpdateSuspended or just
> > > > > #onRestoreSuspended probably makes more sense for this callback.
> > Also,
> > > > >  I notice the StateRestoreListener passes in the total number of
> > > records
> > > > > restored to its #onRestoreSuspended. Assuming we already track
> > > > > that information in Streams and have it readily available to pass in
> > at
> > > > > whatever point we would be invoking this callback, that might be a
> > > > > useful  parameter for the standby listener to have as well
> > > > >
> > > > > 4. I totally love the SuspendReason thing, just two notes/requests:
> > > > >
> > > > > 4a. Feel free to push back against adding onto the scope of this KIP,
> > > but
> > > > > it would be great to expand the active state restore listener with
> > this
> > > > > SuspendReason enum as well. It would be really useful for both
> > > variants of
> > > > > restore listener
> > > > >
> > > > > 4b. Assuming we do 4a, let's rename PROMOTED to RECYCLED -- for
> > standby
> > > > > tasks it means basically the same thing, the point is that active
> > > > > tasks can also be recycled into standbys through the same mechanism.
> > > This
> > > > > way they can share the SuspendReason enum -- not that it's
> > > > > necessary for them to share, I just think it would be a good idea to
> > > keep
> > > > > the two restore listeners aligned to the highest degree
> > > > > possible.
> > > > > I was actually considering proposing a short KIP with a new
> > > > > RecyclingListener (or something) specifically for this exact kind of
> > > thing,
> > > > > since we
> > > > > currently have literally zero insight into the recycling process.
> > It's
> > > > > practically impossible to tell when a store has been converted from
> > > active
> > > > > to
> > > > > standby, or vice versa. So having access to the SuspendReason, and
> > more
> > > > > importantly having a callback guaranteed to notify you when a
> > > > > state store is recycled whether active or standby, would be amazing.
> > > > >
> > > > > Thanks for the KIP!
> > > > >
> > > > > -Sophie "otterStandbyTaskUpdateListener :P" Blee-Goldman
> > > > >
> > > > >
> > > > > ---------- Forwarded message ---------
> > > > > > From: Colt McNealy <c...@littlehorse.io>
> > > > > > Date: Tue, Oct 3, 2023 at 12:48 PM
> > > > > > Subject: [DISCUSS] KIP-988 Streams Standby Task Update Listener
> > > > > > To: <dev@kafka.apache.org>
> > > > > >
> > > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > We would like to propose a small KIP to improve the ability of
> > > Streams
> > > > > apps
> > > > > > to monitor the progress of their standby tasks through a callback
> > > > > > interface.
> > > > > >
> > > > > > We have a nearly-working implementation on our fork and are curious
> > > for
> > > > > > feedback.
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener
> > > > > >
> > > > > > Thank you,
> > > > > > Colt McNealy
> > > > > >
> > > > > > *Founder, LittleHorse.dev*
> > > > > >
> > > > >
> > >
> >

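Guozhang's point 3 above (keeping a "first ever starting offset" per "topic-partition-store" key at the app level instead of passing `earliestOffset` through the Streams API) could be sketched roughly as follows. This is a minimal illustration under assumptions: `FirstOffsetTracker` and its methods are hypothetical names, the key format is invented, and the in-memory map only covers a single process. Following a standby as it "jumps" between clients would need some shared storage that the thread does not specify.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical app-level bookkeeping; not part of Kafka Streams.
class FirstOffsetTracker {
    // Key: "<topic>-<partition>-<store>", value: first starting offset seen.
    private final Map<String, Long> firstStartingOffset = new ConcurrentHashMap<>();

    static String key(String topic, int partition, String storeName) {
        return topic + "-" + partition + "-" + storeName;
    }

    // Records startingOffset only if the key was never seen before and
    // returns the first-ever starting offset for that key.
    long recordStart(String topic, int partition, String storeName,
                     long startingOffset) {
        Long previous = firstStartingOffset.putIfAbsent(
                key(topic, partition, storeName), startingOffset);
        return previous == null ? startingOffset : previous;
    }

    // Approximates the "state already loaded" figure from the thread
    // (startingOffset - earliestOffset), using the first-ever starting
    // offset as a stand-in for earliestOffset. Returns 0 for unknown keys.
    long alreadyLoaded(String topic, int partition, String storeName,
                       long startingOffset) {
        Long first = firstStartingOffset.get(key(topic, partition, storeName));
        return first == null ? 0L : startingOffset - first;
    }
}
```

An app would call `recordStart(...)` from its update-start callback, and could then report `alreadyLoaded(...)` after a rolling restart to see how much pre-built state the standby came back with.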