This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 16c51af4e20bfa521d994d4430c1ea61a88c0893 Author: Bruno Cadonna <[email protected]> AuthorDate: Wed Oct 16 15:16:59 2024 +0200 Resolve conflicts from 11/25 trunk rebase - MINOR: Rebase dev branch on current trunk --- .../StreamsGroupHeartbeatRequestManager.java | 19 +++- .../events/ApplicationEventProcessor.java | 4 + .../StreamsGroupHeartbeatRequestManagerTest.java | 10 +- .../StreamsGroupInitializeRequestManagerTest.java | 27 ++--- core/src/main/scala/kafka/server/KafkaConfig.scala | 4 +- .../group/GroupCoordinatorConfigTest.java | 1 + .../group/GroupMetadataManagerTest.java | 9 +- .../group/streams/TaskAssignmentTestUtil.java | 4 +- .../group/taskassignor/MockAssignorTest.java | 29 +++--- .../group/taskassignor/StickyTaskAssignorTest.java | 111 ++++++++++----------- 10 files changed, 121 insertions(+), 97 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java index 7e31859571d..896800cff3c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java @@ -57,6 +57,8 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { private final Logger logger; + private final int maxPollIntervalMs; + private final CoordinatorRequestManager coordinatorRequestManager; private final StreamsGroupHeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; @@ -96,7 +98,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { this.membershipManager = membershipManager; this.streamsGroupInitializeRequestManager = streamsGroupInitializeRequestManager; this.backgroundEventHandler = backgroundEventHandler; - int maxPollIntervalMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + this.maxPollIntervalMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); this.heartbeatState = new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsAssignmentInterface, membershipManager, @@ -144,6 +146,10 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request)); } + public ConsumerMembershipManager membershipManager() { + return membershipManager; + } + @Override public long maximumTimeToWait(long currentTimeMs) { pollTimer.update(currentTimeMs); @@ -156,6 +162,17 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { return Math.min(pollTimer.remainingMs() / 2, heartbeatRequestState.nextHeartbeatMs(currentTimeMs)); } + public void resetPollTimer(final long pollMs) { + pollTimer.update(pollMs); + if (pollTimer.isExpired()) { + logger.warn("Time between subsequent calls to poll() was longer than the configured " + + "max.poll.interval.ms, exceeded approximately by {} ms. Member {} will rejoin the group now.", + pollTimer.isExpiredBy(), membershipManager().memberId()); + membershipManager().maybeRejoinStaleMember(); + } + pollTimer.reset(maxPollIntervalMs); + } + private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, final boolean ignoreResponse) { NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 52aee8f20e0..135857a06b6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -192,6 +192,10 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven hrm.membershipManager().onConsumerPoll(); hrm.resetPollTimer(event.pollTimeMs()); }); + requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> { + hrm.membershipManager().onConsumerPoll(); + hrm.resetPollTimer(event.pollTimeMs()); + }); } else { requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> { hrm.membershipManager().onConsumerPoll(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java index 91b9b5ae45a..193c901e3b7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java @@ -54,12 +54,12 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.common.utils.Utils.mkSet; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.anyString; @@ -251,7 +251,7 @@ class StreamsGroupHeartbeatRequestManagerTest { Collections.singleton("sink0"), Collections.singletonMap("repartition0", emptyTopicInfo), Collections.singletonMap("changelog0", emptyTopicInfo), - Collections.singletonList(mkSet("source0", "repartition0")) + Collections.singletonList(Set.of("source0", "repartition0")) )); streamsAssignmentInterface.subtopologyMap().put("1", new Subtopology( @@ -259,7 +259,7 @@ class StreamsGroupHeartbeatRequestManagerTest { Collections.singleton("sink1"), Collections.singletonMap("repartition1", emptyTopicInfo), Collections.singletonMap("changelog1", emptyTopicInfo), - Collections.singletonList(mkSet("source1", "repartition1")) + Collections.singletonList(Set.of("source1", "repartition1")) )); streamsAssignmentInterface.subtopologyMap().put("2", new Subtopology( @@ -267,7 +267,7 @@ class StreamsGroupHeartbeatRequestManagerTest { Collections.singleton("sink2"), Collections.singletonMap("repartition2", emptyTopicInfo), Collections.singletonMap("changelog2", emptyTopicInfo), - Collections.singletonList(mkSet("source2", "repartition2")) + Collections.singletonList(Set.of("source2", "repartition2")) )); StreamsGroupHeartbeatResponseData data = new StreamsGroupHeartbeatResponseData() @@ -296,7 +296,7 @@ class StreamsGroupHeartbeatRequestManagerTest { assertEquals(1000, response.heartbeatIntervalMs()); final List<TopicPartitions> tps = response.assignment().topicPartitions(); assertEquals(2, tps.size()); - assertEquals(mkSet(uuid0, uuid1), tps.stream().map(TopicPartitions::topicId).collect(Collectors.toSet())); + assertEquals(Set.of(uuid0, uuid1), tps.stream().map(TopicPartitions::topicId).collect(Collectors.toSet())); assertEquals(Collections.singletonList(0), tps.get(0).partitions()); assertEquals(Collections.singletonList(0), tps.get(1).partitions()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java index f5bc091e7b0..35eeaaf4baf 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java @@ -36,7 +36,6 @@ import java.util.Set; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.common.utils.Utils.mkSet; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -71,8 +70,8 @@ class StreamsGroupInitializeRequestManagerTest { final CoordinatorRequestManager coordinatorRequestManager = mock(CoordinatorRequestManager.class); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(node)); final StreamsAssignmentInterface streamsAssignmentInterface = mock(StreamsAssignmentInterface.class); - final Set<String> sourceTopics = mkSet("sourceTopic1", "sourceTopic2"); - final Set<String> sinkTopics = mkSet("sinkTopic1", "sinkTopic2", "sinkTopic3"); + final Set<String> sourceTopics = Set.of("sourceTopic1", "sourceTopic2"); + final Set<String> sinkTopics = Set.of("sinkTopic1", "sinkTopic2", "sinkTopic3"); final Map<String, StreamsAssignmentInterface.TopicInfo> repartitionTopics = mkMap( mkEntry("repartitionTopic1", new StreamsAssignmentInterface.TopicInfo(Optional.of(2), Optional.of((short) 1), Collections.emptyMap())), mkEntry("repartitionTopic2", new StreamsAssignmentInterface.TopicInfo(Optional.of(3), Optional.of((short) 3), Collections.emptyMap())) @@ -82,9 +81,9 @@ class StreamsGroupInitializeRequestManagerTest { mkEntry("changelogTopic2", new StreamsAssignmentInterface.TopicInfo(Optional.empty(), Optional.of((short) 2), Collections.emptyMap())), mkEntry("changelogTopic3", new StreamsAssignmentInterface.TopicInfo(Optional.empty(), Optional.of((short) 3), Collections.emptyMap())) ); - final Collection<Set<String>> copartitionGroup = mkSet( - mkSet("sourceTopic1", "repartitionTopic2"), - mkSet("sourceTopic2", "repartitionTopic1") + final Collection<Set<String>> copartitionGroup = Set.of( + Set.of("sourceTopic1", "repartitionTopic2"), + Set.of("sourceTopic2", "repartitionTopic1") ); final StreamsAssignmentInterface.Subtopology subtopology1 = new StreamsAssignmentInterface.Subtopology( sourceTopics, @@ -139,11 +138,15 @@ class StreamsGroupInitializeRequestManagerTest { }); assertEquals(2, subtopology.copartitionGroups().size()); - final StreamsGroupInitializeRequestData.CopartitionGroup copartitionGroupData1 = subtopology.copartitionGroups().get(0); - assertEquals(Collections.singletonList((short) 0), copartitionGroupData1.sourceTopics()); - assertEquals(Collections.singletonList((short) 1), copartitionGroupData1.repartitionSourceTopics()); - final StreamsGroupInitializeRequestData.CopartitionGroup copartitionGroupData2 = subtopology.copartitionGroups().get(1); - assertEquals(Collections.singletonList((short) 1), copartitionGroupData2.sourceTopics()); - assertEquals(Collections.singletonList((short) 0), copartitionGroupData2.repartitionSourceTopics()); + final StreamsGroupInitializeRequestData.CopartitionGroup expectedCopartitionGroupData1 = + new StreamsGroupInitializeRequestData.CopartitionGroup() + .setRepartitionSourceTopics(Collections.singletonList((short) 0)) + .setSourceTopics(Collections.singletonList((short) 1)); + final StreamsGroupInitializeRequestData.CopartitionGroup expectedCopartitionGroupData2 = + new StreamsGroupInitializeRequestData.CopartitionGroup() + .setRepartitionSourceTopics(Collections.singletonList((short) 1)) + .setSourceTopics(Collections.singletonList((short) 0)); + assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData1)); + assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData2)); } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3b9aa091b8a..378709eda0c 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -600,8 +600,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) "This is part of the early access of KIP-932 and MUST NOT be used in production.") } if (protocols.contains(GroupType.STREAMS)) { - if (processRoles.isEmpty) { - throw new ConfigException(s"The new '${GroupType.STREAMS}' rebalance protocol is only supported in KRaft cluster.") + if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) { + throw new ConfigException(s"The new '${GroupType.STREAMS}' rebalance protocol is only supported in KRaft cluster with the new group coordinator.") } warn(s"The new '${GroupType.STREAMS}' rebalance protocol is enabled along with the new group coordinator. " + "This is part of the preview of KIP-1071 and MUST NOT be used in production.") diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index baf55529fa5..decfaea980e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -186,6 +186,7 @@ public class GroupCoordinatorConfigTest { configs.put(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 50000); configs.put(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, 50000); assertEquals("group.share.heartbeat.interval.ms must be less than group.share.session.timeout.ms", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 0); assertEquals("Invalid value 0 for configuration group.streams.session.timeout.ms: Value must be at least 1", diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index bf458507eac..8ff26e74f35 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -576,14 +576,15 @@ public class GroupMetadataManagerTest { assertEquals(5, coordinatorRecords.size()); assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(groupId, 2))); assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTopologyRecord(groupId, subtopologies))); + org.apache.kafka.coordinator.group.streams.TopicMetadata topicMetadata = new org.apache.kafka.coordinator.group.streams.TopicMetadata( inputTopicId, inputTopicName, 3, mkMap( - mkEntry(0, mkSet("rack0", "rack1")), - mkEntry(1, mkSet("rack1", "rack2")), - mkEntry(2, mkSet("rack2", "rack3")) + mkEntry(0, new HashSet<>(List.of("rack1", "rack0"))), + mkEntry(1, new HashSet<>(List.of("rack2", "rack1"))), + mkEntry(2, new HashSet<>(List.of("rack2", "rack3"))) ) ); assertTrue(coordinatorRecords.contains( @@ -837,7 +838,7 @@ public class GroupMetadataManagerTest { mkMap( mkEntry( subtopologyId, - mkSet(0, 1, 2) + new HashSet<>(List.of(0, 1, 2)) ) ), Collections.emptyMap(), diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java index c3980d0343f..bcf1ba65be7 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java @@ -19,10 +19,10 @@ package org.apache.kafka.coordinator.group.streams; import org.apache.kafka.coordinator.group.taskassignor.AssignmentMemberSpec; import java.util.AbstractMap; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -73,7 +73,7 @@ public class TaskAssignmentTestUtil { ) { return new AbstractMap.SimpleEntry<>( subtopologyId, - new HashSet<>(Arrays.asList(tasks)) + new HashSet<>(List.of(tasks)) ); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java index a63b2e7c7ca..e20ba62e5a5 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java @@ -21,13 +21,14 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.common.utils.Utils.mkSet; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -81,7 +82,7 @@ public class MockAssignorTest { final MemberAssignment testMember = result.members().get("test_member"); assertNotNull(testMember); assertEquals(mkMap( - mkEntry("test-subtopology", mkSet(0, 1, 2, 3)) + mkEntry("test-subtopology", Set.of(0, 1, 2, 3)) ), testMember.activeTasks()); } @@ -121,12 +122,12 @@ public class MockAssignorTest { ); final Map<String, Set<Integer>> expected1 = mkMap( - mkEntry("test-subtopology1", mkSet(1, 3)), - mkEntry("test-subtopology2", mkSet(1, 3)) + mkEntry("test-subtopology1", Set.of(1, 3)), + mkEntry("test-subtopology2", Set.of(1, 3)) ); final Map<String, Set<Integer>> expected2 = mkMap( - mkEntry("test-subtopology1", mkSet(0, 2)), - mkEntry("test-subtopology2", mkSet(0, 2)) + mkEntry("test-subtopology1", Set.of(0, 2)), + mkEntry("test-subtopology2", Set.of(0, 2)) ); assertEquals(2, result.members().size()); @@ -145,8 +146,8 @@ public class MockAssignorTest { Optional.empty(), Optional.empty(), mkMap( - mkEntry("test-subtopology1", mkSet(0, 2, 3)), - mkEntry("test-subtopology2", mkSet(0)) + mkEntry("test-subtopology1", new HashSet<>(List.of(0, 2, 3))), + mkEntry("test-subtopology2", new HashSet<>(List.of(0))) ), Collections.emptyMap(), Collections.emptyMap(), @@ -159,8 +160,8 @@ public class MockAssignorTest { Optional.empty(), Optional.empty(), mkMap( - mkEntry("test-subtopology1", mkSet(1)), - mkEntry("test-subtopology2", mkSet(3)) + mkEntry("test-subtopology1", new HashSet<>(List.of(1))), + mkEntry("test-subtopology2", new HashSet<>(List.of(3))) ), Collections.emptyMap(), Collections.emptyMap(), @@ -183,12 +184,12 @@ public class MockAssignorTest { assertNotNull(testMember1); assertNotNull(testMember2); assertEquals(mkMap( - mkEntry("test-subtopology1", mkSet(0, 2, 3)), - mkEntry("test-subtopology2", mkSet(0)) + mkEntry("test-subtopology1", Set.of(0, 2, 3)), + mkEntry("test-subtopology2", Set.of(0)) ), testMember1.activeTasks()); assertEquals(mkMap( - mkEntry("test-subtopology1", mkSet(1)), - mkEntry("test-subtopology2", mkSet(1, 2, 3)) + mkEntry("test-subtopology1", Set.of(1)), + mkEntry("test-subtopology2", Set.of(1, 2, 3)) ), testMember2.activeTasks()); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java index 6455fb5cd68..5ecc89158d9 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java @@ -34,7 +34,6 @@ import java.util.stream.Stream; import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.common.utils.Utils.mkSet; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -69,7 +68,7 @@ public class StickyTaskAssignorTest { assertEquals(1, testMember.activeTasks().size()); actualActiveTasks.addAll(testMember.activeTasks().get("test-subtopology")); } - assertEquals(mkSet(0, 1, 2), actualActiveTasks); + assertEquals(Set.of(0, 1, 2), actualActiveTasks); } @Test @@ -100,13 +99,13 @@ public class StickyTaskAssignorTest { assertEquals(1, getAllActiveTaskCount(result, "member3_1")); assertEquals(1, getAllActiveTaskCount(result, "member3_2")); - assertEquals(mkMap(mkEntry("test-subtopology1", mkSet(0, 1, 2)), mkEntry("test-subtopology2", mkSet(0, 1, 2))), + assertEquals(mkMap(mkEntry("test-subtopology1", Set.of(0, 1, 2)), mkEntry("test-subtopology2", Set.of(0, 1, 2))), mergeAllActiveTasks(result, "member1_1", "member1_2", "member2_1", "member2_2", "member3_1", "member3_2")); } @Test public void shouldAssignTopicGroupIdEvenlyAcrossClientsWithStandByTasks() { - final Map<String, Set<Integer>> tasks = mkMap(mkEntry("test-subtopology1", mkSet(0, 1, 2)), mkEntry("test-subtopology2", mkSet(0, 1, 2))); + final Map<String, Set<Integer>> tasks = mkMap(mkEntry("test-subtopology1", Set.of(0, 1, 2)), mkEntry("test-subtopology2", Set.of(0, 1, 2))); final AssignmentMemberSpec memberSpec11 = createAssignmentMemberSpec("process1"); final AssignmentMemberSpec memberSpec12 = createAssignmentMemberSpec("process1"); @@ -187,7 +186,7 @@ public class StickyTaskAssignorTest { @Test public void shouldMigrateActiveTasksToNewProcessWithoutChangingAllAssignments() { - final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", mkSet(0, 2))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Set.of(0, 2))), Collections.emptyMap()); final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Collections.singleton(1))), Collections.emptyMap()); final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); @@ -252,8 +251,8 @@ public class StickyTaskAssignorTest { @Test public void shouldAssignTasksEvenlyWithUnequalTopicGroupSizes() { final Map<String, Set<Integer>> activeTasks = mkMap( - mkEntry("test-subtopology1", mkSet(0, 1, 2, 3, 4, 5)), - mkEntry("test-subtopology2", mkSet(0))); + mkEntry("test-subtopology1", Set.of(0, 1, 2, 3, 4, 5)), + mkEntry("test-subtopology2", Set.of(0))); final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", activeTasks, Collections.emptyMap()); final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); @@ -443,7 +442,7 @@ public class StickyTaskAssignorTest { @Test public void shouldAssignStandbyTasksToDifferentClientThanCorrespondingActiveTaskIsAssignedTo() { - final Map<String, Set<Integer>> tasks = mkMap(mkEntry("test-subtopology", mkSet(0, 1, 2, 3))); + final Map<String, Set<Integer>> tasks = mkMap(mkEntry("test-subtopology", Set.of(0, 1, 2, 3))); final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Collections.singleton(0))), Collections.emptyMap()); final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Collections.singleton(1))), Collections.emptyMap()); final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Collections.singleton(2))), Collections.emptyMap()); @@ -508,9 +507,9 @@ public class StickyTaskAssignorTest { ); - assertEquals(mkSet(1, 2), new HashSet<>(getAllStandbyTaskIds(result, "member1"))); - assertEquals(mkSet(0, 2), new HashSet<>(getAllStandbyTaskIds(result, "member2"))); - assertEquals(mkSet(0, 1), new HashSet<>(getAllStandbyTaskIds(result, "member3"))); + assertEquals(Set.of(1, 2), new HashSet<>(getAllStandbyTaskIds(result, "member1"))); + assertEquals(Set.of(0, 2), new HashSet<>(getAllStandbyTaskIds(result, "member2"))); + assertEquals(Set.of(0, 1), new HashSet<>(getAllStandbyTaskIds(result, "member3"))); } @Test @@ -549,8 +548,8 @@ public class StickyTaskAssignorTest { - assertEquals(mkSet(0, 1, 2), new HashSet<>(getAllActiveTaskIds(result))); - assertEquals(mkSet(0, 1, 2), new HashSet<>(getAllStandbyTaskIds(result))); + assertEquals(Set.of(0, 1, 2), new HashSet<>(getAllActiveTaskIds(result))); + assertEquals(Set.of(0, 1, 2), new HashSet<>(getAllStandbyTaskIds(result))); } @Test @@ -593,7 +592,7 @@ public class StickyTaskAssignorTest { ); assertEquals(3, getAllActiveTaskIds(result, "member1", "member2", "member3", "member4", "member5", "member6").size()); - assertEquals(mkSet(0, 1, 2), getActiveTasks(result, "test-subtopology", "member1", "member2", "member3", "member4", "member5", "member6")); + assertEquals(Set.of(0, 1, 2), getActiveTasks(result, "test-subtopology", "member1", "member2", "member3", "member4", "member5", "member6")); } @Test @@ -669,9 +668,9 @@ public class StickyTaskAssignorTest { @Test public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousActiveTasks() { - final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", mkSet(1, 2))), Collections.emptyMap()); - final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", mkSet(3))), Collections.emptyMap()); - final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", mkSet(0))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Set.of(1, 2))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Set.of(3))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Set.of(0))), Collections.emptyMap()); final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4"); final List<String> allMemberIds = asList("member1", "member2", "member3", "member4"); @@ -698,9 +697,9 @@ public class StickyTaskAssignorTest { @Test public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() { final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", - mkMap(mkEntry("test-subtopology", mkSet(1, 2))), mkMap(mkEntry("test-subtopology", mkSet(3, 0)))); + mkMap(mkEntry("test-subtopology", Set.of(1, 2))), mkMap(mkEntry("test-subtopology", Set.of(3, 0)))); final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", - mkMap(mkEntry("test-subtopology", mkSet(3, 0))), mkMap(mkEntry("test-subtopology", mkSet(1, 2)))); + mkMap(mkEntry("test-subtopology", Set.of(3, 0))), mkMap(mkEntry("test-subtopology", Set.of(1, 2)))); final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4"); @@ -728,7 +727,7 @@ public class StickyTaskAssignorTest { @Test public void shouldReBalanceTasksAcrossAllClientsWhenCapacityAndTaskCountTheSame() { - final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", mkSet(0, 1, 2, 3))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Set.of(0, 1, 2, 3))), Collections.emptyMap()); final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4"); @@ -748,7 +747,7 @@ public class StickyTaskAssignorTest { @Test public void shouldReBalanceTasksAcrossClientsWhenCapacityLessThanTaskCount() { - final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", mkSet(0, 1, 2, 3))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Set.of(0, 1, 2, 3))), Collections.emptyMap()); final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); @@ -766,7 +765,7 @@ public class StickyTaskAssignorTest { @Test public void shouldRebalanceTasksToClientsBasedOnCapacity() { - final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", mkSet(0, 3, 2))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Set.of(0, 3, 2))), Collections.emptyMap()); final AssignmentMemberSpec memberSpec31 = createAssignmentMemberSpec("process3"); final AssignmentMemberSpec memberSpec32 = createAssignmentMemberSpec("process3"); @@ -783,8 +782,8 @@ public class StickyTaskAssignorTest { @Test public void shouldMoveMinimalNumberOfTasksWhenPreviouslyAboveCapacityAndNewClientAdded() { - final Set<Integer> p1PrevTasks = mkSet(0, 2); - final Set<Integer> p2PrevTasks = mkSet(1, 3); + final Set<Integer> p1PrevTasks = Set.of(0, 2); + final Set<Integer> p2PrevTasks = Set.of(1, 3); final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", p1PrevTasks)), Collections.emptyMap()); final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", p2PrevTasks)), Collections.emptyMap()); final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); @@ -796,12 +795,10 @@ public class StickyTaskAssignorTest { new TopologyDescriberImpl(4, false) ); - - assertEquals(1, getAllActiveTaskCount(result, "member3")); final List<Integer> p3ActiveTasks = getAllActiveTaskIds(result, "member3"); - if (p1PrevTasks.removeAll(p3ActiveTasks)) { + if (new HashSet<>(p1PrevTasks).removeAll(p3ActiveTasks)) { assertEquals(p2PrevTasks, new HashSet<>(getAllActiveTaskIds(result, "member2"))); } else { assertEquals(p1PrevTasks, new HashSet<>(getAllActiveTaskIds(result, "member1"))); @@ -810,8 +807,8 @@ public class StickyTaskAssignorTest { @Test public void shouldNotMoveAnyTasksWhenNewTasksAdded() { - final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", mkSet(0, 1))), Collections.emptyMap()); - final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", mkSet(2, 3))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Set.of(0, 1))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Set.of(2, 3))), Collections.emptyMap()); final Map<String, AssignmentMemberSpec> members = mkMap( mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2)); @@ -832,8 +829,8 @@ public class StickyTaskAssignorTest { @Test public void shouldAssignNewTasksToNewClientWhenPreviousTasksAssignedToOldClients() { - final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", mkSet(2, 1))), Collections.emptyMap()); - final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", mkSet(0, 3))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Set.of(2, 1))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Set.of(0, 3))), Collections.emptyMap()); final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); @@ -860,17 +857,17 @@ public class StickyTaskAssignorTest { @Test public void shouldAssignTasksNotPreviouslyActiveToNewClient() { final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", - mkMap(mkEntry("test-subtopology0", mkSet(1)), mkEntry("test-subtopology1", mkSet(2, 3))), - mkMap(mkEntry("test-subtopology0", mkSet(0)), mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(0, 1, 3)))); + mkMap(mkEntry("test-subtopology0", Set.of(1)), mkEntry("test-subtopology1", Set.of(2, 3))), + mkMap(mkEntry("test-subtopology0", Set.of(0)), mkEntry("test-subtopology1", Set.of(1)), mkEntry("test-subtopology2", Set.of(0, 1, 3)))); final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", - mkMap(mkEntry("test-subtopology0", mkSet(0)), mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(2))), - mkMap(mkEntry("test-subtopology0", mkSet(1, 2, 3)), mkEntry("test-subtopology1", mkSet(0, 2, 3)), mkEntry("test-subtopology2", mkSet(0, 1, 3)))); + mkMap(mkEntry("test-subtopology0", Set.of(0)), mkEntry("test-subtopology1", Set.of(1)), mkEntry("test-subtopology2", Set.of(2))), + mkMap(mkEntry("test-subtopology0", Set.of(1, 2, 3)), mkEntry("test-subtopology1", Set.of(0, 2, 3)), mkEntry("test-subtopology2", Set.of(0, 1, 3)))); final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", - mkMap(mkEntry("test-subtopology2", mkSet(0, 1, 3))), - mkMap(mkEntry("test-subtopology0", mkSet(2)), mkEntry("test-subtopology1", mkSet(2)))); + mkMap(mkEntry("test-subtopology2", Set.of(0, 1, 3))), + mkMap(mkEntry("test-subtopology0", Set.of(2)), mkEntry("test-subtopology1", Set.of(2)))); final AssignmentMemberSpec newMemberSpec = createAssignmentMemberSpec("process4", Collections.emptyMap(), - mkMap(mkEntry("test-subtopology0", mkSet(0, 1, 2, 3)), mkEntry("test-subtopology1", mkSet(0, 1, 2, 3)), mkEntry("test-subtopology2", mkSet(0, 1, 2, 3)))); + mkMap(mkEntry("test-subtopology0", Set.of(0, 1, 2, 3)), mkEntry("test-subtopology1", Set.of(0, 1, 2, 3)), mkEntry("test-subtopology2", Set.of(0, 1, 2, 3)))); final Map<String, AssignmentMemberSpec> members = mkMap( mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("newMember", newMemberSpec)); @@ -879,34 +876,34 @@ public class StickyTaskAssignorTest { new TopologyDescriberImpl(4, false) ); - assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(1)), mkEntry("test-subtopology1", mkSet(2, 3))), + assertEquals(mkMap(mkEntry("test-subtopology0", Set.of(1)), mkEntry("test-subtopology1", Set.of(2, 3))), getAllActiveTasks(result, "member1")); - assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(0)), mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(2))), + assertEquals(mkMap(mkEntry("test-subtopology0", Set.of(0)), mkEntry("test-subtopology1", Set.of(1)), mkEntry("test-subtopology2", Set.of(2))), getAllActiveTasks(result, "member2")); - assertEquals(mkMap(mkEntry("test-subtopology2", mkSet(0, 1, 3))), + assertEquals(mkMap(mkEntry("test-subtopology2", Set.of(0, 1, 3))), getAllActiveTasks(result, "member3")); - assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(2, 3)), mkEntry("test-subtopology1", mkSet(0))), + assertEquals(mkMap(mkEntry("test-subtopology0", Set.of(2, 3)), mkEntry("test-subtopology1", Set.of(0))), getAllActiveTasks(result, "newMember")); } @Test public void shouldAssignTasksNotPreviouslyActiveToMultipleNewClients() { final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", - mkMap(mkEntry("test-subtopology0", mkSet(1)), mkEntry("test-subtopology1", mkSet(2, 3))), - mkMap(mkEntry("test-subtopology0", mkSet(0)), mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(0, 1, 3)))); + mkMap(mkEntry("test-subtopology0", Set.of(1)), mkEntry("test-subtopology1", Set.of(2, 3))), + mkMap(mkEntry("test-subtopology0", Set.of(0)), mkEntry("test-subtopology1", Set.of(1)), mkEntry("test-subtopology2", Set.of(0, 1, 3)))); final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", - mkMap(mkEntry("test-subtopology0", mkSet(0)), mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(2))), - mkMap(mkEntry("test-subtopology0", mkSet(1, 2, 3)), mkEntry("test-subtopology1", mkSet(0, 2, 3)), mkEntry("test-subtopology2", mkSet(0, 1, 3)))); + mkMap(mkEntry("test-subtopology0", Set.of(0)), mkEntry("test-subtopology1", Set.of(1)), mkEntry("test-subtopology2", Set.of(2))), + mkMap(mkEntry("test-subtopology0", Set.of(1, 2, 3)), mkEntry("test-subtopology1", Set.of(0, 2, 3)), mkEntry("test-subtopology2", Set.of(0, 1, 3)))); final AssignmentMemberSpec bounce1 = createAssignmentMemberSpec("bounce1", Collections.emptyMap(), - mkMap(mkEntry("test-subtopology2", mkSet(0, 1, 3)))); + mkMap(mkEntry("test-subtopology2", Set.of(0, 1, 3)))); final AssignmentMemberSpec bounce2 = createAssignmentMemberSpec("bounce2", Collections.emptyMap(), - mkMap(mkEntry("test-subtopology0", mkSet(2, 3)), mkEntry("test-subtopology1", mkSet(0)))); + mkMap(mkEntry("test-subtopology0", Set.of(2, 3)), mkEntry("test-subtopology1", Set.of(0)))); @@ -918,19 +915,19 @@ public class StickyTaskAssignorTest { ); - assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(1)), mkEntry("test-subtopology1", mkSet(2, 3))), + assertEquals(mkMap(mkEntry("test-subtopology0", Set.of(1)), mkEntry("test-subtopology1", Set.of(2, 3))), getAllActiveTasks(result, "member1")); - assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(0)), mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(2))), + assertEquals(mkMap(mkEntry("test-subtopology0", Set.of(0)), mkEntry("test-subtopology1", Set.of(1)), mkEntry("test-subtopology2", Set.of(2))), getAllActiveTasks(result, "member2")); - assertEquals(mkMap(mkEntry("test-subtopology2", mkSet(0, 1, 3))), + assertEquals(mkMap(mkEntry("test-subtopology2", Set.of(0, 1, 3))), getAllActiveTasks(result, "bounce_member1")); - assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(2, 3)), mkEntry("test-subtopology1", mkSet(0))), + assertEquals(mkMap(mkEntry("test-subtopology0", Set.of(2, 3)), mkEntry("test-subtopology1", Set.of(0))), getAllActiveTasks(result, "bounce_member2")); } @Test public void shouldAssignTasksToNewClient() { - final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", mkSet(1, 2))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Set.of(1, 2))), Collections.emptyMap()); final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); final Map<String, AssignmentMemberSpec> members = mkMap( @@ -945,8 +942,8 @@ public class StickyTaskAssignorTest { @Test public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingClients() { - final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", mkSet(0, 1, 2))), Collections.emptyMap()); - final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", mkSet(3, 4, 5))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Set.of(0, 1, 2))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Set.of(3, 4, 5))), Collections.emptyMap()); final AssignmentMemberSpec newMemberSpec = createAssignmentMemberSpec("process3"); final Map<String, AssignmentMemberSpec> members = mkMap( @@ -973,8 +970,8 @@ public class StickyTaskAssignorTest { @Test public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingAndBouncedClients() { - final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", mkSet(0, 1, 2, 6))), Collections.emptyMap()); - final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", Collections.emptyMap(), mkMap(mkEntry("test-subtopology", mkSet(3, 4, 5)))); + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Set.of(0, 1, 2, 6))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", Collections.emptyMap(), mkMap(mkEntry("test-subtopology", Set.of(3, 4, 5)))); final AssignmentMemberSpec newMemberSpec = createAssignmentMemberSpec("newProcess"); final Map<String, AssignmentMemberSpec> members = mkMap(
