Hi, it seems that under the new rebalance protocol in KIP-1071 we'd still have this gap for auto-scaling under-partitioned internal topics… If so, would it still be worthwhile to make auto-scaling available as an unsafe option in Streams under KIP-1071?
Thanks,
Guang

On 2022/12/07 16:24:47 "Matthias J. Sax" wrote:
> Thanks for the background. Was just curious about the details. I agree that we should not add a new backoff config at this point.
>
> -Matthias
>
> On 12/2/22 4:47 PM, Sophie Blee-Goldman wrote:
> >>
> >> I missed the default config values as they were put into comments...
> >
> > You don't read code comments????? (jk...sorry, wasn't sure where the best place for this would be, suppose I could've just included the full config definition)
> >
> >> About the default timeout: what is the follow up rebalance cadence (I thought it would be 10 minutes?). For this case, a default timeout of 15 minutes would imply that we only allow a single retry before we hit the timeout. Would this be sufficient (sounds rather aggressive to me)?
> >
> > Well no, because we will trigger the followup rebalance for this case immediately after like we do for cooperative rebalances, not 10 minutes later as in the case of probing rebalances. I thought 10 minutes was a rather extreme backoff time that there was no motivation for here, unlike with probing rebalances where we're explicitly giving the clients time to finish warming up tasks and an immediate followup rebalance wouldn't make any sense.
> >
> > We could of course provide another config for users to tune the backoff time here, but I felt that triggering one right away was justified here -- and we can always add a backoff config in a followup KIP if there is demand for it. But why complicate things for users in the first iteration of this feature when following up right away doesn't cause too much harm -- all other threads can continue processing during the rebalance, and the leader can fit in some processing between rebalances as well.
> >
> > Does this sound reasonable to you or would you prefer including the backoff config right off the bat?
> >
> > On Fri, Dec 2, 2022 at 9:21 AM Matthias J. Sax <[email protected]> wrote:
> >
> >> Thanks Sophie.
> >>
> >> Good catch on the default partitioner issue!
> >>
> >> I missed the default config values as they were put into comments...
> >>
> >> About the default timeout: what is the follow up rebalance cadence (I thought it would be 10 minutes?). For this case, a default timeout of 15 minutes would imply that we only allow a single retry before we hit the timeout. Would this be sufficient (sounds rather aggressive to me)?
> >>
> >>
> >> -Matthias
> >>
> >> On 12/2/22 8:00 AM, Sophie Blee-Goldman wrote:
> >>> Thanks again for the responses -- just want to say up front that I realized the concept of a default partitioner is actually substantially more complicated than I first assumed due to key/value typing, so I pulled it from this KIP and filed a ticket for it for now.
> >>>
> >>> Bruno,
> >>>
> >>>> What is exactly the motivation behind metric num-autoscaling-failures? Actually, to realise that autoscaling did not work, we only need to monitor subtopology-parallelism over partition.autoscaling.timeout.ms time, right?
> >>>
> >>> That is exactly the motivation -- I imagine some users may want to retry indefinitely, and it would not be practical (or very nice) to require users to monitor for up to partition.autoscaling.timeout.ms when that's been configured to MAX_VALUE
> >>>
> >>>> Is num-autoscaling-failures a way to verify that Streams went through enough autoscaling attempts during partition.autoscaling.timeout.ms? Could you maybe add one or two sentences on how users should use num-autoscaling-failures?
> >>>
> >>> Not really, for the reason outlined above -- I just figured users might be monitoring how often the autoscaling is failing and alert past some threshold since this implies something funny is going on. This is more of a "health check" kind of metric than a "scaling completed" status gauge. At the very least, users will want to know when a failure has occurred, even if it's a single failure, no?
> >>>
> >>> Hopefully that makes more sense now, but I suppose I can write something like that in the KIP too
> >>>
> >>>
> >>> Matthias -- answers inline below:
> >>>
> >>> On Thu, Dec 1, 2022 at 10:44 PM Matthias J. Sax <[email protected]> wrote:
> >>>
> >>>> Thanks for updating the KIP Sophie.
> >>>>
> >>>> I have the same question as Bruno. How can the user use the failure metric and what actions can be taken to react if the metric increases?
> >>>>
> >>>
> >>> I guess this depends on how important the autoscaling is, but presumably in most cases if you see things failing you probably want to at least look into the logs to figure out why (for example quota violation), and at the most stop your application while investigating?
> >>>
> >>>
> >>>> Plus a few more:
> >>>>
> >>>> (1) Do we assume that user can reason about `subtopology-parallelism` metric to figure out if auto-scaling is finished? Given that a topology might be complex and the rules to determine the partition count of internal topic are not easy, it might be hard to use?
> >>>>
> >>>> Even if the feature is for advanced users, I don't think we should push the burden to understand the partition count details onto them.
> >>>>
> >>>> We could add a second `target-subtopology-parallelism` metric (or `expected-subtopology-paralleslism` or some other name)?
> >>>> This way, users can compare "target/expected" and "actual" value and easily figure out if some sub-topologies are not expanded yet.
> >>>>
> >>>> Thoughts?
> >>>>
> >>>
> >>> Makes sense to me -- will add an `expected-subtopology-paralleslism` metric
> >>>
> >>>
> >>>> (2) What are the default values for the newly added configs? It's obvious that `partition.autoscaling.enabled == false` by default, but what timeout would we use?
> >>>>
> >>>
> >>> This is in the KIP already -- look at the config definition
> >>>
> >>>
> >>>> Also, what's the `default.stream.partitioner.class`? Should it be `DefaultStreamPartitioner.class`?
> >>>>
> >>>> Would we fail if auto-scaling is enabled and the default partitioner is not changed (of course only for the case it's used; and if there is state)? -- Not sure what the best behavior is, but the KIP (and docs?) should explain it.
> >>>>
> >>>
> >>> N/A since the default partitioner config was removed
> >>>
> >>>> (3)
> >>>>
> >>>>> This will be configurable for users via the new partition.autoscaling.timeout.ms config, which will start counting after the first failure (rather than when the autoscaling attempt began).
> >>>>
> >>>> If we have interleaved failures and partial success (ie, progress to scale out some topic), would the timeout be reset on each success? I think resetting would be good, ie, we only time out if there is no progress at all for the configured timeout period.
> >>>>
> >>>
> >>> Yes, that's what I had in mind -- will add a note to clarify this in the doc
> >>>
> >>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 11/28/22 12:25 AM, Bruno Cadonna wrote:
> >>>>> Hi Sophie,
> >>>>>
> >>>>> Thanks for the updates!
> >>>>>
> >>>>> I also feel the KIP is much cleaner now.
> >>>>>
> >>>>> I have one question:
> >>>>> What is exactly the motivation behind metric num-autoscaling-failures?
> >>>>> Actually, to realise that autoscaling did not work, we only need to monitor subtopology-parallelism over partition.autoscaling.timeout.ms time, right?
> >>>>> Is num-autoscaling-failures a way to verify that Streams went through enough autoscaling attempts during partition.autoscaling.timeout.ms?
> >>>>> Could you maybe add one or two sentences on how users should use num-autoscaling-failures?
> >>>>>
> >>>>> Apart from that, the KIP LGTM!
> >>>>>
> >>>>> Best,
> >>>>> Bruno
> >>>>>
> >>>>> On 19.11.22 20:33, Sophie Blee-Goldman wrote:
> >>>>>> Thanks for the feedback everyone. I went back to the drawing board with a different guiding philosophy: that the users of this feature will generally be fairly advanced, and we should give them full flexibility to implement whatever they need while trusting them to know what they are doing.
> >>>>>>
> >>>>>> With this in mind, a lot of my original proposal has been replaced and the KIP document has been updated with the new details. Rather than addressing each of the last questions, I'll refer everyone to read the new proposal and just call out some of the high-level changes.
> >>>>>>
> >>>>>>
> >>>>>> The primary difference is in how we'll expose this feature to users. I've opted to remove the guardrails and end the discussion on what kinds of applications we should allow by introducing a feature flag that will be available for everyone. This also has the advantage of letting users turn the feature on and off.
> >>>>>>
> >>>>>> Another big question was how we can enable users to monitor when Streams has finished autoscaling its internal topics.
> >>>>>> This was the point of the callback on the new partitioner interface in the original proposal, but this is too limiting as highlighted by some of the above examples. Since the point is to let the upstream pipeline logic know when it's safe to start producing to the new partitions, we should provide external monitoring for this such as metrics.
> >>>>>>
> >>>>>> The last important question was how to handle failures. This is covered in more detail in the KIP, but after thinking the scenario through more carefully I've proposed to let Streams retry via followup rebalances up until a configurable maximum amount of time.
> >>>>>>
> >>>>>> Please call out anything you think I missed addressing either in this email or the updated KIP.
> >>>>>> Thanks to everyone who helped me refine the design of this feature; it feels much cleaner now.
> >>>>>>
> >>>>>> Give it a read and let me know what you think!
> >>>>>>
> >>>>>> On Mon, Nov 7, 2022 at 5:45 PM Matthias J. Sax <[email protected]> wrote:
> >>>>>>
> >>>>>>> Thanks for the KIP Sophie. Seems there is a lively discussion going on. I tried to read up on the history and I hope I don't repeat what was already discussed.
> >>>>>>>
> >>>>>>> And sorry for the quite long email...
> >>>>>>>
> >>>>>>>
> >>>>>>> (1) Stateless vs Stateful
> >>>>>>>
> >>>>>>> I agree that stateless apps should be supported, even if I am not sure how many stateless apps will benefit from it. If an app is stateless, why would one need to repartition to begin with? Stateless apps might most likely be apps with a single sub-topology and thus don't need this feature to handle input topic scale out.
> >>>>>>> Of course, there could be some apps with more than one sub-topology and I don't see any reason why we should not support scaling out those?
> >>>>>>>
> >>>>>>> However, the point is that this feature is mainly useful for stateful apps from my understanding.
> >>>>>>>
> >>>>>>>
> >>>>>>> (2) Config
> >>>>>>>
> >>>>>>> I am not sure if using `static.partitioner.class` is a good choice and I would personally opt for a boolean config. The reason is (as already mentioned by Bruno) that (stateful) apps might have a single sub-topology: for this case, the static partitioning must be enforced upstream already, and Kafka Streams must "just" add a new partition to the state changelog topics to scale out. It seems odd to force users to pass in a partitioner that might not be used by the runtime (the only exception might be IQ which might not be used).
> >>>>>>>
> >>>>>>> I also don't understand why we would need to enforce that downstream output topics are using the same static partitioning as the input or any repartition topics? We don't know anything about the potential chaining of apps, and it's also not clear to me why the output topic would need to be scaled as claimed (it's a possibility, but I am sure there are many cases for which the output topic is not touched and standard hash/range/random partitioning is used and just fine)? In the end, it's the user's responsibility and we should not enforce artificial limitations (cf (4) below).
> >>>>>>>
> >>>>>>> I agree that we might want to add a new `default.partitioner` config though to make it simpler for users to change the partitioner globally instead of one-by-one method overwrites, for the case users need it.
> >>>>>>>
> >>>>>>>
> >>>>>>> (3) StaticPartitioner
> >>>>>>>
> >>>>>>> Do we really need this new interface? The only benefit I see is the added callback `onPartitionExpansion(...)` (but we can add this to existing `StreamPartitioner` interface, too). In particular, I don't see any benefit in adding `staticPartition(...)` method -- if we say it's the user's responsibility to implement a static partitioning strategy, they can just implement the existing `partition(...)` method IMHO. I don't see what we gain by the new interface?
> >>>>>>>
> >>>>>>>
> >>>>>>> (3a) About `onPartitionExpansion()`: why do we need to pass in old/new partition count?
> >>>>>>>
> >>>>>>>
> >>>>>>> (3b) Why should users throw a `TaskMigratedException` if they want to put a record into a non-existing partition? The name seems inappropriate to me.
> >>>>>>> -> I am also not sure how this could happen, except for a user error, ie, when the user writes new keys into the input topic before the expansion operation is finished; and for this case it seems ok to just crash (maybe the user did not even enable the feature or did not intend to scale the app at all and wrote a "bad key" into the input topic; for the latter case, we might end up in an infinite rebalance as the input topic was not scaled to begin with). -- Again, it seems we cannot (and should not try to) guard the user for this case?
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> (4) User Responsibility
> >>>>>>>
> >>>>>>> Using the feature is for advanced users only and they have a lot of responsibility to use it correctly. For stateful single sub-topology cases, their responsibility starts upstream by ensuring that the input topic is partitioned statically.
> >>>>>>>
> >>>>>>> Thus, I don't understand why we want to disallow any overwrite of the partitioner in the code (and enforce a single partitioner implementation)? Similar to anything else, it's the user's responsibility to do the correct thing, and it feels like artificial safe-guards to me to disallow it. I would prefer full flexibility, because if there are 100 ways users can misuse this feature, it does not buy us much to limit it to 99 ways by those restrictions and it will make the implementation (for the feature) much simpler if we don't ha [message truncated...]
