This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new d087ade527e HOTFIX: Fix StreamThreadTest (#19562) d087ade527e is described below commit d087ade527e0679b74fa87d6b22344dc470318aa Author: Lucas Brutschy <lbruts...@confluent.io> AuthorDate: Fri Apr 25 15:03:39 2025 +0200 HOTFIX: Fix StreamThreadTest (#19562) Commit 732ed06 changed the logic of handling shutdowns, but in parallel commit 3fae785 had introduced a new unit test for checking how to shut down, which was broken by the later commit. Reviewers: David Jacot <dja...@confluent.io> --- .../processor/internals/StreamThreadTest.java | 38 ++++++++++++++-------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 35029519dfb..6fc8b4efb30 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -45,6 +45,7 @@ import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.KafkaMetricsContext; @@ -52,6 +53,7 @@ import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsContext; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogCaptureAppender; @@ -82,7 +84,6 @@ import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.StreamThread.State; -import org.apache.kafka.streams.processor.internals.assignment.AssignorError; import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager; @@ -178,6 +179,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -640,7 +642,7 @@ public class StreamThreadTest { thread.setState(State.PARTITIONS_REVOKED); thread.runOnceWithoutProcessingThreads(); - Mockito.verify(taskManager, Mockito.never()).process(Mockito.anyInt(), Mockito.any()); + Mockito.verify(taskManager, never()).process(Mockito.anyInt(), Mockito.any()); } @ParameterizedTest @@ -3800,7 +3802,7 @@ public class StreamThreadTest { Map.of(), Map.of() ); - final AtomicInteger assignmentErrorCode = new AtomicInteger(0); + final Runnable shutdownErrorHook = mock(Runnable.class); final Properties props = configProps(false, false, false); final StreamsConfig config = new StreamsConfig(props); @@ -3819,10 +3821,10 @@ public class StreamThreadTest { PROCESS_ID, CLIENT_ID, new LogContext(""), - assignmentErrorCode, + null, new AtomicLong(Long.MAX_VALUE), new LinkedList<>(), - null, + shutdownErrorHook, HANDLER, null, Optional.of(streamsRebalanceData), @@ -3831,11 +3833,15 @@ public class StreamThreadTest { thread.setState(State.STARTING); thread.runOnceWithoutProcessingThreads(); - assertEquals(0, assignmentErrorCode.get()); + verify(shutdownErrorHook, never()).run(); - streamsRebalanceData.requestShutdown(); + streamsRebalanceData.setStatuses(List.of( + new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code()) + .setStatusDetail("Shutdown requested") + )); thread.runOnceWithoutProcessingThreads(); - assertEquals(AssignorError.SHUTDOWN_REQUESTED.code(), assignmentErrorCode.get()); + verify(shutdownErrorHook).run(); } @Test @@ -3850,9 +3856,9 @@ public class StreamThreadTest { Map.of(), Map.of() ); - final AtomicInteger assignmentErrorCode = new AtomicInteger(0); final Properties props = configProps(false, false, false); + final Runnable shutdownErrorHook = mock(Runnable.class); final StreamsConfig config = new StreamsConfig(props); thread = new StreamThread( new MockTime(1), @@ -3869,10 +3875,10 @@ public class StreamThreadTest { PROCESS_ID, CLIENT_ID, new LogContext(""), - assignmentErrorCode, + null, new AtomicLong(Long.MAX_VALUE), new LinkedList<>(), - null, + shutdownErrorHook, HANDLER, null, Optional.of(streamsRebalanceData), @@ -3881,11 +3887,15 @@ public class StreamThreadTest { thread.setState(State.STARTING); thread.runOnceWithProcessingThreads(); - assertEquals(0, assignmentErrorCode.get()); + verify(shutdownErrorHook, never()).run(); - streamsRebalanceData.requestShutdown(); + streamsRebalanceData.setStatuses(List.of( + new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code()) + .setStatusDetail("Shutdown requested") + )); thread.runOnceWithProcessingThreads(); - assertEquals(AssignorError.SHUTDOWN_REQUESTED.code(), assignmentErrorCode.get()); + verify(shutdownErrorHook).run(); } @Test