This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ddeb89f4a9f KAFKA-14595: Move AdminUtils to server-common (#14096)
ddeb89f4a9f is described below
commit ddeb89f4a9f31b5ff63661e18a94c403a8f45f69
Author: Nikolay <[email protected]>
AuthorDate: Wed Aug 9 11:32:45 2023 +0300
KAFKA-14595: Move AdminUtils to server-common (#14096)
Reviewers: Mickael Maison <[email protected]>
---
checkstyle/import-control-server-common.xml | 3 +
core/src/main/scala/kafka/admin/AdminUtils.scala | 239 -------------------
.../main/scala/kafka/admin/BrokerMetadata.scala | 23 --
.../kafka/admin/ReassignPartitionsCommand.scala | 11 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 4 +-
.../main/scala/kafka/server/MetadataCache.scala | 2 +-
.../main/scala/kafka/server/ZkAdminManager.scala | 6 +-
.../kafka/server/metadata/KRaftMetadataCache.scala | 4 +-
.../kafka/server/metadata/ZkMetadataCache.scala | 7 +-
core/src/main/scala/kafka/utils/CoreUtils.scala | 5 +
core/src/main/scala/kafka/zk/AdminZkClient.scala | 20 +-
.../unit/kafka/admin/AdminRackAwareTest.scala | 67 +++---
.../scala/unit/kafka/admin/RackAwareTest.scala | 17 +-
.../kafka/admin/ReassignPartitionsUnitTest.scala | 15 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 7 +-
.../scala/unit/kafka/zk/AdminZkClientTest.scala | 10 +-
.../java/org/apache/kafka/admin/AdminUtils.java | 256 +++++++++++++++++++++
.../org/apache/kafka/admin/BrokerMetadata.java | 52 +++++
18 files changed, 409 insertions(+), 339 deletions(-)
diff --git a/checkstyle/import-control-server-common.xml
b/checkstyle/import-control-server-common.xml
index ebf9984c7a0..a8d032c6221 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -101,4 +101,7 @@
</subpackage>
</subpackage>
+ <subpackage name="admin">
+ <allow pkg="org.apache.kafka.server.common" />
+ </subpackage>
</import-control>
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala
b/core/src/main/scala/kafka/admin/AdminUtils.scala
deleted file mode 100644
index 5ac09ab5348..00000000000
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import java.util.Random
-import kafka.utils.Logging
-import org.apache.kafka.common.errors.{InvalidPartitionsException,
InvalidReplicationFactorException}
-import org.apache.kafka.server.common.AdminOperationException
-
-import collection.{Map, mutable, _}
-
-object AdminUtils extends Logging {
- val rand = new Random
- val AdminClientId = "__admin_client"
-
- /**
- * There are 3 goals of replica assignment:
- *
- * <ol>
- * <li> Spread the replicas evenly among brokers.</li>
- * <li> For partitions assigned to a particular broker, their other replicas
are spread over the other brokers.</li>
- * <li> If all brokers have rack information, assign the replicas for each
partition to different racks if possible</li>
- * </ol>
- *
- * To achieve this goal for replica assignment without considering racks, we:
- * <ol>
- * <li> Assign the first replica of each partition by round-robin, starting
from a random position in the broker list.</li>
- * <li> Assign the remaining replicas of each partition with an increasing
shift.</li>
- * </ol>
- *
- * Here is an example of assigning
- * <table cellpadding="2" cellspacing="2">
- *
<tr><th>broker-0</th><th>broker-1</th><th>broker-2</th><th>broker-3</th><th>broker-4</th><th> </th></tr>
- * <tr><td>p0 </td><td>p1 </td><td>p2 </td><td>p3
</td><td>p4 </td><td>(1st replica)</td></tr>
- * <tr><td>p5 </td><td>p6 </td><td>p7 </td><td>p8
</td><td>p9 </td><td>(1st replica)</td></tr>
- * <tr><td>p4 </td><td>p0 </td><td>p1 </td><td>p2
</td><td>p3 </td><td>(2nd replica)</td></tr>
- * <tr><td>p8 </td><td>p9 </td><td>p5 </td><td>p6
</td><td>p7 </td><td>(2nd replica)</td></tr>
- * <tr><td>p3 </td><td>p4 </td><td>p0 </td><td>p1
</td><td>p2 </td><td>(3nd replica)</td></tr>
- * <tr><td>p7 </td><td>p8 </td><td>p9 </td><td>p5
</td><td>p6         </td><td>(3rd replica)</td></tr>
- * </table>
- *
- * <p>
- * To create rack aware assignment, this API will first create a rack
alternated broker list. For example,
- * from this brokerID -> rack mapping:</p>
- * 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5
-> "rack1"
- * <br><br>
- * <p>
- * The rack alternated list will be:
- * </p>
- * 0, 3, 1, 5, 4, 2
- * <br><br>
- * <p>
- * Then an easy round-robin assignment can be applied. Assume 6 partitions
with replication factor of 3, the assignment
- * will be:
- * </p>
- * 0 -> 0,3,1 <br>
- * 1 -> 3,1,5 <br>
- * 2 -> 1,5,4 <br>
- * 3 -> 5,4,2 <br>
- * 4 -> 4,2,0 <br>
- * 5 -> 2,0,3 <br>
- * <br>
- * <p>
- * Once it has completed the first round-robin, if there are more partitions
to assign, the algorithm will start
- * shifting the followers. This is to ensure we will not always get the same
set of sequences.
- * In this case, if there is another partition to assign (partition #6), the
assignment will be:
- * </p>
- * 6 -> 0,4,2 (instead of repeating 0,3,1 as partition 0)
- * <br><br>
- * <p>
- * The rack aware assignment always chooses the 1st replica of the partition
using round robin on the rack alternated
- * broker list. For rest of the replicas, it will be biased towards brokers
on racks that do not have
- * any replica assignment, until every rack has a replica. Then the
assignment will go back to round-robin on
- * the broker list.
- * </p>
- * <br>
- * <p>
- * As the result, if the number of replicas is equal to or greater than the
number of racks, it will ensure that
- * each rack will get at least one replica. Otherwise, each rack will get at
most one replica. In a perfect
- * situation where the number of replicas is the same as the number of racks
and each rack has the same number of
- * brokers, it guarantees that the replica distribution is even across
brokers and racks.
- * </p>
- * @return a Map from partition id to replica ids
- * @throws AdminOperationException If rack information is supplied but it is
incomplete, or if it is not possible to
- * assign each replica to a unique rack.
- *
- */
- def assignReplicasToBrokers(brokerMetadatas: Iterable[BrokerMetadata],
- nPartitions: Int,
- replicationFactor: Int,
- fixedStartIndex: Int = -1,
- startPartitionId: Int = -1): Map[Int, Seq[Int]]
= {
- if (nPartitions <= 0)
- throw new InvalidPartitionsException("Number of partitions must be
larger than 0.")
- if (replicationFactor <= 0)
- throw new InvalidReplicationFactorException("Replication factor must be
larger than 0.")
- if (replicationFactor > brokerMetadatas.size)
- throw new InvalidReplicationFactorException(s"Replication factor:
$replicationFactor larger than available brokers: ${brokerMetadatas.size}.")
- if (brokerMetadatas.forall(_.rack.isEmpty))
- assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor,
brokerMetadatas.map(_.id), fixedStartIndex,
- startPartitionId)
- else {
- if (brokerMetadatas.exists(_.rack.isEmpty))
- throw new AdminOperationException("Not all brokers have rack
information for replica rack aware assignment.")
- assignReplicasToBrokersRackAware(nPartitions, replicationFactor,
brokerMetadatas, fixedStartIndex,
- startPartitionId)
- }
- }
-
- private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
- replicationFactor: Int,
- brokerList: Iterable[Int],
- fixedStartIndex: Int,
- startPartitionId: Int):
Map[Int, Seq[Int]] = {
- val ret = mutable.Map[Int, Seq[Int]]()
- val brokerArray = brokerList.toArray
- val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else
rand.nextInt(brokerArray.length)
- var currentPartitionId = math.max(0, startPartitionId)
- var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else
rand.nextInt(brokerArray.length)
- for (_ <- 0 until nPartitions) {
- if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length
== 0))
- nextReplicaShift += 1
- val firstReplicaIndex = (currentPartitionId + startIndex) %
brokerArray.length
- val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
- for (j <- 0 until replicationFactor - 1)
- replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex,
nextReplicaShift, j, brokerArray.length))
- ret.put(currentPartitionId, replicaBuffer)
- currentPartitionId += 1
- }
- ret
- }
-
- private def assignReplicasToBrokersRackAware(nPartitions: Int,
- replicationFactor: Int,
- brokerMetadatas:
Iterable[BrokerMetadata],
- fixedStartIndex: Int,
- startPartitionId: Int):
Map[Int, Seq[Int]] = {
- val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id,
Some(rack)) =>
- id -> rack
- }.toMap
- val numRacks = brokerRackMap.values.toSet.size
- val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
- val numBrokers = arrangedBrokerList.size
- val ret = mutable.Map[Int, Seq[Int]]()
- val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else
rand.nextInt(arrangedBrokerList.size)
- var currentPartitionId = math.max(0, startPartitionId)
- var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else
rand.nextInt(arrangedBrokerList.size)
- for (_ <- 0 until nPartitions) {
- if (currentPartitionId > 0 && (currentPartitionId %
arrangedBrokerList.size == 0))
- nextReplicaShift += 1
- val firstReplicaIndex = (currentPartitionId + startIndex) %
arrangedBrokerList.size
- val leader = arrangedBrokerList(firstReplicaIndex)
- val replicaBuffer = mutable.ArrayBuffer(leader)
- val racksWithReplicas = mutable.Set(brokerRackMap(leader))
- val brokersWithReplicas = mutable.Set(leader)
- var k = 0
- for (_ <- 0 until replicationFactor - 1) {
- var done = false
- while (!done) {
- val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex,
nextReplicaShift * numRacks, k, arrangedBrokerList.size))
- val rack = brokerRackMap(broker)
- // Skip this broker if
- // 1. there is already a broker in the same rack that has assigned a
replica AND there is one or more racks
- // that do not have any replica, or
- // 2. the broker has already assigned a replica AND there is one or
more brokers that do not have replica assigned
- if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size ==
numRacks)
- && (!brokersWithReplicas.contains(broker) ||
brokersWithReplicas.size == numBrokers)) {
- replicaBuffer += broker
- racksWithReplicas += rack
- brokersWithReplicas += broker
- done = true
- }
- k += 1
- }
- }
- ret.put(currentPartitionId, replicaBuffer)
- currentPartitionId += 1
- }
- ret
- }
-
- /**
- * Given broker and rack information, returns a list of brokers alternated
by the rack. Assume
- * this is the rack and its brokers:
- *
- * rack1: 0, 1, 2
- * rack2: 3, 4, 5
- * rack3: 6, 7, 8
- *
- * This API would return the list of 0, 3, 6, 1, 4, 7, 2, 5, 8
- *
- * This is essential to make sure that the assignReplicasToBrokers API can
use such list and
- * assign replicas to brokers in a simple round-robin fashion, while
ensuring an even
- * distribution of leader and replica counts on each broker and that
replicas are
- * distributed to all racks.
- */
- private[admin] def getRackAlternatedBrokerList(brokerRackMap: Map[Int,
String]): IndexedSeq[Int] = {
- val brokersIteratorByRack = getInverseMap(brokerRackMap).map { case (rack,
brokers) =>
- (rack, brokers.iterator)
- }
- val racks = brokersIteratorByRack.keys.toArray.sorted
- val result = new mutable.ArrayBuffer[Int]
- var rackIndex = 0
- while (result.size < brokerRackMap.size) {
- val rackIterator = brokersIteratorByRack(racks(rackIndex))
- if (rackIterator.hasNext)
- result += rackIterator.next()
- rackIndex = (rackIndex + 1) % racks.length
- }
- result
- }
-
- private[admin] def getInverseMap(brokerRackMap: Map[Int, String]):
Map[String, Seq[Int]] = {
- brokerRackMap.toSeq.map { case (id, rack) => (rack, id) }
- .groupBy { case (rack, _) => rack }
- .map { case (rack, rackAndIdList) => (rack, rackAndIdList.map { case (_,
id) => id }.sorted) }
- }
-
- private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int,
replicaIndex: Int, nBrokers: Int): Int = {
- val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
- (firstReplicaIndex + shift) % nBrokers
- }
-
-}
diff --git a/core/src/main/scala/kafka/admin/BrokerMetadata.scala
b/core/src/main/scala/kafka/admin/BrokerMetadata.scala
deleted file mode 100644
index 86831e376e5..00000000000
--- a/core/src/main/scala/kafka/admin/BrokerMetadata.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package kafka.admin
-
-/**
- * Broker metadata used by admin tools.
- *
- * @param id an integer that uniquely identifies this broker
- * @param rack the rack of the broker, which is used in rack aware
partition assignment for fault tolerance.
- * Examples: "RACK1", "us-east-1d"
- */
-case class BrokerMetadata(id: Int, rack: Option[String])
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 40f688c7085..2fc36648981 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -23,6 +23,7 @@ import kafka.server.DynamicConfig
import kafka.utils.{CoreUtils, Exit, Json, Logging}
import kafka.utils.Implicits._
import kafka.utils.json.JsonValue
+import org.apache.kafka.admin.{AdminUtils, BrokerMetadata}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
AlterConfigOp, ConfigEntry, NewPartitionReassignment, PartitionReassignment,
TopicDescription}
import org.apache.kafka.common.config.ConfigResource
@@ -606,8 +607,8 @@ object ReassignPartitionsCommand extends Logging {
val proposedAssignments = mutable.Map[TopicPartition, Seq[Int]]()
groupedByTopic.forKeyValue { (topic, assignment) =>
val (_, replicas) = assignment.head
- val assignedReplicas = AdminUtils.
- assignReplicasToBrokers(brokerMetadatas, assignment.size,
replicas.size)
+ val assignedReplicas =
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.
+ assignReplicasToBrokers(brokerMetadatas.asJavaCollection,
assignment.size, replicas.size))
proposedAssignments ++= assignedReplicas.map { case (partition,
replicas) =>
new TopicPartition(topic, partition) -> replicas
}
@@ -688,12 +689,12 @@ object ReassignPartitionsCommand extends Logging {
filter(node => brokerSet.contains(node.id)).
map {
node => if (enableRackAwareness && node.rack != null) {
- BrokerMetadata(node.id, Some(node.rack))
+ new BrokerMetadata(node.id, Optional.of(node.rack))
} else {
- BrokerMetadata(node.id, None)
+ new BrokerMetadata(node.id, Optional.empty())
}
}.toSeq
- val numRackless = results.count(_.rack.isEmpty)
+ val numRackless = results.count(!_.rack.isPresent)
if (enableRackAwareness && numRackless != 0 && numRackless !=
results.size) {
throw new AdminOperationException("Not all brokers have rack
information. Add " +
"--disable-rack-aware in command line to make replica assignment
without rack " +
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5d6233d1100..15c4e356bdc 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,7 +17,6 @@
package kafka.server
-import kafka.admin.AdminUtils
import kafka.api.ElectLeadersRequestOps
import kafka.controller.ReplicaAssignment
import kafka.coordinator.transaction.{InitProducerIdResult,
TransactionCoordinator}
@@ -26,6 +25,7 @@ import kafka.server.QuotaFactory.{QuotaManagers,
UnboundedQuota}
import kafka.server.metadata.ConfigRepository
import kafka.utils.Implicits._
import kafka.utils.{CoreUtils, Logging}
+import org.apache.kafka.admin.AdminUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.acl.AclOperation._
@@ -668,7 +668,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (authorizedRequestInfo.isEmpty)
sendResponseCallback(Map.empty)
else {
- val internalTopicsAllowed = request.header.clientId ==
AdminUtils.AdminClientId
+ val internalTopicsAllowed = request.header.clientId ==
AdminUtils.ADMIN_CLIENT_ID
val transactionStatePartition =
if (produceRequest.transactionalId() == null)
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala
b/core/src/main/scala/kafka/server/MetadataCache.scala
index ecd64c17b91..2e60ce2ba7b 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,8 +17,8 @@
package kafka.server
-import kafka.admin.BrokerMetadata
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
+import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common.message.{MetadataResponseData,
UpdateMetadataRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala
b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index 00ef867de9c..90fbd7f3996 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -18,7 +18,6 @@ package kafka.server
import java.util
import java.util.Properties
-import kafka.admin.AdminUtils
import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs,
toLoggableProps}
import kafka.server.DynamicConfig.QuotaConfigs
@@ -26,6 +25,7 @@ import kafka.server.metadata.ZkConfigRepository
import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.admin.AdminUtils
import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.config.{ConfigDef, ConfigException,
ConfigResource}
@@ -184,8 +184,8 @@ class ZkAdminManager(val config: KafkaConfig,
defaultReplicationFactor else topic.replicationFactor
val assignments = if (topic.assignments.isEmpty) {
- AdminUtils.assignReplicasToBrokers(
- brokers, resolvedNumPartitions, resolvedReplicationFactor)
+
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(
+ brokers.asJavaCollection, resolvedNumPartitions,
resolvedReplicationFactor))
} else {
val assignments = new mutable.HashMap[Int, Seq[Int]]
// Note: we don't check that replicaAssignment contains unknown
brokers - unlike in add-partitions case,
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 5e3ed11022b..559ccb2e917 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -20,6 +20,7 @@ package kafka.server.metadata
import kafka.controller.StateChangeLogger
import kafka.server.{CachedControllerId, KRaftCachedControllerId,
MetadataCache}
import kafka.utils.Logging
+import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common.internals.Topic
import
org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition,
MetadataResponseTopic}
import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition,
Uuid}
@@ -32,7 +33,6 @@ import org.apache.kafka.image.MetadataImage
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent.ThreadLocalRandom
-import kafka.admin.BrokerMetadata
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message.{DescribeClientQuotasRequestData,
DescribeClientQuotasResponseData}
import
org.apache.kafka.common.message.{DescribeUserScramCredentialsRequestData,
DescribeUserScramCredentialsResponseData}
@@ -215,7 +215,7 @@ class KRaftMetadataCache(val brokerId: Int) extends
MetadataCache with Logging w
private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata]
= {
image.cluster().brokers().values().asScala.filterNot(_.fenced()).
- map(b => BrokerMetadata(b.id, b.rack.asScala))
+ map(b => new BrokerMetadata(b.id, b.rack))
}
override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName):
Option[Node] = {
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index 9159b791880..302d3fbf8de 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -18,10 +18,8 @@
package kafka.server.metadata
import java.util
-import java.util.Collections
+import java.util.{Collections, Optional}
import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
-import kafka.admin.BrokerMetadata
-
import scala.collection.{Seq, Set, mutable}
import scala.jdk.CollectionConverters._
import kafka.cluster.{Broker, EndPoint}
@@ -31,6 +29,7 @@ import kafka.server.{BrokerFeatures, CachedControllerId,
KRaftCachedControllerId
import kafka.utils.CoreUtils._
import kafka.utils.Logging
import kafka.utils.Implicits._
+import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common.internals.Topic
import
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition,
Uuid}
@@ -254,7 +253,7 @@ class ZkMetadataCache(
override def hasAliveBroker(brokerId: Int): Boolean =
metadataSnapshot.aliveBrokers.contains(brokerId)
override def getAliveBrokers(): Iterable[BrokerMetadata] = {
- metadataSnapshot.aliveBrokers.values.map(b => new BrokerMetadata(b.id,
b.rack))
+ metadataSnapshot.aliveBrokers.values.map(b => new BrokerMetadata(b.id,
Optional.ofNullable(b.rack.orNull)))
}
override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName):
Option[Node] = {
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala
b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 149d2a78374..8b960806042 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -34,7 +34,9 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
import org.slf4j.event.Level
+import java.util
import scala.annotation.nowarn
+import scala.jdk.CollectionConverters._
/**
* General helper functions!
@@ -315,4 +317,7 @@ object CoreUtils {
elements.groupMapReduce(key)(f)(reduce)
}
+ def replicaToBrokerAssignmentAsScala(map: util.Map[Integer,
util.List[Integer]]): Map[Int, Seq[Int]] = {
+ map.asScala.map(e => (e._1.asInstanceOf[Int],
e._2.asScala.map(_.asInstanceOf[Int])))
+ }
}
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala
b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index 07ca35cc9ae..d16394cab05 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -16,13 +16,14 @@
*/
package kafka.zk
-import java.util.Properties
-import kafka.admin.{AdminUtils, BrokerMetadata, RackAwareMode}
+import java.util.{Optional, Properties}
+import kafka.admin.RackAwareMode
import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.controller.ReplicaAssignment
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
import kafka.utils._
import kafka.utils.Implicits._
+import org.apache.kafka.admin.{AdminUtils, BrokerMetadata}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
@@ -30,6 +31,7 @@ import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.zookeeper.KeeperException.NodeExistsException
+import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq}
/**
@@ -55,8 +57,8 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
topicConfig: Properties = new Properties,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
usesTopicId: Boolean = false): Unit = {
- val brokerMetadatas = getBrokerMetadatas(rackAwareMode)
- val replicaAssignment =
AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions,
replicationFactor)
+ val brokerMetadatas = getBrokerMetadatas(rackAwareMode).asJava
+ val replicaAssignment =
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(brokerMetadatas,
partitions, replicationFactor))
createTopicWithAssignment(topic, topicConfig, replicaAssignment,
usesTopicId = usesTopicId)
}
@@ -77,10 +79,10 @@ class AdminZkClient(zkClient: KafkaZkClient) extends
Logging {
" to make replica assignment without rack information.")
}
val brokerMetadatas = rackAwareMode match {
- case RackAwareMode.Disabled => brokers.map(broker =>
BrokerMetadata(broker.id, None))
+ case RackAwareMode.Disabled => brokers.map(broker => new
BrokerMetadata(broker.id, Optional.empty()))
case RackAwareMode.Safe if brokersWithRack.size < brokers.size =>
- brokers.map(broker => BrokerMetadata(broker.id, None))
- case _ => brokers.map(broker => BrokerMetadata(broker.id, broker.rack))
+ brokers.map(broker => new BrokerMetadata(broker.id, Optional.empty()))
+ case _ => brokers.map(broker => new BrokerMetadata(broker.id,
Optional.ofNullable(broker.rack.orNull)))
}
brokerMetadatas.sortBy(_.id)
}
@@ -268,8 +270,8 @@ class AdminZkClient(zkClient: KafkaZkClient) extends
Logging {
val proposedAssignmentForNewPartitions = replicaAssignment.getOrElse {
val startIndex = math.max(0, allBrokers.indexWhere(_.id >=
existingAssignmentPartition0.head))
- AdminUtils.assignReplicasToBrokers(allBrokers, partitionsToAdd,
existingAssignmentPartition0.size,
- startIndex, existingAssignment.size)
+
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(allBrokers.asJava,
partitionsToAdd, existingAssignmentPartition0.size,
+ startIndex, existingAssignment.size))
}
proposedAssignmentForNewPartitions.map { case (tp, replicas) =>
diff --git a/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
index d2d665bef8a..9a3aaa31bc2 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
@@ -16,23 +16,26 @@
*/
package kafka.admin
-import kafka.utils.Logging
+import kafka.utils.{CoreUtils, Logging}
+import org.apache.kafka.admin.{AdminUtils, BrokerMetadata}
import org.apache.kafka.common.errors.InvalidReplicationFactorException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
+import java.util.Optional
import scala.collection.Map
+import scala.jdk.CollectionConverters._
class AdminRackAwareTest extends RackAwareTest with Logging {
@Test
def testGetRackAlternatedBrokerListAndAssignReplicasToBrokers(): Unit = {
val rackMap = Map(0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2",
4 -> "rack2", 5 -> "rack1")
- val newList = AdminUtils.getRackAlternatedBrokerList(rackMap)
- assertEquals(List(0, 3, 1, 5, 4, 2), newList)
- val anotherList = AdminUtils.getRackAlternatedBrokerList(rackMap.toMap - 5)
- assertEquals(List(0, 3, 1, 4, 2), anotherList)
- val assignment =
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(rackMap), 7, 3, 0, 0)
+ val newList = AdminUtils.getRackAlternatedBrokerList(rackMap.map(e =>
(e._1.asInstanceOf[Integer], e._2)).asJava)
+ assertEquals(List(0, 3, 1, 5, 4, 2), newList.asScala.toList)
+ val anotherList = AdminUtils.getRackAlternatedBrokerList((rackMap.toMap -
5).map(e => (e._1.asInstanceOf[Integer], e._2)).asJava)
+ assertEquals(List(0, 3, 1, 4, 2), anotherList.asScala.toList)
+ val assignment =
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(rackMap),
7, 3, 0, 0))
val expected = Map(0 -> List(0, 3, 1),
1 -> List(3, 1, 5),
2 -> List(1, 5, 4),
@@ -48,8 +51,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging {
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 ->
"rack3", 4 -> "rack3", 5 -> "rack1")
val numPartitions = 6
val replicationFactor = 3
- val assignment =
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
- replicationFactor, 2, 0)
+ val assignment =
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
+ replicationFactor, 2, 0))
checkReplicaDistribution(assignment, brokerRackMapping,
brokerRackMapping.size, numPartitions,
replicationFactor)
}
@@ -59,8 +62,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging {
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 ->
"rack3", 4 -> "rack3", 5 -> "rack1")
val numPartitions = 6
val replicationFactor = 3
- val assignment =
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
- replicationFactor)
+ val assignment =
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
+ replicationFactor))
checkReplicaDistribution(assignment, brokerRackMapping,
brokerRackMapping.size, numPartitions,
replicationFactor)
}
@@ -70,8 +73,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging {
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 ->
"rack3", 4 -> "rack3", 5 -> "rack1")
val numPartitions = 13
val replicationFactor = 3
- val assignment =
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
- replicationFactor, 0, 0)
+ val assignment =
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
+ replicationFactor, 0, 0))
checkReplicaDistribution(assignment, brokerRackMapping,
brokerRackMapping.size, numPartitions,
replicationFactor, verifyLeaderDistribution = false,
verifyReplicasDistribution = false)
}
@@ -81,8 +84,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging {
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack2", 3 ->
"rack3", 4 -> "rack3", 5 -> "rack1")
val numPartitions = 12
val replicationFactor = 3
- val assignment =
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
- replicationFactor)
+ val assignment =
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
+ replicationFactor))
checkReplicaDistribution(assignment, brokerRackMapping,
brokerRackMapping.size, numPartitions,
replicationFactor, verifyReplicasDistribution = false)
}
@@ -92,8 +95,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging {
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 ->
"rack3", 4 -> "rack3", 5 -> "rack1")
val numPartitions = 12
val replicationFactor = 2
- val assignment =
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
- replicationFactor)
+ val assignment =
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
+ replicationFactor))
checkReplicaDistribution(assignment, brokerRackMapping,
brokerRackMapping.size, numPartitions,
replicationFactor)
}
@@ -103,8 +106,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging
{
val brokerRackMapping = Map(6 -> "rack1", 7 -> "rack2", 8 -> "rack2", 9 ->
"rack3", 10 -> "rack3", 11 -> "rack1")
val numPartitions = 12
val replicationFactor = 2
- val assignment =
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
- replicationFactor, startPartitionId = 12)
+ val assignment =
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
+ replicationFactor, -1, 12))
checkReplicaDistribution(assignment, brokerRackMapping,
brokerRackMapping.size, numPartitions,
replicationFactor)
}
@@ -114,8 +117,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging
{
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 ->
"rack3", 4 -> "rack3", 5 -> "rack1")
val numPartitions = 6
val replicationFactor = 2
- val assignment =
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
- replicationFactor)
+ val assignment =
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
+ replicationFactor))
checkReplicaDistribution(assignment, brokerRackMapping,
brokerRackMapping.size, numPartitions,
replicationFactor)
}
@@ -125,7 +128,7 @@ class AdminRackAwareTest extends RackAwareTest with Logging
{
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 4 -> "rack3")
val numPartitions = 3
val replicationFactor = 2
- val assignment =
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions, replicationFactor)
+ val assignment =
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions, replicationFactor))
checkReplicaDistribution(assignment, brokerRackMapping,
brokerRackMapping.size, numPartitions, replicationFactor)
}
@@ -135,8 +138,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging
{
val replicationFactor = 3
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 ->
"rack3", 4 -> "rack3", 5 -> "rack1",
6 -> "rack1", 7 -> "rack2", 8 -> "rack2", 9 -> "rack3", 10 -> "rack1",
11 -> "rack3")
- val assignment =
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
- replicationFactor)
+ val assignment =
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
+ replicationFactor))
checkReplicaDistribution(assignment, brokerRackMapping,
brokerRackMapping.size, numPartitions,
replicationFactor)
}
@@ -146,7 +149,7 @@ class AdminRackAwareTest extends RackAwareTest with Logging
{
val numPartitions = 6
val replicationFactor = 5
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 ->
"rack3", 4 -> "rack3", 5 -> "rack2")
- val assignment =
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions, replicationFactor)
+ val assignment =
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions, replicationFactor))
assertEquals(List.fill(assignment.size)(replicationFactor),
assignment.values.toIndexedSeq.map(_.size))
val distribution = getReplicaDistribution(assignment, brokerRackMapping)
for (partition <- 0 until numPartitions)
@@ -158,8 +161,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging
{
val numPartitions = 6
val replicationFactor = 2
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 ->
"rack3", 4 -> "rack3", 5 -> "rack2")
- val assignment =
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
- replicationFactor)
+ val assignment =
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions,
+ replicationFactor))
assertEquals(List.fill(assignment.size)(replicationFactor),
assignment.values.toIndexedSeq.map(_.size))
val distribution = getReplicaDistribution(assignment, brokerRackMapping)
for (partition <- 0 to 5)
@@ -171,7 +174,7 @@ class AdminRackAwareTest extends RackAwareTest with Logging
{
val numPartitions = 6
val replicationFactor = 3
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack1", 3 ->
"rack1", 4 -> "rack1", 5 -> "rack1")
- val assignment =
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions, replicationFactor)
+ val assignment =
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
numPartitions, replicationFactor))
assertEquals(List.fill(assignment.size)(replicationFactor),
assignment.values.toIndexedSeq.map(_.size))
val distribution = getReplicaDistribution(assignment, brokerRackMapping)
for (partition <- 0 until numPartitions)
@@ -187,16 +190,16 @@ class AdminRackAwareTest extends RackAwareTest with
Logging {
val numPartitions = 6
val replicationFactor = 4
val brokerMetadatas = toBrokerMetadata(rackInfo)
- assertEquals(brokerList, brokerMetadatas.map(_.id))
- val assignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas,
numPartitions, replicationFactor,
- fixedStartIndex = 2)
+ assertEquals(brokerList, brokerMetadatas.asScala.map(_.id))
+ val assignment =
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(brokerMetadatas,
numPartitions, replicationFactor,
+ 2, -1))
checkReplicaDistribution(assignment, rackInfo, 5, 6, 4,
verifyRackAware = false, verifyLeaderDistribution = false,
verifyReplicasDistribution = false)
}
@Test
def testReplicaAssignment(): Unit = {
- val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_, None))
+ val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_,
Optional.empty())).asJava
// test 0 replication factor
assertThrows(classOf[InvalidReplicationFactorException],
@@ -219,7 +222,7 @@ class AdminRackAwareTest extends RackAwareTest with Logging
{
8 -> List(3, 0, 1),
9 -> List(4, 1, 2))
- val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas,
10, 3, 0)
+ val actualAssignment =
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(brokerMetadatas,
10, 3, 0, -1))
assertEquals(expectedAssignment, actualAssignment)
}
}
diff --git a/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
index 6ce4f7ba55d..17ce08a39bf 100644
--- a/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
@@ -16,9 +16,15 @@
*/
package kafka.admin
+import org.apache.kafka.admin.BrokerMetadata
+
import scala.collection.{Map, Seq, mutable}
import org.junit.jupiter.api.Assertions._
+import java.util
+import java.util.Optional
+import scala.jdk.CollectionConverters._
+
trait RackAwareTest {
def checkReplicaDistribution(assignment: Map[Int, Seq[Int]],
@@ -73,13 +79,16 @@ trait RackAwareTest {
ReplicaDistributions(partitionRackMap, leaderCount, partitionCount)
}
- def toBrokerMetadata(rackMap: Map[Int, String], brokersWithoutRack: Seq[Int]
= Seq.empty): Seq[BrokerMetadata] =
- rackMap.toSeq.map { case (brokerId, rack) =>
- BrokerMetadata(brokerId, Some(rack))
+ def toBrokerMetadata(rackMap: Map[Int, String], brokersWithoutRack: Seq[Int]
= Seq.empty): util.Collection[BrokerMetadata] = {
+ val res = rackMap.toSeq.map { case (brokerId, rack) =>
+ new BrokerMetadata(brokerId, Optional.of(rack))
} ++ brokersWithoutRack.map { brokerId =>
- BrokerMetadata(brokerId, None)
+ new BrokerMetadata(brokerId, Optional.empty())
}.sortBy(_.id)
+ res.asJavaCollection
+ }
+
}
case class ReplicaDistributions(partitionRacks: Map[Int, Seq[String]],
brokerLeaderCount: Map[Int, Int], brokerReplicasCount: Map[Int, Int])
diff --git
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
index fab7907e17b..7aa0f8eec23 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
@@ -18,9 +18,10 @@
package kafka.admin
import java.util.concurrent.ExecutionException
-import java.util.{Arrays, Collections}
+import java.util.{Arrays, Collections, Optional}
import kafka.admin.ReassignPartitionsCommand._
import kafka.utils.Exit
+import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.clients.admin.{Config, MockAdminClient,
PartitionReassignment}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{InvalidReplicationFactorException,
UnknownTopicOrPartitionException}
@@ -230,19 +231,19 @@ class ReassignPartitionsUnitTest {
build()
try {
assertEquals(Seq(
- BrokerMetadata(0, Some("rack0")),
- BrokerMetadata(1, Some("rack1"))
+ new BrokerMetadata(0, Optional.of("rack0")),
+ new BrokerMetadata(1, Optional.of("rack1"))
), getBrokerMetadata(adminClient, Seq(0, 1), true))
assertEquals(Seq(
- BrokerMetadata(0, None),
- BrokerMetadata(1, None)
+ new BrokerMetadata(0, Optional.empty()),
+ new BrokerMetadata(1, Optional.empty())
), getBrokerMetadata(adminClient, Seq(0, 1), false))
assertStartsWith("Not all brokers have rack information",
assertThrows(classOf[AdminOperationException],
() => getBrokerMetadata(adminClient, Seq(1, 2), true)).getMessage)
assertEquals(Seq(
- BrokerMetadata(1, None),
- BrokerMetadata(2, None)
+ new BrokerMetadata(1, Optional.empty()),
+ new BrokerMetadata(2, Optional.empty())
), getBrokerMetadata(adminClient, Seq(1, 2), false))
} finally {
adminClient.close()
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 15ccb7d339b..08cfffdb784 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -41,6 +41,7 @@ import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
import kafka.utils.Implicits._
import kafka.zk._
+import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.clients.{ClientResponse, CommonClientConfigs}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin._
@@ -897,14 +898,14 @@ object TestUtils extends Logging {
}
def createBrokersInZk(zkClient: KafkaZkClient, ids: Seq[Int]): Seq[Broker] =
- createBrokersInZk(ids.map(kafka.admin.BrokerMetadata(_, None)), zkClient)
+ createBrokersInZk(ids.map(new BrokerMetadata(_, Optional.empty())),
zkClient)
- def createBrokersInZk(brokerMetadatas: Seq[kafka.admin.BrokerMetadata],
zkClient: KafkaZkClient): Seq[Broker] = {
+ def createBrokersInZk(brokerMetadatas: Seq[BrokerMetadata], zkClient:
KafkaZkClient): Seq[Broker] = {
zkClient.makeSurePersistentPathExists(BrokerIdsZNode.path)
val brokers = brokerMetadatas.map { b =>
val protocol = SecurityProtocol.PLAINTEXT
val listenerName = ListenerName.forSecurityProtocol(protocol)
- Broker(b.id, Seq(EndPoint("localhost", 6667, listenerName, protocol)),
b.rack)
+ Broker(b.id, Seq(EndPoint("localhost", 6667, listenerName, protocol)),
if (b.rack.isPresent) Some(b.rack.get()) else None)
}
brokers.foreach(b => zkClient.registerBroker(BrokerInfo(Broker(b.id,
b.endPoints, rack = b.rack),
MetadataVersion.latest, jmxPort = -1)))
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index ba0a2583598..e3171647ff1 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -17,7 +17,7 @@
package kafka.admin
import java.util
-import java.util.Properties
+import java.util.{Optional, Properties}
import kafka.controller.ReplicaAssignment
import kafka.server.DynamicConfig.Broker._
import kafka.server.KafkaConfig._
@@ -336,22 +336,22 @@ class AdminZkClientTest extends QuorumTestHarness with
Logging with RackAwareTes
val brokerList = 0 to 5
val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1",
5 -> "rack3")
val brokerMetadatas = toBrokerMetadata(rackInfo, brokersWithoutRack =
brokerList.filterNot(rackInfo.keySet))
- TestUtils.createBrokersInZk(brokerMetadatas, zkClient)
+ TestUtils.createBrokersInZk(brokerMetadatas.asScala.toSeq, zkClient)
val processedMetadatas1 =
adminZkClient.getBrokerMetadatas(RackAwareMode.Disabled)
assertEquals(brokerList, processedMetadatas1.map(_.id))
- assertEquals(List.fill(brokerList.size)(None),
processedMetadatas1.map(_.rack))
+ assertEquals(List.fill(brokerList.size)(Optional.empty()),
processedMetadatas1.map(_.rack))
val processedMetadatas2 =
adminZkClient.getBrokerMetadatas(RackAwareMode.Safe)
assertEquals(brokerList, processedMetadatas2.map(_.id))
- assertEquals(List.fill(brokerList.size)(None),
processedMetadatas2.map(_.rack))
+ assertEquals(List.fill(brokerList.size)(Optional.empty()),
processedMetadatas2.map(_.rack))
assertThrows(classOf[AdminOperationException], () =>
adminZkClient.getBrokerMetadatas(RackAwareMode.Enforced))
val partialList = List(0, 1, 2, 3, 5)
val processedMetadatas3 =
adminZkClient.getBrokerMetadatas(RackAwareMode.Enforced, Some(partialList))
assertEquals(partialList, processedMetadatas3.map(_.id))
- assertEquals(partialList.map(rackInfo),
processedMetadatas3.flatMap(_.rack))
+ assertEquals(partialList.map(rackInfo),
processedMetadatas3.map(_.rack.get()))
val numPartitions = 3
adminZkClient.createTopic("foo", numPartitions, 2, rackAwareMode =
RackAwareMode.Safe)
diff --git a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java
b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java
new file mode 100644
index 00000000000..9504954b76e
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.admin;
+
+import org.apache.kafka.common.errors.InvalidPartitionsException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.server.common.AdminOperationException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class AdminUtils {
+ static final Random RAND = new Random();
+
+ public static final String ADMIN_CLIENT_ID = "__admin_client";
+
+ public static Map<Integer, List<Integer>>
assignReplicasToBrokers(Collection<BrokerMetadata> brokerMetadatas,
+ int
nPartitions,
+ int
replicationFactor) {
+ return assignReplicasToBrokers(brokerMetadatas, nPartitions,
replicationFactor, -1, -1);
+ }
+
+ /**
+ * There are 3 goals of replica assignment:
+ *
+ * <ol>
+ * <li> Spread the replicas evenly among brokers.</li>
+ * <li> For partitions assigned to a particular broker, their other
replicas are spread over the other brokers.</li>
+ * <li> If all brokers have rack information, assign the replicas for each
partition to different racks if possible</li>
+ * </ol>
+ *
+ * To achieve this goal for replica assignment without considering racks,
we:
+ * <ol>
+ * <li> Assign the first replica of each partition by round-robin,
starting from a random position in the broker list.</li>
+ * <li> Assign the remaining replicas of each partition with an increasing
shift.</li>
+ * </ol>
+ *
+ * Here is an example of assigning
+ * <table cellpadding="2" cellspacing="2">
+ *
<tr><th>broker-0</th><th>broker-1</th><th>broker-2</th><th>broker-3</th><th>broker-4</th><th> </th></tr>
+ * <tr><td>p0 </td><td>p1 </td><td>p2 </td><td>p3
</td><td>p4 </td><td>(1st replica)</td></tr>
+ * <tr><td>p5 </td><td>p6 </td><td>p7 </td><td>p8
</td><td>p9 </td><td>(1st replica)</td></tr>
+ * <tr><td>p4 </td><td>p0 </td><td>p1 </td><td>p2
</td><td>p3 </td><td>(2nd replica)</td></tr>
+ * <tr><td>p8 </td><td>p9 </td><td>p5 </td><td>p6
</td><td>p7 </td><td>(2nd replica)</td></tr>
+ * <tr><td>p3 </td><td>p4 </td><td>p0 </td><td>p1
</td><td>p2 </td><td>(3nd replica)</td></tr>
+ * <tr><td>p7 </td><td>p8 </td><td>p9 </td><td>p5
</td><td>p6 </td><td>(3nd replica)</td></tr>
+ * </table>
+ *
+ * <p>
+ * To create rack aware assignment, this API will first create a rack
alternated broker list. For example,
+ * from this brokerID -> rack mapping:</p>
+ * 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5
-> "rack1"
+ * <br><br>
+ * <p>
+ * The rack alternated list will be:
+ * </p>
+ * 0, 3, 1, 5, 4, 2
+ * <br><br>
+ * <p>
+ * Then an easy round-robin assignment can be applied. Assume 6 partitions
with replication factor of 3, the assignment
+ * will be:
+ * </p>
+ * 0 -> 0,3,1 <br>
+ * 1 -> 3,1,5 <br>
+ * 2 -> 1,5,4 <br>
+ * 3 -> 5,4,2 <br>
+ * 4 -> 4,2,0 <br>
+ * 5 -> 2,0,3 <br>
+ * <br>
+ * <p>
+ * Once it has completed the first round-robin, if there are more
partitions to assign, the algorithm will start
+ * shifting the followers. This is to ensure we will not always get the
same set of sequences.
+ * In this case, if there is another partition to assign (partition #6),
the assignment will be:
+ * </p>
+ * 6 -> 0,4,2 (instead of repeating 0,3,1 as partition 0)
+ * <br><br>
+ * <p>
+ * The rack aware assignment always chooses the 1st replica of the
partition using round robin on the rack alternated
+ * broker list. For rest of the replicas, it will be biased towards
brokers on racks that do not have
+ * any replica assignment, until every rack has a replica. Then the
assignment will go back to round-robin on
+ * the broker list.
+ * </p>
+ * <br>
+ * <p>
+ * As the result, if the number of replicas is equal to or greater than
the number of racks, it will ensure that
+ * each rack will get at least one replica. Otherwise, each rack will get
at most one replica. In a perfect
+ * situation where the number of replicas is the same as the number of
racks and each rack has the same number of
+ * brokers, it guarantees that the replica distribution is even across
brokers and racks.
+ * </p>
+ * @return a Map from partition id to replica ids
+ * @throws AdminOperationException If rack information is supplied but it
is incomplete, or if it is not possible to
+ * assign each replica to a unique rack.
+ *
+ */
+ public static Map<Integer, List<Integer>>
assignReplicasToBrokers(Collection<BrokerMetadata> brokerMetadatas,
+ int
nPartitions,
+ int
replicationFactor,
+ int
fixedStartIndex,
+ int
startPartitionId) {
+ if (nPartitions <= 0)
+ throw new InvalidPartitionsException("Number of partitions must be
larger than 0.");
+ if (replicationFactor <= 0)
+ throw new InvalidReplicationFactorException("Replication factor
must be larger than 0.");
+ if (replicationFactor > brokerMetadatas.size())
+ throw new InvalidReplicationFactorException("Replication factor: "
+ replicationFactor + " larger than available brokers: " +
brokerMetadatas.size() + ".");
+ if (brokerMetadatas.stream().noneMatch(b -> b.rack.isPresent()))
+ return assignReplicasToBrokersRackUnaware(nPartitions,
replicationFactor, brokerMetadatas.stream().map(b ->
b.id).collect(Collectors.toList()), fixedStartIndex,
+ startPartitionId);
+ else {
+ if (brokerMetadatas.stream().anyMatch(b -> !b.rack.isPresent()))
+ throw new AdminOperationException("Not all brokers have rack
information for replica rack aware assignment.");
+ return assignReplicasToBrokersRackAware(nPartitions,
replicationFactor, brokerMetadatas, fixedStartIndex,
+ startPartitionId);
+ }
+ }
+
+ private static Map<Integer, List<Integer>>
assignReplicasToBrokersRackUnaware(int nPartitions,
+
int replicationFactor,
+
List<Integer> brokerList,
+
int fixedStartIndex,
+
int startPartitionId) {
+ Map<Integer, List<Integer>> ret = new HashMap<>();
+ int startIndex = fixedStartIndex >= 0 ? fixedStartIndex :
RAND.nextInt(brokerList.size());
+ int currentPartitionId = Math.max(0, startPartitionId);
+ int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex :
RAND.nextInt(brokerList.size());
+ for (int i = 0; i < nPartitions; i++) {
+ if (currentPartitionId > 0 && (currentPartitionId %
brokerList.size() == 0))
+ nextReplicaShift += 1;
+ int firstReplicaIndex = (currentPartitionId + startIndex) %
brokerList.size();
+ List<Integer> replicaBuffer = new ArrayList<>();
+ replicaBuffer.add(brokerList.get(firstReplicaIndex));
+ for (int j = 0; j < replicationFactor - 1; j++)
+
replicaBuffer.add(brokerList.get(replicaIndex(firstReplicaIndex,
nextReplicaShift, j, brokerList.size())));
+ ret.put(currentPartitionId, replicaBuffer);
+ currentPartitionId += 1;
+ }
+ return ret;
+ }
+
+ private static Map<Integer, List<Integer>>
assignReplicasToBrokersRackAware(int nPartitions,
+
int replicationFactor,
+
Collection<BrokerMetadata> brokerMetadatas,
+
int fixedStartIndex,
+
int startPartitionId) {
+ Map<Integer, String> brokerRackMap = new HashMap<>();
+ brokerMetadatas.forEach(m -> brokerRackMap.put(m.id, m.rack.get()));
+ int numRacks = new HashSet<>(brokerRackMap.values()).size();
+ List<Integer> arrangedBrokerList =
getRackAlternatedBrokerList(brokerRackMap);
+ int numBrokers = arrangedBrokerList.size();
+ Map<Integer, List<Integer>> ret = new HashMap<>();
+ int startIndex = fixedStartIndex >= 0 ? fixedStartIndex :
RAND.nextInt(arrangedBrokerList.size());
+ int currentPartitionId = Math.max(0, startPartitionId);
+ int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex :
RAND.nextInt(arrangedBrokerList.size());
+ for (int i = 0; i < nPartitions; i++) {
+ if (currentPartitionId > 0 && (currentPartitionId %
arrangedBrokerList.size() == 0))
+ nextReplicaShift += 1;
+ int firstReplicaIndex = (currentPartitionId + startIndex) %
arrangedBrokerList.size();
+ int leader = arrangedBrokerList.get(firstReplicaIndex);
+ List<Integer> replicaBuffer = new ArrayList<>();
+ replicaBuffer.add(leader);
+ Set<String> racksWithReplicas = new HashSet<>();
+ racksWithReplicas.add(brokerRackMap.get(leader));
+ Set<Integer> brokersWithReplicas = new HashSet<>();
+ brokersWithReplicas.add(leader);
+ int k = 0;
+ for (int j = 0; j < replicationFactor - 1; j++) {
+ boolean done = false;
+ while (!done) {
+ Integer broker =
arrangedBrokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift *
numRacks, k, arrangedBrokerList.size()));
+ String rack = brokerRackMap.get(broker);
+ // Skip this broker if
+ // 1. there is already a broker in the same rack that has
assigned a replica AND there is one or more racks
+ // that do not have any replica, or
+ // 2. the broker has already assigned a replica AND there
is one or more brokers that do not have replica assigned
+ if ((!racksWithReplicas.contains(rack) ||
racksWithReplicas.size() == numRacks)
+ && (!brokersWithReplicas.contains(broker) ||
brokersWithReplicas.size() == numBrokers)) {
+ replicaBuffer.add(broker);
+ racksWithReplicas.add(rack);
+ brokersWithReplicas.add(broker);
+ done = true;
+ }
+ k += 1;
+ }
+ }
+ ret.put(currentPartitionId, replicaBuffer);
+ currentPartitionId += 1;
+ }
+ return ret;
+ }
+
+ /**
+ * Given broker and rack information, returns a list of brokers alternated
by the rack. Assume
+ * this is the rack and its brokers:
+ *
+ * rack1: 0, 1, 2
+ * rack2: 3, 4, 5
+ * rack3: 6, 7, 8
+ *
+ * This API would return the list of 0, 3, 6, 1, 4, 7, 2, 5, 8
+ *
+ * This is essential to make sure that the assignReplicasToBrokers API can
use such list and
+ * assign replicas to brokers in a simple round-robin fashion, while
ensuring an even
+ * distribution of leader and replica counts on each broker and that
replicas are
+ * distributed to all racks.
+ */
+ public static List<Integer> getRackAlternatedBrokerList(Map<Integer,
String> brokerRackMap) {
+ Map<String, Iterator<Integer>> brokersIteratorByRack = new HashMap<>();
+ getInverseMap(brokerRackMap).forEach((rack, brokers) ->
brokersIteratorByRack.put(rack, brokers.iterator()));
+ String[] racks = brokersIteratorByRack.keySet().toArray(new String[0]);
+ Arrays.sort(racks);
+ List<Integer> result = new ArrayList<>();
+ int rackIndex = 0;
+ while (result.size() < brokerRackMap.size()) {
+ Iterator<Integer> rackIterator =
brokersIteratorByRack.get(racks[rackIndex]);
+ if (rackIterator.hasNext())
+ result.add(rackIterator.next());
+ rackIndex = (rackIndex + 1) % racks.length;
+ }
+ return result;
+ }
+
+ static Map<String, List<Integer>> getInverseMap(Map<Integer, String>
brokerRackMap) {
+ Map<String, List<Integer>> results = new HashMap<>();
+ brokerRackMap.forEach((id, rack) -> results.computeIfAbsent(rack, key
-> new ArrayList<>()).add(id));
+ results.forEach((rack, rackAndIdList) ->
rackAndIdList.sort(Integer::compareTo));
+ return results;
+ }
+
+ static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int
replicaIndex, int nBrokers) {
+ long shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1);
+ return (int) ((firstReplicaIndex + shift) % nBrokers);
+ }
+}
diff --git
a/server-common/src/main/java/org/apache/kafka/admin/BrokerMetadata.java
b/server-common/src/main/java/org/apache/kafka/admin/BrokerMetadata.java
new file mode 100644
index 00000000000..98216fa0858
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/admin/BrokerMetadata.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Broker metadata used by admin tools.
+ */
+public class BrokerMetadata {
+ public final int id;
+
+ public final Optional<String> rack;
+
+ /**
+ * @param id an integer that uniquely identifies this broker
+ * @param rack the rack of the broker, which is used to in rack aware
partition assignment for fault tolerance.
+ * Examples: "RACK1", "us-east-1d"
+ */
+ public BrokerMetadata(int id, Optional<String> rack) {
+ this.id = id;
+ this.rack = rack;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ BrokerMetadata that = (BrokerMetadata) o;
+ return id == that.id && Objects.equals(rack, that.rack);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, rack);
+ }
+}