This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ddeb89f4a9f KAFKA-14595: Move AdminUtils to server-common (#14096)
ddeb89f4a9f is described below

commit ddeb89f4a9f31b5ff63661e18a94c403a8f45f69
Author: Nikolay <[email protected]>
AuthorDate: Wed Aug 9 11:32:45 2023 +0300

    KAFKA-14595: Move AdminUtils to server-common (#14096)
    
    
    Reviewers: Mickael Maison <[email protected]>
---
 checkstyle/import-control-server-common.xml        |   3 +
 core/src/main/scala/kafka/admin/AdminUtils.scala   | 239 -------------------
 .../main/scala/kafka/admin/BrokerMetadata.scala    |  23 --
 .../kafka/admin/ReassignPartitionsCommand.scala    |  11 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   4 +-
 .../main/scala/kafka/server/MetadataCache.scala    |   2 +-
 .../main/scala/kafka/server/ZkAdminManager.scala   |   6 +-
 .../kafka/server/metadata/KRaftMetadataCache.scala |   4 +-
 .../kafka/server/metadata/ZkMetadataCache.scala    |   7 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala    |   5 +
 core/src/main/scala/kafka/zk/AdminZkClient.scala   |  20 +-
 .../unit/kafka/admin/AdminRackAwareTest.scala      |  67 +++---
 .../scala/unit/kafka/admin/RackAwareTest.scala     |  17 +-
 .../kafka/admin/ReassignPartitionsUnitTest.scala   |  15 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   7 +-
 .../scala/unit/kafka/zk/AdminZkClientTest.scala    |  10 +-
 .../java/org/apache/kafka/admin/AdminUtils.java    | 256 +++++++++++++++++++++
 .../org/apache/kafka/admin/BrokerMetadata.java     |  52 +++++
 18 files changed, 409 insertions(+), 339 deletions(-)

diff --git a/checkstyle/import-control-server-common.xml 
b/checkstyle/import-control-server-common.xml
index ebf9984c7a0..a8d032c6221 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -101,4 +101,7 @@
         </subpackage>
     </subpackage>
 
+    <subpackage name="admin">
+        <allow pkg="org.apache.kafka.server.common" />
+    </subpackage>
 </import-control>
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala 
b/core/src/main/scala/kafka/admin/AdminUtils.scala
deleted file mode 100644
index 5ac09ab5348..00000000000
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import java.util.Random
-import kafka.utils.Logging
-import org.apache.kafka.common.errors.{InvalidPartitionsException, 
InvalidReplicationFactorException}
-import org.apache.kafka.server.common.AdminOperationException
-
-import collection.{Map, mutable, _}
-
-object AdminUtils extends Logging {
-  val rand = new Random
-  val AdminClientId = "__admin_client"
-
-  /**
-   * There are 3 goals of replica assignment:
-   *
-   * <ol>
-   * <li> Spread the replicas evenly among brokers.</li>
-   * <li> For partitions assigned to a particular broker, their other replicas 
are spread over the other brokers.</li>
-   * <li> If all brokers have rack information, assign the replicas for each 
partition to different racks if possible</li>
-   * </ol>
-   *
-   * To achieve this goal for replica assignment without considering racks, we:
-   * <ol>
-   * <li> Assign the first replica of each partition by round-robin, starting 
from a random position in the broker list.</li>
-   * <li> Assign the remaining replicas of each partition with an increasing 
shift.</li>
-   * </ol>
-   *
-   * Here is an example of assigning
-   * <table cellpadding="2" cellspacing="2">
-   * 
<tr><th>broker-0</th><th>broker-1</th><th>broker-2</th><th>broker-3</th><th>broker-4</th><th>&nbsp;</th></tr>
-   * <tr><td>p0      </td><td>p1      </td><td>p2      </td><td>p3      
</td><td>p4      </td><td>(1st replica)</td></tr>
-   * <tr><td>p5      </td><td>p6      </td><td>p7      </td><td>p8      
</td><td>p9      </td><td>(1st replica)</td></tr>
-   * <tr><td>p4      </td><td>p0      </td><td>p1      </td><td>p2      
</td><td>p3      </td><td>(2nd replica)</td></tr>
-   * <tr><td>p8      </td><td>p9      </td><td>p5      </td><td>p6      
</td><td>p7      </td><td>(2nd replica)</td></tr>
-   * <tr><td>p3      </td><td>p4      </td><td>p0      </td><td>p1      
</td><td>p2      </td><td>(3nd replica)</td></tr>
-   * <tr><td>p7      </td><td>p8      </td><td>p9      </td><td>p5      
</td><td>p6      </td><td>(3nd replica)</td></tr>
-   * </table>
-   *
-   * <p>
-   * To create rack aware assignment, this API will first create a rack 
alternated broker list. For example,
-   * from this brokerID -> rack mapping:</p>
-   * 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 
-> "rack1"
-   * <br><br>
-   * <p>
-   * The rack alternated list will be:
-   * </p>
-   * 0, 3, 1, 5, 4, 2
-   * <br><br>
-   * <p>
-   * Then an easy round-robin assignment can be applied. Assume 6 partitions 
with replication factor of 3, the assignment
-   * will be:
-   * </p>
-   * 0 -> 0,3,1 <br>
-   * 1 -> 3,1,5 <br>
-   * 2 -> 1,5,4 <br>
-   * 3 -> 5,4,2 <br>
-   * 4 -> 4,2,0 <br>
-   * 5 -> 2,0,3 <br>
-   * <br>
-   * <p>
-   * Once it has completed the first round-robin, if there are more partitions 
to assign, the algorithm will start
-   * shifting the followers. This is to ensure we will not always get the same 
set of sequences.
-   * In this case, if there is another partition to assign (partition #6), the 
assignment will be:
-   * </p>
-   * 6 -> 0,4,2 (instead of repeating 0,3,1 as partition 0)
-   * <br><br>
-   * <p>
-   * The rack aware assignment always chooses the 1st replica of the partition 
using round robin on the rack alternated
-   * broker list. For rest of the replicas, it will be biased towards brokers 
on racks that do not have
-   * any replica assignment, until every rack has a replica. Then the 
assignment will go back to round-robin on
-   * the broker list.
-   * </p>
-   * <br>
-   * <p>
-   * As the result, if the number of replicas is equal to or greater than the 
number of racks, it will ensure that
-   * each rack will get at least one replica. Otherwise, each rack will get at 
most one replica. In a perfect
-   * situation where the number of replicas is the same as the number of racks 
and each rack has the same number of
-   * brokers, it guarantees that the replica distribution is even across 
brokers and racks.
-   * </p>
-   * @return a Map from partition id to replica ids
-   * @throws AdminOperationException If rack information is supplied but it is 
incomplete, or if it is not possible to
-   *                                 assign each replica to a unique rack.
-   *
-   */
-  def assignReplicasToBrokers(brokerMetadatas: Iterable[BrokerMetadata],
-                              nPartitions: Int,
-                              replicationFactor: Int,
-                              fixedStartIndex: Int = -1,
-                              startPartitionId: Int = -1): Map[Int, Seq[Int]] 
= {
-    if (nPartitions <= 0)
-      throw new InvalidPartitionsException("Number of partitions must be 
larger than 0.")
-    if (replicationFactor <= 0)
-      throw new InvalidReplicationFactorException("Replication factor must be 
larger than 0.")
-    if (replicationFactor > brokerMetadatas.size)
-      throw new InvalidReplicationFactorException(s"Replication factor: 
$replicationFactor larger than available brokers: ${brokerMetadatas.size}.")
-    if (brokerMetadatas.forall(_.rack.isEmpty))
-      assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, 
brokerMetadatas.map(_.id), fixedStartIndex,
-        startPartitionId)
-    else {
-      if (brokerMetadatas.exists(_.rack.isEmpty))
-        throw new AdminOperationException("Not all brokers have rack 
information for replica rack aware assignment.")
-      assignReplicasToBrokersRackAware(nPartitions, replicationFactor, 
brokerMetadatas, fixedStartIndex,
-        startPartitionId)
-    }
-  }
-
-  private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
-                                                 replicationFactor: Int,
-                                                 brokerList: Iterable[Int],
-                                                 fixedStartIndex: Int,
-                                                 startPartitionId: Int): 
Map[Int, Seq[Int]] = {
-    val ret = mutable.Map[Int, Seq[Int]]()
-    val brokerArray = brokerList.toArray
-    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else 
rand.nextInt(brokerArray.length)
-    var currentPartitionId = math.max(0, startPartitionId)
-    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else 
rand.nextInt(brokerArray.length)
-    for (_ <- 0 until nPartitions) {
-      if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length 
== 0))
-        nextReplicaShift += 1
-      val firstReplicaIndex = (currentPartitionId + startIndex) % 
brokerArray.length
-      val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
-      for (j <- 0 until replicationFactor - 1)
-        replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, 
nextReplicaShift, j, brokerArray.length))
-      ret.put(currentPartitionId, replicaBuffer)
-      currentPartitionId += 1
-    }
-    ret
-  }
-
-  private def assignReplicasToBrokersRackAware(nPartitions: Int,
-                                               replicationFactor: Int,
-                                               brokerMetadatas: 
Iterable[BrokerMetadata],
-                                               fixedStartIndex: Int,
-                                               startPartitionId: Int): 
Map[Int, Seq[Int]] = {
-    val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, 
Some(rack)) =>
-      id -> rack
-    }.toMap
-    val numRacks = brokerRackMap.values.toSet.size
-    val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
-    val numBrokers = arrangedBrokerList.size
-    val ret = mutable.Map[Int, Seq[Int]]()
-    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else 
rand.nextInt(arrangedBrokerList.size)
-    var currentPartitionId = math.max(0, startPartitionId)
-    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else 
rand.nextInt(arrangedBrokerList.size)
-    for (_ <- 0 until nPartitions) {
-      if (currentPartitionId > 0 && (currentPartitionId % 
arrangedBrokerList.size == 0))
-        nextReplicaShift += 1
-      val firstReplicaIndex = (currentPartitionId + startIndex) % 
arrangedBrokerList.size
-      val leader = arrangedBrokerList(firstReplicaIndex)
-      val replicaBuffer = mutable.ArrayBuffer(leader)
-      val racksWithReplicas = mutable.Set(brokerRackMap(leader))
-      val brokersWithReplicas = mutable.Set(leader)
-      var k = 0
-      for (_ <- 0 until replicationFactor - 1) {
-        var done = false
-        while (!done) {
-          val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, 
nextReplicaShift * numRacks, k, arrangedBrokerList.size))
-          val rack = brokerRackMap(broker)
-          // Skip this broker if
-          // 1. there is already a broker in the same rack that has assigned a 
replica AND there is one or more racks
-          //    that do not have any replica, or
-          // 2. the broker has already assigned a replica AND there is one or 
more brokers that do not have replica assigned
-          if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == 
numRacks)
-              && (!brokersWithReplicas.contains(broker) || 
brokersWithReplicas.size == numBrokers)) {
-            replicaBuffer += broker
-            racksWithReplicas += rack
-            brokersWithReplicas += broker
-            done = true
-          }
-          k += 1
-        }
-      }
-      ret.put(currentPartitionId, replicaBuffer)
-      currentPartitionId += 1
-    }
-    ret
-  }
-
-  /**
-    * Given broker and rack information, returns a list of brokers alternated 
by the rack. Assume
-    * this is the rack and its brokers:
-    *
-    * rack1: 0, 1, 2
-    * rack2: 3, 4, 5
-    * rack3: 6, 7, 8
-    *
-    * This API would return the list of 0, 3, 6, 1, 4, 7, 2, 5, 8
-    *
-    * This is essential to make sure that the assignReplicasToBrokers API can 
use such list and
-    * assign replicas to brokers in a simple round-robin fashion, while 
ensuring an even
-    * distribution of leader and replica counts on each broker and that 
replicas are
-    * distributed to all racks.
-    */
-  private[admin] def getRackAlternatedBrokerList(brokerRackMap: Map[Int, 
String]): IndexedSeq[Int] = {
-    val brokersIteratorByRack = getInverseMap(brokerRackMap).map { case (rack, 
brokers) =>
-      (rack, brokers.iterator)
-    }
-    val racks = brokersIteratorByRack.keys.toArray.sorted
-    val result = new mutable.ArrayBuffer[Int]
-    var rackIndex = 0
-    while (result.size < brokerRackMap.size) {
-      val rackIterator = brokersIteratorByRack(racks(rackIndex))
-      if (rackIterator.hasNext)
-        result += rackIterator.next()
-      rackIndex = (rackIndex + 1) % racks.length
-    }
-    result
-  }
-
-  private[admin] def getInverseMap(brokerRackMap: Map[Int, String]): 
Map[String, Seq[Int]] = {
-    brokerRackMap.toSeq.map { case (id, rack) => (rack, id) }
-      .groupBy { case (rack, _) => rack }
-      .map { case (rack, rackAndIdList) => (rack, rackAndIdList.map { case (_, 
id) => id }.sorted) }
-  }
-
-  private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, 
replicaIndex: Int, nBrokers: Int): Int = {
-    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
-    (firstReplicaIndex + shift) % nBrokers
-  }
-
-}
diff --git a/core/src/main/scala/kafka/admin/BrokerMetadata.scala 
b/core/src/main/scala/kafka/admin/BrokerMetadata.scala
deleted file mode 100644
index 86831e376e5..00000000000
--- a/core/src/main/scala/kafka/admin/BrokerMetadata.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
-  * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
-  * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
-  * License. You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
-  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
-  * specific language governing permissions and limitations under the License.
-  */
-
-package kafka.admin
-
-/**
-  * Broker metadata used by admin tools.
-  *
-  * @param id an integer that uniquely identifies this broker
-  * @param rack the rack of the broker, which is used to in rack aware 
partition assignment for fault tolerance.
-  *             Examples: "RACK1", "us-east-1d"
-  */
-case class BrokerMetadata(id: Int, rack: Option[String])
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 40f688c7085..2fc36648981 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -23,6 +23,7 @@ import kafka.server.DynamicConfig
 import kafka.utils.{CoreUtils, Exit, Json, Logging}
 import kafka.utils.Implicits._
 import kafka.utils.json.JsonValue
+import org.apache.kafka.admin.{AdminUtils, BrokerMetadata}
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
AlterConfigOp, ConfigEntry, NewPartitionReassignment, PartitionReassignment, 
TopicDescription}
 import org.apache.kafka.common.config.ConfigResource
@@ -606,8 +607,8 @@ object ReassignPartitionsCommand extends Logging {
     val proposedAssignments = mutable.Map[TopicPartition, Seq[Int]]()
     groupedByTopic.forKeyValue { (topic, assignment) =>
       val (_, replicas) = assignment.head
-      val assignedReplicas = AdminUtils.
-        assignReplicasToBrokers(brokerMetadatas, assignment.size, 
replicas.size)
+      val assignedReplicas = 
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.
+        assignReplicasToBrokers(brokerMetadatas.asJavaCollection, 
assignment.size, replicas.size))
       proposedAssignments ++= assignedReplicas.map { case (partition, 
replicas) =>
         new TopicPartition(topic, partition) -> replicas
       }
@@ -688,12 +689,12 @@ object ReassignPartitionsCommand extends Logging {
       filter(node => brokerSet.contains(node.id)).
       map {
         node => if (enableRackAwareness && node.rack != null) {
-          BrokerMetadata(node.id, Some(node.rack))
+          new BrokerMetadata(node.id, Optional.of(node.rack))
         } else {
-          BrokerMetadata(node.id, None)
+          new BrokerMetadata(node.id, Optional.empty())
         }
       }.toSeq
-    val numRackless = results.count(_.rack.isEmpty)
+    val numRackless = results.count(!_.rack.isPresent)
     if (enableRackAwareness && numRackless != 0 && numRackless != 
results.size) {
       throw new AdminOperationException("Not all brokers have rack 
information. Add " +
         "--disable-rack-aware in command line to make replica assignment 
without rack " +
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5d6233d1100..15c4e356bdc 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import kafka.admin.AdminUtils
 import kafka.api.ElectLeadersRequestOps
 import kafka.controller.ReplicaAssignment
 import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinator}
@@ -26,6 +25,7 @@ import kafka.server.QuotaFactory.{QuotaManagers, 
UnboundedQuota}
 import kafka.server.metadata.ConfigRepository
 import kafka.utils.Implicits._
 import kafka.utils.{CoreUtils, Logging}
+import org.apache.kafka.admin.AdminUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
 import org.apache.kafka.common.acl.AclOperation._
@@ -668,7 +668,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (authorizedRequestInfo.isEmpty)
       sendResponseCallback(Map.empty)
     else {
-      val internalTopicsAllowed = request.header.clientId == 
AdminUtils.AdminClientId
+      val internalTopicsAllowed = request.header.clientId == 
AdminUtils.ADMIN_CLIENT_ID
 
       val transactionStatePartition =
         if (produceRequest.transactionalId() == null)
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala 
b/core/src/main/scala/kafka/server/MetadataCache.scala
index ecd64c17b91..2e60ce2ba7b 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,8 +17,8 @@
 
 package kafka.server
 
-import kafka.admin.BrokerMetadata
 import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
+import org.apache.kafka.admin.BrokerMetadata
 import org.apache.kafka.common.message.{MetadataResponseData, 
UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala 
b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index 00ef867de9c..90fbd7f3996 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -18,7 +18,6 @@ package kafka.server
 
 import java.util
 import java.util.Properties
-import kafka.admin.AdminUtils
 import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs, 
toLoggableProps}
 import kafka.server.DynamicConfig.QuotaConfigs
@@ -26,6 +25,7 @@ import kafka.server.metadata.ZkConfigRepository
 import kafka.utils._
 import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.admin.AdminUtils
 import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism}
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.config.{ConfigDef, ConfigException, 
ConfigResource}
@@ -184,8 +184,8 @@ class ZkAdminManager(val config: KafkaConfig,
           defaultReplicationFactor else topic.replicationFactor
 
         val assignments = if (topic.assignments.isEmpty) {
-          AdminUtils.assignReplicasToBrokers(
-            brokers, resolvedNumPartitions, resolvedReplicationFactor)
+          
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(
+            brokers.asJavaCollection, resolvedNumPartitions, 
resolvedReplicationFactor))
         } else {
           val assignments = new mutable.HashMap[Int, Seq[Int]]
           // Note: we don't check that replicaAssignment contains unknown 
brokers - unlike in add-partitions case,
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala 
b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 5e3ed11022b..559ccb2e917 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -20,6 +20,7 @@ package kafka.server.metadata
 import kafka.controller.StateChangeLogger
 import kafka.server.{CachedControllerId, KRaftCachedControllerId, 
MetadataCache}
 import kafka.utils.Logging
+import org.apache.kafka.admin.BrokerMetadata
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition,
 MetadataResponseTopic}
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
@@ -32,7 +33,6 @@ import org.apache.kafka.image.MetadataImage
 import java.util
 import java.util.{Collections, Properties}
 import java.util.concurrent.ThreadLocalRandom
-import kafka.admin.BrokerMetadata
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, 
DescribeClientQuotasResponseData}
 import 
org.apache.kafka.common.message.{DescribeUserScramCredentialsRequestData, 
DescribeUserScramCredentialsResponseData}
@@ -215,7 +215,7 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 
   private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata] 
= {
     image.cluster().brokers().values().asScala.filterNot(_.fenced()).
-      map(b => BrokerMetadata(b.id, b.rack.asScala))
+      map(b => new BrokerMetadata(b.id, b.rack))
   }
 
   override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): 
Option[Node] = {
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala 
b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index 9159b791880..302d3fbf8de 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -18,10 +18,8 @@
 package kafka.server.metadata
 
 import java.util
-import java.util.Collections
+import java.util.{Collections, Optional}
 import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
-import kafka.admin.BrokerMetadata
-
 import scala.collection.{Seq, Set, mutable}
 import scala.jdk.CollectionConverters._
 import kafka.cluster.{Broker, EndPoint}
@@ -31,6 +29,7 @@ import kafka.server.{BrokerFeatures, CachedControllerId, 
KRaftCachedControllerId
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import kafka.utils.Implicits._
+import org.apache.kafka.admin.BrokerMetadata
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
@@ -254,7 +253,7 @@ class ZkMetadataCache(
   override def hasAliveBroker(brokerId: Int): Boolean = 
metadataSnapshot.aliveBrokers.contains(brokerId)
 
   override def getAliveBrokers(): Iterable[BrokerMetadata] = {
-    metadataSnapshot.aliveBrokers.values.map(b => new BrokerMetadata(b.id, 
b.rack))
+    metadataSnapshot.aliveBrokers.values.map(b => new BrokerMetadata(b.id, 
Optional.ofNullable(b.rack.orNull)))
   }
 
   override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): 
Option[Node] = {
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala 
b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 149d2a78374..8b960806042 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -34,7 +34,9 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
 import org.slf4j.event.Level
 
+import java.util
 import scala.annotation.nowarn
+import scala.jdk.CollectionConverters._
 
 /**
  * General helper functions!
@@ -315,4 +317,7 @@ object CoreUtils {
     elements.groupMapReduce(key)(f)(reduce)
   }
 
+  def replicaToBrokerAssignmentAsScala(map: util.Map[Integer, 
util.List[Integer]]): Map[Int, Seq[Int]] = {
+    map.asScala.map(e => (e._1.asInstanceOf[Int], 
e._2.asScala.map(_.asInstanceOf[Int])))
+  }
 }
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala 
b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index 07ca35cc9ae..d16394cab05 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -16,13 +16,14 @@
 */
 package kafka.zk
 
-import java.util.Properties
-import kafka.admin.{AdminUtils, BrokerMetadata, RackAwareMode}
+import java.util.{Optional, Properties}
+import kafka.admin.RackAwareMode
 import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.controller.ReplicaAssignment
 import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
 import kafka.utils._
 import kafka.utils.Implicits._
+import org.apache.kafka.admin.{AdminUtils, BrokerMetadata}
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
@@ -30,6 +31,7 @@ import org.apache.kafka.server.common.AdminOperationException
 import org.apache.kafka.storage.internals.log.LogConfig
 import org.apache.zookeeper.KeeperException.NodeExistsException
 
+import scala.jdk.CollectionConverters._
 import scala.collection.{Map, Seq}
 
 /**
@@ -55,8 +57,8 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
                   topicConfig: Properties = new Properties,
                   rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
                   usesTopicId: Boolean = false): Unit = {
-    val brokerMetadatas = getBrokerMetadatas(rackAwareMode)
-    val replicaAssignment = 
AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, 
replicationFactor)
+    val brokerMetadatas = getBrokerMetadatas(rackAwareMode).asJava
+    val replicaAssignment = 
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(brokerMetadatas,
 partitions, replicationFactor))
     createTopicWithAssignment(topic, topicConfig, replicaAssignment, 
usesTopicId = usesTopicId)
   }
 
@@ -77,10 +79,10 @@ class AdminZkClient(zkClient: KafkaZkClient) extends 
Logging {
         " to make replica assignment without rack information.")
     }
     val brokerMetadatas = rackAwareMode match {
-      case RackAwareMode.Disabled => brokers.map(broker => 
BrokerMetadata(broker.id, None))
+      case RackAwareMode.Disabled => brokers.map(broker => new 
BrokerMetadata(broker.id, Optional.empty()))
       case RackAwareMode.Safe if brokersWithRack.size < brokers.size =>
-        brokers.map(broker => BrokerMetadata(broker.id, None))
-      case _ => brokers.map(broker => BrokerMetadata(broker.id, broker.rack))
+        brokers.map(broker => new BrokerMetadata(broker.id, Optional.empty()))
+      case _ => brokers.map(broker => new BrokerMetadata(broker.id, 
Optional.ofNullable(broker.rack.orNull)))
     }
     brokerMetadatas.sortBy(_.id)
   }
@@ -268,8 +270,8 @@ class AdminZkClient(zkClient: KafkaZkClient) extends 
Logging {
 
     val proposedAssignmentForNewPartitions = replicaAssignment.getOrElse {
       val startIndex = math.max(0, allBrokers.indexWhere(_.id >= 
existingAssignmentPartition0.head))
-      AdminUtils.assignReplicasToBrokers(allBrokers, partitionsToAdd, 
existingAssignmentPartition0.size,
-        startIndex, existingAssignment.size)
+      
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(allBrokers.asJava,
 partitionsToAdd, existingAssignmentPartition0.size,
+        startIndex, existingAssignment.size))
     }
 
     proposedAssignmentForNewPartitions.map { case (tp, replicas) =>
diff --git a/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala 
b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
index d2d665bef8a..9a3aaa31bc2 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
@@ -16,23 +16,26 @@
  */
 package kafka.admin
 
-import kafka.utils.Logging
+import kafka.utils.{CoreUtils, Logging}
+import org.apache.kafka.admin.{AdminUtils, BrokerMetadata}
 import org.apache.kafka.common.errors.InvalidReplicationFactorException
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 
+import java.util.Optional
 import scala.collection.Map
+import scala.jdk.CollectionConverters._
 
 class AdminRackAwareTest extends RackAwareTest with Logging {
 
   @Test
   def testGetRackAlternatedBrokerListAndAssignReplicasToBrokers(): Unit = {
     val rackMap = Map(0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 
4 -> "rack2", 5 -> "rack1")
-    val newList = AdminUtils.getRackAlternatedBrokerList(rackMap)
-    assertEquals(List(0, 3, 1, 5, 4, 2), newList)
-    val anotherList = AdminUtils.getRackAlternatedBrokerList(rackMap.toMap - 5)
-    assertEquals(List(0, 3, 1, 4, 2), anotherList)
-    val assignment = 
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(rackMap), 7, 3, 0, 0)
+    val newList = AdminUtils.getRackAlternatedBrokerList(rackMap.map(e => 
(e._1.asInstanceOf[Integer], e._2)).asJava)
+    assertEquals(List(0, 3, 1, 5, 4, 2), newList.asScala.toList)
+    val anotherList = AdminUtils.getRackAlternatedBrokerList((rackMap.toMap - 
5).map(e => (e._1.asInstanceOf[Integer], e._2)).asJava)
+    assertEquals(List(0, 3, 1, 4, 2), anotherList.asScala.toList)
+    val assignment = 
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(rackMap),
 7, 3, 0, 0))
     val expected = Map(0 -> List(0, 3, 1),
                        1 -> List(3, 1, 5),
                        2 -> List(1, 5, 4),
@@ -48,8 +51,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging {
     val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> 
"rack3", 4 -> "rack3", 5 -> "rack1")
     val numPartitions = 6
     val replicationFactor = 3
-    val assignment = 
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), 
numPartitions,
-      replicationFactor, 2, 0)
+    val assignment = 
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
 numPartitions,
+      replicationFactor, 2, 0))
     checkReplicaDistribution(assignment, brokerRackMapping, 
brokerRackMapping.size, numPartitions,
       replicationFactor)
   }
@@ -59,8 +62,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging {
     val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> 
"rack3", 4 -> "rack3", 5 -> "rack1")
     val numPartitions = 6
     val replicationFactor = 3
-    val assignment = 
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), 
numPartitions,
-      replicationFactor)
+    val assignment = 
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
 numPartitions,
+      replicationFactor))
     checkReplicaDistribution(assignment, brokerRackMapping, 
brokerRackMapping.size, numPartitions,
       replicationFactor)
   }
@@ -70,8 +73,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging {
     val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> 
"rack3", 4 -> "rack3", 5 -> "rack1")
     val numPartitions = 13
     val replicationFactor = 3
-    val assignment = 
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), 
numPartitions,
-      replicationFactor, 0, 0)
+    val assignment = 
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
 numPartitions,
+      replicationFactor, 0, 0))
     checkReplicaDistribution(assignment, brokerRackMapping, 
brokerRackMapping.size, numPartitions,
       replicationFactor, verifyLeaderDistribution = false, 
verifyReplicasDistribution = false)
   }
@@ -81,8 +84,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging {
     val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack2", 3 -> 
"rack3", 4 -> "rack3", 5 -> "rack1")
     val numPartitions = 12
     val replicationFactor = 3
-    val assignment = 
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), 
numPartitions,
-      replicationFactor)
+    val assignment = 
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
 numPartitions,
+      replicationFactor))
     checkReplicaDistribution(assignment, brokerRackMapping, 
brokerRackMapping.size, numPartitions,
       replicationFactor, verifyReplicasDistribution = false)
   }
@@ -92,8 +95,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging {
     val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> 
"rack3", 4 -> "rack3", 5 -> "rack1")
     val numPartitions = 12
     val replicationFactor = 2
-    val assignment = 
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), 
numPartitions,
-      replicationFactor)
+    val assignment = 
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
 numPartitions,
+      replicationFactor))
     checkReplicaDistribution(assignment, brokerRackMapping, 
brokerRackMapping.size, numPartitions,
       replicationFactor)
   }
@@ -103,8 +106,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging 
{
     val brokerRackMapping = Map(6 -> "rack1", 7 -> "rack2", 8 -> "rack2", 9 -> 
"rack3", 10 -> "rack3", 11 -> "rack1")
     val numPartitions = 12
     val replicationFactor = 2
-    val assignment = 
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), 
numPartitions,
-      replicationFactor, startPartitionId = 12)
+    val assignment = 
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
 numPartitions,
+      replicationFactor, -1, 12))
     checkReplicaDistribution(assignment, brokerRackMapping, 
brokerRackMapping.size, numPartitions,
       replicationFactor)
   }
@@ -114,8 +117,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging 
{
     val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> 
"rack3", 4 -> "rack3", 5 -> "rack1")
     val numPartitions = 6
     val replicationFactor = 2
-    val assignment = 
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), 
numPartitions,
-      replicationFactor)
+    val assignment = 
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
 numPartitions,
+      replicationFactor))
     checkReplicaDistribution(assignment, brokerRackMapping, 
brokerRackMapping.size, numPartitions,
       replicationFactor)
   }
@@ -125,7 +128,7 @@ class AdminRackAwareTest extends RackAwareTest with Logging 
{
     val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 4 -> "rack3")
     val numPartitions = 3
     val replicationFactor = 2
-    val assignment = 
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), 
numPartitions, replicationFactor)
+    val assignment = 
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
 numPartitions, replicationFactor))
     checkReplicaDistribution(assignment, brokerRackMapping, 
brokerRackMapping.size, numPartitions, replicationFactor)
   }
 
@@ -135,8 +138,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging 
{
     val replicationFactor = 3
     val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> 
"rack3", 4 -> "rack3", 5 -> "rack1",
       6 -> "rack1", 7 -> "rack2", 8 -> "rack2", 9 -> "rack3", 10 -> "rack1", 
11 -> "rack3")
-    val assignment = 
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), 
numPartitions,
-      replicationFactor)
+    val assignment = 
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
 numPartitions,
+      replicationFactor))
     checkReplicaDistribution(assignment, brokerRackMapping, 
brokerRackMapping.size, numPartitions,
       replicationFactor)
   }
@@ -146,7 +149,7 @@ class AdminRackAwareTest extends RackAwareTest with Logging 
{
     val numPartitions = 6
     val replicationFactor = 5
     val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> 
"rack3", 4 -> "rack3", 5 -> "rack2")
-    val assignment = 
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), 
numPartitions, replicationFactor)
+    val assignment = 
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
 numPartitions, replicationFactor))
     assertEquals(List.fill(assignment.size)(replicationFactor), 
assignment.values.toIndexedSeq.map(_.size))
     val distribution = getReplicaDistribution(assignment, brokerRackMapping)
     for (partition <- 0 until numPartitions)
@@ -158,8 +161,8 @@ class AdminRackAwareTest extends RackAwareTest with Logging 
{
     val numPartitions = 6
     val replicationFactor = 2
     val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> 
"rack3", 4 -> "rack3", 5 -> "rack2")
-    val assignment = 
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), 
numPartitions,
-      replicationFactor)
+    val assignment = 
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
 numPartitions,
+      replicationFactor))
     assertEquals(List.fill(assignment.size)(replicationFactor), 
assignment.values.toIndexedSeq.map(_.size))
     val distribution = getReplicaDistribution(assignment, brokerRackMapping)
     for (partition <- 0 to 5)
@@ -171,7 +174,7 @@ class AdminRackAwareTest extends RackAwareTest with Logging 
{
     val numPartitions = 6
     val replicationFactor = 3
     val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack1", 3 -> 
"rack1", 4 -> "rack1", 5 -> "rack1")
-    val assignment = 
AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), 
numPartitions, replicationFactor)
+    val assignment = 
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping),
 numPartitions, replicationFactor))
     assertEquals(List.fill(assignment.size)(replicationFactor), 
assignment.values.toIndexedSeq.map(_.size))
     val distribution = getReplicaDistribution(assignment, brokerRackMapping)
     for (partition <- 0 until numPartitions)
@@ -187,16 +190,16 @@ class AdminRackAwareTest extends RackAwareTest with 
Logging {
     val numPartitions = 6
     val replicationFactor = 4
     val brokerMetadatas = toBrokerMetadata(rackInfo)
-    assertEquals(brokerList, brokerMetadatas.map(_.id))
-    val assignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, 
numPartitions, replicationFactor,
-      fixedStartIndex = 2)
+    assertEquals(brokerList, brokerMetadatas.asScala.map(_.id))
+    val assignment = 
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(brokerMetadatas,
 numPartitions, replicationFactor,
+      2, -1))
     checkReplicaDistribution(assignment, rackInfo, 5, 6, 4,
       verifyRackAware = false, verifyLeaderDistribution = false, 
verifyReplicasDistribution = false)
   }
 
   @Test
   def testReplicaAssignment(): Unit = {
-    val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_, None))
+    val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_, 
Optional.empty())).asJava
 
     // test 0 replication factor
     assertThrows(classOf[InvalidReplicationFactorException],
@@ -219,7 +222,7 @@ class AdminRackAwareTest extends RackAwareTest with Logging 
{
         8 -> List(3, 0, 1),
         9 -> List(4, 1, 2))
 
-    val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, 
10, 3, 0)
+    val actualAssignment = 
CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(brokerMetadatas,
 10, 3, 0, -1))
     assertEquals(expectedAssignment, actualAssignment)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala 
b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
index 6ce4f7ba55d..17ce08a39bf 100644
--- a/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
@@ -16,9 +16,15 @@
  */
 package kafka.admin
 
+import org.apache.kafka.admin.BrokerMetadata
+
 import scala.collection.{Map, Seq, mutable}
 import org.junit.jupiter.api.Assertions._
 
+import java.util
+import java.util.Optional
+import scala.jdk.CollectionConverters._
+
 trait RackAwareTest {
 
   def checkReplicaDistribution(assignment: Map[Int, Seq[Int]],
@@ -73,13 +79,16 @@ trait RackAwareTest {
     ReplicaDistributions(partitionRackMap, leaderCount, partitionCount)
   }
 
-  def toBrokerMetadata(rackMap: Map[Int, String], brokersWithoutRack: Seq[Int] 
= Seq.empty): Seq[BrokerMetadata] =
-    rackMap.toSeq.map { case (brokerId, rack) =>
-      BrokerMetadata(brokerId, Some(rack))
+  def toBrokerMetadata(rackMap: Map[Int, String], brokersWithoutRack: Seq[Int] 
= Seq.empty): util.Collection[BrokerMetadata] = {
+    val res = rackMap.toSeq.map { case (brokerId, rack) =>
+      new BrokerMetadata(brokerId, Optional.of(rack))
     } ++ brokersWithoutRack.map { brokerId =>
-      BrokerMetadata(brokerId, None)
+      new BrokerMetadata(brokerId, Optional.empty())
     }.sortBy(_.id)
 
+    res.asJavaCollection
+  }
+
 }
 
 case class ReplicaDistributions(partitionRacks: Map[Int, Seq[String]], 
brokerLeaderCount: Map[Int, Int], brokerReplicasCount: Map[Int, Int])
diff --git 
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala 
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
index fab7907e17b..7aa0f8eec23 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
@@ -18,9 +18,10 @@
 package kafka.admin
 
 import java.util.concurrent.ExecutionException
-import java.util.{Arrays, Collections}
+import java.util.{Arrays, Collections, Optional}
 import kafka.admin.ReassignPartitionsCommand._
 import kafka.utils.Exit
+import org.apache.kafka.admin.BrokerMetadata
 import org.apache.kafka.clients.admin.{Config, MockAdminClient, 
PartitionReassignment}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.{InvalidReplicationFactorException, 
UnknownTopicOrPartitionException}
@@ -230,19 +231,19 @@ class ReassignPartitionsUnitTest {
       build()
     try {
       assertEquals(Seq(
-        BrokerMetadata(0, Some("rack0")),
-        BrokerMetadata(1, Some("rack1"))
+        new BrokerMetadata(0, Optional.of("rack0")),
+        new BrokerMetadata(1, Optional.of("rack1"))
       ), getBrokerMetadata(adminClient, Seq(0, 1), true))
       assertEquals(Seq(
-        BrokerMetadata(0, None),
-        BrokerMetadata(1, None)
+        new BrokerMetadata(0, Optional.empty()),
+        new BrokerMetadata(1, Optional.empty())
       ), getBrokerMetadata(adminClient, Seq(0, 1), false))
       assertStartsWith("Not all brokers have rack information",
         assertThrows(classOf[AdminOperationException],
           () => getBrokerMetadata(adminClient, Seq(1, 2), true)).getMessage)
       assertEquals(Seq(
-        BrokerMetadata(1, None),
-        BrokerMetadata(2, None)
+        new BrokerMetadata(1, Optional.empty()),
+        new BrokerMetadata(2, Optional.empty())
       ), getBrokerMetadata(adminClient, Seq(1, 2), false))
     } finally {
       adminClient.close()
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 15ccb7d339b..08cfffdb784 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -41,6 +41,7 @@ import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
 import kafka.utils.Implicits._
 import kafka.zk._
+import org.apache.kafka.admin.BrokerMetadata
 import org.apache.kafka.clients.{ClientResponse, CommonClientConfigs}
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin._
@@ -897,14 +898,14 @@ object TestUtils extends Logging {
   }
 
   def createBrokersInZk(zkClient: KafkaZkClient, ids: Seq[Int]): Seq[Broker] =
-    createBrokersInZk(ids.map(kafka.admin.BrokerMetadata(_, None)), zkClient)
+    createBrokersInZk(ids.map(new BrokerMetadata(_, Optional.empty())), 
zkClient)
 
-  def createBrokersInZk(brokerMetadatas: Seq[kafka.admin.BrokerMetadata], 
zkClient: KafkaZkClient): Seq[Broker] = {
+  def createBrokersInZk(brokerMetadatas: Seq[BrokerMetadata], zkClient: 
KafkaZkClient): Seq[Broker] = {
     zkClient.makeSurePersistentPathExists(BrokerIdsZNode.path)
     val brokers = brokerMetadatas.map { b =>
       val protocol = SecurityProtocol.PLAINTEXT
       val listenerName = ListenerName.forSecurityProtocol(protocol)
-      Broker(b.id, Seq(EndPoint("localhost", 6667, listenerName, protocol)), 
b.rack)
+      Broker(b.id, Seq(EndPoint("localhost", 6667, listenerName, protocol)), 
if (b.rack.isPresent) Some(b.rack.get()) else None)
     }
     brokers.foreach(b => zkClient.registerBroker(BrokerInfo(Broker(b.id, 
b.endPoints, rack = b.rack),
       MetadataVersion.latest, jmxPort = -1)))
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala 
b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index ba0a2583598..e3171647ff1 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -17,7 +17,7 @@
 package kafka.admin
 
 import java.util
-import java.util.Properties
+import java.util.{Optional, Properties}
 import kafka.controller.ReplicaAssignment
 import kafka.server.DynamicConfig.Broker._
 import kafka.server.KafkaConfig._
@@ -336,22 +336,22 @@ class AdminZkClientTest extends QuorumTestHarness with 
Logging with RackAwareTes
     val brokerList = 0 to 5
     val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 
5 -> "rack3")
     val brokerMetadatas = toBrokerMetadata(rackInfo, brokersWithoutRack = 
brokerList.filterNot(rackInfo.keySet))
-    TestUtils.createBrokersInZk(brokerMetadatas, zkClient)
+    TestUtils.createBrokersInZk(brokerMetadatas.asScala.toSeq, zkClient)
 
     val processedMetadatas1 = 
adminZkClient.getBrokerMetadatas(RackAwareMode.Disabled)
     assertEquals(brokerList, processedMetadatas1.map(_.id))
-    assertEquals(List.fill(brokerList.size)(None), 
processedMetadatas1.map(_.rack))
+    assertEquals(List.fill(brokerList.size)(Optional.empty()), 
processedMetadatas1.map(_.rack))
 
     val processedMetadatas2 = 
adminZkClient.getBrokerMetadatas(RackAwareMode.Safe)
     assertEquals(brokerList, processedMetadatas2.map(_.id))
-    assertEquals(List.fill(brokerList.size)(None), 
processedMetadatas2.map(_.rack))
+    assertEquals(List.fill(brokerList.size)(Optional.empty()), 
processedMetadatas2.map(_.rack))
 
     assertThrows(classOf[AdminOperationException], () => 
adminZkClient.getBrokerMetadatas(RackAwareMode.Enforced))
 
     val partialList = List(0, 1, 2, 3, 5)
     val processedMetadatas3 = 
adminZkClient.getBrokerMetadatas(RackAwareMode.Enforced, Some(partialList))
     assertEquals(partialList, processedMetadatas3.map(_.id))
-    assertEquals(partialList.map(rackInfo), 
processedMetadatas3.flatMap(_.rack))
+    assertEquals(partialList.map(rackInfo), 
processedMetadatas3.map(_.rack.get()))
 
     val numPartitions = 3
     adminZkClient.createTopic("foo", numPartitions, 2, rackAwareMode = 
RackAwareMode.Safe)
diff --git a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java 
b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java
new file mode 100644
index 00000000000..9504954b76e
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.admin;
+
+import org.apache.kafka.common.errors.InvalidPartitionsException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.server.common.AdminOperationException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class AdminUtils {
+    static final Random RAND = new Random();
+
+    public static final String ADMIN_CLIENT_ID = "__admin_client";
+
+    public static Map<Integer, List<Integer>> 
assignReplicasToBrokers(Collection<BrokerMetadata> brokerMetadatas,
+                                                                      int 
nPartitions,
+                                                                      int 
replicationFactor) {
+        return assignReplicasToBrokers(brokerMetadatas, nPartitions, 
replicationFactor, -1, -1);
+    }
+
+    /**
+     * There are 3 goals of replica assignment:
+     *
+     * <ol>
+     * <li> Spread the replicas evenly among brokers.</li>
+     * <li> For partitions assigned to a particular broker, their other 
replicas are spread over the other brokers.</li>
+     * <li> If all brokers have rack information, assign the replicas for each 
partition to different racks if possible.</li>
+     * </ol>
+     *
+     * To achieve this goal for replica assignment without considering racks, 
we:
+     * <ol>
+     * <li> Assign the first replica of each partition by round-robin, 
starting from a random position in the broker list.</li>
+     * <li> Assign the remaining replicas of each partition with an increasing 
shift.</li>
+     * </ol>
+     *
+     * Here is an example of assigning
+     * <table cellpadding="2" cellspacing="2">
+     * 
<tr><th>broker-0</th><th>broker-1</th><th>broker-2</th><th>broker-3</th><th>broker-4</th><th>&nbsp;</th></tr>
+     * <tr><td>p0      </td><td>p1      </td><td>p2      </td><td>p3      
</td><td>p4      </td><td>(1st replica)</td></tr>
+     * <tr><td>p5      </td><td>p6      </td><td>p7      </td><td>p8      
</td><td>p9      </td><td>(1st replica)</td></tr>
+     * <tr><td>p4      </td><td>p0      </td><td>p1      </td><td>p2      
</td><td>p3      </td><td>(2nd replica)</td></tr>
+     * <tr><td>p8      </td><td>p9      </td><td>p5      </td><td>p6      
</td><td>p7      </td><td>(2nd replica)</td></tr>
+     * <tr><td>p3      </td><td>p4      </td><td>p0      </td><td>p1      
</td><td>p2      </td><td>(3rd replica)</td></tr>
+     * <tr><td>p7      </td><td>p8      </td><td>p9      </td><td>p5      
</td><td>p6      </td><td>(3rd replica)</td></tr>
+     * </table>
+     *
+     * <p>
+     * To create rack aware assignment, this API will first create a rack 
alternated broker list. For example,
+     * from this brokerID -> rack mapping:</p>
+     * 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 
-> "rack1"
+     * <br><br>
+     * <p>
+     * The rack alternated list will be:
+     * </p>
+     * 0, 3, 1, 5, 4, 2
+     * <br><br>
+     * <p>
+     * Then an easy round-robin assignment can be applied. Assume 6 partitions 
with replication factor of 3, the assignment
+     * will be:
+     * </p>
+     * 0 -> 0,3,1 <br>
+     * 1 -> 3,1,5 <br>
+     * 2 -> 1,5,4 <br>
+     * 3 -> 5,4,2 <br>
+     * 4 -> 4,2,0 <br>
+     * 5 -> 2,0,3 <br>
+     * <br>
+     * <p>
+     * Once it has completed the first round-robin, if there are more 
partitions to assign, the algorithm will start
+     * shifting the followers. This is to ensure we will not always get the 
same set of sequences.
+     * In this case, if there is another partition to assign (partition #6), 
the assignment will be:
+     * </p>
+     * 6 -> 0,4,2 (instead of repeating 0,3,1 as partition 0)
+     * <br><br>
+     * <p>
+     * The rack aware assignment always chooses the 1st replica of the 
partition using round robin on the rack alternated
+     * broker list. For rest of the replicas, it will be biased towards 
brokers on racks that do not have
+     * any replica assignment, until every rack has a replica. Then the 
assignment will go back to round-robin on
+     * the broker list.
+     * </p>
+     * <br>
+     * <p>
+     * As the result, if the number of replicas is equal to or greater than 
the number of racks, it will ensure that
+     * each rack will get at least one replica. Otherwise, each rack will get 
at most one replica. In a perfect
+     * situation where the number of replicas is the same as the number of 
racks and each rack has the same number of
+     * brokers, it guarantees that the replica distribution is even across 
brokers and racks.
+     * </p>
+     * @return a Map from partition id to replica ids
+     * @throws AdminOperationException If rack information is supplied but it 
is incomplete, or if it is not possible to
+     *                                 assign each replica to a unique rack.
+     *
+     */
+    public static Map<Integer, List<Integer>> 
assignReplicasToBrokers(Collection<BrokerMetadata> brokerMetadatas,
+                                                                      int 
nPartitions,
+                                                                      int 
replicationFactor,
+                                                                      int 
fixedStartIndex,
+                                                                      int 
startPartitionId) {
+        if (nPartitions <= 0)
+            throw new InvalidPartitionsException("Number of partitions must be 
larger than 0.");
+        if (replicationFactor <= 0)
+            throw new InvalidReplicationFactorException("Replication factor 
must be larger than 0.");
+        if (replicationFactor > brokerMetadatas.size())
+            throw new InvalidReplicationFactorException("Replication factor: " 
+ replicationFactor + " larger than available brokers: " + 
brokerMetadatas.size() + ".");
+        if (brokerMetadatas.stream().noneMatch(b -> b.rack.isPresent()))
+            return assignReplicasToBrokersRackUnaware(nPartitions, 
replicationFactor, brokerMetadatas.stream().map(b -> 
b.id).collect(Collectors.toList()), fixedStartIndex,
+                startPartitionId);
+        else {
+            if (brokerMetadatas.stream().anyMatch(b -> !b.rack.isPresent()))
+                throw new AdminOperationException("Not all brokers have rack 
information for replica rack aware assignment.");
+            return assignReplicasToBrokersRackAware(nPartitions, 
replicationFactor, brokerMetadatas, fixedStartIndex,
+                startPartitionId);
+        }
+    }
+
+    private static Map<Integer, List<Integer>> 
assignReplicasToBrokersRackUnaware(int nPartitions,
+                                                                               
   int replicationFactor,
+                                                                               
   List<Integer> brokerList,
+                                                                               
   int fixedStartIndex,
+                                                                               
   int startPartitionId) {
+        Map<Integer, List<Integer>> ret = new HashMap<>();
+        int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : 
RAND.nextInt(brokerList.size());
+        int currentPartitionId = Math.max(0, startPartitionId);
+        int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : 
RAND.nextInt(brokerList.size());
+        for (int i = 0; i < nPartitions; i++) {
+            if (currentPartitionId > 0 && (currentPartitionId % 
brokerList.size() == 0))
+                nextReplicaShift += 1;
+            int firstReplicaIndex = (currentPartitionId + startIndex) % 
brokerList.size();
+            List<Integer> replicaBuffer = new ArrayList<>();
+            replicaBuffer.add(brokerList.get(firstReplicaIndex));
+            for (int j = 0; j < replicationFactor - 1; j++)
+                
replicaBuffer.add(brokerList.get(replicaIndex(firstReplicaIndex, 
nextReplicaShift, j, brokerList.size())));
+            ret.put(currentPartitionId, replicaBuffer);
+            currentPartitionId += 1;
+        }
+        return ret;
+    }
+
+    private static Map<Integer, List<Integer>> 
assignReplicasToBrokersRackAware(int nPartitions,
+                                                                               
 int replicationFactor,
+                                                                               
 Collection<BrokerMetadata> brokerMetadatas,
+                                                                               
 int fixedStartIndex,
+                                                                               
 int startPartitionId) {
+        Map<Integer, String> brokerRackMap = new HashMap<>();
+        brokerMetadatas.forEach(m -> brokerRackMap.put(m.id, m.rack.get()));
+        int numRacks = new HashSet<>(brokerRackMap.values()).size();
+        List<Integer> arrangedBrokerList = 
getRackAlternatedBrokerList(brokerRackMap);
+        int numBrokers = arrangedBrokerList.size();
+        Map<Integer, List<Integer>> ret = new HashMap<>();
+        int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : 
RAND.nextInt(arrangedBrokerList.size());
+        int currentPartitionId = Math.max(0, startPartitionId);
+        int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : 
RAND.nextInt(arrangedBrokerList.size());
+        for (int i = 0; i < nPartitions; i++) {
+            if (currentPartitionId > 0 && (currentPartitionId % 
arrangedBrokerList.size() == 0))
+                nextReplicaShift += 1;
+            int firstReplicaIndex = (currentPartitionId + startIndex) % 
arrangedBrokerList.size();
+            int leader = arrangedBrokerList.get(firstReplicaIndex);
+            List<Integer> replicaBuffer = new ArrayList<>();
+            replicaBuffer.add(leader);
+            Set<String> racksWithReplicas = new HashSet<>();
+            racksWithReplicas.add(brokerRackMap.get(leader));
+            Set<Integer> brokersWithReplicas = new HashSet<>();
+            brokersWithReplicas.add(leader);
+            int k = 0;
+            for (int j = 0; j < replicationFactor - 1; j++) {
+                boolean done = false;
+                while (!done) {
+                    Integer broker = 
arrangedBrokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift * 
numRacks, k, arrangedBrokerList.size()));
+                    String rack = brokerRackMap.get(broker);
+                    // Skip this broker if
+                    // 1. there is already a broker in the same rack that has 
assigned a replica AND there is one or more racks
+                    //    that do not have any replica, or
+                    // 2. the broker has already assigned a replica AND there 
is one or more brokers that do not have replica assigned
+                    if ((!racksWithReplicas.contains(rack) || 
racksWithReplicas.size() == numRacks)
+                        && (!brokersWithReplicas.contains(broker) || 
brokersWithReplicas.size() == numBrokers)) {
+                        replicaBuffer.add(broker);
+                        racksWithReplicas.add(rack);
+                        brokersWithReplicas.add(broker);
+                        done = true;
+                    }
+                    k += 1;
+                }
+            }
+            ret.put(currentPartitionId, replicaBuffer);
+            currentPartitionId += 1;
+        }
+        return ret;
+    }
+
+    /**
+     * Given broker and rack information, returns a list of brokers alternated 
by the rack. Assume
+     * this is the rack and its brokers:
+     *
+     * rack1: 0, 1, 2
+     * rack2: 3, 4, 5
+     * rack3: 6, 7, 8
+     *
+     * This API would return the list of 0, 3, 6, 1, 4, 7, 2, 5, 8
+     *
+     * This is essential to make sure that the assignReplicasToBrokers API can 
use such list and
+     * assign replicas to brokers in a simple round-robin fashion, while 
ensuring an even
+     * distribution of leader and replica counts on each broker and that 
replicas are
+     * distributed to all racks.
+     */
+    public static List<Integer> getRackAlternatedBrokerList(Map<Integer, 
String> brokerRackMap) {
+        Map<String, Iterator<Integer>> brokersIteratorByRack = new HashMap<>();
+        getInverseMap(brokerRackMap).forEach((rack, brokers) -> 
brokersIteratorByRack.put(rack, brokers.iterator()));
+        String[] racks = brokersIteratorByRack.keySet().toArray(new String[0]);
+        Arrays.sort(racks);
+        List<Integer> result = new ArrayList<>();
+        int rackIndex = 0;
+        while (result.size() < brokerRackMap.size()) {
+            Iterator<Integer> rackIterator = 
brokersIteratorByRack.get(racks[rackIndex]);
+            if (rackIterator.hasNext())
+                result.add(rackIterator.next());
+            rackIndex = (rackIndex + 1) % racks.length;
+        }
+        return result;
+    }
+
+    static Map<String, List<Integer>> getInverseMap(Map<Integer, String> 
brokerRackMap) {
+        Map<String, List<Integer>> results = new HashMap<>();
+        brokerRackMap.forEach((id, rack) -> results.computeIfAbsent(rack, key 
-> new ArrayList<>()).add(id));
+        results.forEach((rack, rackAndIdList) -> 
rackAndIdList.sort(Integer::compareTo));
+        return results;
+    }
+
+    static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int 
replicaIndex, int nBrokers) {
+        long shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1);
+        return (int) ((firstReplicaIndex + shift) % nBrokers);
+    }
+}
diff --git 
a/server-common/src/main/java/org/apache/kafka/admin/BrokerMetadata.java 
b/server-common/src/main/java/org/apache/kafka/admin/BrokerMetadata.java
new file mode 100644
index 00000000000..98216fa0858
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/admin/BrokerMetadata.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Broker metadata used by admin tools.
+ */
+public class BrokerMetadata {
+    public final int id;
+
+    public final Optional<String> rack;
+
+    /**
+     * @param id an integer that uniquely identifies this broker
+     * @param rack the rack of the broker, which is used to in rack aware 
partition assignment for fault tolerance.
+     *             Examples: "RACK1", "us-east-1d"
+     */
+    public BrokerMetadata(int id, Optional<String> rack) {
+        this.id = id;
+        this.rack = rack;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        BrokerMetadata that = (BrokerMetadata) o;
+        return id == that.id && Objects.equals(rack, that.rack);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, rack);
+    }
+}

Reply via email to