This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9db58886097 MINOR: FindCoordinator API does not lookup partition for
share partition key correctly (#19273)
9db58886097 is described below
commit 9db5888609752db2c4138c4be78b47563e4d1198
Author: David Jacot <[email protected]>
AuthorDate: Mon Mar 24 19:43:23 2025 +0100
MINOR: FindCoordinator API does not lookup partition for share partition
key correctly (#19273)
This patch fixes another bug in the FindCoordinator API handling for
share partition key. `shareCoordinator.foreach` returns `Unit` so
`shareCoordinator.foreach(coordinator =>
coordinator.partitionFor(SharePartitionKey.getInstance(key)))` does not
return the partition for the key.
Reviewers: Jhen-Yung Hsu <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 3 ++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 31 +++++++++++++++++++++-
2 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index a7ddfd8aee4..af851f3ed84 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1180,7 +1180,8 @@ class KafkaApis(val requestChannel: RequestChannel,
(txnCoordinator.partitionFor(key), TRANSACTION_STATE_TOPIC_NAME)
case CoordinatorType.SHARE =>
- (shareCoordinator.foreach(coordinator =>
coordinator.partitionFor(SharePartitionKey.getInstance(key))),
SHARE_GROUP_STATE_TOPIC_NAME)
+ // We know that shareCoordinator is defined at this stage.
+
(shareCoordinator.get.partitionFor(SharePartitionKey.getInstance(key)),
SHARE_GROUP_STATE_TOPIC_NAME)
}
val topicMetadata =
metadataCache.getTopicMetadata(Set(internalTopicName).asJava,
request.context.listenerName, false, false).asScala
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index de111e74ff2..d71e5c88778 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER,
BROKER_LOGGER}
import org.apache.kafka.common.errors.{ClusterAuthorizationException,
UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.internals.Topic.SHARE_GROUP_STATE_TOPIC_NAME
import org.apache.kafka.common.memory.MemoryPool
import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic,
AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction,
AddPartitionsToTxnTransactionCollection}
import
org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
@@ -88,7 +89,7 @@ import org.apache.kafka.server.authorizer.{Action,
AuthorizationResult, Authoriz
import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures,
GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, TransactionVersion}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs,
ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
-import org.apache.kafka.server.share.{CachedSharePartition,
ErroneousAndValidPartitionData}
+import org.apache.kafka.server.share.{CachedSharePartition,
ErroneousAndValidPartitionData, SharePartitionKey}
import org.apache.kafka.server.quota.ThrottleCallback
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
import org.apache.kafka.server.share.context.{FinalContext,
ShareSessionContext}
@@ -789,6 +790,34 @@ class KafkaApisTest extends Logging {
assertEquals(expectedResponse, response.data)
}
+ @Test
+ def testFindCoordinatorWithValidSharePartitionKey(): Unit = {
+ addTopicToMetadataCache(SHARE_GROUP_STATE_TOPIC_NAME, 10, 3)
+ val key = SharePartitionKey.getInstance("foo", Uuid.randomUuid(), 10)
+
+ val request = new FindCoordinatorRequestData()
+ .setKeyType(CoordinatorType.SHARE.id)
+ .setCoordinatorKeys(asList(key.asCoordinatorKey))
+
+ val requestChannelRequest = buildRequest(new
FindCoordinatorRequest.Builder(request).build())
+
+ kafkaApis = createKafkaApis()
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+
when(shareCoordinator.partitionFor(ArgumentMatchers.eq(key))).thenReturn(10)
+
+ val expectedResponse = new FindCoordinatorResponseData()
+ .setCoordinators(asList(
+ new FindCoordinatorResponseData.Coordinator()
+ .setKey(key.asCoordinatorKey)
+ .setNodeId(0)
+ .setHost("broker0")
+ .setPort(9092)))
+
+ val response =
verifyNoThrottling[FindCoordinatorResponse](requestChannelRequest)
+ assertEquals(expectedResponse, response.data)
+ }
+
@Test
def testMetadataAutoTopicCreationForOffsetTopic(): Unit = {
testMetadataAutoTopicCreation(Topic.GROUP_METADATA_TOPIC_NAME,
enableAutoTopicCreation = true,