This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f28713f MINOR: Add unit tests for StreamsRebalanceListener (#9258) f28713f is described below commit f28713f92218f41d21d5149cdc6034fa374821ca Author: Bruno Cadonna <br...@confluent.io> AuthorDate: Wed Sep 16 01:25:19 2020 +0200 MINOR: Add unit tests for StreamsRebalanceListener (#9258) Reviewers: Walker Carlson <wcarl...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- .../internals/StreamsRebalanceListenerTest.java | 56 +++++++++++++++++++++- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java index cc81fe3..b8ccc94 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java @@ -16,13 +16,16 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.errors.MissingSourceTopicException; import org.apache.kafka.streams.processor.internals.StreamThread.State; import org.apache.kafka.streams.processor.internals.assignment.AssignorError; +import org.junit.Before; import org.junit.Test; import org.slf4j.LoggerFactory; +import java.util.Collection; import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; @@ -47,6 +50,13 @@ public class StreamsRebalanceListenerTest { assignmentErrorCode ); + @Before + public void before() { + expect(streamThread.state()).andStubReturn(null); + expect(taskManager.activeTaskIds()).andStubReturn(null); + expect(taskManager.standbyTaskIds()).andStubReturn(null); + } + @Test public void shouldThrowMissingSourceTopicException() { replay(taskManager, streamThread); @@ -61,9 +71,9 @@ public class StreamsRebalanceListenerTest { } @Test - public void shouldHandleOnPartitionAssigned() { + public void shouldHandleAssignedPartitions() { taskManager.handleRebalanceComplete(); - expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andStubReturn(null); + expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andReturn(State.RUNNING); replay(taskManager, streamThread); assignmentErrorCode.set(AssignorError.NONE.code()); @@ -71,4 +81,46 @@ public class StreamsRebalanceListenerTest { verify(taskManager, streamThread); } + + @Test + public void shouldHandleRevokedPartitions() { + final Collection<TopicPartition> partitions = Collections.singletonList(new TopicPartition("topic", 0)); + expect(streamThread.setState(State.PARTITIONS_REVOKED)).andReturn(State.RUNNING); + taskManager.handleRevocation(partitions); + replay(streamThread, taskManager); + + streamsRebalanceListener.onPartitionsRevoked(partitions); + + verify(taskManager, streamThread); + } + + @Test + public void shouldNotHandleRevokedPartitionsIfStateCannotTransitToPartitionRevoked() { + expect(streamThread.setState(State.PARTITIONS_REVOKED)).andReturn(null); + replay(streamThread, taskManager); + + streamsRebalanceListener.onPartitionsRevoked(Collections.singletonList(new TopicPartition("topic", 0))); + + verify(taskManager, streamThread); + } + + @Test + public void shouldNotHandleEmptySetOfRevokedPartitions() { + expect(streamThread.setState(State.PARTITIONS_REVOKED)).andReturn(State.RUNNING); + replay(streamThread, taskManager); + + streamsRebalanceListener.onPartitionsRevoked(Collections.emptyList()); + + verify(taskManager, streamThread); + } + + @Test + public void shouldHandleLostPartitions() { + taskManager.handleLostAll(); + replay(streamThread, taskManager); + + streamsRebalanceListener.onPartitionsLost(Collections.singletonList(new TopicPartition("topic", 0))); + + verify(taskManager, streamThread); + } } \ No newline at end of file