This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 0bd1ff936f8 KAFKA-18629: Add persister impl and tests for DeleteShareGroupState RPC. [2/N] (#18748)
0bd1ff936f8 is described below
commit 0bd1ff936f81f21d4b7bf21563d4e8e7a2934527
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed Feb 5 20:21:19 2025 +0530
KAFKA-18629: Add persister impl and tests for DeleteShareGroupState RPC. [2/N] (#18748)
Reviewers: Andrew Schofield <[email protected]>
---
checkstyle/suppressions.xml | 2 +
.../requests/DeleteShareGroupStateRequest.java | 2 +-
.../share/persister/DefaultStatePersister.java | 103 ++-
.../share/persister/PersisterStateManager.java | 165 +++++
.../share/persister/DefaultStatePersisterTest.java | 311 +++++++++
.../share/persister/PersisterStateManagerTest.java | 725 +++++++++++++++++++++
6 files changed, 1306 insertions(+), 2 deletions(-)
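Note: as an orientation aid before the diff, here is a minimal, hypothetical usage sketch of the new delete path. The parameter and result types are the ones introduced in this commit; the persister instance, its wiring (client, cache helper), and the topicId/log variables are assumptions for illustration only.

    // Hypothetical usage sketch (not part of this commit): delete the durable
    // share-group state for one share partition and inspect per-partition errors.
    DeleteShareGroupStateParameters params = DeleteShareGroupStateParameters.from(
        new DeleteShareGroupStateRequestData()
            .setGroupId("group1")
            .setTopics(List.of(
                new DeleteShareGroupStateRequestData.DeleteStateData()
                    .setTopicId(topicId)
                    .setPartitions(List.of(
                        new DeleteShareGroupStateRequestData.PartitionData()
                            .setPartition(0))))));

    persister.deleteState(params).thenAccept(result ->
        result.topicsData().forEach(topicData ->
            topicData.partitions().forEach(p ->
                log.info("topic {} partition {} error {}",
                    topicData.topicId(), p.partition(), p.errorCode()))));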
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 67c8a77022c..d4b357396f6 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -347,6 +347,8 @@
files="(ShareCoordinatorServiceTest|DefaultStatePersisterTest|PersisterStateManagerTest).java"/>
<suppress checks="CyclomaticComplexity"
files="ShareCoordinatorShard.java"/>
+    <suppress checks="ClassFanOutComplexity"
+              files="(PersisterStateManagerTest|DefaultStatePersisterTest).java"/>
<!-- storage -->
<suppress checks="CyclomaticComplexity"
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateRequest.java
index 074ace1cfea..d66785c682a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateRequest.java
@@ -34,7 +34,7 @@ public class DeleteShareGroupStateRequest extends AbstractRequest {
private final DeleteShareGroupStateRequestData data;
public Builder(DeleteShareGroupStateRequestData data) {
- this(data, false);
+ this(data, true);
}
        public Builder(DeleteShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
index 3b6db31b657..fdd6c7cb5da 100644
--- a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
+++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
@@ -19,6 +19,7 @@ package org.apache.kafka.server.share.persister;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
@@ -280,7 +281,46 @@ public class DefaultStatePersister implements Persister {
* @return A completable future of DeleteShareGroupStateResult
*/
    public CompletableFuture<DeleteShareGroupStateResult> deleteState(DeleteShareGroupStateParameters request) {
- throw new RuntimeException("not implemented");
+ try {
+ validate(request);
+ } catch (Exception e) {
+ log.error("Unable to validate delete state request", e);
+ return CompletableFuture.failedFuture(e);
+ }
+        GroupTopicPartitionData<PartitionIdData> gtp = request.groupTopicPartitionData();
+        String groupId = gtp.groupId();
+
+        Map<Uuid, Map<Integer, CompletableFuture<DeleteShareGroupStateResponse>>> futureMap = new HashMap<>();
+        List<PersisterStateManager.DeleteStateHandler> handlers = new ArrayList<>();
+
+        gtp.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                CompletableFuture<DeleteShareGroupStateResponse> future = futureMap
+                    .computeIfAbsent(topicData.topicId(), k -> new HashMap<>())
+                    .computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>());
+
+                handlers.add(
+                    stateManager.new DeleteStateHandler(
+                        groupId,
+                        topicData.topicId(),
+                        partitionData.partition(),
+                        future,
+                        null
+                    )
+                );
+            });
+        });
+
+        for (PersisterStateManager.PersisterStateManagerHandler handler : handlers) {
+            stateManager.enqueue(handler);
+        }
+
+        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+            handlers.stream()
+                .map(PersisterStateManager.DeleteStateHandler::result)
+                .toArray(CompletableFuture[]::new));
+
+        return combinedFuture.thenApply(v -> deleteResponsesToResult(futureMap));
}
/**
@@ -384,6 +424,55 @@ public class DefaultStatePersister implements Persister {
.build();
}
+ /**
+ * Takes in a list of COMPLETED futures and combines the results,
+ * taking care of errors if any, into a single DeleteShareGroupStateResult
+ *
+ * @param futureMap - HashMap of {topic -> {partition -> future}}
+     * @return Object representing combined result of type DeleteShareGroupStateResult
+     */
+    // visible for testing
+    DeleteShareGroupStateResult deleteResponsesToResult(
+        Map<Uuid, Map<Integer, CompletableFuture<DeleteShareGroupStateResponse>>> futureMap
+    ) {
+        List<TopicData<PartitionErrorData>> topicsData = futureMap.keySet().stream()
+            .map(topicId -> {
+                List<PartitionErrorData> partitionErrorData = futureMap.get(topicId).entrySet().stream()
+                    .map(partitionFuture -> {
+                        int partition = partitionFuture.getKey();
+                        CompletableFuture<DeleteShareGroupStateResponse> future = partitionFuture.getValue();
+                        try {
+                            // already completed because of allOf call in the caller
+                            DeleteShareGroupStateResponse partitionResponse = future.join();
+                            return partitionResponse.data().results().get(0).partitions().stream()
+                                .map(partitionResult -> PartitionFactory.newPartitionErrorData(
+                                        partitionResult.partition(),
+                                        partitionResult.errorCode(),
+                                        partitionResult.errorMessage()
+                                    )
+                                )
+                                .toList();
+                        } catch (Exception e) {
+                            log.error("Unexpected exception while getting data from share coordinator", e);
+                            return List.of(
+                                PartitionFactory.newPartitionErrorData(
+                                    partition,
+                                    Errors.UNKNOWN_SERVER_ERROR.code(),  // No specific public error code exists for InterruptedException / ExecutionException
+                                    "Error deleting state from share coordinator: " + e.getMessage()
+                                )
+                            );
+                        }
+                    })
+                    .flatMap(List::stream)
+                    .collect(Collectors.toList());
+                return new TopicData<>(topicId, partitionErrorData);
+            })
+            .collect(Collectors.toList());
+        return new DeleteShareGroupStateResult.Builder()
+            .setTopicsData(topicsData)
+            .build();
+    }
+
private static void validate(WriteShareGroupStateParameters params) {
String prefix = "Write share group parameters";
if (params == null) {
@@ -420,6 +509,18 @@ public class DefaultStatePersister implements Persister {
        validateGroupTopicPartitionData(prefix, params.groupTopicPartitionData());
}
+ private static void validate(DeleteShareGroupStateParameters params) {
+ String prefix = "Delete share group parameters";
+ if (params == null) {
+ throw new IllegalArgumentException(prefix + " cannot be null.");
+ }
+ if (params.groupTopicPartitionData() == null) {
+            throw new IllegalArgumentException(prefix + " data cannot be null.");
+        }
+
+        validateGroupTopicPartitionData(prefix, params.groupTopicPartitionData());
+    }
+
    private static void validateGroupTopicPartitionData(String prefix, GroupTopicPartitionData<? extends PartitionIdData> data) {
String groupId = data.groupId();
if (groupId == null || groupId.isEmpty()) {
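Aside: deleteState() above follows the same fan-out/fan-in pattern as the existing write and read paths: one future per share partition, a CompletableFuture.allOf barrier, then a join-based fold in deleteResponsesToResult. A self-contained sketch of that pattern outside the Kafka types (all names here are illustrative, not from this commit):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;

    // Illustrative fan-out/fan-in sketch; not Kafka code.
    class FanInSketch {
        static <K, V> CompletableFuture<Map<K, V>> fanIn(Map<K, CompletableFuture<V>> perPartition) {
            CompletableFuture<Void> all = CompletableFuture.allOf(
                perPartition.values().toArray(new CompletableFuture[0]));
            // When thenApply runs, every member future is already complete,
            // so join() cannot block (mirrors the comment in deleteResponsesToResult).
            return all.thenApply(v -> {
                Map<K, V> out = new HashMap<>();
                perPartition.forEach((key, f) -> out.put(key, f.join()));
                return out;
            });
        }
    }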
diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 5c9028b87e8..21e00be29f8 100644
--- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -24,6 +24,8 @@ import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
@@ -36,6 +38,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DeleteShareGroupStateRequest;
+import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
@@ -875,6 +879,144 @@ public class PersisterStateManager {
}
}
+ public class DeleteStateHandler extends PersisterStateManagerHandler {
+ private final CompletableFuture<DeleteShareGroupStateResponse> result;
+ private final BackoffManager deleteStateBackoff;
+
+ public DeleteStateHandler(
+ String groupId,
+ Uuid topicId,
+ int partition,
+ CompletableFuture<DeleteShareGroupStateResponse> result,
+ long backoffMs,
+ long backoffMaxMs,
+ int maxRPCRetryAttempts
+ ) {
+            super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts);
+            this.result = result;
+            this.deleteStateBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
+        }
+
+ public DeleteStateHandler(
+ String groupId,
+ Uuid topicId,
+ int partition,
+ CompletableFuture<DeleteShareGroupStateResponse> result,
+ Consumer<ClientResponse> onCompleteCallback
+ ) {
+ this(
+ groupId,
+ topicId,
+ partition,
+ result,
+ REQUEST_BACKOFF_MS,
+ REQUEST_BACKOFF_MAX_MS,
+ MAX_FIND_COORD_ATTEMPTS
+ );
+ }
+
+ @Override
+ protected String name() {
+ return "DeleteStateHandler";
+ }
+
+ @Override
+        protected AbstractRequest.Builder<? extends AbstractRequest> requestBuilder() {
+            throw new RuntimeException("Delete requests are batchable, hence individual requests not needed.");
+        }
+
+        @Override
+        protected boolean isResponseForRequest(ClientResponse response) {
+            return response.requestHeader().apiKey() == ApiKeys.DELETE_SHARE_GROUP_STATE;
+        }
+
+ @Override
+        protected void handleRequestResponse(ClientResponse response) {
+            log.debug("Delete state response received - {}", response);
+            deleteStateBackoff.incrementAttempt();
+
+            // response can be a combined one for large number of requests
+            // we need to deconstruct it
+            DeleteShareGroupStateResponse combinedResponse = (DeleteShareGroupStateResponse) response.responseBody();
+
+            for (DeleteShareGroupStateResponseData.DeleteStateResult deleteStateResult : combinedResponse.data().results()) {
+                if (deleteStateResult.topicId().equals(partitionKey().topicId())) {
+                    Optional<DeleteShareGroupStateResponseData.PartitionResult> partitionStateData =
+                        deleteStateResult.partitions().stream()
+                            .filter(partitionResult -> partitionResult.partition() == partitionKey().partition())
+                            .findFirst();
+
+                    if (partitionStateData.isPresent()) {
+                        Errors error = Errors.forCode(partitionStateData.get().errorCode());
+                        switch (error) {
+                            case NONE:
+                                deleteStateBackoff.resetAttempts();
+                                DeleteShareGroupStateResponseData.DeleteStateResult result = DeleteShareGroupStateResponse.toResponseDeleteStateResult(
+                                    partitionKey().topicId(),
+                                    List.of(partitionStateData.get())
+                                );
+                                this.result.complete(new DeleteShareGroupStateResponse(
+                                    new DeleteShareGroupStateResponseData().setResults(List.of(result))));
+                                return;
+
+                            // check retriable errors
+                            case COORDINATOR_NOT_AVAILABLE:
+                            case COORDINATOR_LOAD_IN_PROGRESS:
+                            case NOT_COORDINATOR:
+                                log.warn("Received retriable error in delete state RPC for key {}: {}", partitionKey(), error.message());
+                                if (!deleteStateBackoff.canAttempt()) {
+                                    log.error("Exhausted max retries for delete state RPC for key {} without success.", partitionKey());
+                                    deleteStateErrorResponse(error, new Exception("Exhausted max retries to complete delete state RPC without success."));
+                                    return;
+                                }
+                                super.resetCoordinatorNode();
+                                timer.add(new PersisterTimerTask(deleteStateBackoff.backOff(), this));
+                                return;
+
+                            default:
+                                log.error("Unable to perform delete state RPC for key {}: {}", partitionKey(), error.message());
+                                deleteStateErrorResponse(error, null);
+                                return;
+                        }
+                    }
+                }
+            }
+
+            // no response found for the specific topic partition
+ IllegalStateException exception = new IllegalStateException(
+ "Failed to delete state for share partition: " + partitionKey()
+ );
+            deleteStateErrorResponse(Errors.forException(exception), exception);
+        }
+
+        private void deleteStateErrorResponse(Errors error, Exception exception) {
+            this.result.complete(new DeleteShareGroupStateResponse(
+                DeleteShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in delete state RPC. " +
+                    (exception == null ? error.message() : exception.getMessage()))));
+        }
+
+ @Override
+        protected void findCoordinatorErrorResponse(Errors error, Exception exception) {
+            this.result.complete(new DeleteShareGroupStateResponse(
+                DeleteShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in find coordinator. " +
+                    (exception == null ? error.message() : exception.getMessage()))));
+        }
+
+ protected CompletableFuture<DeleteShareGroupStateResponse> result() {
+ return result;
+ }
+
+ @Override
+ protected boolean isBatchable() {
+ return true;
+ }
+
+ @Override
+ protected RPCType rpcType() {
+ return RPCType.DELETE;
+ }
+ }
+
private class SendThread extends InterBrokerSendThread {
        private final ConcurrentLinkedQueue<PersisterStateManagerHandler> queue = new ConcurrentLinkedQueue<>();
private final Random random;
@@ -1059,6 +1201,8 @@ public class PersisterStateManager {
return coalesceReads(groupId, handlers);
case SUMMARY:
return coalesceReadSummaries(groupId, handlers);
+ case DELETE:
+ return coalesceDeletes(groupId, handlers);
default:
throw new RuntimeException("Unknown rpc type: " + rpcType);
}
@@ -1140,5 +1284,26 @@ public class PersisterStateManager {
true
);
}
+
+        private static AbstractRequest.Builder<? extends AbstractRequest> coalesceDeletes(String groupId, List<? extends PersisterStateManagerHandler> handlers) {
+            Map<Uuid, List<DeleteShareGroupStateRequestData.PartitionData>> partitionData = new HashMap<>();
+            handlers.forEach(persHandler -> {
+                assert persHandler instanceof DeleteStateHandler;
+                DeleteStateHandler handler = (DeleteStateHandler) persHandler;
+                partitionData.computeIfAbsent(handler.partitionKey().topicId(), topicId -> new LinkedList<>())
+                    .add(
+                        new DeleteShareGroupStateRequestData.PartitionData()
+                            .setPartition(handler.partitionKey().partition())
+                    );
+            });
+
+            return new DeleteShareGroupStateRequest.Builder(new DeleteShareGroupStateRequestData()
+                .setGroupId(groupId)
+                .setTopics(partitionData.entrySet().stream()
+                    .map(entry -> new DeleteShareGroupStateRequestData.DeleteStateData()
+                        .setTopicId(entry.getKey())
+                        .setPartitions(entry.getValue()))
+                    .toList()));
+        }
}
}
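Aside: coalesceDeletes() above groups all handlers bound for the same coordinator by topic id, so many single-partition handlers collapse into one batched RPC body. The grouping step, reduced to a self-contained sketch (names illustrative, not from this commit):

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    // Illustrative grouping sketch; not Kafka code.
    class CoalesceSketch {
        record PartitionKey(String topicId, int partition) { }

        static Map<String, List<Integer>> coalesce(List<PartitionKey> keys) {
            return keys.stream().collect(Collectors.groupingBy(
                PartitionKey::topicId,
                Collectors.mapping(PartitionKey::partition, Collectors.toList())));
        }

        public static void main(String[] args) {
            // prints {t1=[0, 1], t2=[5]}
            System.out.println(coalesce(List.of(
                new PartitionKey("t1", 0),
                new PartitionKey("t1", 1),
                new PartitionKey("t2", 5))));
        }
    }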
diff --git a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
index 88c83c88914..50ffcf90f81 100644
--- a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
@@ -22,12 +22,15 @@ import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DeleteShareGroupStateRequest;
+import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
@@ -352,6 +355,81 @@ class DefaultStatePersisterTest {
assertFutureThrows(result, IllegalArgumentException.class);
}
+ @Test
+ public void testDeleteStateValidate() {
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 0;
+ int incorrectPartition = -1;
+
+ // Request Parameters are null
+        DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        CompletableFuture<DeleteShareGroupStateResult> result = defaultStatePersister.deleteState(null);
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
+
+        // groupTopicPartitionData is null
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.deleteState(new DeleteShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
+
+        // groupId is null
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.deleteState(new DeleteShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>()
+                .setGroupId(null).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
+
+        // topicsData is empty
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.deleteState(new DeleteShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>()
+                .setGroupId(groupId)
+                .setTopicsData(List.of()).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
+
+        // topicId is null
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.deleteState(new DeleteShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>()
+                .setGroupId(groupId)
+                .setTopicsData(List.of(new TopicData<>(null,
+                    List.of(PartitionFactory.newPartitionStateBatchData(
+                        partition, 1, 0, 0, null))))).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
+
+        // partitionData is empty
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.deleteState(new DeleteShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>()
+                .setGroupId(groupId)
+                .setTopicsData(List.of(new TopicData<>(topicId, List.of()))).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
+
+        // partition value is incorrect
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.deleteState(new DeleteShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>()
+                .setGroupId(groupId)
+                .setTopicsData(List.of(new TopicData<>(topicId,
+                    List.of(PartitionFactory.newPartitionIdData(
+                        incorrectPartition))))).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
+    }
+
@Test
public void testWriteStateSuccess() {
@@ -789,6 +867,135 @@ class DefaultStatePersisterTest {
assertEquals(expectedResultMap, resultMap);
}
+ @Test
+ public void testDeleteStateSuccess() {
+ MockClient client = new MockClient(MOCK_TIME);
+
+ String groupId = "group1";
+ Uuid topicId1 = Uuid.randomUuid();
+ int partition1 = 10;
+
+ Uuid topicId2 = Uuid.randomUuid();
+ int partition2 = 8;
+
+ Node suppliedNode = new Node(0, HOST, PORT);
+ Node coordinatorNode1 = new Node(5, HOST, PORT);
+ Node coordinatorNode2 = new Node(6, HOST, PORT);
+
+        String coordinatorKey1 = SharePartitionKey.asCoordinatorKey(groupId, topicId1, partition1);
+        String coordinatorKey2 = SharePartitionKey.asCoordinatorKey(groupId, topicId2, partition2);
+
+        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest
+                && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
+                && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey1),
+            new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(List.of(
+                        new FindCoordinatorResponseData.Coordinator()
+                            .setNodeId(coordinatorNode1.id())
+                            .setHost(HOST)
+                            .setPort(PORT)
+                            .setErrorCode(Errors.NONE.code())
+                    ))
+            ),
+            suppliedNode
+        );
+
+        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest
+                && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
+                && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey2),
+            new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(List.of(
+                        new FindCoordinatorResponseData.Coordinator()
+                            .setNodeId(coordinatorNode2.id())
+                            .setHost(HOST)
+                            .setPort(PORT)
+                            .setErrorCode(Errors.NONE.code())
+                    ))
+            ),
+            suppliedNode
+        );
+
+        client.prepareResponseFrom(
+            body -> {
+                DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest) body;
+                String requestGroupId = request.data().groupId();
+                Uuid requestTopicId = request.data().topics().get(0).topicId();
+                int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
+
+                return requestGroupId.equals(groupId) && requestTopicId == topicId1 && requestPartition == partition1;
+            },
+            new DeleteShareGroupStateResponse(DeleteShareGroupStateResponse.toResponseData(topicId1, partition1)),
+            coordinatorNode1
+        );
+
+        client.prepareResponseFrom(
+            body -> {
+                DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest) body;
+                String requestGroupId = request.data().groupId();
+                Uuid requestTopicId = request.data().topics().get(0).topicId();
+                int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
+
+                return requestGroupId.equals(groupId) && requestTopicId == topicId2 && requestPartition == partition2;
+            },
+            new DeleteShareGroupStateResponse(DeleteShareGroupStateResponse.toResponseData(topicId2, partition2)),
+            coordinatorNode2
+        );
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode);
+
+        DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder()
+            .withKafkaClient(client)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        DeleteShareGroupStateParameters request = DeleteShareGroupStateParameters.from(
+            new DeleteShareGroupStateRequestData()
+                .setGroupId(groupId)
+                .setTopics(List.of(
+                    new DeleteShareGroupStateRequestData.DeleteStateData()
+                        .setTopicId(topicId1)
+                        .setPartitions(List.of(
+                            new DeleteShareGroupStateRequestData.PartitionData()
+                                .setPartition(partition1)
+                        )),
+                    new DeleteShareGroupStateRequestData.DeleteStateData()
+                        .setTopicId(topicId2)
+                        .setPartitions(List.of(
+                            new DeleteShareGroupStateRequestData.PartitionData()
+                                .setPartition(partition2)
+                        ))
+                ))
+        );
+
+        CompletableFuture<DeleteShareGroupStateResult> resultFuture = defaultStatePersister.deleteState(request);
+
+        DeleteShareGroupStateResult result = null;
+        try {
+            // adding long delay to allow for environment/GC issues
+            result = resultFuture.get(10L, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            fail("Unexpected exception", e);
+        }
+
+        HashSet<PartitionData> resultMap = new HashSet<>();
+        result.topicsData().forEach(
+            topicData -> topicData.partitions().forEach(
+                partitionData -> resultMap.add((PartitionData) partitionData)
+            )
+        );
+
+        HashSet<PartitionData> expectedResultMap = new HashSet<>();
+        expectedResultMap.add((PartitionData) PartitionFactory.newPartitionErrorData(partition1, Errors.NONE.code(), null));
+        expectedResultMap.add((PartitionData) PartitionFactory.newPartitionErrorData(partition2, Errors.NONE.code(), null));
+
+        assertEquals(2, result.topicsData().size());
+        assertEquals(expectedResultMap, resultMap);
+    }
+
@Test
public void testWriteStateResponseToResultPartialResults() {
        Map<Uuid, Map<Integer, CompletableFuture<WriteShareGroupStateResponse>>> futureMap = new HashMap<>();
@@ -1111,6 +1318,110 @@ class DefaultStatePersisterTest {
);
}
+ @Test
+ public void testDeleteStateResponseToResultPartialResults() {
+        Map<Uuid, Map<Integer, CompletableFuture<DeleteShareGroupStateResponse>>> futureMap = new HashMap<>();
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), 1, null);
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), 1, null);
+
+        // one entry has valid results
+        futureMap.computeIfAbsent(tp1.topicId(), k -> new HashMap<>())
+            .put(tp1.partition(), CompletableFuture.completedFuture(
+                    new DeleteShareGroupStateResponse(
+                        DeleteShareGroupStateResponse.toResponseData(
+                            tp1.topicId(),
+                            tp1.partition()
+                        )
+                    )
+                )
+            );
+
+        // one entry has error
+        futureMap.computeIfAbsent(tp2.topicId(), k -> new HashMap<>())
+            .put(tp2.partition(), CompletableFuture.completedFuture(
+                    new DeleteShareGroupStateResponse(
+                        DeleteShareGroupStateResponse.toErrorResponseData(
+                            tp2.topicId(),
+                            tp2.partition(),
+                            Errors.UNKNOWN_TOPIC_OR_PARTITION,
+                            "unknown tp"
+                        )
+                    )
+                )
+            );
+
+        PersisterStateManager psm = mock(PersisterStateManager.class);
+        DefaultStatePersister dsp = new DefaultStatePersister(psm);
+
+        DeleteShareGroupStateResult results = dsp.deleteResponsesToResult(futureMap);
+
+        // results should contain partial results
+        assertEquals(2, results.topicsData().size());
+        assertTrue(
+            results.topicsData().contains(
+                new TopicData<>(
+                    tp1.topicId(),
+                    List.of(PartitionFactory.newPartitionErrorData(tp1.partition(), Errors.NONE.code(), null))
+                )
+            )
+        );
+        assertTrue(
+            results.topicsData().contains(
+                new TopicData<>(
+                    tp2.topicId(),
+                    List.of(PartitionFactory.newPartitionErrorData(tp2.partition(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp"))
+                )
+            )
+        );
+    }
+
+ @Test
+ public void testDeleteStateResponseToResultFailedFuture() {
+        Map<Uuid, Map<Integer, CompletableFuture<DeleteShareGroupStateResponse>>> futureMap = new HashMap<>();
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), 1, null);
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), 1, null);
+
+        // one entry has valid results
+        futureMap.computeIfAbsent(tp1.topicId(), k -> new HashMap<>())
+            .put(tp1.partition(), CompletableFuture.completedFuture(
+                    new DeleteShareGroupStateResponse(
+                        DeleteShareGroupStateResponse.toResponseData(
+                            tp1.topicId(),
+                            tp1.partition()
+                        )
+                    )
+                )
+            );
+
+        // one entry has failed future
+        futureMap.computeIfAbsent(tp2.topicId(), k -> new HashMap<>())
+            .put(tp2.partition(), CompletableFuture.failedFuture(new Exception("scary stuff")));
+
+        PersisterStateManager psm = mock(PersisterStateManager.class);
+        DefaultStatePersister dsp = new DefaultStatePersister(psm);
+
+        DeleteShareGroupStateResult results = dsp.deleteResponsesToResult(futureMap);
+
+        // results should contain partial results
+        assertEquals(2, results.topicsData().size());
+        assertTrue(
+            results.topicsData().contains(
+                new TopicData<>(
+                    tp1.topicId(),
+                    List.of(PartitionFactory.newPartitionErrorData(tp1.partition(), Errors.NONE.code(), null))
+                )
+            )
+        );
+        assertTrue(
+            results.topicsData().contains(
+                new TopicData<>(
+                    tp2.topicId(),
+                    List.of(PartitionFactory.newPartitionErrorData(tp2.partition(), Errors.UNKNOWN_SERVER_ERROR.code(), "Error deleting state from share coordinator: java.lang.Exception: scary stuff"))
+                )
+            )
+        );
+    }
+
@Test
public void testDefaultPersisterClose() {
PersisterStateManager psm = mock(PersisterStateManager.class);
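Aside: the PersisterStateManagerTest cases in the next file exercise the bounded retry loop in DeleteStateHandler, where retriable coordinator errors re-enqueue the handler via the timer until the backoff budget is exhausted. BackoffManager itself is not part of this diff, so the sketch below is only a rough illustration of that kind of bounded exponential backoff; the method names mirror those used above, but the internals and constants are assumptions.

    // Illustrative bounded-exponential-backoff sketch; not the actual BackoffManager.
    class BackoffSketch {
        private final int maxAttempts;
        private final long baseMs;
        private final long maxMs;
        private int attempts;

        BackoffSketch(int maxAttempts, long baseMs, long maxMs) {
            this.maxAttempts = maxAttempts;
            this.baseMs = baseMs;
            this.maxMs = maxMs;
        }

        boolean canAttempt() {
            return attempts < maxAttempts;
        }

        void incrementAttempt() {
            attempts++;
        }

        void resetAttempts() {
            attempts = 0;
        }

        long backOff() {
            // double the delay per attempt, capped at maxMs
            return Math.min(maxMs, baseMs << Math.min(attempts, 16));
        }
    }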
diff --git a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
index afa9b07aa17..61a477b1203 100644
--- a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
@@ -22,12 +22,15 @@ import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.DeleteShareGroupStateRequest;
+import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
@@ -2843,6 +2846,728 @@ class PersisterStateManagerTest {
}
}
+ @Test
+ public void testDeleteStateRequestCoordinatorFoundSuccessfully() {
+ MockClient client = new MockClient(MOCK_TIME);
+
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 10;
+
+ Node suppliedNode = new Node(0, HOST, PORT);
+ Node coordinatorNode = new Node(1, HOST, PORT);
+
+        String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, topicId, partition);
+
+        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest
+                && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
+                && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey),
+            new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(Collections.singletonList(
+                        new FindCoordinatorResponseData.Coordinator()
+                            .setNodeId(1)
+                            .setHost(HOST)
+                            .setPort(PORT)
+                            .setErrorCode(Errors.NONE.code())
+                    ))
+            ),
+            suppliedNode
+        );
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                .setErrorCode(Errors.NONE.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode);
+
+        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        stateManager.start();
+
+        CompletableFuture<DeleteShareGroupStateResponse> future = new CompletableFuture<>();
+
+        PersisterStateManager.DeleteStateHandler handler = spy(stateManager.new DeleteStateHandler(
+            groupId,
+            topicId,
+            partition,
+            future,
+            REQUEST_BACKOFF_MS,
+            REQUEST_BACKOFF_MAX_MS,
+            MAX_RPC_RETRY_ATTEMPTS
+        ));
+
+        stateManager.enqueue(handler);
+
+        CompletableFuture<DeleteShareGroupStateResponse> resultFuture = handler.result();
+
+        DeleteShareGroupStateResponse result = null;
+        try {
+            result = resultFuture.get();
+        } catch (Exception e) {
+            fail("Failed to get result from future", e);
+        }
+
+        DeleteShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0);
+
+        verify(handler, times(1)).findShareCoordinatorBuilder();
+        verify(handler, times(0)).requestBuilder();
+
+        // Verifying the coordinator node was populated correctly by the FIND_COORDINATOR request
+        assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+        // Verifying the result returned is correct
+        assertEquals(partition, partitionResult.partition());
+        assertEquals(Errors.NONE.code(), partitionResult.errorCode());
+
+        try {
+            // Stopping the state manager
+            stateManager.stop();
+        } catch (Exception e) {
+            fail("Failed to stop state manager", e);
+        }
+    }
+
+ @Test
+    public void testDeleteStateRequestRetryWithNotCoordinatorSuccessfulOnRetry() throws InterruptedException, ExecutionException {
+        MockClient client = new MockClient(MOCK_TIME);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 10;
+
+        Node suppliedNode = new Node(0, HOST, PORT);
+        Node coordinatorNode = new Node(1, HOST, PORT);
+
+        String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, topicId, partition);
+
+        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest
+                && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
+                && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey),
+            new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(Collections.singletonList(
+                        new FindCoordinatorResponseData.Coordinator()
+                            .setErrorCode(Errors.NOT_COORDINATOR.code())
+                    ))
+            ),
+            suppliedNode
+        );
+
+        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest
+                && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
+                && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey),
+            new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(Collections.singletonList(
+                        new FindCoordinatorResponseData.Coordinator()
+                            .setNodeId(1)
+                            .setHost(HOST)
+                            .setPort(PORT)
+                            .setErrorCode(Errors.NONE.code())
+                    ))
+            ),
+            suppliedNode
+        );
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                .setErrorCode(Errors.NONE.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode);
+
+        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        stateManager.start();
+
+        CompletableFuture<DeleteShareGroupStateResponse> future = new CompletableFuture<>();
+
+        PersisterStateManager.DeleteStateHandler handler = spy(stateManager.new DeleteStateHandler(
+            groupId,
+            topicId,
+            partition,
+            future,
+            REQUEST_BACKOFF_MS,
+            REQUEST_BACKOFF_MAX_MS,
+            MAX_RPC_RETRY_ATTEMPTS
+        ));
+
+        stateManager.enqueue(handler);
+
+        CompletableFuture<DeleteShareGroupStateResponse> resultFuture = handler.result();
+
+        TestUtils.waitForCondition(resultFuture::isDone, TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "Failed to get result from future");
+
+        DeleteShareGroupStateResponse result = resultFuture.get();
+        DeleteShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0);
+
+        verify(handler, times(2)).findShareCoordinatorBuilder();
+        verify(handler, times(0)).requestBuilder();
+
+        // Verifying the coordinator node was populated correctly by the FIND_COORDINATOR request
+        assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+        // Verifying the result returned is correct
+        assertEquals(partition, partitionResult.partition());
+        assertEquals(Errors.NONE.code(), partitionResult.errorCode());
+
+        try {
+            // Stopping the state manager
+            stateManager.stop();
+        } catch (Exception e) {
+            fail("Failed to stop state manager", e);
+        }
+    }
+
+ @Test
+ public void testDeleteStateRequestCoordinatorFoundOnRetry() {
+ MockClient client = new MockClient(MOCK_TIME);
+
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 10;
+
+ Node suppliedNode = new Node(0, HOST, PORT);
+ Node coordinatorNode = new Node(1, HOST, PORT);
+
+        String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, topicId, partition);
+
+        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest
+                && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
+                && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey),
+            new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(Collections.singletonList(
+                        new FindCoordinatorResponseData.Coordinator()
+                            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+                    ))
+            ),
+            suppliedNode
+        );
+
+        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest
+                && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
+                && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey),
+            new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setCoordinators(Collections.singletonList(
+                        new FindCoordinatorResponseData.Coordinator()
+                            .setNodeId(1)
+                            .setHost(HOST)
+                            .setPort(PORT)
+                            .setErrorCode(Errors.NONE.code())
+                    ))
+            ),
+            suppliedNode
+        );
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                .setErrorCode(Errors.NONE.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode);
+
+        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        stateManager.start();
+
+        CompletableFuture<DeleteShareGroupStateResponse> future = new CompletableFuture<>();
+
+        PersisterStateManager.DeleteStateHandler handler = spy(stateManager.new DeleteStateHandler(
+            groupId,
+            topicId,
+            partition,
+            future,
+            REQUEST_BACKOFF_MS,
+            REQUEST_BACKOFF_MAX_MS,
+            MAX_RPC_RETRY_ATTEMPTS
+        ));
+
+        stateManager.enqueue(handler);
+
+        CompletableFuture<DeleteShareGroupStateResponse> resultFuture = handler.result();
+
+        DeleteShareGroupStateResponse result = null;
+        try {
+            result = resultFuture.get();
+        } catch (Exception e) {
+            fail("Failed to get result from future", e);
+        }
+
+        DeleteShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0);
+
+        verify(handler, times(2)).findShareCoordinatorBuilder();
+        verify(handler, times(0)).requestBuilder();
+
+        // Verifying the coordinator node was populated correctly by the FIND_COORDINATOR request
+        assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+        // Verifying the result returned is correct
+        assertEquals(partition, partitionResult.partition());
+        assertEquals(Errors.NONE.code(), partitionResult.errorCode());
+
+        try {
+            // Stopping the state manager
+            stateManager.stop();
+        } catch (Exception e) {
+            fail("Failed to stop state manager", e);
+        }
+    }
+
+ @Test
+ public void testDeleteStateRequestWithCoordinatorNodeLookup() {
+ MockClient client = new MockClient(MOCK_TIME);
+
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 10;
+
+ Node coordinatorNode = new Node(1, HOST, PORT);
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                .setErrorCode(Errors.NONE.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = getCoordinatorCacheHelper(coordinatorNode);
+
+        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        stateManager.start();
+
+        CompletableFuture<DeleteShareGroupStateResponse> future = new CompletableFuture<>();
+
+        PersisterStateManager.DeleteStateHandler handler = spy(stateManager.new DeleteStateHandler(
+            groupId,
+            topicId,
+            partition,
+            future,
+            REQUEST_BACKOFF_MS,
+            REQUEST_BACKOFF_MAX_MS,
+            MAX_RPC_RETRY_ATTEMPTS
+        ));
+
+        stateManager.enqueue(handler);
+
+        CompletableFuture<DeleteShareGroupStateResponse> resultFuture = handler.result();
+
+        DeleteShareGroupStateResponse result = null;
+        try {
+            result = resultFuture.get();
+        } catch (Exception e) {
+            fail("Failed to get result from future", e);
+        }
+
+        DeleteShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0);
+
+        verify(handler, times(0)).findShareCoordinatorBuilder();
+        verify(handler, times(0)).requestBuilder();
+        verify(handler, times(1)).onComplete(any());
+
+        // Verifying the coordinator node was populated correctly by the coordinator node lookup
+        assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+        // Verifying the result returned is correct
+        assertEquals(partition, partitionResult.partition());
+        assertEquals(Errors.NONE.code(), partitionResult.errorCode());
+
+        try {
+            // Stopping the state manager
+            stateManager.stop();
+        } catch (Exception e) {
+            fail("Failed to stop state manager", e);
+        }
+    }
+
+ @Test
+ public void testDeleteStateRequestWithRetryAndCoordinatorNodeLookup() {
+ MockClient client = new MockClient(MOCK_TIME);
+
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 10;
+
+ Node coordinatorNode = new Node(1, HOST, PORT);
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                .setErrorCode(Errors.NONE.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = getCoordinatorCacheHelper(coordinatorNode);
+
+        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        stateManager.start();
+
+        CompletableFuture<DeleteShareGroupStateResponse> future = new CompletableFuture<>();
+
+        PersisterStateManager.DeleteStateHandler handler = spy(stateManager.new DeleteStateHandler(
+            groupId,
+            topicId,
+            partition,
+            future,
+            REQUEST_BACKOFF_MS,
+            REQUEST_BACKOFF_MAX_MS,
+            MAX_RPC_RETRY_ATTEMPTS
+        ));
+
+        stateManager.enqueue(handler);
+
+        CompletableFuture<DeleteShareGroupStateResponse> resultFuture = handler.result();
+
+        DeleteShareGroupStateResponse result = null;
+        try {
+            result = resultFuture.get();
+        } catch (Exception e) {
+            fail("Failed to get result from future", e);
+        }
+
+        DeleteShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0);
+
+        verify(handler, times(0)).findShareCoordinatorBuilder();
+        verify(handler, times(0)).requestBuilder();
+        verify(handler, times(2)).onComplete(any());
+
+        // Verifying the coordinator node was populated correctly by the coordinator node lookup
+        assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+        // Verifying the result returned is correct
+        assertEquals(partition, partitionResult.partition());
+        assertEquals(Errors.NONE.code(), partitionResult.errorCode());
+
+        try {
+            // Stopping the state manager
+            stateManager.stop();
+        } catch (Exception e) {
+            fail("Failed to stop state manager", e);
+        }
+    }
+
+ @Test
+ public void testDeleteStateRequestFailedMaxRetriesExhausted() {
+ MockClient client = new MockClient(MOCK_TIME);
+
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 10;
+
+ Node coordinatorNode = new Node(1, HOST, PORT);
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                .setErrorCode(Errors.NONE.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = getCoordinatorCacheHelper(coordinatorNode);
+
+        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        stateManager.start();
+
+        CompletableFuture<DeleteShareGroupStateResponse> future = new CompletableFuture<>();
+
+        PersisterStateManager.DeleteStateHandler handler = spy(stateManager.new DeleteStateHandler(
+            groupId,
+            topicId,
+            partition,
+            future,
+            REQUEST_BACKOFF_MS,
+            REQUEST_BACKOFF_MAX_MS,
+            2
+        ));
+
+        stateManager.enqueue(handler);
+
+        CompletableFuture<DeleteShareGroupStateResponse> resultFuture = handler.result();
+
+        DeleteShareGroupStateResponse result = null;
+        try {
+            result = resultFuture.get();
+        } catch (Exception e) {
+            fail("Failed to get result from future", e);
+        }
+
+        DeleteShareGroupStateResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0);
+
+        verify(handler, times(0)).findShareCoordinatorBuilder();
+        verify(handler, times(0)).requestBuilder();
+        verify(handler, times(2)).onComplete(any());
+
+        // Verifying the coordinator node was populated correctly by the coordinator node lookup
+        assertEquals(coordinatorNode, handler.getCoordinatorNode());
+
+        // Verifying the result returned is correct
+        assertEquals(partition, partitionResult.partition());
+        assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), partitionResult.errorCode());
+
+        try {
+            // Stopping the state manager
+            stateManager.stop();
+        } catch (Exception e) {
+            fail("Failed to stop state manager", e);
+        }
+    }
+
+ @Test
+    public void testDeleteStateRequestBatchingWithCoordinatorNodeLookup() throws Exception {
+        MockClient client = new MockClient(MOCK_TIME);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 10;
+
+        Node coordinatorNode = new Node(1, HOST, PORT);
+
+        client.prepareResponseFrom(body -> {
+            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest) body;
+            String requestGroupId = request.data().groupId();
+            Uuid requestTopicId = request.data().topics().get(0).topicId();
+            int requestPartition = request.data().topics().get(0).partitions().get(0).partition();
+
+            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
+        }, new DeleteShareGroupStateResponse(
+            new DeleteShareGroupStateResponseData()
+                .setResults(List.of(
+                    new DeleteShareGroupStateResponseData.DeleteStateResult()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(
+                            new DeleteShareGroupStateResponseData.PartitionResult()
+                                .setPartition(partition)
+                                .setErrorCode(Errors.NONE.code())
+                                .setErrorMessage("")
+                        ))
+                ))
+        ), coordinatorNode);
+
+        ShareCoordinatorMetadataCacheHelper cacheHelper = getCoordinatorCacheHelper(coordinatorNode);
+
+        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder()
+            .withKafkaClient(client)
+            .withTimer(mockTimer)
+            .withCacheHelper(cacheHelper)
+            .build();
+
+        AtomicBoolean isBatchingSuccess = new AtomicBoolean(false);
+        stateManager.setGenerateCallback(() -> {
+            Map<PersisterStateManager.RPCType, Map<String, List<PersisterStateManager.PersisterStateManagerHandler>>> handlersPerType = stateManager.nodeRPCMap().get(coordinatorNode);
+            if (handlersPerType != null && handlersPerType.containsKey(PersisterStateManager.RPCType.DELETE) && handlersPerType.get(PersisterStateManager.RPCType.DELETE).containsKey(groupId)) {
+                if (handlersPerType.get(PersisterStateManager.RPCType.DELETE).get(groupId).size() > 2)
+                    isBatchingSuccess.set(true);
+            }
+        });
+
+        stateManager.start();
+
+        CompletableFuture<DeleteShareGroupStateResponse> future = new CompletableFuture<>();
+
+        List<PersisterStateManager.DeleteStateHandler> handlers = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            PersisterStateManager.DeleteStateHandler handler = spy(stateManager.new DeleteStateHandler(
+                groupId,
+                topicId,
+                partition,
+                future,
+                REQUEST_BACKOFF_MS,
+                REQUEST_BACKOFF_MAX_MS,
+                MAX_RPC_RETRY_ATTEMPTS
+            ));
+            handlers.add(handler);
+            stateManager.enqueue(handler);
+        }
+
+        CompletableFuture.allOf(handlers.stream()
+            .map(PersisterStateManager.DeleteStateHandler::result).toArray(CompletableFuture[]::new)).get();
+
+        TestUtils.waitForCondition(isBatchingSuccess::get, TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "unable to verify batching");
+    }
+
@Test
public void testPersisterStateManagerClose() {
KafkaClient client = mock(KafkaClient.class);