Andrew Grant created KAFKA-14437:
------------------------------------
Summary: Enhance StripedReplicaPlacer to account for existing
partition assignments
Key: KAFKA-14437
URL: https://issues.apache.org/jira/browse/KAFKA-14437
Project: Kafka
Issue Type: Improvement
Reporter: Andrew Grant
Currently, StripedReplicaPlacer doesn’t take existing partition assignments
into consideration when the place method is called. This means newly added
partitions may get the same assignments as existing partitions.
This differs from AdminUtils, which has some logic to try to shift where in
the list of brokers we start making assignments from for newly added partitions.
For example, let’s say we had the following:
{code:java}
Rack 1: 0, 1, 2, 3
Rack 2: 4, 5, 6, 7
Rack 3: 8, 9, 10, 11
{code}
CreateTopics might return the following assignment for two partitions:
{code:java}
P0: 6, 8, 2
P1: 9, 3, 7
{code}
If the user then calls CreatePartitions, increasing the partition count to 4,
StripedReplicaPlacer does not take P0 and P1 into account. It creates a random
rack offset and a random broker offset, so it could easily create the same
assignment for P2 and P3 that it created for P0 and P1. This is easily
reproduced in a unit test.
My suggestion is to enhance StripedReplicaPlacer to account for existing
partition assignments. Intuitively, we’d like to make assignments for added
partitions from “where we left off” when we were making the previous
assignments. In practice, it’s not possible to know exactly what the state was
during the previous partition assignments because, for example, brokers’ fencing
state may have changed. But I do think we can make a best-effort attempt that
is optimized for the common case where most brokers are unfenced. Note that
all the changes suggested below will only affect StripedReplicaPlacer when
place is called and there are existing partition assignments, which happens
when it’s servicing CreatePartitions requests. If there are no existing
partition assignments, which happens during CreateTopics, the logic is
unchanged.
First, we need to update ClusterDescriber to:
{code:java}
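import java.util.Iterator;
import java.util.List;

public interface ClusterDescriber {
    /**
     * Get an iterator through the usable brokers.
     */
    Iterator<UsableBroker> usableBrokers();

    /** Get the existing partition assignments for the given topic. */
    List<List<Integer>> replicasForTopicName(String topicName);
}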
{code}
The new replicasForTopicName method returns the existing partition assignments
for a topic. This lets StripedReplicaPlacer know about existing partition
assignments when they exist.
When place is called, some initialization is done in both RackList and
BrokerList. One thing that is initialized is the offset variable - this is a
variable used in both RackList and BrokerList that determines where in the list
of either racks or brokers respectively we should start from when making the
next assignment. Currently, it is initialized to a random value, based off the
size of the list.
I suggest we add some logic during initialization that sets the offset for both
RackList and BrokerList to a value based off the previous assignments.
Consider again the following rack metadata and existing assignments:
{code:java}
Rack 1: 0, 1, 2, 3
Rack 2: 4, 5, 6, 7
Rack 3: 8, 9, 10, 11
P0: 6, 8, 2
P1: 9, 3, 7
{code}
Let’s imagine a user wants to create a new partition, P2.
First, we need to determine which rack to start from for P2: this corresponds
to the initial offset in RackList. We can look at the leader of P1, broker 9
(not P0, because P1 is the “last” partition we made an assignment for), and see
it’s on rack 3. So the next rack we should start from is rack 1. This means we
set offset in RackList to 0, instead of a random value, during initialization.
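As a rough sketch of the rack step above (not the actual RackList code): the class and method names here are hypothetical, racks are modeled as plain lists of broker ids in RackList order, and the "leader" is assumed to be the first replica of the last existing partition. An empty result means the caller would fall back to the current random offset.

```java
import java.util.List;
import java.util.OptionalInt;

// Hypothetical helper for illustration only; not part of Kafka.
final class RackOffsetSketch {
    /**
     * racks.get(i) holds the broker ids of the i-th rack, in RackList order.
     * Returns the index of the rack after the one hosting the first replica
     * of the most recently assigned partition, or empty if it cannot be inferred.
     */
    static OptionalInt initialRackOffset(List<List<Integer>> racks,
                                         List<List<Integer>> existingAssignments) {
        if (existingAssignments.isEmpty()) {
            return OptionalInt.empty(); // CreateTopics case: keep the random offset
        }
        List<Integer> lastPartition =
            existingAssignments.get(existingAssignments.size() - 1);
        if (lastPartition.isEmpty()) {
            return OptionalInt.empty();
        }
        int leader = lastPartition.get(0);
        for (int i = 0; i < racks.size(); i++) {
            if (racks.get(i).contains(leader)) {
                // Continue from the rack after the one we last started on.
                return OptionalInt.of((i + 1) % racks.size());
            }
        }
        return OptionalInt.empty(); // leader's broker is no longer in any known rack
    }

    public static void main(String[] args) {
        List<List<Integer>> racks = List.of(
            List.of(0, 1, 2, 3), List.of(4, 5, 6, 7), List.of(8, 9, 10, 11));
        List<List<Integer>> existing = List.of(List.of(6, 8, 2), List.of(9, 3, 7));
        System.out.println(initialRackOffset(racks, existing)); // OptionalInt[0]
    }
}
```

With the example metadata, broker 9 is on the third rack (index 2), so the sketch wraps around to index 0, i.e. rack 1.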
Second, we need to determine which broker to start from _per rack_: this
corresponds to the initial offset in BrokerList. We can look at all the
existing partition assignments, P0 and P1 in our example, and _per rack_ infer
the last offset started from during previous assignments. For each rack, we do
this by iterating through the partitions in reverse order (because we care
about the most recent starting position) and finding the first broker in
an assignment that belongs to that rack. This tells us where we last started
from when making an assignment for that rack, which can be used to determine
where to continue from.
So in our example, for rack 1 we can see the last broker we started from was
broker 3 in P1, so the next broker we should choose for that rack is 0,
which means the initial offset is set to 0 in the BrokerList for rack 1 during
initialization. For rack 2, the last broker we started with was
broker 7 in P1, so the next broker should be 4, which means the offset is 0 in
the BrokerList for rack 2. For rack 3, the last broker we started
with was broker 9 in P1, so the next broker should be 10, which means the
offset is 2 in the BrokerList for rack 3.
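The per-rack inference could look roughly like the sketch below. It is only an illustration under some assumptions: the class and method names are hypothetical, a rack is modeled as a plain list of broker ids in BrokerList order, and fencing is ignored. An empty result means no existing replica was found on that rack, in which case the random offset would still be used.

```java
import java.util.List;
import java.util.OptionalInt;

// Hypothetical helper for illustration only; not part of Kafka.
final class BrokerOffsetSketch {
    /**
     * rackBrokers holds the broker ids of one rack, in BrokerList order.
     * Walks the existing partitions from newest to oldest, finds the first
     * replica that lives on this rack, and returns the position after it.
     */
    static OptionalInt initialBrokerOffset(List<Integer> rackBrokers,
                                           List<List<Integer>> existingAssignments) {
        for (int p = existingAssignments.size() - 1; p >= 0; p--) {
            for (int replica : existingAssignments.get(p)) {
                int idx = rackBrokers.indexOf(replica);
                if (idx >= 0) {
                    // Continue from the broker after the one we last started on.
                    return OptionalInt.of((idx + 1) % rackBrokers.size());
                }
            }
        }
        return OptionalInt.empty(); // no existing replica on this rack
    }

    public static void main(String[] args) {
        List<List<Integer>> existing = List.of(List.of(6, 8, 2), List.of(9, 3, 7));
        System.out.println(initialBrokerOffset(List.of(0, 1, 2, 3), existing));   // OptionalInt[0]
        System.out.println(initialBrokerOffset(List.of(4, 5, 6, 7), existing));   // OptionalInt[0]
        System.out.println(initialBrokerOffset(List.of(8, 9, 10, 11), existing)); // OptionalInt[2]
    }
}
```

The three printed offsets match the values worked out by hand above for racks 1, 2 and 3.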