This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 2c147c7c0f6 KAFKA-18583; Fix getPartitionReplicaEndpoints for KRaft (#18657)
2c147c7c0f6 is described below

commit 2c147c7c0f6d6d73d48737b5e6e00a1220f68391
Author: Dimitar Dimitrov <[email protected]>
AuthorDate: Wed Jan 22 09:39:18 2025 +0100

    KAFKA-18583; Fix getPartitionReplicaEndpoints for KRaft (#18657)
    
    The cherry-pick required reimplementing the accompanying test to work
    with UpdateMetadataRequest (removed in 4.0 and trunk) in order to also
    apply to `ZkMetadataCache`. If the removal of UpdateMetadataRequest is
    backported here as well, the test can be changed to match the trunk
    version.
    
    Reviewers: David Jacot <[email protected]>
---
 .../kafka/server/metadata/KRaftMetadataCache.scala |   2 +-
 .../unit/kafka/server/MetadataCacheTest.scala      | 109 +++++++++++++++++++++
 2 files changed, 110 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index a058c435e57..5aab667fc07 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -427,7 +427,7 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
     val image = _currentImage
     val result = new mutable.HashMap[Int, Node]()
     Option(image.topics().getTopic(tp.topic())).foreach { topic =>
-      topic.partitions().values().forEach { partition =>
+      Option(topic.partitions().get(tp.partition())).foreach { partition =>
         partition.replicas.foreach { replicaId =>
           val broker = image.cluster().broker(replicaId)
           if (broker != null && !broker.fenced()) {
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 02cba057ba8..30b40d2e8ba 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -41,6 +41,7 @@ import org.junit.jupiter.params.provider.MethodSource
 import java.util
 import java.util.Arrays.asList
 import java.util.Collections
+import java.util.stream.Collectors
 import scala.collection.{Seq, mutable}
 import scala.jdk.CollectionConverters._
 
@@ -645,6 +646,114 @@ class MetadataCacheTest {
     assertEquals(Seq(expectedNode1), partitionInfo.offlineReplicas.toSeq)
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("cacheProvider"))
+  def testGetPartitionReplicaEndpoints(cache: MetadataCache): Unit = {
+    val securityProtocol = SecurityProtocol.PLAINTEXT
+    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+
+    // Set up broker data for the metadata cache
+    val numBrokers = 10
+    // Set only the last broker in the list to be offline in order to allow easy
+    // indexing of brokers in the brokerStates list - the index in the list will
+    // be the same as the brokerId of the broker at that position.
+    val offlineBrokerId = numBrokers - 1
+    val brokerStates = (0 until numBrokers - 1).map { brokerId =>
+      new UpdateMetadataBroker()
+        .setId(brokerId)
+        .setRack("rack" + (brokerId % 3))
+        .setEndpoints(
+          Seq(new UpdateMetadataEndpoint()
+            .setHost("foo" + brokerId)
+            .setPort(9092)
+            .setSecurityProtocol(securityProtocol.id)
+            .setListener(listenerName.value)
+          ).asJava)
+    }
+
+    val topic = "many-partitions-topic"
+    val topicId = Uuid.randomUuid()
+
+    // Set up a number of partitions such that each different combination of
+    // $replicationFactor brokers is made a replica set for exactly one partition
+    val replicationFactor = 3
+    val replicaSets = getAllReplicaSets(numBrokers, replicationFactor)
+    val numPartitions = replicaSets.length
+    val partitionStates = (0 until numPartitions).map { partitionId =>
+      val replicas = replicaSets(partitionId)
+      val onlineReplicas = replicas.stream().filter(id => id != offlineBrokerId).collect(Collectors.toList())
+      new UpdateMetadataPartitionState()
+        .setTopicName(topic)
+        .setPartitionIndex(partitionId)
+        .setReplicas(replicas)
+        .setLeader(onlineReplicas.get(0))
+        .setIsr(onlineReplicas)
+        .setOfflineReplicas(Collections.singletonList(offlineBrokerId))
+    }
+
+    // Load the prepared data in the metadata cache
+    val version = ApiKeys.UPDATE_METADATA.latestVersion
+    val controllerId = 0
+    val controllerEpoch = 123
+    val updateMetadataRequest = new UpdateMetadataRequest.Builder(
+      version,
+      controllerId,
+      controllerEpoch,
+      brokerEpoch,
+      partitionStates.asJava,
+      brokerStates.asJava,
+      Collections.singletonMap(topic, topicId)).build()
+    MetadataCacheTest.updateCache(cache, updateMetadataRequest)
+
+    (0 until numPartitions).foreach { partitionId =>
+      val tp = new TopicPartition(topic, partitionId)
+      val brokerIdToNodeMap = cache.getPartitionReplicaEndpoints(tp, listenerName)
+      val replicaSet = brokerIdToNodeMap.keySet
+      val expectedReplicaSet = partitionStates(partitionId).replicas().asScala.toSet
+      // Verify that we have endpoints for exactly the non-fenced brokers of the replica set
+      if (expectedReplicaSet.contains(offlineBrokerId)) {
+        assertEquals(expectedReplicaSet,
+                     replicaSet + offlineBrokerId,
+                     s"Unexpected partial replica set for partition $partitionId")
+      } else {
+        assertEquals(expectedReplicaSet,
+                     replicaSet,
+                     s"Unexpected replica set for partition $partitionId")
+      }
+      // Verify that the endpoint data for each non-fenced replica is as expected
+      replicaSet.foreach { brokerId =>
+        val brokerNode =
+          brokerIdToNodeMap.getOrElse(
+            brokerId, fail(s"No brokerNode for broker $brokerId and partition $partitionId"))
+        val expectedBroker = brokerStates(brokerId)
+        val expectedEndpoint = expectedBroker.endpoints().get(0)
+        assertEquals(securityProtocol.id, expectedEndpoint.securityProtocol())
+        assertEquals(listenerName.value(), expectedEndpoint.listener())
+        assertEquals(expectedEndpoint.host(),
+                     brokerNode.host(),
+                     s"Unexpected host for broker $brokerId and partition $partitionId")
+        assertEquals(expectedEndpoint.port(),
+                     brokerNode.port(),
+                     s"Unexpected port for broker $brokerId and partition $partitionId")
+        assertEquals(expectedBroker.rack(),
+                     brokerNode.rack(),
+                     s"Unexpected rack for broker $brokerId and partition $partitionId")
+      }
+    }
+
+    val tp = new TopicPartition(topic, numPartitions)
+    val brokerIdToNodeMap = cache.getPartitionReplicaEndpoints(tp, listenerName)
+    assertTrue(brokerIdToNodeMap.isEmpty)
+  }
+
+  private def getAllReplicaSets(numBrokers: Int,
+                                replicationFactor: Int): Array[util.List[Integer]] = {
+    (0 until numBrokers)
+      .combinations(replicationFactor)
+      .map(replicaSet => replicaSet.map(Integer.valueOf).toList.asJava)
+      .toArray
+  }
+
   @Test
   def testIsBrokerFenced(): Unit = {
     val metadataCache = MetadataCache.kRaftMetadataCache(0)

Reply via email to