This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/kip1071 by this push:
     new 5cc622a650f Use LeaveOnClose event with Streams membership manager 
(#18054)
5cc622a650f is described below

commit 5cc622a650fb77fa92dab699a86b10cc5811cf0d
Author: Bruno Cadonna <[email protected]>
AuthorDate: Thu Dec 5 12:23:48 2024 +0100

    Use LeaveOnClose event with Streams membership manager (#18054)
    
    The Streams membership manager did not use the LeaveOnClose event
    to leave the group faster on closing. With the event, the Streams
    membership manager leaves the group without invoking any callback
    for releasing the active tasks.
    
    With this commit the Streams membership manager uses the LeaveOnClose
    event.
---
 checkstyle/suppressions.xml                        |  2 +-
 .../StreamsGroupHeartbeatRequestManager.java       |  2 +-
 .../internals/StreamsMembershipManager.java        | 54 +++++++++++++----
 .../events/ApplicationEventProcessor.java          | 15 +++--
 .../internals/StreamsMembershipManagerTest.java    | 67 ++++++++++++++++++----
 .../events/ApplicationEventProcessorTest.java      | 66 +++++++++++++++++++++
 6 files changed, 178 insertions(+), 28 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index c17c9882f00..113555b93e0 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -119,7 +119,7 @@
               
files="(Sender|Fetcher|FetchRequestManager|OffsetFetcher|KafkaConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/>
 
     <suppress checks="ClassFanOutComplexity"
-              
files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|FetchRequestManager|KafkaAdminClient|Message|KafkaProducer|NetworkClient)Test.java"/>
+              
files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|FetchRequestManager|KafkaAdminClient|Message|KafkaProducer|NetworkClient|ApplicationEventProcessor)Test.java"/>
 
     <suppress checks="ClassFanOutComplexity"
               files="MockAdminClient.java"/>
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index c99fea8a01a..3d235222f36 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -103,7 +103,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
 
     @Override
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-        if (!coordinatorRequestManager.coordinator().isPresent() || 
membershipManager.shouldSkipHeartbeat()) {
+        if (coordinatorRequestManager.coordinator().isEmpty() || 
membershipManager.shouldSkipHeartbeat()) {
             membershipManager.transitionToUnsubscribeIfLeaving();
             return NetworkClientDelegate.PollResult.EMPTY;
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
index a3f55c92d3a..72378c0583f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
@@ -243,7 +243,7 @@ public class StreamsMembershipManager implements 
RequestManager {
         }
         resetEpoch();
         transitionTo(MemberState.JOINING);
-        clearPendingTaskAssignment();
+        clearCurrentTaskAssignment();
     }
 
     private void transitionToSendingLeaveGroup(boolean dueToExpiredPollTimer) {
@@ -271,7 +271,7 @@ public class StreamsMembershipManager implements 
RequestManager {
 
     private void finalizeLeaving() {
         
updateMemberEpoch(StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH);
-        clearPendingTaskAssignment();
+        clearCurrentTaskAssignment();
     }
 
     private void transitionToStale() {
@@ -373,7 +373,7 @@ public class StreamsMembershipManager implements 
RequestManager {
         }
     }
 
-    private void clearPendingTaskAssignment() {
+    private void clearCurrentTaskAssignment() {
         currentAssignment = LocalAssignment.NONE;
     }
 
@@ -582,6 +582,27 @@ public class StreamsMembershipManager implements 
RequestManager {
             
.collect(Collectors.toMap(StreamsGroupHeartbeatResponseData.TaskIds::subtopologyId,
 taskId -> new TreeSet<>(taskId.partitions())));
     }
 
+    /**
+     * Leaves the group when the member closes.
+     *
+     * <p>
+     * This method does the following:
+     * <ol>
+     *     <li>Transitions member state to {@link 
MemberState#PREPARE_LEAVING}.</li>
+     *     <li>Skips the invocation of the revocation callback or lost 
callback.</li>
+     *     <li>Clears the current and target assignment, unsubscribes from all 
topics and
+     *     transitions the member state to {@link MemberState#LEAVING}.</li>
+     * </ol>
+     * States {@link MemberState#PREPARE_LEAVING} and {@link 
MemberState#LEAVING} cause the heartbeat request manager
+     * to send a leave group heartbeat.
+     * </p>
+     *
+     * @return future that will complete when the heartbeat to leave the group 
has been sent out.
+     */
+    public CompletableFuture<Void> leaveGroupOnClose() {
+        return leaveGroup(true);
+    }
+
     /**
      * Leaves the group.
      *
@@ -589,8 +610,8 @@ public class StreamsMembershipManager implements 
RequestManager {
      * This method does the following:
      * <ol>
      *     <li>Transitions member state to {@link 
MemberState#PREPARE_LEAVING}.</li>
-     *     <li>Requests the invocation of the revocation callback.</li>
-     *     <li>Once the revocation callback completes, it clears the current 
and target assignment, unsubscribes from
+     *     <li>Requests the invocation of the revocation callback or lost 
callback.</li>
+     *     <li>Once the callback completes, it clears the current and target 
assignment, unsubscribes from
      *     all topics and transitions the member state to {@link 
MemberState#LEAVING}.</li>
      * </ol>
      * States {@link MemberState#PREPARE_LEAVING} and {@link 
MemberState#LEAVING} cause the heartbeat request manager
@@ -601,6 +622,10 @@ public class StreamsMembershipManager implements 
RequestManager {
      *         to leave the group has been sent out.
      */
     public CompletableFuture<Void> leaveGroup() {
+        return leaveGroup(false);
+    }
+
+    private CompletableFuture<Void> leaveGroup(final boolean isOnClose) {
         if (isNotInGroup()) {
             if (state == MemberState.FENCED) {
                 clearTaskAndPartitionAssignment();
@@ -615,16 +640,21 @@ public class StreamsMembershipManager implements 
RequestManager {
             return leaveGroupInProgress.get();
         }
 
+        transitionTo(MemberState.PREPARE_LEAVING);
         CompletableFuture<Void> onGroupLeft = new CompletableFuture<>();
         leaveGroupInProgress = Optional.of(onGroupLeft);
-        CompletableFuture<Void> onAllTasksRevokedCallbackExecuted = 
prepareLeaving();
-        onAllTasksRevokedCallbackExecuted.whenComplete((__, callbackError) -> 
leaving(callbackError));
+        if (!isOnClose) {
+            CompletableFuture<Void> onAllActiveTasksReleasedCallbackExecuted = 
releaseActiveTasks();
+            onAllActiveTasksReleasedCallbackExecuted
+                .whenComplete((__, callbackError) -> 
leavingAfterReleasingActiveTasks(callbackError));
+        } else {
+            leaving();
+        }
 
         return onGroupLeft;
     }
 
-    private CompletableFuture<Void> prepareLeaving() {
-        transitionTo(MemberState.PREPARE_LEAVING);
+    private CompletableFuture<Void> releaseActiveTasks() {
         if (memberEpoch > 0) {
             return 
revokeActiveTasks(toTaskIdSet(currentAssignment.activeTasks));
         } else {
@@ -632,7 +662,7 @@ public class StreamsMembershipManager implements 
RequestManager {
         }
     }
 
-    private void leaving(Throwable callbackError) {
+    private void leavingAfterReleasingActiveTasks(Throwable callbackError) {
         if (callbackError != null) {
             log.error("Member {} callback to revoke task assignment failed. It 
will proceed " +
                     "to clear its assignment and send a leave group heartbeat",
@@ -642,6 +672,10 @@ public class StreamsMembershipManager implements 
RequestManager {
                     "to clear its assignment and send a leave group heartbeat",
                 memberIdInfoForLog());
         }
+        leaving();
+    }
+
+    private void leaving() {
         clearTaskAndPartitionAssignment();
         subscriptionState.unsubscribe();
         transitionToSendingLeaveGroup(false);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index e2cf2526d38..6c8333a5f04 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -455,12 +455,15 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
     }
 
     private void process(final LeaveGroupOnCloseEvent event) {
-        if (requestManagers.consumerMembershipManager.isEmpty())
-            return;
-
-        log.debug("Signal the ConsumerMembershipManager to leave the consumer 
group since the consumer is closing");
-        CompletableFuture<Void> future = 
requestManagers.consumerMembershipManager.get().leaveGroupOnClose();
-        future.whenComplete(complete(event.future()));
+        if (requestManagers.consumerMembershipManager.isPresent()) {
+            CompletableFuture<Void> future = 
requestManagers.consumerMembershipManager.get().leaveGroupOnClose();
+            future.whenComplete(complete(event.future()));
+            log.debug("Signal the ConsumerMembershipManager to leave the 
consumer group since the consumer is closing");
+        } else if (requestManagers.streamsMembershipManager.isPresent()) {
+            CompletableFuture<Void> future = 
requestManagers.streamsMembershipManager.get().leaveGroupOnClose();
+            future.whenComplete(complete(event.future()));
+            log.debug("Signal the StreamsMembershipManager to leave the 
Streams group since the member is closing");
+        }
     }
 
     /**
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
index 1ee0975a8cb..ea0b3fa9d9e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackNeededEvent;
@@ -35,8 +34,6 @@ import org.apache.kafka.common.utils.Time;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
@@ -49,6 +46,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -89,11 +87,7 @@ public class StreamsMembershipManagerTest {
     @Mock
     private StreamsAssignmentInterface streamsAssignmentInterface;
 
-    @Captor
-    private ArgumentCaptor<StreamsOnTasksAssignedCallbackNeededEvent> 
onAssignmentCallbackNeededEventCaptor;
-
     private Queue<BackgroundEvent> backgroundEventQueue = new LinkedList<>();
-    private BackgroundEventHandler backgroundEventHandler = new 
BackgroundEventHandler(backgroundEventQueue);
 
     @BeforeEach
     public void setup() {
@@ -355,7 +349,16 @@ public class StreamsMembershipManagerTest {
 
     @Test
     public void testLeaveGroupWhenNotInGroup() {
-        final CompletableFuture<Void> future = membershipManager.leaveGroup();
+        testLeaveGroupWhenNotInGroup(membershipManager::leaveGroup);
+    }
+
+    @Test
+    public void testLeaveGroupOnCloseWhenNotInGroup() {
+        testLeaveGroupWhenNotInGroup(membershipManager::leaveGroupOnClose);
+    }
+
+    private void testLeaveGroupWhenNotInGroup(final 
Supplier<CompletableFuture<Void>> leaveGroup) {
+        final CompletableFuture<Void> future = leaveGroup.get();
 
         assertFalse(membershipManager.isLeavingGroup());
         assertTrue(future.isDone());
@@ -367,12 +370,21 @@ public class StreamsMembershipManagerTest {
 
     @Test
     public void testLeaveGroupWhenNotInGroupAndFenced() {
+        
testLeaveGroupOnCloseWhenNotInGroupAndFenced(membershipManager::leaveGroup);
+    }
+
+    @Test
+    public void testLeaveGroupOnCloseWhenNotInGroupAndFenced() {
+        
testLeaveGroupOnCloseWhenNotInGroupAndFenced(membershipManager::leaveGroupOnClose);
+    }
+
+    private void testLeaveGroupOnCloseWhenNotInGroupAndFenced(final 
Supplier<CompletableFuture<Void>> leaveGroup) {
         final CompletableFuture<Void> onAllTasksLostCallbackExecuted = new 
CompletableFuture<>();
         
when(streamsAssignmentInterface.requestOnAllTasksLostCallbackInvocation())
             .thenReturn(onAllTasksLostCallbackExecuted);
         joining();
         fenced();
-        final CompletableFuture<Void> future = membershipManager.leaveGroup();
+        final CompletableFuture<Void> future = leaveGroup.get();
 
         assertFalse(membershipManager.isLeavingGroup());
         assertTrue(future.isDone());
@@ -405,14 +417,49 @@ public class StreamsMembershipManagerTest {
         verifyInStatePrepareLeaving(membershipManager);
         final CompletableFuture<Void> onGroupLeftBeforeRevocationCallback = 
membershipManager.leaveGroup();
         assertEquals(onGroupLeft, onGroupLeftBeforeRevocationCallback);
+        final CompletableFuture<Void> 
onGroupLeftOnCloseBeforeRevocationCallback = 
membershipManager.leaveGroupOnClose();
+        assertEquals(onGroupLeft, onGroupLeftOnCloseBeforeRevocationCallback);
         onTasksRevokedCallbackExecuted.complete(null);
         verify(subscriptionState).unsubscribe();
         assertFalse(onGroupLeft.isDone());
         verifyInStateLeaving(membershipManager);
         final CompletableFuture<Void> onGroupLeftAfterRevocationCallback = 
membershipManager.leaveGroup();
         assertEquals(onGroupLeft, onGroupLeftAfterRevocationCallback);
-        membershipManager.transitionToUnsubscribeIfLeaving();
+        membershipManager.onHeartbeatRequestGenerated();
+        verifyInStateUnsubscribed(membershipManager);
+        
membershipManager.onHeartbeatSuccess(makeHeartbeatResponse(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
+        assertTrue(onGroupLeft.isDone());
+        assertFalse(onGroupLeft.isCompletedExceptionally());
+    }
+
+    @Test
+    public void testLeaveGroupOnCloseWhenInGroupWithAssignment() {
+        
setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUB_TOPOLOGY_ID_0,
 TOPIC_0);
+        final Set<StreamsAssignmentInterface.TaskId> activeTasks =
+            Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0));
+        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks)))
+            .thenReturn(onTasksAssignedCallbackExecutedSetup);
+        joining();
+        reconcile(makeHeartbeatResponse(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
+        acknowledging(onTasksAssignedCallbackExecutedSetup);
+
+        final CompletableFuture<Void> onGroupLeft = 
membershipManager.leaveGroupOnClose();
+
+        assertFalse(onGroupLeft.isDone());
+        verifyInStateLeaving(membershipManager);
+        verify(subscriptionState).unsubscribe();
+        verify(streamsAssignmentInterface, 
never()).requestOnTasksRevokedCallbackInvocation(any());
+        final CompletableFuture<Void> 
onGroupLeftBeforeHeartbeatRequestGenerated = membershipManager.leaveGroup();
+        assertEquals(onGroupLeft, onGroupLeftBeforeHeartbeatRequestGenerated);
+        final CompletableFuture<Void> 
onGroupLeftOnCloseBeforeHeartbeatRequestGenerated = 
membershipManager.leaveGroupOnClose();
+        assertEquals(onGroupLeft, 
onGroupLeftOnCloseBeforeHeartbeatRequestGenerated);
+        assertFalse(onGroupLeft.isDone());
+        membershipManager.onHeartbeatRequestGenerated();
         verifyInStateUnsubscribed(membershipManager);
+        
membershipManager.onHeartbeatSuccess(makeHeartbeatResponse(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
+        assertTrue(onGroupLeft.isDone());
+        assertFalse(onGroupLeft.isCompletedExceptionally());
     }
 
     @Test
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index 511e602b9bf..7e808935951 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -25,6 +25,7 @@ import 
org.apache.kafka.clients.consumer.internals.CommitRequestManager;
 import 
org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
 import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
 import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
 import org.apache.kafka.clients.consumer.internals.FetchRequestManager;
 import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
@@ -63,8 +64,10 @@ import static 
org.apache.kafka.clients.consumer.internals.events.CompletableEven
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
@@ -586,6 +589,69 @@ public class ApplicationEventProcessorTest {
         
verify(streamsMembershipManager).onAllTasksLostCallbackCompleted(event);
     }
 
+
+    @Test
+    public void testLeaveOnCloseWithStreamsMembershipManager() {
+        LeaveGroupOnCloseEvent event = new 
LeaveGroupOnCloseEvent(ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        when(streamsMembershipManager.leaveGroupOnClose()).thenReturn(future);
+
+        setupProcessorWithStreamsMembershipManager();
+        processor.process(event);
+        verifyLeaveOnClose(event, future);
+    }
+
+    @Test
+    public void 
testLeaveOnCloseCompletesExceptionallyWithStreamsMembershipManager() {
+        LeaveGroupOnCloseEvent event = new 
LeaveGroupOnCloseEvent(ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        when(streamsMembershipManager.leaveGroupOnClose()).thenReturn(future);
+
+        setupProcessorWithStreamsMembershipManager();
+        processor.process(event);
+        verifyLeaveOnCloseCompletesExceptionally(event, future);
+    }
+
+    @Test
+    public void testLeaveOnCloseWithConsumerMembershipManager() {
+        LeaveGroupOnCloseEvent event = new 
LeaveGroupOnCloseEvent(ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        when(membershipManager.leaveGroupOnClose()).thenReturn(future);
+
+        setupProcessor(true);
+        processor.process(event);
+        verifyLeaveOnClose(event, future);
+    }
+
+    @Test
+    public void 
testLeaveOnCloseCompletesExceptionallyWithConsumerMembershipManager() {
+        LeaveGroupOnCloseEvent event = new 
LeaveGroupOnCloseEvent(ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        when(membershipManager.leaveGroupOnClose()).thenReturn(future);
+
+        setupProcessor(true);
+        processor.process(event);
+        verifyLeaveOnCloseCompletesExceptionally(event, future);
+    }
+
+    private void verifyLeaveOnClose(LeaveGroupOnCloseEvent event, 
CompletableFuture<Void> future) {
+        assertFalse(event.future().isDone());
+        future.complete(null);
+        assertTrue(event.future().isDone());
+        assertFalse(event.future().isCompletedExceptionally());
+    }
+
+    private void 
verifyLeaveOnCloseCompletesExceptionally(LeaveGroupOnCloseEvent event, 
CompletableFuture<Void> future) {
+        assertFalse(event.future().isDone());
+        RuntimeException exception = new RuntimeException("Nobody expects the 
Spanish Inquisition!");
+        future.completeExceptionally(exception);
+        assertTrue(event.future().isDone());
+        assertTrue(event.future().isCompletedExceptionally());
+        ExecutionException thrown = assertThrows(ExecutionException.class, () 
-> event.future().get());
+        assertInstanceOf(RuntimeException.class, thrown.getCause());
+        assertEquals(exception.getMessage(), thrown.getCause().getMessage());
+    }
+
     private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
         return 
Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
     }

Reply via email to