Hey Folks,

I think I'll open a vote early next week about this if there is no more
feedback.
Thanks,
Viktor

On Fri, Aug 9, 2019 at 5:25 PM Viktor Somogyi-Vass <viktorsomo...@gmail.com>
wrote:

> Hey Stanislav,
>
> I re-examined the current algorithm and indeed it would change the order
> of replicas in ZK but wouldn't do a leader election, so one would need to
> run the preferred replica election tool. Still, in light of this I'm not
> sure I'd keep the existing behavior, as users wouldn't gain anything from
> it. Changing leadership automatically would result in a simpler, easier
> and more responsive reassignment algorithm, which is especially important
> if the reassignment is done to free up a broker under heavy load.
> Automated tools (Cruise Control) would also have a simpler life.
> Let me know what you think.
>
> Viktor
>
> On Thu, Jul 11, 2019 at 7:16 PM Viktor Somogyi-Vass <
> viktorsomo...@gmail.com> wrote:
>
>> Hi Stan,
>>
>> I meant the following (maybe rare) scenario - we have replicas [1, 2, 3]
>> on a lot of partitions and the user runs a massive rebalance to change
>> them all to [3, 2, 1]. In the old behavior, I think that this would not
>> do anything but simply change the replica set in ZK.
>> Then, the user could run kafka-preferred-replica-election.sh on a given
>> set of partitions to make sure the new leader 3 gets elected.
>>
>> I thought the old algorithm would elect 3 as the leader in this case
>> right away at the end, but I have to double-check this. In any case I
>> think it would make sense in the new algorithm if we elected the new
>> preferred leader right away, regardless of whether the new leader is
>> chosen from the existing replicas or not. If the whole reassignment is
>> in fact just changing the replica order, then either way it is a simple
>> (trivial) operation and doing it batched wouldn't slow it down much, as
>> there is no data movement involved. If the reassignment is mixed,
>> meaning it contains reordering and real movement as well, then electing
>> leaders right away would in fact help to even out the load faster, as
>> data movements can take long. For instance, in the case of a
>> reassignment batch of two partitions executed concurrently, P1:
>> (1,2,3) -> (3,2,1) and P2: (4,5,6) -> (7,8,9), the P2 reassignment
>> would elect a new leader but P1 wouldn't, which wouldn't help the goal
>> of normalizing traffic on broker 1 that much.
>> Again, I'll have to check how the current algorithm works and whether
>> it has any unknown drawbacks before implementing what I sketched up
>> above.
>>
>> As for generic preferred leader election when called from the admin API
>> or the auto leader balance feature, I think you're right that we should
>> leave it as it is. It doesn't involve any data movement, so it's fairly
>> fast and it normalizes the cluster state quickly.
>>
>> Viktor
>>
>> On Tue, Jul 9, 2019 at 9:04 PM Stanislav Kozlovski <
>> stanis...@confluent.io> wrote:
>>
>>> Hey Viktor,
>>>
>>> > I think it is intuitive if they are on a global level...If we applied
>>> > them on every batch then we couldn't really guarantee any limits as
>>> > the user would be able to get around it by submitting lots of
>>> > reassignments.
>>>
>>> Agreed. Could we mention this explicitly in the KIP?
>>>
>>> > Also if I understand correctly, AlterPartitionAssignmentsRequest
>>> > would be partition-level batching, wouldn't it? So if you submit 10
>>> > partitions at once then they'll all be started by the controller
>>> > immediately as per my understanding.
>>>
>>> Yep, absolutely
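To make the reorder-only scenario from the quoted mail above concrete
(P1: (1,2,3) -> (3,2,1) vs P2: (4,5,6) -> (7,8,9)), here is a minimal
Scala sketch; the helper names are hypothetical and not part of the KIP:

    // Reorder-only: same replica set, different order => no data movement.
    def isReorderOnly(current: Seq[Int], target: Seq[Int]): Boolean =
      current.toSet == target.toSet

    // The preferred leader is the first replica in the assignment.
    def changesPreferredLeader(current: Seq[Int], target: Seq[Int]): Boolean =
      current.headOption != target.headOption

    // P1 only reorders replicas yet changes the preferred leader;
    // P2 moves data and changes the preferred leader.
    assert(isReorderOnly(Seq(1, 2, 3), Seq(3, 2, 1)))
    assert(!isReorderOnly(Seq(4, 5, 6), Seq(7, 8, 9)))
    assert(changesPreferredLeader(Seq(1, 2, 3), Seq(3, 2, 1)))

Under the proposal above, a step like P1 could elect the new preferred
leader immediately, since no data has to move first.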
>>>
>>> > I've raised the ordering problem on the discussion of KIP-455 in a
>>> > bit different form and as I remember the verdict there was that we
>>> > shouldn't expose ordering as an API. It might not be easy as you say
>>> > and there might be much better strategies to follow (like disk or
>>> > network utilization goals). Therefore I'll remove this section from
>>> > the KIP.
>>>
>>> Sounds good to me.
>>>
>>> > I'm not sure I get this scenario. So are you saying that after they
>>> > submitted a reassignment they also submit a preferred leader change?
>>> > In my mind what I would do is:
>>> > i) make auto.leader.rebalance.enable obey the leader movement limit,
>>> > as this way it will be easier to calculate the reassignment batches.
>>>
>>> Sorry, this is my fault -- I should have been more clear.
>>> First, I didn't think through this well enough at the time, I don't
>>> think. If we have replicas=[1, 2, 3] and we reassign them to [4, 5, 6],
>>> it is obvious that a leader shift will happen. Your KIP proposes we
>>> shift replicas 1 and 4 first.
>>>
>>> I meant the following (maybe rare) scenario - we have replicas
>>> [1, 2, 3] on a lot of partitions and the user runs a massive rebalance
>>> to change them all to [3, 2, 1]. In the old behavior, I think that this
>>> would not do anything but simply change the replica set in ZK.
>>> Then, the user could run kafka-preferred-replica-election.sh on a given
>>> set of partitions to make sure the new leader 3 gets elected.
>>>
>>> > ii) the user could submit a preferred leader election but it's
>>> > basically a form of reassignment, so it would fall under the batching
>>> > criteria. If the batch they submit is smaller than the internal
>>> > limit, then it'd be executed immediately, but otherwise it would be
>>> > split into more batches. It might be a different behavior as it may
>>> > not be executed in one batch, but I think it isn't a problem because
>>> > we'll default to Int.MAX with the batches, and otherwise, since it's
>>> > a form of reassignment, I think it makes sense to apply the same
>>> > logic.
>>>
>>> The AdminAPI for that is
>>> "electPreferredLeaders(Collection<TopicPartition> partitions)" and the
>>> old zNode is "{"partitions": [{"topic": "a", "partition": 0}]}" so it
>>> is a bit less explicit than our other reassignment API, but the
>>> functionality is the same.
>>> You're 100% right that it is a form of reassignment, but I hadn't
>>> thought of it like that and I suspect some other people wouldn't have
>>> either.
>>> If I understand correctly, you're suggesting that we count the
>>> "reassignment.parallel.leader.movements" config against such preferred
>>> elections. I think that makes sense. If we are to do that, we should
>>> explicitly mention that this KIP touches the preferred election logic
>>> as well. Would that config also bound the number of leader movements
>>> the auto rebalance inside the broker does?
>>>
>>> Then again, the "reassignment.parallel.leader.movements" config has a
>>> bit of a different meaning when used in the context of a real
>>> reassignment (where data moves from the leader to N more replicas)
>>> versus in the preferred election switch (where all we need is two
>>> lightweight LeaderAndIsr requests), so I am not entirely convinced we
>>> want to use the same config for that.
>>> It might be easier to limit the scope of this KIP and not place a
>>> bound on preferred leader election.
>>>
>>> Thanks,
>>> Stanislav
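If preferred leader elections were counted against the leader movement
limit as discussed above, the splitting could look like this sketch (an
assumption-level illustration, not the actual controller code; the
TopicPartition case class stands in for Kafka's own type):

    case class TopicPartition(topic: String, partition: Int)

    // Split an electPreferredLeaders request into batches of at most
    // reassignment.parallel.leader.movements partitions each.
    def electionBatches(partitions: Seq[TopicPartition],
                        maxParallelLeaderMovements: Int): Iterator[Seq[TopicPartition]] =
      partitions.grouped(maxParallelLeaderMovements)

    // Electing leaders for 5 partitions with a limit of 2 would run as
    // three batches of sizes 2, 2 and 1.

With the proposed default of Int.MAX this degenerates to a single batch,
i.e. today's behavior.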
>>>
>>> On Mon, Jul 8, 2019 at 4:28 PM Viktor Somogyi-Vass <
>>> viktorsomo...@gmail.com> wrote:
>>>
>>> > Hey Stanislav,
>>> >
>>> > Thanks for the thorough look at the KIP! :)
>>> >
>>> > > Let me first explicitly confirm my understanding of the configs and
>>> > > the algorithm:
>>> > > * reassignment.parallel.replica.count - the maximum total number of
>>> > > replicas that we can move at once, *per partition*
>>> > > * reassignment.parallel.partition.count - the maximum number of
>>> > > partitions we can move at once
>>> > > * reassignment.parallel.leader.movements - the maximum number of
>>> > > leader movements we can have at once
>>> >
>>> > Yes.
>>> >
>>> > > As far as I currently understand it, your proposed algorithm will
>>> > > naturally prioritize leader movement first. e.g. if
>>> > > reassignment.parallel.replica.count=1 and
>>> > > reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
>>> > > we will always move one, the first possible, replica in the replica
>>> > > set (which will be the leader if part of the excess replica set
>>> > > (ER)).
>>> > > Am I correct in saying that?
>>> >
>>> > Yes.
>>> >
>>> > > 1. Does it make sense to add `max` somewhere in the configs' names?
>>> >
>>> > If you imply that it's not always an exact number (because the last
>>> > batch could be smaller) then I think it's a good idea. I don't mind
>>> > either the current naming or having max in it.
>>> >
>>> > > 2. How does this KIP play along with KIP-455's notion of multiple
>>> > > rebalances - do the configs apply on a single
>>> > > AlterPartitionAssignmentsRequest or are they global?
>>> >
>>> > I think it is intuitive if they are on a global level and the user
>>> > would have the option to set KIP-435's settings to Int.MAX and thus
>>> > eliminate any batching and submit their own batches through
>>> > AlterPartitionAssignmentsRequest. If we applied them on every batch
>>> > then we couldn't really guarantee any limits as the user would be
>>> > able to get around it by submitting lots of reassignments. By default
>>> > though we set the batching limits to Int.MAX, so effectively they're
>>> > disabled.
>>> > Also if I understand correctly, AlterPartitionAssignmentsRequest
>>> > would be partition-level batching, wouldn't it? So if you submit 10
>>> > partitions at once then they'll all be started by the controller
>>> > immediately as per my understanding.
>>> >
>>> > > 3. Unless I've missed it, the algorithm does not take into account
>>> > > `reassignment.parallel.leader.movements`
>>> >
>>> > Yeah, the reassignment step calculation doesn't contain that because
>>> > it only describes a single partition's reassignment step. We simply
>>> > fill our reassignment batch with as many leader movements as we can
>>> > and then fill the rest with reassignments which don't require leader
>>> > movement, if we have space. I'll create a pseudo code block on the
>>> > KIP for this.
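To illustrate the per-partition meaning of
reassignment.parallel.replica.count confirmed above, a minimal sketch of
one partition's step (the helper name is hypothetical):

    // Start at most R new target replicas per step for a single
    // partition; excess replicas are dropped once the new ones are in
    // sync.
    def replicasToStart(current: Seq[Int], target: Seq[Int], r: Int): Seq[Int] =
      target.filterNot(current.contains).take(r)

    // (0,1,2) -> (3,4,5) with R=1 starts replica 3 first, consistent
    // with the A(0->3) first step discussed in question 4 below.
    assert(replicasToStart(Seq(0, 1, 2), Seq(3, 4, 5), 1) == Seq(3))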
>>> >
>>> > > 4. The KIP says that the order of the input has some control over
>>> > > how the batches are created - i.e. it's deterministic. What would
>>> > > the batches of the following reassignment look like:
>>> > > reassignment.parallel.replica.count=1
>>> > > reassignment.parallel.partition.count=MAX_INT
>>> > > reassignment.parallel.leader.movements=1
>>> > > partitionA - (0,1,2) -> (3,4,5)
>>> > > partitionB - (0,1,2) -> (3,4,5)
>>> > > partitionC - (0,1,2) -> (3,4,5)
>>> > > From my understanding, we would start with A(0->3), B(1->4) and
>>> > > C(1->4). Is that correct? Would the second step then continue with
>>> > > B(0->3)?
>>> > >
>>> > > If the configurations are global, I can imagine we will have a bit
>>> > > more trouble in preserving the expected ordering, especially across
>>> > > controller failovers -- but I'll avoid speculating until you
>>> > > confirm the scope of the config.
>>> >
>>> > I've raised the ordering problem on the discussion of KIP-455 in a
>>> > bit different form and as I remember the verdict there was that we
>>> > shouldn't expose ordering as an API. It might not be easy as you say
>>> > and there might be much better strategies to follow (like disk or
>>> > network utilization goals). Therefore I'll remove this section from
>>> > the KIP. (Otherwise yes, I would have thought of the same behavior
>>> > you described.)
>>> > Since #3 was also about the pseudo code, it would be something like
>>> > this:
>>> > Config:
>>> > reassignment.parallel.replica.count=R
>>> > reassignment.parallel.partition.count=P
>>> > reassignment.parallel.leader.movements=L
>>> > Code:
>>> > val batchSize = max(L, P)
>>> > // split the individual partition reassignment steps by whether they
>>> > // involve leader movement or not
>>> > val (leaderMovements, otherMovements) =
>>> >   calculateReassignmentStepsFor(partitionsToReassign)
>>> >     .partition(_.involvesLeaderMovement)
>>> > // fill the batch with as many leader movements as possible and take
>>> > // the rest from the other reassignments
>>> > val currentBatch =
>>> >   if (leaderMovements.size < batchSize)
>>> >     leaderMovements ++
>>> >       otherMovements.take(batchSize - leaderMovements.size)
>>> >   else
>>> >     leaderMovements.take(batchSize)
>>> > executeReassignmentOnBatch(currentBatch)
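For reference, a self-contained variant of the pseudo code above. It
assumes L and P are enforced independently (the single
batchSize = max(L, P) above could admit more than L leader movements
when leader-moving partitions alone fill the batch); the type and
function names are illustrative only:

    case class PartitionMove(partition: String, involvesLeaderMovement: Boolean)

    // Assemble one batch: leader movements first, capped at l, then fill
    // the remaining slots (up to p partitions in total) with movements
    // that don't touch a leader.
    def nextBatch(pending: Seq[PartitionMove], l: Int, p: Int): Seq[PartitionMove] = {
      val (leaderMoves, otherMoves) = pending.partition(_.involvesLeaderMovement)
      val chosenLeaders = leaderMoves.take(math.min(l, p))
      chosenLeaders ++ otherMoves.take(p - chosenLeaders.size)
    }

Taking min(l, p) for the leader slots keeps both limits honored even
when one is much smaller than the other.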
If >>> the >>> > batch they submit is smaller than the internal, then it'd be executed >>> > immediately but otherwise it would be split into more batches. It >>> might be >>> > a different behavior as it may not be executed it in one batch but I >>> think >>> > it isn't a problem because we'll default to Int.MAX with the batches >>> and >>> > otherwise because since it's a form of reassignment I think it makes >>> sense >>> > to apply the same logic. >>> > >>> > > 6. What is the expected behavior if we dynamically change one of the >>> > > configs to a lower value while a reassignment is happening. Would we >>> > cancel >>> > > some of the currently reassigned partitions or would we account for >>> the >>> > new >>> > > values on the next reassignment? I assume the latter but it's good >>> to be >>> > > explicit >>> > >>> > Yep, it would be the latter. I'll make this explicit in the KIP too. >>> > >>> > About the nits: >>> > Noted, will update the KIP to reflect your suggestions :) >>> > >>> > Viktor >>> > >>> > On Fri, Jun 28, 2019 at 5:59 PM Stanislav Kozlovski < >>> > stanis...@confluent.io> >>> > wrote: >>> > > >>> > > Hey there Viktor, >>> > > >>> > > Thanks for working on this KIP! I agree with the notion that >>> reliability, >>> > > stability and predictability of a reassignment should be a core >>> feature >>> > of >>> > > Kafka. >>> > > >>> > > Let me first explicitly confirm my understanding of the configs and >>> the >>> > > algorithm: >>> > > * reassignment.parallel.replica.count - the maximum total number of >>> > > replicas that we can move at once, *per partition* >>> > > * reassignment.parallel.partition.count - the maximum number of >>> > partitions >>> > > we can move at once >>> > > * reassignment.parallel.leader.movements - the maximum number of >>> leader >>> > > movements we can have at once >>> > > >>> > > As far as I currently understand it, your proposed algorithm will >>> > naturally >>> > > prioritize leader movement first. e.g if >>> > > reassignment.parallel.replica.count=1 and >>> > > >>> > >>> > >>> reassignment.parallel.partition.count==reassignment.parallel.leader.movements, >>> > > we will always move one, the first possible, replica in the replica >>> set >>> > > (which will be the leader if part of the excess replica set (ER)). >>> > > Am I correct in saying that? >>> > > >>> > > Regarding the KIP, I've got a couple of comments/questions:: >>> > > >>> > > 1. Does it make sense to add `max` somewhere in the configs' names? >>> > > >>> > > 2. How does this KIP play along with KIP-455's notion of multiple >>> > > rebalances - do the configs apply on a >>> > > single AlterPartitionAssignmentsRequest or are they global? >>> > > >>> > > 3. Unless I've missed it, the algorithm does not take into account >>> > > `reassignment.parallel.leader.movements` >>> > > >>> > > 4. The KIP says that the order of the input has some control over >>> how the >>> > > batches are created - i.e it's deterministic. What would the batches >>> of >>> > the >>> > > following reassignment look like: >>> > > reassignment.parallel.replica.count=1 >>> > > reassignment.parallel.partition.count=MAX_INT >>> > > reassignment.parallel.leader.movements=1 >>> > > partitionA - (0,1,2) -> (3, 4, 5) >>> > > partitionB - (0,1,2) -> (3,4,5) >>> > > partitionC - (0,1,2) -> (3, 4, 5) >>> > > From my understanding, we would start with A(0->3), B(1->4) and >>> C(1->4). >>> > Is >>> > > that correct? Would the second step then continue with B(0->3)? 
>>> > >
>>> > > If the configurations are global, I can imagine we will have a bit
>>> > > more trouble in preserving the expected ordering, especially across
>>> > > controller failovers -- but I'll avoid speculating until you
>>> > > confirm the scope of the config.
>>> > >
>>> > > 5. Regarding the new behavior of electing the new preferred leader
>>> > > in the "first step" of the reassignment - does this obey the
>>> > > `auto.leader.rebalance.enable` config?
>>> > > If not, I have concerns regarding how backwards compatible this
>>> > > might be - e.g. imagine a user does a huge reassignment (as they
>>> > > have always done) and suddenly a huge leader shift happens, whereas
>>> > > the user expected to manually shift preferred leaders at a slower
>>> > > rate via the kafka-preferred-replica-election.sh tool.
>>> > >
>>> > > 6. What is the expected behavior if we dynamically change one of
>>> > > the configs to a lower value while a reassignment is happening.
>>> > > Would we cancel some of the currently reassigned partitions or
>>> > > would we account for the new values on the next reassignment? I
>>> > > assume the latter but it's good to be explicit.
>>> > >
>>> > > As some small nits:
>>> > > - could we have a section in the KIP where we explicitly define
>>> > > what each config does? This can be inferred from the KIP as is but
>>> > > requires careful reading, whereas some developers might want to
>>> > > skim through the change to get a quick sense. It also improves
>>> > > readability but that's my personal opinion.
>>> > > - could you better clarify how a reassignment step is different
>>> > > from the currently existing algorithm? maybe laying both algorithms
>>> > > out in the KIP would be clearest
>>> > > - the names for the OngoingPartitionReassignment and
>>> > > CurrentPartitionReassignment fields in the
>>> > > ListPartitionReassignmentsResponse are a bit confusing to me.
>>> > > Unfortunately, I don't have a better suggestion, but maybe somebody
>>> > > else in the community has one.
>>> > >
>>> > > Thanks,
>>> > > Stanislav
>>> > >
>>> > > On Thu, Jun 27, 2019 at 3:24 PM Viktor Somogyi-Vass <
>>> > > viktorsomo...@gmail.com> wrote:
>>> > >
>>> > > > Hi All,
>>> > > >
>>> > > > I've renamed my KIP as its name was a bit confusing, so we'll
>>> > > > continue it in this thread.
>>> > > > The previous thread for the record:
>>> > > > https://lists.apache.org/thread.html/0e97e30271f80540d4da1947bba94832639767e511a87bb2ba530fe7@%3Cdev.kafka.apache.org%3E
>>> > > >
>>> > > > A short summary of the KIP:
>>> > > > In case of a vast partition reassignment (thousands of partitions
>>> > > > at once) Kafka can collapse under the increased replication
>>> > > > traffic. This KIP will mitigate that by introducing internal
>>> > > > batching done by the controller.
>>> > > > Besides putting a bandwidth limit on the replication, it is
>>> > > > useful to batch partition movements, as a smaller number of
>>> > > > partitions will use the available bandwidth for reassignment and
>>> > > > they finish faster.
>>> > > > The main control handles are (see the config sketch below):
>>> > > > - the number of parallel leader movements,
>>> > > > - the number of parallel partition movements,
>>> > > > - and the number of parallel replica movements.
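For reference, a sketch of how these handles might look in broker
configuration; the property names are the ones proposed in the KIP, the
values below are purely illustrative, and per the discussion above the
defaults would be Int.MAX, which effectively disables batching:

    # KIP-435 batching limits (illustrative values, defaults are Int.MAX)
    reassignment.parallel.replica.count=2
    reassignment.parallel.partition.count=50
    reassignment.parallel.leader.movements=10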
>>> > > >
>>> > > > Thank you for the feedback and ideas so far in the previous
>>> > > > thread and I'm happy to receive more.
>>> > > >
>>> > > > Regards,
>>> > > > Viktor
>>> > > >
>>> > >
>>> > > --
>>> > > Best,
>>> > > Stanislav
>>> >
>>>
>>> --
>>> Best,
>>> Stanislav
>>>
>>