This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 180112a4a95 KAFKA-18084 Added write locks in SharePartition where 
async calls were being made (#17957)
180112a4a95 is described below

commit 180112a4a95d754f38d004a910b1a31151311605
Author: Abhinav Dixit <[email protected]>
AuthorDate: Tue Dec 3 23:16:29 2024 +0530

    KAFKA-18084 Added write locks in SharePartition where async 
calls were being made (#17957)
    
    Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal 
<[email protected]>, Sushant Mahajan <[email protected]>, 
Chia-Ping Tsai <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    |  53 +++---
 .../kafka/server/share/SharePartitionTest.java     |  33 ++--
 .../share/persister/DefaultStatePersister.java     |  18 +-
 .../kafka/server/share/persister/Persister.java    |   4 +-
 .../share/persister/DefaultStatePersisterTest.java | 203 +++++++++++----------
 5 files changed, 169 insertions(+), 142 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index ddc023a5315..36541ef3195 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -380,15 +380,24 @@ public class SharePartition {
 
             // Update state to initializing to avoid any concurrent requests 
to be processed.
             partitionState = SharePartitionState.INITIALIZING;
-            // Initialize the share partition by reading the state from the 
persister.
-            persister.readState(new ReadShareGroupStateParameters.Builder()
-                .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
-                    .setGroupId(this.groupId)
-                    .setTopicsData(Collections.singletonList(new 
TopicData<>(topicIdPartition.topicId(),
-                        
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(),
 leaderEpoch)))))
-                    .build())
-                .build()
-            ).whenComplete((result, exception) -> {
+        } catch (Exception e) {
+            log.error("Failed to initialize the share partition: {}-{}", 
groupId, topicIdPartition, e);
+            completeInitializationWithException(future, e);
+            return future;
+        } finally {
+            lock.writeLock().unlock();
+        }
+        // Initialize the share partition by reading the state from the 
persister.
+        persister.readState(new ReadShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
+                .setGroupId(this.groupId)
+                .setTopicsData(Collections.singletonList(new 
TopicData<>(topicIdPartition.topicId(),
+                    
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(),
 leaderEpoch)))))
+                .build())
+            .build()
+        ).whenComplete((result, exception) -> {
+            lock.writeLock().lock();
+            try {
                 if (exception != null) {
                     log.error("Failed to initialize the share partition: 
{}-{}", groupId, topicIdPartition, exception);
                     completeInitializationWithException(future, exception);
@@ -462,13 +471,10 @@ public class SharePartition {
                 // Set the partition state to Active and complete the future.
                 partitionState = SharePartitionState.ACTIVE;
                 future.complete(null);
-            });
-        } catch (Exception e) {
-            log.error("Failed to initialize the share partition: {}-{}", 
groupId, topicIdPartition, e);
-            completeInitializationWithException(future, e);
-        } finally {
-            lock.writeLock().unlock();
-        }
+            } finally {
+                lock.writeLock().unlock();
+            }
+        });
 
         return future;
     }
@@ -1645,8 +1651,13 @@ public class SharePartition {
                 future.complete(null);
                 return;
             }
+        } finally {
+            lock.writeLock().unlock();
+        }
 
-            writeShareGroupState(stateBatches).whenComplete((result, 
exception) -> {
+        writeShareGroupState(stateBatches).whenComplete((result, exception) -> 
{
+            lock.writeLock().lock();
+            try {
                 if (exception != null) {
                     log.error("Failed to write state to persister for the 
share partition: {}-{}",
                         groupId, topicIdPartition, exception);
@@ -1665,10 +1676,10 @@ public class SharePartition {
                 // Update the cached state and start and end offsets after 
acknowledging/releasing the acquired records.
                 maybeUpdateCachedStateAndOffsets();
                 future.complete(null);
-            });
-        } finally {
-            lock.writeLock().unlock();
-        }
+            } finally {
+                lock.writeLock().unlock();
+            }
+        });
     }
 
     private void maybeUpdateCachedStateAndOffsets() {
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 3e90005902e..dd895b4badc 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -490,11 +490,8 @@ public class SharePartitionTest {
             if (!executorService.awaitTermination(30, TimeUnit.MILLISECONDS))
                 executorService.shutdown();
         }
-
-        for (CompletableFuture<Void> result : results) {
-            assertTrue(result.isDone());
-            assertFalse(result.isCompletedExceptionally());
-        }
+        assertTrue(results.stream().allMatch(CompletableFuture::isDone));
+        
assertFalse(results.stream().allMatch(CompletableFuture::isCompletedExceptionally));
 
         assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
         // Verify the persister read state is called only once.
@@ -771,24 +768,20 @@ public class SharePartitionTest {
         Persister persister = Mockito.mock(Persister.class);
         // Complete the future exceptionally for read state.
         
Mockito.when(persister.readState(Mockito.any())).thenReturn(FutureUtils.failedFuture(new
 RuntimeException("Read exception")));
-        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+        SharePartition sharePartition1 = 
SharePartitionBuilder.builder().withPersister(persister).build();
 
-        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        CompletableFuture<Void> result = sharePartition1.maybeInitialize();
         assertTrue(result.isDone());
         assertTrue(result.isCompletedExceptionally());
         assertFutureThrows(result, RuntimeException.class);
-        assertEquals(SharePartitionState.FAILED, 
sharePartition.partitionState());
+        assertEquals(SharePartitionState.FAILED, 
sharePartition1.partitionState());
 
         persister = Mockito.mock(Persister.class);
         // Throw exception for read state.
         Mockito.when(persister.readState(Mockito.any())).thenThrow(new 
RuntimeException("Read exception"));
-        sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+        SharePartition sharePartition2 = 
SharePartitionBuilder.builder().withPersister(persister).build();
 
-        result = sharePartition.maybeInitialize();
-        assertTrue(result.isDone());
-        assertTrue(result.isCompletedExceptionally());
-        assertFutureThrows(result, RuntimeException.class);
-        assertEquals(SharePartitionState.FAILED, 
sharePartition.partitionState());
+        assertThrows(RuntimeException.class, sharePartition2::maybeInitialize);
     }
 
     @Test
@@ -4453,12 +4446,20 @@ public class SharePartitionTest {
     public void testWriteShareGroupStateWithWriteException() {
         Persister persister = Mockito.mock(Persister.class);
         mockPersisterReadStateMethod(persister);
-        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+        SharePartition sharePartition1 = 
SharePartitionBuilder.builder().withPersister(persister).build();
 
         
Mockito.when(persister.writeState(Mockito.any())).thenReturn(FutureUtils.failedFuture(new
 RuntimeException("Write exception")));
-        CompletableFuture<Void> writeResult = 
sharePartition.writeShareGroupState(anyList());
+        CompletableFuture<Void> writeResult = 
sharePartition1.writeShareGroupState(anyList());
         assertTrue(writeResult.isCompletedExceptionally());
         assertFutureThrows(writeResult, IllegalStateException.class);
+
+        persister = Mockito.mock(Persister.class);
+        // Throw exception for write state.
+        mockPersisterReadStateMethod(persister);
+        SharePartition sharePartition2 = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        Mockito.when(persister.writeState(Mockito.any())).thenThrow(new 
RuntimeException("Write exception"));
+        assertThrows(RuntimeException.class, () -> 
sharePartition2.writeShareGroupState(anyList()));
     }
 
     @Test
diff --git 
a/share/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
 
b/share/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
index 19dcc2c2170..07ac3eab116 100644
--- 
a/share/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
+++ 
b/share/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
@@ -78,8 +78,13 @@ public class DefaultStatePersister implements Persister {
      * @param request WriteShareGroupStateParameters
      * @return A completable future of WriteShareGroupStateResult
      */
-    public CompletableFuture<WriteShareGroupStateResult> 
writeState(WriteShareGroupStateParameters request) throws 
IllegalArgumentException {
-        validate(request);
+    public CompletableFuture<WriteShareGroupStateResult> 
writeState(WriteShareGroupStateParameters request) {
+        try {
+            validate(request);
+        } catch (Exception e) {
+            log.error("Unable to validate write state request", e);
+            return CompletableFuture.failedFuture(e);
+        }
         GroupTopicPartitionData<PartitionStateBatchData> gtp = 
request.groupTopicPartitionData();
         String groupId = gtp.groupId();
 
@@ -169,8 +174,13 @@ public class DefaultStatePersister implements Persister {
      * @param request ReadShareGroupStateParameters
      * @return A completable future of ReadShareGroupStateResult
      */
-    public CompletableFuture<ReadShareGroupStateResult> 
readState(ReadShareGroupStateParameters request) throws 
IllegalArgumentException {
-        validate(request);
+    public CompletableFuture<ReadShareGroupStateResult> 
readState(ReadShareGroupStateParameters request) {
+        try {
+            validate(request);
+        } catch (Exception e) {
+            log.error("Unable to validate read state request", e);
+            return CompletableFuture.failedFuture(e);
+        }
         GroupTopicPartitionData<PartitionIdLeaderEpochData> gtp = 
request.groupTopicPartitionData();
         String groupId = gtp.groupId();
         Map<Uuid, Map<Integer, 
CompletableFuture<ReadShareGroupStateResponse>>> futureMap = new HashMap<>();
diff --git 
a/share/src/main/java/org/apache/kafka/server/share/persister/Persister.java 
b/share/src/main/java/org/apache/kafka/server/share/persister/Persister.java
index 49073c83cd6..ac0a7955a70 100644
--- a/share/src/main/java/org/apache/kafka/server/share/persister/Persister.java
+++ b/share/src/main/java/org/apache/kafka/server/share/persister/Persister.java
@@ -40,7 +40,7 @@ public interface Persister {
      * @param request Request parameters
      * @return A {@link CompletableFuture} that completes with the result.
      */
-    CompletableFuture<ReadShareGroupStateResult> 
readState(ReadShareGroupStateParameters request) throws 
IllegalArgumentException;
+    CompletableFuture<ReadShareGroupStateResult> 
readState(ReadShareGroupStateParameters request);
 
     /**
      * Write share-partition state.
@@ -48,7 +48,7 @@ public interface Persister {
      * @param request Request parameters
      * @return A {@link CompletableFuture} that completes with the result.
      */
-    CompletableFuture<WriteShareGroupStateResult> 
writeState(WriteShareGroupStateParameters request) throws 
IllegalArgumentException;
+    CompletableFuture<WriteShareGroupStateResult> 
writeState(WriteShareGroupStateParameters request);
 
     /**
      * Delete share-partition state.
diff --git 
a/share/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
 
b/share/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
index 81742ca7a27..9e0ce91bc85 100644
--- 
a/share/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
+++ 
b/share/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
@@ -50,8 +50,8 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
@@ -132,68 +132,71 @@ class DefaultStatePersisterTest {
         int incorrectPartition = -1;
 
         // Request Parameters are null
-        assertThrows(IllegalArgumentException.class, () -> {
-            DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder().build();
-            defaultStatePersister.writeState(null);
-        });
+        DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder().build();
+        CompletableFuture<WriteShareGroupStateResult> result = 
defaultStatePersister.writeState(null);
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
 
         // groupTopicPartitionData is null
-        assertThrows(IllegalArgumentException.class, () -> {
-            DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder().build();
-            defaultStatePersister.writeState(new 
WriteShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build());
-        });
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.writeState(new 
WriteShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
 
         // groupId is null
-        assertThrows(IllegalArgumentException.class, () -> {
-            DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder().build();
-            defaultStatePersister.writeState(new 
WriteShareGroupStateParameters.Builder()
-                .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
-                    .setGroupId(null).build()).build());
-        });
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.writeState(new 
WriteShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
+                .setGroupId(null).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
 
         // topicsData is empty
-        assertThrows(IllegalArgumentException.class, () -> {
-            DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder().build();
-            defaultStatePersister.writeState(new 
WriteShareGroupStateParameters.Builder()
-                .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
-                    .setGroupId(groupId)
-                    .setTopicsData(Collections.emptyList()).build()).build());
-        });
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.writeState(new 
WriteShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
+                .setGroupId(groupId)
+                .setTopicsData(Collections.emptyList()).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
 
         // topicId is null
-        assertThrows(IllegalArgumentException.class, () -> {
-            DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder().build();
-            defaultStatePersister.writeState(new 
WriteShareGroupStateParameters.Builder()
-                .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
-                    .setGroupId(groupId)
-                    .setTopicsData(Collections.singletonList(new 
TopicData<>(null,
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.writeState(new 
WriteShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
+                .setGroupId(groupId)
+                .setTopicsData(Collections.singletonList(new TopicData<>(null,
                         
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
-                            partition, 1, 0, 0, null))))
-                    ).build()).build());
-        });
+                            partition, 1, 0, 0, null))))).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
 
         // partitionData is empty
-        assertThrows(IllegalArgumentException.class, () -> {
-            DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder().build();
-            defaultStatePersister.writeState(new 
WriteShareGroupStateParameters.Builder()
-                .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
-                    .setGroupId(groupId)
-                    .setTopicsData(Collections.singletonList(new 
TopicData<>(topicId,
-                        Collections.emptyList()))
-                    ).build()).build());
-        });
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.writeState(new 
WriteShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
+                .setGroupId(groupId)
+                .setTopicsData(Collections.singletonList(new 
TopicData<>(topicId, Collections.emptyList()))).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
 
         // partition value is incorrect
-        assertThrows(IllegalArgumentException.class, () -> {
-            DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder().build();
-            defaultStatePersister.writeState(new 
WriteShareGroupStateParameters.Builder()
-                .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
-                    .setGroupId(groupId)
-                    .setTopicsData(Collections.singletonList(new 
TopicData<>(topicId,
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.writeState(new 
WriteShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionStateBatchData>()
+                .setGroupId(groupId)
+                .setTopicsData(Collections.singletonList(new 
TopicData<>(topicId,
                         
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
-                            incorrectPartition, 1, 0, 0, null))))
-                    ).build()).build());
-        });
+                            incorrectPartition, 1, 0, 0, 
null))))).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
     }
 
     @Test
@@ -205,68 +208,70 @@ class DefaultStatePersisterTest {
         int incorrectPartition = -1;
 
         // Request Parameters are null
-        assertThrows(IllegalArgumentException.class, () -> {
-            DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder().build();
-            defaultStatePersister.readState(null);
-        });
+        DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder().build();
+        CompletableFuture<ReadShareGroupStateResult> result = 
defaultStatePersister.readState(null);
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
 
         // groupTopicPartitionData is null
-        assertThrows(IllegalArgumentException.class, () -> {
-            DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder().build();
-            defaultStatePersister.readState(new 
ReadShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build());
-        });
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.readState(new 
ReadShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
 
         // groupId is null
-        assertThrows(IllegalArgumentException.class, () -> {
-            DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder().build();
-            defaultStatePersister.readState(new 
ReadShareGroupStateParameters.Builder()
-                .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
-                    .setGroupId(null).build()).build());
-        });
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.readState(new 
ReadShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
+                .setGroupId(null).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
 
         // topicsData is empty
-        assertThrows(IllegalArgumentException.class, () -> {
-            DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder().build();
-            defaultStatePersister.readState(new 
ReadShareGroupStateParameters.Builder()
-                .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
-                    .setGroupId(groupId)
-                    .setTopicsData(Collections.emptyList()).build()).build());
-        });
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.readState(new 
ReadShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
+                .setGroupId(groupId)
+                .setTopicsData(Collections.emptyList()).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
 
         // topicId is null
-        assertThrows(IllegalArgumentException.class, () -> {
-            DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder().build();
-            defaultStatePersister.readState(new 
ReadShareGroupStateParameters.Builder()
-                .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
-                    .setGroupId(groupId)
-                    .setTopicsData(Collections.singletonList(new 
TopicData<>(null,
-                        
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(
-                            partition, 1))))
-                    ).build()).build());
-        });
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.readState(new 
ReadShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
+                .setGroupId(groupId)
+                .setTopicsData(Collections.singletonList(new TopicData<>(null,
+                        
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(partition,
 1))))
+                ).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
 
         // partitionData is empty
-        assertThrows(IllegalArgumentException.class, () -> {
-            DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder().build();
-            defaultStatePersister.readState(new 
ReadShareGroupStateParameters.Builder()
-                .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
-                    .setGroupId(groupId)
-                    .setTopicsData(Collections.singletonList(new 
TopicData<>(topicId,
-                        Collections.emptyList()))
-                    ).build()).build());
-        });
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.readState(new 
ReadShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
+                .setGroupId(groupId)
+                .setTopicsData(Collections.singletonList(new 
TopicData<>(topicId, Collections.emptyList()))).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
 
         // partition value is incorrect
-        assertThrows(IllegalArgumentException.class, () -> {
-            DefaultStatePersister defaultStatePersister = 
DefaultStatePersisterBuilder.builder().build();
-            defaultStatePersister.readState(new 
ReadShareGroupStateParameters.Builder()
-                .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
-                    .setGroupId(groupId)
-                    .setTopicsData(Collections.singletonList(new 
TopicData<>(topicId,
-                        
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(
-                            incorrectPartition, 1))))
-                    ).build()).build());
-        });
+        defaultStatePersister = DefaultStatePersisterBuilder.builder().build();
+        result = defaultStatePersister.readState(new 
ReadShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
+                .setGroupId(groupId)
+                .setTopicsData(Collections.singletonList(new 
TopicData<>(topicId,
+                        
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(incorrectPartition,
 1))))).build()).build());
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(result, IllegalArgumentException.class);
     }
 
     @Test

Reply via email to