Hi, it seems under the new rebalance protocol in KIP-1071 we'd still have this 
gap for auto-scaling under-partitioned internal topics… If so, would it be 
worthwhile to still make auto-scaling available as an unsafe option in streams, 
under KIP-1071?

Thanks,
Guang

On 2022/12/07 16:24:47 "Matthias J. Sax" wrote:
> Thanks for the background. Was just curious about the details. I agree 
> that we should not add a new backoff config at this point.
> 
> -Matthias
> 
> On 12/2/22 4:47 PM, Sophie Blee-Goldman wrote:
> >>
> >> I missed the default config values as they were put into comments...
> > 
> > You don't read code comments????? (jk...sorry, wasn't sure where the best
> > place for this would be, suppose I could've just included the full config
> > definition)
> > 
> >> About the default timeout: what is the follow up rebalance cadence (I
> >> thought it would be 10 minutes?). For this case, a default timeout of 15
> >> minutes would imply that we only allow a single retry before we hit the
> >> timeout. Would this be sufficient (sounds rather aggressive to me)?
> > 
> > Well no, because we will trigger the followup rebalance for this case
> > immediately after like we do for cooperative rebalances, not 10 minutes
> > later as in the case of probing rebalances. I thought 10 minutes was a
> > rather extreme backoff time that there was no motivation for here, unlike
> > with probing rebalances where we're explicitly giving the clients time to
> > finish warming up tasks and an immediate followup rebalance wouldn't make
> > any sense.
> > 
> > We could of course provide another config for users to tune the backoff
> > time here, but I felt that triggering one right away was justified here --
> > and we can always add a backoff config in a followup KIP if there is demand
> > for it. But why complicate things for users in the first iteration of this
> > feature when following up right away doesn't cause too much harm -- all
> > other threads can continue processing during the rebalance, and the leader
> > can fit in some processing between rebalances as well.
> > 
> > Does this sound reasonable to you or would you prefer including the backoff
> > config right off the bat?
> > 
> > On Fri, Dec 2, 2022 at 9:21 AM Matthias J. Sax <[email protected]> wrote:
> > 
> >> Thanks Sophie.
> >>
> >> Good catch on the default partitioner issue!
> >>
> >> I missed the default config values as they were put into comments...
> >>
> >> About the default timeout: what is the follow up rebalance cadence (I
> >> thought it would be 10 minutes?). For this case, a default timeout of 15
> >> minutes would imply that we only allow a single retry before we hit the
> >> timeout. Would this be sufficient (sounds rather aggressive to me)?
> >>
> >>
> >> -Matthias
> >>
> >> On 12/2/22 8:00 AM, Sophie Blee-Goldman wrote:
> >>> Thanks again for the responses -- just want to say up front that I
> >>> realized the concept of a default partitioner is actually substantially
> >>> more complicated than I first assumed due to key/value typing, so I
> >>> pulled it from this KIP and filed a ticket for it for now.
> >>>
> >>> Bruno,
> >>>
> >>>> What is exactly the motivation behind metric num-autoscaling-failures?
> >>>> Actually, to realise that autoscaling did not work, we only need to
> >>>> monitor subtopology-parallelism over partition.autoscaling.timeout.ms
> >>>> time, right?
> >>>
> >>> That is exactly the motivation -- I imagine some users may want to retry
> >>> indefinitely, and it would not be practical (or very nice) to require
> >>> users to monitor for up to partition.autoscaling.timeout.ms when that's
> >>> been configured to MAX_VALUE
> >>>
> >>>> Is num-autoscaling-failures a way to verify that Streams went through
> >>>> enough autoscaling attempts during partition.autoscaling.timeout.ms?
> >>>> Could you maybe add one or two sentences on how users should use
> >>>> num-autoscaling-failures?
> >>>
> >>> Not really, for the reason outlined above -- I just figured users might
> >>> be monitoring how often the autoscaling is failing and alert past some
> >>> threshold since this implies something funny is going on. This is more of
> >>> a "health check" kind of metric than a "scaling completed" status gauge.
> >>> At the very least, users will want to know when a failure has occurred,
> >>> even if it's a single failure, no?
> >>>
> >>> Hopefully that makes more sense now, but I suppose I can write something
> >>> like that in the KIP too
> >>>
> >>>
> >>> Matthias -- answers inline below:
> >>>
> >>> On Thu, Dec 1, 2022 at 10:44 PM Matthias J. Sax <[email protected]> wrote:
> >>>
> >>>> Thanks for updating the KIP Sophie.
> >>>>
> >>>> I have the same question as Bruno. How can the user use the failure
> >>>> metric and what actions can be taken to react if the metric increases?
> >>>>
> >>>
> >>> I guess this depends on how important the autoscaling is, but presumably
> >>> in most cases if you see things failing you probably want to at least
> >>> look into the logs to figure out why (for example quota violation), and
> >>> at the most stop your application while investigating?
> >>>
> >>>
> >>>> Plus a few more:
> >>>>
> >>>> (1) Do we assume that user can reason about `subtopology-parallelism`
> >>>> metric to figure out if auto-scaling is finished? Given that a topology
> >>>> might be complex and the rules to determine the partition count of
> >>>> internal topic are not easy, it might be hard to use?
> >>>>
> >>>> Even if the feature is for advanced users, I don't think we should push
> >>>> the burden to understand the partition count details onto them.
> >>>>
> >>>> We could add a second `target-subtopology-parallelism` metric (or
> >>>> `expected-subtopology-parallelism` or some other name)? This way, users
> >>>> can compare "target/expected" and "actual" value and easily figure out
> >>>> if some sub-topologies are not expanded yet.
> >>>>
> >>>> Thoughts?
> >>>>
> >>>
> >>> Makes sense to me -- will add an `expected-subtopology-parallelism` metric
> >>>
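The "target/expected" versus "actual" comparison suggested above could look roughly like the following from a monitoring job's point of view. This is only a sketch under assumptions: it presumes the per-sub-topology values of the two metrics have already been read into maps keyed by a sub-topology id, and `ParallelismCheck` and `pendingSubtopologies` are hypothetical names for illustration, not part of Kafka Streams.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not a Kafka Streams API.
final class ParallelismCheck {

    // Returns the ids of sub-topologies whose actual parallelism is still
    // below the expected (target) parallelism, i.e. not yet scaled out.
    static Set<String> pendingSubtopologies(Map<String, Integer> actual,
                                            Map<String, Integer> expected) {
        Set<String> pending = new TreeSet<>();
        for (Map.Entry<String, Integer> entry : expected.entrySet()) {
            // A sub-topology with no reported actual value is treated as 0.
            int current = actual.getOrDefault(entry.getKey(), 0);
            if (current < entry.getValue()) {
                pending.add(entry.getKey());
            }
        }
        return pending;
    }
}
```

An empty result would mean every sub-topology has reached its expected parallelism, which is the signal the upstream pipeline is waiting for.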
> >>>
> >>>> (2) What are the default values for the newly added configs? It's
> >>>> obvious that `partition.autoscaling.enabled == false` by default, but
> >>>> what timeout would we use?
> >>>>
> >>>
> >>> This is in the KIP already -- look at the config definition
> >>>
> >>>
> >>>> Also, what's the `default.stream.partitioner.class`? Should it be
> >>>> `DefaultStreamPartitioner.class`?
> >>>>
> >>>> Would we fail if auto-scaling is enabled and the default partitioner is
> >>>> not changed (of course only for the case it's used; and if there is
> >>>> state)? -- Not sure what the best behavior is, but the KIP (and docs?)
> >>>> should explain it.
> >>>>
> >>>
> >>> N/A since the default partitioner config was removed
> >>>
> >>>> (3)
> >>>>
> >>>>> This will be configurable for users via the new
> >>>>> partition.autoscaling.timeout.ms config, which will start counting
> >>>>> after the first failure (rather than when the autoscaling attempt
> >>>>> began).
> >>>>
> >>>> If we have interleaved failures and partial success (ie, progress to
> >>>> scale out some topic), would the timeout be reset on each success? I
> >>>> think resetting would be good, ie, we only time out if there is no
> >>>> progress at all for the configured timeout period.
> >>>>
> >>>
> >>> Yes, that's what I had in mind -- will add a note to clarify this in the
> >>> doc
> >>>
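For reference, the timeout semantics agreed on above (the clock starts at the first failure and is reset whenever some progress is made) can be sketched as below. This is a minimal illustration under assumptions, not the actual implementation: `AutoscalingTimeoutTracker` and its method names are hypothetical, and timestamps are passed in explicitly to keep the example self-contained.

```java
// Hypothetical sketch of the timeout bookkeeping; not a Kafka Streams class.
final class AutoscalingTimeoutTracker {
    private static final long NO_FAILURE = -1L;

    private final long timeoutMs;            // e.g. partition.autoscaling.timeout.ms
    private long firstFailureMs = NO_FAILURE;

    AutoscalingTimeoutTracker(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Called when an autoscaling attempt fails. Only the first failure since
    // the last progress starts the clock; later failures do not move it.
    void onFailure(long nowMs) {
        if (firstFailureMs == NO_FAILURE) {
            firstFailureMs = nowMs;
        }
    }

    // Called when any topic was scaled out successfully: resets the clock.
    void onProgress() {
        firstFailureMs = NO_FAILURE;
    }

    // True only if there has been no progress for the whole timeout period.
    boolean timedOut(long nowMs) {
        return firstFailureMs != NO_FAILURE && nowMs - firstFailureMs >= timeoutMs;
    }
}
```

With the timeout set to `Long.MAX_VALUE`, `timedOut` effectively never returns true, which matches the "retry indefinitely" case mentioned earlier in the thread.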
> >>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 11/28/22 12:25 AM, Bruno Cadonna wrote:
> >>>>> Hi Sophie,
> >>>>>
> >>>>> Thanks for the updates!
> >>>>>
> >>>>> I also feel the KIP is much cleaner now.
> >>>>>
> >>>>> I have one question:
> >>>>> What is exactly the motivation behind metric num-autoscaling-failures?
> >>>>> Actually, to realise that autoscaling did not work, we only need to
> >>>>> monitor subtopology-parallelism over partition.autoscaling.timeout.ms
> >>>>> time, right?
> >>>>> Is num-autoscaling-failures a way to verify that Streams went through
> >>>>> enough autoscaling attempts during partition.autoscaling.timeout.ms?
> >>>>> Could you maybe add one or two sentences on how users should use
> >>>>> num-autoscaling-failures?
> >>>>>
> >>>>> Apart from that, the KIP LGTM!
> >>>>>
> >>>>> Best,
> >>>>> Bruno
> >>>>>
> >>>>> On 19.11.22 20:33, Sophie Blee-Goldman wrote:
> >>>>>> Thanks for the feedback everyone. I went back to the drawing board
> >>>>>> with a different guiding philosophy: that the users of this feature
> >>>>>> will generally be fairly advanced, and we should give them full
> >>>>>> flexibility to implement whatever they need while trusting them to
> >>>>>> know what they are doing.
> >>>>>>
> >>>>>> With this in mind, a lot of my original proposal has been replaced and
> >>>>>> the KIP document has been updated with the new details. Rather than
> >>>>>> addressing each of the last questions, I'll refer everyone to read the
> >>>>>> new proposal and just call out some of the high-level changes.
> >>>>>>
> >>>>>>
> >>>>>> The primary difference is in how we'll expose this feature to users.
> >>>>>> I've opted to remove the guardrails and end the discussion on what
> >>>>>> kinds of applications we should allow by introducing a feature flag
> >>>>>> that will be available for everyone. This also has the advantage of
> >>>>>> letting users turn the feature on and off.
> >>>>>>
> >>>>>> Another big question was how we can enable users to monitor when
> >>>>>> Streams has finished autoscaling its internal topics. This was the
> >>>>>> point of the callback on the new partitioner interface in the original
> >>>>>> proposal, but this is too limiting as highlighted by some of the above
> >>>>>> examples. Since the point is to let the upstream pipeline logic know
> >>>>>> when it's safe to start producing to the new partitions, we should
> >>>>>> provide external monitoring for this such as metrics.
> >>>>>>
> >>>>>> The last important question was how to handle failures. This is
> >>>>>> covered in more detail in the KIP, but after thinking the scenario
> >>>>>> through more carefully I've proposed to let Streams retry via followup
> >>>>>> rebalances up until a configurable maximum amount of time.
> >>>>>>
> >>>>>> Please call out anything you think I missed addressing either in this
> >>>>>> email or the updated KIP.
> >>>>>> Thanks to everyone who helped me refine the design of this feature; it
> >>>>>> feels much cleaner now.
> >>>>>>
> >>>>>> Give it a read and let me know what you think!
> >>>>>>
> >>>>>> On Mon, Nov 7, 2022 at 5:45 PM Matthias J. Sax <[email protected]> wrote:
> >>>>>>
> >>>>>>> Thanks for the KIP Sophie. Seems there is a lively discussion going
> >>>>>>> on. I tried to read up on the history and I hope I don't repeat what
> >>>>>>> was already discussed.
> >>>>>>>
> >>>>>>> And sorry for the quite long email...
> >>>>>>>
> >>>>>>>
> >>>>>>> (1) Stateless vs Stateful
> >>>>>>>
> >>>>>>> I agree that stateless apps should be supported, even if I am not
> >>>>>>> sure how many stateless apps will benefit from it. If an app is
> >>>>>>> stateless, why would one need to repartition to begin with? Stateless
> >>>>>>> apps might most likely be apps with a single sub-topology and thus
> >>>>>>> don't need this feature to handle input topic scale out. Of course,
> >>>>>>> there could be some apps with more than one sub-topology and I don't
> >>>>>>> see any reason why we should not support scaling out those?
> >>>>>>>
> >>>>>>> However, the point is that this feature is mainly useful for
> >>>>>>> stateful apps from my understanding.
> >>>>>>>
> >>>>>>>
> >>>>>>> (2) Config
> >>>>>>>
> >>>>>>> I am not sure if using `static.partitioner.class` is a good choice
> >>>>>>> and I would personally opt for a boolean config. The reason is (as
> >>>>>>> already mentioned by Bruno) that (stateful) apps might have a single
> >>>>>>> sub-topology: for this case, the static partitioning must be enforced
> >>>>>>> upstream already, and Kafka Streams must "just" add a new partition
> >>>>>>> to the state changelog topics to scale out. It seems odd to force
> >>>>>>> users to pass in a partitioner that might not be used by the runtime
> >>>>>>> (the only exception might be IQ which might not be used).
> >>>>>>>
> >>>>>>> I also don't understand why we would need to enforce that downstream
> >>>>>>> output topics are using the same static partitioning as the input or
> >>>>>>> any repartition topics? We don't know anything about the potential
> >>>>>>> chaining of apps, and it's also not clear to me why the output topic
> >>>>>>> would need to be scaled as claimed (it's a possibility, but I am sure
> >>>>>>> there are many cases for which the output topic is not touched and
> >>>>>>> standard hash/range/random partitioning is used and just fine)? In
> >>>>>>> the end, it's the user's responsibility and we should not enforce
> >>>>>>> artificial limitations (cf (4) below).
> >>>>>>>
> >>>>>>> I agree that we might want to add a new `default.partitioner` config
> >>>>>>> though to make it simpler for users to change the partitioner
> >>>>>>> globally instead of one-by-one method overwrites, for the case users
> >>>>>>> need it.
> >>>>>>>
> >>>>>>>
> >>>>>>> (3) StaticPartitioner
> >>>>>>>
> >>>>>>> Do we really need this new interface? The only benefit I see is the
> >>>>>>> added callback `onPartitionExpansion(...)` (but we can add this to
> >>>>>>> existing `StreamPartitioner` interface, too). In particular, I don't
> >>>>>>> see any benefit in adding `staticPartition(...)` method -- if we say
> >>>>>>> it's the user's responsibility to implement a static partitioning
> >>>>>>> strategy, they can just implement the existing `partition(...)`
> >>>>>>> method IMHO. I don't see what we gain by the new interface?
> >>>>>>>
> >>>>>>>
> >>>>>>> (3a) About `onPartitionExpansion()`: why do we need to pass in
> >>>>>>> old/new partition count?
> >>>>>>>
> >>>>>>>
> >>>>>>> (3b) Why should users throw a `TaskMigratedException` if they want to
> >>>>>>> put a record into a non-existing partition? The name seems
> >>>>>>> inappropriate to me.
> >>>>>>>      -> I am also not sure how this could happen, except for a user
> >>>>>>> error, ie, when the user writes new keys into the input topic before
> >>>>>>> the expansion operation is finished; and for this case it seems ok to
> >>>>>>> just crash (maybe the user did not even enable the feature or did not
> >>>>>>> intend to scale the app at all and wrote a "bad key" into the input
> >>>>>>> topic; for the latter case, we might end up in an infinite rebalance
> >>>>>>> as the input topic was not scaled to begin with). -- Again, it seems
> >>>>>>> we cannot (and should not try to) guard the user for this case?
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> (4) User Responsibility
> >>>>>>>
> >>>>>>> Using the feature is for advanced users only and they have a lot of
> >>>>>>> responsibility to use it correctly. For stateful single sub-topology
> >>>>>>> cases, their responsibility starts upstream by ensuring that the
> >>>>>>> input topic is partitioned statically.
> >>>>>>>
> >>>>>>> Thus, I don't understand why we want to disallow any overwrite of the
> >>>>>>> partitioner in the code (and enforce a single partitioner
> >>>>>>> implementation)? Similar to anything else, it's the user's
> >>>>>>> responsibility to do the correct thing, and it feels like artificial
> >>>>>>> safe-guards to me to disallow it. I would prefer full flexibility,
> >>>>>>> because if there are 100 ways users can misuse this feature, it does
> >>>>>>> not buy us much to limit it to 99 ways by those restrictions and it
> >>>>>>> will make the implementation (for the feature) much simpler if we
> >>>>>>> don't ha
[message truncated...]
