This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 659ace836ca MINOR: Replace `singleton` with `Set.of` in the
SubscriptionStateTest (#19993)
659ace836ca is described below
commit 659ace836caa3abc59f73ef2c24d45dfc6fa30ff
Author: Lan Ding <[email protected]>
AuthorDate: Fri Jun 20 15:33:41 2025 +0800
MINOR: Replace `singleton` with `Set.of` in the SubscriptionStateTest
(#19993)
Replace `singleton` with `Set.of` in the SubscriptionStateTest.
Reviewers: Ken Huang <[email protected]>, PoAn Yang
<[email protected]>, Yung <[email protected]>, TengYao Chi
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../consumer/internals/SubscriptionStateTest.java | 228 ++++++++++-----------
1 file changed, 113 insertions(+), 115 deletions(-)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index d77f6d808a7..4d4a725d45c 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -35,7 +35,6 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -44,7 +43,6 @@ import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.regex.Pattern;
-import static java.util.Collections.singleton;
import static
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
import static
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -66,14 +64,14 @@ public class SubscriptionStateTest {
@Test
public void partitionAssignment() {
- state.assignFromUser(singleton(tp0));
- assertEquals(singleton(tp0), state.assignedPartitions());
+ state.assignFromUser(Set.of(tp0));
+ assertEquals(Set.of(tp0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
assertFalse(state.hasAllFetchPositions());
state.seek(tp0, 1);
assertTrue(state.isFetchable(tp0));
assertEquals(1L, state.position(tp0).offset);
- state.assignFromUser(Collections.emptySet());
+ state.assignFromUser(Set.of());
assertTrue(state.assignedPartitions().isEmpty());
assertEquals(0, state.numAssignedPartitions());
assertFalse(state.isAssigned(tp0));
@@ -94,20 +92,20 @@ public class SubscriptionStateTest {
assertTrue(state.assignedPartitions().isEmpty());
assertEquals(0, state.numAssignedPartitions());
- state.subscribe(singleton(topic1), Optional.of(rebalanceListener));
+ state.subscribe(Set.of(topic1), Optional.of(rebalanceListener));
// assigned partitions should remain unchanged
assertTrue(state.assignedPartitions().isEmpty());
assertEquals(0, state.numAssignedPartitions());
- assertTrue(state.checkAssignmentMatchedSubscription(singleton(t1p0)));
- state.assignFromSubscribed(singleton(t1p0));
+ assertTrue(state.checkAssignmentMatchedSubscription(Set.of(t1p0)));
+ state.assignFromSubscribed(Set.of(t1p0));
// assigned partitions should immediately change
- assertEquals(singleton(t1p0), state.assignedPartitions());
+ assertEquals(Set.of(t1p0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
- state.subscribe(singleton(topic), Optional.of(rebalanceListener));
+ state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
// assigned partitions should remain unchanged
- assertEquals(singleton(t1p0), state.assignedPartitions());
+ assertEquals(Set.of(t1p0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
state.unsubscribe();
@@ -166,24 +164,24 @@ public class SubscriptionStateTest {
@Test
public void testGroupSubscribe() {
- state.subscribe(singleton(topic1), Optional.of(rebalanceListener));
- assertEquals(singleton(topic1), state.metadataTopics());
+ state.subscribe(Set.of(topic1), Optional.of(rebalanceListener));
+ assertEquals(Set.of(topic1), state.metadataTopics());
- assertFalse(state.groupSubscribe(singleton(topic1)));
- assertEquals(singleton(topic1), state.metadataTopics());
+ assertFalse(state.groupSubscribe(Set.of(topic1)));
+ assertEquals(Set.of(topic1), state.metadataTopics());
assertTrue(state.groupSubscribe(Set.of(topic, topic1)));
assertEquals(Set.of(topic, topic1), state.metadataTopics());
// `groupSubscribe` does not accumulate
- assertFalse(state.groupSubscribe(singleton(topic1)));
- assertEquals(singleton(topic1), state.metadataTopics());
+ assertFalse(state.groupSubscribe(Set.of(topic1)));
+ assertEquals(Set.of(topic1), state.metadataTopics());
- state.subscribe(singleton("anotherTopic"),
Optional.of(rebalanceListener));
+ state.subscribe(Set.of("anotherTopic"),
Optional.of(rebalanceListener));
assertEquals(Set.of(topic1, "anotherTopic"), state.metadataTopics());
- assertFalse(state.groupSubscribe(singleton("anotherTopic")));
- assertEquals(singleton("anotherTopic"), state.metadataTopics());
+ assertFalse(state.groupSubscribe(Set.of("anotherTopic")));
+ assertEquals(Set.of("anotherTopic"), state.metadataTopics());
}
@Test
@@ -193,44 +191,44 @@ public class SubscriptionStateTest {
assertTrue(state.assignedPartitions().isEmpty());
assertEquals(0, state.numAssignedPartitions());
- state.subscribeFromPattern(Collections.singleton(topic));
+ state.subscribeFromPattern(Set.of(topic));
// assigned partitions should remain unchanged
assertTrue(state.assignedPartitions().isEmpty());
assertEquals(0, state.numAssignedPartitions());
- assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp1)));
- state.assignFromSubscribed(singleton(tp1));
+ assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp1)));
+ state.assignFromSubscribed(Set.of(tp1));
// assigned partitions should immediately change
- assertEquals(singleton(tp1), state.assignedPartitions());
+ assertEquals(Set.of(tp1), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
- assertEquals(singleton(topic), state.subscription());
+ assertEquals(Set.of(topic), state.subscription());
- assertTrue(state.checkAssignmentMatchedSubscription(singleton(t1p0)));
- state.assignFromSubscribed(singleton(t1p0));
+ assertTrue(state.checkAssignmentMatchedSubscription(Set.of(t1p0)));
+ state.assignFromSubscribed(Set.of(t1p0));
// assigned partitions should immediately change
- assertEquals(singleton(t1p0), state.assignedPartitions());
+ assertEquals(Set.of(t1p0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
- assertEquals(singleton(topic), state.subscription());
+ assertEquals(Set.of(topic), state.subscription());
state.subscribe(Pattern.compile(".*t"),
Optional.of(rebalanceListener));
// assigned partitions should remain unchanged
- assertEquals(singleton(t1p0), state.assignedPartitions());
+ assertEquals(Set.of(t1p0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
- state.subscribeFromPattern(singleton(topic));
+ state.subscribeFromPattern(Set.of(topic));
// assigned partitions should remain unchanged
- assertEquals(singleton(t1p0), state.assignedPartitions());
+ assertEquals(Set.of(t1p0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
- assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp0)));
- state.assignFromSubscribed(singleton(tp0));
+ assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp0)));
+ state.assignFromSubscribed(Set.of(tp0));
// assigned partitions should immediately change
- assertEquals(singleton(tp0), state.assignedPartitions());
+ assertEquals(Set.of(tp0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
- assertEquals(singleton(topic), state.subscription());
+ assertEquals(Set.of(topic), state.subscription());
state.unsubscribe();
// assigned partitions should immediately change
@@ -248,10 +246,10 @@ public class SubscriptionStateTest {
state.unsubscribe();
assertEquals(2, state.assignmentId());
- assertEquals(Collections.emptySet(), state.assignedPartitions());
+ assertEquals(Set.of(), state.assignedPartitions());
Set<TopicPartition> autoAssignment = Set.of(t1p0);
- state.subscribe(singleton(topic1), Optional.of(rebalanceListener));
+ state.subscribe(Set.of(topic1), Optional.of(rebalanceListener));
assertTrue(state.checkAssignmentMatchedSubscription(autoAssignment));
state.assignFromSubscribed(autoAssignment);
assertEquals(3, state.assignmentId());
@@ -260,7 +258,7 @@ public class SubscriptionStateTest {
@Test
public void partitionReset() {
- state.assignFromUser(singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
state.seek(tp0, 5);
assertEquals(5L, state.position(tp0).offset);
state.requestOffsetReset(tp0);
@@ -276,29 +274,29 @@ public class SubscriptionStateTest {
@Test
public void topicSubscription() {
- state.subscribe(singleton(topic), Optional.of(rebalanceListener));
+ state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
assertEquals(1, state.subscription().size());
assertTrue(state.assignedPartitions().isEmpty());
assertEquals(0, state.numAssignedPartitions());
assertTrue(state.hasAutoAssignedPartitions());
- assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp0)));
- state.assignFromSubscribed(singleton(tp0));
+ assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp0)));
+ state.assignFromSubscribed(Set.of(tp0));
state.seek(tp0, 1);
assertEquals(1L, state.position(tp0).offset);
- assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp1)));
- state.assignFromSubscribed(singleton(tp1));
+ assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp1)));
+ state.assignFromSubscribed(Set.of(tp1));
assertTrue(state.isAssigned(tp1));
assertFalse(state.isAssigned(tp0));
assertFalse(state.isFetchable(tp1));
- assertEquals(singleton(tp1), state.assignedPartitions());
+ assertEquals(Set.of(tp1), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
}
@Test
public void partitionPause() {
- state.assignFromUser(singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
state.seek(tp0, 100);
assertTrue(state.isFetchable(tp0));
state.pause(tp0);
@@ -309,10 +307,10 @@ public class SubscriptionStateTest {
@Test
public void testMarkingPartitionPending() {
- state.assignFromUser(singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
state.seek(tp0, 100);
assertTrue(state.isFetchable(tp0));
- state.markPendingRevocation(singleton(tp0));
+ state.markPendingRevocation(Set.of(tp0));
assertFalse(state.isFetchable(tp0));
assertFalse(state.isPaused(tp0));
}
@@ -320,17 +318,17 @@ public class SubscriptionStateTest {
@Test
public void
testAssignedPartitionsAwaitingCallbackKeepPositionDefinedInCallback() {
// New partition assigned. Should not be fetchable or initializing
positions.
- state.subscribe(singleton(topic), Optional.of(rebalanceListener));
- state.assignFromSubscribedAwaitingCallback(singleton(tp0),
singleton(tp0));
+ state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
+ state.assignFromSubscribedAwaitingCallback(Set.of(tp0), Set.of(tp0));
assertAssignmentAppliedAwaitingCallback(tp0);
- assertEquals(singleton(tp0.topic()), state.subscription());
+ assertEquals(Set.of(tp0.topic()), state.subscription());
// Simulate callback setting position to start fetching from
state.seek(tp0, 100);
// Callback completed. Partition should be fetchable, and should not
require
// initializing positions (position already defined in the callback)
- state.enablePartitionsAwaitingCallback(singleton(tp0));
+ state.enablePartitionsAwaitingCallback(Set.of(tp0));
assertEquals(0, state.initializingPartitions().size());
assertTrue(state.isFetchable(tp0));
assertTrue(state.hasAllFetchPositions());
@@ -340,14 +338,14 @@ public class SubscriptionStateTest {
@Test
public void
testAssignedPartitionsAwaitingCallbackInitializePositionsWhenCallbackCompletes()
{
// New partition assigned. Should not be fetchable or initializing
positions.
- state.subscribe(singleton(topic), Optional.of(rebalanceListener));
- state.assignFromSubscribedAwaitingCallback(singleton(tp0),
singleton(tp0));
+ state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
+ state.assignFromSubscribedAwaitingCallback(Set.of(tp0), Set.of(tp0));
assertAssignmentAppliedAwaitingCallback(tp0);
- assertEquals(singleton(tp0.topic()), state.subscription());
+ assertEquals(Set.of(tp0.topic()), state.subscription());
// Callback completed (without updating positions). Partition should
require initializing
// positions, and start fetching once a valid position is set.
- state.enablePartitionsAwaitingCallback(singleton(tp0));
+ state.enablePartitionsAwaitingCallback(Set.of(tp0));
assertEquals(1, state.initializingPartitions().size());
state.seek(tp0, 100);
assertTrue(state.isFetchable(tp0));
@@ -358,23 +356,23 @@ public class SubscriptionStateTest {
@Test
public void
testAssignedPartitionsAwaitingCallbackDoesNotAffectPreviouslyOwnedPartitions() {
// First partition assigned and callback completes.
- state.subscribe(singleton(topic), Optional.of(rebalanceListener));
- state.assignFromSubscribedAwaitingCallback(singleton(tp0),
singleton(tp0));
+ state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
+ state.assignFromSubscribedAwaitingCallback(Set.of(tp0), Set.of(tp0));
assertAssignmentAppliedAwaitingCallback(tp0);
- assertEquals(singleton(tp0.topic()), state.subscription());
- state.enablePartitionsAwaitingCallback(singleton(tp0));
+ assertEquals(Set.of(tp0.topic()), state.subscription());
+ state.enablePartitionsAwaitingCallback(Set.of(tp0));
state.seek(tp0, 100);
assertTrue(state.isFetchable(tp0));
// New partition added to the assignment. Owned partitions should
continue to be
// fetchable, while the newly added should not be fetchable until
callback completes.
- state.assignFromSubscribedAwaitingCallback(Set.of(tp0, tp1),
singleton(tp1));
+ state.assignFromSubscribedAwaitingCallback(Set.of(tp0, tp1),
Set.of(tp1));
assertTrue(state.isFetchable(tp0));
assertFalse(state.isFetchable(tp1));
assertEquals(1, state.initializingPartitions().size());
// Callback completed. Added partition be initializing positions and
become fetchable when it gets one.
- state.enablePartitionsAwaitingCallback(singleton(tp1));
+ state.enablePartitionsAwaitingCallback(Set.of(tp1));
assertEquals(1, state.initializingPartitions().size());
assertEquals(tp1, state.initializingPartitions().iterator().next());
state.seek(tp1, 200);
@@ -382,7 +380,7 @@ public class SubscriptionStateTest {
}
private void assertAssignmentAppliedAwaitingCallback(TopicPartition
topicPartition) {
- assertEquals(singleton(topicPartition), state.assignedPartitions());
+ assertEquals(Set.of(topicPartition), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
assertFalse(state.isFetchable(topicPartition));
@@ -392,9 +390,9 @@ public class SubscriptionStateTest {
@Test
public void invalidPositionUpdate() {
- state.subscribe(singleton(topic), Optional.of(rebalanceListener));
- assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp0)));
- state.assignFromSubscribed(singleton(tp0));
+ state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
+ assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp0)));
+ state.assignFromSubscribed(Set.of(tp0));
assertThrows(IllegalStateException.class, () -> state.position(tp0,
new SubscriptionState.FetchPosition(0, Optional.empty(),
leaderAndEpoch)));
@@ -402,15 +400,15 @@ public class SubscriptionStateTest {
@Test
public void cantAssignPartitionForUnsubscribedTopics() {
- state.subscribe(singleton(topic), Optional.of(rebalanceListener));
-
assertFalse(state.checkAssignmentMatchedSubscription(Collections.singletonList(t1p0)));
+ state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
+ assertFalse(state.checkAssignmentMatchedSubscription(List.of(t1p0)));
}
@Test
public void cantAssignPartitionForUnmatchedPattern() {
state.subscribe(Pattern.compile(".*t"),
Optional.of(rebalanceListener));
- state.subscribeFromPattern(Collections.singleton(topic));
-
assertFalse(state.checkAssignmentMatchedSubscription(Collections.singletonList(t1p0)));
+ state.subscribeFromPattern(Set.of(topic));
+ assertFalse(state.checkAssignmentMatchedSubscription(List.of(t1p0)));
}
@Test
@@ -421,26 +419,26 @@ public class SubscriptionStateTest {
@Test
public void cantSubscribeTopicAndPattern() {
- state.subscribe(singleton(topic), Optional.of(rebalanceListener));
+ state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
assertThrows(IllegalStateException.class, () ->
state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener)));
}
@Test
public void cantSubscribePartitionAndPattern() {
- state.assignFromUser(singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
assertThrows(IllegalStateException.class, () ->
state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener)));
}
@Test
public void cantSubscribePatternAndTopic() {
state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
- assertThrows(IllegalStateException.class, () ->
state.subscribe(singleton(topic), Optional.of(rebalanceListener)));
+ assertThrows(IllegalStateException.class, () ->
state.subscribe(Set.of(topic), Optional.of(rebalanceListener)));
}
@Test
public void cantSubscribePatternAndPartition() {
state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
- assertThrows(IllegalStateException.class, () ->
state.assignFromUser(singleton(tp0)));
+ assertThrows(IllegalStateException.class, () ->
state.assignFromUser(Set.of(tp0)));
}
@Test
@@ -485,14 +483,14 @@ public class SubscriptionStateTest {
TopicIdPartitionSet reconciledAssignmentFromRegex = new
TopicIdPartitionSet();
reconciledAssignmentFromRegex.addAll(Uuid.randomUuid(), topic,
Set.of(0));
- state.assignFromSubscribedAwaitingCallback(singleton(tp0),
singleton(tp0));
+ state.assignFromSubscribedAwaitingCallback(Set.of(tp0), Set.of(tp0));
assertAssignmentAppliedAwaitingCallback(tp0);
// Simulate callback setting position to start fetching from
state.seek(tp0, 100);
// Callback completed. Partition should be fetchable, from the
position previously defined
- state.enablePartitionsAwaitingCallback(singleton(tp0));
+ state.enablePartitionsAwaitingCallback(Set.of(tp0));
assertEquals(0, state.initializingPartitions().size());
assertTrue(state.isFetchable(tp0));
assertTrue(state.hasAllFetchPositions());
@@ -513,7 +511,7 @@ public class SubscriptionStateTest {
state.setAssignedTopicIds(Set.of(firstAssignedUuid,
secondAssignedUuid));
// First reconciliation completes and updates the subscription state
- state.assignFromSubscribedAwaitingCallback(singleton(tp0),
singleton(tp0));
+ state.assignFromSubscribedAwaitingCallback(Set.of(tp0), Set.of(tp0));
// First assignment should have been applied
assertAssignmentAppliedAwaitingCallback(tp0);
@@ -557,16 +555,16 @@ public class SubscriptionStateTest {
public void unsubscribeUserAssignment() {
state.assignFromUser(Set.of(tp0, tp1));
state.unsubscribe();
- state.subscribe(singleton(topic), Optional.of(rebalanceListener));
- assertEquals(singleton(topic), state.subscription());
+ state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
+ assertEquals(Set.of(topic), state.subscription());
}
@Test
public void unsubscribeUserSubscribe() {
- state.subscribe(singleton(topic), Optional.of(rebalanceListener));
+ state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
state.unsubscribe();
- state.assignFromUser(singleton(tp0));
- assertEquals(singleton(tp0), state.assignedPartitions());
+ state.assignFromUser(Set.of(tp0));
+ assertEquals(Set.of(tp0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
}
@@ -574,10 +572,10 @@ public class SubscriptionStateTest {
public void unsubscription() {
state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
state.subscribeFromPattern(Set.of(topic, topic1));
- assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp1)));
- state.assignFromSubscribed(singleton(tp1));
+ assertTrue(state.checkAssignmentMatchedSubscription(Set.of(tp1)));
+ state.assignFromSubscribed(Set.of(tp1));
- assertEquals(singleton(tp1), state.assignedPartitions());
+ assertEquals(Set.of(tp1), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
state.unsubscribe();
@@ -585,8 +583,8 @@ public class SubscriptionStateTest {
assertTrue(state.assignedPartitions().isEmpty());
assertEquals(0, state.numAssignedPartitions());
- state.assignFromUser(singleton(tp0));
- assertEquals(singleton(tp0), state.assignedPartitions());
+ state.assignFromUser(Set.of(tp0));
+ assertEquals(Set.of(tp0), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
state.unsubscribe();
@@ -597,15 +595,15 @@ public class SubscriptionStateTest {
@Test
public void testPreferredReadReplicaLease() {
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
// Default state
assertFalse(state.preferredReadReplica(tp0, 0L).isPresent());
// Set the preferred replica with lease
state.updatePreferredReadReplica(tp0, 42, () -> 10L);
- TestUtils.assertOptional(state.preferredReadReplica(tp0, 9L), value
-> assertEquals(value.intValue(), 42));
- TestUtils.assertOptional(state.preferredReadReplica(tp0, 10L), value
-> assertEquals(value.intValue(), 42));
+ TestUtils.assertOptional(state.preferredReadReplica(tp0, 9L), value
-> assertEquals(42, value.intValue()));
+ TestUtils.assertOptional(state.preferredReadReplica(tp0, 10L), value
-> assertEquals(42, value.intValue()));
assertFalse(state.preferredReadReplica(tp0, 11L).isPresent());
// Unset the preferred replica
@@ -615,20 +613,20 @@ public class SubscriptionStateTest {
// Set to new preferred replica with lease
state.updatePreferredReadReplica(tp0, 43, () -> 20L);
- TestUtils.assertOptional(state.preferredReadReplica(tp0, 11L), value
-> assertEquals(value.intValue(), 43));
- TestUtils.assertOptional(state.preferredReadReplica(tp0, 20L), value
-> assertEquals(value.intValue(), 43));
+ TestUtils.assertOptional(state.preferredReadReplica(tp0, 11L), value
-> assertEquals(43, value.intValue()));
+ TestUtils.assertOptional(state.preferredReadReplica(tp0, 20L), value
-> assertEquals(43, value.intValue()));
assertFalse(state.preferredReadReplica(tp0, 21L).isPresent());
// Set to new preferred replica without clearing first
state.updatePreferredReadReplica(tp0, 44, () -> 30L);
- TestUtils.assertOptional(state.preferredReadReplica(tp0, 30L), value
-> assertEquals(value.intValue(), 44));
+ TestUtils.assertOptional(state.preferredReadReplica(tp0, 30L), value
-> assertEquals(44, value.intValue()));
assertFalse(state.preferredReadReplica(tp0, 31L).isPresent());
}
@Test
public void testSeekUnvalidatedWithNoOffsetEpoch() {
Node broker1 = new Node(1, "localhost", 9092);
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
// Seek with no offset epoch requires no validation no matter what the
current leader is
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L,
Optional.empty(),
@@ -652,7 +650,7 @@ public class SubscriptionStateTest {
@Test
public void testSeekUnvalidatedWithNoEpochClearsAwaitingValidation() {
Node broker1 = new Node(1, "localhost", 9092);
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
// Seek with no offset epoch requires no validation no matter what the
current leader is
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L,
Optional.of(2),
@@ -672,7 +670,7 @@ public class SubscriptionStateTest {
ApiVersions apiVersions = new ApiVersions();
apiVersions.update(broker1.idString(), NodeApiVersions.create());
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L,
Optional.of(2),
new Metadata.LeaderAndEpoch(Optional.of(broker1),
Optional.of(5))));
@@ -701,7 +699,7 @@ public class SubscriptionStateTest {
@Test
public void testSeekValidatedShouldClearAwaitingValidation() {
Node broker1 = new Node(1, "localhost", 9092);
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L,
Optional.of(5),
new Metadata.LeaderAndEpoch(Optional.of(broker1),
Optional.of(10))));
@@ -719,7 +717,7 @@ public class SubscriptionStateTest {
@Test
public void testCompleteValidationShouldClearAwaitingValidation() {
Node broker1 = new Node(1, "localhost", 9092);
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L,
Optional.of(5),
new Metadata.LeaderAndEpoch(Optional.of(broker1),
Optional.of(10))));
@@ -736,7 +734,7 @@ public class SubscriptionStateTest {
@Test
public void testOffsetResetWhileAwaitingValidation() {
Node broker1 = new Node(1, "localhost", 9092);
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L,
Optional.of(5),
new Metadata.LeaderAndEpoch(Optional.of(broker1),
Optional.of(10))));
@@ -750,7 +748,7 @@ public class SubscriptionStateTest {
@Test
public void testMaybeCompleteValidation() {
Node broker1 = new Node(1, "localhost", 9092);
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
int currentEpoch = 10;
long initialOffset = 10L;
@@ -777,7 +775,7 @@ public class SubscriptionStateTest {
apiVersions.update("1", oldApis);
Node broker1 = new Node(1, "localhost", 9092);
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L,
Optional.of(5),
new Metadata.LeaderAndEpoch(Optional.of(broker1),
Optional.of(10))));
@@ -806,7 +804,7 @@ public class SubscriptionStateTest {
@Test
public void testMaybeCompleteValidationAfterPositionChange() {
Node broker1 = new Node(1, "localhost", 9092);
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
int currentEpoch = 10;
long initialOffset = 10L;
@@ -835,7 +833,7 @@ public class SubscriptionStateTest {
@Test
public void testMaybeCompleteValidationAfterOffsetReset() {
Node broker1 = new Node(1, "localhost", 9092);
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
int currentEpoch = 10;
long initialOffset = 10L;
@@ -861,7 +859,7 @@ public class SubscriptionStateTest {
@Test
public void testTruncationDetectionWithResetPolicy() {
Node broker1 = new Node(1, "localhost", 9092);
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
int currentEpoch = 10;
long initialOffset = 10L;
@@ -890,7 +888,7 @@ public class SubscriptionStateTest {
public void testTruncationDetectionWithoutResetPolicy() {
Node broker1 = new Node(1, "localhost", 9092);
state = new SubscriptionState(new LogContext(),
AutoOffsetResetStrategy.NONE);
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
int currentEpoch = 10;
long initialOffset = 10L;
@@ -920,7 +918,7 @@ public class SubscriptionStateTest {
public void testTruncationDetectionUnknownDivergentOffsetWithResetPolicy()
{
Node broker1 = new Node(1, "localhost", 9092);
state = new SubscriptionState(new LogContext(),
AutoOffsetResetStrategy.EARLIEST);
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
int currentEpoch = 10;
long initialOffset = 10L;
@@ -945,7 +943,7 @@ public class SubscriptionStateTest {
public void
testTruncationDetectionUnknownDivergentOffsetWithoutResetPolicy() {
Node broker1 = new Node(1, "localhost", 9092);
state = new SubscriptionState(new LogContext(),
AutoOffsetResetStrategy.NONE);
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
int currentEpoch = 10;
long initialOffset = 10L;
@@ -993,7 +991,7 @@ public class SubscriptionStateTest {
// Check that offset reset works when we can't validate offsets (older
brokers)
Node broker1 = new Node(1, "localhost", 9092);
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
// Reset offsets
state.requestOffsetReset(tp0, AutoOffsetResetStrategy.EARLIEST);
@@ -1039,7 +1037,7 @@ public class SubscriptionStateTest {
@Test
public void nullPositionLagOnNoPosition() {
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED));
assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED));
@@ -1053,7 +1051,7 @@ public class SubscriptionStateTest {
@Test
public void testPositionOrNull() {
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
final TopicPartition unassignedPartition = new
TopicPartition("unassigned", 0);
state.seek(tp0, 5);
@@ -1063,7 +1061,7 @@ public class SubscriptionStateTest {
@Test
public void testTryUpdatingHighWatermark() {
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
final TopicPartition unassignedPartition = new
TopicPartition("unassigned", 0);
final long highWatermark = 10L;
@@ -1074,7 +1072,7 @@ public class SubscriptionStateTest {
@Test
public void testTryUpdatingLogStartOffset() {
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
final TopicPartition unassignedPartition = new
TopicPartition("unassigned", 0);
final long position = 25;
state.seek(tp0, position);
@@ -1087,7 +1085,7 @@ public class SubscriptionStateTest {
@Test
public void testTryUpdatingLastStableOffset() {
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
final TopicPartition unassignedPartition = new
TopicPartition("unassigned", 0);
final long lastStableOffset = 10L;
@@ -1098,7 +1096,7 @@ public class SubscriptionStateTest {
@Test
public void testTryUpdatingPreferredReadReplica() {
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
final TopicPartition unassignedPartition = new
TopicPartition("unassigned", 0);
final int preferredReadReplicaId = 10;
@@ -1111,7 +1109,7 @@ public class SubscriptionStateTest {
@Test
public void testRequestOffsetResetIfPartitionAssigned() {
- state.assignFromUser(Collections.singleton(tp0));
+ state.assignFromUser(Set.of(tp0));
final TopicPartition unassignedPartition = new
TopicPartition("unassigned", 0);
state.requestOffsetResetIfPartitionAssigned(tp0);