This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d087ade527e HOTFIX: Fix StreamThreadTest (#19562)
d087ade527e is described below

commit d087ade527e0679b74fa87d6b22344dc470318aa
Author: Lucas Brutschy <lbruts...@confluent.io>
AuthorDate: Fri Apr 25 15:03:39 2025 +0200

    HOTFIX: Fix StreamThreadTest (#19562)
    
    Commit 732ed06 changed the logic of handling shutdowns, but in parallel
    commit 3fae785 had introduced a new unit test for checking how to shut
    down, which was broken by the later commit.
    
    Reviewers: David Jacot <dja...@confluent.io>
---
 .../processor/internals/StreamThreadTest.java      | 38 ++++++++++++++--------
 1 file changed, 24 insertions(+), 14 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 35029519dfb..6fc8b4efb30 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
@@ -52,6 +53,7 @@ import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogCaptureAppender;
@@ -82,7 +84,6 @@ import 
org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.StreamThread.State;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import 
org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager;
@@ -178,6 +179,7 @@ import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -640,7 +642,7 @@ public class StreamThreadTest {
         thread.setState(State.PARTITIONS_REVOKED);
         thread.runOnceWithoutProcessingThreads();
 
-        Mockito.verify(taskManager, Mockito.never()).process(Mockito.anyInt(), 
Mockito.any());
+        Mockito.verify(taskManager, never()).process(Mockito.anyInt(), 
Mockito.any());
     }
 
     @ParameterizedTest
@@ -3800,7 +3802,7 @@ public class StreamThreadTest {
             Map.of(),
             Map.of()
         );
-        final AtomicInteger assignmentErrorCode = new AtomicInteger(0);
+        final Runnable shutdownErrorHook = mock(Runnable.class);
 
         final Properties props = configProps(false, false, false);
         final StreamsConfig config = new StreamsConfig(props);
@@ -3819,10 +3821,10 @@ public class StreamThreadTest {
             PROCESS_ID,
             CLIENT_ID,
             new LogContext(""),
-            assignmentErrorCode,
+            null,
             new AtomicLong(Long.MAX_VALUE),
             new LinkedList<>(),
-            null,
+            shutdownErrorHook,
             HANDLER,
             null,
             Optional.of(streamsRebalanceData),
@@ -3831,11 +3833,15 @@ public class StreamThreadTest {
 
         thread.setState(State.STARTING);
         thread.runOnceWithoutProcessingThreads();
-        assertEquals(0, assignmentErrorCode.get());
+        verify(shutdownErrorHook, never()).run();
 
-        streamsRebalanceData.requestShutdown();
+        streamsRebalanceData.setStatuses(List.of(
+            new StreamsGroupHeartbeatResponseData.Status()
+                
.setStatusCode(StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code())
+                .setStatusDetail("Shutdown requested")
+        ));
         thread.runOnceWithoutProcessingThreads();
-        assertEquals(AssignorError.SHUTDOWN_REQUESTED.code(), 
assignmentErrorCode.get());
+        verify(shutdownErrorHook).run();
     }
 
     @Test
@@ -3850,9 +3856,9 @@ public class StreamThreadTest {
             Map.of(),
             Map.of()
         );
-        final AtomicInteger assignmentErrorCode = new AtomicInteger(0);
 
         final Properties props = configProps(false, false, false);
+        final Runnable shutdownErrorHook = mock(Runnable.class);
         final StreamsConfig config = new StreamsConfig(props);
         thread = new StreamThread(
             new MockTime(1),
@@ -3869,10 +3875,10 @@ public class StreamThreadTest {
             PROCESS_ID,
             CLIENT_ID,
             new LogContext(""),
-            assignmentErrorCode,
+            null,
             new AtomicLong(Long.MAX_VALUE),
             new LinkedList<>(),
-            null,
+            shutdownErrorHook,
             HANDLER,
             null,
             Optional.of(streamsRebalanceData),
@@ -3881,11 +3887,15 @@ public class StreamThreadTest {
 
         thread.setState(State.STARTING);
         thread.runOnceWithProcessingThreads();
-        assertEquals(0, assignmentErrorCode.get());
+        verify(shutdownErrorHook, never()).run();
 
-        streamsRebalanceData.requestShutdown();
+        streamsRebalanceData.setStatuses(List.of(
+            new StreamsGroupHeartbeatResponseData.Status()
+                
.setStatusCode(StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code())
+                .setStatusDetail("Shutdown requested")
+        ));
         thread.runOnceWithProcessingThreads();
-        assertEquals(AssignorError.SHUTDOWN_REQUESTED.code(), 
assignmentErrorCode.get());
+        verify(shutdownErrorHook).run();
     }
 
     @Test

Reply via email to