This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 16c51af4e20bfa521d994d4430c1ea61a88c0893
Author: Bruno Cadonna <[email protected]>
AuthorDate: Wed Oct 16 15:16:59 2024 +0200

    Resolve conflicts from 11/25 trunk rebase - MINOR: Rebase dev branch on current trunk
---
 .../StreamsGroupHeartbeatRequestManager.java       |  19 +++-
 .../events/ApplicationEventProcessor.java          |   4 +
 .../StreamsGroupHeartbeatRequestManagerTest.java   |  10 +-
 .../StreamsGroupInitializeRequestManagerTest.java  |  27 ++---
 core/src/main/scala/kafka/server/KafkaConfig.scala |   4 +-
 .../group/GroupCoordinatorConfigTest.java          |   1 +
 .../group/GroupMetadataManagerTest.java            |   9 +-
 .../group/streams/TaskAssignmentTestUtil.java      |   4 +-
 .../group/taskassignor/MockAssignorTest.java       |  29 +++---
 .../group/taskassignor/StickyTaskAssignorTest.java | 111 ++++++++++-----------
 10 files changed, 121 insertions(+), 97 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index 7e31859571d..896800cff3c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -57,6 +57,8 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
 
     private final Logger logger;
 
+    private final int maxPollIntervalMs;
+
     private final CoordinatorRequestManager coordinatorRequestManager;
 
     private final StreamsGroupHeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
@@ -96,7 +98,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
         this.membershipManager = membershipManager;
         this.streamsGroupInitializeRequestManager = 
streamsGroupInitializeRequestManager;
         this.backgroundEventHandler = backgroundEventHandler;
-        int maxPollIntervalMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+        this.maxPollIntervalMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
         long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
         this.heartbeatState = new 
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsAssignmentInterface, 
membershipManager,
@@ -144,6 +146,10 @@ public class StreamsGroupHeartbeatRequestManager 
implements RequestManager {
         return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, 
Collections.singletonList(request));
     }
 
+    public ConsumerMembershipManager membershipManager() {
+        return membershipManager;
+    }
+
     @Override
     public long maximumTimeToWait(long currentTimeMs) {
         pollTimer.update(currentTimeMs);
@@ -156,6 +162,17 @@ public class StreamsGroupHeartbeatRequestManager 
implements RequestManager {
         return Math.min(pollTimer.remainingMs() / 2, 
heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
     }
 
+    public void resetPollTimer(final long pollMs) {
+        pollTimer.update(pollMs);
+        if (pollTimer.isExpired()) {
+            logger.warn("Time between subsequent calls to poll() was longer 
than the configured " +
+                    "max.poll.interval.ms, exceeded approximately by {} ms. 
Member {} will rejoin the group now.",
+                pollTimer.isExpiredBy(), membershipManager().memberId());
+            membershipManager().maybeRejoinStaleMember();
+        }
+        pollTimer.reset(maxPollIntervalMs);
+    }
+
     private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final 
long currentTimeMs,
                                                                      final 
boolean ignoreResponse) {
         NetworkClientDelegate.UnsentRequest request = 
makeHeartbeatRequest(ignoreResponse);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 52aee8f20e0..135857a06b6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -192,6 +192,10 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
                 hrm.membershipManager().onConsumerPoll();
                 hrm.resetPollTimer(event.pollTimeMs());
             });
+            requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm 
-> {
+                hrm.membershipManager().onConsumerPoll();
+                hrm.resetPollTimer(event.pollTimeMs());
+            });
         } else {
             requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
                 hrm.membershipManager().onConsumerPoll();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index 91b9b5ae45a..193c901e3b7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -54,12 +54,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -251,7 +251,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
                 Collections.singleton("sink0"),
                 Collections.singletonMap("repartition0", emptyTopicInfo),
                 Collections.singletonMap("changelog0", emptyTopicInfo),
-                Collections.singletonList(mkSet("source0", "repartition0"))
+                Collections.singletonList(Set.of("source0", "repartition0"))
             ));
         streamsAssignmentInterface.subtopologyMap().put("1",
             new Subtopology(
@@ -259,7 +259,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
                 Collections.singleton("sink1"),
                 Collections.singletonMap("repartition1", emptyTopicInfo),
                 Collections.singletonMap("changelog1", emptyTopicInfo),
-                Collections.singletonList(mkSet("source1", "repartition1"))
+                Collections.singletonList(Set.of("source1", "repartition1"))
             ));
         streamsAssignmentInterface.subtopologyMap().put("2",
             new Subtopology(
@@ -267,7 +267,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
                 Collections.singleton("sink2"),
                 Collections.singletonMap("repartition2", emptyTopicInfo),
                 Collections.singletonMap("changelog2", emptyTopicInfo),
-                Collections.singletonList(mkSet("source2", "repartition2"))
+                Collections.singletonList(Set.of("source2", "repartition2"))
             ));
 
         StreamsGroupHeartbeatResponseData data = new 
StreamsGroupHeartbeatResponseData()
@@ -296,7 +296,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
         assertEquals(1000, response.heartbeatIntervalMs());
         final List<TopicPartitions> tps = 
response.assignment().topicPartitions();
         assertEquals(2, tps.size());
-        assertEquals(mkSet(uuid0, uuid1), 
tps.stream().map(TopicPartitions::topicId).collect(Collectors.toSet()));
+        assertEquals(Set.of(uuid0, uuid1), 
tps.stream().map(TopicPartitions::topicId).collect(Collectors.toSet()));
         assertEquals(Collections.singletonList(0), tps.get(0).partitions());
         assertEquals(Collections.singletonList(0), tps.get(1).partitions());
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
index f5bc091e7b0..35eeaaf4baf 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
@@ -36,7 +36,6 @@ import java.util.Set;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -71,8 +70,8 @@ class StreamsGroupInitializeRequestManagerTest {
         final CoordinatorRequestManager coordinatorRequestManager = 
mock(CoordinatorRequestManager.class);
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(node));
         final StreamsAssignmentInterface streamsAssignmentInterface = 
mock(StreamsAssignmentInterface.class);
-        final Set<String> sourceTopics = mkSet("sourceTopic1", "sourceTopic2");
-        final Set<String> sinkTopics = mkSet("sinkTopic1", "sinkTopic2", 
"sinkTopic3");
+        final Set<String> sourceTopics = Set.of("sourceTopic1", 
"sourceTopic2");
+        final Set<String> sinkTopics = Set.of("sinkTopic1", "sinkTopic2", 
"sinkTopic3");
         final Map<String, StreamsAssignmentInterface.TopicInfo> 
repartitionTopics = mkMap(
             mkEntry("repartitionTopic1", new 
StreamsAssignmentInterface.TopicInfo(Optional.of(2), Optional.of((short) 1), 
Collections.emptyMap())),
             mkEntry("repartitionTopic2", new 
StreamsAssignmentInterface.TopicInfo(Optional.of(3), Optional.of((short) 3), 
Collections.emptyMap()))
@@ -82,9 +81,9 @@ class StreamsGroupInitializeRequestManagerTest {
             mkEntry("changelogTopic2", new 
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Optional.of((short) 2), 
Collections.emptyMap())),
             mkEntry("changelogTopic3", new 
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Optional.of((short) 3), 
Collections.emptyMap()))
         );
-        final Collection<Set<String>> copartitionGroup = mkSet(
-            mkSet("sourceTopic1", "repartitionTopic2"),
-            mkSet("sourceTopic2", "repartitionTopic1")
+        final Collection<Set<String>> copartitionGroup = Set.of(
+            Set.of("sourceTopic1", "repartitionTopic2"),
+            Set.of("sourceTopic2", "repartitionTopic1")
         );
         final StreamsAssignmentInterface.Subtopology subtopology1 = new 
StreamsAssignmentInterface.Subtopology(
             sourceTopics,
@@ -139,11 +138,15 @@ class StreamsGroupInitializeRequestManagerTest {
         });
 
         assertEquals(2, subtopology.copartitionGroups().size());
-        final StreamsGroupInitializeRequestData.CopartitionGroup 
copartitionGroupData1 = subtopology.copartitionGroups().get(0);
-        assertEquals(Collections.singletonList((short) 0), 
copartitionGroupData1.sourceTopics());
-        assertEquals(Collections.singletonList((short) 1), 
copartitionGroupData1.repartitionSourceTopics());
-        final StreamsGroupInitializeRequestData.CopartitionGroup 
copartitionGroupData2 = subtopology.copartitionGroups().get(1);
-        assertEquals(Collections.singletonList((short) 1), 
copartitionGroupData2.sourceTopics());
-        assertEquals(Collections.singletonList((short) 0), 
copartitionGroupData2.repartitionSourceTopics());
+        final StreamsGroupInitializeRequestData.CopartitionGroup 
expectedCopartitionGroupData1 =
+            new StreamsGroupInitializeRequestData.CopartitionGroup()
+                .setRepartitionSourceTopics(Collections.singletonList((short) 
0))
+                .setSourceTopics(Collections.singletonList((short) 1));
+        final StreamsGroupInitializeRequestData.CopartitionGroup 
expectedCopartitionGroupData2 =
+            new StreamsGroupInitializeRequestData.CopartitionGroup()
+                .setRepartitionSourceTopics(Collections.singletonList((short) 
1))
+                .setSourceTopics(Collections.singletonList((short) 0));
+        
assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData1));
+        
assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData2));
     }
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3b9aa091b8a..378709eda0c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -600,8 +600,8 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
         "This is part of the early access of KIP-932 and MUST NOT be used in 
production.")
     }
     if (protocols.contains(GroupType.STREAMS)) {
-      if (processRoles.isEmpty) {
-        throw new ConfigException(s"The new '${GroupType.STREAMS}' rebalance 
protocol is only supported in KRaft cluster.")
+      if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) {
+        throw new ConfigException(s"The new '${GroupType.STREAMS}' rebalance 
protocol is only supported in KRaft cluster with the new group coordinator.")
       }
       warn(s"The new '${GroupType.STREAMS}' rebalance protocol is enabled 
along with the new group coordinator. " +
         "This is part of the preview of KIP-1071 and MUST NOT be used in 
production.")
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index baf55529fa5..decfaea980e 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -186,6 +186,7 @@ public class GroupCoordinatorConfigTest {
         
configs.put(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
50000);
         
configs.put(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
50000);
         assertEquals("group.share.heartbeat.interval.ms must be less than 
group.share.session.timeout.ms",
+                assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
         
         
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 0);
         assertEquals("Invalid value 0 for configuration 
group.streams.session.timeout.ms: Value must be at least 1",
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index bf458507eac..8ff26e74f35 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -576,14 +576,15 @@ public class GroupMetadataManagerTest {
         assertEquals(5, coordinatorRecords.size());
         
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(groupId,
 2)));
         
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTopologyRecord(groupId,
 subtopologies)));
+
         org.apache.kafka.coordinator.group.streams.TopicMetadata topicMetadata 
= new org.apache.kafka.coordinator.group.streams.TopicMetadata(
             inputTopicId,
             inputTopicName,
             3,
             mkMap(
-                mkEntry(0, mkSet("rack0", "rack1")),
-                mkEntry(1, mkSet("rack1", "rack2")),
-                mkEntry(2, mkSet("rack2", "rack3"))
+                mkEntry(0, new HashSet<>(List.of("rack1", "rack0"))),
+                mkEntry(1, new HashSet<>(List.of("rack2", "rack1"))),
+                mkEntry(2, new HashSet<>(List.of("rack2", "rack3")))
             )
         );
         assertTrue(coordinatorRecords.contains(
@@ -837,7 +838,7 @@ public class GroupMetadataManagerTest {
                         mkMap(
                             mkEntry(
                                 subtopologyId,
-                                mkSet(0, 1, 2)
+                                new HashSet<>(List.of(0, 1, 2))
                             )
                         ),
                         Collections.emptyMap(),
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
index c3980d0343f..bcf1ba65be7 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
@@ -19,10 +19,10 @@ package org.apache.kafka.coordinator.group.streams;
 import org.apache.kafka.coordinator.group.taskassignor.AssignmentMemberSpec;
 
 import java.util.AbstractMap;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -73,7 +73,7 @@ public class TaskAssignmentTestUtil {
     ) {
         return new AbstractMap.SimpleEntry<>(
             subtopologyId,
-            new HashSet<>(Arrays.asList(tasks))
+            new HashSet<>(List.of(tasks))
         );
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java
index a63b2e7c7ca..e20ba62e5a5 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java
@@ -21,13 +21,14 @@ import org.junit.jupiter.api.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -81,7 +82,7 @@ public class MockAssignorTest {
         final MemberAssignment testMember = 
result.members().get("test_member");
         assertNotNull(testMember);
         assertEquals(mkMap(
-            mkEntry("test-subtopology", mkSet(0, 1, 2, 3))
+            mkEntry("test-subtopology", Set.of(0, 1, 2, 3))
         ), testMember.activeTasks());
     }
 
@@ -121,12 +122,12 @@ public class MockAssignorTest {
         );
 
         final Map<String, Set<Integer>> expected1 = mkMap(
-            mkEntry("test-subtopology1", mkSet(1, 3)),
-            mkEntry("test-subtopology2", mkSet(1, 3))
+            mkEntry("test-subtopology1", Set.of(1, 3)),
+            mkEntry("test-subtopology2", Set.of(1, 3))
         );
         final Map<String, Set<Integer>> expected2 = mkMap(
-            mkEntry("test-subtopology1", mkSet(0, 2)),
-            mkEntry("test-subtopology2", mkSet(0, 2))
+            mkEntry("test-subtopology1", Set.of(0, 2)),
+            mkEntry("test-subtopology2", Set.of(0, 2))
         );
 
         assertEquals(2, result.members().size());
@@ -145,8 +146,8 @@ public class MockAssignorTest {
             Optional.empty(),
             Optional.empty(),
             mkMap(
-                mkEntry("test-subtopology1", mkSet(0, 2, 3)),
-                mkEntry("test-subtopology2", mkSet(0))
+                mkEntry("test-subtopology1", new HashSet<>(List.of(0, 2, 3))),
+                mkEntry("test-subtopology2", new HashSet<>(List.of(0)))
             ),
             Collections.emptyMap(),
             Collections.emptyMap(),
@@ -159,8 +160,8 @@ public class MockAssignorTest {
             Optional.empty(),
             Optional.empty(),
             mkMap(
-                mkEntry("test-subtopology1", mkSet(1)),
-                mkEntry("test-subtopology2", mkSet(3))
+                mkEntry("test-subtopology1", new HashSet<>(List.of(1))),
+                mkEntry("test-subtopology2", new HashSet<>(List.of(3)))
             ),
             Collections.emptyMap(),
             Collections.emptyMap(),
@@ -183,12 +184,12 @@ public class MockAssignorTest {
         assertNotNull(testMember1);
         assertNotNull(testMember2);
         assertEquals(mkMap(
-            mkEntry("test-subtopology1", mkSet(0, 2, 3)),
-            mkEntry("test-subtopology2", mkSet(0))
+            mkEntry("test-subtopology1", Set.of(0, 2, 3)),
+            mkEntry("test-subtopology2", Set.of(0))
         ), testMember1.activeTasks());
         assertEquals(mkMap(
-            mkEntry("test-subtopology1", mkSet(1)),
-            mkEntry("test-subtopology2", mkSet(1, 2, 3))
+            mkEntry("test-subtopology1", Set.of(1)),
+            mkEntry("test-subtopology2", Set.of(1, 2, 3))
         ), testMember2.activeTasks());
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java
index 6455fb5cd68..5ecc89158d9 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java
@@ -34,7 +34,6 @@ import java.util.stream.Stream;
 import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -69,7 +68,7 @@ public class StickyTaskAssignorTest {
             assertEquals(1, testMember.activeTasks().size());
             
actualActiveTasks.addAll(testMember.activeTasks().get("test-subtopology"));
         }
-        assertEquals(mkSet(0, 1, 2), actualActiveTasks);
+        assertEquals(Set.of(0, 1, 2), actualActiveTasks);
     }
 
     @Test
@@ -100,13 +99,13 @@ public class StickyTaskAssignorTest {
         assertEquals(1, getAllActiveTaskCount(result, "member3_1"));
         assertEquals(1, getAllActiveTaskCount(result, "member3_2"));
 
-        assertEquals(mkMap(mkEntry("test-subtopology1", mkSet(0, 1, 2)), 
mkEntry("test-subtopology2", mkSet(0, 1, 2))),
+        assertEquals(mkMap(mkEntry("test-subtopology1", Set.of(0, 1, 2)), 
mkEntry("test-subtopology2", Set.of(0, 1, 2))),
                 mergeAllActiveTasks(result, "member1_1", "member1_2", 
"member2_1", "member2_2", "member3_1", "member3_2"));
     }
 
     @Test
     public void shouldAssignTopicGroupIdEvenlyAcrossClientsWithStandByTasks() {
-        final Map<String, Set<Integer>> tasks = 
mkMap(mkEntry("test-subtopology1", mkSet(0, 1, 2)), 
mkEntry("test-subtopology2", mkSet(0, 1, 2)));
+        final Map<String, Set<Integer>> tasks = 
mkMap(mkEntry("test-subtopology1", Set.of(0, 1, 2)), 
mkEntry("test-subtopology2", Set.of(0, 1, 2)));
         final AssignmentMemberSpec memberSpec11 = 
createAssignmentMemberSpec("process1");
         final AssignmentMemberSpec memberSpec12 = 
createAssignmentMemberSpec("process1");
 
@@ -187,7 +186,7 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void 
shouldMigrateActiveTasksToNewProcessWithoutChangingAllAssignments() {
-        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
mkSet(0, 2))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
Set.of(0, 2))), Collections.emptyMap());
         final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
Collections.singleton(1))), Collections.emptyMap());
         final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3");
 
@@ -252,8 +251,8 @@ public class StickyTaskAssignorTest {
     @Test
     public void shouldAssignTasksEvenlyWithUnequalTopicGroupSizes() {
         final Map<String, Set<Integer>> activeTasks = mkMap(
-                mkEntry("test-subtopology1", mkSet(0, 1, 2, 3, 4, 5)),
-                mkEntry("test-subtopology2", mkSet(0)));
+                mkEntry("test-subtopology1", Set.of(0, 1, 2, 3, 4, 5)),
+                mkEntry("test-subtopology2", Set.of(0)));
         final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", activeTasks, Collections.emptyMap());
         final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2");
 
@@ -443,7 +442,7 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void 
shouldAssignStandbyTasksToDifferentClientThanCorrespondingActiveTaskIsAssignedTo()
 {
-        final Map<String, Set<Integer>> tasks = 
mkMap(mkEntry("test-subtopology", mkSet(0, 1, 2, 3)));
+        final Map<String, Set<Integer>> tasks = 
mkMap(mkEntry("test-subtopology", Set.of(0, 1, 2, 3)));
         final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
Collections.singleton(0))), Collections.emptyMap());
         final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
Collections.singleton(1))), Collections.emptyMap());
         final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", 
Collections.singleton(2))), Collections.emptyMap());
@@ -508,9 +507,9 @@ public class StickyTaskAssignorTest {
         );
 
 
-        assertEquals(mkSet(1, 2), new HashSet<>(getAllStandbyTaskIds(result, 
"member1")));
-        assertEquals(mkSet(0, 2), new HashSet<>(getAllStandbyTaskIds(result, 
"member2")));
-        assertEquals(mkSet(0, 1), new HashSet<>(getAllStandbyTaskIds(result, 
"member3")));
+        assertEquals(Set.of(1, 2), new HashSet<>(getAllStandbyTaskIds(result, 
"member1")));
+        assertEquals(Set.of(0, 2), new HashSet<>(getAllStandbyTaskIds(result, 
"member2")));
+        assertEquals(Set.of(0, 1), new HashSet<>(getAllStandbyTaskIds(result, 
"member3")));
     }
 
     @Test
@@ -549,8 +548,8 @@ public class StickyTaskAssignorTest {
 
 
 
-        assertEquals(mkSet(0, 1, 2), new 
HashSet<>(getAllActiveTaskIds(result)));
-        assertEquals(mkSet(0, 1, 2), new 
HashSet<>(getAllStandbyTaskIds(result)));
+        assertEquals(Set.of(0, 1, 2), new 
HashSet<>(getAllActiveTaskIds(result)));
+        assertEquals(Set.of(0, 1, 2), new 
HashSet<>(getAllStandbyTaskIds(result)));
     }
 
     @Test
@@ -593,7 +592,7 @@ public class StickyTaskAssignorTest {
         );
 
         assertEquals(3, getAllActiveTaskIds(result, "member1", "member2", 
"member3", "member4", "member5", "member6").size());
-        assertEquals(mkSet(0, 1, 2), getActiveTasks(result, 
"test-subtopology", "member1", "member2", "member3", "member4", "member5", 
"member6"));
+        assertEquals(Set.of(0, 1, 2), getActiveTasks(result, 
"test-subtopology", "member1", "member2", "member3", "member4", "member5", 
"member6"));
     }
 
     @Test
@@ -669,9 +668,9 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void 
shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousActiveTasks() {
-        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
mkSet(1, 2))), Collections.emptyMap());
-        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
mkSet(3))), Collections.emptyMap());
-        final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", 
mkSet(0))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
Set.of(1, 2))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
Set.of(3))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", 
Set.of(0))), Collections.emptyMap());
         final AssignmentMemberSpec memberSpec4 = 
createAssignmentMemberSpec("process4");
 
         final List<String> allMemberIds = asList("member1", "member2", 
"member3", "member4");
@@ -698,9 +697,9 @@ public class StickyTaskAssignorTest {
     @Test
     public void 
shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() {
         final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1",
-                mkMap(mkEntry("test-subtopology", mkSet(1, 2))), 
mkMap(mkEntry("test-subtopology", mkSet(3, 0))));
+                mkMap(mkEntry("test-subtopology", Set.of(1, 2))), 
mkMap(mkEntry("test-subtopology", Set.of(3, 0))));
         final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2",
-                mkMap(mkEntry("test-subtopology", mkSet(3, 0))), 
mkMap(mkEntry("test-subtopology", mkSet(1, 2))));
+                mkMap(mkEntry("test-subtopology", Set.of(3, 0))), 
mkMap(mkEntry("test-subtopology", Set.of(1, 2))));
         final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3");
         final AssignmentMemberSpec memberSpec4 = 
createAssignmentMemberSpec("process4");
 
@@ -728,7 +727,7 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void 
shouldReBalanceTasksAcrossAllClientsWhenCapacityAndTaskCountTheSame() {
-        final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", 
mkSet(0, 1, 2, 3))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", 
Set.of(0, 1, 2, 3))), Collections.emptyMap());
         final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1");
         final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2");
         final AssignmentMemberSpec memberSpec4 = 
createAssignmentMemberSpec("process4");
@@ -748,7 +747,7 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void 
shouldReBalanceTasksAcrossClientsWhenCapacityLessThanTaskCount() {
-        final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", 
mkSet(0, 1, 2, 3))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", 
Set.of(0, 1, 2, 3))), Collections.emptyMap());
         final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1");
         final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2");
 
@@ -766,7 +765,7 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void shouldRebalanceTasksToClientsBasedOnCapacity() {
-        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
mkSet(0, 3, 2))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
Set.of(0, 3, 2))), Collections.emptyMap());
         final AssignmentMemberSpec memberSpec31 = 
createAssignmentMemberSpec("process3");
         final AssignmentMemberSpec memberSpec32 = 
createAssignmentMemberSpec("process3");
 
@@ -783,8 +782,8 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void 
shouldMoveMinimalNumberOfTasksWhenPreviouslyAboveCapacityAndNewClientAdded() {
-        final Set<Integer> p1PrevTasks = mkSet(0, 2);
-        final Set<Integer> p2PrevTasks = mkSet(1, 3);
+        final Set<Integer> p1PrevTasks = Set.of(0, 2);
+        final Set<Integer> p2PrevTasks = Set.of(1, 3);
         final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
p1PrevTasks)), Collections.emptyMap());
         final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
p2PrevTasks)), Collections.emptyMap());
         final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3");
@@ -796,12 +795,10 @@ public class StickyTaskAssignorTest {
                 new TopologyDescriberImpl(4, false)
         );
 
-
-
         assertEquals(1, getAllActiveTaskCount(result, "member3"));
         final List<Integer> p3ActiveTasks = getAllActiveTaskIds(result, 
"member3");
 
-        if (p1PrevTasks.removeAll(p3ActiveTasks)) {
+        if (new HashSet<>(p1PrevTasks).removeAll(p3ActiveTasks)) {
             assertEquals(p2PrevTasks, new 
HashSet<>(getAllActiveTaskIds(result, "member2")));
         } else {
             assertEquals(p1PrevTasks, new 
HashSet<>(getAllActiveTaskIds(result, "member1")));
@@ -810,8 +807,8 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void shouldNotMoveAnyTasksWhenNewTasksAdded() {
-        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
mkSet(0, 1))), Collections.emptyMap());
-        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
mkSet(2, 3))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
Set.of(0, 1))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
Set.of(2, 3))), Collections.emptyMap());
 
         final Map<String, AssignmentMemberSpec> members = mkMap(
                 mkEntry("member1", memberSpec1), mkEntry("member2", 
memberSpec2));
@@ -832,8 +829,8 @@ public class StickyTaskAssignorTest {
     @Test
     public void 
shouldAssignNewTasksToNewClientWhenPreviousTasksAssignedToOldClients() {
 
-        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
mkSet(2, 1))), Collections.emptyMap());
-        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
mkSet(0, 3))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
Set.of(2, 1))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
Set.of(0, 3))), Collections.emptyMap());
         final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3");
 
 
@@ -860,17 +857,17 @@ public class StickyTaskAssignorTest {
     @Test
     public void shouldAssignTasksNotPreviouslyActiveToNewClient() {
         final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1",
-                mkMap(mkEntry("test-subtopology0", mkSet(1)), 
mkEntry("test-subtopology1", mkSet(2, 3))),
-                mkMap(mkEntry("test-subtopology0", mkSet(0)), 
mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(0, 
1, 3))));
+                mkMap(mkEntry("test-subtopology0", Set.of(1)), 
mkEntry("test-subtopology1", Set.of(2, 3))),
+                mkMap(mkEntry("test-subtopology0", Set.of(0)), 
mkEntry("test-subtopology1", Set.of(1)), mkEntry("test-subtopology2", Set.of(0, 
1, 3))));
         final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2",
-                mkMap(mkEntry("test-subtopology0", mkSet(0)), 
mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(2))),
-                mkMap(mkEntry("test-subtopology0", mkSet(1, 2, 3)), 
mkEntry("test-subtopology1", mkSet(0, 2, 3)), mkEntry("test-subtopology2", 
mkSet(0, 1, 3))));
+                mkMap(mkEntry("test-subtopology0", Set.of(0)), 
mkEntry("test-subtopology1", Set.of(1)), mkEntry("test-subtopology2", 
Set.of(2))),
+                mkMap(mkEntry("test-subtopology0", Set.of(1, 2, 3)), 
mkEntry("test-subtopology1", Set.of(0, 2, 3)), mkEntry("test-subtopology2", 
Set.of(0, 1, 3))));
         final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3",
-                mkMap(mkEntry("test-subtopology2", mkSet(0, 1, 3))),
-                mkMap(mkEntry("test-subtopology0", mkSet(2)), 
mkEntry("test-subtopology1", mkSet(2))));
+                mkMap(mkEntry("test-subtopology2", Set.of(0, 1, 3))),
+                mkMap(mkEntry("test-subtopology0", Set.of(2)), 
mkEntry("test-subtopology1", Set.of(2))));
         final AssignmentMemberSpec newMemberSpec = 
createAssignmentMemberSpec("process4",
                Collections.emptyMap(),
-                mkMap(mkEntry("test-subtopology0", mkSet(0, 1, 2, 3)), 
mkEntry("test-subtopology1", mkSet(0, 1, 2, 3)), mkEntry("test-subtopology2", 
mkSet(0, 1, 2, 3))));
+                mkMap(mkEntry("test-subtopology0", Set.of(0, 1, 2, 3)), 
mkEntry("test-subtopology1", Set.of(0, 1, 2, 3)), mkEntry("test-subtopology2", 
Set.of(0, 1, 2, 3))));
 
         final Map<String, AssignmentMemberSpec> members = mkMap(
                 mkEntry("member1", memberSpec1), mkEntry("member2", 
memberSpec2), mkEntry("member3", memberSpec3), mkEntry("newMember", 
newMemberSpec));
@@ -879,34 +876,34 @@ public class StickyTaskAssignorTest {
                 new TopologyDescriberImpl(4, false)
         );
 
-        assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(1)), 
mkEntry("test-subtopology1", mkSet(2, 3))),
+        assertEquals(mkMap(mkEntry("test-subtopology0", Set.of(1)), 
mkEntry("test-subtopology1", Set.of(2, 3))),
                 getAllActiveTasks(result, "member1"));
-        assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(0)), 
mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(2))),
+        assertEquals(mkMap(mkEntry("test-subtopology0", Set.of(0)), 
mkEntry("test-subtopology1", Set.of(1)), mkEntry("test-subtopology2", 
Set.of(2))),
                 getAllActiveTasks(result, "member2"));
-        assertEquals(mkMap(mkEntry("test-subtopology2", mkSet(0, 1, 3))),
+        assertEquals(mkMap(mkEntry("test-subtopology2", Set.of(0, 1, 3))),
                 getAllActiveTasks(result, "member3"));
-        assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(2, 3)), 
mkEntry("test-subtopology1", mkSet(0))),
+        assertEquals(mkMap(mkEntry("test-subtopology0", Set.of(2, 3)), 
mkEntry("test-subtopology1", Set.of(0))),
                 getAllActiveTasks(result, "newMember"));
     }
 
     @Test
     public void shouldAssignTasksNotPreviouslyActiveToMultipleNewClients() {
         final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1",
-                mkMap(mkEntry("test-subtopology0", mkSet(1)), 
mkEntry("test-subtopology1", mkSet(2, 3))),
-                mkMap(mkEntry("test-subtopology0", mkSet(0)), 
mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(0, 
1, 3))));
+                mkMap(mkEntry("test-subtopology0", Set.of(1)), 
mkEntry("test-subtopology1", Set.of(2, 3))),
+                mkMap(mkEntry("test-subtopology0", Set.of(0)), 
mkEntry("test-subtopology1", Set.of(1)), mkEntry("test-subtopology2", Set.of(0, 
1, 3))));
         final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2",
-                mkMap(mkEntry("test-subtopology0", mkSet(0)), 
mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(2))),
-                mkMap(mkEntry("test-subtopology0", mkSet(1, 2, 3)), 
mkEntry("test-subtopology1", mkSet(0, 2, 3)), mkEntry("test-subtopology2", 
mkSet(0, 1, 3))));
+                mkMap(mkEntry("test-subtopology0", Set.of(0)), 
mkEntry("test-subtopology1", Set.of(1)), mkEntry("test-subtopology2", 
Set.of(2))),
+                mkMap(mkEntry("test-subtopology0", Set.of(1, 2, 3)), 
mkEntry("test-subtopology1", Set.of(0, 2, 3)), mkEntry("test-subtopology2", 
Set.of(0, 1, 3))));
 
 
         final AssignmentMemberSpec bounce1 = 
createAssignmentMemberSpec("bounce1",
                 Collections.emptyMap(),
-                mkMap(mkEntry("test-subtopology2", mkSet(0, 1, 3))));
+                mkMap(mkEntry("test-subtopology2", Set.of(0, 1, 3))));
 
 
         final AssignmentMemberSpec bounce2 = 
createAssignmentMemberSpec("bounce2",
                 Collections.emptyMap(),
-                mkMap(mkEntry("test-subtopology0", mkSet(2, 3)), 
mkEntry("test-subtopology1", mkSet(0))));
+                mkMap(mkEntry("test-subtopology0", Set.of(2, 3)), 
mkEntry("test-subtopology1", Set.of(0))));
 
 
 
@@ -918,19 +915,19 @@ public class StickyTaskAssignorTest {
         );
 
 
-        assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(1)), 
mkEntry("test-subtopology1", mkSet(2, 3))),
+        assertEquals(mkMap(mkEntry("test-subtopology0", Set.of(1)), 
mkEntry("test-subtopology1", Set.of(2, 3))),
                 getAllActiveTasks(result, "member1"));
-        assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(0)), 
mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(2))),
+        assertEquals(mkMap(mkEntry("test-subtopology0", Set.of(0)), 
mkEntry("test-subtopology1", Set.of(1)), mkEntry("test-subtopology2", 
Set.of(2))),
                 getAllActiveTasks(result, "member2"));
-        assertEquals(mkMap(mkEntry("test-subtopology2", mkSet(0, 1, 3))),
+        assertEquals(mkMap(mkEntry("test-subtopology2", Set.of(0, 1, 3))),
                 getAllActiveTasks(result, "bounce_member1"));
-        assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(2, 3)), 
mkEntry("test-subtopology1", mkSet(0))),
+        assertEquals(mkMap(mkEntry("test-subtopology0", Set.of(2, 3)), 
mkEntry("test-subtopology1", Set.of(0))),
                 getAllActiveTasks(result, "bounce_member2"));
     }
 
     @Test
     public void shouldAssignTasksToNewClient() {
-        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
mkSet(1, 2))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
Set.of(1, 2))), Collections.emptyMap());
         final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2");
 
         final Map<String, AssignmentMemberSpec> members = mkMap(
@@ -945,8 +942,8 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void 
shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingClients() {
-        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
mkSet(0, 1, 2))), Collections.emptyMap());
-        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
mkSet(3, 4, 5))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
Set.of(0, 1, 2))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
Set.of(3, 4, 5))), Collections.emptyMap());
         final AssignmentMemberSpec newMemberSpec = 
createAssignmentMemberSpec("process3");
 
         final Map<String, AssignmentMemberSpec> members = mkMap(
@@ -973,8 +970,8 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void 
shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingAndBouncedClients()
 {
-        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
mkSet(0, 1, 2, 6))), Collections.emptyMap());
-        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", Collections.emptyMap(), 
mkMap(mkEntry("test-subtopology", mkSet(3, 4, 5))));
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
Set.of(0, 1, 2, 6))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", Collections.emptyMap(), 
mkMap(mkEntry("test-subtopology", Set.of(3, 4, 5))));
         final AssignmentMemberSpec newMemberSpec = 
createAssignmentMemberSpec("newProcess");
 
         final Map<String, AssignmentMemberSpec> members = mkMap(


Reply via email to