This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 180112a4a95 KAFKA-18084 Added write locks in SharePartition where
locks were async calls were being made (#17957)
180112a4a95 is described below
commit 180112a4a95d754f38d004a910b1a31151311605
Author: Abhinav Dixit <[email protected]>
AuthorDate: Tue Dec 3 23:16:29 2024 +0530
KAFKA-18084 Added write locks in SharePartition where locks were async
calls were being made (#17957)
Reviewers: Andrew Schofield <[email protected]>, poorv Mittal
<[email protected]>, Sushant Mahajan <[email protected]>,
Chia-Ping Tsai <[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 53 +++---
.../kafka/server/share/SharePartitionTest.java | 33 ++--
.../share/persister/DefaultStatePersister.java | 18 +-
.../kafka/server/share/persister/Persister.java | 4 +-
.../share/persister/DefaultStatePersisterTest.java | 203 +++++++++++----------
5 files changed, 169 insertions(+), 142 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index ddc023a5315..36541ef3195 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -380,15 +380,24 @@ public class SharePartition {
// Update state to initializing to avoid any concurrent requests
to be processed.
partitionState = SharePartitionState.INITIALIZING;
- // Initialize the share partition by reading the state from the
persister.
- persister.readState(new ReadShareGroupStateParameters.Builder()
- .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
- .setGroupId(this.groupId)
- .setTopicsData(Collections.singletonList(new
TopicData<>(topicIdPartition.topicId(),
-
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(),
leaderEpoch)))))
- .build())
- .build()
- ).whenComplete((result, exception) -> {
+ } catch (Exception e) {
+ log.error("Failed to initialize the share partition: {}-{}",
groupId, topicIdPartition, e);
+ completeInitializationWithException(future, e);
+ return future;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ // Initialize the share partition by reading the state from the
persister.
+ persister.readState(new ReadShareGroupStateParameters.Builder()
+ .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
+ .setGroupId(this.groupId)
+ .setTopicsData(Collections.singletonList(new
TopicData<>(topicIdPartition.topicId(),
+
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(),
leaderEpoch)))))
+ .build())
+ .build()
+ ).whenComplete((result, exception) -> {
+ lock.writeLock().lock();
+ try {
if (exception != null) {
log.error("Failed to initialize the share partition:
{}-{}", groupId, topicIdPartition, exception);
completeInitializationWithException(future, exception);
@@ -462,13 +471,10 @@ public class SharePartition {
// Set the partition state to Active and complete the future.
partitionState = SharePartitionState.ACTIVE;
future.complete(null);
- });
- } catch (Exception e) {
- log.error("Failed to initialize the share partition: {}-{}",
groupId, topicIdPartition, e);
- completeInitializationWithException(future, e);
- } finally {
- lock.writeLock().unlock();
- }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ });
return future;
}
@@ -1645,8 +1651,13 @@ public class SharePartition {
future.complete(null);
return;
}
+ } finally {
+ lock.writeLock().unlock();
+ }
- writeShareGroupState(stateBatches).whenComplete((result,
exception) -> {
+ writeShareGroupState(stateBatches).whenComplete((result, exception) ->
{
+ lock.writeLock().lock();
+ try {
if (exception != null) {
log.error("Failed to write state to persister for the
share partition: {}-{}",
groupId, topicIdPartition, exception);
@@ -1665,10 +1676,10 @@ public class SharePartition {
// Update the cached state and start and end offsets after
acknowledging/releasing the acquired records.
maybeUpdateCachedStateAndOffsets();
future.complete(null);
- });
- } finally {
- lock.writeLock().unlock();
- }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ });
}
private void maybeUpdateCachedStateAndOffsets() {
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 3e90005902e..dd895b4badc 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -490,11 +490,8 @@ public class SharePartitionTest {
if (!executorService.awaitTermination(30, TimeUnit.MILLISECONDS))
executorService.shutdown();
}
-
- for (CompletableFuture<Void> result : results) {
- assertTrue(result.isDone());
- assertFalse(result.isCompletedExceptionally());
- }
+ assertTrue(results.stream().allMatch(CompletableFuture::isDone));
+
assertFalse(results.stream().allMatch(CompletableFuture::isCompletedExceptionally));
assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
// Verify the persister read state is called only once.
@@ -771,24 +768,20 @@ public class SharePartitionTest {
Persister persister = Mockito.mock(Persister.class);
// Complete the future exceptionally for read state.
Mockito.when(persister.readState(Mockito.any())).thenReturn(FutureUtils.failedFuture(new
RuntimeException("Read exception")));
- SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+ SharePartition sharePartition1 =
SharePartitionBuilder.builder().withPersister(persister).build();
- CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ CompletableFuture<Void> result = sharePartition1.maybeInitialize();
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(result, RuntimeException.class);
- assertEquals(SharePartitionState.FAILED,
sharePartition.partitionState());
+ assertEquals(SharePartitionState.FAILED,
sharePartition1.partitionState());
persister = Mockito.mock(Persister.class);
// Throw exception for read state.
Mockito.when(persister.readState(Mockito.any())).thenThrow(new
RuntimeException("Read exception"));
- sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+ SharePartition sharePartition2 =
SharePartitionBuilder.builder().withPersister(persister).build();
- result = sharePartition.maybeInitialize();
- assertTrue(result.isDone());
- assertTrue(result.isCompletedExceptionally());
- assertFutureThrows(result, RuntimeException.class);
- assertEquals(SharePartitionState.FAILED,
sharePartition.partitionState());
+ assertThrows(RuntimeException.class, sharePartition2::maybeInitialize);
}
@Test
@@ -4453,12 +4446,20 @@ public class SharePartitionTest {
public void testWriteShareGroupStateWithWriteException() {
Persister persister = Mockito.mock(Persister.class);
mockPersisterReadStateMethod(persister);
- SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+ SharePartition sharePartition1 =
SharePartitionBuilder.builder().withPersister(persister).build();
Mockito.when(persister.writeState(Mockito.any())).thenReturn(FutureUtils.failedFuture(new
RuntimeException("Write exception")));
- CompletableFuture<Void> writeResult =
sharePartition.writeShareGroupState(anyList());
+ CompletableFuture<Void> writeResult =
sharePartition1.writeShareGroupState(anyList());
assertTrue(writeResult.isCompletedExceptionally());
assertFutureThrows(writeResult, IllegalStateException.class);
+
+ persister = Mockito.mock(Persister.class);
+ // Throw exception for write state.
+ mockPersisterReadStateMethod(persister);
+ SharePartition sharePartition2 =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ Mockito.when(persister.writeState(Mockito.any())).thenThrow(new
RuntimeException("Write exception"));
+ assertThrows(RuntimeException.class, () ->
sharePartition2.writeShareGroupState(anyList()));
}
@Test
diff --git
a/share/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
b/share/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
index 19dcc2c2170..07ac3eab116 100644
---
a/share/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
+++
b/share/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
@@ -78,8 +78,13 @@ public class DefaultStatePersister implements Persister {
* @param request WriteShareGroupStateParameters
* @return A completable future of WriteShareGroupStateResult
*/
- public CompletableFuture<WriteShareGroupStateResult>
writeState(WriteShareGroupStateParameters request) throws
IllegalArgumentException {
- validate(request);
+ public CompletableFuture<WriteShareGroupStateResult>
writeState(WriteShareGroupStateParameters request) {
+ try {
+ validate(request);
+ } catch (Exception e) {
+ log.error("Unable to validate write state request", e);
+ return CompletableFuture.failedFuture(e);
+ }
GroupTopicPartitionData<PartitionStateBatchData> gtp =
request.groupTopicPartitionData();
String groupId = gtp.groupId();
@@ -169,8 +174,13 @@ public class DefaultStatePersister implements Persister {
* @param request ReadShareGroupStateParameters
* @return A completable future of ReadShareGroupStateResult
*/
- public CompletableFuture<ReadShareGroupStateResult>
readState(ReadShareGroupStateParameters request) throws
IllegalArgumentException {
- validate(request);
+ public CompletableFuture<ReadShareGroupStateResult>
readState(ReadShareGroupStateParameters request) {
+ try {
+ validate(request);
+ } catch (Exception e) {
+ log.error("Unable to validate read state request", e);
+ return CompletableFuture.failedFuture(e);
+ }
GroupTopicPartitionData<PartitionIdLeaderEpochData> gtp =
request.groupTopicPartitionData();
String groupId = gtp.groupId();
Map<Uuid, Map<Integer,
CompletableFuture<ReadShareGroupStateResponse>>> futureMap = new HashMap<>();
diff --git
a/share/src/main/java/org/apache/kafka/server/share/persister/Persister.java
b/share/src/main/java/org/apache/kafka/server/share/persister/Persister.java
index 49073c83cd6..ac0a7955a70 100644
--- a/share/src/main/java/org/apache/kafka/server/share/persister/Persister.java
+++ b/share/src/main/java/org/apache/kafka/server/share/persister/Persister.java
@@ -40,7 +40,7 @@ public interface Persister {
* @param request Request parameters
* @return A {@link CompletableFuture} that completes with the result.
*/
- CompletableFuture<ReadShareGroupStateResult>
readState(ReadShareGroupStateParameters request) throws
IllegalArgumentException;
+ CompletableFuture<ReadShareGroupStateResult>
readState(ReadShareGroupStateParameters request);
/**
* Write share-partition state.
@@ -48,7 +48,7 @@ public interface Persister {
* @param request Request parameters
* @return A {@link CompletableFuture} that completes with the result.
*/
- CompletableFuture<WriteShareGroupStateResult>
writeState(WriteShareGroupStateParameters request) throws
IllegalArgumentException;
+ CompletableFuture<WriteShareGroupStateResult>
writeState(WriteShareGroupStateParameters request);
/**
* Delete share-partition state.
diff --git
a/share/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
b/share/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
index 81742ca7a27..9e0ce91bc85 100644
---
a/share/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
+++
b/share/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
@@ -50,8 +50,8 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
@@ -132,68 +132,71 @@ class DefaultStatePersisterTest {
int incorrectPartition = -1;
// Request Parameters are null
- assertThrows(IllegalArgumentException.class, () -> {
- DefaultStatePersister defaultStatePersister =
DefaultStatePersisterBuilder.builder().build();
- defaultStatePersister.writeState(null);
- });
+ DefaultStatePersister defaultStatePersister =
DefaultStatePersisterBuilder.builder().build();
+ CompletableFuture<WriteShareGroupStateResult> result =
defaultStatePersister.writeState(null);
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, IllegalArgumentException.class);
// groupTopicPartitionData is null
- assertThrows(IllegalArgumentException.class, () -> {
- DefaultStatePersister defaultStatePersister =
DefaultStatePersisterBuilder.builder().build();
- defaultStatePersister.writeState(new
WriteShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build());
- });
+ defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+ result = defaultStatePersister.writeState(new
WriteShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build());
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, IllegalArgumentException.class);
// groupId is null
- assertThrows(IllegalArgumentException.class, () -> {
- DefaultStatePersister defaultStatePersister =
DefaultStatePersisterBuilder.builder().build();
- defaultStatePersister.writeState(new
WriteShareGroupStateParameters.Builder()
- .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
- .setGroupId(null).build()).build());
- });
+ defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+ result = defaultStatePersister.writeState(new
WriteShareGroupStateParameters.Builder()
+ .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
+ .setGroupId(null).build()).build());
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, IllegalArgumentException.class);
// topicsData is empty
- assertThrows(IllegalArgumentException.class, () -> {
- DefaultStatePersister defaultStatePersister =
DefaultStatePersisterBuilder.builder().build();
- defaultStatePersister.writeState(new
WriteShareGroupStateParameters.Builder()
- .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
- .setGroupId(groupId)
- .setTopicsData(Collections.emptyList()).build()).build());
- });
+ defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+ result = defaultStatePersister.writeState(new
WriteShareGroupStateParameters.Builder()
+ .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
+ .setGroupId(groupId)
+ .setTopicsData(Collections.emptyList()).build()).build());
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, IllegalArgumentException.class);
// topicId is null
- assertThrows(IllegalArgumentException.class, () -> {
- DefaultStatePersister defaultStatePersister =
DefaultStatePersisterBuilder.builder().build();
- defaultStatePersister.writeState(new
WriteShareGroupStateParameters.Builder()
- .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
- .setGroupId(groupId)
- .setTopicsData(Collections.singletonList(new
TopicData<>(null,
+ defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+ result = defaultStatePersister.writeState(new
WriteShareGroupStateParameters.Builder()
+ .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
+ .setGroupId(groupId)
+ .setTopicsData(Collections.singletonList(new TopicData<>(null,
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
- partition, 1, 0, 0, null))))
- ).build()).build());
- });
+ partition, 1, 0, 0, null))))).build()).build());
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, IllegalArgumentException.class);
// partitionData is empty
- assertThrows(IllegalArgumentException.class, () -> {
- DefaultStatePersister defaultStatePersister =
DefaultStatePersisterBuilder.builder().build();
- defaultStatePersister.writeState(new
WriteShareGroupStateParameters.Builder()
- .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
- .setGroupId(groupId)
- .setTopicsData(Collections.singletonList(new
TopicData<>(topicId,
- Collections.emptyList()))
- ).build()).build());
- });
+ defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+ result = defaultStatePersister.writeState(new
WriteShareGroupStateParameters.Builder()
+ .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
+ .setGroupId(groupId)
+ .setTopicsData(Collections.singletonList(new
TopicData<>(topicId, Collections.emptyList()))).build()).build());
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, IllegalArgumentException.class);
// partition value is incorrect
- assertThrows(IllegalArgumentException.class, () -> {
- DefaultStatePersister defaultStatePersister =
DefaultStatePersisterBuilder.builder().build();
- defaultStatePersister.writeState(new
WriteShareGroupStateParameters.Builder()
- .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
- .setGroupId(groupId)
- .setTopicsData(Collections.singletonList(new
TopicData<>(topicId,
+ defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+ result = defaultStatePersister.writeState(new
WriteShareGroupStateParameters.Builder()
+ .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
+ .setGroupId(groupId)
+ .setTopicsData(Collections.singletonList(new
TopicData<>(topicId,
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
- incorrectPartition, 1, 0, 0, null))))
- ).build()).build());
- });
+ incorrectPartition, 1, 0, 0,
null))))).build()).build());
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, IllegalArgumentException.class);
}
@Test
@@ -205,68 +208,70 @@ class DefaultStatePersisterTest {
int incorrectPartition = -1;
// Request Parameters are null
- assertThrows(IllegalArgumentException.class, () -> {
- DefaultStatePersister defaultStatePersister =
DefaultStatePersisterBuilder.builder().build();
- defaultStatePersister.readState(null);
- });
+ DefaultStatePersister defaultStatePersister =
DefaultStatePersisterBuilder.builder().build();
+ CompletableFuture<ReadShareGroupStateResult> result =
defaultStatePersister.readState(null);
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, IllegalArgumentException.class);
// groupTopicPartitionData is null
- assertThrows(IllegalArgumentException.class, () -> {
- DefaultStatePersister defaultStatePersister =
DefaultStatePersisterBuilder.builder().build();
- defaultStatePersister.readState(new
ReadShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build());
- });
+ defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+ result = defaultStatePersister.readState(new
ReadShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build());
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, IllegalArgumentException.class);
// groupId is null
- assertThrows(IllegalArgumentException.class, () -> {
- DefaultStatePersister defaultStatePersister =
DefaultStatePersisterBuilder.builder().build();
- defaultStatePersister.readState(new
ReadShareGroupStateParameters.Builder()
- .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
- .setGroupId(null).build()).build());
- });
+ defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+ result = defaultStatePersister.readState(new
ReadShareGroupStateParameters.Builder()
+ .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
+ .setGroupId(null).build()).build());
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, IllegalArgumentException.class);
// topicsData is empty
- assertThrows(IllegalArgumentException.class, () -> {
- DefaultStatePersister defaultStatePersister =
DefaultStatePersisterBuilder.builder().build();
- defaultStatePersister.readState(new
ReadShareGroupStateParameters.Builder()
- .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
- .setGroupId(groupId)
- .setTopicsData(Collections.emptyList()).build()).build());
- });
+ defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+ result = defaultStatePersister.readState(new
ReadShareGroupStateParameters.Builder()
+ .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
+ .setGroupId(groupId)
+ .setTopicsData(Collections.emptyList()).build()).build());
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, IllegalArgumentException.class);
// topicId is null
- assertThrows(IllegalArgumentException.class, () -> {
- DefaultStatePersister defaultStatePersister =
DefaultStatePersisterBuilder.builder().build();
- defaultStatePersister.readState(new
ReadShareGroupStateParameters.Builder()
- .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
- .setGroupId(groupId)
- .setTopicsData(Collections.singletonList(new
TopicData<>(null,
-
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(
- partition, 1))))
- ).build()).build());
- });
+ defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+ result = defaultStatePersister.readState(new
ReadShareGroupStateParameters.Builder()
+ .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
+ .setGroupId(groupId)
+ .setTopicsData(Collections.singletonList(new TopicData<>(null,
+
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(partition,
1))))
+ ).build()).build());
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, IllegalArgumentException.class);
// partitionData is empty
- assertThrows(IllegalArgumentException.class, () -> {
- DefaultStatePersister defaultStatePersister =
DefaultStatePersisterBuilder.builder().build();
- defaultStatePersister.readState(new
ReadShareGroupStateParameters.Builder()
- .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
- .setGroupId(groupId)
- .setTopicsData(Collections.singletonList(new
TopicData<>(topicId,
- Collections.emptyList()))
- ).build()).build());
- });
+ defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+ result = defaultStatePersister.readState(new
ReadShareGroupStateParameters.Builder()
+ .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
+ .setGroupId(groupId)
+ .setTopicsData(Collections.singletonList(new
TopicData<>(topicId, Collections.emptyList()))).build()).build());
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, IllegalArgumentException.class);
// partition value is incorrect
- assertThrows(IllegalArgumentException.class, () -> {
- DefaultStatePersister defaultStatePersister =
DefaultStatePersisterBuilder.builder().build();
- defaultStatePersister.readState(new
ReadShareGroupStateParameters.Builder()
- .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
- .setGroupId(groupId)
- .setTopicsData(Collections.singletonList(new
TopicData<>(topicId,
-
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(
- incorrectPartition, 1))))
- ).build()).build());
- });
+ defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+ result = defaultStatePersister.readState(new
ReadShareGroupStateParameters.Builder()
+ .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
+ .setGroupId(groupId)
+ .setTopicsData(Collections.singletonList(new
TopicData<>(topicId,
+
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(incorrectPartition,
1))))).build()).build());
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(result, IllegalArgumentException.class);
}
@Test