This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4f28973bd19 KAFKA-18827: Initialize share state, share coordinator
impl. [1/N] (#18968)
4f28973bd19 is described below
commit 4f28973bd19cb94986415765f55cf5ab5bfae3e6
Author: Sushant Mahajan <[email protected]>
AuthorDate: Sat Feb 22 21:42:08 2025 +0530
KAFKA-18827: Initialize share state, share coordinator impl. [1/N] (#18968)
In this PR, we have added the share coordinator and KafkaApis side
implementation of the initialize share group state RPC.
ref:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka#KIP932:QueuesforKafka-InitializeShareGroupStateAPI
Reviewers: Andrew Schofield <[email protected]>
---
.../requests/InitializeShareGroupStateRequest.java | 22 +-
.../InitializeShareGroupStateResponse.java | 70 +++-
core/src/main/scala/kafka/server/KafkaApis.scala | 30 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 121 +++++++
.../kafka/coordinator/share/ShareCoordinator.java | 10 +
.../coordinator/share/ShareCoordinatorService.java | 146 ++++++++-
.../coordinator/share/ShareCoordinatorShard.java | 205 ++++++++----
.../kafka/coordinator/share/ShareGroupOffset.java | 21 +-
.../share/ShareCoordinatorServiceTest.java | 361 +++++++++++++++++----
.../share/ShareCoordinatorShardTest.java | 181 +++++++++++
10 files changed, 1007 insertions(+), 160 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateRequest.java
index fc9abc71613..15d50acaa37 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateRequest.java
@@ -34,7 +34,7 @@ public class InitializeShareGroupStateRequest extends
AbstractRequest {
private final InitializeShareGroupStateRequestData data;
public Builder(InitializeShareGroupStateRequestData data) {
- this(data, false);
+ this(data, true);
}
public Builder(InitializeShareGroupStateRequestData data, boolean
enableUnstableLastVersion) {
@@ -64,15 +64,15 @@ public class InitializeShareGroupStateRequest extends
AbstractRequest {
public InitializeShareGroupStateResponse getErrorResponse(int
throttleTimeMs, Throwable e) {
List<InitializeShareGroupStateResponseData.InitializeStateResult>
results = new ArrayList<>();
data.topics().forEach(
- topicResult -> results.add(new
InitializeShareGroupStateResponseData.InitializeStateResult()
- .setTopicId(topicResult.topicId())
- .setPartitions(topicResult.partitions().stream()
- .map(partitionData -> new
InitializeShareGroupStateResponseData.PartitionResult()
-
.setPartition(partitionData.partition())
-
.setErrorCode(Errors.forException(e).code()))
- .collect(Collectors.toList()))));
+ topicResult -> results.add(new
InitializeShareGroupStateResponseData.InitializeStateResult()
+ .setTopicId(topicResult.topicId())
+ .setPartitions(topicResult.partitions().stream()
+ .map(partitionData -> new
InitializeShareGroupStateResponseData.PartitionResult()
+ .setPartition(partitionData.partition())
+ .setErrorCode(Errors.forException(e).code()))
+ .collect(Collectors.toList()))));
return new InitializeShareGroupStateResponse(new
InitializeShareGroupStateResponseData()
- .setResults(results));
+ .setResults(results));
}
@Override
@@ -82,8 +82,8 @@ public class InitializeShareGroupStateRequest extends
AbstractRequest {
public static InitializeShareGroupStateRequest parse(ByteBuffer buffer,
short version) {
return new InitializeShareGroupStateRequest(
- new InitializeShareGroupStateRequestData(new
ByteBufferAccessor(buffer), version),
- version
+ new InitializeShareGroupStateRequestData(new
ByteBufferAccessor(buffer), version),
+ version
);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateResponse.java
index 44880c2cb86..be88f0944d0 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateResponse.java
@@ -17,13 +17,17 @@
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class InitializeShareGroupStateResponse extends AbstractResponse {
@@ -43,9 +47,9 @@ public class InitializeShareGroupStateResponse extends
AbstractResponse {
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>();
data.results().forEach(
- result -> result.partitions().forEach(
- partitionResult -> updateErrorCounts(counts,
Errors.forCode(partitionResult.errorCode()))
- )
+ result -> result.partitions().forEach(
+ partitionResult -> updateErrorCounts(counts,
Errors.forCode(partitionResult.errorCode()))
+ )
);
return counts;
}
@@ -62,7 +66,65 @@ public class InitializeShareGroupStateResponse extends
AbstractResponse {
public static InitializeShareGroupStateResponse parse(ByteBuffer buffer,
short version) {
return new InitializeShareGroupStateResponse(
- new InitializeShareGroupStateResponseData(new
ByteBufferAccessor(buffer), version)
+ new InitializeShareGroupStateResponseData(new
ByteBufferAccessor(buffer), version)
);
}
+
+ public static InitializeShareGroupStateResponseData
toGlobalErrorResponse(InitializeShareGroupStateRequestData request, Errors
error) {
+ List<InitializeShareGroupStateResponseData.InitializeStateResult>
initStateResults = new ArrayList<>();
+ request.topics().forEach(topicData -> {
+ List<InitializeShareGroupStateResponseData.PartitionResult>
partitionResults = new ArrayList<>();
+ topicData.partitions().forEach(partitionData ->
partitionResults.add(
+ toErrorResponsePartitionResult(partitionData.partition(),
error, error.message()))
+ );
+
initStateResults.add(toResponseInitializeStateResult(topicData.topicId(),
partitionResults));
+ });
+ return new
InitializeShareGroupStateResponseData().setResults(initStateResults);
+ }
+
+ public static InitializeShareGroupStateResponseData.PartitionResult
toErrorResponsePartitionResult(
+ int partitionId,
+ Errors error,
+ String errorMessage
+ ) {
+ return new InitializeShareGroupStateResponseData.PartitionResult()
+ .setPartition(partitionId)
+ .setErrorCode(error.code())
+ .setErrorMessage(errorMessage);
+ }
+
+ public static InitializeShareGroupStateResponseData.InitializeStateResult
toResponseInitializeStateResult(
+ Uuid topicId,
+ List<InitializeShareGroupStateResponseData.PartitionResult>
partitionResults
+ ) {
+ return new
InitializeShareGroupStateResponseData.InitializeStateResult()
+ .setTopicId(topicId)
+ .setPartitions(partitionResults);
+ }
+
+ public static InitializeShareGroupStateResponseData
toErrorResponseData(Uuid topicId, int partitionId, Errors error, String
errorMessage) {
+ return new InitializeShareGroupStateResponseData().setResults(List.of(
+ new InitializeShareGroupStateResponseData.InitializeStateResult()
+ .setTopicId(topicId)
+ .setPartitions(List.of(new
InitializeShareGroupStateResponseData.PartitionResult()
+ .setPartition(partitionId)
+ .setErrorCode(error.code())
+ .setErrorMessage(errorMessage)))
+ ));
+ }
+
+ public static InitializeShareGroupStateResponseData.PartitionResult
toResponsePartitionResult(int partitionId) {
+ return new
InitializeShareGroupStateResponseData.PartitionResult().setPartition(partitionId);
+ }
+
+ public static InitializeShareGroupStateResponseData toResponseData(Uuid
topicId, int partitionId) {
+ return new InitializeShareGroupStateResponseData().setResults(List.of(
+ new InitializeShareGroupStateResponseData.InitializeStateResult()
+ .setTopicId(topicId)
+ .setPartitions(List.of(
+ new InitializeShareGroupStateResponseData.PartitionResult()
+ .setPartition(partitionId)
+ ))
+ ));
+ }
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7adf9c7c3cb..aa1f392506e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3100,11 +3100,33 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- def handleInitializeShareGroupStateRequest(request: RequestChannel.Request):
Unit = {
+ def handleInitializeShareGroupStateRequest(request: RequestChannel.Request):
CompletableFuture[Unit] = {
val initializeShareGroupStateRequest =
request.body[InitializeShareGroupStateRequest]
- // TODO: Implement the InitializeShareGroupStateRequest handling
- requestHelper.sendMaybeThrottle(request,
initializeShareGroupStateRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
- CompletableFuture.completedFuture[Unit](())
+
+ if (!authorizeClusterOperation(request, CLUSTER_ACTION)) {
+ requestHelper.sendMaybeThrottle(request, new
InitializeShareGroupStateResponse(
+ InitializeShareGroupStateResponse.toGlobalErrorResponse(
+ initializeShareGroupStateRequest.data(),
+ Errors.CLUSTER_AUTHORIZATION_FAILED
+ )))
+ return CompletableFuture.completedFuture[Unit](())
+ }
+
+ shareCoordinator match {
+ case None => requestHelper.sendResponseMaybeThrottle(request,
requestThrottleMs =>
+ initializeShareGroupStateRequest.getErrorResponse(requestThrottleMs,
+ new ApiException("Share coordinator is not enabled.")))
+ CompletableFuture.completedFuture[Unit](())
+
+ case Some(coordinator) => coordinator.initializeState(request.context,
initializeShareGroupStateRequest.data)
+ .handle[Unit] { (response, exception) =>
+ if (exception != null) {
+ requestHelper.sendMaybeThrottle(request,
initializeShareGroupStateRequest.getErrorResponse(exception))
+ } else {
+ requestHelper.sendMaybeThrottle(request, new
InitializeShareGroupStateResponse(response))
+ }
+ }
+ }
}
def handleReadShareGroupStateRequest(request: RequestChannel.Request):
CompletableFuture[Unit] = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 0670bf0b36d..5f30cfd83e3 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -10879,6 +10879,98 @@ class KafkaApisTest extends Logging {
})
}
+ @Test
+ def testInitializeShareGroupStateSuccess(): Unit = {
+ val topicId = Uuid.randomUuid();
+ val initRequestData = new InitializeShareGroupStateRequestData()
+ .setGroupId("group1")
+ .setTopics(List(
+ new InitializeShareGroupStateRequestData.InitializeStateData()
+ .setTopicId(topicId)
+ .setPartitions(List(
+ new InitializeShareGroupStateRequestData.PartitionData()
+ .setPartition(1)
+ .setStateEpoch(0)
+ ).asJava)
+ ).asJava)
+
+ val initStateResultData:
util.List[InitializeShareGroupStateResponseData.InitializeStateResult] = List(
+ new InitializeShareGroupStateResponseData.InitializeStateResult()
+ .setTopicId(topicId)
+ .setPartitions(List(
+ new InitializeShareGroupStateResponseData.PartitionResult()
+ .setPartition(1)
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage(null)
+ ).asJava)
+ ).asJava
+
+ val config = Map(
+ ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true",
+ )
+
+ val response = getInitializeShareGroupResponse(
+ initRequestData,
+ config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
+ verifyNoErr = true,
+ null,
+ initStateResultData
+ )
+
+ assertNotNull(response.data)
+ assertEquals(1, response.data.results.size)
+ }
+
+ @Test
+ def testInitializeShareGroupStateAuthorizationFailed(): Unit = {
+ val topicId = Uuid.randomUuid();
+ val initRequestData = new InitializeShareGroupStateRequestData()
+ .setGroupId("group1")
+ .setTopics(List(
+ new InitializeShareGroupStateRequestData.InitializeStateData()
+ .setTopicId(topicId)
+ .setPartitions(List(
+ new InitializeShareGroupStateRequestData.PartitionData()
+ .setPartition(1)
+ .setStateEpoch(0)
+ ).asJava)
+ ).asJava)
+
+ val initStateResultData:
util.List[InitializeShareGroupStateResponseData.InitializeStateResult] = List(
+ new InitializeShareGroupStateResponseData.InitializeStateResult()
+ .setTopicId(topicId)
+ .setPartitions(List(
+ new InitializeShareGroupStateResponseData.PartitionResult()
+ .setPartition(1)
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage(null)
+ ).asJava)
+ ).asJava
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+ .thenReturn(Seq(AuthorizationResult.DENIED).asJava,
Seq(AuthorizationResult.ALLOWED).asJava)
+
+ val config = Map(
+ ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true",
+ )
+
+ val response = getInitializeShareGroupResponse(
+ initRequestData,
+ config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
+ verifyNoErr = false,
+ authorizer,
+ initStateResultData
+ )
+
+ assertNotNull(response.data)
+ assertEquals(1, response.data.results.size)
+ response.data.results.forEach(deleteResult => {
+ assertEquals(1, deleteResult.partitions.size)
+ assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED.code(),
deleteResult.partitions.get(0).errorCode())
+ })
+ }
+
def getShareGroupDescribeResponse(groupIds: util.List[String],
configOverrides: Map[String, String] = Map.empty,
verifyNoErr: Boolean = true, authorizer:
Authorizer = null,
describedGroups:
util.List[ShareGroupDescribeResponseData.DescribedGroup]):
ShareGroupDescribeResponse = {
@@ -11024,4 +11116,33 @@ class KafkaApisTest extends Logging {
}
response
}
+
+ def getInitializeShareGroupResponse(requestData:
InitializeShareGroupStateRequestData, configOverrides: Map[String, String] =
Map.empty,
+ verifyNoErr: Boolean = true, authorizer:
Authorizer = null,
+ initStateResult:
util.List[InitializeShareGroupStateResponseData.InitializeStateResult]):
InitializeShareGroupStateResponse = {
+ val requestChannelRequest = buildRequest(new
InitializeShareGroupStateRequest.Builder(requestData, true).build())
+
+ val future = new CompletableFuture[InitializeShareGroupStateResponseData]()
+ when(shareCoordinator.initializeState(
+ any[RequestContext],
+ any[InitializeShareGroupStateRequestData]
+ )).thenReturn(future)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
+ kafkaApis = createKafkaApis(
+ overrideProperties = configOverrides,
+ authorizer = Option(authorizer),
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching())
+
+ future.complete(new InitializeShareGroupStateResponseData()
+ .setResults(initStateResult))
+
+ val response =
verifyNoThrottling[InitializeShareGroupStateResponse](requestChannelRequest)
+ if (verifyNoErr) {
+ val expectedInitShareGroupStateResponseData = new
InitializeShareGroupStateResponseData()
+ .setResults(initStateResult)
+ assertEquals(expectedInitShareGroupStateResponseData, response.data)
+ }
+ response
+ }
}
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
index 61e96e37f07..f04cb1385f0 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
@@ -19,6 +19,8 @@ package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
+import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
+import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -96,6 +98,14 @@ public interface ShareCoordinator {
*/
CompletableFuture<DeleteShareGroupStateResponseData>
deleteState(RequestContext context, DeleteShareGroupStateRequestData request);
+ /**
+ * Handle initialize share group state call
+ * @param context - represents the incoming initialize share group request
context
+ * @param request - actual RPC request object
+ * @return completable future representing initialize share group RPC
response data
+ */
+ CompletableFuture<InitializeShareGroupStateResponseData>
initializeState(RequestContext context, InitializeShareGroupStateRequestData
request);
+
/**
* Called when new coordinator is elected
* @param partitionIndex - The partition index (internal topic)
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index 87071b3f97d..764e008136e 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
+import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
+import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -32,6 +34,7 @@ import
org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
+import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.RequestContext;
@@ -70,10 +73,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
-import java.util.stream.Collectors;
import static
org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper.handleOperationException;
+@SuppressWarnings("ClassDataAbstractionCoupling")
public class ShareCoordinatorService implements ShareCoordinator {
private final ShareCoordinatorConfig config;
private final Logger log;
@@ -682,11 +685,11 @@ public class ShareCoordinatorService implements
ShareCoordinator {
(topicId, topicEntry) -> {
List<ReadShareGroupStateSummaryResponseData.PartitionResult> partitionResults =
new ArrayList<>(topicEntry.size());
topicEntry.forEach(
- (partitionId, responseFuture) -> {
+ (partitionId, responseFut) -> {
// ResponseFut would already be completed by now
since we have used
// CompletableFuture::allOf to create a combined
future from the future map.
partitionResults.add(
-
responseFuture.getNow(null).results().get(0).partitions().get(0)
+
responseFut.getNow(null).results().get(0).partitions().get(0)
);
}
);
@@ -792,11 +795,11 @@ public class ShareCoordinatorService implements
ShareCoordinator {
(topicId, topicEntry) -> {
List<DeleteShareGroupStateResponseData.PartitionResult>
partitionResults = new ArrayList<>(topicEntry.size());
topicEntry.forEach(
- (partitionId, responseFuture) -> {
+ (partitionId, responseFut) -> {
// ResponseFut would already be completed by now
since we have used
// CompletableFuture::allOf to create a combined
future from the future map.
partitionResults.add(
-
responseFuture.getNow(null).results().get(0).partitions().get(0)
+
responseFut.getNow(null).results().get(0).partitions().get(0)
);
}
);
@@ -808,6 +811,106 @@ public class ShareCoordinatorService implements
ShareCoordinator {
});
}
+ @Override
+ public CompletableFuture<InitializeShareGroupStateResponseData>
initializeState(RequestContext context, InitializeShareGroupStateRequestData
request) {
+ // Send an empty response if the coordinator is not active.
+ if (!isActive.get()) {
+ return CompletableFuture.completedFuture(
+ generateErrorInitStateResponse(
+ request,
+ Errors.COORDINATOR_NOT_AVAILABLE,
+ "Share coordinator is not available."
+ )
+ );
+ }
+
+ String groupId = request.groupId();
+ // Send an empty response if groupId is invalid.
+ if (isGroupIdEmpty(groupId)) {
+ log.error("Group id must be specified and non-empty: {}", request);
+ return CompletableFuture.completedFuture(
+ new InitializeShareGroupStateResponseData()
+ );
+ }
+
+ // Send an empty response if topic data is empty.
+ if (isEmpty(request.topics())) {
+ log.error("Topic Data is empty: {}", request);
+ return CompletableFuture.completedFuture(
+ new InitializeShareGroupStateResponseData()
+ );
+ }
+
+ // A map to store the futures for each topicId and partition.
+ Map<Uuid, Map<Integer,
CompletableFuture<InitializeShareGroupStateResponseData>>> futureMap = new
HashMap<>();
+
+ // The request received here could have multiple keys of structure
group:topic:partition. However,
+ // the initializeState method in ShareCoordinatorShard expects a
single key in the request. Hence, we will
+ // be looping over the keys below and constructing new
InitializeShareGroupStateRequestData objects to pass
+ // onto the shard method.
+
+ for (InitializeShareGroupStateRequestData.InitializeStateData
topicData : request.topics()) {
+ Uuid topicId = topicData.topicId();
+ for (InitializeShareGroupStateRequestData.PartitionData
partitionData : topicData.partitions()) {
+ SharePartitionKey coordinatorKey =
SharePartitionKey.getInstance(request.groupId(), topicId,
partitionData.partition());
+
+ InitializeShareGroupStateRequestData
requestForCurrentPartition = new InitializeShareGroupStateRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(new
InitializeShareGroupStateRequestData.InitializeStateData()
+ .setTopicId(topicId)
+ .setPartitions(List.of(partitionData))));
+
+ CompletableFuture<InitializeShareGroupStateResponseData>
initializeFuture = runtime.scheduleWriteOperation(
+ "initialize-share-group-state",
+ topicPartitionFor(coordinatorKey),
+ Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+ coordinator ->
coordinator.initializeState(requestForCurrentPartition)
+ ).exceptionally(initializeException ->
+ handleOperationException(
+ "initialize-share-group-state",
+ request,
+ initializeException,
+ (error, message) ->
InitializeShareGroupStateResponse.toErrorResponseData(
+ topicData.topicId(),
+ partitionData.partition(),
+ error,
+ "Unable to initialize share group state: " +
initializeException.getMessage()
+ ),
+ log
+ ));
+
+ futureMap.computeIfAbsent(topicId, k -> new HashMap<>())
+ .put(partitionData.partition(), initializeFuture);
+ }
+ }
+
+ // Combine all futures into a single CompletableFuture<Void>.
+ CompletableFuture<Void> combinedFuture =
CompletableFuture.allOf(futureMap.values().stream()
+ .flatMap(map ->
map.values().stream()).toArray(CompletableFuture[]::new));
+
+ // Transform the combined CompletableFuture<Void> into
CompletableFuture<InitializeShareGroupStateResponseData>.
+ return combinedFuture.thenApply(v -> {
+ List<InitializeShareGroupStateResponseData.InitializeStateResult>
initializeStateResult = new ArrayList<>(futureMap.size());
+ futureMap.forEach(
+ (topicId, topicEntry) -> {
+
List<InitializeShareGroupStateResponseData.PartitionResult> partitionResults =
new ArrayList<>(topicEntry.size());
+ topicEntry.forEach(
+ (partitionId, responseFut) -> {
+ // ResponseFut would already be completed by now
since we have used
+ // CompletableFuture::allOf to create a combined
future from the future map.
+ partitionResults.add(
+
responseFut.getNow(null).results().get(0).partitions().get(0)
+ );
+ }
+ );
+
initializeStateResult.add(InitializeShareGroupStateResponse.toResponseInitializeStateResult(topicId,
partitionResults));
+ }
+ );
+ return new InitializeShareGroupStateResponseData()
+ .setResults(initializeStateResult);
+ });
+ }
+
private ReadShareGroupStateResponseData generateErrorReadStateResponse(
ReadShareGroupStateRequestData request,
Errors error,
@@ -820,9 +923,9 @@ public class ShareCoordinatorService implements
ShareCoordinator {
resultData.setPartitions(topicData.partitions().stream()
.map(partitionData ->
ReadShareGroupStateResponse.toErrorResponsePartitionResult(
partitionData.partition(), error, errorMessage
- )).collect(Collectors.toList()));
+ )).toList());
return resultData;
- }).collect(Collectors.toList()));
+ }).toList());
}
private ReadShareGroupStateSummaryResponseData
generateErrorReadStateSummaryResponse(
@@ -837,9 +940,9 @@ public class ShareCoordinatorService implements
ShareCoordinator {
resultData.setPartitions(topicData.partitions().stream()
.map(partitionData ->
ReadShareGroupStateSummaryResponse.toErrorResponsePartitionResult(
partitionData.partition(), error, errorMessage
- )).collect(Collectors.toList()));
+ )).toList());
return resultData;
- }).collect(Collectors.toList()));
+ }).toList());
}
private WriteShareGroupStateResponseData generateErrorWriteStateResponse(
@@ -855,9 +958,9 @@ public class ShareCoordinatorService implements
ShareCoordinator {
resultData.setPartitions(topicData.partitions().stream()
.map(partitionData ->
WriteShareGroupStateResponse.toErrorResponsePartitionResult(
partitionData.partition(), error, errorMessage
- )).collect(Collectors.toList()));
+ )).toList());
return resultData;
- }).collect(Collectors.toList()));
+ }).toList());
}
private DeleteShareGroupStateResponseData generateErrorDeleteStateResponse(
@@ -872,9 +975,26 @@ public class ShareCoordinatorService implements
ShareCoordinator {
resultData.setPartitions(topicData.partitions().stream()
.map(partitionData ->
DeleteShareGroupStateResponse.toErrorResponsePartitionResult(
partitionData.partition(), error, errorMessage
- )).collect(Collectors.toList()));
+ )).toList());
+ return resultData;
+ }).toList());
+ }
+
+ private InitializeShareGroupStateResponseData
generateErrorInitStateResponse(
+ InitializeShareGroupStateRequestData request,
+ Errors error,
+ String errorMessage
+ ) {
+ return new
InitializeShareGroupStateResponseData().setResults(request.topics().stream()
+ .map(topicData -> {
+ InitializeShareGroupStateResponseData.InitializeStateResult
resultData = new InitializeShareGroupStateResponseData.InitializeStateResult();
+ resultData.setTopicId(topicData.topicId());
+ resultData.setPartitions(topicData.partitions().stream()
+ .map(partitionData ->
InitializeShareGroupStateResponse.toErrorResponsePartitionResult(
+ partitionData.partition(), error, errorMessage
+ )).toList());
return resultData;
- }).collect(Collectors.toList()));
+ }).toList());
}
private static boolean isGroupIdEmpty(String groupId) {
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index a71a911907a..1022a36fb65 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
+import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
+import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -31,6 +33,7 @@ import
org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
+import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.TransactionResult;
@@ -66,7 +69,6 @@ import org.slf4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import java.util.stream.Collectors;
public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord> {
private final Logger log;
@@ -317,16 +319,12 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
CoordinatorRecord record = generateShareStateRecord(partitionData,
key);
// build successful response if record is correctly created
- WriteShareGroupStateResponseData responseData = new
WriteShareGroupStateResponseData()
- .setResults(
- Collections.singletonList(
-
WriteShareGroupStateResponse.toResponseWriteStateResult(key.topicId(),
- Collections.singletonList(
-
WriteShareGroupStateResponse.toResponsePartitionResult(
- key.partition()
- ))
- ))
- );
+ WriteShareGroupStateResponseData responseData = new
WriteShareGroupStateResponseData().setResults(
+
List.of(WriteShareGroupStateResponse.toResponseWriteStateResult(key.topicId(),
+ List.of(WriteShareGroupStateResponse.toResponsePartitionResult(
+ key.partition()))
+ ))
+ );
return new CoordinatorResult<>(Collections.singletonList(record),
responseData);
}
@@ -346,7 +344,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
// Only one key will be there in the request by design.
Optional<ReadShareGroupStateResponseData> error =
maybeGetReadStateError(request);
if (error.isPresent()) {
- return new CoordinatorResult<>(Collections.emptyList(),
error.get());
+ return new CoordinatorResult<>(List.of(), error.get());
}
ReadShareGroupStateRequestData.ReadStateData topicData =
request.topics().get(0);
@@ -366,7 +364,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
partitionId,
PartitionFactory.UNINITIALIZED_START_OFFSET,
PartitionFactory.DEFAULT_STATE_EPOCH,
- Collections.emptyList()
+ List.of()
);
} else {
// Leader epoch update might be needed
@@ -379,7 +377,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
.setLastOffset(stateBatch.lastOffset())
.setDeliveryState(stateBatch.deliveryState())
.setDeliveryCount(stateBatch.deliveryCount())
- ).collect(Collectors.toList()) : Collections.emptyList();
+ ).toList() : List.of();
responseData = ReadShareGroupStateResponse.toResponseData(
topicId,
@@ -393,7 +391,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
// Optimization in case leaderEpoch update is not required.
if (leaderEpoch == -1 ||
(leaderEpochMap.get(key) != null && leaderEpochMap.get(key) ==
leaderEpoch)) {
- return new CoordinatorResult<>(Collections.emptyList(),
responseData);
+ return new CoordinatorResult<>(List.of(), responseData);
}
// It is OK to info log this since this reaching this codepoint should
be quite infrequent.
@@ -406,7 +404,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
WriteShareGroupStateRequestData.PartitionData writePartitionData = new
WriteShareGroupStateRequestData.PartitionData()
.setPartition(partitionId)
.setLeaderEpoch(leaderEpoch)
- .setStateBatches(Collections.emptyList())
+ .setStateBatches(List.of())
.setStartOffset(responseData.results().get(0).partitions().get(0).startOffset())
.setStateEpoch(responseData.results().get(0).partitions().get(0).stateEpoch());
@@ -471,7 +469,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
}
}
- return new CoordinatorResult<>(Collections.emptyList(), responseData);
+ return new CoordinatorResult<>(List.of(), responseData);
}
/**
@@ -482,7 +480,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
*/
public CoordinatorResult<Optional<Long>, CoordinatorRecord>
lastRedundantOffset() {
return new CoordinatorResult<>(
- Collections.emptyList(),
+ List.of(),
this.offsetsManager.lastRedundantOffset()
);
}
@@ -494,7 +492,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
* the request data which covers only key i.e. group1:topic1:partition1.
The implementation
* below was done keeping this in mind.
*
- * @param request - ReadShareGroupStateSummaryRequestData for a single key
+ * @param request - DeleteShareGroupStateRequestData for a single key
* @return CoordinatorResult(records, response)
*/
@@ -514,22 +512,53 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
CoordinatorRecord record = generateTombstoneRecord(key);
// build successful response if record is correctly created
- DeleteShareGroupStateResponseData responseData = new
DeleteShareGroupStateResponseData()
- .setResults(
- List.of(
-
DeleteShareGroupStateResponse.toResponseDeleteStateResult(key.topicId(),
- List.of(
-
DeleteShareGroupStateResponse.toResponsePartitionResult(
- key.partition()
- )
- )
- )
- )
- );
+ DeleteShareGroupStateResponseData responseData = new
DeleteShareGroupStateResponseData().setResults(
+
List.of(DeleteShareGroupStateResponse.toResponseDeleteStateResult(key.topicId(),
+
List.of(DeleteShareGroupStateResponse.toResponsePartitionResult(
+ key.partition()))
+ ))
+ );
return new CoordinatorResult<>(Collections.singletonList(record),
responseData);
}
+ /**
+ * This method writes a share snapshot record corresponding to the
requested topic partition.
+ * <p>
+ * This method, as called by the ShareCoordinatorService, will be provided
with
+ * request data which covers only a single key, i.e. group1:topic1:partition1.
The implementation
+ * below was written with this in mind.
+ *
+ * @param request - InitializeShareGroupStateRequestData for a single key
+ * @return CoordinatorResult(records, response)
+ */
+
+ public CoordinatorResult<InitializeShareGroupStateResponseData,
CoordinatorRecord> initializeState(
+ InitializeShareGroupStateRequestData request
+ ) {
+ // Records to write (with both key and value of snapshot type),
response to caller
+ // only one key will be there in the request by design.
+ Optional<CoordinatorResult<InitializeShareGroupStateResponseData,
CoordinatorRecord>> error = maybeGetInitializeStateError(request);
+ if (error.isPresent()) {
+ return error.get();
+ }
+
+ InitializeShareGroupStateRequestData.InitializeStateData topicData =
request.topics().get(0);
+ InitializeShareGroupStateRequestData.PartitionData partitionData =
topicData.partitions().get(0);
+ SharePartitionKey key =
SharePartitionKey.getInstance(request.groupId(), topicData.topicId(),
partitionData.partition());
+
+ CoordinatorRecord record =
generateInitializeStateRecord(partitionData, key);
+ // build successful response if record is correctly created
+ InitializeShareGroupStateResponseData responseData = new
InitializeShareGroupStateResponseData().setResults(
+
List.of(InitializeShareGroupStateResponse.toResponseInitializeStateResult(key.topicId(),
+
List.of(InitializeShareGroupStateResponse.toResponsePartitionResult(
+ key.partition()))
+ ))
+ );
+
+ return new CoordinatorResult<>(List.of(record), responseData);
+ }
+
/**
* Util method to generate a ShareSnapshot or ShareUpdate type record for
a key, based on various conditions.
* <p>
@@ -555,7 +584,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
.setStartOffset(partitionData.startOffset())
.setLeaderEpoch(partitionData.leaderEpoch())
.setStateEpoch(partitionData.stateEpoch())
- .setStateBatches(mergeBatches(Collections.emptyList(),
partitionData))
+ .setStateBatches(mergeBatches(List.of(), partitionData))
.build());
} else if (snapshotUpdateCount.getOrDefault(key, 0) >=
config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
ShareGroupOffset currentState = shareStateMap.get(key); //
shareStateMap will have the entry as containsKey is true
@@ -587,7 +616,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
.setSnapshotEpoch(currentState.snapshotEpoch()) // Use
same snapshotEpoch as last share snapshot.
.setStartOffset(partitionData.startOffset())
.setLeaderEpoch(partitionData.leaderEpoch())
- .setStateBatches(mergeBatches(Collections.emptyList(),
partitionData))
+ .setStateBatches(mergeBatches(List.of(), partitionData))
.build());
}
}
@@ -600,6 +629,24 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
);
}
+ private CoordinatorRecord generateInitializeStateRecord(
+ InitializeShareGroupStateRequestData.PartitionData partitionData,
+ SharePartitionKey key
+ ) {
+ // We need to create a new share snapshot here, with
+ // appropriate state information. We will not be merging
+ // state here with previous snapshots as init state implies
+ // fresh start.
+
+ int snapshotEpoch = shareStateMap.containsKey(key) ?
shareStateMap.get(key).snapshotEpoch() + 1 : 0;
+ return ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ key.groupId(),
+ key.topicId(),
+ key.partition(),
+ ShareGroupOffset.fromRequest(partitionData, snapshotEpoch)
+ );
+ }
+
private List<PersisterStateBatch> mergeBatches(
List<PersisterStateBatch> soFar,
WriteShareGroupStateRequestData.PartitionData partitionData) {
@@ -609,15 +656,13 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
private List<PersisterStateBatch> mergeBatches(
List<PersisterStateBatch> soFar,
WriteShareGroupStateRequestData.PartitionData partitionData,
- long startOffset) {
- return new PersisterStateBatchCombiner(
- soFar,
- partitionData.stateBatches().stream()
- .map(PersisterStateBatch::from)
- .collect(Collectors.toList()),
+ long startOffset
+ ) {
+ return new PersisterStateBatchCombiner(soFar,
partitionData.stateBatches().stream()
+ .map(PersisterStateBatch::from)
+ .toList(),
startOffset
- )
- .combineStateBatches();
+ ).combineStateBatches();
}
private Optional<CoordinatorResult<WriteShareGroupStateResponseData,
CoordinatorRecord>> maybeGetWriteStateError(
@@ -631,30 +676,30 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
int partitionId = partitionData.partition();
if (topicId == null) {
- return Optional.of(getWriteErrorResponse(Errors.INVALID_REQUEST,
NULL_TOPIC_ID, null, partitionId));
+ return
Optional.of(getWriteErrorCoordinatorResult(Errors.INVALID_REQUEST,
NULL_TOPIC_ID, null, partitionId));
}
if (partitionId < 0) {
- return Optional.of(getWriteErrorResponse(Errors.INVALID_REQUEST,
NEGATIVE_PARTITION_ID, topicId, partitionId));
+ return
Optional.of(getWriteErrorCoordinatorResult(Errors.INVALID_REQUEST,
NEGATIVE_PARTITION_ID, topicId, partitionId));
}
SharePartitionKey mapKey = SharePartitionKey.getInstance(groupId,
topicId, partitionId);
if (partitionData.leaderEpoch() != -1 &&
leaderEpochMap.containsKey(mapKey) && leaderEpochMap.get(mapKey) >
partitionData.leaderEpoch()) {
log.error("Request leader epoch smaller than last recorded.");
- return
Optional.of(getWriteErrorResponse(Errors.FENCED_LEADER_EPOCH, null, topicId,
partitionId));
+ return
Optional.of(getWriteErrorCoordinatorResult(Errors.FENCED_LEADER_EPOCH, null,
topicId, partitionId));
}
if (partitionData.stateEpoch() != -1 &&
stateEpochMap.containsKey(mapKey) && stateEpochMap.get(mapKey) >
partitionData.stateEpoch()) {
log.error("Request state epoch smaller than last recorded.");
- return
Optional.of(getWriteErrorResponse(Errors.FENCED_STATE_EPOCH, null, topicId,
partitionId));
+ return
Optional.of(getWriteErrorCoordinatorResult(Errors.FENCED_STATE_EPOCH, null,
topicId, partitionId));
}
if (metadataImage == null) {
log.error("Metadata image is null");
- return
Optional.of(getWriteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null,
topicId, partitionId));
+ return
Optional.of(getWriteErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION,
null, topicId, partitionId));
}
if (metadataImage.topics().getTopic(topicId) == null ||
metadataImage.topics().getPartition(topicId, partitionId) == null)
{
log.error("Topic/TopicPartition not found in metadata image.");
- return
Optional.of(getWriteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null,
topicId, partitionId));
+ return
Optional.of(getWriteErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION,
null, topicId, partitionId));
}
return Optional.empty();
@@ -743,28 +788,65 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
int partitionId = partitionData.partition();
if (topicId == null) {
- return Optional.of(getDeleteErrorResponse(Errors.INVALID_REQUEST,
NULL_TOPIC_ID, null, partitionId));
+ return
Optional.of(getDeleteErrorCoordinatorResult(Errors.INVALID_REQUEST,
NULL_TOPIC_ID, null, partitionId));
+ }
+
+ if (partitionId < 0) {
+ return
Optional.of(getDeleteErrorCoordinatorResult(Errors.INVALID_REQUEST,
NEGATIVE_PARTITION_ID, topicId, partitionId));
+ }
+
+ if (metadataImage == null) {
+ log.error("Metadata image is null");
+ return
Optional.of(getDeleteErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION,
null, topicId, partitionId));
+ }
+
+ if (metadataImage.topics().getTopic(topicId) == null ||
+ metadataImage.topics().getPartition(topicId, partitionId) == null)
{
+ log.error("Topic/TopicPartition not found in metadata image.");
+ return
Optional.of(getDeleteErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION,
null, topicId, partitionId));
+ }
+
+ return Optional.empty();
+ }
+
+ private Optional<CoordinatorResult<InitializeShareGroupStateResponseData,
CoordinatorRecord>> maybeGetInitializeStateError(
+ InitializeShareGroupStateRequestData request
+ ) {
+ InitializeShareGroupStateRequestData.InitializeStateData topicData =
request.topics().get(0);
+ InitializeShareGroupStateRequestData.PartitionData partitionData =
topicData.partitions().get(0);
+
+ Uuid topicId = topicData.topicId();
+ int partitionId = partitionData.partition();
+
+ if (topicId == null) {
+ return
Optional.of(getInitializeErrorCoordinatorResult(Errors.INVALID_REQUEST,
NULL_TOPIC_ID, null, partitionId));
}
if (partitionId < 0) {
- return Optional.of(getDeleteErrorResponse(Errors.INVALID_REQUEST,
NEGATIVE_PARTITION_ID, topicId, partitionId));
+ return
Optional.of(getInitializeErrorCoordinatorResult(Errors.INVALID_REQUEST,
NEGATIVE_PARTITION_ID, topicId, partitionId));
+ }
+
+ SharePartitionKey key =
SharePartitionKey.getInstance(request.groupId(), topicId, partitionId);
+ if (partitionData.stateEpoch() != -1 && stateEpochMap.containsKey(key)
&& stateEpochMap.get(key) > partitionData.stateEpoch()) {
+ log.error("Initialize request state epoch smaller than last
recorded.");
+ return
Optional.of(getInitializeErrorCoordinatorResult(Errors.FENCED_STATE_EPOCH,
Errors.FENCED_STATE_EPOCH.exception(), topicId, partitionId));
}
if (metadataImage == null) {
log.error("Metadata image is null");
- return
Optional.of(getDeleteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null,
topicId, partitionId));
+ return
Optional.of(getInitializeErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION,
null, topicId, partitionId));
}
if (metadataImage.topics().getTopic(topicId) == null ||
metadataImage.topics().getPartition(topicId, partitionId) == null)
{
log.error("Topic/TopicPartition not found in metadata image.");
- return
Optional.of(getDeleteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null,
topicId, partitionId));
+ return
Optional.of(getInitializeErrorCoordinatorResult(Errors.UNKNOWN_TOPIC_OR_PARTITION,
null, topicId, partitionId));
}
return Optional.empty();
}
- private CoordinatorResult<WriteShareGroupStateResponseData,
CoordinatorRecord> getWriteErrorResponse(
+ private CoordinatorResult<WriteShareGroupStateResponseData,
CoordinatorRecord> getWriteErrorCoordinatorResult(
Errors error,
Exception exception,
Uuid topicId,
@@ -772,10 +854,10 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
) {
String message = exception == null ? error.message() :
exception.getMessage();
WriteShareGroupStateResponseData responseData =
WriteShareGroupStateResponse.toErrorResponseData(topicId, partitionId, error,
message);
- return new CoordinatorResult<>(Collections.emptyList(), responseData);
+ return new CoordinatorResult<>(List.of(), responseData);
}
- private CoordinatorResult<DeleteShareGroupStateResponseData,
CoordinatorRecord> getDeleteErrorResponse(
+ private CoordinatorResult<DeleteShareGroupStateResponseData,
CoordinatorRecord> getDeleteErrorCoordinatorResult(
Errors error,
Exception exception,
Uuid topicId,
@@ -783,7 +865,18 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
) {
String message = exception == null ? error.message() :
exception.getMessage();
DeleteShareGroupStateResponseData responseData =
DeleteShareGroupStateResponse.toErrorResponseData(topicId, partitionId, error,
message);
- return new CoordinatorResult<>(Collections.emptyList(), responseData);
+ return new CoordinatorResult<>(List.of(), responseData);
+ }
+
+ private CoordinatorResult<InitializeShareGroupStateResponseData,
CoordinatorRecord> getInitializeErrorCoordinatorResult(
+ Errors error,
+ Exception exception,
+ Uuid topicId,
+ int partitionId
+ ) {
+ String message = exception == null ? error.message() :
exception.getMessage();
+ InitializeShareGroupStateResponseData responseData =
InitializeShareGroupStateResponse.toErrorResponseData(topicId, partitionId,
error, message);
+ return new CoordinatorResult<>(List.of(), responseData);
}
// Visible for testing
@@ -820,7 +913,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
.setLeaderEpoch(newLeaderEpoch)
.setStateBatches(new PersisterStateBatchCombiner(currentBatches,
newData.stateBatches().stream()
.map(ShareCoordinatorShard::toPersisterStateBatch)
- .collect(Collectors.toList()), newStartOffset)
+ .toList(), newStartOffset)
.combineStateBatches())
.build();
}
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
index d7868f0cc63..2d659607a1d 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.share;
+import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
@@ -94,13 +95,29 @@ public class ShareGroupOffset {
}
public static ShareGroupOffset
fromRequest(WriteShareGroupStateRequestData.PartitionData data, int
snapshotEpoch) {
- return new ShareGroupOffset(snapshotEpoch,
+ return new ShareGroupOffset(
+ snapshotEpoch,
data.stateEpoch(),
data.leaderEpoch(),
data.startOffset(),
data.stateBatches().stream()
.map(PersisterStateBatch::from)
- .collect(Collectors.toList()));
+ .toList()
+ );
+ }
+
+ public static ShareGroupOffset
fromRequest(InitializeShareGroupStateRequestData.PartitionData data) {
+ return fromRequest(data, 0);
+ }
+
+ public static ShareGroupOffset
fromRequest(InitializeShareGroupStateRequestData.PartitionData data, int
snapshotEpoch) {
+ return new ShareGroupOffset(
+ snapshotEpoch,
+ data.stateEpoch(),
+ -1,
+ data.startOffset(),
+ List.of()
+ );
}
public LinkedHashSet<PersisterStateBatch> stateBatchAsSet() {
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 8738db4dd9a..399643e32a9 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -26,6 +26,8 @@ import
org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
+import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
+import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -51,7 +53,6 @@ import org.apache.kafka.server.util.timer.Timer;
import org.junit.jupiter.api.Test;
import java.time.Duration;
-import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -133,7 +134,7 @@ class ShareCoordinatorServiceTest {
WriteShareGroupStateRequestData request = new
WriteShareGroupStateRequestData()
.setGroupId(groupId)
- .setTopics(Arrays.asList(
+ .setTopics(List.of(
new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
@@ -199,7 +200,7 @@ class ShareCoordinatorServiceTest {
HashSet<WriteShareGroupStateResponseData.WriteStateResult> result =
new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
- HashSet<WriteShareGroupStateResponseData.WriteStateResult>
expectedResult = new HashSet<>(Arrays.asList(
+ HashSet<WriteShareGroupStateResponseData.WriteStateResult>
expectedResult = new HashSet<>(List.of(
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId2)
.setPartitions(List.of(new
WriteShareGroupStateResponseData.PartitionResult()
@@ -210,7 +211,7 @@ class ShareCoordinatorServiceTest {
.setPartition(partition1)))));
assertEquals(expectedResult, result);
verify(time, times(2)).hiResClockMs();
- Set<MetricName> expectedMetrics = new HashSet<>(Arrays.asList(
+ Set<MetricName> expectedMetrics = new HashSet<>(List.of(
metrics.metricName("write-latency-avg",
ShareCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("write-latency-max",
ShareCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("write-rate",
ShareCoordinatorMetrics.METRICS_GROUP),
@@ -243,7 +244,7 @@ class ShareCoordinatorServiceTest {
ReadShareGroupStateRequestData request = new
ReadShareGroupStateRequestData()
.setGroupId(groupId)
- .setTopics(Arrays.asList(
+ .setTopics(List.of(
new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
@@ -283,7 +284,7 @@ class ShareCoordinatorServiceTest {
.setErrorCode(Errors.NONE.code())
.setStateEpoch(1)
.setStartOffset(0)
- .setStateBatches(Arrays.asList(
+ .setStateBatches(List.of(
new ReadShareGroupStateResponseData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
@@ -315,7 +316,7 @@ class ShareCoordinatorServiceTest {
HashSet<ReadShareGroupStateResponseData.ReadStateResult> result = new
HashSet<>(future.get(5, TimeUnit.SECONDS).results());
- HashSet<ReadShareGroupStateResponseData.ReadStateResult>
expectedResult = new HashSet<>(Arrays.asList(
+ HashSet<ReadShareGroupStateResponseData.ReadStateResult>
expectedResult = new HashSet<>(List.of(
topicData1,
topicData2));
assertEquals(expectedResult, result);
@@ -345,23 +346,20 @@ class ShareCoordinatorServiceTest {
ReadShareGroupStateSummaryRequestData request = new
ReadShareGroupStateSummaryRequestData()
.setGroupId(groupId)
- .setTopics(Arrays.asList(
- new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
- .setTopicId(topicId1)
- .setPartitions(List.of(
- new
ReadShareGroupStateSummaryRequestData.PartitionData()
- .setPartition(partition1)
- .setLeaderEpoch(1)
- )),
- new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
- .setTopicId(topicId2)
- .setPartitions(List.of(
- new
ReadShareGroupStateSummaryRequestData.PartitionData()
- .setPartition(partition2)
- .setLeaderEpoch(1)
- ))
- )
- );
+ .setTopics(List.of(
+ new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(
+ new
ReadShareGroupStateSummaryRequestData.PartitionData()
+ .setPartition(partition1)
+ .setLeaderEpoch(1))),
+ new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+ .setTopicId(topicId2)
+ .setPartitions(List.of(
+ new
ReadShareGroupStateSummaryRequestData.PartitionData()
+ .setPartition(partition2)
+ .setLeaderEpoch(1)))
+ ));
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult
topicData1 = new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicId1)
@@ -385,12 +383,12 @@ class ShareCoordinatorServiceTest {
eq("read-share-group-state-summary"),
eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
any(),
- any()
- ))
- .thenReturn(CompletableFuture.completedFuture(new
ReadShareGroupStateSummaryResponseData()
- .setResults(List.of(topicData1))))
- .thenReturn(CompletableFuture.completedFuture(new
ReadShareGroupStateSummaryResponseData()
- .setResults(List.of(topicData2))));
+ any())
+ ).thenReturn(CompletableFuture.completedFuture(new
ReadShareGroupStateSummaryResponseData()
+ .setResults(List.of(topicData1)))
+ ).thenReturn(CompletableFuture.completedFuture(new
ReadShareGroupStateSummaryResponseData()
+ .setResults(List.of(topicData2)))
+ );
CompletableFuture<ReadShareGroupStateSummaryResponseData> future =
service.readStateSummary(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY),
@@ -399,7 +397,7 @@ class ShareCoordinatorServiceTest {
HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult>
result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
- HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult>
expectedResult = new HashSet<>(Arrays.asList(
+ HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult>
expectedResult = new HashSet<>(List.of(
topicData1,
topicData2));
assertEquals(expectedResult, result);
@@ -432,21 +430,18 @@ class ShareCoordinatorServiceTest {
DeleteShareGroupStateRequestData request = new
DeleteShareGroupStateRequestData()
.setGroupId(groupId)
- .setTopics(Arrays.asList(
- new DeleteShareGroupStateRequestData.DeleteStateData()
- .setTopicId(topicId1)
- .setPartitions(List.of(
- new
DeleteShareGroupStateRequestData.PartitionData()
- .setPartition(partition1)
- )),
- new DeleteShareGroupStateRequestData.DeleteStateData()
- .setTopicId(topicId2)
- .setPartitions(List.of(
- new
DeleteShareGroupStateRequestData.PartitionData()
- .setPartition(partition2)
- ))
- )
- );
+ .setTopics(List.of(
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(partition1))),
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId2)
+ .setPartitions(List.of(
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(partition2)))
+ ));
DeleteShareGroupStateResponseData response1 = new
DeleteShareGroupStateResponseData()
.setResults(List.of(
@@ -469,9 +464,7 @@ class ShareCoordinatorServiceTest {
eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
eq(Duration.ofMillis(5000)),
any()
- ))
- .thenReturn(CompletableFuture.completedFuture(response1))
- .thenReturn(CompletableFuture.completedFuture(response2));
+
)).thenReturn(CompletableFuture.completedFuture(response1)).thenReturn(CompletableFuture.completedFuture(response2));
CompletableFuture<DeleteShareGroupStateResponseData> future =
service.deleteState(
requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
@@ -480,7 +473,7 @@ class ShareCoordinatorServiceTest {
HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> result =
new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
- HashSet<DeleteShareGroupStateResponseData.DeleteStateResult>
expectedResult = new HashSet<>(Arrays.asList(
+ HashSet<DeleteShareGroupStateResponseData.DeleteStateResult>
expectedResult = new HashSet<>(List.of(
new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicId2)
.setPartitions(List.of(new
DeleteShareGroupStateResponseData.PartitionResult()
@@ -492,6 +485,91 @@ class ShareCoordinatorServiceTest {
assertEquals(expectedResult, result);
}
+ @Test
+ public void testInitializeStateSuccess() throws ExecutionException,
InterruptedException, TimeoutException {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Metrics metrics = new Metrics();
+ ShareCoordinatorMetrics coordinatorMetrics = new
ShareCoordinatorMetrics(metrics);
+ Time time = mock(Time.class);
+ ShareCoordinatorService service = new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.testConfig(),
+ runtime,
+ coordinatorMetrics,
+ time,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
+ );
+
+ service.startup(() -> 1);
+
+ String groupId = "group1";
+ Uuid topicId1 = Uuid.randomUuid();
+ int partition1 = 0;
+
+ Uuid topicId2 = Uuid.randomUuid();
+ int partition2 = 1;
+
+ InitializeShareGroupStateRequestData request = new
InitializeShareGroupStateRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(
+ new InitializeShareGroupStateRequestData.InitializeStateData()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(
+ new
InitializeShareGroupStateRequestData.PartitionData()
+ .setPartition(partition1)
+ .setStartOffset(0)
+ .setStateEpoch(1))),
+ new InitializeShareGroupStateRequestData.InitializeStateData()
+ .setTopicId(topicId2)
+ .setPartitions(List.of(
+ new
InitializeShareGroupStateRequestData.PartitionData()
+ .setPartition(partition2)
+ .setStartOffset(5)
+ .setStateEpoch(1)))
+ ));
+
+ InitializeShareGroupStateResponseData response1 = new
InitializeShareGroupStateResponseData().setResults(List.of(
+ new InitializeShareGroupStateResponseData.InitializeStateResult()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(new
InitializeShareGroupStateResponseData.PartitionResult()
+ .setPartition(partition1)))
+ ));
+
+ InitializeShareGroupStateResponseData response2 = new
InitializeShareGroupStateResponseData().setResults(List.of(
+ new InitializeShareGroupStateResponseData.InitializeStateResult()
+ .setTopicId(topicId2)
+ .setPartitions(List.of(new
InitializeShareGroupStateResponseData.PartitionResult()
+ .setPartition(partition2)))
+ ));
+
+ when(runtime.scheduleWriteOperation(
+ eq("initialize-share-group-state"),
+ eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
+ eq(Duration.ofMillis(5000)),
+
any())).thenReturn(CompletableFuture.completedFuture(response1)).thenReturn(CompletableFuture.completedFuture(response2)
+ );
+
+ CompletableFuture<InitializeShareGroupStateResponseData> future =
service.initializeState(
+ requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
+ request
+ );
+
+ HashSet<InitializeShareGroupStateResponseData.InitializeStateResult>
result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
+
+ HashSet<InitializeShareGroupStateResponseData.InitializeStateResult>
expectedResult = new HashSet<>(List.of(
+ new InitializeShareGroupStateResponseData.InitializeStateResult()
+ .setTopicId(topicId2)
+ .setPartitions(List.of(new
InitializeShareGroupStateResponseData.PartitionResult()
+ .setPartition(partition2))),
+ new InitializeShareGroupStateResponseData.InitializeStateResult()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(new
InitializeShareGroupStateResponseData.PartitionResult()
+ .setPartition(partition1)))
+ ));
+ assertEquals(expectedResult, result);
+ }
+
@Test
public void testWriteStateValidationsError() throws ExecutionException,
InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
@@ -680,6 +758,53 @@ class ShareCoordinatorServiceTest {
);
}
+ @Test
+ public void testInitializeStateValidationError() throws
ExecutionException, InterruptedException, TimeoutException {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ ShareCoordinatorService service = new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.testConfig(),
+ runtime,
+ new ShareCoordinatorMetrics(),
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
+ );
+
+ service.startup(() -> 1);
+
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 0;
+
+ // 1. Empty topicsData
+ assertEquals(new InitializeShareGroupStateResponseData(),
+ service.initializeState(
+ requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
+ new InitializeShareGroupStateRequestData().setGroupId(groupId)
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // 2. Empty partitionsData
+ assertEquals(new InitializeShareGroupStateResponseData(),
+ service.initializeState(
+ requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
+ new
InitializeShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of(
+ new
InitializeShareGroupStateRequestData.InitializeStateData().setTopicId(topicId)))
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // 3. Invalid groupId
+ assertEquals(new InitializeShareGroupStateResponseData(),
+ service.initializeState(
+ requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
+ new
InitializeShareGroupStateRequestData().setGroupId(null).setTopics(List.of(
+ new
InitializeShareGroupStateRequestData.InitializeStateData().setTopicId(topicId).setPartitions(List.of(
+ new
InitializeShareGroupStateRequestData.PartitionData().setPartition(partition)))))
+ ).get(5, TimeUnit.SECONDS)
+ );
+ }
+
@Test
public void testWriteStateWhenNotStarted() throws ExecutionException,
InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
@@ -702,7 +827,7 @@ class ShareCoordinatorServiceTest {
WriteShareGroupStateRequestData request = new
WriteShareGroupStateRequestData()
.setGroupId(groupId)
- .setTopics(Arrays.asList(
+ .setTopics(List.of(
new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
@@ -743,7 +868,7 @@ class ShareCoordinatorServiceTest {
HashSet<WriteShareGroupStateResponseData.WriteStateResult> result =
new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
- HashSet<WriteShareGroupStateResponseData.WriteStateResult>
expectedResult = new HashSet<>(Arrays.asList(
+ HashSet<WriteShareGroupStateResponseData.WriteStateResult>
expectedResult = new HashSet<>(List.of(
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId2)
.setPartitions(List.of(new
WriteShareGroupStateResponseData.PartitionResult()
@@ -781,7 +906,7 @@ class ShareCoordinatorServiceTest {
ReadShareGroupStateRequestData request = new
ReadShareGroupStateRequestData()
.setGroupId(groupId)
- .setTopics(Arrays.asList(
+ .setTopics(List.of(
new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
@@ -806,7 +931,7 @@ class ShareCoordinatorServiceTest {
HashSet<ReadShareGroupStateResponseData.ReadStateResult> result = new
HashSet<>(future.get(5, TimeUnit.SECONDS).results());
- HashSet<ReadShareGroupStateResponseData.ReadStateResult>
expectedResult = new HashSet<>(Arrays.asList(
+ HashSet<ReadShareGroupStateResponseData.ReadStateResult>
expectedResult = new HashSet<>(List.of(
new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId2)
.setPartitions(List.of(new
ReadShareGroupStateResponseData.PartitionResult()
@@ -844,7 +969,7 @@ class ShareCoordinatorServiceTest {
ReadShareGroupStateSummaryRequestData request = new
ReadShareGroupStateSummaryRequestData()
.setGroupId(groupId)
- .setTopics(Arrays.asList(
+ .setTopics(List.of(
new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId1)
.setPartitions(List.of(
@@ -869,7 +994,7 @@ class ShareCoordinatorServiceTest {
HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult>
result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
- HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult>
expectedResult = new HashSet<>(Arrays.asList(
+ HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult>
expectedResult = new HashSet<>(List.of(
new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicId2)
.setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
@@ -907,7 +1032,7 @@ class ShareCoordinatorServiceTest {
DeleteShareGroupStateRequestData request = new
DeleteShareGroupStateRequestData()
.setGroupId(groupId)
- .setTopics(Arrays.asList(
+ .setTopics(List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
@@ -930,7 +1055,7 @@ class ShareCoordinatorServiceTest {
HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> result =
new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
- HashSet<DeleteShareGroupStateResponseData.DeleteStateResult>
expectedResult = new HashSet<>(Arrays.asList(
+ HashSet<DeleteShareGroupStateResponseData.DeleteStateResult>
expectedResult = new HashSet<>(List.of(
new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicId2)
.setPartitions(List.of(new
DeleteShareGroupStateResponseData.PartitionResult()
@@ -946,6 +1071,66 @@ class ShareCoordinatorServiceTest {
assertEquals(expectedResult, result);
}
+ @Test
+ public void testInitializeStateWhenNotStarted() throws ExecutionException,
InterruptedException, TimeoutException {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ ShareCoordinatorService service = new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.testConfig(),
+ runtime,
+ new ShareCoordinatorMetrics(),
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
+ );
+
+ String groupId = "group1";
+ Uuid topicId1 = Uuid.randomUuid();
+ int partition1 = 0;
+
+ Uuid topicId2 = Uuid.randomUuid();
+ int partition2 = 1;
+
+ InitializeShareGroupStateRequestData request = new
InitializeShareGroupStateRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(
+ new InitializeShareGroupStateRequestData.InitializeStateData()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(
+ new
InitializeShareGroupStateRequestData.PartitionData()
+ .setPartition(partition1)
+ )),
+ new InitializeShareGroupStateRequestData.InitializeStateData()
+ .setTopicId(topicId2)
+ .setPartitions(List.of(
+ new
InitializeShareGroupStateRequestData.PartitionData()
+ .setPartition(partition2)
+ ))
+ ));
+
+ CompletableFuture<InitializeShareGroupStateResponseData> future =
service.initializeState(
+ requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
+ request
+ );
+
+ HashSet<InitializeShareGroupStateResponseData.InitializeStateResult>
result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
+
+ HashSet<InitializeShareGroupStateResponseData.InitializeStateResult>
expectedResult = new HashSet<>(List.of(
+ new InitializeShareGroupStateResponseData.InitializeStateResult()
+ .setTopicId(topicId2)
+ .setPartitions(List.of(new
InitializeShareGroupStateResponseData.PartitionResult()
+ .setPartition(partition2)
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ .setErrorMessage("Share coordinator is not available."))),
+ new InitializeShareGroupStateResponseData.InitializeStateResult()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(new
InitializeShareGroupStateResponseData.PartitionResult()
+ .setPartition(partition1)
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ .setErrorMessage("Share coordinator is not available.")))));
+ assertEquals(expectedResult, result);
+ }
+
@Test
public void testWriteFutureReturnsError() throws ExecutionException,
InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
@@ -1124,6 +1309,47 @@ class ShareCoordinatorServiceTest {
);
}
+ @Test
+ public void testInitializeFutureReturnsError() throws ExecutionException,
InterruptedException, TimeoutException {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ ShareCoordinatorService service = new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.testConfig(),
+ runtime,
+ new ShareCoordinatorMetrics(),
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
+ );
+
+ service.startup(() -> 1);
+
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 0;
+
+ when(runtime.scheduleWriteOperation(any(), any(), any(),
any())).thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
+
+ assertEquals(
+ new InitializeShareGroupStateResponseData().setResults(List.of(new
InitializeShareGroupStateResponseData.InitializeStateResult()
+ .setTopicId(topicId)
+ .setPartitions(List.of(new
InitializeShareGroupStateResponseData.PartitionResult()
+ .setPartition(partition)
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ .setErrorMessage("Unable to initialize share group state: This server does not host this topic-partition.")))
+ )),
+ service.initializeState(requestContext(ApiKeys.INITIALIZE_SHARE_GROUP_STATE),
+ new InitializeShareGroupStateRequestData().setGroupId(groupId)
+ .setTopics(List.of(new
InitializeShareGroupStateRequestData.InitializeStateData()
+ .setTopicId(topicId)
+ .setPartitions(List.of(new
InitializeShareGroupStateRequestData.PartitionData()
+ .setPartition(partition)
+ ))
+ ))
+ ).get(5, TimeUnit.SECONDS)
+ );
+ }
+
@Test
public void testTopicPartitionFor() {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
@@ -1676,7 +1902,7 @@ class ShareCoordinatorServiceTest {
}
private void checkMetrics(Metrics metrics) {
- Set<MetricName> usualMetrics = new HashSet<>(Arrays.asList(
+ Set<MetricName> usualMetrics = new HashSet<>(List.of(
metrics.metricName("write-latency-avg",
ShareCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("write-latency-max",
ShareCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("write-rate",
ShareCoordinatorMetrics.METRICS_GROUP),
@@ -1687,17 +1913,12 @@ class ShareCoordinatorServiceTest {
}
private void checkPruneMetric(Metrics metrics, String topic, int
partition, boolean checkPresence) {
- boolean isPresent = metrics.metrics().containsKey(
- metrics.metricName(
- "last-pruned-offset",
- ShareCoordinatorMetrics.METRICS_GROUP,
- "The offset at which the share-group state topic was last pruned.",
- Map.of(
- "topic", topic,
- "partition", Integer.toString(partition)
- )
- )
- );
+ boolean isPresent = metrics.metrics().containsKey(metrics.metricName(
+ "last-pruned-offset",
+ ShareCoordinatorMetrics.METRICS_GROUP,
+ "The offset at which the share-group state topic was last pruned.",
+ Map.of("topic", topic, "partition", Integer.toString(partition))
+ ));
assertEquals(checkPresence, isPresent);
}
}
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index 7ebb5ce3954..ab136f8dc31 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -20,6 +20,8 @@ package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
+import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
+import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -29,6 +31,7 @@ import
org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
+import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
@@ -1173,6 +1176,184 @@ class ShareCoordinatorShardTest {
verify(topicsImage, times(1)).getPartition(eq(TOPIC_ID), eq(0));
}
+ @Test
+ public void testInitializeStateSuccess() {
+ ShareCoordinatorShard shard = new
ShareCoordinatorShardBuilder().build();
+
+ SharePartitionKey shareCoordinatorKey =
SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+ InitializeShareGroupStateRequestData request = new
InitializeShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(List.of(new
InitializeShareGroupStateRequestData.InitializeStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
InitializeShareGroupStateRequestData.PartitionData()
+ .setPartition(PARTITION)
+ .setStartOffset(10)
+ .setStateEpoch(5)))
+ ));
+
+ assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
+ assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+
+ CoordinatorResult<InitializeShareGroupStateResponseData,
CoordinatorRecord> result = shard.initializeState(request);
+ result.records().forEach(record -> shard.replay(0L, 0L, (short) 0,
record));
+
+ InitializeShareGroupStateResponseData expectedData =
InitializeShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+ List<CoordinatorRecord> expectedRecords = List.of(
+ ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ GROUP_ID, TOPIC_ID, PARTITION,
ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0))
+ ));
+
+ assertEquals(expectedData, result.response());
+ assertEquals(expectedRecords, result.records());
+
+ assertNotNull(shard.getShareStateMapValue(shareCoordinatorKey));
+ assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+ }
+
+ @Test
+ public void testInitializeStateInvalidRequestData() {
+ ShareCoordinatorShard shard = new
ShareCoordinatorShardBuilder().build();
+
+ // invalid partition
+ int partition = -1;
+
+ InitializeShareGroupStateRequestData request = new
InitializeShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(List.of(new
InitializeShareGroupStateRequestData.InitializeStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
InitializeShareGroupStateRequestData.PartitionData()
+ .setPartition(partition)
+ ))
+ ));
+
+ CoordinatorResult<InitializeShareGroupStateResponseData,
CoordinatorRecord> result = shard.initializeState(request);
+
+ InitializeShareGroupStateResponseData expectedData =
InitializeShareGroupStateResponse.toErrorResponseData(
+ TOPIC_ID, partition, Errors.INVALID_REQUEST,
ShareCoordinatorShard.NEGATIVE_PARTITION_ID.getMessage());
+ List<CoordinatorRecord> expectedRecords = List.of();
+
+ assertEquals(expectedData, result.response());
+ assertEquals(expectedRecords, result.records());
+
+ // invalid state epoch
+ int stateEpoch = 1;
+ partition = 0;
+ shard.replay(0L, 0L, (short) 0,
ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ GROUP_ID, TOPIC_ID, partition, new ShareGroupOffset.Builder()
+ .setStateEpoch(5)
+ .setSnapshotEpoch(0)
+ .setStateBatches(List.of())
+ .build()
+ ));
+
+ request = new InitializeShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(List.of(new
InitializeShareGroupStateRequestData.InitializeStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
InitializeShareGroupStateRequestData.PartitionData()
+ .setPartition(partition)
+ ))
+ ));
+
+ result = shard.initializeState(request);
+
+ expectedData = InitializeShareGroupStateResponse.toErrorResponseData(
+ TOPIC_ID, partition, Errors.FENCED_STATE_EPOCH,
Errors.FENCED_STATE_EPOCH.exception().getMessage());
+ expectedRecords = List.of();
+
+ assertEquals(expectedData, result.response());
+ assertEquals(expectedRecords, result.records());
+ }
+
+ @Test
+ public void testInitializeNullMetadataImage() {
+ ShareCoordinatorShard shard = new
ShareCoordinatorShardBuilder().build();
+ shard.onNewMetadataImage(null, null);
+
+ InitializeShareGroupStateRequestData request = new
InitializeShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(List.of(new
InitializeShareGroupStateRequestData.InitializeStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
InitializeShareGroupStateRequestData.PartitionData()
+ .setPartition(0)
+ ))
+ ));
+
+ CoordinatorResult<InitializeShareGroupStateResponseData,
CoordinatorRecord> result = shard.initializeState(request);
+
+ InitializeShareGroupStateResponseData expectedData =
InitializeShareGroupStateResponse.toErrorResponseData(
+ TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION,
Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
+ List<CoordinatorRecord> expectedRecords = List.of();
+
+ assertEquals(expectedData, result.response());
+ assertEquals(expectedRecords, result.records());
+ }
+
+ @Test
+ public void testInitializeTopicIdNonExistentInMetadataImage() {
+ ShareCoordinatorShard shard = new
ShareCoordinatorShardBuilder().build();
+ MetadataImage image = mock(MetadataImage.class);
+ shard.onNewMetadataImage(image, null);
+
+ InitializeShareGroupStateRequestData request = new
InitializeShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(List.of(new
InitializeShareGroupStateRequestData.InitializeStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
InitializeShareGroupStateRequestData.PartitionData()
+ .setPartition(0)
+ ))
+ ));
+
+ // topic id not found in cache
+ TopicsImage topicsImage = mock(TopicsImage.class);
+ when(topicsImage.getTopic(eq(TOPIC_ID))).thenReturn(null);
+ when(image.topics()).thenReturn(topicsImage);
+ CoordinatorResult<InitializeShareGroupStateResponseData,
CoordinatorRecord> result = shard.initializeState(request);
+
+ InitializeShareGroupStateResponseData expectedData =
InitializeShareGroupStateResponse.toErrorResponseData(
+ TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION,
Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
+ List<CoordinatorRecord> expectedRecords = List.of();
+
+ assertEquals(expectedData, result.response());
+ assertEquals(expectedRecords, result.records());
+ verify(topicsImage, times(1)).getTopic(eq(TOPIC_ID));
+ }
+
+ @Test
+ public void testInitializePartitionIdNonExistentInMetadataImage() {
+ ShareCoordinatorShard shard = new
ShareCoordinatorShardBuilder().build();
+ MetadataImage image = mock(MetadataImage.class);
+ shard.onNewMetadataImage(image, null);
+
+ InitializeShareGroupStateRequestData request = new
InitializeShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(List.of(new
InitializeShareGroupStateRequestData.InitializeStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
InitializeShareGroupStateRequestData.PartitionData()
+ .setPartition(0)
+ ))
+ ));
+
+ // topic id found in cache
+ TopicsImage topicsImage = mock(TopicsImage.class);
+ when(topicsImage.getTopic(eq(TOPIC_ID))).thenReturn(mock(TopicImage.class));
+ when(image.topics()).thenReturn(topicsImage);
+
+ // partition id not found
+ when(topicsImage.getPartition(eq(TOPIC_ID), eq(0))).thenReturn(null);
+ CoordinatorResult<InitializeShareGroupStateResponseData,
CoordinatorRecord> result = shard.initializeState(request);
+
+ InitializeShareGroupStateResponseData expectedData =
InitializeShareGroupStateResponse.toErrorResponseData(
+ TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION,
Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
+ List<CoordinatorRecord> expectedRecords = List.of();
+
+ assertEquals(expectedData, result.response());
+ assertEquals(expectedRecords, result.records());
+ verify(topicsImage, times(1)).getTopic(eq(TOPIC_ID));
+ verify(topicsImage, times(1)).getPartition(eq(TOPIC_ID), eq(0));
+ }
+
private static ShareGroupOffset groupOffset(ApiMessage record) {
if (record instanceof ShareSnapshotValue) {
return ShareGroupOffset.fromRecord((ShareSnapshotValue) record);