Hey Colin,

Sure, that's understandable, we should keep the project stable first and
foremost. I'll work out the details of the client side reassignment changes
so perhaps we'll have better, clearer picture about this and we'll decide
when we feel the reassignment API can take more changes in :).

Viktor

On Mon, Dec 9, 2019 at 8:16 PM Colin McCabe <cmcc...@apache.org> wrote:

> Hi Viktor,
>
> Thanks for the thoughtful reply.
>
> I'm actually not a PMC on the Apache Kafka project, although I hope to
> become one in the future :)  In any case, the tasks that the PMC tackles
> tend to be more related to things like project policies, committers, legal
> stuff, brand stuff, and so on.  I don't think this discussion about
> reassignment needs to be a PMC discussion.
>
> I think it helps to stay focused on the pain points that users are
> experiencing.  Prior to KIP-455, we had a lot of them.  For example, there
> was no officially supported way to cancel reassignments or change them
> after they were created.  There was no official API so behavior could
> change between versions without much warning.  The tooling needed to depend
> on ZooKeeper, and many tools pulled in internal classes from Kafka.
>
> We've fixed these pain points now with the new API, but it will take time
> for these changes to propagate to all the external tools.  I think it would
> be good to see how the new API works out before we think about another
> major revision.  There will probably be pain points with the new API, but
> it's not completely clear what they will be yet.
>
> best,
> Colin
>
>
> On Fri, Dec 6, 2019, at 08:55, Viktor Somogyi-Vass wrote:
> > Hi Colin,
> >
> > Thanks for the honest answer. As a bottom line I think a better
> > reassignment logic should be included with Kafka somehow instead of
> relying
> > on external tools. It makes the system more mature on its own and
> > standardizes what others implemented many times.
> > Actually I also agree that decoupling this complexity would come with
> > benefits but I haven't looked into this direction yet. And frankly with
> > KIP-500 there will be a lot of changes in the controller itself.
> >
> > My overall vision with this is that there could be an API of some sort at
> > some point where you'd be able to specify how would you want to do your
> > rebalance, what plugins you'd use, what would be the default, etc.. I
> > really don't mind if this functionality is on the client or on the broker
> > as long as it gives a consistent experience for the users. For a better
> > overall user experience though I think this is a basic building block and
> > it made sense to me to put both functionality onto the server side as
> there
> > is nothing much to be configured on batching except partition, replica
> > batch sizes and maybe leader movements. Also users could very well limit
> > the blast radius of a reassignment by applying throttling too where they
> > don't care about the order of reassignments or any special requirements
> > just about the fact that it's sustainable in the cluster and finishes
> > without causing troubles. With small batches and throttling individual
> > partition reassignments finish earlier because you divide the given
> > bandwidth among fewer participants at a time, so it's more effective. By
> > just tweaking these knobs (batch size and throttling) we could achieve a
> > significantly better and more effective rebalance.
> >
> > Policies I think can be somewhat parallel to this as they may require
> some
> > information from metrics and you're right that the controller could
> gather
> > these easier but it's harder to make plugins, although I don't think it's
> > impossible to give a good framework for this. Also reassignment can be
> > extracted from the controller (Stan has already looked into separating
> the
> > reassignment logic from the KafkaController class in #7339) so I think we
> > could come up with a construct that is decoupled from the controller but
> > still resides on the broker side. Furthermore policies could also be
> > dynamically configured if needed.
> >
> > Any change that we make should be backward compatible and I'd default
> them
> > to the current behavior which means external tools wouldn't have to take
> > extra steps but those who don't use any external tools would benefit
> these
> > changes.
> >
> > I think it would be useful to discuss the overall vision on reassignments
> > as it would help us to decide whether this thing goes on the client or
> the
> > broker side so allow me to pose some questions to you and other PMCs as
> I'd
> > like to get some clarity and the overall stance on this so I can align my
> > work better.
> >
> > What would be the overall goal that we want to achieve? Do we want to
> give
> > a solution for automatic partition load balancing or just a framework
> that
> > standardizes what many others implemented? Or do we want to adopt a
> > solution or recommend a specific one?
> > - For the first it might be better to extend the existing tools but for
> the
> > other it might be better to provide an API.
> > - Giving a tool for this makes ramping up easier for newbies and overall
> > gives a better user experience.
> > - Adopting one could be easy as we get a bunch of functionality which are
> > relatively battle proven but I haven't seen developers from any of the
> > tools yet to volunteer. Or perhaps the Kafka PMC group would need to
> decide
> > on a tool (which is waaaay out of my league :) ).
> >
> > An alternative solution is we don't do anything but just suggest users to
> > use any tools. The question then is which tools should they use? Do we
> > recommend them to decide themselves?
> > - The variety of tools and deciding between them may not be easy and
> there
> > might be a number of factors based what functionality do you need, etc..
> > Also contributing to any of the projects might be more cumbersome than
> > contributing to Kafka itself. Therefore there might be benefits for the
> > users in consolidating the tools in Kafka.
> >
> > Would we want to achieve the state where Kafka automatically heals itself
> > through rebalances?
> > - As far as I know currently for most of the an administrator have to
> > initiate these tasks and approve proposals but it might be desirable if
> > Kafka worked on its own where possible. Less maintenance burden.
> >
> > To sum up: I could also lean towards having this on the client side so
> I'll
> > rework the KIP and look into how can we solve the reassignment batching
> > problem the best on the client side and will come back to you. However I
> > think there are important questions above that we need to clear up to set
> > the directions. Also if you think answering the above questions would
> > exceed the boundaries of email communication I can write up a KIP about
> > automatic partition balancing (of course if you can provide the
> directions
> > I should go) and we can have a separate discussion.
> >
> > Thanks,
> > Viktor
> >
> >
> > On Thu, Dec 5, 2019 at 1:23 AM Colin McCabe <co...@cmccabe.xyz> wrote:
> >
> > > Hi Stanislav & Viktor,
> > >
> > > Kafka has historically delegated partition reassignment to external
> > > tools.  For example, the choice of which partitions to reassign is
> > > determined by an external tool.  The choice of what throttle to use is
> also
> > > set externally.
> > >  So I think that putting this into the controller is not very
> consistent
> > > with how Kafka currently works.  It seems better to continue to let
> > > batching be handled by an external tool, rather than handled by the
> > > controller.
> > >
> > > The big advantage of having reassignment done by an external tool is
> that
> > > users are free to set their own reassignment policies without hacking
> Kafka
> > > itself.  So, for example, you can build a reassignment tool that moves
> > > partitions only when they're smaller than 1 GB without writing a KIP
> for a
> > > new reassign.only.if.smaller.than.1g configuration.
> > >
> > > The same is true of batching, I think.  Do you want to set up your
> batch
> > > so that you're always moving a given number of megabytes per second?
> Or
> > > maybe you want to move 3 partitions at all times?  Do you want to move
> the
> > > biggest partitions first?  Etc. etc.  These things are all policy, and
> it's
> > > a lot easier to iterate on the policy when it's not hard-coded in the
> > > controller.  Even changing these configurations at runtime will be very
> > > difficult if they are controller configs.
> > >
> > > Of course, there are disadvantages to having reassignment done by an
> > > external tool.  The external tool is another thing you have to deploy
> and
> > > run.  The tool also needs to collect statistics about the cluster
> which the
> > > controller already mostly has, so there is some duplication there.
> > >
> > > Having some reassignment policies set by the controller and some set
> by an
> > > external tool gets us the worst of both worlds.  Users would need to
> deploy
> > > and run external reassignment tools, but they would also need to worry
> > > about correctly configuring the controller so that it could execute
> their
> > > preferred policies.
> > >
> > > We just got done making a major change to the reassignment API, and
> that
> > > change is still percolating through the environment.  Most external
> tools
> > > haven't been updated yet to use the new API.  I think we should see
> how the
> > > new API works out before making more major changes to reassignment.
> > >
> > > In practice, external reassignment tools already do handle batching.
> For
> > > example, Cruise Control decides what to move, but also divides up the
> work
> > > into batches.  The main functionality that we lack is a way to do this
> with
> > > just the code shipped as part of Apache Kafka itself.
> > >
> > > So, if we want to give users a better out-of-the-box experience, why
> not
> > > add a mode to kafka-reassign-partitions.sh that lets it keep running
> for a
> > > while and breaks down the requested partition moves into batches?  I
> think
> > > this is much more consistent with how the system currently works, and
> much
> > > less likely to create technical debt for the future, than moving
> > > functionality into the controller.
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Wed, Dec 4, 2019, at 06:11, Stanislav Kozlovski wrote:
> > > > Hey all,
> > > >
> > > > I spoke offline with Viktor regarding this KIP and intentions on
> > > starting a
> > > > vote soon, so I made another pass on it. I have some comments:
> > > >
> > > > From our past discussion:
> > > >
> > > > > I reiterated on the current algorithm and indeed it would change
> the
> > > order
> > > > of replicas in ZK but wouldn't do a leader election, so one would
> need to
> > > > run the preferred replica election tool. However still in the light
> of
> > > this
> > > > I'm not sure I'd keep the existing behavior as users wouldn't win
> > > anything
> > > > with it.
> > > >
> > > > I think we should mention the change in the KIP at the very least
> such
> > > that
> > > > we can discuss it with other reviewers.
> > > >
> > > > > If I understand correctly, you're suggesting that we count the
> > > > "reassignment.parallel.leader.movements" config against such
> preferred
> > > > elections. I think that makes sense. If we are to do that we should
> > > > explicitly mention that this KIP touches the preferred election
> logic as
> > > > well. Would that config also bound the number of leader movements the
> > > auto
> > > > rebalance inside the broker would do as well?
> > > > > ... It might be easier to limit the scope of this KIP and not
> place a
> > > > bound on preferred leader election.
> > > >
> > > > What are your thoughts on this?
> > > >
> > > > And now comments on the KIP:
> > > >
> > > > 1.
> > > >
> > > >    1. > Update CR in Zookeeper
> > > >    (/brokers/topics/[topic]/partitions/[partitionId]/state) with TR
> for
> > > the
> > > >    given partitions.
> > > >
> > > >
> > > > I believe the leader for the partition is the "owner" of that znode.
> > > Having
> > > > the controller update it may complicate things.
> > > > Has this been updated with the latest KIP-455? We keep reassignment
> state
> > > > in the /brokers/topics/[topic] znode
> > > >
> > > > 2.
> > > >
> > > > How does reassignment cancellation work with these new incremental
> steps?
> > > > In your example, what happens if I cancel the current reassignment
> while
> > > > the replica set is (5, 6, 2, 3, 4)? Where and how would we keep the
> state
> > > > to revert back to (0, 1, 2, 3, 4)?
> > > >
> > > >
> > > > 3.
> > > >
> > > > > The only exception is the first step where (if needed) we add that
> many
> > > > replicas that is enough to fulfil the min.insync.replicas
> requirement set
> > > > on the broker, even if it exceeds the limit on parallel replica
> > > > reassignments. For instance if there are 4 brokers,
> min.insync.replicas
> > > set
> > > > to 3 but there are only 1 in-sync replica, then we immediately add 2
> > > other
> > > > in one step, so the producers are able to continue.
> > > >
> > > > Wouldn't that be adding extra replication traffic and pressure on an
> > > > already problematic partition? I may be missing something but I
> actually
> > > > think that this wouldn't help in common circumstances.
> > > >
> > > > 4.
> > > >
> > > > > Calculate the replicas to be dropped (DR):
> > > >
> > > >    1. Calculate n = max(reassignment.parallel.replica.count,
> size(FTR) -
> > > >    size(CR))
> > > >
> > > >
> > > > Do you mean min() here?
> > > >
> > > > 5.
> > > >
> > > >    1.
> > > >       1. Take the first *reassignment.parallel.replica.count*
> replicas of
> > > >       ER, that will be the set of dropped replicas
> > > >
> > > >
> > > > Do you mean `N replicas of ER` here?
> > > >
> > > > 6.
> > > >
> > > > > Calculate the new replicas (NR) to be added
> > > >
> > > >    1. Calculate that if the partition has enough online replicas to
> > > fulfil
> > > >    the min.insync.replicas config so the producers are able to
> continue.
> > > >    2. If the preferred leader is different in FTR and it is not yet
> > > >    reassigned, then add it to the NR
> > > >    3. If size(NR) < min.insync.replicas then take
> > > min(min.insync.replicas,
> > > >    reassignment.parallel.replica.count) - size(NR) replicas from FTR
> > > >    4. Otherwise take as many replica as
> > > >    *reassignment.parallel.replica.count* allows
> > > >
> > > >
> > > > Not sure I understand this. Wouldn't we always take as many new
> replicas
> > > > (NR) as the ones we are dropping (DR)?
> > > >
> > > > 7.
> > > >
> > > > This new configuration would tell how many replicas of a single
> partition
> > > > can be moved at once.
> > > >
> > > > I believe this description is outdated
> > > >
> > > > 8.
> > > >
> > > > I don't see it as a rejected alternative - have we considered
> delegating
> > > > this logic out to the client and making Kafka accept a series of
> > > > reassignment "steps" to run?
> > > >
> > > >
> > > > Thanks!
> > > > Stanislav
> > > >
> > > > On Wed, Aug 21, 2019 at 12:33 PM Viktor Somogyi-Vass <
> > > > viktorsomo...@gmail.com> wrote:
> > > >
> > > > > Hey Folks,
> > > > >
> > > > > I think I'll open a vote early next week about this if there are no
> > > more
> > > > > feedback.
> > > > >
> > > > > Thanks,
> > > > > Viktor
> > > > >
> > > > > On Fri, Aug 9, 2019 at 5:25 PM Viktor Somogyi-Vass <
> > > > > viktorsomo...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hey Stanislav,
> > > > > >
> > > > > > I reiterated on the current algorithm and indeed it would change
> the
> > > > > order
> > > > > > of replicas in ZK but wouldn't do a leader election, so one would
> > > need to
> > > > > > run the preferred replica election tool. However still in the
> light
> > > of
> > > > > this
> > > > > > I'm not sure I'd keep the existing behavior as users wouldn't win
> > > > > anything
> > > > > > with it. Changing leadership automatically would result in a
> simpler,
> > > > > > easier and more responsive reassign algorithm which is especially
> > > > > important
> > > > > > if the reassignment is done to free up the broker from under
> heavy
> > > load.
> > > > > > Automated tools (Cruise Control) would also have simpler life.
> > > > > > Let me know what you think.
> > > > > >
> > > > > > Viktor
> > > > > >
> > > > > > On Thu, Jul 11, 2019 at 7:16 PM Viktor Somogyi-Vass <
> > > > > > viktorsomo...@gmail.com> wrote:
> > > > > >
> > > > > >> Hi Stan,
> > > > > >>
> > > > > >> I meant the following (maybe rare) scenario - we have replicas
> [1,
> > > 2, 3]
> > > > > >> on
> > > > > >> a lot of partitions and the user runs a massive rebalance to
> change
> > > them
> > > > > >> all to [3, 2, 1]. In the old behavior, I think that this would
> not
> > > do
> > > > > >> anything but simply change the replica set in ZK.
> > > > > >> Then, the user could run kafka-preferred-replica-election.sh on
> a
> > > given
> > > > > >> set
> > > > > >> of partitions to make sure the new leader 3 gets elected.
> > > > > >>
> > > > > >> I thought the old algorithm would elect 3 as the leader in this
> case
> > > > > >> right away at the end but I have to double check this. In any
> case I
> > > > > think
> > > > > >> it would make sense in the new algorithm if we elected the new
> > > preferred
> > > > > >> leader right away, regardless of the new leader is chosen from
> the
> > > > > existing
> > > > > >> replicas or not. If the whole reassignment is in fact just
> changing
> > > the
> > > > > >> replica order then either way it is a simple (trivial)
> operation and
> > > > > doing
> > > > > >> it batched wouldn't slow it down much as there is no data
> movement
> > > > > >> involved. If the reassignment is mixed, meaning it contains
> > > reordering
> > > > > and
> > > > > >> real movement as well then in fact it would help to even out the
> > > load
> > > > > >> faster as data movements can get long. For instance in case of a
> > > > > >> reassignment batch of two partitions concurrently: P1: (1,2,3)
> ->
> > > > > (3,2,1)
> > > > > >> and P2: (4,5,6) -> (7,8,9) the P2 reassignment would elect a new
> > > leader
> > > > > but
> > > > > >> P1 wouldn't and it wouldn't help the goal of normalizing
> traffic on
> > > > > broker
> > > > > >> 1 that much.
> > > > > >> Again, I'll have to check how the current algorithm works and
> if it
> > > has
> > > > > >> any unknown drawbacks to implement what I sketched up above.
> > > > > >>
> > > > > >> As for generic preferred leader election when called from the
> admin
> > > api
> > > > > >> or the auto leader balance feature I think you're right that we
> > > should
> > > > > >> leave it as it is. It doesn't involve any data movement so it's
> > > fairly
> > > > > fast
> > > > > >> and it normalizes the cluster state quickly.
> > > > > >>
> > > > > >> Viktor
> > > > > >>
> > > > > >> On Tue, Jul 9, 2019 at 9:04 PM Stanislav Kozlovski <
> > > > > >> stanis...@confluent.io> wrote:
> > > > > >>
> > > > > >>> Hey Viktor,
> > > > > >>>
> > > > > >>>  I think it is intuitive if there are on a global level...If we
> > > applied
> > > > > >>> > them on every batch then we
> > > > > >>> > couldn't really guarantee any limits as the user would be
> able
> > > to get
> > > > > >>> > around it with submitting lots of reassignments.
> > > > > >>>
> > > > > >>>
> > > > > >>> Agreed. Could we mention this explicitly in the KIP?
> > > > > >>>
> > > > > >>> Also if I understand correctly,
> AlterPartitionAssignmentsRequest
> > > would
> > > > > >>> be a
> > > > > >>> > partition level batching, isn't it? So if you submit 10
> > > partitions at
> > > > > >>> once
> > > > > >>> > then they'll all be started by the controller immediately as
> per
> > > my
> > > > > >>> > understanding.
> > > > > >>>
> > > > > >>>
> > > > > >>> Yep, absolutely
> > > > > >>>
> > > > > >>> I've raised the ordering problem on the discussion of KIP-455
> in a
> > > bit
> > > > > >>> > different form and as I remember the verdict there was that
> we
> > > > > >>> shouldn't
> > > > > >>> > expose ordering as an API. It might not be easy as you say
> and
> > > there
> > > > > >>> might
> > > > > >>> > be much better strategies to follow (like disk or network
> > > utilization
> > > > > >>> > goals). Therefore I'll remove this section from the KIP.
> > > > > >>>
> > > > > >>>
> > > > > >>> Sounds good to me.
> > > > > >>>
> > > > > >>> I'm not sure I get this scenario. So are you saying that after
> they
> > > > > >>> > submitted a reassignment they also submit a preferred leader
> > > change?
> > > > > >>> > In my mind what I would do is:
> > > > > >>> > i) make auto.leader.rebalance.enable to obey the leader
> movement
> > > > > limit
> > > > > >>> as
> > > > > >>> > this way it will be easier to calculate the reassignment
> batches.
> > > > > >>> >
> > > > > >>>
> > > > > >>> Sorry, this is my fault -- I should have been more clear.
> > > > > >>> First, I didn't think through this well enough at the time, I
> don't
> > > > > >>> think.
> > > > > >>> If we have replicas=[1, 2, 3] and we reassign them to [4, 5,
> 6],
> > > it is
> > > > > >>> obvious that a leader shift will happen. Your KIP proposes we
> shift
> > > > > >>> replicas 1 and 4 first.
> > > > > >>>
> > > > > >>> I meant the following (maybe rare) scenario - we have replicas
> [1,
> > > 2,
> > > > > 3]
> > > > > >>> on
> > > > > >>> a lot of partitions and the user runs a massive rebalance to
> change
> > > > > them
> > > > > >>> all to [3, 2, 1]. In the old behavior, I think that this would
> not
> > > do
> > > > > >>> anything but simply change the replica set in ZK.
> > > > > >>> Then, the user could run kafka-preferred-replica-election.sh
> on a
> > > given
> > > > > >>> set
> > > > > >>> of partitions to make sure the new leader 3 gets elected.
> > > > > >>>
> > > > > >>> ii) the user could submit preferred leader election but it's
> > > basically
> > > > > a
> > > > > >>> > form of reassignment so it would fall under the batching
> > > criterias.
> > > > > If
> > > > > >>> the
> > > > > >>> > batch they submit is smaller than the internal, then it'd be
> > > executed
> > > > > >>> > immediately but otherwise it would be split into more
> batches. It
> > > > > >>> might be
> > > > > >>> > a different behavior as it may not be executed it in one
> batch
> > > but I
> > > > > >>> think
> > > > > >>> > it isn't a problem because we'll default to Int.MAX with the
> > > batches
> > > > > >>> and
> > > > > >>> > otherwise because since it's a form of reassignment I think
> it
> > > makes
> > > > > >>> sense
> > > > > >>> > to apply the same logic.
> > > > > >>>
> > > > > >>>
> > > > > >>> The AdminAPI for that is
> > > > > >>> "electPreferredLeaders(Collection<TopicPartition>
> > > > > >>> partitions)" and the old zNode is "{"partitions": [{"topic":
> "a",
> > > > > >>> "partition": 0}]}" so it is a bit less explicit than our other
> > > > > >>> reassignment
> > > > > >>> API, but the functionality is the same.
> > > > > >>> You're 100% right that it is a form of reassignment, but I
> hadn't
> > > > > thought
> > > > > >>> of it like that and I some other people wouldn't have either.
> > > > > >>> If I understand correctly, you're suggesting that we count the
> > > > > >>> "reassignment.parallel.leader.movements" config against such
> > > preferred
> > > > > >>> elections. I think that makes sense. If we are to do that we
> should
> > > > > >>> explicitly mention that this KIP touches the preferred election
> > > logic
> > > > > as
> > > > > >>> well. Would that config also bound the number of leader
> movements
> > > the
> > > > > >>> auto
> > > > > >>> rebalance inside the broker would do as well?
> > > > > >>>
> > > > > >>> Then again, the "reassignment.parallel.leader.movements" config
> > > has a
> > > > > bit
> > > > > >>> of a different meaning when used in the context of a real
> > > reassignment
> > > > > >>> (where data moves from the leader to N more replicas) versus
> in the
> > > > > >>> preferred election switch (where all we need is two lightweight
> > > > > >>> LeaderAndIsr requests), so I am not entirely convinced we may
> want
> > > to
> > > > > use
> > > > > >>> the same config for that.
> > > > > >>> It might be easier to limit the scope of this KIP and not
> place a
> > > bound
> > > > > >>> on
> > > > > >>> preferred leader election.
> > > > > >>>
> > > > > >>> Thanks,
> > > > > >>> Stanislav
> > > > > >>>
> > > > > >>>
> > > > > >>> On Mon, Jul 8, 2019 at 4:28 PM Viktor Somogyi-Vass <
> > > > > >>> viktorsomo...@gmail.com>
> > > > > >>> wrote:
> > > > > >>>
> > > > > >>> > Hey Stanislav,
> > > > > >>> >
> > > > > >>> > Thanks for the thorough look at the KIP! :)
> > > > > >>> >
> > > > > >>> > > Let me first explicitly confirm my understanding of the
> > > configs and
> > > > > >>> the
> > > > > >>> > > algorithm:
> > > > > >>> > > * reassignment.parallel.replica.count - the maximum total
> > > number of
> > > > > >>> > > replicas that we can move at once, *per partition*
> > > > > >>> > > * reassignment.parallel.partition.count - the maximum
> number of
> > > > > >>> > partitions
> > > > > >>> > > we can move at once
> > > > > >>> > > * reassignment.parallel.leader.movements - the maximum
> number
> > > of
> > > > > >>> leader
> > > > > >>> > > movements we can have at once
> > > > > >>> >
> > > > > >>> > Yes.
> > > > > >>> >
> > > > > >>> > > As far as I currently understand it, your proposed
> algorithm
> > > will
> > > > > >>> > naturally
> > > > > >>> > > prioritize leader movement first. e.g if
> > > > > >>> > > reassignment.parallel.replica.count=1 and
> > > > > >>> > >
> > > > > >>> >
> > > > > >>> >
> > > > > >>>
> > > > >
> > >
> reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> > > > > >>> > > we will always move one, the first possible, replica in the
> > > replica
> > > > > >>> set
> > > > > >>> > > (which will be the leader if part of the excess replica set
> > > (ER)).
> > > > > >>> > > Am I correct in saying that?
> > > > > >>> >
> > > > > >>> > Yes.
> > > > > >>> >
> > > > > >>> > > 1. Does it make sense to add `max` somewhere in the
> configs'
> > > names?
> > > > > >>> >
> > > > > >>> > If you imply that it's not always an exact number (because
> the
> > > last
> > > > > >>> batch
> > > > > >>> > could be smaller) than I think it's a good idea. I don't mind
> > > this
> > > > > >>> naming
> > > > > >>> > or having max in it either.
> > > > > >>> >
> > > > > >>> > > 2. How does this KIP play along with KIP-455's notion of
> > > multiple
> > > > > >>> > > rebalances - do the configs apply on a
> > > > > >>> > > single AlterPartitionAssignmentsRequest or are they global?
> > > > > >>> >
> > > > > >>> > I think it is intuitive if there are on a global level and
> the
> > > user
> > > > > >>> would
> > > > > >>> > have the option to set KIP-435's settings to Int.MAX and thus
> > > > > >>> eliminate any
> > > > > >>> > batching and submit their own batches through
> > > > > >>> > AlterPartitionAssignmentsRequest. If we applied them on every
> > > batch
> > > > > >>> then we
> > > > > >>> > couldn't really guarantee any limits as the user would be
> able
> > > to get
> > > > > >>> > around it with submitting lots of reassignments. By default
> > > though we
> > > > > >>> set
> > > > > >>> > the batching limits to Int.MAX so effectively they're
> disabled.
> > > > > >>> > Also if I understand correctly,
> AlterPartitionAssignmentsRequest
> > > > > would
> > > > > >>> be a
> > > > > >>> > partition level batching, isn't it? So if you submit 10
> > > partitions at
> > > > > >>> once
> > > > > >>> > then they'll all be started by the controller immediately as
> per
> > > my
> > > > > >>> > understanding.
> > > > > >>> >
> > > > > >>> > > 3. Unless I've missed it, the algorithm does not take into
> > > account
> > > > > >>> > > `reassignment.parallel.leader.movements`
> > > > > >>> >
> > > > > >>> > Yea the reassignment step calculation doesn't contain that
> > > because it
> > > > > >>> > describes only one partition's reassignment step
> calculation. We
> > > > > simply
> > > > > >>> > fill our reassignment batch with as many leader movements as
> we
> > > can
> > > > > and
> > > > > >>> > then fill the rest with reassignments which don't require
> leader
> > > > > >>> movement
> > > > > >>> > if we have space. I'll create a pseudo code block on the KIP
> for
> > > > > this.
> > > > > >>> >
> > > > > >>> > > 4. The KIP says that the order of the input has some
> control
> > > over
> > > > > >>> how the
> > > > > >>> > > batches are created - i.e it's deterministic. What would
> the
> > > > > batches
> > > > > >>> of
> > > > > >>> > the
> > > > > >>> > > following reassignment look like:
> > > > > >>> > > reassignment.parallel.replica.count=1
> > > > > >>> > > reassignment.parallel.partition.count=MAX_INT
> > > > > >>> > > reassignment.parallel.leader.movements=1
> > > > > >>> > > partitionA - (0,1,2) -> (3, 4, 5)
> > > > > >>> > > partitionB - (0,1,2) -> (3,4,5)
> > > > > >>> > > partitionC - (0,1,2) -> (3, 4, 5)
> > > > > >>> > > From my understanding, we would start with A(0->3),
> B(1->4) and
> > > > > >>> C(1->4).
> > > > > >>> > Is
> > > > > >>> > > that correct? Would the second step then continue with
> B(0->3)?
> > > > > >>> > >
> > > > > >>> > > If the configurations are global, I can imagine we will
> have a
> > > bit
> > > > > >>> more
> > > > > >>> > > trouble in preserving the expected ordering, especially
> across
> > > > > >>> controller
> > > > > >>> > > failovers -- but I'll avoid speculating until you confirm
> the
> > > scope
> > > > > >>> of
> > > > > >>> > the
> > > > > >>> > > config.
> > > > > >>> >
> > > > > >>> > I've raised the ordering problem on the discussion of
> KIP-455 in
> > > a
> > > > > bit
> > > > > >>> > different form and as I remember the verdict there was that
> we
> > > > > >>> shouldn't
> > > > > >>> > expose ordering as an API. It might not be easy as you say
> and
> > > there
> > > > > >>> might
> > > > > >>> > be much better strategies to follow (like disk or network
> > > utilization
> > > > > >>> > goals). Therefore I'll remove this section from the KIP.
> > > (Otherwise
> > > > > >>> yes, I
> > > > > >>> > would have thought of the same behavior what you described.)
> > > > > >>> > Since #3 also was about the pseudo code, it would be
> something
> > > like
> > > > > >>> this:
> > > > > >>> > Config:
> > > > > >>> > reassignment.parallel.replica.count=R
> > > > > >>> > reassignment.parallel.partition.count=P
> > > > > >>> > reassignment.parallel.leader.movements=L
> > > > > >>> > Code:
> > > > > >>> > val batchSize = max(L,P)
> > > > > >>> > // split the individual partition reassignments whether they
> > > involve
> > > > > >>> leader
> > > > > >>> > movement or not
> > > > > >>> > val partitionMovements =
> > > > > >>> >
> > > > > >>> >
> > > > > >>>
> > > > >
> > >
> calculateReassignmentStepsFor(partitionsToReassign).partition(partitionReassignment.involvesLeaderReassignment)
> > > > > >>> > // fill the batch with as much leader movements as possible
> and
> > > take
> > > > > >>> the
> > > > > >>> > rest from other reassignments
> > > > > >>> > val currentBatch = if
> (partitionMovements.leaderMovements.size <
> > > > > >>> batchSize)
> > > > > >>> >   partitionMovements.leaderMovements ++
> > > > > >>> > partitionsToReassign.otherPartitionMovements.take(batchSize -
> > > > > >>> > partitionMovements.leaderMovements.size)
> > > > > >>> > else
> > > > > >>> >  partitionMovements.leaderMovements.take(batchSize)
> > > > > >>> > executeReassignmentOnBatch(currentBatch)
> > > > > >>> >
> > > > > >>> > > 5. Regarding the new behavior of electing the new preferred
> > > leader
> > > > > >>> in the
> > > > > >>> > > "first step" of the reassignment - does this obey the
> > > > > >>> > > `auto.leader.rebalance.enable` config?
> > > > > >>> > > If not, I have concerns regarding how backwards compatible
> this
> > > > > >>> might be
> > > > > >>> > -
> > > > > >>> > > e.g imagine a user does a huge reassignment (as they have
> > > always
> > > > > >>> done)
> > > > > >>> > and
> > > > > >>> > > suddenly a huge leader shift happens, whereas the user
> > > expected to
> > > > > >>> > manually
> > > > > >>> > > shift preferred leaders at a slower rate via
> > > > > >>> > > the kafka-preferred-replica-election.sh tool.
> > > > > >>> >
> > > > > >>> > I'm not sure I get this scenario. So are you saying that
> after
> > > they
> > > > > >>> > submitted a reassignment they also submit a preferred leader
> > > change?
> > > > > >>> > In my mind what I would do is:
> > > > > >>> > i) make auto.leader.rebalance.enable to obey the leader
> movement
> > > > > limit
> > > > > >>> as
> > > > > >>> > this way it will be easier to calculate the reassignment
> batches.
> > > > > >>> > ii) the user could submit preferred leader election but it's
> > > > > basically
> > > > > >>> a
> > > > > >>> > form of reassignment so it would fall under the batching
> > > criterias.
> > > > > If
> > > > > >>> the
> > > > > >>> > batch they submit is smaller than the internal, then it'd be
> > > executed
> > > > > >>> > immediately but otherwise it would be split into more
> batches. It
> > > > > >>> might be
> > > > > >>> > a different behavior as it may not be executed it in one
> batch
> > > but I
> > > > > >>> think
> > > > > >>> > it isn't a problem because we'll default to Int.MAX with the
> > > batches
> > > > > >>> and
> > > > > >>> > otherwise because since it's a form of reassignment I think
> it
> > > makes
> > > > > >>> sense
> > > > > >>> > to apply the same logic.
> > > > > >>> >
> > > > > >>> > > 6. What is the expected behavior if we dynamically change
> one
> > > of
> > > > > the
> > > > > >>> > > configs to a lower value while a reassignment is happening.
> > > Would
> > > > > we
> > > > > >>> > cancel
> > > > > >>> > > some of the currently reassigned partitions or would we
> > > account for
> > > > > >>> the
> > > > > >>> > new
> > > > > >>> > > values on the next reassignment? I assume the latter but
> it's
> > > good
> > > > > >>> to be
> > > > > >>> > > explicit
> > > > > >>> >
> > > > > >>> > Yep, it would be the latter. I'll make this explicit in the
> KIP
> > > too.
> > > > > >>> >
> > > > > >>> > About the nits:
> > > > > >>> > Noted, will update the KIP to reflect your suggestions :)
> > > > > >>> >
> > > > > >>> > Viktor
> > > > > >>> >
> > > > > >>> > On Fri, Jun 28, 2019 at 5:59 PM Stanislav Kozlovski <
> > > > > >>> > stanis...@confluent.io>
> > > > > >>> > wrote:
> > > > > >>> > >
> > > > > >>> > > Hey there Viktor,
> > > > > >>> > >
> > > > > >>> > > Thanks for working on this KIP! I agree with the notion
> that
> > > > > >>> reliability,
> > > > > >>> > > stability and predictability of a reassignment should be a
> core
> > > > > >>> feature
> > > > > >>> > of
> > > > > >>> > > Kafka.
> > > > > >>> > >
> > > > > >>> > > Let me first explicitly confirm my understanding of the
> > > configs and
> > > > > >>> the
> > > > > >>> > > algorithm:
> > > > > >>> > > * reassignment.parallel.replica.count - the maximum total
> > > number of
> > > > > >>> > > replicas that we can move at once, *per partition*
> > > > > >>> > > * reassignment.parallel.partition.count - the maximum
> number of
> > > > > >>> > partitions
> > > > > >>> > > we can move at once
> > > > > >>> > > * reassignment.parallel.leader.movements - the maximum
> number
> > > of
> > > > > >>> leader
> > > > > >>> > > movements we can have at once
> > > > > >>> > >
> > > > > >>> > > As far as I currently understand it, your proposed
> algorithm
> > > will
> > > > > >>> > naturally
> > > > > >>> > > prioritize leader movement first. e.g if
> > > > > >>> > > reassignment.parallel.replica.count=1 and
> > > > > >>> > >
> > > > > >>> >
> > > > > >>> >
> > > > > >>>
> > > > >
> > >
> reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> > > > > >>> > > we will always move one, the first possible, replica in the
> > > replica
> > > > > >>> set
> > > > > >>> > > (which will be the leader if part of the excess replica set
> > > (ER)).
> > > > > >>> > > Am I correct in saying that?
> > > > > >>> > >
> > > > > >>> > > Regarding the KIP, I've got a couple of
> comments/questions::
> > > > > >>> > >
> > > > > >>> > > 1. Does it make sense to add `max` somewhere in the
> configs'
> > > names?
> > > > > >>> > >
> > > > > >>> > > 2. How does this KIP play along with KIP-455's notion of
> > > multiple
> > > > > >>> > > rebalances - do the configs apply on a
> > > > > >>> > > single AlterPartitionAssignmentsRequest or are they global?
> > > > > >>> > >
> > > > > >>> > > 3. Unless I've missed it, the algorithm does not take into
> > > account
> > > > > >>> > > `reassignment.parallel.leader.movements`
> > > > > >>> > >
> > > > > >>> > > 4. The KIP says that the order of the input has some
> control
> > > over
> > > > > >>> how the
> > > > > >>> > > batches are created - i.e it's deterministic. What would
> the
> > > > > batches
> > > > > >>> of
> > > > > >>> > the
> > > > > >>> > > following reassignment look like:
> > > > > >>> > > reassignment.parallel.replica.count=1
> > > > > >>> > > reassignment.parallel.partition.count=MAX_INT
> > > > > >>> > > reassignment.parallel.leader.movements=1
> > > > > >>> > > partitionA - (0,1,2) -> (3, 4, 5)
> > > > > >>> > > partitionB - (0,1,2) -> (3,4,5)
> > > > > >>> > > partitionC - (0,1,2) -> (3, 4, 5)
> > > > > >>> > > From my understanding, we would start with A(0->3),
> B(1->4) and
> > > > > >>> C(1->4).
> > > > > >>> > Is
> > > > > >>> > > that correct? Would the second step then continue with
> B(0->3)?
> > > > > >>> > >
> > > > > >>> > > If the configurations are global, I can imagine we will
> have a
> > > bit
> > > > > >>> more
> > > > > >>> > > trouble in preserving the expected ordering, especially
> across
> > > > > >>> controller
> > > > > >>> > > failovers -- but I'll avoid speculating until you confirm
> the
> > > scope
> > > > > >>> of
> > > > > >>> > the
> > > > > >>> > > config.
> > > > > >>> > >
> > > > > >>> > > 5. Regarding the new behavior of electing the new preferred
> > > leader
> > > > > >>> in the
> > > > > >>> > > "first step" of the reassignment - does this obey the
> > > > > >>> > > `auto.leader.rebalance.enable` config?
> > > > > >>> > > If not, I have concerns regarding how backwards compatible
> this
> > > > > >>> might be
> > > > > >>> > -
> > > > > >>> > > e.g imagine a user does a huge reassignment (as they have
> > > always
> > > > > >>> done)
> > > > > >>> > and
> > > > > >>> > > suddenly a huge leader shift happens, whereas the user
> > > expected to
> > > > > >>> > manually
> > > > > >>> > > shift preferred leaders at a slower rate via
> > > > > >>> > > the kafka-preferred-replica-election.sh tool.
> > > > > >>> > >
> > > > > >>> > > 6. What is the expected behavior if we dynamically change
> one
> > > of
> > > > > the
> > > > > >>> > > configs to a lower value while a reassignment is happening.
> > > Would
> > > > > we
> > > > > >>> > cancel
> > > > > >>> > > some of the currently reassigned partitions or would we
> > > account for
> > > > > >>> the
> > > > > >>> > new
> > > > > >>> > > values on the next reassignment? I assume the latter but
> it's
> > > good
> > > > > >>> to be
> > > > > >>> > > explicit
> > > > > >>> > >
> > > > > >>> > > As some small nits:
> > > > > >>> > > - could we have a section in the KIP where we explicitly
> define
> > > > > what
> > > > > >>> each
> > > > > >>> > > config does? This can be inferred from the KIP as is but
> > > requires
> > > > > >>> careful
> > > > > >>> > > reading, whereas some developers might want to skim
> through the
> > > > > >>> change to
> > > > > >>> > > get a quick sense. It also improves readability but that's
> my
> > > > > >>> personal
> > > > > >>> > > opinion.
> > > > > >>> > > - could you better clarify how a reassignment step is
> different
> > > > > from
> > > > > >>> the
> > > > > >>> > > currently existing algorithm? maybe laying both algorithms
> out
> > > in
> > > > > >>> the KIP
> > > > > >>> > > would be most clear
> > > > > >>> > > - the names for the OngoingPartitionReassignment and
> > > > > >>> > > CurrentPartitionReassignment fields in the
> > > > > >>> > > ListPartitionReassignmentsResponse are a bit confusing to
> me.
> > > > > >>> > > Unfortunately, I don't have a better suggestion, but maybe
> > > somebody
> > > > > >>> else
> > > > > >>> > in
> > > > > >>> > > the community has.
> > > > > >>> > >
> > > > > >>> > > Thanks,
> > > > > >>> > > Stanislav
> > > > > >>> > >
> > > > > >>> > > On Thu, Jun 27, 2019 at 3:24 PM Viktor Somogyi-Vass <
> > > > > >>> > viktorsomo...@gmail.com>
> > > > > >>> > > wrote:
> > > > > >>> > >
> > > > > >>> > > > Hi All,
> > > > > >>> > > >
> > > > > >>> > > > I've renamed my KIP as its name was a bit confusing so
> we'll
> > > > > >>> continue
> > > > > >>> > it in
> > > > > >>> > > > this thread.
> > > > > >>> > > > The previous thread for record:
> > > > > >>> > > >
> > > > > >>> > > >
> > > > > >>> >
> > > > > >>> >
> > > > > >>>
> > > > >
> > >
> https://lists.apache.org/thread.html/0e97e30271f80540d4da1947bba94832639767e511a87bb2ba530fe7@%3Cdev.kafka.apache.org%3E
> > > > > >>> > > >
> > > > > >>> > > > A short summary of the KIP:
> > > > > >>> > > > In case of a vast partition reassignment (thousands of
> > > partitions
> > > > > >>> at
> > > > > >>> > once)
> > > > > >>> > > > Kafka can collapse under the increased replication
> traffic.
> > > This
> > > > > >>> KIP
> > > > > >>> > will
> > > > > >>> > > > mitigate it by introducing internal batching done by the
> > > > > >>> controller.
> > > > > >>> > > > Besides putting a bandwidth limit on the replication it
> is
> > > useful
> > > > > >>> to
> > > > > >>> > batch
> > > > > >>> > > > partition movements as fewer number of partitions will
> use
> > > the
> > > > > >>> > available
> > > > > >>> > > > bandwidth for reassignment and they finish faster.
> > > > > >>> > > > The main control handles are:
> > > > > >>> > > > - the number of parallel leader movements,
> > > > > >>> > > > - the number of parallel partition movements
> > > > > >>> > > > - and the number of parallel replica movements.
> > > > > >>> > > >
> > > > > >>> > > > Thank you for the feedback and ideas so far in the
> previous
> > > > > thread
> > > > > >>> and
> > > > > >>> > I'm
> > > > > >>> > > > happy to receive more.
> > > > > >>> > > >
> > > > > >>> > > > Regards,
> > > > > >>> > > > Viktor
> > > > > >>> > > >
> > > > > >>> > >
> > > > > >>> > >
> > > > > >>> > > --
> > > > > >>> > > Best,
> > > > > >>> > > Stanislav
> > > > > >>> >
> > > > > >>>
> > > > > >>>
> > > > > >>> --
> > > > > >>> Best,
> > > > > >>> Stanislav
> > > > > >>>
> > > > > >>
> > > > >
> > > >
> > > >
> > > > --
> > > > Best,
> > > > Stanislav
> > > >
> > >
> >
>

Reply via email to