This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3c4472d701a KAFKA-14867: Trigger rebalance when replica racks change
if client.rack is configured (KIP-881) (#13474)
3c4472d701a is described below
commit 3c4472d701a7e9d9b8714a0b9d87ae190d1679fb
Author: Rajini Sivaram <[email protected]>
AuthorDate: Fri Mar 31 15:01:07 2023 +0100
KAFKA-14867: Trigger rebalance when replica racks change if client.rack is
configured (KIP-881) (#13474)
When `client.rack` is configured for consumers, we perform rack-aware
consumer partition assignment to improve locality. After/during reassignments,
replica racks may change, so to ensure optimal consumer assignment, trigger a
rebalance from the leader when the set of racks of any partition changes.
Reviewers: David Jacot <[email protected]>
---
.../consumer/internals/ConsumerCoordinator.java | 58 +++++--
.../internals/ConsumerCoordinatorTest.java | 171 ++++++++++++++++++---
.../server/FetchFromFollowerIntegrationTest.scala | 26 +++-
3 files changed, 224 insertions(+), 31 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index fec31fe80f8..1f6c5ef0d75 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.kafka.clients.GroupRebalanceConfig;
@@ -35,6 +36,7 @@ import
org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparato
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -174,7 +176,8 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
this.rebalanceConfig = rebalanceConfig;
this.log = logContext.logger(ConsumerCoordinator.class);
this.metadata = metadata;
- this.metadataSnapshot = new MetadataSnapshot(subscriptions,
metadata.fetch(), metadata.updateVersion());
+ this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() :
Optional.of(rackId);
+ this.metadataSnapshot = new MetadataSnapshot(this.rackId,
subscriptions, metadata.fetch(), metadata.updateVersion());
this.subscriptions = subscriptions;
this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback();
this.autoCommitEnabled = autoCommitEnabled;
@@ -188,7 +191,6 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
this.groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId,
JoinGroupRequest.UNKNOWN_GENERATION_ID,
JoinGroupRequest.UNKNOWN_MEMBER_ID, rebalanceConfig.groupInstanceId);
this.throwOnFetchStableOffsetsUnsupported =
throwOnFetchStableOffsetsUnsupported;
- this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() :
Optional.of(rackId);
if (autoCommitEnabled)
this.nextAutoCommitTimer = time.timer(autoCommitIntervalMs);
@@ -489,7 +491,7 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
// Update the current snapshot, which will be used to check for
subscription
// changes that would require a rebalance (e.g. new partitions).
- metadataSnapshot = new MetadataSnapshot(subscriptions, cluster,
version);
+ metadataSnapshot = new MetadataSnapshot(rackId, subscriptions,
cluster, version);
}
}
@@ -1613,14 +1615,18 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
private static class MetadataSnapshot {
private final int version;
- private final Map<String, Integer> partitionsPerTopic;
+ private final Map<String, List<PartitionRackInfo>> partitionsPerTopic;
- private MetadataSnapshot(SubscriptionState subscription, Cluster
cluster, int version) {
- Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ private MetadataSnapshot(Optional<String> clientRack,
SubscriptionState subscription, Cluster cluster, int version) {
+ Map<String, List<PartitionRackInfo>> partitionsPerTopic = new
HashMap<>();
for (String topic : subscription.metadataTopics()) {
- Integer numPartitions = cluster.partitionCountForTopic(topic);
- if (numPartitions != null)
- partitionsPerTopic.put(topic, numPartitions);
+ List<PartitionInfo> partitions =
cluster.partitionsForTopic(topic);
+ if (partitions != null) {
+ List<PartitionRackInfo> partitionRacks =
partitions.stream()
+ .map(p -> new PartitionRackInfo(clientRack, p))
+ .collect(Collectors.toList());
+ partitionsPerTopic.put(topic, partitionRacks);
+ }
}
this.partitionsPerTopic = partitionsPerTopic;
this.version = version;
@@ -1636,6 +1642,40 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
}
}
+ private static class PartitionRackInfo {
+ private final Set<String> racks;
+
+ PartitionRackInfo(Optional<String> clientRack, PartitionInfo
partition) {
+ if (clientRack.isPresent() && partition.replicas() != null) {
+ racks =
Arrays.stream(partition.replicas()).map(Node::rack).collect(Collectors.toSet());
+ } else {
+ racks = Collections.emptySet();
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof PartitionRackInfo)) {
+ return false;
+ }
+ PartitionRackInfo rackInfo = (PartitionRackInfo) o;
+ return Objects.equals(racks, rackInfo.racks);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(racks);
+ }
+
+ @Override
+ public String toString() {
+ return racks.isEmpty() ? "NO_RACKS" : "racks=" + racks;
+ }
+ }
+
private static class OffsetCommitCompletion {
private final OffsetCommitCallback callback;
private final Map<TopicPartition, OffsetAndMetadata> offsets;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 0f1256e1689..f3c8026dff3 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
@@ -68,6 +69,7 @@ import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -108,6 +110,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptySet;
@@ -1448,12 +1451,106 @@ public abstract class ConsumerCoordinatorTest {
*/
@Test
public void testRebalanceWithMetadataChange() {
+ MetadataResponse metadataResponse1 =
RequestTestUtils.metadataUpdateWith(1,
+ Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2,
1)));
+ MetadataResponse metadataResponse2 =
RequestTestUtils.metadataUpdateWith(1, singletonMap(topic1, 1));
+ verifyRebalanceWithMetadataChange(Optional.empty(), partitionAssignor,
metadataResponse1, metadataResponse2, true);
+ }
+
+ @Test
+ public void testRackAwareConsumerRebalanceWithDifferentRacks() {
+ verifyRackAwareConsumerRebalance(
+ Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2),
Arrays.asList(2, 0)),
+ Arrays.asList(Arrays.asList(0, 2), Arrays.asList(1, 2),
Arrays.asList(2, 0)),
+ true, true);
+ }
+
+ @Test
+ public void testNonRackAwareConsumerRebalanceWithDifferentRacks() {
+ verifyRackAwareConsumerRebalance(
+ Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2),
Arrays.asList(2, 0)),
+ Arrays.asList(Arrays.asList(0, 2), Arrays.asList(1, 2),
Arrays.asList(2, 0)),
+ false, false);
+ }
+
+ @Test
+ public void testRackAwareConsumerRebalanceWithAdditionalRacks() {
+ verifyRackAwareConsumerRebalance(
+ Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2),
Arrays.asList(2, 0)),
+ Arrays.asList(Arrays.asList(0, 1, 2), Arrays.asList(1, 2),
Arrays.asList(2, 0)),
+ true, true);
+ }
+
+ @Test
+ public void testRackAwareConsumerRebalanceWithLessRacks() {
+ verifyRackAwareConsumerRebalance(
+ Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2),
Arrays.asList(2, 0)),
+ Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2),
Collections.singletonList(2)),
+ true, true);
+ }
+
+ @Test
+ public void testRackAwareConsumerRebalanceWithNewPartitions() {
+ verifyRackAwareConsumerRebalance(
+ Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2),
Arrays.asList(2, 0)),
+ Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2),
Arrays.asList(2, 0), Arrays.asList(0, 1)),
+ true, true);
+ }
+
+ @Test
+ public void testRackAwareConsumerRebalanceWithNoMetadataChange() {
+ verifyRackAwareConsumerRebalance(
+ Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2),
Arrays.asList(2, 0)),
+ Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2),
Arrays.asList(2, 0)),
+ true, false);
+ }
+
+ @Test
+ public void testRackAwareConsumerRebalanceWithNoRackChange() {
+ verifyRackAwareConsumerRebalance(
+ Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2),
Arrays.asList(2, 0)),
+ Arrays.asList(Arrays.asList(3, 4), Arrays.asList(4, 5),
Arrays.asList(5, 3)),
+ true, false);
+ }
+
+ @Test
+ public void testRackAwareConsumerRebalanceWithNewReplicasOnSameRacks() {
+ verifyRackAwareConsumerRebalance(
+ Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2),
Arrays.asList(2, 0)),
+ Arrays.asList(Arrays.asList(0, 1, 3), Arrays.asList(1, 2, 5),
Arrays.asList(2, 0, 3)),
+ true, false);
+ }
+
+ private void verifyRackAwareConsumerRebalance(List<List<Integer>>
partitionReplicas1,
+ List<List<Integer>>
partitionReplicas2,
+ boolean rackAwareConsumer,
+ boolean expectRebalance) {
+ List<String> racks = Arrays.asList("rack-a", "rack-b", "rack-c");
+ MockPartitionAssignor assignor = partitionAssignor;
+ String consumerRackId = null;
+ if (rackAwareConsumer) {
+ consumerRackId = racks.get(0);
+ assignor = new RackAwareAssignor(protocol);
+ createRackAwareCoordinator(consumerRackId, assignor);
+ }
+
+ MetadataResponse metadataResponse1 = rackAwareMetadata(6, racks,
Collections.singletonMap(topic1, partitionReplicas1));
+ MetadataResponse metadataResponse2 = rackAwareMetadata(6, racks,
Collections.singletonMap(topic1, partitionReplicas2));
+ verifyRebalanceWithMetadataChange(Optional.ofNullable(consumerRackId),
assignor, metadataResponse1, metadataResponse2, expectRebalance);
+ }
+
+ private void verifyRebalanceWithMetadataChange(Optional<String> rackId,
+ MockPartitionAssignor
partitionAssignor,
+ MetadataResponse
metadataResponse1,
+ MetadataResponse
metadataResponse2,
+ boolean expectRebalance) {
final String consumerId = "leader";
final List<String> topics = Arrays.asList(topic1, topic2);
- final List<TopicPartition> partitions = Arrays.asList(t1p, t2p);
+ final List<TopicPartition> partitions =
metadataResponse1.topicMetadata().stream()
+ .flatMap(t -> t.partitionMetadata().stream().map(p -> new
TopicPartition(t.topic(), p.partition())))
+ .collect(Collectors.toList());
subscriptions.subscribe(toSet(topics), rebalanceListener);
- client.updateMetadata(RequestTestUtils.metadataUpdateWith(1,
- Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2,
1))));
+ client.updateMetadata(metadataResponse1);
coordinator.maybeUpdateSubscriptionMetadata();
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
@@ -1462,7 +1559,7 @@ public abstract class ConsumerCoordinatorTest {
Map<String, List<String>> initialSubscription =
singletonMap(consumerId, topics);
partitionAssignor.prepare(singletonMap(consumerId, partitions));
- client.prepareResponse(joinGroupLeaderResponse(1, consumerId,
initialSubscription, Errors.NONE));
+ client.prepareResponse(joinGroupLeaderResponse(1, consumerId,
initialSubscription, false, Errors.NONE, rackId));
client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
@@ -1475,13 +1572,18 @@ public abstract class ConsumerCoordinatorTest {
assertEquals(1, rebalanceListener.assignedCount);
// Change metadata to trigger rebalance.
- client.updateMetadata(RequestTestUtils.metadataUpdateWith(1,
singletonMap(topic1, 1)));
+ client.updateMetadata(metadataResponse2);
coordinator.poll(time.timer(0));
+ if (!expectRebalance) {
+ assertEquals(0, client.requests().size());
+ return;
+ }
+ assertEquals(1, client.requests().size());
+
// Revert metadata to original value. Fail pending JoinGroup. Another
// JoinGroup should be sent, which will be completed successfully.
- client.updateMetadata(RequestTestUtils.metadataUpdateWith(1,
- Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2,
1))));
+ client.updateMetadata(metadataResponse1);
client.respond(joinGroupFollowerResponse(1, consumerId, "leader",
Errors.NOT_COORDINATOR));
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
assertFalse(client.hasInFlightRequests());
@@ -1495,7 +1597,7 @@ public abstract class ConsumerCoordinatorTest {
JoinGroupRequest joinRequest = (JoinGroupRequest) request;
return consumerId.equals(joinRequest.data().memberId());
}
- }, joinGroupLeaderResponse(2, consumerId, initialSubscription,
Errors.NONE));
+ }, joinGroupLeaderResponse(2, consumerId, initialSubscription, false,
Errors.NONE, rackId));
client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
@@ -3494,16 +3596,10 @@ public abstract class ConsumerCoordinatorTest {
@Test
public void testSubscriptionRackId() {
- metrics.close();
- coordinator.close(time.timer(0));
String rackId = "rack-a";
- metrics = new Metrics(time);
- RackAwareAssignor assignor = new RackAwareAssignor();
-
- coordinator = new ConsumerCoordinator(rebalanceConfig, new
LogContext(), consumerClient,
- Collections.singletonList(assignor), metadata, subscriptions,
- metrics, consumerId + groupId, time, false,
autoCommitIntervalMs, null, false, rackId);
+ RackAwareAssignor assignor = new RackAwareAssignor(protocol);
+ createRackAwareCoordinator(rackId, assignor);
subscriptions.subscribe(singleton(topic1), rebalanceListener);
client.updateMetadata(metadataResponse);
@@ -3927,6 +4023,45 @@ public abstract class ConsumerCoordinatorTest {
};
}
+ private void createRackAwareCoordinator(String rackId,
MockPartitionAssignor assignor) {
+ metrics.close();
+ coordinator.close(time.timer(0));
+
+ metrics = new Metrics(time);
+
+ coordinator = new ConsumerCoordinator(rebalanceConfig, new
LogContext(), consumerClient,
+ Collections.singletonList(assignor), metadata, subscriptions,
+ metrics, consumerId + groupId, time, false,
autoCommitIntervalMs, null, false, rackId);
+ }
+
+ private static MetadataResponse rackAwareMetadata(int numNodes,
+ List<String> racks,
+ Map<String,
List<List<Integer>>> partitionReplicas) {
+ final List<Node> nodes = new ArrayList<>(numNodes);
+ for (int i = 0; i < numNodes; i++)
+ nodes.add(new Node(i, "localhost", 1969 + i, racks.get(i %
racks.size())));
+
+ List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
+ for (Map.Entry<String, List<List<Integer>>> topicPartitionCountEntry :
partitionReplicas.entrySet()) {
+ String topic = topicPartitionCountEntry.getKey();
+ int numPartitions = topicPartitionCountEntry.getValue().size();
+
+ List<MetadataResponse.PartitionMetadata> partitionMetadata = new
ArrayList<>(numPartitions);
+ for (int i = 0; i < numPartitions; i++) {
+ TopicPartition tp = new TopicPartition(topic, i);
+ List<Integer> replicaIds =
topicPartitionCountEntry.getValue().get(i);
+ partitionMetadata.add(new PartitionMetadata(
+ Errors.NONE, tp, Optional.of(replicaIds.get(0)),
Optional.empty(),
+ replicaIds, replicaIds, Collections.emptyList()));
+ }
+
+ topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE,
topic, Uuid.ZERO_UUID,
+ Topic.isInternal(topic), partitionMetadata,
MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED));
+ }
+
+ return RequestTestUtils.metadataResponse(nodes, "kafka-cluster", 0,
topicMetadata, ApiKeys.METADATA.latestVersion());
+ }
+
private static class MockCommitCallback implements OffsetCommitCallback {
public int invoked = 0;
public Exception exception = null;
@@ -3941,8 +4076,8 @@ public abstract class ConsumerCoordinatorTest {
private static class RackAwareAssignor extends MockPartitionAssignor {
private final Set<String> rackIds = new HashSet<>();
- RackAwareAssignor() {
- super(Arrays.asList(RebalanceProtocol.EAGER));
+ RackAwareAssignor(RebalanceProtocol rebalanceProtocol) {
+ super(Collections.singletonList(rebalanceProtocol));
}
@Override
diff --git
a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
index 1f940efc422..3a2f9430aa8 100644
---
a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
@@ -18,6 +18,7 @@ package integration.kafka.server
import kafka.server.{BaseFetchRequestTest, KafkaConfig}
import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.NewPartitionReassignment
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer,
RangeAssignor}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
@@ -29,6 +30,7 @@ import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
+import java.util
import java.util.{Collections, Properties}
import java.util.concurrent.{Executors, TimeUnit}
import scala.jdk.CollectionConverters._
@@ -187,13 +189,17 @@ class FetchFromFollowerIntegrationTest extends
BaseFetchRequestTest {
consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest")
consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG,
server.config.rack.orNull)
consumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
s"instance-${server.config.brokerId}")
+ consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG,
"1000")
createConsumer()
}
val producer = createProducer()
val executor = Executors.newFixedThreadPool(consumers.size)
- def verifyAssignments(assignments: List[Set[TopicPartition]]): Unit = {
+ def verifyAssignments(partitionOrder: List[Int], topics: String*): Unit = {
+ val assignments = partitionOrder.map { p =>
+ topics.map(topic => new TopicPartition(topic, p)).toSet
+ }
val assignmentFutures = consumers.zipWithIndex.map { case (consumer, i)
=>
executor.submit(() => {
val expectedAssignment = assignments(i)
@@ -212,18 +218,30 @@ class FetchFromFollowerIntegrationTest extends
BaseFetchRequestTest {
}
}
+
try {
// Rack-based assignment results in partitions assigned in reverse order
since partition racks are in the reverse order.
consumers.foreach(_.subscribe(Collections.singleton(topicWithSingleRackPartitions)))
- verifyAssignments(partitionList.reverse.map(p => Set(new
TopicPartition(topicWithSingleRackPartitions, p))))
+ verifyAssignments(partitionList.reverse, topicWithSingleRackPartitions)
// Non-rack-aware assignment results in ordered partitions.
consumers.foreach(_.subscribe(Collections.singleton(topicWithAllPartitionsOnAllRacks)))
- verifyAssignments(partitionList.map(p => Set(new
TopicPartition(topicWithAllPartitionsOnAllRacks, p))))
+ verifyAssignments(partitionList, topicWithAllPartitionsOnAllRacks)
// Rack-aware assignment with co-partitioning results in reverse
assignment for both topics.
consumers.foreach(_.subscribe(Set(topicWithSingleRackPartitions,
topicWithAllPartitionsOnAllRacks).asJava))
- verifyAssignments(partitionList.reverse.map(p => Set(new
TopicPartition(topicWithAllPartitionsOnAllRacks, p), new
TopicPartition(topicWithSingleRackPartitions, p))))
+ verifyAssignments(partitionList.reverse,
topicWithAllPartitionsOnAllRacks, topicWithSingleRackPartitions)
+
+ // Perform reassignment for topicWithSingleRackPartitions to reverse the
replica racks and
+ // verify that change in replica racks results in re-assignment based on
new racks.
+ val admin = createAdminClient()
+ val reassignments = new util.HashMap[TopicPartition,
util.Optional[NewPartitionReassignment]]()
+ partitionList.foreach { p =>
+ val newAssignment = new
NewPartitionReassignment(Collections.singletonList(p))
+ reassignments.put(new TopicPartition(topicWithSingleRackPartitions,
p), util.Optional.of(newAssignment))
+ }
+ admin.alterPartitionReassignments(reassignments).all().get(15,
TimeUnit.SECONDS)
+ verifyAssignments(partitionList, topicWithAllPartitionsOnAllRacks,
topicWithSingleRackPartitions)
} finally {
executor.shutdownNow()