Hey Folks,

I think I'll open a vote on this early next week if there is no more
feedback.

Thanks,
Viktor

On Fri, Aug 9, 2019 at 5:25 PM Viktor Somogyi-Vass <viktorsomo...@gmail.com>
wrote:

> Hey Stanislav,
>
> I took another look at the current algorithm and indeed it would change
> the order of replicas in ZK but wouldn't do a leader election, so one
> would need to run the preferred replica election tool. However, in light
> of this I'm still not sure I'd keep the existing behavior, as users
> wouldn't gain anything from it. Changing leadership automatically would
> result in a simpler, easier and more responsive reassignment algorithm,
> which is especially important if the reassignment is done to free up a
> broker from under heavy load. Automated tools (e.g. Cruise Control) would
> also have a simpler life.
> Let me know what you think.
>
> Viktor
>
> On Thu, Jul 11, 2019 at 7:16 PM Viktor Somogyi-Vass <
> viktorsomo...@gmail.com> wrote:
>
>> Hi Stan,
>>
>> I meant the following (maybe rare) scenario - we have replicas [1, 2, 3]
>> on a lot of partitions and the user runs a massive rebalance to change
>> them all to [3, 2, 1]. In the old behavior, I think that this would not
>> do anything but simply change the replica set in ZK.
>> Then, the user could run kafka-preferred-replica-election.sh on a given
>> set of partitions to make sure the new leader 3 gets elected.
>>
>> I thought the old algorithm would elect 3 as the leader in this case
>> right away at the end, but I have to double check this. In any case I
>> think it would make sense in the new algorithm to elect the new preferred
>> leader right away, regardless of whether the new leader is chosen from
>> the existing replicas or not. If the whole reassignment is in fact just
>> changing the replica order, then either way it is a simple (trivial)
>> operation, and doing it batched wouldn't slow it down much as there is no
>> data movement involved. If the reassignment is mixed, meaning it contains
>> reordering and real movement as well, then it would in fact help to even
>> out the load faster, as data movements can take long. For instance, in
>> the case of a reassignment batch of two partitions running concurrently,
>> P1: (1,2,3) -> (3,2,1) and P2: (4,5,6) -> (7,8,9), the P2 reassignment
>> would elect a new leader but P1 wouldn't, which wouldn't help the goal of
>> normalizing traffic on broker 1 that much.
>> Again, I'll have to check how the current algorithm works and whether it
>> has any unknown drawbacks to implement what I sketched above.
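>>
>> To make this concrete, here is a minimal sketch of the step I have in
>> mind, in pseudo-Scala (`electLeader` and `startDataMovement` are
>> hypothetical placeholders for controller internals, passed in as
>> functions; a sketch of the idea, not the actual controller code):
>>
>> import org.apache.kafka.common.TopicPartition
>>
>> case class ReassignmentStep(tp: TopicPartition, current: Seq[Int], target: Seq[Int])
>>
>> // Elect the new preferred leader right at the start of the step,
>> // regardless of whether the step also moves data.
>> def beginStep(step: ReassignmentStep,
>>               electLeader: (TopicPartition, Int) => Unit,
>>               startDataMovement: (TopicPartition, Set[Int]) => Unit): Unit = {
>>   if (step.target.head != step.current.head)
>>     electLeader(step.tp, step.target.head) // lightweight LeaderAndIsr round, no data copied
>>   val newReplicas = step.target.toSet -- step.current.toSet
>>   if (newReplicas.nonEmpty)
>>     startDataMovement(step.tp, newReplicas) // a pure reorder skips this entirely
>> }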
>>
>> As for generic preferred leader election, when called from the admin API
>> or the auto leader balance feature, I think you're right that we should
>> leave it as it is. It doesn't involve any data movement, so it's fairly
>> fast, and it normalizes the cluster state quickly.
>>
>> Viktor
>>
>> On Tue, Jul 9, 2019 at 9:04 PM Stanislav Kozlovski <
>> stanis...@confluent.io> wrote:
>>
>>> Hey Viktor,
>>>
>>> > I think it is intuitive if they are on a global level... If we applied
>>> > them on every batch then we couldn't really guarantee any limits, as the
>>> > user would be able to get around it by submitting lots of reassignments.
>>>
>>>
>>> Agreed. Could we mention this explicitly in the KIP?
>>>
>>> > Also if I understand correctly, AlterPartitionAssignmentsRequest would
>>> > be partition-level batching, wouldn't it? So if you submit 10 partitions
>>> > at once then they'll all be started by the controller immediately, as
>>> > per my understanding.
>>>
>>>
>>> Yep, absolutely
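>>>
>>> (For context, submitting such a partition-level batch through the
>>> AdminClient API proposed in KIP-455 would look roughly like this - just
>>> a sketch against the proposed interface, with a made-up bootstrap
>>> address and replica set:)
>>>
>>> import java.util.{Arrays, Optional, Properties}
>>> import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitionReassignment}
>>> import org.apache.kafka.common.TopicPartition
>>>
>>> val props = new Properties()
>>> props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // made-up address
>>> val admin = AdminClient.create(props)
>>> // every partition in this request is started by the controller immediately
>>> val reassignments = new java.util.HashMap[TopicPartition, Optional[NewPartitionReassignment]]()
>>> reassignments.put(new TopicPartition("a", 0),
>>>   Optional.of(new NewPartitionReassignment(Arrays.asList[Integer](3, 4, 5))))
>>> admin.alterPartitionReassignments(reassignments).all().get()
>>> admin.close()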
>>>
>>> > I've raised the ordering problem in the discussion of KIP-455 in a
>>> > slightly different form, and as I remember the verdict there was that we
>>> > shouldn't expose ordering as an API. It might not be easy, as you say,
>>> > and there might be much better strategies to follow (like disk or
>>> > network utilization goals). Therefore I'll remove this section from the
>>> > KIP.
>>>
>>>
>>> Sounds good to me.
>>>
>>> > I'm not sure I get this scenario. So are you saying that after they
>>> > submitted a reassignment they also submit a preferred leader change?
>>> > In my mind what I would do is:
>>> > i) make auto.leader.rebalance.enable obey the leader movement limit, as
>>> > this way it will be easier to calculate the reassignment batches.
>>> >
>>>
>>> Sorry, this is my fault -- I should have been clearer.
>>> First, I don't think I thought this through well enough at the time.
>>> If we have replicas=[1, 2, 3] and we reassign them to [4, 5, 6], it is
>>> obvious that a leader shift will happen. Your KIP proposes we shift
>>> replicas 1 and 4 first.
>>>
>>> I meant the following (maybe rare) scenario - we have replicas [1, 2, 3]
>>> on a lot of partitions and the user runs a massive rebalance to change
>>> them all to [3, 2, 1]. In the old behavior, I think that this would not
>>> do anything but simply change the replica set in ZK.
>>> Then, the user could run kafka-preferred-replica-election.sh on a given
>>> set of partitions to make sure the new leader 3 gets elected.
>>>
>>> > ii) the user could submit a preferred leader election but it's basically
>>> > a form of reassignment so it would fall under the batching criteria. If
>>> > the batch they submit is smaller than the internal one, then it'd be
>>> > executed immediately, otherwise it would be split into more batches. It
>>> > might be a different behavior as it may not be executed in one batch,
>>> > but I don't think that's a problem because we'll default the batches to
>>> > Int.MAX, and otherwise, since it's a form of reassignment, I think it
>>> > makes sense to apply the same logic.
>>>
>>>
>>> The AdminAPI for that is
>>> "electPreferredLeaders(Collection<TopicPartition> partitions)" and the
>>> old zNode is "{"partitions": [{"topic": "a", "partition": 0}]}", so it is
>>> a bit less explicit than our other reassignment API, but the
>>> functionality is the same.
>>> You're 100% right that it is a form of reassignment, but I hadn't thought
>>> of it like that, and I suspect some other people wouldn't have either.
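>>>
>>> (For reference, calling that Admin API looks roughly like this - just a
>>> sketch with a made-up bootstrap address:)
>>>
>>> import java.util.{Arrays, Properties}
>>> import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
>>> import org.apache.kafka.common.TopicPartition
>>>
>>> val props = new Properties()
>>> props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // made-up address
>>> val admin = AdminClient.create(props)
>>> val partition = new TopicPartition("a", 0)
>>> // ask the controller to move leadership of a-0 back to its preferred replica
>>> admin.electPreferredLeaders(Arrays.asList(partition)).partitionResult(partition).get()
>>> admin.close()
>>>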
>>> If I understand correctly, you're suggesting that we count the
>>> "reassignment.parallel.leader.movements" config against such preferred
>>> elections. I think that makes sense. If we are to do that, we should
>>> explicitly mention that this KIP touches the preferred election logic as
>>> well. Would that config also bound the number of leader movements that
>>> the auto rebalance inside the broker does?
>>>
>>> Then again, the "reassignment.parallel.leader.movements" config has a bit
>>> of a different meaning when used in the context of a real reassignment
>>> (where data moves from the leader to N more replicas) versus in the
>>> preferred election switch (where all we need is two lightweight
>>> LeaderAndIsr requests), so I am not entirely convinced we want to use the
>>> same config for both.
>>> It might be easier to limit the scope of this KIP and not place a bound
>>> on preferred leader election.
>>>
>>> Thanks,
>>> Stanislav
>>>
>>>
>>> On Mon, Jul 8, 2019 at 4:28 PM Viktor Somogyi-Vass <
>>> viktorsomo...@gmail.com>
>>> wrote:
>>>
>>> > Hey Stanislav,
>>> >
>>> > Thanks for the thorough look at the KIP! :)
>>> >
>>> > > Let me first explicitly confirm my understanding of the configs and
>>> > > the algorithm:
>>> > > * reassignment.parallel.replica.count - the maximum total number of
>>> > > replicas that we can move at once, *per partition*
>>> > > * reassignment.parallel.partition.count - the maximum number of
>>> > > partitions we can move at once
>>> > > * reassignment.parallel.leader.movements - the maximum number of
>>> > > leader movements we can have at once
>>> >
>>> > Yes.
>>> >
>>> > > As far as I currently understand it, your proposed algorithm will
>>> > > naturally prioritize leader movement first, e.g. if
>>> > > reassignment.parallel.replica.count=1 and
>>> > > reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
>>> > > we will always move one, the first possible, replica in the replica
>>> > > set (which will be the leader if part of the excess replica set (ER)).
>>> > > Am I correct in saying that?
>>> >
>>> > Yes.
>>> >
>>> > > 1. Does it make sense to add `max` somewhere in the configs' names?
>>> >
>>> > If you mean that it's not always an exact number (because the last
>>> > batch could be smaller), then I think it's a good idea. I don't mind the
>>> > current naming or having max in it either.
>>> >
>>> > > 2. How does this KIP play along with KIP-455's notion of multiple
>>> > > rebalances - do the configs apply on a
>>> > > single AlterPartitionAssignmentsRequest or are they global?
>>> >
>>> > I think it is intuitive if they are on a global level, and the user
>>> > would have the option to set KIP-435's settings to Int.MAX and thus
>>> > eliminate any batching and submit their own batches through
>>> > AlterPartitionAssignmentsRequest. If we applied them on every batch then
>>> > we couldn't really guarantee any limits, as the user would be able to
>>> > get around it by submitting lots of reassignments. By default though we
>>> > set the batching limits to Int.MAX, so effectively they're disabled.
>>> > Also if I understand correctly, AlterPartitionAssignmentsRequest would
>>> > be partition-level batching, wouldn't it? So if you submit 10 partitions
>>> > at once then they'll all be started by the controller immediately, as
>>> > per my understanding.
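>>> >
>>> > (Illustratively, the proposed knobs with their Int.MAX defaults -
>>> > i.e. batching effectively disabled - might look like this; names as
>>> > proposed in this KIP, values just a sketch:)
>>> >
>>> > import java.util.Properties
>>> >
>>> > val controllerConfig = new Properties()
>>> > // Int.MAX by default, so the current behavior is kept unless tuned down
>>> > controllerConfig.put("reassignment.parallel.replica.count", Int.MaxValue.toString)
>>> > controllerConfig.put("reassignment.parallel.partition.count", Int.MaxValue.toString)
>>> > controllerConfig.put("reassignment.parallel.leader.movements", Int.MaxValue.toString)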
>>> >
>>> > > 3. Unless I've missed it, the algorithm does not take into account
>>> > > `reassignment.parallel.leader.movements`
>>> >
>>> > Yeah, the reassignment step calculation doesn't contain that because it
>>> > describes only a single partition's reassignment step. We simply fill
>>> > our reassignment batch with as many leader movements as we can and then,
>>> > if we have space, fill the rest with reassignments that don't require a
>>> > leader movement. I'll create a pseudo code block in the KIP for this.
>>> >
>>> > > 4. The KIP says that the order of the input has some control over
>>> > > how the batches are created - i.e. it's deterministic. What would the
>>> > > batches of the following reassignment look like:
>>> > > reassignment.parallel.replica.count=1
>>> > > reassignment.parallel.partition.count=MAX_INT
>>> > > reassignment.parallel.leader.movements=1
>>> > > partitionA - (0,1,2) -> (3,4,5)
>>> > > partitionB - (0,1,2) -> (3,4,5)
>>> > > partitionC - (0,1,2) -> (3,4,5)
>>> > > From my understanding, we would start with A(0->3), B(1->4) and
>>> > > C(1->4). Is that correct? Would the second step then continue with
>>> > > B(0->3)?
>>> > >
>>> > > If the configurations are global, I can imagine we will have a bit
>>> > > more trouble in preserving the expected ordering, especially across
>>> > > controller failovers -- but I'll avoid speculating until you confirm
>>> > > the scope of the config.
>>> >
>>> > I've raised the ordering problem in the discussion of KIP-455 in a
>>> > slightly different form, and as I remember the verdict there was that we
>>> > shouldn't expose ordering as an API. It might not be easy, as you say,
>>> > and there might be much better strategies to follow (like disk or
>>> > network utilization goals). Therefore I'll remove this section from the
>>> > KIP. (Otherwise yes, I would have thought of the same behavior you
>>> > described.)
>>> > Since #3 was also about the pseudo code, it would be something like
>>> > this:
>>> > Config:
>>> > reassignment.parallel.replica.count=R
>>> > reassignment.parallel.partition.count=P
>>> > reassignment.parallel.leader.movements=L
>>> > Code:
>>> > val batchSize = max(L, P)
>>> > // split the individual partition reassignment steps by whether they
>>> > // involve a leader movement or not
>>> > val (leaderMovements, otherMovements) =
>>> >   calculateReassignmentStepsFor(partitionsToReassign)
>>> >     .partition(_.involvesLeaderReassignment)
>>> > // fill the batch with as many leader movements as possible and take
>>> > // the rest from the reassignments that don't move a leader
>>> > val currentBatch =
>>> >   if (leaderMovements.size < batchSize)
>>> >     leaderMovements ++ otherMovements.take(batchSize - leaderMovements.size)
>>> >   else
>>> >     leaderMovements.take(batchSize)
>>> > executeReassignmentOnBatch(currentBatch)
>>> >
>>> > > 5. Regarding the new behavior of electing the new preferred leader
>>> > > in the "first step" of the reassignment - does this obey the
>>> > > `auto.leader.rebalance.enable` config?
>>> > > If not, I have concerns regarding how backwards compatible this might
>>> > > be - e.g. imagine a user does a huge reassignment (as they have always
>>> > > done) and suddenly a huge leader shift happens, whereas the user
>>> > > expected to manually shift preferred leaders at a slower rate via
>>> > > the kafka-preferred-replica-election.sh tool.
>>> >
>>> > I'm not sure I get this scenario. So are you saying that after they
>>> > submitted a reassignment they also submit a preferred leader change?
>>> > In my mind what I would do is:
>>> > i) make auto.leader.rebalance.enable obey the leader movement limit, as
>>> > this way it will be easier to calculate the reassignment batches.
>>> > ii) the user could submit a preferred leader election but it's basically
>>> > a form of reassignment so it would fall under the batching criteria. If
>>> > the batch they submit is smaller than the internal one, then it'd be
>>> > executed immediately, otherwise it would be split into more batches. It
>>> > might be a different behavior as it may not be executed in one batch,
>>> > but I don't think that's a problem because we'll default the batches to
>>> > Int.MAX, and otherwise, since it's a form of reassignment, I think it
>>> > makes sense to apply the same logic.
>>> >
>>> > > 6. What is the expected behavior if we dynamically change one of the
>>> > > configs to a lower value while a reassignment is happening? Would we
>>> > > cancel some of the currently reassigned partitions or would we account
>>> > > for the new values on the next reassignment? I assume the latter, but
>>> > > it's good to be explicit.
>>> >
>>> > Yep, it would be the latter. I'll make this explicit in the KIP too.
>>> >
>>> > About the nits:
>>> > Noted, will update the KIP to reflect your suggestions :)
>>> >
>>> > Viktor
>>> >
>>> > On Fri, Jun 28, 2019 at 5:59 PM Stanislav Kozlovski <
>>> > stanis...@confluent.io>
>>> > wrote:
>>> > >
>>> > > Hey there Viktor,
>>> > >
>>> > > Thanks for working on this KIP! I agree with the notion that
>>> > > reliability, stability and predictability of a reassignment should be
>>> > > a core feature of Kafka.
>>> > >
>>> > > Let me first explicitly confirm my understanding of the configs and
>>> > > the algorithm:
>>> > > * reassignment.parallel.replica.count - the maximum total number of
>>> > > replicas that we can move at once, *per partition*
>>> > > * reassignment.parallel.partition.count - the maximum number of
>>> > > partitions we can move at once
>>> > > * reassignment.parallel.leader.movements - the maximum number of
>>> > > leader movements we can have at once
>>> > >
>>> > > As far as I currently understand it, your proposed algorithm will
>>> > > naturally prioritize leader movement first, e.g. if
>>> > > reassignment.parallel.replica.count=1 and
>>> > > reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
>>> > > we will always move one, the first possible, replica in the replica
>>> > > set (which will be the leader if part of the excess replica set (ER)).
>>> > > Am I correct in saying that?
>>> > >
>>> > > Regarding the KIP, I've got a couple of comments/questions:
>>> > >
>>> > > 1. Does it make sense to add `max` somewhere in the configs' names?
>>> > >
>>> > > 2. How does this KIP play along with KIP-455's notion of multiple
>>> > > rebalances - do the configs apply on a
>>> > > single AlterPartitionAssignmentsRequest or are they global?
>>> > >
>>> > > 3. Unless I've missed it, the algorithm does not take into account
>>> > > `reassignment.parallel.leader.movements`
>>> > >
>>> > > 4. The KIP says that the order of the input has some control over
>>> > > how the batches are created - i.e. it's deterministic. What would the
>>> > > batches of the following reassignment look like:
>>> > > reassignment.parallel.replica.count=1
>>> > > reassignment.parallel.partition.count=MAX_INT
>>> > > reassignment.parallel.leader.movements=1
>>> > > partitionA - (0,1,2) -> (3,4,5)
>>> > > partitionB - (0,1,2) -> (3,4,5)
>>> > > partitionC - (0,1,2) -> (3,4,5)
>>> > > From my understanding, we would start with A(0->3), B(1->4) and
>>> > > C(1->4). Is that correct? Would the second step then continue with
>>> > > B(0->3)?
>>> > >
>>> > > If the configurations are global, I can imagine we will have a bit
>>> > > more trouble in preserving the expected ordering, especially across
>>> > > controller failovers -- but I'll avoid speculating until you confirm
>>> > > the scope of the config.
>>> > >
>>> > > 5. Regarding the new behavior of electing the new preferred leader
>>> > > in the "first step" of the reassignment - does this obey the
>>> > > `auto.leader.rebalance.enable` config?
>>> > > If not, I have concerns regarding how backwards compatible this might
>>> > > be - e.g. imagine a user does a huge reassignment (as they have always
>>> > > done) and suddenly a huge leader shift happens, whereas the user
>>> > > expected to manually shift preferred leaders at a slower rate via
>>> > > the kafka-preferred-replica-election.sh tool.
>>> > >
>>> > > 6. What is the expected behavior if we dynamically change one of the
>>> > > configs to a lower value while a reassignment is happening? Would we
>>> > > cancel some of the currently reassigned partitions or would we account
>>> > > for the new values on the next reassignment? I assume the latter, but
>>> > > it's good to be explicit.
>>> > >
>>> > > As some small nits:
>>> > > - could we have a section in the KIP where we explicitly define what
>>> > > each config does? This can be inferred from the KIP as is but requires
>>> > > careful reading, whereas some developers might want to skim through
>>> > > the change to get a quick sense. It also improves readability, but
>>> > > that's my personal opinion.
>>> > > - could you better clarify how a reassignment step is different from
>>> > > the currently existing algorithm? Maybe laying both algorithms out in
>>> > > the KIP would be most clear.
>>> > > - the names for the OngoingPartitionReassignment and
>>> > > CurrentPartitionReassignment fields in the
>>> > > ListPartitionReassignmentsResponse are a bit confusing to me.
>>> > > Unfortunately, I don't have a better suggestion, but maybe somebody
>>> > > else in the community has one.
>>> > >
>>> > > Thanks,
>>> > > Stanislav
>>> > >
>>> > > On Thu, Jun 27, 2019 at 3:24 PM Viktor Somogyi-Vass <
>>> > viktorsomo...@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > Hi All,
>>> > > >
>>> > > > I've renamed my KIP as its name was a bit confusing, so we'll
>>> > > > continue it in this thread.
>>> > > > The previous thread, for the record:
>>> > > > https://lists.apache.org/thread.html/0e97e30271f80540d4da1947bba94832639767e511a87bb2ba530fe7@%3Cdev.kafka.apache.org%3E
>>> > > >
>>> > > > A short summary of the KIP:
>>> > > > In case of a vast partition reassignment (thousands of partitions
>>> > > > at once) Kafka can collapse under the increased replication traffic.
>>> > > > This KIP will mitigate that by introducing internal batching done by
>>> > > > the controller. Besides putting a bandwidth limit on the replication,
>>> > > > it is useful to batch partition movements, as fewer partitions will
>>> > > > use the available bandwidth for reassignment and they will finish
>>> > > > faster.
>>> > > > The main control handles (sketched below) are:
>>> > > > - the number of parallel leader movements,
>>> > > > - the number of parallel partition movements,
>>> > > > - and the number of parallel replica movements.
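>>> > > >
>>> > > > As a rough sketch of how the three handles would cap a batch
>>> > > > (illustrative pseudo-Scala, not the actual controller code; `Step`
>>> > > > and its fields are made up for the example):
>>> > > >
>>> > > > case class Step(partition: String, movesLeader: Boolean, replicasToAdd: Seq[Int]) {
>>> > > >   // cap the number of replicas moving concurrently for this partition
>>> > > >   def limitReplicas(r: Int): Step = copy(replicasToAdd = replicasToAdd.take(r))
>>> > > > }
>>> > > >
>>> > > > // at most P partitions in flight, of which at most L move a leader,
>>> > > > // and at most R replicas moving per partition
>>> > > > def nextBatch(pending: Seq[Step], r: Int, p: Int, l: Int): Seq[Step] = {
>>> > > >   val (leaderMoves, others) = pending.partition(_.movesLeader)
>>> > > >   (leaderMoves.take(l) ++ others).take(p).map(_.limitReplicas(r))
>>> > > > }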
>>> > > >
>>> > > > Thank you for the feedback and ideas so far in the previous thread,
>>> > > > and I'm happy to receive more.
>>> > > >
>>> > > > Regards,
>>> > > > Viktor
>>> > > >
>>> > >
>>> > >
>>> > > --
>>> > > Best,
>>> > > Stanislav
>>> >
>>>
>>>
>>> --
>>> Best,
>>> Stanislav
>>>
>>
