This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 89418b66aeb KAFKA-17442: Handled persister errors with write async 
calls (KIP-932) (#16956)
89418b66aeb is described below

commit 89418b66aeb3e004e6ef16cd42f2f929d48a6718
Author: Apoorv Mittal <[email protected]>
AuthorDate: Mon Sep 2 00:36:26 2024 +0100

    KAFKA-17442: Handled persister errors with write async calls (KIP-932) 
(#16956)
    
    The PR makes the persister write RPC async. Also handles the errors from 
persister as per the review comment here:
    Addressing review comment for PR: #16397 (comment)
    
    Reviewers: Andrew Schofield <[email protected]>, Abhinav Dixit 
<[email protected]>, Jun Rao <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    | 197 ++++++----
 .../kafka/server/share/SharePartitionManager.java  |  24 +-
 .../server/share/SharePartitionManagerTest.java    |  19 +-
 .../kafka/server/share/SharePartitionTest.java     | 423 ++++++++++++++-------
 4 files changed, 440 insertions(+), 223 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index cb507f6a8be..7823b76a80a 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -16,10 +16,15 @@
  */
 package kafka.server.share;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.FencedStateEpochException;
 import org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordBatch;
@@ -36,7 +41,6 @@ import 
org.apache.kafka.server.group.share.ReadShareGroupStateParameters;
 import org.apache.kafka.server.group.share.ReadShareGroupStateResult;
 import org.apache.kafka.server.group.share.TopicData;
 import org.apache.kafka.server.group.share.WriteShareGroupStateParameters;
-import org.apache.kafka.server.group.share.WriteShareGroupStateResult;
 import org.apache.kafka.server.share.ShareAcknowledgementBatch;
 import org.apache.kafka.server.util.timer.Timer;
 import org.apache.kafka.server.util.timer.TimerTask;
@@ -342,9 +346,8 @@ public class SharePartition {
      * in-flight records and the next fetch offset is updated to the next 
offset that should be
      * fetched from the leader.
      *
-     * @param memberId The member id of the client that is fetching the record.
+     * @param memberId           The member id of the client that is fetching 
the record.
      * @param fetchPartitionData The fetched records for the share partition.
-     *
      * @return A future which is completed when the records are acquired.
      */
     public CompletableFuture<List<AcquiredRecords>> acquire(
@@ -463,21 +466,21 @@ public class SharePartition {
      * from the in-flight records once persisted. The next fetch offset is 
updated to the next offset
      * that should be fetched from the leader, if required.
      *
-     * @param memberId The member id of the client that is fetching the record.
+     * @param memberId               The member id of the client that is 
fetching the record.
      * @param acknowledgementBatches The acknowledgement batch list for the 
share partition.
-     *
      * @return A future which is completed when the records are acknowledged.
      */
-    public CompletableFuture<Optional<Throwable>> acknowledge(
+    public CompletableFuture<Void> acknowledge(
         String memberId,
         List<ShareAcknowledgementBatch> acknowledgementBatches
     ) {
         log.trace("Acknowledgement batch request for share partition: {}-{}", 
groupId, topicIdPartition);
 
+        CompletableFuture<Void> future = new CompletableFuture<>();
         Throwable throwable = null;
-        lock.writeLock().lock();
         List<InFlightState> updatedStates = new ArrayList<>();
         List<PersisterStateBatch> stateBatches = new ArrayList<>();
+        lock.writeLock().lock();
         try {
             // Avoided using enhanced for loop as need to check if the last 
batch have offsets
             // in the range.
@@ -528,12 +531,12 @@ public class SharePartition {
 
             // If the acknowledgement is successful then persist state, 
complete the state transition
             // and update the cached state for start offset. Else rollback the 
state transition.
-            rollbackOrProcessStateUpdates(throwable, updatedStates, 
stateBatches);
+            rollbackOrProcessStateUpdates(future, throwable, updatedStates, 
stateBatches);
         } finally {
             lock.writeLock().unlock();
         }
 
-        return 
CompletableFuture.completedFuture(Optional.ofNullable(throwable));
+        return future;
     }
 
     /**
@@ -541,17 +544,17 @@ public class SharePartition {
      * that should be fetched from the leader.
      *
      * @param memberId The member id of the client whose records shall be 
released.
-     *
      * @return A future which is completed when the records are released.
      */
-    public CompletableFuture<Optional<Throwable>> 
releaseAcquiredRecords(String memberId) {
+    public CompletableFuture<Void> releaseAcquiredRecords(String memberId) {
         log.trace("Release acquired records request for share partition: {}-{} 
memberId: {}", groupId, topicIdPartition, memberId);
 
+        CompletableFuture<Void> future = new CompletableFuture<>();
         Throwable throwable = null;
-        lock.writeLock().lock();
         List<InFlightState> updatedStates = new ArrayList<>();
         List<PersisterStateBatch> stateBatches = new ArrayList<>();
 
+        lock.writeLock().lock();
         try {
             RecordState recordState = RecordState.AVAILABLE;
             // Iterate over multiple fetched batches. The state can vary per 
offset entry
@@ -584,11 +587,11 @@ public class SharePartition {
 
             // If the release acquired records is successful then persist 
state, complete the state transition
             // and update the cached state for start offset. Else rollback the 
state transition.
-            rollbackOrProcessStateUpdates(throwable, updatedStates, 
stateBatches);
+            rollbackOrProcessStateUpdates(future, throwable, updatedStates, 
stateBatches);
         } finally {
             lock.writeLock().unlock();
         }
-        return 
CompletableFuture.completedFuture(Optional.ofNullable(throwable));
+        return future;
     }
 
     private Optional<Throwable> releaseAcquiredRecordsForPerOffsetBatch(String 
memberId,
@@ -880,14 +883,27 @@ public class SharePartition {
         }
 
         TopicData<PartitionAllData> state = response.topicsData().get(0);
-        if (state.topicId() != topicIdPartition.topicId() || 
state.partitions().size() != 1
-            || state.partitions().get(0).partition() != 
topicIdPartition.partition()) {
+        if (state.topicId() != topicIdPartition.topicId() || 
state.partitions().size() != 1) {
             log.error("Failed to initialize the share partition: {}-{}. 
Invalid topic partition response: {}.",
                 groupId, topicIdPartition, response);
             throw new IllegalStateException(String.format("Failed to 
initialize the share partition %s-%s", groupId, topicIdPartition));
         }
 
         PartitionAllData partitionData = state.partitions().get(0);
+        if (partitionData.partition() != topicIdPartition.partition()) {
+            log.error("Failed to initialize the share partition: {}-{}. 
Invalid partition response: {}.",
+                groupId, topicIdPartition, partitionData);
+            throw new IllegalStateException(String.format("Failed to 
initialize the share partition %s-%s",
+                groupId, topicIdPartition));
+        }
+
+        if (partitionData.errorCode() != Errors.NONE.code()) {
+            KafkaException ex = fetchPersisterError(partitionData.errorCode(), 
partitionData.errorMessage());
+            log.error("Failed to initialize the share partition: {}-{}. 
Exception occurred: {}.",
+                groupId, topicIdPartition, partitionData);
+            throw ex;
+        }
+
         // Set the state epoch and end offset from the persisted state.
         startOffset = partitionData.startOffset() != -1 ? 
partitionData.startOffset() : 0;
         stateEpoch = partitionData.stateEpoch();
@@ -1338,21 +1354,36 @@ public class SharePartition {
 
     // Visible for testing
     void rollbackOrProcessStateUpdates(
+        CompletableFuture<Void> future,
         Throwable throwable,
         List<InFlightState> updatedStates,
         List<PersisterStateBatch> stateBatches
     ) {
-        if (stateBatches.isEmpty() && updatedStates.isEmpty())
-            return;
-
         lock.writeLock().lock();
         try {
-            if (throwable != null || 
!isWriteShareGroupStateSuccessful(stateBatches)) {
+            if (throwable != null) {
                 // Log in DEBUG to avoid flooding of logs for a faulty client.
                 log.debug("Request failed for updating state, rollback any 
changed state"
                     + " for the share partition: {}-{}", groupId, 
topicIdPartition);
                 updatedStates.forEach(state -> 
state.completeStateTransition(false));
-            } else {
+                future.completeExceptionally(throwable);
+                return;
+            }
+
+            if (stateBatches.isEmpty() && updatedStates.isEmpty()) {
+                future.complete(null);
+                return;
+            }
+
+            writeShareGroupState(stateBatches).whenComplete((result, 
exception) -> {
+                if (exception != null) {
+                    log.error("Failed to write state to persister for the 
share partition: {}-{}",
+                        groupId, topicIdPartition, exception);
+                    updatedStates.forEach(state -> 
state.completeStateTransition(false));
+                    future.completeExceptionally(exception);
+                    return;
+                }
+
                 log.trace("State change request successful for share 
partition: {}-{}",
                     groupId, topicIdPartition);
                 updatedStates.forEach(state -> {
@@ -1362,7 +1393,8 @@ public class SharePartition {
                 });
                 // Update the cached state and start and end offsets after 
acknowledging/releasing the acquired records.
                 maybeUpdateCachedStateAndOffsets();
-            }
+                future.complete(null);
+            });
         } finally {
             lock.writeLock().unlock();
         }
@@ -1494,46 +1526,71 @@ public class SharePartition {
     }
 
     // Visible for testing
-    boolean isWriteShareGroupStateSuccessful(List<PersisterStateBatch> 
stateBatches) {
-        WriteShareGroupStateResult response;
-        try {
-            response = persister.writeState(new 
WriteShareGroupStateParameters.Builder()
-                .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
-                    .setGroupId(this.groupId)
-                    .setTopicsData(Collections.singletonList(new 
TopicData<>(topicIdPartition.topicId(),
-                        
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
-                            topicIdPartition.partition(), stateEpoch, 
startOffset, 0, stateBatches))))
-                    ).build()).build()).get();
-        } catch (InterruptedException | ExecutionException e) {
-            log.error("Failed to write the share group state for share 
partition: {}-{}", groupId, topicIdPartition, e);
-            throw new IllegalStateException(String.format("Failed to write the 
share group state for share partition %s-%s",
-                groupId, topicIdPartition), e);
-        }
+    CompletableFuture<Void> writeShareGroupState(List<PersisterStateBatch> 
stateBatches) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        persister.writeState(new WriteShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
+                .setGroupId(this.groupId)
+                .setTopicsData(Collections.singletonList(new 
TopicData<>(topicIdPartition.topicId(),
+                    
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
+                        topicIdPartition.partition(), stateEpoch, startOffset, 
0, stateBatches))))
+                ).build()).build())
+            .whenComplete((result, exception) -> {
+                if (exception != null) {
+                    log.error("Failed to write the share group state for share 
partition: {}-{}", groupId, topicIdPartition, exception);
+                    future.completeExceptionally(new 
IllegalStateException(String.format("Failed to write the share group state for 
share partition %s-%s",
+                        groupId, topicIdPartition), exception));
+                    return;
+                }
 
-        if (response == null || response.topicsData() == null || 
response.topicsData().size() != 1) {
-            log.error("Failed to write the share group state for share 
partition: {}-{}. Invalid state found: {}",
-                groupId, topicIdPartition, response);
-            throw new IllegalStateException(String.format("Failed to write the 
share group state for share partition %s-%s",
-                groupId, topicIdPartition));
-        }
+                if (result == null || result.topicsData() == null || 
result.topicsData().size() != 1) {
+                    log.error("Failed to write the share group state for share 
partition: {}-{}. Invalid state found: {}",
+                        groupId, topicIdPartition, result);
+                    future.completeExceptionally(new 
IllegalStateException(String.format("Failed to write the share group state for 
share partition %s-%s",
+                        groupId, topicIdPartition)));
+                    return;
+                }
 
-        TopicData<PartitionErrorData> state = response.topicsData().get(0);
-        if (state.topicId() != topicIdPartition.topicId() || 
state.partitions().size() != 1
-            || state.partitions().get(0).partition() != 
topicIdPartition.partition()) {
-            log.error("Failed to write the share group state for share 
partition: {}-{}. Invalid topic partition response: {}",
-                groupId, topicIdPartition, response);
-            throw new IllegalStateException(String.format("Failed to write the 
share group state for share partition %s-%s",
-                groupId, topicIdPartition));
-        }
+                TopicData<PartitionErrorData> state = 
result.topicsData().get(0);
+                if (state.topicId() != topicIdPartition.topicId() || 
state.partitions().size() != 1
+                    || state.partitions().get(0).partition() != 
topicIdPartition.partition()) {
+                    log.error("Failed to write the share group state for share 
partition: {}-{}. Invalid topic partition response: {}",
+                        groupId, topicIdPartition, result);
+                    future.completeExceptionally(new 
IllegalStateException(String.format("Failed to write the share group state for 
share partition %s-%s",
+                        groupId, topicIdPartition)));
+                    return;
+                }
 
-        PartitionErrorData partitionData = state.partitions().get(0);
-        if (partitionData.errorCode() != Errors.NONE.code()) {
-            Exception exception = 
Errors.forCode(partitionData.errorCode()).exception(partitionData.errorMessage());
-            log.error("Failed to write the share group state for share 
partition: {}-{} due to exception",
-                groupId, topicIdPartition, exception);
-            return false;
+                PartitionErrorData partitionData = state.partitions().get(0);
+                if (partitionData.errorCode() != Errors.NONE.code()) {
+                    KafkaException ex = 
fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
+                    log.error("Failed to write the share group state for share 
partition: {}-{} due to exception",
+                        groupId, topicIdPartition, ex);
+                    future.completeExceptionally(ex);
+                    return;
+                }
+                future.complete(null);
+            });
+        return future;
+    }
+
+    private KafkaException fetchPersisterError(short errorCode, String 
errorMessage) {
+        Errors error = Errors.forCode(errorCode);
+        switch (error) {
+            case NOT_COORDINATOR:
+            case COORDINATOR_NOT_AVAILABLE:
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                return new CoordinatorNotAvailableException(errorMessage);
+            case GROUP_ID_NOT_FOUND:
+            case UNKNOWN_TOPIC_OR_PARTITION:
+                return new InvalidRequestException(errorMessage);
+            case FENCED_STATE_EPOCH:
+                return new FencedStateEpochException(errorMessage);
+            case FENCED_LEADER_EPOCH:
+                return new NotLeaderOrFollowerException(errorMessage);
+            default:
+                return new UnknownServerException(errorMessage);
         }
-        return true;
     }
 
     private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String 
memberId, long firstOffset, long lastOffset) {
@@ -1554,9 +1611,9 @@ public class SharePartition {
         long lastOffset,
         long delayMs
     ) {
-        AcquisitionLockTimerTask acquistionLockTimerTask = 
acquisitionLockTimerTask(memberId, firstOffset, lastOffset, delayMs);
-        timer.add(acquistionLockTimerTask);
-        return acquistionLockTimerTask;
+        AcquisitionLockTimerTask acquisitionLockTimerTask = 
acquisitionLockTimerTask(memberId, firstOffset, lastOffset, delayMs);
+        timer.add(acquisitionLockTimerTask);
+        return acquisitionLockTimerTask;
     }
 
     private AcquisitionLockTimerTask acquisitionLockTimerTask(
@@ -1598,15 +1655,17 @@ public class SharePartition {
                 }
             }
 
-            if (!stateBatches.isEmpty() && 
!isWriteShareGroupStateSuccessful(stateBatches)) {
-
-                // Even if write share group state RPC call fails, we will 
still go ahead with the state transition.
-                log.error("Failed to write the share group state on 
acquisition lock timeout for share partition: {}-{} memberId: {}. " +
-                                "Proceeding with state transition.", groupId, 
topicIdPartition, memberId);
+            if (!stateBatches.isEmpty()) {
+                writeShareGroupState(stateBatches).whenComplete((result, 
exception) -> {
+                    if (exception != null) {
+                        log.error("Failed to write the share group state on 
acquisition lock timeout for share partition: {}-{} memberId: {}",
+                            groupId, topicIdPartition, memberId, exception);
+                    }
+                    // Even if write share group state RPC call fails, we will 
still go ahead with the state transition.
+                    // Update the cached state and start and end offsets after 
releasing the acquisition lock on timeout.
+                    maybeUpdateCachedStateAndOffsets();
+                });
             }
-
-            // Update the cached state and start and end offsets after 
releasing the acquisition lock on timeout.
-            maybeUpdateCachedStateAndOffsets();
         } finally {
             lock.writeLock().unlock();
         }
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java 
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index fb1383db966..2b39e969ce6 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -272,14 +272,14 @@ public class SharePartitionManager implements 
AutoCloseable {
         acknowledgeTopics.forEach((topicIdPartition, 
acknowledgePartitionBatches) -> {
             SharePartition sharePartition = 
partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition));
             if (sharePartition != null) {
-                CompletableFuture<Errors> future = 
sharePartition.acknowledge(memberId, 
acknowledgePartitionBatches).thenApply(throwable -> {
-                    if (throwable.isPresent()) {
-                        return Errors.forException(throwable.get());
+                CompletableFuture<Errors> future = new CompletableFuture<>();
+                sharePartition.acknowledge(memberId, 
acknowledgePartitionBatches).whenComplete((result, throwable) -> {
+                    if (throwable != null) {
+                        future.complete(Errors.forException(throwable));
+                        return;
                     }
-                    acknowledgePartitionBatches.forEach(batch -> {
-                        
batch.acknowledgeTypes().forEach(this.shareGroupMetrics::recordAcknowledgement);
-                    });
-                    return Errors.NONE;
+                    acknowledgePartitionBatches.forEach(batch -> 
batch.acknowledgeTypes().forEach(this.shareGroupMetrics::recordAcknowledgement));
+                    future.complete(Errors.NONE);
                 });
                 futures.put(topicIdPartition, future);
             } else {
@@ -337,11 +337,13 @@ public class SharePartitionManager implements 
AutoCloseable {
                 log.error("No share partition found for groupId {} 
topicPartition {} while releasing acquired topic partitions", groupId, 
topicIdPartition);
                 futuresMap.put(topicIdPartition, 
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
             } else {
-                CompletableFuture<Errors> future = 
sharePartition.releaseAcquiredRecords(memberId).thenApply(throwable -> {
-                    if (throwable.isPresent()) {
-                        return Errors.forException(throwable.get());
+                CompletableFuture<Errors> future = new CompletableFuture<>();
+                
sharePartition.releaseAcquiredRecords(memberId).whenComplete((result, 
throwable) -> {
+                    if (throwable != null) {
+                        future.complete(Errors.forException(throwable));
+                        return;
                     }
-                    return Errors.NONE;
+                    future.complete(Errors.NONE);
                 });
                 futuresMap.put(topicIdPartition, future);
             }
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index ce778a56898..894ab947f0b 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -56,6 +56,7 @@ import org.apache.kafka.server.share.ShareSession;
 import org.apache.kafka.server.share.ShareSessionCache;
 import org.apache.kafka.server.share.ShareSessionContext;
 import org.apache.kafka.server.share.ShareSessionKey;
+import org.apache.kafka.server.util.FutureUtils;
 import org.apache.kafka.server.util.timer.MockTimer;
 import org.apache.kafka.server.util.timer.SystemTimer;
 import org.apache.kafka.server.util.timer.SystemTimerReaper;
@@ -1367,9 +1368,9 @@ public class SharePartitionManagerTest {
 
         SharePartition sp1 = mock(SharePartition.class);
         SharePartition sp2 = mock(SharePartition.class);
-        
when(sp1.releaseAcquiredRecords(ArgumentMatchers.eq(memberId.toString()))).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
-        
when(sp2.releaseAcquiredRecords(ArgumentMatchers.eq(memberId.toString()))).thenReturn(CompletableFuture.completedFuture(
-                Optional.of(new InvalidRecordStateException("Unable to release 
acquired records for the batch"))
+        
when(sp1.releaseAcquiredRecords(ArgumentMatchers.eq(memberId.toString()))).thenReturn(CompletableFuture.completedFuture(null));
+        
when(sp2.releaseAcquiredRecords(ArgumentMatchers.eq(memberId.toString()))).thenReturn(FutureUtils.failedFuture(
+                new InvalidRecordStateException("Unable to release acquired 
records for the batch")
         ));
 
         ShareSessionCache cache = mock(ShareSessionCache.class);
@@ -1515,7 +1516,7 @@ public class SharePartitionManagerTest {
         TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
         SharePartition sp = mock(SharePartition.class);
 
-        when(sp.acknowledge(ArgumentMatchers.eq(memberId), 
any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+        when(sp.acknowledge(ArgumentMatchers.eq(memberId), 
any())).thenReturn(CompletableFuture.completedFuture(null));
 
         Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
         partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp), sp);
@@ -1549,9 +1550,9 @@ public class SharePartitionManagerTest {
         SharePartition sp2 = mock(SharePartition.class);
         SharePartition sp3 = mock(SharePartition.class);
 
-        when(sp1.acknowledge(ArgumentMatchers.eq(memberId), 
any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
-        when(sp2.acknowledge(ArgumentMatchers.eq(memberId), 
any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
-        when(sp3.acknowledge(ArgumentMatchers.eq(memberId), 
any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+        when(sp1.acknowledge(ArgumentMatchers.eq(memberId), 
any())).thenReturn(CompletableFuture.completedFuture(null));
+        when(sp2.acknowledge(ArgumentMatchers.eq(memberId), 
any())).thenReturn(CompletableFuture.completedFuture(null));
+        when(sp3.acknowledge(ArgumentMatchers.eq(memberId), 
any())).thenReturn(CompletableFuture.completedFuture(null));
 
         Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
         partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
@@ -1669,8 +1670,8 @@ public class SharePartitionManagerTest {
 
         TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
         SharePartition sp = mock(SharePartition.class);
-        when(sp.acknowledge(ArgumentMatchers.eq(memberId), 
any())).thenReturn(CompletableFuture.completedFuture(
-                Optional.of(new InvalidRequestException("Member is not the 
owner of batch record"))
+        when(sp.acknowledge(ArgumentMatchers.eq(memberId), 
any())).thenReturn(FutureUtils.failedFuture(
+                new InvalidRequestException("Member is not the owner of batch 
record")
         ));
         Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
         partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp), sp);
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 56a783b3977..9754b31c165 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -22,8 +22,12 @@ import kafka.server.share.SharePartition.RecordState;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.FencedStateEpochException;
 import org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -64,6 +68,7 @@ import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 
 import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -187,6 +192,76 @@ public class SharePartitionTest {
         assertEquals(10, sharePartition.nextFetchOffset());
     }
 
+    @Test
+    public void testInitializeWithErrorPartitionResponse() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+
+        // Mock NOT_COORDINATOR error.
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 5, 10L, 
Errors.NOT_COORDINATOR.code(), Errors.NOT_COORDINATOR.message(),
+                    Collections.emptyList())))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        assertThrows(CoordinatorNotAvailableException.class, () -> 
SharePartitionBuilder.builder().withPersister(persister).build());
+
+        // Mock COORDINATOR_NOT_AVAILABLE error.
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 5, 10L, 
Errors.COORDINATOR_NOT_AVAILABLE.code(), 
Errors.COORDINATOR_NOT_AVAILABLE.message(),
+                    Collections.emptyList())))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        assertThrows(CoordinatorNotAvailableException.class, () -> 
SharePartitionBuilder.builder().withPersister(persister).build());
+
+        // Mock COORDINATOR_LOAD_IN_PROGRESS error.
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 5, 10L, 
Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), 
Errors.COORDINATOR_LOAD_IN_PROGRESS.message(),
+                    Collections.emptyList())))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        assertThrows(CoordinatorNotAvailableException.class, () -> 
SharePartitionBuilder.builder().withPersister(persister).build());
+
+        // Mock GROUP_ID_NOT_FOUND error.
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 5, 10L, 
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message(),
+                    Collections.emptyList())))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        assertThrows(InvalidRequestException.class, () -> 
SharePartitionBuilder.builder().withPersister(persister).build());
+
+        // Mock UNKNOWN_TOPIC_OR_PARTITION error.
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 5, 10L, 
Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 
Errors.UNKNOWN_TOPIC_OR_PARTITION.message(),
+                    Collections.emptyList())))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        assertThrows(InvalidRequestException.class, () -> 
SharePartitionBuilder.builder().withPersister(persister).build());
+
+        // Mock FENCED_STATE_EPOCH error.
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 5, 10L, 
Errors.FENCED_STATE_EPOCH.code(), Errors.FENCED_STATE_EPOCH.message(),
+                    Collections.emptyList())))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        assertThrows(FencedStateEpochException.class, () -> 
SharePartitionBuilder.builder().withPersister(persister).build());
+
+        // Mock FENCED_LEADER_EPOCH error.
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 5, 10L, 
Errors.FENCED_LEADER_EPOCH.code(), Errors.FENCED_LEADER_EPOCH.message(),
+                    Collections.emptyList())))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        assertThrows(NotLeaderOrFollowerException.class, () -> 
SharePartitionBuilder.builder().withPersister(persister).build());
+
+        // Mock UNKNOWN_SERVER_ERROR error.
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 5, 10L, 
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message(),
+                    Collections.emptyList())))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        assertThrows(UnknownServerException.class, () -> 
SharePartitionBuilder.builder().withPersister(persister).build());
+    }
+
     @Test
     public void testInitializeWithInvalidStartOffsetStateBatches() {
         Persister persister = Mockito.mock(Persister.class);
@@ -497,11 +572,11 @@ public class SharePartitionTest {
         assertFalse(result.isCompletedExceptionally());
         assertEquals(1, result.join().size());
 
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge(
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
             MEMBER_ID,
             Collections.singletonList(new ShareAcknowledgementBatch(1, 1, 
Collections.singletonList((byte) 1))));
+        assertNull(ackResult.join());
         assertFalse(ackResult.isCompletedExceptionally());
-        assertFalse(ackResult.join().isPresent());
 
         assertEquals(2, sharePartition.nextFetchOffset());
         assertEquals(2, sharePartition.cachedState().size());
@@ -522,11 +597,11 @@ public class SharePartitionTest {
         assertFalse(result.isCompletedExceptionally());
         assertEquals(1, result.join().size());
 
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge(
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
             MEMBER_ID,
             Collections.singletonList(new ShareAcknowledgementBatch(5, 14, 
Collections.singletonList((byte) 1))));
+        assertNull(ackResult.join());
         assertFalse(ackResult.isCompletedExceptionally());
-        assertFalse(ackResult.join().isPresent());
 
         assertEquals(15, sharePartition.nextFetchOffset());
         assertEquals(0, sharePartition.cachedState().size());
@@ -558,7 +633,7 @@ public class SharePartitionTest {
         assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(), 
result.join().toArray());
         assertEquals(19, sharePartition.nextFetchOffset());
 
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge(
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
             MEMBER_ID,
             Arrays.asList(
                 new ShareAcknowledgementBatch(5, 6, 
Collections.singletonList((byte) 2)),
@@ -567,8 +642,8 @@ public class SharePartitionTest {
                     (byte) 2, (byte) 2, (byte) 0,
                     (byte) 0, (byte) 0, (byte) 1
                 ))));
+        assertNull(ackResult.join());
         assertFalse(ackResult.isCompletedExceptionally());
-        assertFalse(ackResult.join().isPresent());
 
         assertEquals(5, sharePartition.nextFetchOffset());
         assertEquals(2, sharePartition.cachedState().size());
@@ -622,7 +697,7 @@ public class SharePartitionTest {
         assertEquals(21, sharePartition.nextFetchOffset());
 
         // Acknowledging over subset of both batch with subset of gap offsets.
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge(
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
             MEMBER_ID,
             Collections.singletonList(new ShareAcknowledgementBatch(6, 18, 
Arrays.asList(
                 (byte) 1, (byte) 1, (byte) 1,
@@ -630,8 +705,8 @@ public class SharePartitionTest {
                 (byte) 0, (byte) 0, (byte) 1,
                 (byte) 0, (byte) 1, (byte) 0,
                 (byte) 1))));
+        assertNull(ackResult.join());
         assertFalse(ackResult.isCompletedExceptionally());
-        assertFalse(ackResult.join().isPresent());
 
         assertEquals(21, sharePartition.nextFetchOffset());
         assertEquals(2, sharePartition.cachedState().size());
@@ -663,12 +738,11 @@ public class SharePartitionTest {
     public void testAcknowledgeOutOfRangeCachedData() {
         SharePartition sharePartition = 
SharePartitionBuilder.builder().build();
         // Acknowledge a batch when cache is empty.
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge(
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
             MEMBER_ID,
             Collections.singletonList(new ShareAcknowledgementBatch(0, 15, 
Collections.singletonList((byte) 3))));
-        assertFalse(ackResult.isCompletedExceptionally());
-        assertTrue(ackResult.join().isPresent());
-        assertEquals(InvalidRecordStateException.class, 
ackResult.join().get().getClass());
+        assertTrue(ackResult.isCompletedExceptionally());
+        assertFutureThrows(ackResult, InvalidRecordStateException.class);
 
         MemoryRecords records = memoryRecords(5, 5);
         CompletableFuture<List<AcquiredRecords>> result = 
sharePartition.acquire(
@@ -684,9 +758,8 @@ public class SharePartitionTest {
         ackResult = sharePartition.acknowledge(
             MEMBER_ID,
             Collections.singletonList(new ShareAcknowledgementBatch(20, 25, 
Collections.singletonList((byte) 3))));
-        assertFalse(ackResult.isCompletedExceptionally());
-        assertTrue(ackResult.join().isPresent());
-        assertEquals(InvalidRequestException.class, 
ackResult.join().get().getClass());
+        assertTrue(ackResult.isCompletedExceptionally());
+        assertFutureThrows(ackResult, InvalidRequestException.class);
     }
 
     @Test
@@ -715,11 +788,10 @@ public class SharePartitionTest {
         List<ShareAcknowledgementBatch> acknowledgeBatches = Arrays.asList(
             new ShareAcknowledgementBatch(0, 10, 
Collections.singletonList((byte) 1)),
             new ShareAcknowledgementBatch(20, 24, 
Collections.singletonList((byte) 1)));
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge(
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
             MEMBER_ID, acknowledgeBatches);
-        assertFalse(ackResult.isCompletedExceptionally());
-        assertTrue(ackResult.join().isPresent());
-        assertEquals(InvalidRequestException.class, 
ackResult.join().get().getClass());
+        assertTrue(ackResult.isCompletedExceptionally());
+        assertFutureThrows(ackResult, InvalidRequestException.class);
 
         // Create data for the batch with offsets 5-10.
         records = memoryRecords(6, 5);
@@ -733,8 +805,8 @@ public class SharePartitionTest {
         // Previous failed acknowledge request should succeed now.
         ackResult = sharePartition.acknowledge(
             MEMBER_ID, acknowledgeBatches);
+        assertNull(ackResult.join());
         assertFalse(ackResult.isCompletedExceptionally());
-        assertFalse(ackResult.join().isPresent());
     }
 
     @Test
@@ -751,12 +823,11 @@ public class SharePartitionTest {
         assertEquals(1, sharePartition.cachedState().size());
         assertNotNull(sharePartition.cachedState().get(5L));
 
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge(
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
             "member-2",
             Collections.singletonList(new ShareAcknowledgementBatch(5, 9, 
Collections.singletonList((byte) 3))));
-        assertFalse(ackResult.isCompletedExceptionally());
-        assertTrue(ackResult.join().isPresent());
-        assertEquals(InvalidRecordStateException.class, 
ackResult.join().get().getClass());
+        assertTrue(ackResult.isCompletedExceptionally());
+        assertFutureThrows(ackResult, InvalidRecordStateException.class);
     }
 
     @Test
@@ -773,19 +844,18 @@ public class SharePartitionTest {
         assertEquals(1, sharePartition.cachedState().size());
         assertNotNull(sharePartition.cachedState().get(5L));
 
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge(
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
             MEMBER_ID,
             Collections.singletonList(new ShareAcknowledgementBatch(5, 9, 
Collections.singletonList((byte) 2))));
+        assertNull(ackResult.join());
         assertFalse(ackResult.isCompletedExceptionally());
-        assertFalse(ackResult.join().isPresent());
 
         // Acknowledge the same batch again but with ACCEPT type.
         ackResult = sharePartition.acknowledge(
             MEMBER_ID,
             Collections.singletonList(new ShareAcknowledgementBatch(5, 9, 
Collections.singletonList((byte) 1))));
-        assertFalse(ackResult.isCompletedExceptionally());
-        assertTrue(ackResult.join().isPresent());
-        assertEquals(InvalidRecordStateException.class, 
ackResult.join().get().getClass());
+        assertTrue(ackResult.isCompletedExceptionally());
+        assertFutureThrows(ackResult, InvalidRecordStateException.class);
 
         // Re-acquire the same batch and then acknowledge subset with ACCEPT 
type.
         result = sharePartition.acquire(
@@ -798,16 +868,15 @@ public class SharePartitionTest {
         ackResult = sharePartition.acknowledge(
             MEMBER_ID,
             Collections.singletonList(new ShareAcknowledgementBatch(6, 8, 
Collections.singletonList((byte) 3))));
+        assertNull(ackResult.join());
         assertFalse(ackResult.isCompletedExceptionally());
-        assertFalse(ackResult.join().isPresent());
 
         // Re-acknowledge the subset batch with REJECT type.
         ackResult = sharePartition.acknowledge(
             MEMBER_ID,
             Collections.singletonList(new ShareAcknowledgementBatch(6, 8, 
Collections.singletonList((byte) 3))));
-        assertFalse(ackResult.isCompletedExceptionally());
-        assertTrue(ackResult.join().isPresent());
-        assertEquals(InvalidRecordStateException.class, 
ackResult.join().get().getClass());
+        assertTrue(ackResult.isCompletedExceptionally());
+        assertFutureThrows(ackResult, InvalidRecordStateException.class);
     }
 
     @Test
@@ -839,7 +908,7 @@ public class SharePartitionTest {
         // Cached data with offset 5-19 should exist.
         assertEquals(3, sharePartition.cachedState().size());
 
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge(
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
             MEMBER_ID,
             Arrays.asList(
                 new ShareAcknowledgementBatch(5, 9, 
Collections.singletonList((byte) 2)),
@@ -847,9 +916,8 @@ public class SharePartitionTest {
                 new ShareAcknowledgementBatch(15, 19, 
Collections.singletonList((byte) 1)),
                 // Add another batch which should fail the request.
                 new ShareAcknowledgementBatch(15, 19, 
Collections.singletonList((byte) 1))));
-        assertFalse(ackResult.isCompletedExceptionally());
-        assertTrue(ackResult.join().isPresent());
-        assertEquals(InvalidRecordStateException.class, 
ackResult.join().get().getClass());
+        assertTrue(ackResult.isCompletedExceptionally());
+        assertFutureThrows(ackResult, InvalidRecordStateException.class);
 
         // Check the state of the cache. The state should be acquired itself.
         assertEquals(3, sharePartition.cachedState().size());
@@ -887,7 +955,7 @@ public class SharePartitionTest {
         // Cached data with offset 5-19 should exist.
         assertEquals(3, sharePartition.cachedState().size());
 
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge(
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
             MEMBER_ID,
             Arrays.asList(
                 new ShareAcknowledgementBatch(5, 9, 
Collections.singletonList((byte) 2)),
@@ -895,9 +963,8 @@ public class SharePartitionTest {
                 new ShareAcknowledgementBatch(15, 19, 
Collections.singletonList((byte) 1)),
                 // Add another batch which should fail the request.
                 new ShareAcknowledgementBatch(16, 19, 
Collections.singletonList((byte) 1))));
-        assertFalse(ackResult.isCompletedExceptionally());
-        assertTrue(ackResult.join().isPresent());
-        assertEquals(InvalidRecordStateException.class, 
ackResult.join().get().getClass());
+        assertTrue(ackResult.isCompletedExceptionally());
+        assertFutureThrows(ackResult, InvalidRecordStateException.class);
 
         // Check the state of the cache. The state should be acquired itself.
         assertEquals(3, sharePartition.cachedState().size());
@@ -923,11 +990,11 @@ public class SharePartitionTest {
         assertArrayEquals(expectedAcquiredRecords(records, 1).toArray(), 
acquiredRecordsList.toArray());
         assertEquals(15, sharePartition.nextFetchOffset());
 
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge(
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
             MEMBER_ID,
             Collections.singletonList(new ShareAcknowledgementBatch(12, 13, 
Collections.singletonList((byte) 2))));
+        assertNull(ackResult.join());
         assertFalse(ackResult.isCompletedExceptionally());
-        assertFalse(ackResult.join().isPresent());
 
         assertEquals(12, sharePartition.nextFetchOffset());
         assertEquals(1, sharePartition.cachedState().size());
@@ -1010,11 +1077,11 @@ public class SharePartitionTest {
         assertNull(sharePartition.cachedState().get(23L).offsetState());
         assertNull(sharePartition.cachedState().get(28L).offsetState());
 
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge(
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
             MEMBER_ID,
             Collections.singletonList(new ShareAcknowledgementBatch(12, 30, 
Collections.singletonList((byte) 2))));
+        assertNull(ackResult.join());
         assertFalse(ackResult.isCompletedExceptionally());
-        assertFalse(ackResult.join().isPresent());
 
         assertEquals(12, sharePartition.nextFetchOffset());
         assertEquals(4, sharePartition.cachedState().size());
@@ -1687,20 +1754,18 @@ public class SharePartitionTest {
                 () -> "Acquisition lock never got released.");
 
         // Acknowledge with ACCEPT type should throw 
InvalidRecordStateException since they've been released due to acquisition lock 
timeout.
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge(MEMBER_ID,
+        CompletableFuture<Void> ackResult = 
sharePartition.acknowledge(MEMBER_ID,
                 Collections.singletonList(new ShareAcknowledgementBatch(5, 9, 
Collections.singletonList((byte) 1))));
-        assertFalse(ackResult.isCompletedExceptionally());
-        assertTrue(ackResult.join().isPresent());
-        assertEquals(InvalidRecordStateException.class, 
ackResult.join().get().getClass());
+        assertTrue(ackResult.isCompletedExceptionally());
+        assertFutureThrows(ackResult, InvalidRecordStateException.class);
         
assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
         assertEquals(0, sharePartition.timer().size());
 
         // Try acknowledging with REJECT type should throw 
InvalidRecordStateException since they've been released due to acquisition lock 
timeout.
         ackResult = sharePartition.acknowledge(MEMBER_ID,
                 Collections.singletonList(new ShareAcknowledgementBatch(5, 9, 
Collections.singletonList((byte) 3))));
-        assertFalse(ackResult.isCompletedExceptionally());
-        assertTrue(ackResult.join().isPresent());
-        assertEquals(InvalidRecordStateException.class, 
ackResult.join().get().getClass());
+        assertTrue(ackResult.isCompletedExceptionally());
+        assertFutureThrows(ackResult, InvalidRecordStateException.class);
         
assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
         assertEquals(0, sharePartition.timer().size());
     }
@@ -1853,9 +1918,9 @@ public class SharePartitionTest {
                 Optional.empty(), OptionalLong.empty(), Optional.empty(),
                 OptionalInt.empty(), false));
 
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
 
         assertEquals(0, sharePartition.nextFetchOffset());
         assertEquals(1, sharePartition.cachedState().size());
@@ -1872,9 +1937,9 @@ public class SharePartitionTest {
                 Optional.empty(), OptionalLong.empty(), Optional.empty(),
                 OptionalInt.empty(), false));
 
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
 
         assertEquals(5, sharePartition.nextFetchOffset());
         assertEquals(1, sharePartition.cachedState().size());
@@ -1905,9 +1970,9 @@ public class SharePartitionTest {
 
         sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new 
ShareAcknowledgementBatch(5, 18, Collections.singletonList((byte) 1))));
 
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
         assertEquals(0, sharePartition.nextFetchOffset());
         assertEquals(3, sharePartition.cachedState().size());
         assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).batchState());
@@ -1947,9 +2012,9 @@ public class SharePartitionTest {
                 (byte) 0, (byte) 1, (byte) 0,
                 (byte) 1))));
 
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
 
         assertEquals(5, sharePartition.nextFetchOffset());
         // Check cached state.
@@ -2000,9 +2065,9 @@ public class SharePartitionTest {
                 (byte) 1))));
 
         // Release acquired records for "member-1".
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
 
         assertEquals(19, sharePartition.nextFetchOffset());
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).batchState());
@@ -2024,8 +2089,8 @@ public class SharePartitionTest {
 
         // Release acquired records for "member-2".
         releaseResult = sharePartition.releaseAcquiredRecords("member-2");
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
 
         assertEquals(5, sharePartition.nextFetchOffset());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).batchState());
@@ -2073,9 +2138,9 @@ public class SharePartitionTest {
                 (byte) 1))));
 
         // Release acquired records for "member-1".
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
 
         assertEquals(19, sharePartition.nextFetchOffset());
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).batchState());
@@ -2101,8 +2166,8 @@ public class SharePartitionTest {
 
         // Release acquired records for "member-2".
         releaseResult = sharePartition.releaseAcquiredRecords("member-2");
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
 
         assertEquals(6, sharePartition.nextFetchOffset());
         // Check cached state.
@@ -2129,9 +2194,9 @@ public class SharePartitionTest {
     public void testReleaseAcquiredRecordsForEmptyCachedData() {
         SharePartition sharePartition = 
SharePartitionBuilder.builder().build();
         // Release a batch when cache is empty.
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
         assertEquals(0, sharePartition.nextFetchOffset());
         assertEquals(0, sharePartition.cachedState().size());
     }
@@ -2148,9 +2213,9 @@ public class SharePartitionTest {
         sharePartition.acknowledge(MEMBER_ID,
                 Collections.singletonList(new ShareAcknowledgementBatch(8, 9, 
Collections.singletonList((byte) 1))));
 
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
         assertEquals(5, sharePartition.nextFetchOffset());
         // Check cached state.
         Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
@@ -2182,9 +2247,9 @@ public class SharePartitionTest {
                 Optional.empty(), OptionalLong.empty(), Optional.empty(),
                 OptionalInt.empty(), false));
 
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
 
         assertEquals(0, sharePartition.nextFetchOffset());
         assertEquals(2, sharePartition.cachedState().size());
@@ -2234,9 +2299,9 @@ public class SharePartitionTest {
                 Optional.empty(), OptionalLong.empty(), Optional.empty(),
                 OptionalInt.empty(), false));
 
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
 
         assertEquals(10, sharePartition.nextFetchOffset());
         assertEquals(3, sharePartition.cachedState().size());
@@ -2308,9 +2373,9 @@ public class SharePartitionTest {
                 Optional.empty(), OptionalLong.empty(), Optional.empty(),
                 OptionalInt.empty(), false));
 
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
 
         assertEquals(25, sharePartition.nextFetchOffset());
         assertEquals(0, sharePartition.cachedState().size());
@@ -2329,9 +2394,9 @@ public class SharePartitionTest {
                 Collections.singletonList(new ShareAcknowledgementBatch(5, 7, 
Collections.singletonList((byte) 1))));
 
         // Release acquired records subset with another member.
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords("member-2");
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords("member-2");
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
         // Check cached state.
         Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
         expectedOffsetStateMap.put(5L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
@@ -2361,9 +2426,9 @@ public class SharePartitionTest {
                 Optional.empty(), OptionalLong.empty(), Optional.empty(),
                 OptionalInt.empty(), false));
 
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
-        assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertTrue(releaseResult.isCompletedExceptionally());
+        assertFutureThrows(releaseResult, InvalidRequestException.class);
 
         // Due to failure in writeShareGroupState, the cached state should not 
be updated.
         assertEquals(1, sharePartition.cachedState().size());
@@ -2397,9 +2462,9 @@ public class SharePartitionTest {
                         PartitionFactory.newPartitionErrorData(0, 
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
         
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
 
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
-        assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertTrue(releaseResult.isCompletedExceptionally());
+        assertFutureThrows(releaseResult, InvalidRequestException.class);
 
         // Due to failure in writeShareGroupState, the cached state should not 
be updated.
         assertEquals(1, sharePartition.cachedState().size());
@@ -2426,9 +2491,9 @@ public class SharePartitionTest {
                 Optional.empty(), OptionalLong.empty(), Optional.empty(),
                 OptionalInt.empty(), false));
 
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
 
         assertEquals(5, sharePartition.nextFetchOffset());
         assertEquals(1, sharePartition.cachedState().size());
@@ -2471,9 +2536,9 @@ public class SharePartitionTest {
                         (byte) 0, (byte) 1, (byte) 0,
                         (byte) 1))));
 
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
 
         assertEquals(5, sharePartition.nextFetchOffset());
         // Check cached state.
@@ -3114,9 +3179,9 @@ public class SharePartitionTest {
         assertEquals(7, sharePartition.cachedState().size());
 
         // Release acquired records for MEMBER_ID.
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
 
         // Check cached state.
         Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
@@ -3178,9 +3243,9 @@ public class SharePartitionTest {
         assertEquals(2, sharePartition.cachedState().size());
 
         // Release acquired records.
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
 
         assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).batchMemberId());
         assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(5L).batchState());
@@ -3209,9 +3274,9 @@ public class SharePartitionTest {
         assertEquals(2, sharePartition.cachedState().size());
 
         // Release acquired records.
-        CompletableFuture<Optional<Throwable>> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
         assertFalse(releaseResult.isCompletedExceptionally());
-        assertFalse(releaseResult.join().isPresent());
 
         assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).batchMemberId());
         assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(5L).batchState());
@@ -3402,12 +3467,12 @@ public class SharePartitionTest {
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
 
         // Acknowledge with RELEASE action.
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge(MEMBER_ID, Arrays.asList(
+        CompletableFuture<Void> ackResult = 
sharePartition.acknowledge(MEMBER_ID, Arrays.asList(
                 new ShareAcknowledgementBatch(2, 6, 
Collections.singletonList((byte) 2)),
                 new ShareAcknowledgementBatch(10, 14, 
Collections.singletonList((byte) 2))));
 
+        assertNull(ackResult.join());
         assertFalse(ackResult.isCompletedExceptionally());
-        assertFalse(ackResult.join().isPresent());
 
         assertEquals(12, sharePartition.nextFetchOffset());
         assertEquals(12, sharePartition.startOffset());
@@ -3466,10 +3531,10 @@ public class SharePartitionTest {
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(20L).batchState());
 
         // Acknowledge with ACCEPT action.
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(
+        CompletableFuture<Void> ackResult = 
sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(
                 new ShareAcknowledgementBatch(2, 14, 
Collections.singletonList((byte) 1))));
+        assertNull(ackResult.join());
         assertFalse(ackResult.isCompletedExceptionally());
-        assertFalse(ackResult.join().isPresent());
 
         assertEquals(25, sharePartition.nextFetchOffset());
         // For cached state corresponding to entry 2, the offset states will 
be ARCHIVED, ARCHIVED, ARCHIVED, ARCHIVED and ACKNOWLEDGED.
@@ -3610,7 +3675,9 @@ public class SharePartitionTest {
         SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
 
         
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(null));
-        assertThrows(IllegalStateException.class, () -> 
sharePartition.isWriteShareGroupStateSuccessful(Collections.emptyList()));
+        CompletableFuture<Void> result = 
sharePartition.writeShareGroupState(Collections.emptyList());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalStateException.class);
     }
 
     @Test
@@ -3622,7 +3689,9 @@ public class SharePartitionTest {
         WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
         Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(null);
         
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
-        assertThrows(IllegalStateException.class, () -> 
sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+        CompletableFuture<Void> result = 
sharePartition.writeShareGroupState(Mockito.anyList());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalStateException.class);
     }
 
     @Test
@@ -3635,27 +3704,35 @@ public class SharePartitionTest {
         // TopicsData is empty.
         
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.emptyList());
         
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
-        assertThrows(IllegalStateException.class, () -> 
sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+        CompletableFuture<Void> writeResult = 
sharePartition.writeShareGroupState(Mockito.anyList());
+        assertTrue(writeResult.isCompletedExceptionally());
+        assertFutureThrows(writeResult, IllegalStateException.class);
 
         // TopicsData contains more results than expected.
         
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Arrays.asList(
                 new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.emptyList()),
                 new TopicData<>(Uuid.randomUuid(), Collections.emptyList())));
         
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
-        assertThrows(IllegalStateException.class, () -> 
sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+        writeResult = sharePartition.writeShareGroupState(Mockito.anyList());
+        assertTrue(writeResult.isCompletedExceptionally());
+        assertFutureThrows(writeResult, IllegalStateException.class);
 
         // TopicsData contains no partition data.
         
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
                 new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.emptyList())));
         
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
-        assertThrows(IllegalStateException.class, () -> 
sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+        writeResult = sharePartition.writeShareGroupState(Mockito.anyList());
+        assertTrue(writeResult.isCompletedExceptionally());
+        assertFutureThrows(writeResult, IllegalStateException.class);
 
         // TopicsData contains wrong topicId.
         
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
                 new TopicData<>(Uuid.randomUuid(), Collections.singletonList(
                         PartitionFactory.newPartitionErrorData(0, 
Errors.NONE.code(), Errors.NONE.message())))));
         
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
-        assertThrows(IllegalStateException.class, () -> 
sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+        writeResult = sharePartition.writeShareGroupState(Mockito.anyList());
+        assertTrue(writeResult.isCompletedExceptionally());
+        assertFutureThrows(writeResult, IllegalStateException.class);
 
         // TopicsData contains more partition data than expected.
         
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
@@ -3663,14 +3740,18 @@ public class SharePartitionTest {
                         PartitionFactory.newPartitionErrorData(0, 
Errors.NONE.code(), Errors.NONE.message()),
                         PartitionFactory.newPartitionErrorData(1, 
Errors.NONE.code(), Errors.NONE.message())))));
         
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
-        assertThrows(IllegalStateException.class, () -> 
sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+        writeResult = sharePartition.writeShareGroupState(Mockito.anyList());
+        assertTrue(writeResult.isCompletedExceptionally());
+        assertFutureThrows(writeResult, IllegalStateException.class);
 
         // TopicsData contains wrong partition.
         
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
                 new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
                         PartitionFactory.newPartitionErrorData(1, 
Errors.NONE.code(), Errors.NONE.message())))));
         
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
-        assertThrows(IllegalStateException.class, () -> 
sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+        writeResult = sharePartition.writeShareGroupState(Mockito.anyList());
+        assertTrue(writeResult.isCompletedExceptionally());
+        assertFutureThrows(writeResult, IllegalStateException.class);
     }
 
     @Test
@@ -3680,11 +3761,13 @@ public class SharePartitionTest {
         SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
 
         
Mockito.when(persister.writeState(Mockito.any())).thenReturn(FutureUtils.failedFuture(new
 RuntimeException("Write exception")));
-        assertThrows(IllegalStateException.class, () -> 
sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+        CompletableFuture<Void> writeResult = 
sharePartition.writeShareGroupState(Mockito.anyList());
+        assertTrue(writeResult.isCompletedExceptionally());
+        assertFutureThrows(writeResult, IllegalStateException.class);
     }
 
     @Test
-    public void testIsWriteShareGroupStateSuccessful() {
+    public void testWriteShareGroupState() {
         Persister persister = Mockito.mock(Persister.class);
         mockPersisterReadStateMethod(persister);
         SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
@@ -3695,22 +3778,96 @@ public class SharePartitionTest {
                         PartitionFactory.newPartitionErrorData(0, 
Errors.NONE.code(), Errors.NONE.message())))));
         
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
 
-        
assertTrue(sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+        CompletableFuture<Void> result = 
sharePartition.writeShareGroupState(Mockito.anyList());
+        assertNull(result.join());
+        assertFalse(result.isCompletedExceptionally());
     }
 
     @Test
-    public void testIsWriteShareGroupStateFailure() {
+    public void testWriteShareGroupStateFailure() {
         Persister persister = Mockito.mock(Persister.class);
         mockPersisterReadStateMethod(persister);
         SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
-        // Mock Write state RPC to return error response.
+        // Mock Write state RPC to return error response, NOT_COORDINATOR.
         WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
         
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
                 new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
                         PartitionFactory.newPartitionErrorData(0, 
Errors.NOT_COORDINATOR.code(), Errors.NOT_COORDINATOR.message())))));
         
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
 
-        
assertFalse(sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+        CompletableFuture<Void> result = 
sharePartition.writeShareGroupState(Mockito.anyList());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, CoordinatorNotAvailableException.class);
+
+        // Mock Write state RPC to return error response, 
COORDINATOR_NOT_AVAILABLE.
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionErrorData(0, 
Errors.COORDINATOR_NOT_AVAILABLE.code(), 
Errors.COORDINATOR_NOT_AVAILABLE.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        result = sharePartition.writeShareGroupState(Mockito.anyList());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, CoordinatorNotAvailableException.class);
+
+        // Mock Write state RPC to return error response, 
COORDINATOR_LOAD_IN_PROGRESS.
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionErrorData(0, 
Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), 
Errors.COORDINATOR_LOAD_IN_PROGRESS.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        result = sharePartition.writeShareGroupState(Mockito.anyList());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, CoordinatorNotAvailableException.class);
+
+        // Mock Write state RPC to return error response, GROUP_ID_NOT_FOUND.
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionErrorData(0, 
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        result = sharePartition.writeShareGroupState(Mockito.anyList());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, InvalidRequestException.class);
+
+        // Mock Write state RPC to return error response, 
UNKNOWN_TOPIC_OR_PARTITION.
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionErrorData(0, 
Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 
Errors.UNKNOWN_TOPIC_OR_PARTITION.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        result = sharePartition.writeShareGroupState(Mockito.anyList());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, InvalidRequestException.class);
+
+        // Mock Write state RPC to return error response, FENCED_STATE_EPOCH.
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionErrorData(0, 
Errors.FENCED_STATE_EPOCH.code(), Errors.FENCED_STATE_EPOCH.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        result = sharePartition.writeShareGroupState(Mockito.anyList());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, FencedStateEpochException.class);
+
+        // Mock Write state RPC to return error response, FENCED_LEADER_EPOCH.
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionErrorData(0, 
Errors.FENCED_LEADER_EPOCH.code(), Errors.FENCED_LEADER_EPOCH.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        result = sharePartition.writeShareGroupState(Mockito.anyList());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, NotLeaderOrFollowerException.class);
+
+        // Mock Write state RPC to return error response, UNKNOWN_SERVER_ERROR.
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionErrorData(0, 
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        result = sharePartition.writeShareGroupState(Mockito.anyList());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, UnknownServerException.class);
     }
 
     @Test
@@ -3720,7 +3877,9 @@ public class SharePartitionTest {
                 new PersisterStateBatch(5L, 10L, RecordState.AVAILABLE.id, 
(short) 2),
                 new PersisterStateBatch(11L, 15L, RecordState.ARCHIVED.id, 
(short) 3));
 
-        
assertTrue(sharePartition.isWriteShareGroupStateSuccessful(stateBatches));
+        CompletableFuture<Void> result = 
sharePartition.writeShareGroupState(stateBatches);
+        assertNull(result.join());
+        assertFalse(result.isCompletedExceptionally());
     }
 
     @Test
@@ -4109,17 +4268,17 @@ public class SharePartitionTest {
         WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
         
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
                 new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
-                        PartitionFactory.newPartitionErrorData(0, 
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+                        PartitionFactory.newPartitionErrorData(0, 
Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 
Errors.UNKNOWN_TOPIC_OR_PARTITION.message())))));
         
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
 
         sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE, 
20, 0, memoryRecords(10, 5),
                 Optional.empty(), OptionalLong.empty(), Optional.empty(),
                 OptionalInt.empty(), false));
 
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge(MEMBER_ID,
+        CompletableFuture<Void> ackResult = 
sharePartition.acknowledge(MEMBER_ID,
                 Collections.singletonList(new ShareAcknowledgementBatch(5, 14, 
Collections.singletonList((byte) 1))));
-        assertFalse(ackResult.isCompletedExceptionally());
-        assertFalse(ackResult.join().isPresent());
+        assertTrue(ackResult.isCompletedExceptionally());
+        assertFutureThrows(ackResult, InvalidRequestException.class);
 
         // Due to failure in writeShareGroupState, the cached state should not 
be updated.
         assertEquals(1, sharePartition.cachedState().size());
@@ -4144,11 +4303,10 @@ public class SharePartitionTest {
                 Optional.empty(), OptionalLong.empty(), Optional.empty(),
                 OptionalInt.empty(), false));
 
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge(
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
                 MEMBER_ID,
                 Collections.singletonList(new ShareAcknowledgementBatch(8, 10, 
Collections.singletonList((byte) 3))));
-        assertFalse(ackResult.isCompletedExceptionally());
-        assertFalse(ackResult.join().isPresent());
+        assertTrue(ackResult.isCompletedExceptionally());
 
         // Due to failure in writeShareGroupState, the cached state should not 
be updated.
         assertEquals(1, sharePartition.cachedState().size());
@@ -4179,11 +4337,10 @@ public class SharePartitionTest {
                 Collections.singletonList(new ShareAcknowledgementBatch(5, 7, 
Collections.singletonList((byte) 1))));
 
         // Acknowledge subset with another member.
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge("member-2",
+        CompletableFuture<Void> ackResult = 
sharePartition.acknowledge("member-2",
                 Collections.singletonList(new ShareAcknowledgementBatch(9, 11, 
Collections.singletonList((byte) 1))));
-        assertFalse(ackResult.isCompletedExceptionally());
-        assertTrue(ackResult.join().isPresent());
-        assertEquals(InvalidRecordStateException.class, 
ackResult.join().get().getClass());
+        assertTrue(ackResult.isCompletedExceptionally());
+        assertFutureThrows(ackResult, InvalidRecordStateException.class);
     }
 
     @Test
@@ -4202,15 +4359,14 @@ public class SharePartitionTest {
                 Optional.empty(), OptionalLong.empty(), Optional.empty(),
                 OptionalInt.empty(), false));
 
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge("member-1", Arrays.asList(
+        CompletableFuture<Void> ackResult = 
sharePartition.acknowledge("member-1", Arrays.asList(
                 new ShareAcknowledgementBatch(5, 9, 
Collections.singletonList((byte) 2)),
                 // Acknowledging batch with another member will cause failure 
and rollback.
                 new ShareAcknowledgementBatch(10, 14, 
Collections.singletonList((byte) 1)),
                 new ShareAcknowledgementBatch(15, 19, 
Collections.singletonList((byte) 1))));
 
-        assertFalse(ackResult.isCompletedExceptionally());
-        assertTrue(ackResult.join().isPresent());
-        assertEquals(InvalidRecordStateException.class, 
ackResult.join().get().getClass());
+        assertTrue(ackResult.isCompletedExceptionally());
+        assertFutureThrows(ackResult, InvalidRecordStateException.class);
 
         // State should be rolled back to the previous state for any changes.
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).batchState());
@@ -4240,14 +4396,13 @@ public class SharePartitionTest {
                 Optional.empty(), OptionalLong.empty(), Optional.empty(),
                 OptionalInt.empty(), false));
 
-        CompletableFuture<Optional<Throwable>> ackResult = 
sharePartition.acknowledge("member-1", Arrays.asList(
+        CompletableFuture<Void> ackResult = 
sharePartition.acknowledge("member-1", Arrays.asList(
                 new ShareAcknowledgementBatch(5, 9, 
Collections.singletonList((byte) 2)),
                 new ShareAcknowledgementBatch(10, 14, 
Collections.singletonList((byte) 1)),
                 // Acknowledging subset with another member will cause failure 
and rollback.
                 new ShareAcknowledgementBatch(16, 18, 
Collections.singletonList((byte) 1))));
-        assertFalse(ackResult.isCompletedExceptionally());
-        assertTrue(ackResult.join().isPresent());
-        assertEquals(InvalidRecordStateException.class, 
ackResult.join().get().getClass());
+        assertTrue(ackResult.isCompletedExceptionally());
+        assertFutureThrows(ackResult, InvalidRecordStateException.class);
 
         assertEquals(3, sharePartition.cachedState().size());
         // Check the state of the cache. State should be rolled back to the 
previous state for any changes.
@@ -4440,12 +4595,12 @@ public class SharePartitionTest {
         sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(
                 new ShareAcknowledgementBatch(2, 6, 
Collections.singletonList((byte) 1))));
         // Acknowledge records will induce 1 write state RPC call via function 
isWriteShareGroupStateSuccessful.
-        Mockito.verify(sharePartition, 
Mockito.times(1)).isWriteShareGroupStateSuccessful(anyList());
+        Mockito.verify(sharePartition, 
Mockito.times(1)).writeShareGroupState(anyList());
 
         sharePartition.releaseAcquiredRecords(MEMBER_ID);
         // Release acquired records will induce 0 write state RPC call via 
function isWriteShareGroupStateSuccessful
         // because the in-flight batch has been acknowledged. Hence, the total 
calls remain 1.
-        Mockito.verify(sharePartition, 
Mockito.times(1)).isWriteShareGroupStateSuccessful(anyList());
+        Mockito.verify(sharePartition, 
Mockito.times(1)).writeShareGroupState(anyList());
     }
 
     @Test


Reply via email to