This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new 2c147c7c0f6 KAFKA-18583; Fix getPartitionReplicaEndpoints for KRaft (#18657)
2c147c7c0f6 is described below
commit 2c147c7c0f6d6d73d48737b5e6e00a1220f68391
Author: Dimitar Dimitrov <[email protected]>
AuthorDate: Wed Jan 22 09:39:18 2025 +0100
KAFKA-18583; Fix getPartitionReplicaEndpoints for KRaft (#18657)
The cherry-pick required reimplementing the accompanying test to work
with UpdateMetadataRequest (removed in 4.0 and trunk) in order to also
apply to `ZkMetadataCache`. If the removal of UpdateMetadataRequest is
backported here as well, the test can be changed to match the trunk
version.
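For context: before this change, getPartitionReplicaEndpoints in
KRaftMetadataCache iterated over every partition of the topic, so the
returned map could include endpoints of brokers that are not replicas of
the requested partition at all. A minimal sketch of the observable
difference (hypothetical topic layout; `cache` and `listenerName` assumed
to be in scope):

    // Suppose topic "t" has partition 0 with replicas {0, 1, 2}
    // and partition 1 with replicas {3, 4, 5}.
    val endpoints = cache.getPartitionReplicaEndpoints(
      new TopicPartition("t", 0), listenerName)
    // Before the fix: endpoints.keySet could be {0, 1, 2, 3, 4, 5}
    // After the fix:  endpoints.keySet is {0, 1, 2}, minus any fenced brokers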
Reviewers: David Jacot <[email protected]>
---
.../kafka/server/metadata/KRaftMetadataCache.scala | 2 +-
.../unit/kafka/server/MetadataCacheTest.scala | 109 +++++++++++++++++++++
2 files changed, 110 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index a058c435e57..5aab667fc07 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -427,7 +427,7 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
val image = _currentImage
val result = new mutable.HashMap[Int, Node]()
Option(image.topics().getTopic(tp.topic())).foreach { topic =>
- topic.partitions().values().forEach { partition =>
+ Option(topic.partitions().get(tp.partition())).foreach { partition =>
partition.replicas.foreach { replicaId =>
val broker = image.cluster().broker(replicaId)
if (broker != null && !broker.fenced()) {
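The fix narrows the lookup from all of the topic's partitions to the single
requested one. Because topic.partitions().get(...) returns null for a
partition id that does not exist, wrapping the lookup in Option(...) also
makes the method return an empty map for unknown partitions, which the new
test asserts at its end. A standalone sketch of the same null-safe lookup
pattern (illustrative names, not from this commit):

    import java.util.{HashMap => JHashMap}

    val partitions = new JHashMap[Integer, String]()
    partitions.put(0, "partition-0-registration")
    // Option(null) is None, so a missing partition id contributes nothing
    val hit  = Option(partitions.get(0))   // Some("partition-0-registration")
    val miss = Option(partitions.get(42))  // None

The test below builds on this: with numBrokers = 10 and replicationFactor = 3
it creates C(10,3) = 120 partitions, one per distinct replica set, so endpoints
leaking in from another partition's replica set would fail the per-partition
assertions.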
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 02cba057ba8..30b40d2e8ba 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -41,6 +41,7 @@ import org.junit.jupiter.params.provider.MethodSource
import java.util
import java.util.Arrays.asList
import java.util.Collections
+import java.util.stream.Collectors
import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
@@ -645,6 +646,114 @@ class MetadataCacheTest {
assertEquals(Seq(expectedNode1), partitionInfo.offlineReplicas.toSeq)
}
+ @ParameterizedTest
+ @MethodSource(Array("cacheProvider"))
+ def testGetPartitionReplicaEndpoints(cache: MetadataCache): Unit = {
+ val securityProtocol = SecurityProtocol.PLAINTEXT
+ val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+
+ // Set up broker data for the metadata cache
+ val numBrokers = 10
+ // Set only the last broker in the list to be offline in order to allow easy
+ // indexing of brokers in the brokerStates list - the index in the list will
+ // be the same as the brokerId of the broker at that position.
+ val offlineBrokerId = numBrokers - 1
+ val brokerStates = (0 until numBrokers - 1).map { brokerId =>
+ new UpdateMetadataBroker()
+ .setId(brokerId)
+ .setRack("rack" + (brokerId % 3))
+ .setEndpoints(
+ Seq(new UpdateMetadataEndpoint()
+ .setHost("foo" + brokerId)
+ .setPort(9092)
+ .setSecurityProtocol(securityProtocol.id)
+ .setListener(listenerName.value)
+ ).asJava)
+ }
+
+ val topic = "many-partitions-topic"
+ val topicId = Uuid.randomUuid()
+
+ // Set up a number of partitions such that each different combination of
+ // $replicationFactor brokers is made a replica set for exactly one partition
+ val replicationFactor = 3
+ val replicaSets = getAllReplicaSets(numBrokers, replicationFactor)
+ val numPartitions = replicaSets.length
+ val partitionStates = (0 until numPartitions).map { partitionId =>
+ val replicas = replicaSets(partitionId)
+ val onlineReplicas = replicas.stream().filter(id => id != offlineBrokerId).collect(Collectors.toList())
+ new UpdateMetadataPartitionState()
+ .setTopicName(topic)
+ .setPartitionIndex(partitionId)
+ .setReplicas(replicas)
+ .setLeader(onlineReplicas.get(0))
+ .setIsr(onlineReplicas)
+ .setOfflineReplicas(Collections.singletonList(offlineBrokerId))
+ }
+
+ // Load the prepared data in the metadata cache
+ val version = ApiKeys.UPDATE_METADATA.latestVersion
+ val controllerId = 0
+ val controllerEpoch = 123
+ val updateMetadataRequest = new UpdateMetadataRequest.Builder(
+ version,
+ controllerId,
+ controllerEpoch,
+ brokerEpoch,
+ partitionStates.asJava,
+ brokerStates.asJava,
+ Collections.singletonMap(topic, topicId)).build()
+ MetadataCacheTest.updateCache(cache, updateMetadataRequest)
+
+ (0 until numPartitions).foreach { partitionId =>
+ val tp = new TopicPartition(topic, partitionId)
+ val brokerIdToNodeMap = cache.getPartitionReplicaEndpoints(tp, listenerName)
+ val replicaSet = brokerIdToNodeMap.keySet
+ val expectedReplicaSet = partitionStates(partitionId).replicas().asScala.toSet
+ // Verify that we have endpoints for exactly the non-fenced brokers of the replica set
+ if (expectedReplicaSet.contains(offlineBrokerId)) {
+ assertEquals(expectedReplicaSet,
+ replicaSet + offlineBrokerId,
+ s"Unexpected partial replica set for partition
$partitionId")
+ } else {
+ assertEquals(expectedReplicaSet,
+ replicaSet,
+ s"Unexpected replica set for partition $partitionId")
+ }
+ // Verify that the endpoint data for each non-fenced replica is as expected
+ replicaSet.foreach { brokerId =>
+ val brokerNode =
+ brokerIdToNodeMap.getOrElse(
brokerId, fail(s"No brokerNode for broker $brokerId and partition $partitionId"))
+ val expectedBroker = brokerStates(brokerId)
+ val expectedEndpoint = expectedBroker.endpoints().get(0)
+ assertEquals(securityProtocol.id, expectedEndpoint.securityProtocol())
+ assertEquals(listenerName.value(), expectedEndpoint.listener())
+ assertEquals(expectedEndpoint.host(),
+ brokerNode.host(),
+ s"Unexpected host for broker $brokerId and partition
$partitionId")
+ assertEquals(expectedEndpoint.port(),
+ brokerNode.port(),
+ s"Unexpected port for broker $brokerId and partition
$partitionId")
+ assertEquals(expectedBroker.rack(),
+ brokerNode.rack(),
+ s"Unexpected rack for broker $brokerId and partition
$partitionId")
+ }
+ }
+
+ val tp = new TopicPartition(topic, numPartitions)
+ val brokerIdToNodeMap = cache.getPartitionReplicaEndpoints(tp, listenerName)
+ assertTrue(brokerIdToNodeMap.isEmpty)
+ }
+
+ private def getAllReplicaSets(numBrokers: Int,
+ replicationFactor: Int): Array[util.List[Integer]] = {
+ (0 until numBrokers)
+ .combinations(replicationFactor)
+ .map(replicaSet => replicaSet.map(Integer.valueOf).toList.asJava)
+ .toArray
+ }
+
@Test
def testIsBrokerFenced(): Unit = {
val metadataCache = MetadataCache.kRaftMetadataCache(0)