This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 89418b66aeb KAFKA-17442: Handled persister errors with write async
calls (KIP-932) (#16956)
89418b66aeb is described below
commit 89418b66aeb3e004e6ef16cd42f2f929d48a6718
Author: Apoorv Mittal <[email protected]>
AuthorDate: Mon Sep 2 00:36:26 2024 +0100
KAFKA-17442: Handled persister errors with write async calls (KIP-932)
(#16956)
This PR makes the persister write RPC asynchronous. It also handles the errors
returned by the persister, addressing the review comment on PR #16397
(comment).
Reviewers: Andrew Schofield <[email protected]>, Abhinav Dixit
<[email protected]>, Jun Rao <[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 197 ++++++----
.../kafka/server/share/SharePartitionManager.java | 24 +-
.../server/share/SharePartitionManagerTest.java | 19 +-
.../kafka/server/share/SharePartitionTest.java | 423 ++++++++++++++-------
4 files changed, 440 insertions(+), 223 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index cb507f6a8be..7823b76a80a 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -16,10 +16,15 @@
*/
package kafka.server.share;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
@@ -36,7 +41,6 @@ import
org.apache.kafka.server.group.share.ReadShareGroupStateParameters;
import org.apache.kafka.server.group.share.ReadShareGroupStateResult;
import org.apache.kafka.server.group.share.TopicData;
import org.apache.kafka.server.group.share.WriteShareGroupStateParameters;
-import org.apache.kafka.server.group.share.WriteShareGroupStateResult;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
@@ -342,9 +346,8 @@ public class SharePartition {
* in-flight records and the next fetch offset is updated to the next
offset that should be
* fetched from the leader.
*
- * @param memberId The member id of the client that is fetching the record.
+ * @param memberId The member id of the client that is fetching
the record.
* @param fetchPartitionData The fetched records for the share partition.
- *
* @return A future which is completed when the records are acquired.
*/
public CompletableFuture<List<AcquiredRecords>> acquire(
@@ -463,21 +466,21 @@ public class SharePartition {
* from the in-flight records once persisted. The next fetch offset is
updated to the next offset
* that should be fetched from the leader, if required.
*
- * @param memberId The member id of the client that is fetching the record.
+ * @param memberId The member id of the client that is
fetching the record.
* @param acknowledgementBatches The acknowledgement batch list for the
share partition.
- *
* @return A future which is completed when the records are acknowledged.
*/
- public CompletableFuture<Optional<Throwable>> acknowledge(
+ public CompletableFuture<Void> acknowledge(
String memberId,
List<ShareAcknowledgementBatch> acknowledgementBatches
) {
log.trace("Acknowledgement batch request for share partition: {}-{}",
groupId, topicIdPartition);
+ CompletableFuture<Void> future = new CompletableFuture<>();
Throwable throwable = null;
- lock.writeLock().lock();
List<InFlightState> updatedStates = new ArrayList<>();
List<PersisterStateBatch> stateBatches = new ArrayList<>();
+ lock.writeLock().lock();
try {
// Avoided using enhanced for loop as need to check if the last
batch have offsets
// in the range.
@@ -528,12 +531,12 @@ public class SharePartition {
// If the acknowledgement is successful then persist state,
complete the state transition
// and update the cached state for start offset. Else rollback the
state transition.
- rollbackOrProcessStateUpdates(throwable, updatedStates,
stateBatches);
+ rollbackOrProcessStateUpdates(future, throwable, updatedStates,
stateBatches);
} finally {
lock.writeLock().unlock();
}
- return
CompletableFuture.completedFuture(Optional.ofNullable(throwable));
+ return future;
}
/**
@@ -541,17 +544,17 @@ public class SharePartition {
* that should be fetched from the leader.
*
* @param memberId The member id of the client whose records shall be
released.
- *
* @return A future which is completed when the records are released.
*/
- public CompletableFuture<Optional<Throwable>>
releaseAcquiredRecords(String memberId) {
+ public CompletableFuture<Void> releaseAcquiredRecords(String memberId) {
log.trace("Release acquired records request for share partition: {}-{}
memberId: {}", groupId, topicIdPartition, memberId);
+ CompletableFuture<Void> future = new CompletableFuture<>();
Throwable throwable = null;
- lock.writeLock().lock();
List<InFlightState> updatedStates = new ArrayList<>();
List<PersisterStateBatch> stateBatches = new ArrayList<>();
+ lock.writeLock().lock();
try {
RecordState recordState = RecordState.AVAILABLE;
// Iterate over multiple fetched batches. The state can vary per
offset entry
@@ -584,11 +587,11 @@ public class SharePartition {
// If the release acquired records is successful then persist
state, complete the state transition
// and update the cached state for start offset. Else rollback the
state transition.
- rollbackOrProcessStateUpdates(throwable, updatedStates,
stateBatches);
+ rollbackOrProcessStateUpdates(future, throwable, updatedStates,
stateBatches);
} finally {
lock.writeLock().unlock();
}
- return
CompletableFuture.completedFuture(Optional.ofNullable(throwable));
+ return future;
}
private Optional<Throwable> releaseAcquiredRecordsForPerOffsetBatch(String
memberId,
@@ -880,14 +883,27 @@ public class SharePartition {
}
TopicData<PartitionAllData> state = response.topicsData().get(0);
- if (state.topicId() != topicIdPartition.topicId() ||
state.partitions().size() != 1
- || state.partitions().get(0).partition() !=
topicIdPartition.partition()) {
+ if (state.topicId() != topicIdPartition.topicId() ||
state.partitions().size() != 1) {
log.error("Failed to initialize the share partition: {}-{}.
Invalid topic partition response: {}.",
groupId, topicIdPartition, response);
throw new IllegalStateException(String.format("Failed to
initialize the share partition %s-%s", groupId, topicIdPartition));
}
PartitionAllData partitionData = state.partitions().get(0);
+ if (partitionData.partition() != topicIdPartition.partition()) {
+ log.error("Failed to initialize the share partition: {}-{}.
Invalid partition response: {}.",
+ groupId, topicIdPartition, partitionData);
+ throw new IllegalStateException(String.format("Failed to
initialize the share partition %s-%s",
+ groupId, topicIdPartition));
+ }
+
+ if (partitionData.errorCode() != Errors.NONE.code()) {
+ KafkaException ex = fetchPersisterError(partitionData.errorCode(),
partitionData.errorMessage());
+ log.error("Failed to initialize the share partition: {}-{}.
Exception occurred: {}.",
+ groupId, topicIdPartition, partitionData);
+ throw ex;
+ }
+
// Set the state epoch and end offset from the persisted state.
startOffset = partitionData.startOffset() != -1 ?
partitionData.startOffset() : 0;
stateEpoch = partitionData.stateEpoch();
@@ -1338,21 +1354,36 @@ public class SharePartition {
// Visible for testing
void rollbackOrProcessStateUpdates(
+ CompletableFuture<Void> future,
Throwable throwable,
List<InFlightState> updatedStates,
List<PersisterStateBatch> stateBatches
) {
- if (stateBatches.isEmpty() && updatedStates.isEmpty())
- return;
-
lock.writeLock().lock();
try {
- if (throwable != null ||
!isWriteShareGroupStateSuccessful(stateBatches)) {
+ if (throwable != null) {
// Log in DEBUG to avoid flooding of logs for a faulty client.
log.debug("Request failed for updating state, rollback any
changed state"
+ " for the share partition: {}-{}", groupId,
topicIdPartition);
updatedStates.forEach(state ->
state.completeStateTransition(false));
- } else {
+ future.completeExceptionally(throwable);
+ return;
+ }
+
+ if (stateBatches.isEmpty() && updatedStates.isEmpty()) {
+ future.complete(null);
+ return;
+ }
+
+ writeShareGroupState(stateBatches).whenComplete((result,
exception) -> {
+ if (exception != null) {
+ log.error("Failed to write state to persister for the
share partition: {}-{}",
+ groupId, topicIdPartition, exception);
+ updatedStates.forEach(state ->
state.completeStateTransition(false));
+ future.completeExceptionally(exception);
+ return;
+ }
+
log.trace("State change request successful for share
partition: {}-{}",
groupId, topicIdPartition);
updatedStates.forEach(state -> {
@@ -1362,7 +1393,8 @@ public class SharePartition {
});
// Update the cached state and start and end offsets after
acknowledging/releasing the acquired records.
maybeUpdateCachedStateAndOffsets();
- }
+ future.complete(null);
+ });
} finally {
lock.writeLock().unlock();
}
@@ -1494,46 +1526,71 @@ public class SharePartition {
}
// Visible for testing
- boolean isWriteShareGroupStateSuccessful(List<PersisterStateBatch>
stateBatches) {
- WriteShareGroupStateResult response;
- try {
- response = persister.writeState(new
WriteShareGroupStateParameters.Builder()
- .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
- .setGroupId(this.groupId)
- .setTopicsData(Collections.singletonList(new
TopicData<>(topicIdPartition.topicId(),
-
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
- topicIdPartition.partition(), stateEpoch,
startOffset, 0, stateBatches))))
- ).build()).build()).get();
- } catch (InterruptedException | ExecutionException e) {
- log.error("Failed to write the share group state for share
partition: {}-{}", groupId, topicIdPartition, e);
- throw new IllegalStateException(String.format("Failed to write the
share group state for share partition %s-%s",
- groupId, topicIdPartition), e);
- }
+ CompletableFuture<Void> writeShareGroupState(List<PersisterStateBatch>
stateBatches) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ persister.writeState(new WriteShareGroupStateParameters.Builder()
+ .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
+ .setGroupId(this.groupId)
+ .setTopicsData(Collections.singletonList(new
TopicData<>(topicIdPartition.topicId(),
+
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
+ topicIdPartition.partition(), stateEpoch, startOffset,
0, stateBatches))))
+ ).build()).build())
+ .whenComplete((result, exception) -> {
+ if (exception != null) {
+ log.error("Failed to write the share group state for share
partition: {}-{}", groupId, topicIdPartition, exception);
+ future.completeExceptionally(new
IllegalStateException(String.format("Failed to write the share group state for
share partition %s-%s",
+ groupId, topicIdPartition), exception));
+ return;
+ }
- if (response == null || response.topicsData() == null ||
response.topicsData().size() != 1) {
- log.error("Failed to write the share group state for share
partition: {}-{}. Invalid state found: {}",
- groupId, topicIdPartition, response);
- throw new IllegalStateException(String.format("Failed to write the
share group state for share partition %s-%s",
- groupId, topicIdPartition));
- }
+ if (result == null || result.topicsData() == null ||
result.topicsData().size() != 1) {
+ log.error("Failed to write the share group state for share
partition: {}-{}. Invalid state found: {}",
+ groupId, topicIdPartition, result);
+ future.completeExceptionally(new
IllegalStateException(String.format("Failed to write the share group state for
share partition %s-%s",
+ groupId, topicIdPartition)));
+ return;
+ }
- TopicData<PartitionErrorData> state = response.topicsData().get(0);
- if (state.topicId() != topicIdPartition.topicId() ||
state.partitions().size() != 1
- || state.partitions().get(0).partition() !=
topicIdPartition.partition()) {
- log.error("Failed to write the share group state for share
partition: {}-{}. Invalid topic partition response: {}",
- groupId, topicIdPartition, response);
- throw new IllegalStateException(String.format("Failed to write the
share group state for share partition %s-%s",
- groupId, topicIdPartition));
- }
+ TopicData<PartitionErrorData> state =
result.topicsData().get(0);
+ if (state.topicId() != topicIdPartition.topicId() ||
state.partitions().size() != 1
+ || state.partitions().get(0).partition() !=
topicIdPartition.partition()) {
+ log.error("Failed to write the share group state for share
partition: {}-{}. Invalid topic partition response: {}",
+ groupId, topicIdPartition, result);
+ future.completeExceptionally(new
IllegalStateException(String.format("Failed to write the share group state for
share partition %s-%s",
+ groupId, topicIdPartition)));
+ return;
+ }
- PartitionErrorData partitionData = state.partitions().get(0);
- if (partitionData.errorCode() != Errors.NONE.code()) {
- Exception exception =
Errors.forCode(partitionData.errorCode()).exception(partitionData.errorMessage());
- log.error("Failed to write the share group state for share
partition: {}-{} due to exception",
- groupId, topicIdPartition, exception);
- return false;
+ PartitionErrorData partitionData = state.partitions().get(0);
+ if (partitionData.errorCode() != Errors.NONE.code()) {
+ KafkaException ex =
fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
+ log.error("Failed to write the share group state for share
partition: {}-{} due to exception",
+ groupId, topicIdPartition, ex);
+ future.completeExceptionally(ex);
+ return;
+ }
+ future.complete(null);
+ });
+ return future;
+ }
+
+ private KafkaException fetchPersisterError(short errorCode, String
errorMessage) {
+ Errors error = Errors.forCode(errorCode);
+ switch (error) {
+ case NOT_COORDINATOR:
+ case COORDINATOR_NOT_AVAILABLE:
+ case COORDINATOR_LOAD_IN_PROGRESS:
+ return new CoordinatorNotAvailableException(errorMessage);
+ case GROUP_ID_NOT_FOUND:
+ case UNKNOWN_TOPIC_OR_PARTITION:
+ return new InvalidRequestException(errorMessage);
+ case FENCED_STATE_EPOCH:
+ return new FencedStateEpochException(errorMessage);
+ case FENCED_LEADER_EPOCH:
+ return new NotLeaderOrFollowerException(errorMessage);
+ default:
+ return new UnknownServerException(errorMessage);
}
- return true;
}
private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String
memberId, long firstOffset, long lastOffset) {
@@ -1554,9 +1611,9 @@ public class SharePartition {
long lastOffset,
long delayMs
) {
- AcquisitionLockTimerTask acquistionLockTimerTask =
acquisitionLockTimerTask(memberId, firstOffset, lastOffset, delayMs);
- timer.add(acquistionLockTimerTask);
- return acquistionLockTimerTask;
+ AcquisitionLockTimerTask acquisitionLockTimerTask =
acquisitionLockTimerTask(memberId, firstOffset, lastOffset, delayMs);
+ timer.add(acquisitionLockTimerTask);
+ return acquisitionLockTimerTask;
}
private AcquisitionLockTimerTask acquisitionLockTimerTask(
@@ -1598,15 +1655,17 @@ public class SharePartition {
}
}
- if (!stateBatches.isEmpty() &&
!isWriteShareGroupStateSuccessful(stateBatches)) {
-
- // Even if write share group state RPC call fails, we will
still go ahead with the state transition.
- log.error("Failed to write the share group state on
acquisition lock timeout for share partition: {}-{} memberId: {}. " +
- "Proceeding with state transition.", groupId,
topicIdPartition, memberId);
+ if (!stateBatches.isEmpty()) {
+ writeShareGroupState(stateBatches).whenComplete((result,
exception) -> {
+ if (exception != null) {
+ log.error("Failed to write the share group state on
acquisition lock timeout for share partition: {}-{} memberId: {}",
+ groupId, topicIdPartition, memberId, exception);
+ }
+ // Even if write share group state RPC call fails, we will
still go ahead with the state transition.
+ // Update the cached state and start and end offsets after
releasing the acquisition lock on timeout.
+ maybeUpdateCachedStateAndOffsets();
+ });
}
-
- // Update the cached state and start and end offsets after
releasing the acquisition lock on timeout.
- maybeUpdateCachedStateAndOffsets();
} finally {
lock.writeLock().unlock();
}
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index fb1383db966..2b39e969ce6 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -272,14 +272,14 @@ public class SharePartitionManager implements
AutoCloseable {
acknowledgeTopics.forEach((topicIdPartition,
acknowledgePartitionBatches) -> {
SharePartition sharePartition =
partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition));
if (sharePartition != null) {
- CompletableFuture<Errors> future =
sharePartition.acknowledge(memberId,
acknowledgePartitionBatches).thenApply(throwable -> {
- if (throwable.isPresent()) {
- return Errors.forException(throwable.get());
+ CompletableFuture<Errors> future = new CompletableFuture<>();
+ sharePartition.acknowledge(memberId,
acknowledgePartitionBatches).whenComplete((result, throwable) -> {
+ if (throwable != null) {
+ future.complete(Errors.forException(throwable));
+ return;
}
- acknowledgePartitionBatches.forEach(batch -> {
-
batch.acknowledgeTypes().forEach(this.shareGroupMetrics::recordAcknowledgement);
- });
- return Errors.NONE;
+ acknowledgePartitionBatches.forEach(batch ->
batch.acknowledgeTypes().forEach(this.shareGroupMetrics::recordAcknowledgement));
+ future.complete(Errors.NONE);
});
futures.put(topicIdPartition, future);
} else {
@@ -337,11 +337,13 @@ public class SharePartitionManager implements
AutoCloseable {
log.error("No share partition found for groupId {}
topicPartition {} while releasing acquired topic partitions", groupId,
topicIdPartition);
futuresMap.put(topicIdPartition,
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
} else {
- CompletableFuture<Errors> future =
sharePartition.releaseAcquiredRecords(memberId).thenApply(throwable -> {
- if (throwable.isPresent()) {
- return Errors.forException(throwable.get());
+ CompletableFuture<Errors> future = new CompletableFuture<>();
+
sharePartition.releaseAcquiredRecords(memberId).whenComplete((result,
throwable) -> {
+ if (throwable != null) {
+ future.complete(Errors.forException(throwable));
+ return;
}
- return Errors.NONE;
+ future.complete(Errors.NONE);
});
futuresMap.put(topicIdPartition, future);
}
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index ce778a56898..894ab947f0b 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -56,6 +56,7 @@ import org.apache.kafka.server.share.ShareSession;
import org.apache.kafka.server.share.ShareSessionCache;
import org.apache.kafka.server.share.ShareSessionContext;
import org.apache.kafka.server.share.ShareSessionKey;
+import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
@@ -1367,9 +1368,9 @@ public class SharePartitionManagerTest {
SharePartition sp1 = mock(SharePartition.class);
SharePartition sp2 = mock(SharePartition.class);
-
when(sp1.releaseAcquiredRecords(ArgumentMatchers.eq(memberId.toString()))).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
-
when(sp2.releaseAcquiredRecords(ArgumentMatchers.eq(memberId.toString()))).thenReturn(CompletableFuture.completedFuture(
- Optional.of(new InvalidRecordStateException("Unable to release
acquired records for the batch"))
+
when(sp1.releaseAcquiredRecords(ArgumentMatchers.eq(memberId.toString()))).thenReturn(CompletableFuture.completedFuture(null));
+
when(sp2.releaseAcquiredRecords(ArgumentMatchers.eq(memberId.toString()))).thenReturn(FutureUtils.failedFuture(
+ new InvalidRecordStateException("Unable to release acquired
records for the batch")
));
ShareSessionCache cache = mock(ShareSessionCache.class);
@@ -1515,7 +1516,7 @@ public class SharePartitionManagerTest {
TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0));
SharePartition sp = mock(SharePartition.class);
- when(sp.acknowledge(ArgumentMatchers.eq(memberId),
any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+ when(sp.acknowledge(ArgumentMatchers.eq(memberId),
any())).thenReturn(CompletableFuture.completedFuture(null));
Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp), sp);
@@ -1549,9 +1550,9 @@ public class SharePartitionManagerTest {
SharePartition sp2 = mock(SharePartition.class);
SharePartition sp3 = mock(SharePartition.class);
- when(sp1.acknowledge(ArgumentMatchers.eq(memberId),
any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
- when(sp2.acknowledge(ArgumentMatchers.eq(memberId),
any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
- when(sp3.acknowledge(ArgumentMatchers.eq(memberId),
any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+ when(sp1.acknowledge(ArgumentMatchers.eq(memberId),
any())).thenReturn(CompletableFuture.completedFuture(null));
+ when(sp2.acknowledge(ArgumentMatchers.eq(memberId),
any())).thenReturn(CompletableFuture.completedFuture(null));
+ when(sp3.acknowledge(ArgumentMatchers.eq(memberId),
any())).thenReturn(CompletableFuture.completedFuture(null));
Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
@@ -1669,8 +1670,8 @@ public class SharePartitionManagerTest {
TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0));
SharePartition sp = mock(SharePartition.class);
- when(sp.acknowledge(ArgumentMatchers.eq(memberId),
any())).thenReturn(CompletableFuture.completedFuture(
- Optional.of(new InvalidRequestException("Member is not the
owner of batch record"))
+ when(sp.acknowledge(ArgumentMatchers.eq(memberId),
any())).thenReturn(FutureUtils.failedFuture(
+ new InvalidRequestException("Member is not the owner of batch
record")
));
Map<SharePartitionManager.SharePartitionKey, SharePartition>
partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new
SharePartitionManager.SharePartitionKey(groupId, tp), sp);
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 56a783b3977..9754b31c165 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -22,8 +22,12 @@ import kafka.server.share.SharePartition.RecordState;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
@@ -64,6 +68,7 @@ import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -187,6 +192,76 @@ public class SharePartitionTest {
assertEquals(10, sharePartition.nextFetchOffset());
}
+ @Test
+ public void testInitializeWithErrorPartitionResponse() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
+ // Mock NOT_COORDINATOR error.
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 5, 10L,
Errors.NOT_COORDINATOR.code(), Errors.NOT_COORDINATOR.message(),
+ Collections.emptyList())))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ assertThrows(CoordinatorNotAvailableException.class, () ->
SharePartitionBuilder.builder().withPersister(persister).build());
+
+ // Mock COORDINATOR_NOT_AVAILABLE error.
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 5, 10L,
Errors.COORDINATOR_NOT_AVAILABLE.code(),
Errors.COORDINATOR_NOT_AVAILABLE.message(),
+ Collections.emptyList())))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ assertThrows(CoordinatorNotAvailableException.class, () ->
SharePartitionBuilder.builder().withPersister(persister).build());
+
+ // Mock COORDINATOR_LOAD_IN_PROGRESS error.
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 5, 10L,
Errors.COORDINATOR_LOAD_IN_PROGRESS.code(),
Errors.COORDINATOR_LOAD_IN_PROGRESS.message(),
+ Collections.emptyList())))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ assertThrows(CoordinatorNotAvailableException.class, () ->
SharePartitionBuilder.builder().withPersister(persister).build());
+
+ // Mock GROUP_ID_NOT_FOUND error.
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 5, 10L,
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message(),
+ Collections.emptyList())))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ assertThrows(InvalidRequestException.class, () ->
SharePartitionBuilder.builder().withPersister(persister).build());
+
+ // Mock UNKNOWN_TOPIC_OR_PARTITION error.
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 5, 10L,
Errors.UNKNOWN_TOPIC_OR_PARTITION.code(),
Errors.UNKNOWN_TOPIC_OR_PARTITION.message(),
+ Collections.emptyList())))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ assertThrows(InvalidRequestException.class, () ->
SharePartitionBuilder.builder().withPersister(persister).build());
+
+ // Mock FENCED_STATE_EPOCH error.
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 5, 10L,
Errors.FENCED_STATE_EPOCH.code(), Errors.FENCED_STATE_EPOCH.message(),
+ Collections.emptyList())))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ assertThrows(FencedStateEpochException.class, () ->
SharePartitionBuilder.builder().withPersister(persister).build());
+
+ // Mock FENCED_LEADER_EPOCH error.
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 5, 10L,
Errors.FENCED_LEADER_EPOCH.code(), Errors.FENCED_LEADER_EPOCH.message(),
+ Collections.emptyList())))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ assertThrows(NotLeaderOrFollowerException.class, () ->
SharePartitionBuilder.builder().withPersister(persister).build());
+
+ // Mock UNKNOWN_SERVER_ERROR error.
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 5, 10L,
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message(),
+ Collections.emptyList())))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ assertThrows(UnknownServerException.class, () ->
SharePartitionBuilder.builder().withPersister(persister).build());
+ }
+
@Test
public void testInitializeWithInvalidStartOffsetStateBatches() {
Persister persister = Mockito.mock(Persister.class);
@@ -497,11 +572,11 @@ public class SharePartitionTest {
assertFalse(result.isCompletedExceptionally());
assertEquals(1, result.join().size());
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge(
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(1, 1,
Collections.singletonList((byte) 1))));
+ assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
- assertFalse(ackResult.join().isPresent());
assertEquals(2, sharePartition.nextFetchOffset());
assertEquals(2, sharePartition.cachedState().size());
@@ -522,11 +597,11 @@ public class SharePartitionTest {
assertFalse(result.isCompletedExceptionally());
assertEquals(1, result.join().size());
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge(
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(5, 14,
Collections.singletonList((byte) 1))));
+ assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
- assertFalse(ackResult.join().isPresent());
assertEquals(15, sharePartition.nextFetchOffset());
assertEquals(0, sharePartition.cachedState().size());
@@ -558,7 +633,7 @@ public class SharePartitionTest {
assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(),
result.join().toArray());
assertEquals(19, sharePartition.nextFetchOffset());
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge(
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Arrays.asList(
new ShareAcknowledgementBatch(5, 6,
Collections.singletonList((byte) 2)),
@@ -567,8 +642,8 @@ public class SharePartitionTest {
(byte) 2, (byte) 2, (byte) 0,
(byte) 0, (byte) 0, (byte) 1
))));
+ assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
- assertFalse(ackResult.join().isPresent());
assertEquals(5, sharePartition.nextFetchOffset());
assertEquals(2, sharePartition.cachedState().size());
@@ -622,7 +697,7 @@ public class SharePartitionTest {
assertEquals(21, sharePartition.nextFetchOffset());
// Acknowledging over subset of both batch with subset of gap offsets.
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge(
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(6, 18,
Arrays.asList(
(byte) 1, (byte) 1, (byte) 1,
@@ -630,8 +705,8 @@ public class SharePartitionTest {
(byte) 0, (byte) 0, (byte) 1,
(byte) 0, (byte) 1, (byte) 0,
(byte) 1))));
+ assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
- assertFalse(ackResult.join().isPresent());
assertEquals(21, sharePartition.nextFetchOffset());
assertEquals(2, sharePartition.cachedState().size());
@@ -663,12 +738,11 @@ public class SharePartitionTest {
public void testAcknowledgeOutOfRangeCachedData() {
SharePartition sharePartition =
SharePartitionBuilder.builder().build();
// Acknowledge a batch when cache is empty.
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge(
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(0, 15,
Collections.singletonList((byte) 3))));
- assertFalse(ackResult.isCompletedExceptionally());
- assertTrue(ackResult.join().isPresent());
- assertEquals(InvalidRecordStateException.class,
ackResult.join().get().getClass());
+ assertTrue(ackResult.isCompletedExceptionally());
+ assertFutureThrows(ackResult, InvalidRecordStateException.class);
MemoryRecords records = memoryRecords(5, 5);
CompletableFuture<List<AcquiredRecords>> result =
sharePartition.acquire(
@@ -684,9 +758,8 @@ public class SharePartitionTest {
ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(20, 25,
Collections.singletonList((byte) 3))));
- assertFalse(ackResult.isCompletedExceptionally());
- assertTrue(ackResult.join().isPresent());
- assertEquals(InvalidRequestException.class,
ackResult.join().get().getClass());
+ assertTrue(ackResult.isCompletedExceptionally());
+ assertFutureThrows(ackResult, InvalidRequestException.class);
}
@Test
@@ -715,11 +788,10 @@ public class SharePartitionTest {
List<ShareAcknowledgementBatch> acknowledgeBatches = Arrays.asList(
new ShareAcknowledgementBatch(0, 10,
Collections.singletonList((byte) 1)),
new ShareAcknowledgementBatch(20, 24,
Collections.singletonList((byte) 1)));
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge(
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID, acknowledgeBatches);
- assertFalse(ackResult.isCompletedExceptionally());
- assertTrue(ackResult.join().isPresent());
- assertEquals(InvalidRequestException.class,
ackResult.join().get().getClass());
+ assertTrue(ackResult.isCompletedExceptionally());
+ assertFutureThrows(ackResult, InvalidRequestException.class);
// Create data for the batch with offsets 5-10.
records = memoryRecords(6, 5);
@@ -733,8 +805,8 @@ public class SharePartitionTest {
// Previous failed acknowledge request should succeed now.
ackResult = sharePartition.acknowledge(
MEMBER_ID, acknowledgeBatches);
+ assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
- assertFalse(ackResult.join().isPresent());
}
@Test
@@ -751,12 +823,11 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.cachedState().size());
assertNotNull(sharePartition.cachedState().get(5L));
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge(
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
"member-2",
Collections.singletonList(new ShareAcknowledgementBatch(5, 9,
Collections.singletonList((byte) 3))));
- assertFalse(ackResult.isCompletedExceptionally());
- assertTrue(ackResult.join().isPresent());
- assertEquals(InvalidRecordStateException.class,
ackResult.join().get().getClass());
+ assertTrue(ackResult.isCompletedExceptionally());
+ assertFutureThrows(ackResult, InvalidRecordStateException.class);
}
@Test
@@ -773,19 +844,18 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.cachedState().size());
assertNotNull(sharePartition.cachedState().get(5L));
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge(
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(5, 9,
Collections.singletonList((byte) 2))));
+ assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
- assertFalse(ackResult.join().isPresent());
// Acknowledge the same batch again but with ACCEPT type.
ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(5, 9,
Collections.singletonList((byte) 1))));
- assertFalse(ackResult.isCompletedExceptionally());
- assertTrue(ackResult.join().isPresent());
- assertEquals(InvalidRecordStateException.class,
ackResult.join().get().getClass());
+ assertTrue(ackResult.isCompletedExceptionally());
+ assertFutureThrows(ackResult, InvalidRecordStateException.class);
// Re-acquire the same batch and then acknowledge subset with ACCEPT
type.
result = sharePartition.acquire(
@@ -798,16 +868,15 @@ public class SharePartitionTest {
ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(6, 8,
Collections.singletonList((byte) 3))));
+ assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
- assertFalse(ackResult.join().isPresent());
// Re-acknowledge the subset batch with REJECT type.
ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(6, 8,
Collections.singletonList((byte) 3))));
- assertFalse(ackResult.isCompletedExceptionally());
- assertTrue(ackResult.join().isPresent());
- assertEquals(InvalidRecordStateException.class,
ackResult.join().get().getClass());
+ assertTrue(ackResult.isCompletedExceptionally());
+ assertFutureThrows(ackResult, InvalidRecordStateException.class);
}
@Test
@@ -839,7 +908,7 @@ public class SharePartitionTest {
// Cached data with offset 5-19 should exist.
assertEquals(3, sharePartition.cachedState().size());
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge(
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Arrays.asList(
new ShareAcknowledgementBatch(5, 9,
Collections.singletonList((byte) 2)),
@@ -847,9 +916,8 @@ public class SharePartitionTest {
new ShareAcknowledgementBatch(15, 19,
Collections.singletonList((byte) 1)),
// Add another batch which should fail the request.
new ShareAcknowledgementBatch(15, 19,
Collections.singletonList((byte) 1))));
- assertFalse(ackResult.isCompletedExceptionally());
- assertTrue(ackResult.join().isPresent());
- assertEquals(InvalidRecordStateException.class,
ackResult.join().get().getClass());
+ assertTrue(ackResult.isCompletedExceptionally());
+ assertFutureThrows(ackResult, InvalidRecordStateException.class);
// Check the state of the cache. The state should be acquired itself.
assertEquals(3, sharePartition.cachedState().size());
@@ -887,7 +955,7 @@ public class SharePartitionTest {
// Cached data with offset 5-19 should exist.
assertEquals(3, sharePartition.cachedState().size());
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge(
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Arrays.asList(
new ShareAcknowledgementBatch(5, 9,
Collections.singletonList((byte) 2)),
@@ -895,9 +963,8 @@ public class SharePartitionTest {
new ShareAcknowledgementBatch(15, 19,
Collections.singletonList((byte) 1)),
// Add another batch which should fail the request.
new ShareAcknowledgementBatch(16, 19,
Collections.singletonList((byte) 1))));
- assertFalse(ackResult.isCompletedExceptionally());
- assertTrue(ackResult.join().isPresent());
- assertEquals(InvalidRecordStateException.class,
ackResult.join().get().getClass());
+ assertTrue(ackResult.isCompletedExceptionally());
+ assertFutureThrows(ackResult, InvalidRecordStateException.class);
// Check the state of the cache. The state should be acquired itself.
assertEquals(3, sharePartition.cachedState().size());
@@ -923,11 +990,11 @@ public class SharePartitionTest {
assertArrayEquals(expectedAcquiredRecords(records, 1).toArray(),
acquiredRecordsList.toArray());
assertEquals(15, sharePartition.nextFetchOffset());
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge(
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(12, 13,
Collections.singletonList((byte) 2))));
+ assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
- assertFalse(ackResult.join().isPresent());
assertEquals(12, sharePartition.nextFetchOffset());
assertEquals(1, sharePartition.cachedState().size());
@@ -1010,11 +1077,11 @@ public class SharePartitionTest {
assertNull(sharePartition.cachedState().get(23L).offsetState());
assertNull(sharePartition.cachedState().get(28L).offsetState());
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge(
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(12, 30,
Collections.singletonList((byte) 2))));
+ assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
- assertFalse(ackResult.join().isPresent());
assertEquals(12, sharePartition.nextFetchOffset());
assertEquals(4, sharePartition.cachedState().size());
@@ -1687,20 +1754,18 @@ public class SharePartitionTest {
() -> "Acquisition lock never got released.");
// Acknowledge with ACCEPT type should throw
InvalidRecordStateException since they've been released due to acquisition lock
timeout.
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge(MEMBER_ID,
+ CompletableFuture<Void> ackResult =
sharePartition.acknowledge(MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(5, 9,
Collections.singletonList((byte) 1))));
- assertFalse(ackResult.isCompletedExceptionally());
- assertTrue(ackResult.join().isPresent());
- assertEquals(InvalidRecordStateException.class,
ackResult.join().get().getClass());
+ assertTrue(ackResult.isCompletedExceptionally());
+ assertFutureThrows(ackResult, InvalidRecordStateException.class);
assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
assertEquals(0, sharePartition.timer().size());
// Try acknowledging with REJECT type should throw
InvalidRecordStateException since they've been released due to acquisition lock
timeout.
ackResult = sharePartition.acknowledge(MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(5, 9,
Collections.singletonList((byte) 3))));
- assertFalse(ackResult.isCompletedExceptionally());
- assertTrue(ackResult.join().isPresent());
- assertEquals(InvalidRecordStateException.class,
ackResult.join().get().getClass());
+ assertTrue(ackResult.isCompletedExceptionally());
+ assertFutureThrows(ackResult, InvalidRecordStateException.class);
assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
assertEquals(0, sharePartition.timer().size());
}
@@ -1853,9 +1918,9 @@ public class SharePartitionTest {
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
assertEquals(0, sharePartition.nextFetchOffset());
assertEquals(1, sharePartition.cachedState().size());
@@ -1872,9 +1937,9 @@ public class SharePartitionTest {
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
assertEquals(5, sharePartition.nextFetchOffset());
assertEquals(1, sharePartition.cachedState().size());
@@ -1905,9 +1970,9 @@ public class SharePartitionTest {
sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new
ShareAcknowledgementBatch(5, 18, Collections.singletonList((byte) 1))));
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
assertEquals(0, sharePartition.nextFetchOffset());
assertEquals(3, sharePartition.cachedState().size());
assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(5L).batchState());
@@ -1947,9 +2012,9 @@ public class SharePartitionTest {
(byte) 0, (byte) 1, (byte) 0,
(byte) 1))));
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
assertEquals(5, sharePartition.nextFetchOffset());
// Check cached state.
@@ -2000,9 +2065,9 @@ public class SharePartitionTest {
(byte) 1))));
// Release acquired records for "member-1".
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
assertEquals(19, sharePartition.nextFetchOffset());
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).batchState());
@@ -2024,8 +2089,8 @@ public class SharePartitionTest {
// Release acquired records for "member-2".
releaseResult = sharePartition.releaseAcquiredRecords("member-2");
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
assertEquals(5, sharePartition.nextFetchOffset());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).batchState());
@@ -2073,9 +2138,9 @@ public class SharePartitionTest {
(byte) 1))));
// Release acquired records for "member-1".
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
assertEquals(19, sharePartition.nextFetchOffset());
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).batchState());
@@ -2101,8 +2166,8 @@ public class SharePartitionTest {
// Release acquired records for "member-2".
releaseResult = sharePartition.releaseAcquiredRecords("member-2");
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
assertEquals(6, sharePartition.nextFetchOffset());
// Check cached state.
@@ -2129,9 +2194,9 @@ public class SharePartitionTest {
public void testReleaseAcquiredRecordsForEmptyCachedData() {
SharePartition sharePartition =
SharePartitionBuilder.builder().build();
// Release a batch when cache is empty.
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
assertEquals(0, sharePartition.nextFetchOffset());
assertEquals(0, sharePartition.cachedState().size());
}
@@ -2148,9 +2213,9 @@ public class SharePartitionTest {
sharePartition.acknowledge(MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(8, 9,
Collections.singletonList((byte) 1))));
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
assertEquals(5, sharePartition.nextFetchOffset());
// Check cached state.
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
@@ -2182,9 +2247,9 @@ public class SharePartitionTest {
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
assertEquals(0, sharePartition.nextFetchOffset());
assertEquals(2, sharePartition.cachedState().size());
@@ -2234,9 +2299,9 @@ public class SharePartitionTest {
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
assertEquals(10, sharePartition.nextFetchOffset());
assertEquals(3, sharePartition.cachedState().size());
@@ -2308,9 +2373,9 @@ public class SharePartitionTest {
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
assertEquals(25, sharePartition.nextFetchOffset());
assertEquals(0, sharePartition.cachedState().size());
@@ -2329,9 +2394,9 @@ public class SharePartitionTest {
Collections.singletonList(new ShareAcknowledgementBatch(5, 7,
Collections.singletonList((byte) 1))));
// Release acquired records subset with another member.
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords("member-2");
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords("member-2");
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
// Check cached state.
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(5L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
@@ -2361,9 +2426,9 @@ public class SharePartitionTest {
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
- assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertTrue(releaseResult.isCompletedExceptionally());
+ assertFutureThrows(releaseResult, InvalidRequestException.class);
// Due to failure in writeShareGroupState, the cached state should not
be updated.
assertEquals(1, sharePartition.cachedState().size());
@@ -2397,9 +2462,9 @@ public class SharePartitionTest {
PartitionFactory.newPartitionErrorData(0,
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
- assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertTrue(releaseResult.isCompletedExceptionally());
+ assertFutureThrows(releaseResult, InvalidRequestException.class);
// Due to failure in writeShareGroupState, the cached state should not
be updated.
assertEquals(1, sharePartition.cachedState().size());
@@ -2426,9 +2491,9 @@ public class SharePartitionTest {
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
assertEquals(5, sharePartition.nextFetchOffset());
assertEquals(1, sharePartition.cachedState().size());
@@ -2471,9 +2536,9 @@ public class SharePartitionTest {
(byte) 0, (byte) 1, (byte) 0,
(byte) 1))));
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
assertEquals(5, sharePartition.nextFetchOffset());
// Check cached state.
@@ -3114,9 +3179,9 @@ public class SharePartitionTest {
assertEquals(7, sharePartition.cachedState().size());
// Release acquired records for MEMBER_ID.
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
// Check cached state.
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
@@ -3178,9 +3243,9 @@ public class SharePartitionTest {
assertEquals(2, sharePartition.cachedState().size());
// Release acquired records.
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(5L).batchMemberId());
assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(5L).batchState());
@@ -3209,9 +3274,9 @@ public class SharePartitionTest {
assertEquals(2, sharePartition.cachedState().size());
// Release acquired records.
- CompletableFuture<Optional<Throwable>> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
assertFalse(releaseResult.isCompletedExceptionally());
- assertFalse(releaseResult.join().isPresent());
assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(5L).batchMemberId());
assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(5L).batchState());
@@ -3402,12 +3467,12 @@ public class SharePartitionTest {
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).batchState());
// Acknowledge with RELEASE action.
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge(MEMBER_ID, Arrays.asList(
+ CompletableFuture<Void> ackResult =
sharePartition.acknowledge(MEMBER_ID, Arrays.asList(
new ShareAcknowledgementBatch(2, 6,
Collections.singletonList((byte) 2)),
new ShareAcknowledgementBatch(10, 14,
Collections.singletonList((byte) 2))));
+ assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
- assertFalse(ackResult.join().isPresent());
assertEquals(12, sharePartition.nextFetchOffset());
assertEquals(12, sharePartition.startOffset());
@@ -3466,10 +3531,10 @@ public class SharePartitionTest {
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(20L).batchState());
// Acknowledge with ACCEPT action.
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(
+ CompletableFuture<Void> ackResult =
sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(
new ShareAcknowledgementBatch(2, 14,
Collections.singletonList((byte) 1))));
+ assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
- assertFalse(ackResult.join().isPresent());
assertEquals(25, sharePartition.nextFetchOffset());
// For cached state corresponding to entry 2, the offset states will
be ARCHIVED, ARCHIVED, ARCHIVED, ARCHIVED and ACKNOWLEDGED.
@@ -3610,7 +3675,9 @@ public class SharePartitionTest {
SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(null));
- assertThrows(IllegalStateException.class, () ->
sharePartition.isWriteShareGroupStateSuccessful(Collections.emptyList()));
+ CompletableFuture<Void> result =
sharePartition.writeShareGroupState(Collections.emptyList());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, IllegalStateException.class);
}
@Test
@@ -3622,7 +3689,9 @@ public class SharePartitionTest {
WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(null);
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
- assertThrows(IllegalStateException.class, () ->
sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+ CompletableFuture<Void> result =
sharePartition.writeShareGroupState(Mockito.anyList());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, IllegalStateException.class);
}
@Test
@@ -3635,27 +3704,35 @@ public class SharePartitionTest {
// TopicsData is empty.
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.emptyList());
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
- assertThrows(IllegalStateException.class, () ->
sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+ CompletableFuture<Void> writeResult =
sharePartition.writeShareGroupState(Mockito.anyList());
+ assertTrue(writeResult.isCompletedExceptionally());
+ assertFutureThrows(writeResult, IllegalStateException.class);
// TopicsData contains more results than expected.
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Arrays.asList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.emptyList()),
new TopicData<>(Uuid.randomUuid(), Collections.emptyList())));
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
- assertThrows(IllegalStateException.class, () ->
sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+ writeResult = sharePartition.writeShareGroupState(Mockito.anyList());
+ assertTrue(writeResult.isCompletedExceptionally());
+ assertFutureThrows(writeResult, IllegalStateException.class);
// TopicsData contains no partition data.
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.emptyList())));
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
- assertThrows(IllegalStateException.class, () ->
sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+ writeResult = sharePartition.writeShareGroupState(Mockito.anyList());
+ assertTrue(writeResult.isCompletedExceptionally());
+ assertFutureThrows(writeResult, IllegalStateException.class);
// TopicsData contains wrong topicId.
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(Uuid.randomUuid(), Collections.singletonList(
PartitionFactory.newPartitionErrorData(0,
Errors.NONE.code(), Errors.NONE.message())))));
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
- assertThrows(IllegalStateException.class, () ->
sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+ writeResult = sharePartition.writeShareGroupState(Mockito.anyList());
+ assertTrue(writeResult.isCompletedExceptionally());
+ assertFutureThrows(writeResult, IllegalStateException.class);
// TopicsData contains more partition data than expected.
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
@@ -3663,14 +3740,18 @@ public class SharePartitionTest {
PartitionFactory.newPartitionErrorData(0,
Errors.NONE.code(), Errors.NONE.message()),
PartitionFactory.newPartitionErrorData(1,
Errors.NONE.code(), Errors.NONE.message())))));
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
- assertThrows(IllegalStateException.class, () ->
sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+ writeResult = sharePartition.writeShareGroupState(Mockito.anyList());
+ assertTrue(writeResult.isCompletedExceptionally());
+ assertFutureThrows(writeResult, IllegalStateException.class);
// TopicsData contains wrong partition.
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
PartitionFactory.newPartitionErrorData(1,
Errors.NONE.code(), Errors.NONE.message())))));
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
- assertThrows(IllegalStateException.class, () ->
sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+ writeResult = sharePartition.writeShareGroupState(Mockito.anyList());
+ assertTrue(writeResult.isCompletedExceptionally());
+ assertFutureThrows(writeResult, IllegalStateException.class);
}
@Test
@@ -3680,11 +3761,13 @@ public class SharePartitionTest {
SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
Mockito.when(persister.writeState(Mockito.any())).thenReturn(FutureUtils.failedFuture(new
RuntimeException("Write exception")));
- assertThrows(IllegalStateException.class, () ->
sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+ CompletableFuture<Void> writeResult =
sharePartition.writeShareGroupState(Mockito.anyList());
+ assertTrue(writeResult.isCompletedExceptionally());
+ assertFutureThrows(writeResult, IllegalStateException.class);
}
@Test
- public void testIsWriteShareGroupStateSuccessful() {
+ public void testWriteShareGroupState() {
Persister persister = Mockito.mock(Persister.class);
mockPersisterReadStateMethod(persister);
SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
@@ -3695,22 +3778,96 @@ public class SharePartitionTest {
PartitionFactory.newPartitionErrorData(0,
Errors.NONE.code(), Errors.NONE.message())))));
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
-
assertTrue(sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+ CompletableFuture<Void> result =
sharePartition.writeShareGroupState(Mockito.anyList());
+ assertNull(result.join());
+ assertFalse(result.isCompletedExceptionally());
}
@Test
- public void testIsWriteShareGroupStateFailure() {
+ public void testWriteShareGroupStateFailure() {
Persister persister = Mockito.mock(Persister.class);
mockPersisterReadStateMethod(persister);
SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
- // Mock Write state RPC to return error response.
+ // Mock Write state RPC to return error response, NOT_COORDINATOR.
WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
PartitionFactory.newPartitionErrorData(0,
Errors.NOT_COORDINATOR.code(), Errors.NOT_COORDINATOR.message())))));
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
-
assertFalse(sharePartition.isWriteShareGroupStateSuccessful(Mockito.anyList()));
+ CompletableFuture<Void> result =
sharePartition.writeShareGroupState(Mockito.anyList());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, CoordinatorNotAvailableException.class);
+
+ // Mock Write state RPC to return error response, COORDINATOR_NOT_AVAILABLE.
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionErrorData(0,
Errors.COORDINATOR_NOT_AVAILABLE.code(),
Errors.COORDINATOR_NOT_AVAILABLE.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ result = sharePartition.writeShareGroupState(Mockito.anyList());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, CoordinatorNotAvailableException.class);
+
+ // Mock Write state RPC to return error response, COORDINATOR_LOAD_IN_PROGRESS.
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionErrorData(0,
Errors.COORDINATOR_LOAD_IN_PROGRESS.code(),
Errors.COORDINATOR_LOAD_IN_PROGRESS.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ result = sharePartition.writeShareGroupState(Mockito.anyList());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, CoordinatorNotAvailableException.class);
+
+ // Mock Write state RPC to return error response, GROUP_ID_NOT_FOUND.
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionErrorData(0,
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ result = sharePartition.writeShareGroupState(Mockito.anyList());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, InvalidRequestException.class);
+
+ // Mock Write state RPC to return error response, UNKNOWN_TOPIC_OR_PARTITION.
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionErrorData(0,
Errors.UNKNOWN_TOPIC_OR_PARTITION.code(),
Errors.UNKNOWN_TOPIC_OR_PARTITION.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ result = sharePartition.writeShareGroupState(Mockito.anyList());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, InvalidRequestException.class);
+
+ // Mock Write state RPC to return error response, FENCED_STATE_EPOCH.
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionErrorData(0,
Errors.FENCED_STATE_EPOCH.code(), Errors.FENCED_STATE_EPOCH.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ result = sharePartition.writeShareGroupState(Mockito.anyList());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, FencedStateEpochException.class);
+
+ // Mock Write state RPC to return error response, FENCED_LEADER_EPOCH.
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionErrorData(0,
Errors.FENCED_LEADER_EPOCH.code(), Errors.FENCED_LEADER_EPOCH.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ result = sharePartition.writeShareGroupState(Mockito.anyList());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, NotLeaderOrFollowerException.class);
+
+ // Mock Write state RPC to return error response, UNKNOWN_SERVER_ERROR.
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionErrorData(0,
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ result = sharePartition.writeShareGroupState(Mockito.anyList());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, UnknownServerException.class);
}
@Test
@@ -3720,7 +3877,9 @@ public class SharePartitionTest {
new PersisterStateBatch(5L, 10L, RecordState.AVAILABLE.id,
(short) 2),
new PersisterStateBatch(11L, 15L, RecordState.ARCHIVED.id,
(short) 3));
-
assertTrue(sharePartition.isWriteShareGroupStateSuccessful(stateBatches));
+ CompletableFuture<Void> result =
sharePartition.writeShareGroupState(stateBatches);
+ assertNull(result.join());
+ assertFalse(result.isCompletedExceptionally());
}
@Test
@@ -4109,17 +4268,17 @@ public class SharePartitionTest {
WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
- PartitionFactory.newPartitionErrorData(0,
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+ PartitionFactory.newPartitionErrorData(0,
Errors.UNKNOWN_TOPIC_OR_PARTITION.code(),
Errors.UNKNOWN_TOPIC_OR_PARTITION.message())))));
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
sharePartition.acquire(MEMBER_ID, new FetchPartitionData(Errors.NONE,
20, 0, memoryRecords(10, 5),
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge(MEMBER_ID,
+ CompletableFuture<Void> ackResult =
sharePartition.acknowledge(MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(5, 14,
Collections.singletonList((byte) 1))));
- assertFalse(ackResult.isCompletedExceptionally());
- assertFalse(ackResult.join().isPresent());
+ assertTrue(ackResult.isCompletedExceptionally());
+ assertFutureThrows(ackResult, InvalidRequestException.class);
// Due to failure in writeShareGroupState, the cached state should not be updated.
assertEquals(1, sharePartition.cachedState().size());
@@ -4144,11 +4303,10 @@ public class SharePartitionTest {
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge(
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
Collections.singletonList(new ShareAcknowledgementBatch(8, 10,
Collections.singletonList((byte) 3))));
- assertFalse(ackResult.isCompletedExceptionally());
- assertFalse(ackResult.join().isPresent());
+ assertTrue(ackResult.isCompletedExceptionally());
// Due to failure in writeShareGroupState, the cached state should not be updated.
assertEquals(1, sharePartition.cachedState().size());
@@ -4179,11 +4337,10 @@ public class SharePartitionTest {
Collections.singletonList(new ShareAcknowledgementBatch(5, 7,
Collections.singletonList((byte) 1))));
// Acknowledge subset with another member.
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge("member-2",
+ CompletableFuture<Void> ackResult =
sharePartition.acknowledge("member-2",
Collections.singletonList(new ShareAcknowledgementBatch(9, 11,
Collections.singletonList((byte) 1))));
- assertFalse(ackResult.isCompletedExceptionally());
- assertTrue(ackResult.join().isPresent());
- assertEquals(InvalidRecordStateException.class,
ackResult.join().get().getClass());
+ assertTrue(ackResult.isCompletedExceptionally());
+ assertFutureThrows(ackResult, InvalidRecordStateException.class);
}
@Test
@@ -4202,15 +4359,14 @@ public class SharePartitionTest {
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge("member-1", Arrays.asList(
+ CompletableFuture<Void> ackResult =
sharePartition.acknowledge("member-1", Arrays.asList(
new ShareAcknowledgementBatch(5, 9,
Collections.singletonList((byte) 2)),
// Acknowledging batch with another member will cause failure and rollback.
new ShareAcknowledgementBatch(10, 14,
Collections.singletonList((byte) 1)),
new ShareAcknowledgementBatch(15, 19,
Collections.singletonList((byte) 1))));
- assertFalse(ackResult.isCompletedExceptionally());
- assertTrue(ackResult.join().isPresent());
- assertEquals(InvalidRecordStateException.class,
ackResult.join().get().getClass());
+ assertTrue(ackResult.isCompletedExceptionally());
+ assertFutureThrows(ackResult, InvalidRecordStateException.class);
// State should be rolled back to the previous state for any changes.
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).batchState());
@@ -4240,14 +4396,13 @@ public class SharePartitionTest {
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
- CompletableFuture<Optional<Throwable>> ackResult =
sharePartition.acknowledge("member-1", Arrays.asList(
+ CompletableFuture<Void> ackResult =
sharePartition.acknowledge("member-1", Arrays.asList(
new ShareAcknowledgementBatch(5, 9,
Collections.singletonList((byte) 2)),
new ShareAcknowledgementBatch(10, 14,
Collections.singletonList((byte) 1)),
// Acknowledging subset with another member will cause failure and rollback.
new ShareAcknowledgementBatch(16, 18,
Collections.singletonList((byte) 1))));
- assertFalse(ackResult.isCompletedExceptionally());
- assertTrue(ackResult.join().isPresent());
- assertEquals(InvalidRecordStateException.class,
ackResult.join().get().getClass());
+ assertTrue(ackResult.isCompletedExceptionally());
+ assertFutureThrows(ackResult, InvalidRecordStateException.class);
assertEquals(3, sharePartition.cachedState().size());
// Check the state of the cache. State should be rolled back to the previous state for any changes.
@@ -4440,12 +4595,12 @@ public class SharePartitionTest {
sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(
new ShareAcknowledgementBatch(2, 6,
Collections.singletonList((byte) 1))));
// Acknowledge records will induce 1 write state RPC call via function isWriteShareGroupStateSuccessful.
- Mockito.verify(sharePartition,
Mockito.times(1)).isWriteShareGroupStateSuccessful(anyList());
+ Mockito.verify(sharePartition,
Mockito.times(1)).writeShareGroupState(anyList());
sharePartition.releaseAcquiredRecords(MEMBER_ID);
// Release acquired records will induce 0 write state RPC call via function isWriteShareGroupStateSuccessful
// because the in-flight batch has been acknowledged. Hence, the total calls remain 1.
- Mockito.verify(sharePartition,
Mockito.times(1)).isWriteShareGroupStateSuccessful(anyList());
+ Mockito.verify(sharePartition,
Mockito.times(1)).writeShareGroupState(anyList());
}
@Test