This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1f7631c8c65 MINOR: Fix StreamsRebalanceListenerInvoker (#20575)
1f7631c8c65 is described below

commit 1f7631c8c656cff27d36e6a4fa9118888effbd02
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Sep 24 09:03:07 2025 +0200

    MINOR: Fix StreamsRebalanceListenerInvoker (#20575)
    
    StreamsRebalanceListenerInvoker was implemented to match the behavior of
    ConsumerRebalanceListenerInvoker, however StreamsRebalanceListener has a
    subtly different interface than ConsumerRebalanceListener - it does not
    throw exceptions, but returns it as an Optional.
    
    In the interest of consistency, this change fixes this mismatch by
    changing the StreamsRebalanceListener interface to behave more like the
    ConsumerRebalanceListener - throwing exceptions directly.
    
    In another minor fix, the StreamsRebalanceListenerInvoker is changed to
    simply skip callback execution instead of throwing an
    IllegalStateException when no streamRebalanceListener is defined. This
    can happen when the consumer is closed before Consumer.subscribe is
    called.
    
    Reviewers: Lianet Magrans <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../internals/StreamsRebalanceListener.java        | 13 ++--
 .../internals/StreamsRebalanceListenerInvoker.java |  8 +--
 .../consumer/internals/AsyncKafkaConsumerTest.java |  2 -
 .../StreamsRebalanceListenerInvokerTest.java       | 33 +++-------
 .../kafka/api/AuthorizerIntegrationTest.scala      | 11 +---
 .../kafka/api/IntegrationTestHarness.scala         | 10 +--
 .../internals/DefaultStreamsRebalanceListener.java | 72 +++++++++-------------
 .../DefaultStreamsRebalanceListenerTest.java       | 34 +++++-----
 8 files changed, 66 insertions(+), 117 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListener.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListener.java
index 55de41d77a5..2c8a7449a72 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListener.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListener.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -28,22 +27,18 @@ public interface StreamsRebalanceListener {
      * Called when tasks are revoked from a stream thread.
      *
      * @param tasks The tasks to be revoked.
-     * @return The exception thrown during the callback, if any.
      */
-    Optional<Exception> onTasksRevoked(final Set<StreamsRebalanceData.TaskId> 
tasks);
+    void onTasksRevoked(final Set<StreamsRebalanceData.TaskId> tasks);
 
     /**
      * Called when tasks are assigned from a stream thread.
      *
      * @param assignment The tasks assigned.
-     * @return The exception thrown during the callback, if any.
      */
-    Optional<Exception> onTasksAssigned(final StreamsRebalanceData.Assignment 
assignment);
+    void onTasksAssigned(final StreamsRebalanceData.Assignment assignment);
 
     /**
-     * Called when a stream thread loses all assigned tasks.
-     *
-     * @return The exception thrown during the callback, if any.
+     * Called when a stream thread loses all assigned tasks
      */
-    Optional<Exception> onAllTasksLost();
+    void onAllTasksLost();
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvoker.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvoker.java
index f4c5aa4addc..fc8c78c13dc 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvoker.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvoker.java
@@ -51,14 +51,14 @@ public class StreamsRebalanceListenerInvoker {
 
     public Exception invokeAllTasksRevoked() {
         if (listener.isEmpty()) {
-            throw new IllegalStateException("StreamsRebalanceListener is not 
defined");
+            return null;
         }
         return 
invokeTasksRevoked(streamsRebalanceData.reconciledAssignment().activeTasks());
     }
 
     public Exception invokeTasksAssigned(final StreamsRebalanceData.Assignment 
assignment) {
         if (listener.isEmpty()) {
-            throw new IllegalStateException("StreamsRebalanceListener is not 
defined");
+            return null;
         }
         log.info("Invoking tasks assigned callback for new assignment: {}", 
assignment);
         try {
@@ -78,7 +78,7 @@ public class StreamsRebalanceListenerInvoker {
 
     public Exception invokeTasksRevoked(final Set<StreamsRebalanceData.TaskId> 
tasks) {
         if (listener.isEmpty()) {
-            throw new IllegalStateException("StreamsRebalanceListener is not 
defined");
+            return null;
         }
         log.info("Invoking task revoked callback for revoked active tasks {}", 
tasks);
         try {
@@ -98,7 +98,7 @@ public class StreamsRebalanceListenerInvoker {
 
     public Exception invokeAllTasksLost() {
         if (listener.isEmpty()) {
-            throw new IllegalStateException("StreamsRebalanceListener is not 
defined");
+            return null;
         }
         log.info("Invoking tasks lost callback for all tasks");
         try {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 9aef4cf7f52..8e44b3fcc25 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -2218,7 +2218,6 @@ public class AsyncKafkaConsumerTest {
         try (final MockedStatic<RequestManagers> requestManagers = 
mockStatic(RequestManagers.class)) {
             consumer = 
newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId), 
streamsRebalanceData);
             StreamsRebalanceListener mockStreamsListener = 
mock(StreamsRebalanceListener.class);
-            
when(mockStreamsListener.onTasksRevoked(any())).thenReturn(Optional.empty());
             consumer.subscribe(singletonList("topic"), mockStreamsListener);
             final MemberStateListener groupMetadataUpdateListener = 
captureGroupMetadataUpdateListener(requestManagers);
             final int memberEpoch = 42;
@@ -2239,7 +2238,6 @@ public class AsyncKafkaConsumerTest {
         try (final MockedStatic<RequestManagers> requestManagers = 
mockStatic(RequestManagers.class)) {
             consumer = 
newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId), 
streamsRebalanceData);
             StreamsRebalanceListener mockStreamsListener = 
mock(StreamsRebalanceListener.class);
-            
when(mockStreamsListener.onAllTasksLost()).thenReturn(Optional.empty());
             consumer.subscribe(singletonList("topic"), mockStreamsListener);
             final MemberStateListener groupMetadataUpdateListener = 
captureGroupMetadataUpdateListener(requestManagers);
             final int memberEpoch = 0;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvokerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvokerTest.java
index 2f3e5ab0523..749a4594ab8 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvokerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvokerTest.java
@@ -28,7 +28,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
-import java.util.Optional;
 import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -73,7 +72,6 @@ public class StreamsRebalanceListenerInvokerTest {
 
         StreamsRebalanceData.Assignment mockAssignment = 
createMockAssignment();
         
when(streamsRebalanceData.reconciledAssignment()).thenReturn(mockAssignment);
-        
when(secondListener.onTasksRevoked(any())).thenReturn(Optional.empty());
 
         // Set first listener
         invoker.setRebalanceListener(firstListener);
@@ -89,21 +87,10 @@ public class StreamsRebalanceListenerInvokerTest {
 
     @Test
     public void testInvokeMethodsWithNoListener() {
-        IllegalStateException exception1 = 
assertThrows(IllegalStateException.class, 
-            () -> invoker.invokeAllTasksRevoked());
-        assertEquals("StreamsRebalanceListener is not defined", 
exception1.getMessage());
-
-        IllegalStateException exception2 = 
assertThrows(IllegalStateException.class, 
-            () -> invoker.invokeTasksAssigned(createMockAssignment()));
-        assertEquals("StreamsRebalanceListener is not defined", 
exception2.getMessage());
-
-        IllegalStateException exception3 = 
assertThrows(IllegalStateException.class, 
-            () -> invoker.invokeTasksRevoked(createMockTasks()));
-        assertEquals("StreamsRebalanceListener is not defined", 
exception3.getMessage());
-
-        IllegalStateException exception4 = 
assertThrows(IllegalStateException.class, 
-            () -> invoker.invokeAllTasksLost());
-        assertEquals("StreamsRebalanceListener is not defined", 
exception4.getMessage());
+        assertNull(invoker.invokeAllTasksRevoked());
+        assertNull(invoker.invokeTasksAssigned(createMockAssignment()));
+        assertNull(invoker.invokeTasksRevoked(createMockTasks()));
+        assertNull(invoker.invokeAllTasksLost());
     }
 
     @Test
@@ -112,8 +99,7 @@ public class StreamsRebalanceListenerInvokerTest {
         
         StreamsRebalanceData.Assignment mockAssignment = 
createMockAssignment();
         
when(streamsRebalanceData.reconciledAssignment()).thenReturn(mockAssignment);
-        when(mockListener.onTasksRevoked(any())).thenReturn(Optional.empty());
-        
+
         Exception result = invoker.invokeAllTasksRevoked();
         
         assertNull(result);
@@ -124,8 +110,7 @@ public class StreamsRebalanceListenerInvokerTest {
     public void testInvokeTasksAssignedWithListener() {
         invoker.setRebalanceListener(mockListener);
         StreamsRebalanceData.Assignment assignment = createMockAssignment();
-        
when(mockListener.onTasksAssigned(assignment)).thenReturn(Optional.empty());
-        
+
         Exception result = invoker.invokeTasksAssigned(assignment);
         
         assertNull(result);
@@ -177,8 +162,7 @@ public class StreamsRebalanceListenerInvokerTest {
     public void testInvokeTasksRevokedWithListener() {
         invoker.setRebalanceListener(mockListener);
         Set<StreamsRebalanceData.TaskId> tasks = createMockTasks();
-        when(mockListener.onTasksRevoked(tasks)).thenReturn(Optional.empty());
-        
+
         Exception result = invoker.invokeTasksRevoked(tasks);
         
         assertNull(result);
@@ -229,8 +213,7 @@ public class StreamsRebalanceListenerInvokerTest {
     @Test
     public void testInvokeAllTasksLostWithListener() {
         invoker.setRebalanceListener(mockListener);
-        when(mockListener.onAllTasksLost()).thenReturn(Optional.empty());
-        
+
         Exception result = invoker.invokeAllTasksLost();
         
         assertNull(result);
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index f950362354c..bfcc0bb0d4f 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -3912,14 +3912,9 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     consumer.subscribe(
       if (topicAsSourceTopic || topicAsRepartitionSourceTopic) 
util.Set.of(sourceTopic, topic) else util.Set.of(sourceTopic),
       new StreamsRebalanceListener {
-        override def onTasksRevoked(tasks: 
util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] =
-          Optional.empty()
-
-        override def onTasksAssigned(assignment: 
StreamsRebalanceData.Assignment): Optional[Exception] =
-          Optional.empty()
-
-        override def onAllTasksLost(): Optional[Exception] =
-          Optional.empty()
+        override def onTasksRevoked(tasks: 
util.Set[StreamsRebalanceData.TaskId]): Unit = ()
+        override def onTasksAssigned(assignment: 
StreamsRebalanceData.Assignment): Unit = ()
+        override def onAllTasksLost(): Unit = ()
       }
     )
     consumer.poll(Duration.ofMillis(500L))
diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index fe24e45f16b..303e989e9b4 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -272,13 +272,9 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
     )
     consumer.subscribe(util.Set.of(inputTopic),
       new StreamsRebalanceListener {
-        override def onTasksRevoked(tasks: 
util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] =
-          Optional.empty()
-        override def onTasksAssigned(assignment: 
StreamsRebalanceData.Assignment): Optional[Exception] = {
-          Optional.empty()
-        }
-        override def onAllTasksLost(): Optional[Exception] =
-          Optional.empty()
+        override def onTasksRevoked(tasks: 
util.Set[StreamsRebalanceData.TaskId]): Unit = ()
+        override def onTasksAssigned(assignment: 
StreamsRebalanceData.Assignment): Unit = ()
+        override def onAllTasksLost(): Unit = ()
       })
     consumer
   }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java
index a95fcef5a6c..de74b05ceb5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java
@@ -26,7 +26,6 @@ import org.slf4j.Logger;
 
 import java.util.Collection;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -52,59 +51,44 @@ public class DefaultStreamsRebalanceListener implements 
StreamsRebalanceListener
     }
 
     @Override
-    public Optional<Exception> onTasksRevoked(final 
Set<StreamsRebalanceData.TaskId> tasks) {
-        try {
-            final Map<TaskId, Set<TopicPartition>> 
activeTasksToRevokeWithPartitions =
-                pairWithTopicPartitions(tasks.stream());
-            final Set<TopicPartition> partitionsToRevoke = 
activeTasksToRevokeWithPartitions.values().stream()
-                .flatMap(Collection::stream)
-                .collect(Collectors.toSet());
+    public void onTasksRevoked(final Set<StreamsRebalanceData.TaskId> tasks) {
+        final Map<TaskId, Set<TopicPartition>> 
activeTasksToRevokeWithPartitions =
+            pairWithTopicPartitions(tasks.stream());
+        final Set<TopicPartition> partitionsToRevoke = 
activeTasksToRevokeWithPartitions.values().stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
 
-            final long start = time.milliseconds();
-            try {
-                log.info("Revoking active tasks {}.", tasks);
-                taskManager.handleRevocation(partitionsToRevoke);
-            } finally {
-                log.info("partition revocation took {} ms.", 
time.milliseconds() - start);
-            }
-            if (streamThread.state() != StreamThread.State.PENDING_SHUTDOWN) {
-                streamThread.setState(StreamThread.State.PARTITIONS_REVOKED);
-            }
-        } catch (final Exception exception) {
-            return Optional.of(exception);
+        final long start = time.milliseconds();
+        try {
+            log.info("Revoking active tasks {}.", tasks);
+            taskManager.handleRevocation(partitionsToRevoke);
+        } finally {
+            log.info("partition revocation took {} ms.", time.milliseconds() - 
start);
+        }
+        if (streamThread.state() != StreamThread.State.PENDING_SHUTDOWN) {
+            streamThread.setState(StreamThread.State.PARTITIONS_REVOKED);
         }
-        return Optional.empty();
     }
 
     @Override
-    public Optional<Exception> onTasksAssigned(final 
StreamsRebalanceData.Assignment assignment) {
-        try {
-            final Map<TaskId, Set<TopicPartition>> activeTasksWithPartitions =
-                pairWithTopicPartitions(assignment.activeTasks().stream());
-            final Map<TaskId, Set<TopicPartition>> standbyTasksWithPartitions =
-                
pairWithTopicPartitions(Stream.concat(assignment.standbyTasks().stream(), 
assignment.warmupTasks().stream()));
+    public void onTasksAssigned(final StreamsRebalanceData.Assignment 
assignment) {
+        final Map<TaskId, Set<TopicPartition>> activeTasksWithPartitions =
+            pairWithTopicPartitions(assignment.activeTasks().stream());
+        final Map<TaskId, Set<TopicPartition>> standbyTasksWithPartitions =
+            
pairWithTopicPartitions(Stream.concat(assignment.standbyTasks().stream(), 
assignment.warmupTasks().stream()));
 
-            log.info("Processing new assignment {} from Streams Rebalance 
Protocol", assignment);
+        log.info("Processing new assignment {} from Streams Rebalance 
Protocol", assignment);
 
-            taskManager.handleAssignment(activeTasksWithPartitions, 
standbyTasksWithPartitions);
-            streamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
-            taskManager.handleRebalanceComplete();
-            streamsRebalanceData.setReconciledAssignment(assignment);
-        } catch (final Exception exception) {
-            return Optional.of(exception);
-        }
-        return Optional.empty();
+        taskManager.handleAssignment(activeTasksWithPartitions, 
standbyTasksWithPartitions);
+        streamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
+        taskManager.handleRebalanceComplete();
+        streamsRebalanceData.setReconciledAssignment(assignment);
     }
 
     @Override
-    public Optional<Exception> onAllTasksLost() {
-        try {
-            taskManager.handleLostAll();
-            
streamsRebalanceData.setReconciledAssignment(StreamsRebalanceData.Assignment.EMPTY);
-        } catch (final Exception exception) {
-            return Optional.of(exception);
-        }
-        return Optional.empty();
+    public void onAllTasksLost() {
+        taskManager.handleLostAll();
+        
streamsRebalanceData.setReconciledAssignment(StreamsRebalanceData.Assignment.EMPTY);
     }
 
     private Map<TaskId, Set<TopicPartition>> pairWithTopicPartitions(final 
Stream<StreamsRebalanceData.TaskId> taskIdStream) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
index 1297df7b1ee..736fa17e4a4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
@@ -32,8 +32,9 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.inOrder;
@@ -84,11 +85,9 @@ public class DefaultStreamsRebalanceListenerTest {
         ));
         when(streamThread.state()).thenReturn(state);
 
-        final Optional<Exception> result = 
defaultStreamsRebalanceListener.onTasksRevoked(
+        assertDoesNotThrow(() -> 
defaultStreamsRebalanceListener.onTasksRevoked(
             Set.of(new StreamsRebalanceData.TaskId("1", 0))
-        );
-
-        assertTrue(result.isEmpty());
+        ));
 
         final InOrder inOrder = inOrder(taskManager, streamThread);
         inOrder.verify(taskManager).handleRevocation(
@@ -109,9 +108,9 @@ public class DefaultStreamsRebalanceListenerTest {
 
         createRebalanceListenerWithRebalanceData(new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()));
 
-        final Optional<Exception> result = 
defaultStreamsRebalanceListener.onTasksRevoked(Set.of());
+        final Exception actualException = assertThrows(RuntimeException.class, 
() -> defaultStreamsRebalanceListener.onTasksRevoked(Set.of()));
 
-        assertTrue(result.isPresent());
+        assertEquals(actualException, exception);
         verify(taskManager).handleRevocation(any());
         verify(streamThread, never()).setState(any());
     }
@@ -153,9 +152,7 @@ public class DefaultStreamsRebalanceListenerTest {
             Set.of(new StreamsRebalanceData.TaskId("3", 0))
         );
 
-        final Optional<Exception> result = 
defaultStreamsRebalanceListener.onTasksAssigned(assignment);
-
-        assertTrue(result.isEmpty());
+        assertDoesNotThrow(() -> 
defaultStreamsRebalanceListener.onTasksAssigned(assignment));
 
         final InOrder inOrder = inOrder(taskManager, streamThread, 
streamsRebalanceData);
         inOrder.verify(taskManager).handleAssignment(
@@ -179,11 +176,11 @@ public class DefaultStreamsRebalanceListenerTest {
         when(streamsRebalanceData.subtopologies()).thenReturn(Map.of());
         createRebalanceListenerWithRebalanceData(streamsRebalanceData);
 
-        final Optional<Exception> result = 
defaultStreamsRebalanceListener.onTasksAssigned(
+        final Exception actualException = assertThrows(RuntimeException.class, 
() -> defaultStreamsRebalanceListener.onTasksAssigned(
             new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of())
-        );
-        assertTrue(result.isPresent());
-        assertEquals(exception, result.get());
+        ));
+
+        assertEquals(exception, actualException);
         verify(taskManager).handleAssignment(any(), any());
         verify(streamThread, 
never()).setState(StreamThread.State.PARTITIONS_ASSIGNED);
         verify(taskManager, never()).handleRebalanceComplete();
@@ -196,7 +193,7 @@ public class DefaultStreamsRebalanceListenerTest {
         when(streamsRebalanceData.subtopologies()).thenReturn(Map.of());
         createRebalanceListenerWithRebalanceData(streamsRebalanceData);
         
-        assertTrue(defaultStreamsRebalanceListener.onAllTasksLost().isEmpty());
+        assertDoesNotThrow(() -> 
defaultStreamsRebalanceListener.onAllTasksLost());
         
         final InOrder inOrder = inOrder(taskManager, streamsRebalanceData);
         inOrder.verify(taskManager).handleLostAll();
@@ -211,9 +208,10 @@ public class DefaultStreamsRebalanceListenerTest {
         final StreamsRebalanceData streamsRebalanceData = 
mock(StreamsRebalanceData.class);
         when(streamsRebalanceData.subtopologies()).thenReturn(Map.of());
         createRebalanceListenerWithRebalanceData(streamsRebalanceData);
-        final Optional<Exception> result = 
defaultStreamsRebalanceListener.onAllTasksLost();
-        assertTrue(result.isPresent());
-        assertEquals(exception, result.get());
+
+        final Exception actualException = assertThrows(RuntimeException.class, 
() -> defaultStreamsRebalanceListener.onAllTasksLost());
+
+        assertEquals(exception, actualException);
         verify(taskManager).handleLostAll();
         verify(streamsRebalanceData, never()).setReconciledAssignment(any());
     }

Reply via email to