This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 659ace836ca MINOR: Replace `singleton` with `Set.of` in the 
SubscriptionStateTest (#19993)
659ace836ca is described below

commit 659ace836caa3abc59f73ef2c24d45dfc6fa30ff
Author: Lan Ding <[email protected]>
AuthorDate: Fri Jun 20 15:33:41 2025 +0800

    MINOR: Replace `singleton` with `Set.of` in the SubscriptionStateTest 
(#19993)
    
    Replace `singleton` with `Set.of` in the SubscriptionStateTest.
    
    Reviewers: Ken Huang <[email protected]>, PoAn Yang
    <[email protected]>, Yung <[email protected]>, TengYao Chi
    <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../consumer/internals/SubscriptionStateTest.java  | 228 ++++++++++-----------
 1 file changed, 113 insertions(+), 115 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index d77f6d808a7..4d4a725d45c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -35,7 +35,6 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -44,7 +43,6 @@ import java.util.function.LongSupplier;
 import java.util.function.Predicate;
 import java.util.regex.Pattern;
 
-import static java.util.Collections.singleton;
 import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
 import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -66,14 +64,14 @@ public class SubscriptionStateTest {
 
     @Test
     public void partitionAssignment() {
-        state.assignFromUser(singleton(tp0));
-        assertEquals(singleton(tp0), state.assignedPartitions());
+        state.assignFromUser(Set.of(tp0));
+        assertEquals(Set.of(tp0), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
         assertFalse(state.hasAllFetchPositions());
         state.seek(tp0, 1);
         assertTrue(state.isFetchable(tp0));
         assertEquals(1L, state.position(tp0).offset);
-        state.assignFromUser(Collections.emptySet());
+        state.assignFromUser(Set.of());
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
         assertFalse(state.isAssigned(tp0));
@@ -94,20 +92,20 @@ public class SubscriptionStateTest {
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
 
-        state.subscribe(singleton(topic1), Optional.of(rebalanceListener));
+        state.subscribe(Set.of(topic1), Optional.of(rebalanceListener));
         // assigned partitions should remain unchanged
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
 
-        assertTrue(state.checkAssignmentMatchedSubscription(singleton(t1p0)));
-        state.assignFromSubscribed(singleton(t1p0));
+        assertTrue(state.checkAssignmentMatchedSubscription(Set.of(t1p0)));
+        state.assignFromSubscribed(Set.of(t1p0));
         // assigned partitions should immediately change
-        assertEquals(singleton(t1p0), state.assignedPartitions());
+        assertEquals(Set.of(t1p0), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
 
-        state.subscribe(singleton(topic), Optional.of(rebalanceListener));
+        state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
         // assigned partitions should remain unchanged
-        assertEquals(singleton(t1p0), state.assignedPartitions());
+        assertEquals(Set.of(t1p0), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
 
         state.unsubscribe();
@@ -166,24 +164,24 @@ public class SubscriptionStateTest {
 
     @Test
     public void testGroupSubscribe() {
-        state.subscribe(singleton(topic1), Optional.of(rebalanceListener));
-        assertEquals(singleton(topic1), state.metadataTopics());
+        state.subscribe(Set.of(topic1), Optional.of(rebalanceListener));
+        assertEquals(Set.of(topic1), state.metadataTopics());
 
-        assertFalse(state.groupSubscribe(singleton(topic1)));
-        assertEquals(singleton(topic1), state.metadataTopics());
+        assertFalse(state.groupSubscribe(Set.of(topic1)));
+        assertEquals(Set.of(topic1), state.metadataTopics());
 
         assertTrue(state.groupSubscribe(Set.of(topic, topic1)));
         assertEquals(Set.of(topic, topic1), state.metadataTopics());
 
         // `groupSubscribe` does not accumulate
-        assertFalse(state.groupSubscribe(singleton(topic1)));
-        assertEquals(singleton(topic1), state.metadataTopics());
+        assertFalse(state.groupSubscribe(Set.of(topic1)));
+        assertEquals(Set.of(topic1), state.metadataTopics());
 
-        state.subscribe(singleton("anotherTopic"), 
Optional.of(rebalanceListener));
+        state.subscribe(Set.of("anotherTopic"), 
Optional.of(rebalanceListener));
         assertEquals(Set.of(topic1, "anotherTopic"), state.metadataTopics());
 
-        assertFalse(state.groupSubscribe(singleton("anotherTopic")));
-        assertEquals(singleton("anotherTopic"), state.metadataTopics());
+        assertFalse(state.groupSubscribe(Set.of("anotherTopic")));
+        assertEquals(Set.of("anotherTopic"), state.metadataTopics());
     }
 
     @Test
@@ -193,44 +191,44 @@ public class SubscriptionStateTest {
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
 
-        state.subscribeFromPattern(Collections.singleton(topic));
+        state.subscribeFromPattern(Set.of(topic));
         // assigned partitions should remain unchanged
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
 
-        assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp1)));
-        state.assignFromSubscribed(singleton(tp1));
+        assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp1)));
+        state.assignFromSubscribed(Set.of(tp1));
 
         // assigned partitions should immediately change
-        assertEquals(singleton(tp1), state.assignedPartitions());
+        assertEquals(Set.of(tp1), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
-        assertEquals(singleton(topic), state.subscription());
+        assertEquals(Set.of(topic), state.subscription());
 
-        assertTrue(state.checkAssignmentMatchedSubscription(singleton(t1p0)));
-        state.assignFromSubscribed(singleton(t1p0));
+        assertTrue(state.checkAssignmentMatchedSubscription(Set.of(t1p0)));
+        state.assignFromSubscribed(Set.of(t1p0));
 
         // assigned partitions should immediately change
-        assertEquals(singleton(t1p0), state.assignedPartitions());
+        assertEquals(Set.of(t1p0), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
-        assertEquals(singleton(topic), state.subscription());
+        assertEquals(Set.of(topic), state.subscription());
 
         state.subscribe(Pattern.compile(".*t"), 
Optional.of(rebalanceListener));
         // assigned partitions should remain unchanged
-        assertEquals(singleton(t1p0), state.assignedPartitions());
+        assertEquals(Set.of(t1p0), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
 
-        state.subscribeFromPattern(singleton(topic));
+        state.subscribeFromPattern(Set.of(topic));
         // assigned partitions should remain unchanged
-        assertEquals(singleton(t1p0), state.assignedPartitions());
+        assertEquals(Set.of(t1p0), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
 
-        assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp0)));
-        state.assignFromSubscribed(singleton(tp0));
+        assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp0)));
+        state.assignFromSubscribed(Set.of(tp0));
 
         // assigned partitions should immediately change
-        assertEquals(singleton(tp0), state.assignedPartitions());
+        assertEquals(Set.of(tp0), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
-        assertEquals(singleton(topic), state.subscription());
+        assertEquals(Set.of(topic), state.subscription());
 
         state.unsubscribe();
         // assigned partitions should immediately change
@@ -248,10 +246,10 @@ public class SubscriptionStateTest {
 
         state.unsubscribe();
         assertEquals(2, state.assignmentId());
-        assertEquals(Collections.emptySet(), state.assignedPartitions());
+        assertEquals(Set.of(), state.assignedPartitions());
 
         Set<TopicPartition> autoAssignment = Set.of(t1p0);
-        state.subscribe(singleton(topic1), Optional.of(rebalanceListener));
+        state.subscribe(Set.of(topic1), Optional.of(rebalanceListener));
         assertTrue(state.checkAssignmentMatchedSubscription(autoAssignment));
         state.assignFromSubscribed(autoAssignment);
         assertEquals(3, state.assignmentId());
@@ -260,7 +258,7 @@ public class SubscriptionStateTest {
 
     @Test
     public void partitionReset() {
-        state.assignFromUser(singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
         state.seek(tp0, 5);
         assertEquals(5L, state.position(tp0).offset);
         state.requestOffsetReset(tp0);
@@ -276,29 +274,29 @@ public class SubscriptionStateTest {
 
     @Test
     public void topicSubscription() {
-        state.subscribe(singleton(topic), Optional.of(rebalanceListener));
+        state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
         assertEquals(1, state.subscription().size());
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
         assertTrue(state.hasAutoAssignedPartitions());
-        assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp0)));
-        state.assignFromSubscribed(singleton(tp0));
+        assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp0)));
+        state.assignFromSubscribed(Set.of(tp0));
 
         state.seek(tp0, 1);
         assertEquals(1L, state.position(tp0).offset);
-        assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp1)));
-        state.assignFromSubscribed(singleton(tp1));
+        assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp1)));
+        state.assignFromSubscribed(Set.of(tp1));
 
         assertTrue(state.isAssigned(tp1));
         assertFalse(state.isAssigned(tp0));
         assertFalse(state.isFetchable(tp1));
-        assertEquals(singleton(tp1), state.assignedPartitions());
+        assertEquals(Set.of(tp1), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
     }
 
     @Test
     public void partitionPause() {
-        state.assignFromUser(singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
         state.seek(tp0, 100);
         assertTrue(state.isFetchable(tp0));
         state.pause(tp0);
@@ -309,10 +307,10 @@ public class SubscriptionStateTest {
 
     @Test
     public void testMarkingPartitionPending() {
-        state.assignFromUser(singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
         state.seek(tp0, 100);
         assertTrue(state.isFetchable(tp0));
-        state.markPendingRevocation(singleton(tp0));
+        state.markPendingRevocation(Set.of(tp0));
         assertFalse(state.isFetchable(tp0));
         assertFalse(state.isPaused(tp0));
     }
@@ -320,17 +318,17 @@ public class SubscriptionStateTest {
     @Test
     public void 
testAssignedPartitionsAwaitingCallbackKeepPositionDefinedInCallback() {
         // New partition assigned. Should not be fetchable or initializing 
positions.
-        state.subscribe(singleton(topic), Optional.of(rebalanceListener));
-        state.assignFromSubscribedAwaitingCallback(singleton(tp0), 
singleton(tp0));
+        state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
+        state.assignFromSubscribedAwaitingCallback(Set.of(tp0), Set.of(tp0));
         assertAssignmentAppliedAwaitingCallback(tp0);
-        assertEquals(singleton(tp0.topic()), state.subscription());
+        assertEquals(Set.of(tp0.topic()), state.subscription());
 
         // Simulate callback setting position to start fetching from
         state.seek(tp0, 100);
 
         // Callback completed. Partition should be fetchable, and should not 
require
         // initializing positions (position already defined in the callback)
-        state.enablePartitionsAwaitingCallback(singleton(tp0));
+        state.enablePartitionsAwaitingCallback(Set.of(tp0));
         assertEquals(0, state.initializingPartitions().size());
         assertTrue(state.isFetchable(tp0));
         assertTrue(state.hasAllFetchPositions());
@@ -340,14 +338,14 @@ public class SubscriptionStateTest {
     @Test
     public void 
testAssignedPartitionsAwaitingCallbackInitializePositionsWhenCallbackCompletes()
 {
         // New partition assigned. Should not be fetchable or initializing 
positions.
-        state.subscribe(singleton(topic), Optional.of(rebalanceListener));
-        state.assignFromSubscribedAwaitingCallback(singleton(tp0), 
singleton(tp0));
+        state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
+        state.assignFromSubscribedAwaitingCallback(Set.of(tp0), Set.of(tp0));
         assertAssignmentAppliedAwaitingCallback(tp0);
-        assertEquals(singleton(tp0.topic()), state.subscription());
+        assertEquals(Set.of(tp0.topic()), state.subscription());
 
         // Callback completed (without updating positions). Partition should 
require initializing
         // positions, and start fetching once a valid position is set.
-        state.enablePartitionsAwaitingCallback(singleton(tp0));
+        state.enablePartitionsAwaitingCallback(Set.of(tp0));
         assertEquals(1, state.initializingPartitions().size());
         state.seek(tp0, 100);
         assertTrue(state.isFetchable(tp0));
@@ -358,23 +356,23 @@ public class SubscriptionStateTest {
     @Test
     public void 
testAssignedPartitionsAwaitingCallbackDoesNotAffectPreviouslyOwnedPartitions() {
         // First partition assigned and callback completes.
-        state.subscribe(singleton(topic), Optional.of(rebalanceListener));
-        state.assignFromSubscribedAwaitingCallback(singleton(tp0), 
singleton(tp0));
+        state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
+        state.assignFromSubscribedAwaitingCallback(Set.of(tp0), Set.of(tp0));
         assertAssignmentAppliedAwaitingCallback(tp0);
-        assertEquals(singleton(tp0.topic()), state.subscription());
-        state.enablePartitionsAwaitingCallback(singleton(tp0));
+        assertEquals(Set.of(tp0.topic()), state.subscription());
+        state.enablePartitionsAwaitingCallback(Set.of(tp0));
         state.seek(tp0, 100);
         assertTrue(state.isFetchable(tp0));
 
         // New partition added to the assignment. Owned partitions should 
continue to be
         // fetchable, while the newly added should not be fetchable until 
callback completes.
-        state.assignFromSubscribedAwaitingCallback(Set.of(tp0, tp1), 
singleton(tp1));
+        state.assignFromSubscribedAwaitingCallback(Set.of(tp0, tp1), 
Set.of(tp1));
         assertTrue(state.isFetchable(tp0));
         assertFalse(state.isFetchable(tp1));
         assertEquals(1, state.initializingPartitions().size());
 
         // Callback completed. Added partition be initializing positions and 
become fetchable when it gets one.
-        state.enablePartitionsAwaitingCallback(singleton(tp1));
+        state.enablePartitionsAwaitingCallback(Set.of(tp1));
         assertEquals(1, state.initializingPartitions().size());
         assertEquals(tp1, state.initializingPartitions().iterator().next());
         state.seek(tp1, 200);
@@ -382,7 +380,7 @@ public class SubscriptionStateTest {
     }
 
     private void assertAssignmentAppliedAwaitingCallback(TopicPartition 
topicPartition) {
-        assertEquals(singleton(topicPartition), state.assignedPartitions());
+        assertEquals(Set.of(topicPartition), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
 
         assertFalse(state.isFetchable(topicPartition));
@@ -392,9 +390,9 @@ public class SubscriptionStateTest {
 
     @Test
     public void invalidPositionUpdate() {
-        state.subscribe(singleton(topic), Optional.of(rebalanceListener));
-        assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp0)));
-        state.assignFromSubscribed(singleton(tp0));
+        state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
+        assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp0)));
+        state.assignFromSubscribed(Set.of(tp0));
 
         assertThrows(IllegalStateException.class, () -> state.position(tp0,
             new SubscriptionState.FetchPosition(0, Optional.empty(), 
leaderAndEpoch)));
@@ -402,15 +400,15 @@ public class SubscriptionStateTest {
 
     @Test
     public void cantAssignPartitionForUnsubscribedTopics() {
-        state.subscribe(singleton(topic), Optional.of(rebalanceListener));
-        
assertFalse(state.checkAssignmentMatchedSubscription(Collections.singletonList(t1p0)));
+        state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
+        assertFalse(state.checkAssignmentMatchedSubscription(List.of(t1p0)));
     }
 
     @Test
     public void cantAssignPartitionForUnmatchedPattern() {
         state.subscribe(Pattern.compile(".*t"), 
Optional.of(rebalanceListener));
-        state.subscribeFromPattern(Collections.singleton(topic));
-        
assertFalse(state.checkAssignmentMatchedSubscription(Collections.singletonList(t1p0)));
+        state.subscribeFromPattern(Set.of(topic));
+        assertFalse(state.checkAssignmentMatchedSubscription(List.of(t1p0)));
     }
 
     @Test
@@ -421,26 +419,26 @@ public class SubscriptionStateTest {
 
     @Test
     public void cantSubscribeTopicAndPattern() {
-        state.subscribe(singleton(topic), Optional.of(rebalanceListener));
+        state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
         assertThrows(IllegalStateException.class, () -> 
state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener)));
     }
 
     @Test
     public void cantSubscribePartitionAndPattern() {
-        state.assignFromUser(singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
         assertThrows(IllegalStateException.class, () -> 
state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener)));
     }
 
     @Test
     public void cantSubscribePatternAndTopic() {
         state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
-        assertThrows(IllegalStateException.class, () -> 
state.subscribe(singleton(topic), Optional.of(rebalanceListener)));
+        assertThrows(IllegalStateException.class, () -> 
state.subscribe(Set.of(topic), Optional.of(rebalanceListener)));
     }
 
     @Test
     public void cantSubscribePatternAndPartition() {
         state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
-        assertThrows(IllegalStateException.class, () -> 
state.assignFromUser(singleton(tp0)));
+        assertThrows(IllegalStateException.class, () -> 
state.assignFromUser(Set.of(tp0)));
     }
 
     @Test
@@ -485,14 +483,14 @@ public class SubscriptionStateTest {
 
         TopicIdPartitionSet reconciledAssignmentFromRegex = new 
TopicIdPartitionSet();
         reconciledAssignmentFromRegex.addAll(Uuid.randomUuid(), topic, 
Set.of(0));
-        state.assignFromSubscribedAwaitingCallback(singleton(tp0), 
singleton(tp0));
+        state.assignFromSubscribedAwaitingCallback(Set.of(tp0), Set.of(tp0));
         assertAssignmentAppliedAwaitingCallback(tp0);
 
         // Simulate callback setting position to start fetching from
         state.seek(tp0, 100);
 
         // Callback completed. Partition should be fetchable, from the 
position previously defined
-        state.enablePartitionsAwaitingCallback(singleton(tp0));
+        state.enablePartitionsAwaitingCallback(Set.of(tp0));
         assertEquals(0, state.initializingPartitions().size());
         assertTrue(state.isFetchable(tp0));
         assertTrue(state.hasAllFetchPositions());
@@ -513,7 +511,7 @@ public class SubscriptionStateTest {
         state.setAssignedTopicIds(Set.of(firstAssignedUuid, 
secondAssignedUuid));
 
         // First reconciliation completes and updates the subscription state
-        state.assignFromSubscribedAwaitingCallback(singleton(tp0), 
singleton(tp0));
+        state.assignFromSubscribedAwaitingCallback(Set.of(tp0), Set.of(tp0));
 
         // First assignment should have been applied
         assertAssignmentAppliedAwaitingCallback(tp0);
@@ -557,16 +555,16 @@ public class SubscriptionStateTest {
     public void unsubscribeUserAssignment() {
         state.assignFromUser(Set.of(tp0, tp1));
         state.unsubscribe();
-        state.subscribe(singleton(topic), Optional.of(rebalanceListener));
-        assertEquals(singleton(topic), state.subscription());
+        state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
+        assertEquals(Set.of(topic), state.subscription());
     }
 
     @Test
     public void unsubscribeUserSubscribe() {
-        state.subscribe(singleton(topic), Optional.of(rebalanceListener));
+        state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
         state.unsubscribe();
-        state.assignFromUser(singleton(tp0));
-        assertEquals(singleton(tp0), state.assignedPartitions());
+        state.assignFromUser(Set.of(tp0));
+        assertEquals(Set.of(tp0), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
     }
 
@@ -574,10 +572,10 @@ public class SubscriptionStateTest {
     public void unsubscription() {
         state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
         state.subscribeFromPattern(Set.of(topic, topic1));
-        assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp1)));
-        state.assignFromSubscribed(singleton(tp1));
+        assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp1)));
+        state.assignFromSubscribed(Set.of(tp1));
 
-        assertEquals(singleton(tp1), state.assignedPartitions());
+        assertEquals(Set.of(tp1), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
 
         state.unsubscribe();
@@ -585,8 +583,8 @@ public class SubscriptionStateTest {
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
 
-        state.assignFromUser(singleton(tp0));
-        assertEquals(singleton(tp0), state.assignedPartitions());
+        state.assignFromUser(Set.of(tp0));
+        assertEquals(Set.of(tp0), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
 
         state.unsubscribe();
@@ -597,15 +595,15 @@ public class SubscriptionStateTest {
 
     @Test
     public void testPreferredReadReplicaLease() {
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
 
         // Default state
         assertFalse(state.preferredReadReplica(tp0, 0L).isPresent());
 
         // Set the preferred replica with lease
         state.updatePreferredReadReplica(tp0, 42, () -> 10L);
-        TestUtils.assertOptional(state.preferredReadReplica(tp0, 9L),  value 
-> assertEquals(value.intValue(), 42));
-        TestUtils.assertOptional(state.preferredReadReplica(tp0, 10L),  value 
-> assertEquals(value.intValue(), 42));
+        TestUtils.assertOptional(state.preferredReadReplica(tp0, 9L),  value 
-> assertEquals(42, value.intValue()));
+        TestUtils.assertOptional(state.preferredReadReplica(tp0, 10L),  value 
-> assertEquals(42, value.intValue()));
         assertFalse(state.preferredReadReplica(tp0, 11L).isPresent());
 
         // Unset the preferred replica
@@ -615,20 +613,20 @@ public class SubscriptionStateTest {
 
         // Set to new preferred replica with lease
         state.updatePreferredReadReplica(tp0, 43, () -> 20L);
-        TestUtils.assertOptional(state.preferredReadReplica(tp0, 11L),  value 
-> assertEquals(value.intValue(), 43));
-        TestUtils.assertOptional(state.preferredReadReplica(tp0, 20L),  value 
-> assertEquals(value.intValue(), 43));
+        TestUtils.assertOptional(state.preferredReadReplica(tp0, 11L),  value 
-> assertEquals(43, value.intValue()));
+        TestUtils.assertOptional(state.preferredReadReplica(tp0, 20L),  value 
-> assertEquals(43, value.intValue()));
         assertFalse(state.preferredReadReplica(tp0, 21L).isPresent());
 
         // Set to new preferred replica without clearing first
         state.updatePreferredReadReplica(tp0, 44, () -> 30L);
-        TestUtils.assertOptional(state.preferredReadReplica(tp0, 30L),  value 
-> assertEquals(value.intValue(), 44));
+        TestUtils.assertOptional(state.preferredReadReplica(tp0, 30L),  value 
-> assertEquals(44, value.intValue()));
         assertFalse(state.preferredReadReplica(tp0, 31L).isPresent());
     }
 
     @Test
     public void testSeekUnvalidatedWithNoOffsetEpoch() {
         Node broker1 = new Node(1, "localhost", 9092);
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
 
         // Seek with no offset epoch requires no validation no matter what the 
current leader is
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, 
Optional.empty(),
@@ -652,7 +650,7 @@ public class SubscriptionStateTest {
     @Test
     public void testSeekUnvalidatedWithNoEpochClearsAwaitingValidation() {
         Node broker1 = new Node(1, "localhost", 9092);
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
 
         // Seek with no offset epoch requires no validation no matter what the 
current leader is
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, 
Optional.of(2),
@@ -672,7 +670,7 @@ public class SubscriptionStateTest {
         ApiVersions apiVersions = new ApiVersions();
         apiVersions.update(broker1.idString(), NodeApiVersions.create());
 
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
 
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, 
Optional.of(2),
                 new Metadata.LeaderAndEpoch(Optional.of(broker1), 
Optional.of(5))));
@@ -701,7 +699,7 @@ public class SubscriptionStateTest {
     @Test
     public void testSeekValidatedShouldClearAwaitingValidation() {
         Node broker1 = new Node(1, "localhost", 9092);
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
 
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, 
Optional.of(5),
                 new Metadata.LeaderAndEpoch(Optional.of(broker1), 
Optional.of(10))));
@@ -719,7 +717,7 @@ public class SubscriptionStateTest {
     @Test
     public void testCompleteValidationShouldClearAwaitingValidation() {
         Node broker1 = new Node(1, "localhost", 9092);
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
 
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, 
Optional.of(5),
                 new Metadata.LeaderAndEpoch(Optional.of(broker1), 
Optional.of(10))));
@@ -736,7 +734,7 @@ public class SubscriptionStateTest {
     @Test
     public void testOffsetResetWhileAwaitingValidation() {
         Node broker1 = new Node(1, "localhost", 9092);
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
 
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, 
Optional.of(5),
                 new Metadata.LeaderAndEpoch(Optional.of(broker1), 
Optional.of(10))));
@@ -750,7 +748,7 @@ public class SubscriptionStateTest {
     @Test
     public void testMaybeCompleteValidation() {
         Node broker1 = new Node(1, "localhost", 9092);
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
 
         int currentEpoch = 10;
         long initialOffset = 10L;
@@ -777,7 +775,7 @@ public class SubscriptionStateTest {
         apiVersions.update("1", oldApis);
 
         Node broker1 = new Node(1, "localhost", 9092);
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
 
         state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, 
Optional.of(5),
                 new Metadata.LeaderAndEpoch(Optional.of(broker1), 
Optional.of(10))));
@@ -806,7 +804,7 @@ public class SubscriptionStateTest {
     @Test
     public void testMaybeCompleteValidationAfterPositionChange() {
         Node broker1 = new Node(1, "localhost", 9092);
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
 
         int currentEpoch = 10;
         long initialOffset = 10L;
@@ -835,7 +833,7 @@ public class SubscriptionStateTest {
     @Test
     public void testMaybeCompleteValidationAfterOffsetReset() {
         Node broker1 = new Node(1, "localhost", 9092);
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
 
         int currentEpoch = 10;
         long initialOffset = 10L;
@@ -861,7 +859,7 @@ public class SubscriptionStateTest {
     @Test
     public void testTruncationDetectionWithResetPolicy() {
         Node broker1 = new Node(1, "localhost", 9092);
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
 
         int currentEpoch = 10;
         long initialOffset = 10L;
@@ -890,7 +888,7 @@ public class SubscriptionStateTest {
     public void testTruncationDetectionWithoutResetPolicy() {
         Node broker1 = new Node(1, "localhost", 9092);
         state = new SubscriptionState(new LogContext(), 
AutoOffsetResetStrategy.NONE);
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
 
         int currentEpoch = 10;
         long initialOffset = 10L;
@@ -920,7 +918,7 @@ public class SubscriptionStateTest {
     public void testTruncationDetectionUnknownDivergentOffsetWithResetPolicy() 
{
         Node broker1 = new Node(1, "localhost", 9092);
         state = new SubscriptionState(new LogContext(), 
AutoOffsetResetStrategy.EARLIEST);
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
 
         int currentEpoch = 10;
         long initialOffset = 10L;
@@ -945,7 +943,7 @@ public class SubscriptionStateTest {
     public void 
testTruncationDetectionUnknownDivergentOffsetWithoutResetPolicy() {
         Node broker1 = new Node(1, "localhost", 9092);
         state = new SubscriptionState(new LogContext(), 
AutoOffsetResetStrategy.NONE);
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
 
         int currentEpoch = 10;
         long initialOffset = 10L;
@@ -993,7 +991,7 @@ public class SubscriptionStateTest {
         // Check that offset reset works when we can't validate offsets (older 
brokers)
 
         Node broker1 = new Node(1, "localhost", 9092);
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
 
         // Reset offsets
         state.requestOffsetReset(tp0, AutoOffsetResetStrategy.EARLIEST);
@@ -1039,7 +1037,7 @@ public class SubscriptionStateTest {
 
     @Test
     public void nullPositionLagOnNoPosition() {
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
 
         assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED));
         assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED));
@@ -1053,7 +1051,7 @@ public class SubscriptionStateTest {
 
     @Test
     public void testPositionOrNull() {
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
         final TopicPartition unassignedPartition = new 
TopicPartition("unassigned", 0);
         state.seek(tp0, 5);
 
@@ -1063,7 +1061,7 @@ public class SubscriptionStateTest {
 
     @Test
     public void testTryUpdatingHighWatermark() {
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
         final TopicPartition unassignedPartition = new 
TopicPartition("unassigned", 0);
 
         final long highWatermark = 10L;
@@ -1074,7 +1072,7 @@ public class SubscriptionStateTest {
 
     @Test
     public void testTryUpdatingLogStartOffset() {
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
         final TopicPartition unassignedPartition = new 
TopicPartition("unassigned", 0);
         final long position = 25;
         state.seek(tp0, position);
@@ -1087,7 +1085,7 @@ public class SubscriptionStateTest {
 
     @Test
     public void testTryUpdatingLastStableOffset() {
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
         final TopicPartition unassignedPartition = new 
TopicPartition("unassigned", 0);
 
         final long lastStableOffset = 10L;
@@ -1098,7 +1096,7 @@ public class SubscriptionStateTest {
 
     @Test
     public void testTryUpdatingPreferredReadReplica() {
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
         final TopicPartition unassignedPartition = new 
TopicPartition("unassigned", 0);
 
         final int preferredReadReplicaId = 10;
@@ -1111,7 +1109,7 @@ public class SubscriptionStateTest {
 
     @Test
     public void testRequestOffsetResetIfPartitionAssigned() {
-        state.assignFromUser(Collections.singleton(tp0));
+        state.assignFromUser(Set.of(tp0));
         final TopicPartition unassignedPartition = new 
TopicPartition("unassigned", 0);
 
         state.requestOffsetResetIfPartitionAssigned(tp0);


Reply via email to