This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f32932cc257 KAFKA-18629: Delete share group state impl [1/N] (#18712)
f32932cc257 is described below
commit f32932cc257be48bec4dc6aa9bf30b666f0a5a60
Author: Sushant Mahajan <[email protected]>
AuthorDate: Tue Jan 28 17:13:01 2025 +0530
KAFKA-18629: Delete share group state impl [1/N] (#18712)
Reviewers: Christo Lolov <[email protected]>, Andrew Schofield
<[email protected]>
---
.../requests/DeleteShareGroupStateResponse.java | 45 +++
core/src/main/scala/kafka/server/KafkaApis.scala | 19 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 119 +++++++
.../kafka/coordinator/share/ShareCoordinator.java | 10 +
.../share/ShareCoordinatorRecordHelpers.java | 10 +
.../coordinator/share/ShareCoordinatorService.java | 130 ++++++++
.../coordinator/share/ShareCoordinatorShard.java | 125 ++++++-
.../share/ShareCoordinatorRecordHelpersTest.java | 21 ++
.../share/ShareCoordinatorServiceTest.java | 360 +++++++++++++++++----
.../share/ShareCoordinatorShardTest.java | 216 +++++++++++++
10 files changed, 981 insertions(+), 74 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateResponse.java
index 78c87a63987..69087ebc0c8 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateResponse.java
@@ -17,13 +17,16 @@
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class DeleteShareGroupStateResponse extends AbstractResponse {
@@ -65,4 +68,46 @@ public class DeleteShareGroupStateResponse extends
AbstractResponse {
new DeleteShareGroupStateResponseData(new
ByteBufferAccessor(buffer), version)
);
}
+
+ public static DeleteShareGroupStateResponseData toResponseData(Uuid
topicId, int partitionId) {
+ return new DeleteShareGroupStateResponseData()
+ .setResults(List.of(
+ new DeleteShareGroupStateResponseData.DeleteStateResult()
+ .setTopicId(topicId)
+ .setPartitions(List.of(
+ new DeleteShareGroupStateResponseData.PartitionResult()
+ .setPartition(partitionId)))));
+ }
+
+ public static DeleteShareGroupStateResponseData.PartitionResult
toErrorResponsePartitionResult(
+ int partitionId,
+ Errors error,
+ String errorMessage
+ ) {
+ return new DeleteShareGroupStateResponseData.PartitionResult()
+ .setPartition(partitionId)
+ .setErrorCode(error.code())
+ .setErrorMessage(errorMessage);
+ }
+
+ public static DeleteShareGroupStateResponseData.DeleteStateResult
toResponseDeleteStateResult(Uuid topicId,
List<DeleteShareGroupStateResponseData.PartitionResult> partitionResults) {
+ return new DeleteShareGroupStateResponseData.DeleteStateResult()
+ .setTopicId(topicId)
+ .setPartitions(partitionResults);
+ }
+
+ public static DeleteShareGroupStateResponseData.PartitionResult
toResponsePartitionResult(int partitionId) {
+ return new DeleteShareGroupStateResponseData.PartitionResult()
+ .setPartition(partitionId);
+ }
+
+ public static DeleteShareGroupStateResponseData toErrorResponseData(Uuid
topicId, int partitionId, Errors error, String errorMessage) {
+ return new DeleteShareGroupStateResponseData().setResults(
+ Collections.singletonList(new
DeleteShareGroupStateResponseData.DeleteStateResult()
+ .setTopicId(topicId)
+ .setPartitions(List.of(new
DeleteShareGroupStateResponseData.PartitionResult()
+ .setPartition(partitionId)
+ .setErrorCode(error.code())
+ .setErrorMessage(errorMessage)))));
+ }
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 11623b02a3d..82132d868bf 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3148,9 +3148,22 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleDeleteShareGroupStateRequest(request: RequestChannel.Request):
Unit = {
val deleteShareGroupStateRequest =
request.body[DeleteShareGroupStateRequest]
- // TODO: Implement the DeleteShareGroupStateRequest handling
- requestHelper.sendMaybeThrottle(request,
deleteShareGroupStateRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
- CompletableFuture.completedFuture[Unit](())
+ authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+
+ shareCoordinator match {
+ case None => requestHelper.sendResponseMaybeThrottle(request,
requestThrottleMs =>
+ deleteShareGroupStateRequest.getErrorResponse(requestThrottleMs,
+ new ApiException("Share coordinator is not enabled.")))
+
+ case Some(coordinator) => coordinator.deleteState(request.context,
deleteShareGroupStateRequest.data)
+ .handle[Unit] { (response, exception) =>
+ if (exception != null) {
+ requestHelper.sendMaybeThrottle(request,
deleteShareGroupStateRequest.getErrorResponse(exception))
+ } else {
+ requestHelper.sendMaybeThrottle(request, new
DeleteShareGroupStateResponse(response))
+ }
+ }
+ }
}
def handleReadShareGroupStateSummaryRequest(request:
RequestChannel.Request): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 18930be5ee6..bcf9ec587d5 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -10547,6 +10547,96 @@ class KafkaApisTest extends Logging {
})
}
+ @Test
+ def testDeleteShareGroupStateSuccess(): Unit = {
+ val topicId = Uuid.randomUuid();
+ val deleteRequestData = new DeleteShareGroupStateRequestData()
+ .setGroupId("group1")
+ .setTopics(List(
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId)
+ .setPartitions(List(
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(1)
+ ).asJava)
+ ).asJava)
+
+ val deleteStateResultData:
util.List[DeleteShareGroupStateResponseData.DeleteStateResult] = List(
+ new DeleteShareGroupStateResponseData.DeleteStateResult()
+ .setTopicId(topicId)
+ .setPartitions(List(
+ new DeleteShareGroupStateResponseData.PartitionResult()
+ .setPartition(1)
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage(null)
+ ).asJava)
+ ).asJava
+
+ val config = Map(
+ ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true",
+ )
+
+ val response = getDeleteShareGroupResponse(
+ deleteRequestData,
+ config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
+ verifyNoErr = true,
+ null,
+ deleteStateResultData
+ )
+
+ assertNotNull(response.data)
+ assertEquals(1, response.data.results.size)
+ }
+
+ @Test
+ def testDeleteShareGroupStateAuthorizationFailed(): Unit = {
+ val topicId = Uuid.randomUuid();
+ val deleteRequestData = new DeleteShareGroupStateRequestData()
+ .setGroupId("group1")
+ .setTopics(List(
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId)
+ .setPartitions(List(
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(1)
+ ).asJava)
+ ).asJava)
+
+ val deleteStateResultData:
util.List[DeleteShareGroupStateResponseData.DeleteStateResult] = List(
+ new DeleteShareGroupStateResponseData.DeleteStateResult()
+ .setTopicId(topicId)
+ .setPartitions(List(
+ new DeleteShareGroupStateResponseData.PartitionResult()
+ .setPartition(1)
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage(null)
+ ).asJava)
+ ).asJava
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+ .thenReturn(Seq(AuthorizationResult.DENIED).asJava,
Seq(AuthorizationResult.ALLOWED).asJava)
+
+ val config = Map(
+ ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true",
+ )
+
+ val response = getDeleteShareGroupResponse(
+ deleteRequestData,
+ config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
+ verifyNoErr = false,
+ authorizer,
+ deleteStateResultData
+ )
+
+ assertNotNull(response.data)
+ assertEquals(1, response.data.results.size)
+ response.data.results.forEach(deleteResult => {
+ assertEquals(1, deleteResult.partitions.size)
+ assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED.code(),
deleteResult.partitions.get(0).errorCode())
+ })
+ }
+
def getShareGroupDescribeResponse(groupIds: util.List[String],
configOverrides: Map[String, String] = Map.empty,
verifyNoErr: Boolean = true, authorizer:
Authorizer = null,
describedGroups:
util.List[ShareGroupDescribeResponseData.DescribedGroup]):
ShareGroupDescribeResponse = {
@@ -10663,4 +10753,33 @@ class KafkaApisTest extends Logging {
}
response
}
+
+ def getDeleteShareGroupResponse(requestData:
DeleteShareGroupStateRequestData, configOverrides: Map[String, String] =
Map.empty,
+ verifyNoErr: Boolean = true, authorizer:
Authorizer = null,
+ deleteStateResult:
util.List[DeleteShareGroupStateResponseData.DeleteStateResult]):
DeleteShareGroupStateResponse = {
+ val requestChannelRequest = buildRequest(new
DeleteShareGroupStateRequest.Builder(requestData, true).build())
+
+ val future = new CompletableFuture[DeleteShareGroupStateResponseData]()
+ when(shareCoordinator.deleteState(
+ any[RequestContext],
+ any[DeleteShareGroupStateRequestData]
+ )).thenReturn(future)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
+ kafkaApis = createKafkaApis(
+ overrideProperties = configOverrides,
+ authorizer = Option(authorizer),
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching())
+
+ future.complete(new DeleteShareGroupStateResponseData()
+ .setResults(deleteStateResult))
+
+ val response =
verifyNoThrottling[DeleteShareGroupStateResponse](requestChannelRequest)
+ if (verifyNoErr) {
+ val expectedDeleteShareGroupStateResponseData = new
DeleteShareGroupStateResponseData()
+ .setResults(deleteStateResult)
+ assertEquals(expectedDeleteShareGroupStateResponseData, response.data)
+ }
+ response
+ }
}
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
index 72427ac8705..61e96e37f07 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
@@ -17,6 +17,8 @@
package org.apache.kafka.coordinator.share;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -86,6 +88,14 @@ public interface ShareCoordinator {
*/
CompletableFuture<ReadShareGroupStateSummaryResponseData>
readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData
request);
+ /**
+ * Handle delete share group state call
+ * @param context - represents the incoming delete share group request
context
+ * @param request - actual RPC request object
+ * @return completable future representing delete share group RPC response
data
+ */
+ CompletableFuture<DeleteShareGroupStateResponseData>
deleteState(RequestContext context, DeleteShareGroupStateRequestData request);
+
/**
* Called when new coordinator is elected
* @param partitionIndex - The partition index (internal topic)
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java
index e1e9e9a11f9..17c869c4499 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java
@@ -71,4 +71,14 @@ public class ShareCoordinatorRecordHelpers {
)
);
}
+
+ public static CoordinatorRecord newShareStateTombstoneRecord(String
groupId, Uuid topicId, int partitionId) {
+ // Always generate share snapshot type record for tombstone.
+ return CoordinatorRecord.tombstone(
+ new ShareSnapshotKey()
+ .setGroupId(groupId)
+ .setTopicId(topicId)
+ .setPartition(partitionId)
+ );
+ }
}
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index 05fcffb43c6..3139e2ef07d 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -29,6 +31,7 @@ import
org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.RequestContext;
@@ -694,6 +697,116 @@ public class ShareCoordinatorService implements
ShareCoordinator {
});
}
+ @Override
+ public CompletableFuture<DeleteShareGroupStateResponseData>
deleteState(RequestContext context, DeleteShareGroupStateRequestData request) {
+ // Send an empty response if the coordinator is not active.
+ if (!isActive.get()) {
+ return CompletableFuture.completedFuture(
+ generateErrorDeleteStateResponse(
+ request,
+ Errors.COORDINATOR_NOT_AVAILABLE,
+ "Share coordinator is not available."
+ )
+ );
+ }
+
+ String groupId = request.groupId();
+ // Send an empty response if groupId is invalid.
+ if (isGroupIdEmpty(groupId)) {
+ log.error("Group id must be specified and non-empty: {}", request);
+ return CompletableFuture.completedFuture(
+ new DeleteShareGroupStateResponseData()
+ );
+ }
+
+ // Send an empty response if topic data is empty.
+ if (isEmpty(request.topics())) {
+ log.error("Topic Data is empty: {}", request);
+ return CompletableFuture.completedFuture(
+ new DeleteShareGroupStateResponseData()
+ );
+ }
+
+ // Send an empty response if partition data is empty for any topic.
+ for (DeleteShareGroupStateRequestData.DeleteStateData topicData :
request.topics()) {
+ if (isEmpty(topicData.partitions())) {
+ log.error("Partition Data for topic {} is empty: {}",
topicData.topicId(), request);
+ return CompletableFuture.completedFuture(
+ new DeleteShareGroupStateResponseData()
+ );
+ }
+ }
+
+ // A map to store the futures for each topicId and partition.
+ Map<Uuid, Map<Integer,
CompletableFuture<DeleteShareGroupStateResponseData>>> futureMap = new
HashMap<>();
+
+ // The request received here could have multiple keys of structure
group:topic:partition. However,
+ // the deleteState method in ShareCoordinatorShard expects a single
key in the request. Hence, we will
+ // be looping over the keys below and constructing new
DeleteShareGroupStateRequestData objects to pass
+ // onto the shard method.
+
+ for (DeleteShareGroupStateRequestData.DeleteStateData topicData :
request.topics()) {
+ Uuid topicId = topicData.topicId();
+ for (DeleteShareGroupStateRequestData.PartitionData partitionData
: topicData.partitions()) {
+ SharePartitionKey coordinatorKey =
SharePartitionKey.getInstance(request.groupId(), topicId,
partitionData.partition());
+
+ DeleteShareGroupStateRequestData requestForCurrentPartition =
new DeleteShareGroupStateRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId)
+ .setPartitions(List.of(partitionData))));
+
+ CompletableFuture<DeleteShareGroupStateResponseData>
deleteFuture = runtime.scheduleWriteOperation(
+ "delete-share-group-state",
+ topicPartitionFor(coordinatorKey),
+ Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+ coordinator ->
coordinator.deleteState(requestForCurrentPartition)
+ ).exceptionally(deleteException ->
+ handleOperationException(
+ "delete-share-group-state",
+ request,
+ deleteException,
+ (error, message) ->
DeleteShareGroupStateResponse.toErrorResponseData(
+ topicData.topicId(),
+ partitionData.partition(),
+ error,
+ "Unable to delete share group state: " +
deleteException.getMessage()
+ ),
+ log
+ ));
+
+ futureMap.computeIfAbsent(topicId, k -> new HashMap<>())
+ .put(partitionData.partition(), deleteFuture);
+ }
+ }
+
+ // Combine all futures into a single CompletableFuture<Void>.
+ CompletableFuture<Void> combinedFuture =
CompletableFuture.allOf(futureMap.values().stream()
+ .flatMap(map ->
map.values().stream()).toArray(CompletableFuture[]::new));
+
+ // Transform the combined CompletableFuture<Void> into
CompletableFuture<DeleteShareGroupStateResponseData>.
+ return combinedFuture.thenApply(v -> {
+ List<DeleteShareGroupStateResponseData.DeleteStateResult>
deleteStateResult = new ArrayList<>(futureMap.size());
+ futureMap.forEach(
+ (topicId, topicEntry) -> {
+ List<DeleteShareGroupStateResponseData.PartitionResult>
partitionResults = new ArrayList<>(topicEntry.size());
+ topicEntry.forEach(
+ (partitionId, responseFuture) -> {
+ // ResponseFut would already be completed by now
since we have used
+ // CompletableFuture::allOf to create a combined
future from the future map.
+ partitionResults.add(
+
responseFuture.getNow(null).results().get(0).partitions().get(0)
+ );
+ }
+ );
+
deleteStateResult.add(DeleteShareGroupStateResponse.toResponseDeleteStateResult(topicId,
partitionResults));
+ }
+ );
+ return new DeleteShareGroupStateResponseData()
+ .setResults(deleteStateResult);
+ });
+ }
+
private ReadShareGroupStateResponseData generateErrorReadStateResponse(
ReadShareGroupStateRequestData request,
Errors error,
@@ -746,6 +859,23 @@ public class ShareCoordinatorService implements
ShareCoordinator {
}).collect(Collectors.toList()));
}
+ private DeleteShareGroupStateResponseData generateErrorDeleteStateResponse(
+ DeleteShareGroupStateRequestData request,
+ Errors error,
+ String errorMessage
+ ) {
+ return new
DeleteShareGroupStateResponseData().setResults(request.topics().stream()
+ .map(topicData -> {
+ DeleteShareGroupStateResponseData.DeleteStateResult resultData
= new DeleteShareGroupStateResponseData.DeleteStateResult();
+ resultData.setTopicId(topicData.topicId());
+ resultData.setPartitions(topicData.partitions().stream()
+ .map(partitionData ->
DeleteShareGroupStateResponse.toErrorResponsePartitionResult(
+ partitionData.partition(), error, errorMessage
+ )).collect(Collectors.toList()));
+ return resultData;
+ }).collect(Collectors.toList()));
+ }
+
private static boolean isGroupIdEmpty(String groupId) {
return groupId == null || groupId.isEmpty();
}
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index 53a64174f38..a71a911907a 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -20,6 +20,8 @@ package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -28,6 +30,7 @@ import
org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.TransactionResult;
@@ -228,16 +231,25 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
private void handleShareSnapshot(ShareSnapshotKey key, ShareSnapshotValue
value, long offset) {
SharePartitionKey mapKey =
SharePartitionKey.getInstance(key.groupId(), key.topicId(), key.partition());
- maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
- maybeUpdateStateEpochMap(mapKey, value.stateEpoch());
-
- ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value);
- // this record is the complete snapshot
- shareStateMap.put(mapKey, offsetRecord);
- // if number of share updates is exceeded, then reset it
- if (snapshotUpdateCount.containsKey(mapKey)) {
- if (snapshotUpdateCount.get(mapKey) >=
config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
- snapshotUpdateCount.put(mapKey, 0);
+ if (value == null) {
+ log.debug("Tombstone records received for share partition key:
{}", mapKey);
+ // Consider this a tombstone.
+ shareStateMap.remove(mapKey);
+ leaderEpochMap.remove(mapKey);
+ stateEpochMap.remove(mapKey);
+ snapshotUpdateCount.remove(mapKey);
+ } else {
+ maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
+ maybeUpdateStateEpochMap(mapKey, value.stateEpoch());
+
+ ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value);
+ // This record is the complete snapshot.
+ shareStateMap.put(mapKey, offsetRecord);
+ // If number of share updates is exceeded, then reset it.
+ if (snapshotUpdateCount.containsKey(mapKey)) {
+ if (snapshotUpdateCount.get(mapKey) >=
config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
+ snapshotUpdateCount.put(mapKey, 0);
+ }
}
}
@@ -475,6 +487,49 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
);
}
+ /**
+ * This method writes tombstone records corresponding to the requested
topic partitions.
+ * <p>
+ * This method as called by the ShareCoordinatorService will be provided
with
+ * the request data which covers only key i.e. group1:topic1:partition1.
The implementation
+ * below was done keeping this in mind.
+ *
+ * @param request - ReadShareGroupStateSummaryRequestData for a single key
+ * @return CoordinatorResult(records, response)
+ */
+
+ public CoordinatorResult<DeleteShareGroupStateResponseData,
CoordinatorRecord> deleteState(
+ DeleteShareGroupStateRequestData request
+ ) {
+ // Records to write (with both key and value of snapshot type),
response to caller
+ // only one key will be there in the request by design.
+ Optional<CoordinatorResult<DeleteShareGroupStateResponseData,
CoordinatorRecord>> error = maybeGetDeleteStateError(request);
+ if (error.isPresent()) {
+ return error.get();
+ }
+
+ DeleteShareGroupStateRequestData.DeleteStateData topicData =
request.topics().get(0);
+ DeleteShareGroupStateRequestData.PartitionData partitionData =
topicData.partitions().get(0);
+ SharePartitionKey key =
SharePartitionKey.getInstance(request.groupId(), topicData.topicId(),
partitionData.partition());
+
+ CoordinatorRecord record = generateTombstoneRecord(key);
+ // build successful response if record is correctly created
+ DeleteShareGroupStateResponseData responseData = new
DeleteShareGroupStateResponseData()
+ .setResults(
+ List.of(
+
DeleteShareGroupStateResponse.toResponseDeleteStateResult(key.topicId(),
+ List.of(
+
DeleteShareGroupStateResponse.toResponsePartitionResult(
+ key.partition()
+ )
+ )
+ )
+ )
+ );
+
+ return new CoordinatorResult<>(Collections.singletonList(record),
responseData);
+ }
+
/**
* Util method to generate a ShareSnapshot or ShareUpdate type record for
a key, based on various conditions.
* <p>
@@ -537,6 +592,14 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
}
}
+ private CoordinatorRecord generateTombstoneRecord(SharePartitionKey key) {
+ return ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
+ key.groupId(),
+ key.topicId(),
+ key.partition()
+ );
+ }
+
private List<PersisterStateBatch> mergeBatches(
List<PersisterStateBatch> soFar,
WriteShareGroupStateRequestData.PartitionData partitionData) {
@@ -670,6 +733,37 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
return Optional.empty();
}
+ private Optional<CoordinatorResult<DeleteShareGroupStateResponseData,
CoordinatorRecord>> maybeGetDeleteStateError(
+ DeleteShareGroupStateRequestData request
+ ) {
+ DeleteShareGroupStateRequestData.DeleteStateData topicData =
request.topics().get(0);
+ DeleteShareGroupStateRequestData.PartitionData partitionData =
topicData.partitions().get(0);
+
+ Uuid topicId = topicData.topicId();
+ int partitionId = partitionData.partition();
+
+ if (topicId == null) {
+ return Optional.of(getDeleteErrorResponse(Errors.INVALID_REQUEST,
NULL_TOPIC_ID, null, partitionId));
+ }
+
+ if (partitionId < 0) {
+ return Optional.of(getDeleteErrorResponse(Errors.INVALID_REQUEST,
NEGATIVE_PARTITION_ID, topicId, partitionId));
+ }
+
+ if (metadataImage == null) {
+ log.error("Metadata image is null");
+ return
Optional.of(getDeleteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null,
topicId, partitionId));
+ }
+
+ if (metadataImage.topics().getTopic(topicId) == null ||
+ metadataImage.topics().getPartition(topicId, partitionId) == null)
{
+ log.error("Topic/TopicPartition not found in metadata image.");
+ return
Optional.of(getDeleteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null,
topicId, partitionId));
+ }
+
+ return Optional.empty();
+ }
+
private CoordinatorResult<WriteShareGroupStateResponseData,
CoordinatorRecord> getWriteErrorResponse(
Errors error,
Exception exception,
@@ -681,6 +775,17 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
return new CoordinatorResult<>(Collections.emptyList(), responseData);
}
+ private CoordinatorResult<DeleteShareGroupStateResponseData,
CoordinatorRecord> getDeleteErrorResponse(
+ Errors error,
+ Exception exception,
+ Uuid topicId,
+ int partitionId
+ ) {
+ String message = exception == null ? error.message() :
exception.getMessage();
+ DeleteShareGroupStateResponseData responseData =
DeleteShareGroupStateResponse.toErrorResponseData(topicId, partitionId, error,
message);
+ return new CoordinatorResult<>(Collections.emptyList(), responseData);
+ }
+
// Visible for testing
Integer getLeaderMapValue(SharePartitionKey key) {
return this.leaderEpochMap.get(key);
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
index 37d9f9b5127..9de59e499d1 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
@@ -112,4 +112,25 @@ public class ShareCoordinatorRecordHelpersTest {
assertEquals(expectedRecord, record);
}
+
+ @Test
+ public void testNewShareStateTombstoneRecord() {
+ String groupId = "test-group";
+ Uuid topicId = Uuid.randomUuid();
+ int partitionId = 1;
+ CoordinatorRecord record =
ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
+ groupId,
+ topicId,
+ partitionId
+ );
+
+ CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
+ new ShareSnapshotKey()
+ .setGroupId(groupId)
+ .setTopicId(topicId)
+ .setPartition(partitionId)
+ );
+
+ assertEquals(expectedRecord, record);
+ }
}
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 7f7776cd93e..331595119b6 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -49,7 +51,6 @@ import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -79,7 +80,7 @@ class ShareCoordinatorServiceTest {
private CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord>
mockRuntime() {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mock(CoordinatorRuntime.class);
when(runtime.activeTopicPartitions())
- .thenReturn(Collections.singletonList(new
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)));
+ .thenReturn(List.of(new
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)));
return runtime;
}
@@ -133,13 +134,13 @@ class ShareCoordinatorServiceTest {
.setTopics(Arrays.asList(
new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(topicId1)
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new WriteShareGroupStateRequestData.PartitionData()
.setPartition(partition1)
.setStartOffset(0)
.setStateEpoch(1)
.setLeaderEpoch(1)
- .setStateBatches(Collections.singletonList(new
WriteShareGroupStateRequestData.StateBatch()
+ .setStateBatches(List.of(new
WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@@ -148,13 +149,13 @@ class ShareCoordinatorServiceTest {
)),
new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(topicId2)
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new WriteShareGroupStateRequestData.PartitionData()
.setPartition(partition2)
.setStartOffset(0)
.setStateEpoch(1)
.setLeaderEpoch(1)
- .setStateBatches(Collections.singletonList(new
WriteShareGroupStateRequestData.StateBatch()
+ .setStateBatches(List.of(new
WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@@ -165,18 +166,18 @@ class ShareCoordinatorServiceTest {
);
WriteShareGroupStateResponseData response1 = new
WriteShareGroupStateResponseData()
- .setResults(Collections.singletonList(
+ .setResults(List.of(
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId1)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateResponseData.PartitionResult()
+ .setPartitions(List.of(new
WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partition1)))
));
WriteShareGroupStateResponseData response2 = new
WriteShareGroupStateResponseData()
- .setResults(Collections.singletonList(
+ .setResults(List.of(
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId2)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateResponseData.PartitionResult()
+ .setPartitions(List.of(new
WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partition2)))
));
@@ -199,11 +200,11 @@ class ShareCoordinatorServiceTest {
HashSet<WriteShareGroupStateResponseData.WriteStateResult>
expectedResult = new HashSet<>(Arrays.asList(
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId2)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateResponseData.PartitionResult()
+ .setPartitions(List.of(new
WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partition2))),
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId1)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateResponseData.PartitionResult()
+ .setPartitions(List.of(new
WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partition1)))));
assertEquals(expectedResult, result);
verify(time, times(2)).hiResClockMs();
@@ -243,14 +244,14 @@ class ShareCoordinatorServiceTest {
.setTopics(Arrays.asList(
new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(topicId1)
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new ReadShareGroupStateRequestData.PartitionData()
.setPartition(partition1)
.setLeaderEpoch(1)
)),
new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(topicId2)
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new ReadShareGroupStateRequestData.PartitionData()
.setPartition(partition2)
.setLeaderEpoch(1)
@@ -260,12 +261,12 @@ class ShareCoordinatorServiceTest {
ReadShareGroupStateResponseData.ReadStateResult topicData1 = new
ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId1)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateResponseData.PartitionResult()
+ .setPartitions(List.of(new
ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partition1)
.setErrorCode(Errors.NONE.code())
.setStateEpoch(1)
.setStartOffset(0)
- .setStateBatches(Collections.singletonList(new
ReadShareGroupStateResponseData.StateBatch()
+ .setStateBatches(List.of(new
ReadShareGroupStateResponseData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@@ -275,7 +276,7 @@ class ShareCoordinatorServiceTest {
ReadShareGroupStateResponseData.ReadStateResult topicData2 = new
ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId2)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateResponseData.PartitionResult()
+ .setPartitions(List.of(new
ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partition2)
.setErrorCode(Errors.NONE.code())
.setStateEpoch(1)
@@ -301,9 +302,9 @@ class ShareCoordinatorServiceTest {
any()
))
.thenReturn(CompletableFuture.completedFuture(new
ReadShareGroupStateResponseData()
- .setResults(Collections.singletonList(topicData1))))
+ .setResults(List.of(topicData1))))
.thenReturn(CompletableFuture.completedFuture(new
ReadShareGroupStateResponseData()
- .setResults(Collections.singletonList(topicData2))));
+ .setResults(List.of(topicData2))));
CompletableFuture<ReadShareGroupStateResponseData> future =
service.readState(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE),
@@ -345,14 +346,14 @@ class ShareCoordinatorServiceTest {
.setTopics(Arrays.asList(
new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId1)
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new
ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition1)
.setLeaderEpoch(1)
)),
new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId2)
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new
ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition2)
.setLeaderEpoch(1)
@@ -362,7 +363,7 @@ class ShareCoordinatorServiceTest {
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult
topicData1 = new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicId1)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition1)
.setErrorCode(Errors.NONE.code())
.setStateEpoch(1)
@@ -371,7 +372,7 @@ class ShareCoordinatorServiceTest {
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult
topicData2 = new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicId2)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition2)
.setErrorCode(Errors.NONE.code())
.setStateEpoch(1)
@@ -385,9 +386,9 @@ class ShareCoordinatorServiceTest {
any()
))
.thenReturn(CompletableFuture.completedFuture(new
ReadShareGroupStateSummaryResponseData()
- .setResults(Collections.singletonList(topicData1))))
+ .setResults(List.of(topicData1))))
.thenReturn(CompletableFuture.completedFuture(new
ReadShareGroupStateSummaryResponseData()
- .setResults(Collections.singletonList(topicData2))));
+ .setResults(List.of(topicData2))));
CompletableFuture<ReadShareGroupStateSummaryResponseData> future =
service.readStateSummary(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY),
@@ -402,6 +403,93 @@ class ShareCoordinatorServiceTest {
assertEquals(expectedResult, result);
}
+ @Test
+ public void testDeleteStateSuccess() throws ExecutionException,
InterruptedException, TimeoutException {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Metrics metrics = new Metrics();
+ ShareCoordinatorMetrics coordinatorMetrics = new
ShareCoordinatorMetrics(metrics);
+ Time time = mock(Time.class);
+ ShareCoordinatorService service = new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.testConfig(),
+ runtime,
+ coordinatorMetrics,
+ time,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
+ );
+
+ service.startup(() -> 1);
+
+ String groupId = "group1";
+ Uuid topicId1 = Uuid.randomUuid();
+ int partition1 = 0;
+
+ Uuid topicId2 = Uuid.randomUuid();
+ int partition2 = 1;
+
+ DeleteShareGroupStateRequestData request = new
DeleteShareGroupStateRequestData()
+ .setGroupId(groupId)
+ .setTopics(Arrays.asList(
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(
+ new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(partition1)
+ )),
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId2)
+ .setPartitions(List.of(
+ new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(partition2)
+ ))
+ )
+ );
+
+ DeleteShareGroupStateResponseData response1 = new
DeleteShareGroupStateResponseData()
+ .setResults(List.of(
+ new DeleteShareGroupStateResponseData.DeleteStateResult()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(new
DeleteShareGroupStateResponseData.PartitionResult()
+ .setPartition(partition1)))
+ ));
+
+ DeleteShareGroupStateResponseData response2 = new
DeleteShareGroupStateResponseData()
+ .setResults(List.of(
+ new DeleteShareGroupStateResponseData.DeleteStateResult()
+ .setTopicId(topicId2)
+ .setPartitions(List.of(new
DeleteShareGroupStateResponseData.PartitionResult()
+ .setPartition(partition2)))
+ ));
+
+ when(runtime.scheduleWriteOperation(
+ eq("delete-share-group-state"),
+ eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
+ eq(Duration.ofMillis(5000)),
+ any()
+ ))
+ .thenReturn(CompletableFuture.completedFuture(response1))
+ .thenReturn(CompletableFuture.completedFuture(response2));
+
+ CompletableFuture<DeleteShareGroupStateResponseData> future =
service.deleteState(
+ requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
+ request
+ );
+
+ HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> result =
new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
+
+ HashSet<DeleteShareGroupStateResponseData.DeleteStateResult>
expectedResult = new HashSet<>(Arrays.asList(
+ new DeleteShareGroupStateResponseData.DeleteStateResult()
+ .setTopicId(topicId2)
+ .setPartitions(List.of(new
DeleteShareGroupStateResponseData.PartitionResult()
+ .setPartition(partition2))),
+ new DeleteShareGroupStateResponseData.DeleteStateResult()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(new
DeleteShareGroupStateResponseData.PartitionResult()
+ .setPartition(partition1)))));
+ assertEquals(expectedResult, result);
+ }
+
@Test
public void testWriteStateValidationsError() throws ExecutionException,
InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
@@ -433,7 +521,7 @@ class ShareCoordinatorServiceTest {
assertEquals(new WriteShareGroupStateResponseData(),
service.writeState(
requestContext(ApiKeys.WRITE_SHARE_GROUP_STATE),
- new
WriteShareGroupStateRequestData().setGroupId(groupId).setTopics(Collections.singletonList(
+ new
WriteShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of(
new
WriteShareGroupStateRequestData.WriteStateData().setTopicId(topicId)))
).get(5, TimeUnit.SECONDS)
);
@@ -442,8 +530,8 @@ class ShareCoordinatorServiceTest {
assertEquals(new WriteShareGroupStateResponseData(),
service.writeState(
requestContext(ApiKeys.WRITE_SHARE_GROUP_STATE),
- new
WriteShareGroupStateRequestData().setGroupId(null).setTopics(Collections.singletonList(
- new
WriteShareGroupStateRequestData.WriteStateData().setTopicId(topicId).setPartitions(Collections.singletonList(
+ new
WriteShareGroupStateRequestData().setGroupId(null).setTopics(List.of(
+ new
WriteShareGroupStateRequestData.WriteStateData().setTopicId(topicId).setPartitions(List.of(
new
WriteShareGroupStateRequestData.PartitionData().setPartition(partition)))))
).get(5, TimeUnit.SECONDS)
);
@@ -480,7 +568,7 @@ class ShareCoordinatorServiceTest {
assertEquals(new ReadShareGroupStateResponseData(),
service.readState(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE),
- new
ReadShareGroupStateRequestData().setGroupId(groupId).setTopics(Collections.singletonList(
+ new
ReadShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of(
new
ReadShareGroupStateRequestData.ReadStateData().setTopicId(topicId)))
).get(5, TimeUnit.SECONDS)
);
@@ -489,8 +577,8 @@ class ShareCoordinatorServiceTest {
assertEquals(new ReadShareGroupStateResponseData(),
service.readState(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE),
- new
ReadShareGroupStateRequestData().setGroupId(null).setTopics(Collections.singletonList(
- new
ReadShareGroupStateRequestData.ReadStateData().setTopicId(topicId).setPartitions(Collections.singletonList(
+ new
ReadShareGroupStateRequestData().setGroupId(null).setTopics(List.of(
+ new
ReadShareGroupStateRequestData.ReadStateData().setTopicId(topicId).setPartitions(List.of(
new
ReadShareGroupStateRequestData.PartitionData().setPartition(partition)))))
).get(5, TimeUnit.SECONDS)
);
@@ -527,7 +615,7 @@ class ShareCoordinatorServiceTest {
assertEquals(new ReadShareGroupStateSummaryResponseData(),
service.readStateSummary(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY),
- new
ReadShareGroupStateSummaryRequestData().setGroupId(groupId).setTopics(Collections.singletonList(
+ new
ReadShareGroupStateSummaryRequestData().setGroupId(groupId).setTopics(List.of(
new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData().setTopicId(topicId)))
).get(5, TimeUnit.SECONDS)
);
@@ -536,13 +624,60 @@ class ShareCoordinatorServiceTest {
assertEquals(new ReadShareGroupStateSummaryResponseData(),
service.readStateSummary(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY),
- new
ReadShareGroupStateSummaryRequestData().setGroupId(null).setTopics(Collections.singletonList(
- new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData().setTopicId(topicId).setPartitions(Collections.singletonList(
+ new
ReadShareGroupStateSummaryRequestData().setGroupId(null).setTopics(List.of(
+ new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData().setTopicId(topicId).setPartitions(List.of(
new
ReadShareGroupStateSummaryRequestData.PartitionData().setPartition(partition)))))
).get(5, TimeUnit.SECONDS)
);
}
+ @Test
+ public void testDeleteStateValidationsError() throws ExecutionException,
InterruptedException, TimeoutException {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ ShareCoordinatorService service = new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.testConfig(),
+ runtime,
+ new ShareCoordinatorMetrics(),
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
+ );
+
+ service.startup(() -> 1);
+
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 0;
+
+ // 1. Empty topicsData
+ assertEquals(new DeleteShareGroupStateResponseData(),
+ service.deleteState(
+ requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
+ new DeleteShareGroupStateRequestData().setGroupId(groupId)
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // 2. Empty partitionsData
+ assertEquals(new DeleteShareGroupStateResponseData(),
+ service.deleteState(
+ requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
+ new
DeleteShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of(
+ new
DeleteShareGroupStateRequestData.DeleteStateData().setTopicId(topicId)))
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // 3. Invalid groupId
+ assertEquals(new DeleteShareGroupStateResponseData(),
+ service.deleteState(
+ requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
+ new
DeleteShareGroupStateRequestData().setGroupId(null).setTopics(List.of(
+ new
DeleteShareGroupStateRequestData.DeleteStateData().setTopicId(topicId).setPartitions(List.of(
+ new
DeleteShareGroupStateRequestData.PartitionData().setPartition(partition)))))
+ ).get(5, TimeUnit.SECONDS)
+ );
+ }
+
@Test
public void testWriteStateWhenNotStarted() throws ExecutionException,
InterruptedException, TimeoutException {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
@@ -568,13 +703,13 @@ class ShareCoordinatorServiceTest {
.setTopics(Arrays.asList(
new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(topicId1)
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new WriteShareGroupStateRequestData.PartitionData()
.setPartition(partition1)
.setStartOffset(0)
.setStateEpoch(1)
.setLeaderEpoch(1)
- .setStateBatches(Collections.singletonList(new
WriteShareGroupStateRequestData.StateBatch()
+ .setStateBatches(List.of(new
WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@@ -583,13 +718,13 @@ class ShareCoordinatorServiceTest {
)),
new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(topicId2)
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new WriteShareGroupStateRequestData.PartitionData()
.setPartition(partition2)
.setStartOffset(0)
.setStateEpoch(1)
.setLeaderEpoch(1)
- .setStateBatches(Collections.singletonList(new
WriteShareGroupStateRequestData.StateBatch()
+ .setStateBatches(List.of(new
WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@@ -609,13 +744,13 @@ class ShareCoordinatorServiceTest {
HashSet<WriteShareGroupStateResponseData.WriteStateResult>
expectedResult = new HashSet<>(Arrays.asList(
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId2)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateResponseData.PartitionResult()
+ .setPartitions(List.of(new
WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partition2)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Share coordinator is not available."))),
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId1)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateResponseData.PartitionResult()
+ .setPartitions(List.of(new
WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partition1)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Share coordinator is not
available.")))));
@@ -647,14 +782,14 @@ class ShareCoordinatorServiceTest {
.setTopics(Arrays.asList(
new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(topicId1)
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new ReadShareGroupStateRequestData.PartitionData()
.setPartition(partition1)
.setLeaderEpoch(1)
)),
new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(topicId2)
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new ReadShareGroupStateRequestData.PartitionData()
.setPartition(partition2)
.setLeaderEpoch(1)
@@ -672,13 +807,13 @@ class ShareCoordinatorServiceTest {
HashSet<ReadShareGroupStateResponseData.ReadStateResult>
expectedResult = new HashSet<>(Arrays.asList(
new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId2)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateResponseData.PartitionResult()
+ .setPartitions(List.of(new
ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partition2)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Share coordinator is not available."))),
new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId1)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateResponseData.PartitionResult()
+ .setPartitions(List.of(new
ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partition1)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Share coordinator is not
available.")))));
@@ -710,14 +845,14 @@ class ShareCoordinatorServiceTest {
.setTopics(Arrays.asList(
new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId1)
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new
ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition1)
.setLeaderEpoch(1)
)),
new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId2)
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new
ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition2)
.setLeaderEpoch(1)
@@ -735,13 +870,74 @@ class ShareCoordinatorServiceTest {
HashSet<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult>
expectedResult = new HashSet<>(Arrays.asList(
new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicId2)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition2)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Share coordinator is not available."))),
new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicId1)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
+ .setPartition(partition1)
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ .setErrorMessage("Share coordinator is not
available.")))));
+ assertEquals(expectedResult, result);
+ }
+
+ @Test
+ public void testDeleteStateWhenNotStarted() throws ExecutionException,
InterruptedException, TimeoutException {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ ShareCoordinatorService service = new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.testConfig(),
+ runtime,
+ new ShareCoordinatorMetrics(),
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
+ );
+
+ String groupId = "group1";
+ Uuid topicId1 = Uuid.randomUuid();
+ int partition1 = 0;
+
+ Uuid topicId2 = Uuid.randomUuid();
+ int partition2 = 1;
+
+ DeleteShareGroupStateRequestData request = new
DeleteShareGroupStateRequestData()
+ .setGroupId(groupId)
+ .setTopics(Arrays.asList(
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(
+ new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(partition1)
+ )),
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId2)
+ .setPartitions(List.of(
+ new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(partition2)
+ ))
+ )
+ );
+
+ CompletableFuture<DeleteShareGroupStateResponseData> future =
service.deleteState(
+ requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
+ request
+ );
+
+ HashSet<DeleteShareGroupStateResponseData.DeleteStateResult> result =
new HashSet<>(future.get(5, TimeUnit.SECONDS).results());
+
+ HashSet<DeleteShareGroupStateResponseData.DeleteStateResult>
expectedResult = new HashSet<>(Arrays.asList(
+ new DeleteShareGroupStateResponseData.DeleteStateResult()
+ .setTopicId(topicId2)
+ .setPartitions(List.of(new
DeleteShareGroupStateResponseData.PartitionResult()
+ .setPartition(partition2)
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ .setErrorMessage("Share coordinator is not available."))),
+ new DeleteShareGroupStateResponseData.DeleteStateResult()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(new
DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partition1)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Share coordinator is not
available.")))));
@@ -771,23 +967,23 @@ class ShareCoordinatorServiceTest {
.thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
assertEquals(new WriteShareGroupStateResponseData()
- .setResults(Collections.singletonList(new
WriteShareGroupStateResponseData.WriteStateResult()
+ .setResults(List.of(new
WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateResponseData.PartitionResult()
+ .setPartitions(List.of(new
WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage("Unable to write share group state:
This server does not host this topic-partition."))))),
service.writeState(
requestContext(ApiKeys.WRITE_SHARE_GROUP_STATE),
new WriteShareGroupStateRequestData().setGroupId(groupId)
- .setTopics(Collections.singletonList(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopics(List.of(new
WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(topicId)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
WriteShareGroupStateRequestData.PartitionData()
.setPartition(partition)
.setLeaderEpoch(1)
.setStartOffset(1)
.setStateEpoch(1)
- .setStateBatches(Collections.singletonList(new
WriteShareGroupStateRequestData.StateBatch()
+ .setStateBatches(List.of(new
WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(2)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@@ -821,18 +1017,18 @@ class ShareCoordinatorServiceTest {
.thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
assertEquals(new ReadShareGroupStateResponseData()
- .setResults(Collections.singletonList(new
ReadShareGroupStateResponseData.ReadStateResult()
+ .setResults(List.of(new
ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateResponseData.PartitionResult()
+ .setPartitions(List.of(new
ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setErrorMessage("Unable to read share group state:
The server experienced an unexpected error when processing the request."))))),
service.readState(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE),
new ReadShareGroupStateRequestData().setGroupId(groupId)
- .setTopics(Collections.singletonList(new
ReadShareGroupStateRequestData.ReadStateData()
+ .setTopics(List.of(new
ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(topicId)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
ReadShareGroupStateRequestData.PartitionData()
.setPartition(partition)
.setLeaderEpoch(1)
))
@@ -864,18 +1060,18 @@ class ShareCoordinatorServiceTest {
.thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
assertEquals(new ReadShareGroupStateSummaryResponseData()
- .setResults(Collections.singletonList(new
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+ .setResults(List.of(new
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicId)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setErrorMessage("Unable to read share group state
summary: The server experienced an unexpected error when processing the
request."))))),
service.readStateSummary(
requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY),
new ReadShareGroupStateSummaryRequestData().setGroupId(groupId)
- .setTopics(Collections.singletonList(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+ .setTopics(List.of(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(topicId)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateSummaryRequestData.PartitionData()
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition)
.setLeaderEpoch(1)
))
@@ -884,6 +1080,48 @@ class ShareCoordinatorServiceTest {
);
}
+ @Test
+ public void testDeleteFutureReturnsError() throws ExecutionException,
InterruptedException, TimeoutException {
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ ShareCoordinatorService service = new ShareCoordinatorService(
+ new LogContext(),
+ ShareCoordinatorTestConfig.testConfig(),
+ runtime,
+ new ShareCoordinatorMetrics(),
+ Time.SYSTEM,
+ mock(Timer.class),
+ mock(PartitionWriter.class)
+ );
+
+ service.startup(() -> 1);
+
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 0;
+
+ when(runtime.scheduleWriteOperation(any(), any(), any(), any()))
+
.thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
+
+ assertEquals(new DeleteShareGroupStateResponseData()
+ .setResults(List.of(new
DeleteShareGroupStateResponseData.DeleteStateResult()
+ .setTopicId(topicId)
+ .setPartitions(List.of(new
DeleteShareGroupStateResponseData.PartitionResult()
+ .setPartition(partition)
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ .setErrorMessage("Unable to delete share group state:
This server does not host this topic-partition."))))),
+ service.deleteState(
+ requestContext(ApiKeys.DELETE_SHARE_GROUP_STATE),
+ new DeleteShareGroupStateRequestData().setGroupId(groupId)
+ .setTopics(List.of(new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId)
+ .setPartitions(List.of(new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(partition)
+ ))
+ ))
+ ).get(5, TimeUnit.SECONDS)
+ );
+ }
+
@Test
public void testTopicPartitionFor() {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index fcd6933f1e3..7ebb5ce3954 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -18,6 +18,8 @@
package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
@@ -26,6 +28,7 @@ import
org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
@@ -40,6 +43,7 @@ import
org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.share.SharePartitionKey;
@@ -58,10 +62,12 @@ import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -957,6 +963,216 @@ class ShareCoordinatorShardTest {
verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
}
+ @Test
+ public void testDeleteStateSuccess() {
+ ShareCoordinatorShard shard = new
ShareCoordinatorShardBuilder().build();
+
+ SharePartitionKey shareCoordinatorKey =
SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+ DeleteShareGroupStateRequestData request = new
DeleteShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(Collections.singletonList(new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(Collections.singletonList(new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(PARTITION)))));
+
+ CoordinatorResult<DeleteShareGroupStateResponseData,
CoordinatorRecord> result = shard.deleteState(request);
+
+ // apply a record in to verify delete
+ CoordinatorRecord record =
ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ GROUP_ID,
+ TOPIC_ID,
+ PARTITION,
+ new ShareGroupOffset.Builder()
+ .setSnapshotEpoch(0)
+ .setStateEpoch(0)
+ .setLeaderEpoch(0)
+ .setStateBatches(List.of(
+ new PersisterStateBatch(
+ 0,
+ 10,
+ (byte) 0,
+ (short) 1
+ )
+ )
+ )
+ .build()
+ );
+ shard.replay(0L, 0L, (short) 0, record);
+ assertNotNull(shard.getShareStateMapValue(shareCoordinatorKey));
+ assertNotNull(shard.getLeaderMapValue(shareCoordinatorKey));
+ assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+
+ // apply tombstone
+ shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+ DeleteShareGroupStateResponseData expectedData =
DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+ List<CoordinatorRecord> expectedRecords = List.of(
+ ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
+ GROUP_ID, TOPIC_ID, PARTITION)
+ );
+
+ assertEquals(expectedData, result.response());
+ assertEquals(expectedRecords, result.records());
+
+ assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
+ assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
+ assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+ }
+
+ @Test
+ public void testDeleteStateFirstRecordDeleteSuccess() {
+ ShareCoordinatorShard shard = new
ShareCoordinatorShardBuilder().build();
+
+ SharePartitionKey shareCoordinatorKey =
SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+ DeleteShareGroupStateRequestData request = new
DeleteShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(Collections.singletonList(new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(Collections.singletonList(new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(PARTITION)))));
+
+ CoordinatorResult<DeleteShareGroupStateResponseData,
CoordinatorRecord> result = shard.deleteState(request);
+
+ assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
+ assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
+ assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+
+ // apply tombstone
+ shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+ DeleteShareGroupStateResponseData expectedData =
DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+ List<CoordinatorRecord> expectedRecords = List.of(
+ ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
+ GROUP_ID, TOPIC_ID, PARTITION)
+ );
+
+ assertEquals(expectedData, result.response());
+ assertEquals(expectedRecords, result.records());
+
+ assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
+ assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
+ assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+ }
+
+ @Test
+ public void testDeleteStateInvalidRequestData() {
+ ShareCoordinatorShard shard = new
ShareCoordinatorShardBuilder().build();
+
+ // invalid partition
+ int partition = -1;
+
+ DeleteShareGroupStateRequestData request = new
DeleteShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(List.of(new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(partition)))));
+
+ CoordinatorResult<DeleteShareGroupStateResponseData,
CoordinatorRecord> result = shard.deleteState(request);
+
+ DeleteShareGroupStateResponseData expectedData =
DeleteShareGroupStateResponse.toErrorResponseData(
+ TOPIC_ID, partition, Errors.INVALID_REQUEST,
ShareCoordinatorShard.NEGATIVE_PARTITION_ID.getMessage());
+ List<CoordinatorRecord> expectedRecords = List.of();
+
+ assertEquals(expectedData, result.response());
+ assertEquals(expectedRecords, result.records());
+ assertEquals(expectedRecords, result.records());
+ }
+
+ @Test
+ public void testDeleteNullMetadataImage() {
+ ShareCoordinatorShard shard = new
ShareCoordinatorShardBuilder().build();
+ shard.onNewMetadataImage(null, null);
+
+ DeleteShareGroupStateRequestData request = new
DeleteShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(List.of(new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(0)))));
+
+ CoordinatorResult<DeleteShareGroupStateResponseData,
CoordinatorRecord> result = shard.deleteState(request);
+
+ DeleteShareGroupStateResponseData expectedData =
DeleteShareGroupStateResponse.toErrorResponseData(
+ TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION,
Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
+ List<CoordinatorRecord> expectedRecords = List.of();
+
+ assertEquals(expectedData, result.response());
+ assertEquals(expectedRecords, result.records());
+ }
+
+ @Test
+ public void testDeleteTopicIdNonExistentInMetadataImage() {
+ ShareCoordinatorShard shard = new
ShareCoordinatorShardBuilder().build();
+ MetadataImage image = mock(MetadataImage.class);
+ shard.onNewMetadataImage(image, null);
+
+ DeleteShareGroupStateRequestData request = new
DeleteShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(List.of(new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(0)))));
+
+ // topic id not found in cache
+ TopicsImage topicsImage = mock(TopicsImage.class);
+ when(topicsImage.getTopic(eq(TOPIC_ID))).thenReturn(
+ null
+ );
+ when(image.topics()).thenReturn(
+ topicsImage
+ );
+ CoordinatorResult<DeleteShareGroupStateResponseData,
CoordinatorRecord> result = shard.deleteState(request);
+
+ DeleteShareGroupStateResponseData expectedData =
DeleteShareGroupStateResponse.toErrorResponseData(
+ TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION,
Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
+ List<CoordinatorRecord> expectedRecords = List.of();
+
+ assertEquals(expectedData, result.response());
+ assertEquals(expectedRecords, result.records());
+ verify(topicsImage, times(1)).getTopic(eq(TOPIC_ID));
+ }
+
+ @Test
+ public void testDeletePartitionIdNonExistentInMetadataImage() {
+ ShareCoordinatorShard shard = new
ShareCoordinatorShardBuilder().build();
+ MetadataImage image = mock(MetadataImage.class);
+ shard.onNewMetadataImage(image, null);
+
+ DeleteShareGroupStateRequestData request = new
DeleteShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(List.of(new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(0)))));
+
+ // topic id found in cache
+ TopicsImage topicsImage = mock(TopicsImage.class);
+ when(topicsImage.getTopic(eq(TOPIC_ID))).thenReturn(
+ mock(TopicImage.class)
+ );
+ when(image.topics()).thenReturn(
+ topicsImage
+ );
+
+ // partition id not found
+ when(topicsImage.getPartition(eq(TOPIC_ID), eq(0))).thenReturn(
+ null
+ );
+ CoordinatorResult<DeleteShareGroupStateResponseData,
CoordinatorRecord> result = shard.deleteState(request);
+
+ DeleteShareGroupStateResponseData expectedData =
DeleteShareGroupStateResponse.toErrorResponseData(
+ TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION,
Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
+ List<CoordinatorRecord> expectedRecords = List.of();
+
+ assertEquals(expectedData, result.response());
+ assertEquals(expectedRecords, result.records());
+ verify(topicsImage, times(1)).getTopic(eq(TOPIC_ID));
+ verify(topicsImage, times(1)).getPartition(eq(TOPIC_ID), eq(0));
+ }
+
private static ShareGroupOffset groupOffset(ApiMessage record) {
if (record instanceof ShareSnapshotValue) {
return ShareGroupOffset.fromRecord((ShareSnapshotValue) record);