This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 431cffc93f2 KAFKA-19135 Migrate initial IQ support for KIP-1071 from feature branch to trunk (#19588)
431cffc93f2 is described below

commit 431cffc93f2a9c1a178bd4caede1001d329ab10b
Author: Bill Bejeck <bbej...@apache.org>
AuthorDate: Tue Apr 29 20:08:49 2025 -0400

    KAFKA-19135 Migrate initial IQ support for KIP-1071 from feature branch to trunk (#19588)

    This PR migrates the initial IQ support for KIP-1071 from the feature branch to trunk. It includes a parameterized integration test that expects the same results whether the classic or the new streams group protocol is used.

    Note that this PR delivers IQ information in every heartbeat response; a follow-up PR will change that so IQ information is sent only when assignments change.

    Reviewers: Lucas Brutschy <lucas...@apache.org>
---
 .../StreamsGroupHeartbeatRequestManager.java       |  22 +-
 .../consumer/internals/StreamsRebalanceData.java   |  32 ++-
 .../message/StreamsGroupHeartbeatResponse.json     |   6 +-
 .../StreamsGroupHeartbeatRequestManagerTest.java   |   8 +-
 .../coordinator/group/GroupMetadataManager.java    |  23 +-
 .../topics/EndpointToPartitionsManager.java        |  86 +++++++
 .../topics/EndpointToPartitionsManagerTest.java    | 162 +++++++++++++
 .../IQv2EndpointToPartitionsIntegrationTest.java   | 270 +++++++++++++++++++++
 .../streams/processor/internals/StreamThread.java  |  15 ++
 .../processor/internals/StreamThreadTest.java      |  14 +-
 .../org/apache/kafka/streams/utils/TestUtils.java  |   2 +-
 11 files changed, 620 insertions(+), 20 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java index b86c6d0498c..8f7d08e23a5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java @@ -674,16 +674,24 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { membershipManager.transitionToFatal(); } - private static Map<StreamsRebalanceData.HostInfo, List<TopicPartition>> convertHostInfoMap(final StreamsGroupHeartbeatResponseData data) { - Map<StreamsRebalanceData.HostInfo, List<TopicPartition>> partitionsByHost = new HashMap<>(); + private static Map<StreamsRebalanceData.HostInfo, StreamsRebalanceData.EndpointPartitions> convertHostInfoMap( + final StreamsGroupHeartbeatResponseData data) { + Map<StreamsRebalanceData.HostInfo, StreamsRebalanceData.EndpointPartitions> partitionsByHost = new HashMap<>(); data.partitionsByUserEndpoint().forEach(endpoint -> { - List<TopicPartition> topicPartitions = endpoint.partitions().stream() - .flatMap(partition -> - partition.partitions().stream().map(partitionId -> new TopicPartition(partition.topic(), partitionId))) - .collect(Collectors.toList()); + List<TopicPartition> activeTopicPartitions = getTopicPartitionList(endpoint.activePartitions()); + List<TopicPartition> standbyTopicPartitions = getTopicPartitionList(endpoint.standbyPartitions()); StreamsGroupHeartbeatResponseData.Endpoint userEndpoint = endpoint.userEndpoint(); - partitionsByHost.put(new StreamsRebalanceData.HostInfo(userEndpoint.host(), userEndpoint.port()), topicPartitions); + StreamsRebalanceData.EndpointPartitions endpointPartitions = new StreamsRebalanceData.EndpointPartitions(activeTopicPartitions,
standbyTopicPartitions); + partitionsByHost.put(new StreamsRebalanceData.HostInfo(userEndpoint.host(), userEndpoint.port()), endpointPartitions); }); return partitionsByHost; } + + static List<TopicPartition> getTopicPartitionList(List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitions) { + return topicPartitions.stream() + .flatMap(partition -> + partition.partitions().stream().map(partitionId -> new TopicPartition(partition.topic(), partitionId))) + .collect(Collectors.toList()); + } + } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java index 0158370a509..2fe7ae8ad35 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java @@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -119,6 +120,31 @@ public class StreamsRebalanceData { } } + public static class EndpointPartitions { + private final List<TopicPartition> activePartitions; + private final List<TopicPartition> standbyPartitions; + + public EndpointPartitions(final List<TopicPartition> activePartitions, + final List<TopicPartition> standbyPartitions) { + this.activePartitions = activePartitions; + this.standbyPartitions = standbyPartitions; + } + + public List<TopicPartition> activePartitions() { + return new ArrayList<>(activePartitions); + } + + public List<TopicPartition> standbyPartitions() { + return new ArrayList<>(standbyPartitions); + } + @Override + public String toString() { + return "EndpointPartitions {" + + "activePartitions=" + activePartitions + + ", standbyPartitions=" + standbyPartitions + + '}'; + } + } public static class Assignment { @@ -297,7 +323,7 @@ public class StreamsRebalanceData { private final AtomicReference<Assignment> reconciledAssignment = new AtomicReference<>(Assignment.EMPTY); - private final AtomicReference<Map<HostInfo, List<TopicPartition>>> partitionsByHost = new AtomicReference<>(Collections.emptyMap()); + private final AtomicReference<Map<HostInfo, EndpointPartitions>> partitionsByHost = new AtomicReference<>(Collections.emptyMap()); private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); @@ -341,11 +367,11 @@ public class StreamsRebalanceData { return reconciledAssignment.get(); } - public void setPartitionsByHost(final Map<HostInfo, List<TopicPartition>> partitionsByHost) { + public void setPartitionsByHost(final Map<HostInfo, EndpointPartitions> partitionsByHost) { this.partitionsByHost.set(partitionsByHost); } - public Map<HostInfo, List<TopicPartition>> partitionsByHost() { + public Map<HostInfo, EndpointPartitions> partitionsByHost() { return partitionsByHost.get(); } diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json index a5f3a99f9de..7127fcd1282 100644 --- a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json @@ -70,8 +70,10 @@ "fields": [ { "name": "UserEndpoint", "type": "Endpoint", 
"versions": "0+", "about": "User-defined endpoint to connect to the node" }, - { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+", - "about": "All partitions available on the node" } + { "name": "ActivePartitions", "type": "[]TopicPartition", "versions": "0+", + "about": "All topic partitions materialized by active tasks on the node" }, + { "name": "StandbyPartitions", "type": "[]TopicPartition", "versions": "0+", + "about": "All topic partitions materialized by standby tasks on the node" } ] } ], diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java index 5e0c6652ef6..9839f3b2210 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java @@ -153,7 +153,7 @@ class StreamsGroupHeartbeatRequestManagerTest { List.of( new StreamsGroupHeartbeatResponseData.EndpointToPartitions() .setUserEndpoint(new StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(8080)) - .setPartitions(List.of( + .setActivePartitions(List.of( new StreamsGroupHeartbeatResponseData.TopicPartition().setTopic("topic").setPartitions(List.of(0))) ) ); @@ -591,9 +591,9 @@ class StreamsGroupHeartbeatRequestManagerTest { .get(new StreamsRebalanceData.HostInfo( ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(), ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port()) - ); - assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), topicPartitions.get(0).topic()); - assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0), topicPartitions.get(0).partition()); + ).activePartitions(); + assertEquals(ENDPOINT_TO_PARTITIONS.get(0).activePartitions().get(0).topic(), topicPartitions.get(0).topic()); + assertEquals(ENDPOINT_TO_PARTITIONS.get(0).activePartitions().get(0).partitions().get(0), topicPartitions.get(0).partition()); assertEquals( 1.0, metrics.metric(metrics.metricName("heartbeat-total", "consumer-coordinator-metrics")).metricValue() diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index ad4a89d90eb..fb2c7c4c6af 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -158,6 +158,7 @@ import org.apache.kafka.coordinator.group.streams.assignor.StickyTaskAssignor; import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor; import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException; import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology; +import org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager; import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager; import org.apache.kafka.coordinator.group.streams.topics.TopicConfigurationException; import org.apache.kafka.image.MetadataDelta; @@ -1982,7 +1983,8 @@ public class GroupMetadataManager { StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData() .setMemberId(updatedMember.memberId()) 
.setMemberEpoch(updatedMember.memberEpoch()) - .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId)); + .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId)) + .setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group)); // The assignment is only provided in the following cases: // 1. The member is joining. @@ -2093,6 +2095,25 @@ public class GroupMetadataManager { .collect(Collectors.toList()); } + private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> maybeBuildEndpointToPartitions(StreamsGroup group) { + List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitionsList = new ArrayList<>(); + final Map<String, StreamsGroupMember> members = group.members(); + for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet()) { + final StreamsGroupMember groupMember = entry.getValue(); + final Optional<StreamsGroupMemberMetadataValue.Endpoint> endpointOptional = groupMember.userEndpoint(); + if (endpointOptional.isPresent()) { + final StreamsGroupMemberMetadataValue.Endpoint endpoint = endpointOptional.get(); + final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint(); + responseEndpoint.setHost(endpoint.host()); + responseEndpoint.setPort(endpoint.port()); + StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions = EndpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint, group); + endpointToPartitionsList.add(endpointToPartitions); + } + } + return endpointToPartitionsList.isEmpty() ? null : endpointToPartitionsList; + } + /** * Handles a regular heartbeat from a consumer group member. It mainly consists of * three parts: diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java new file mode 100644 index 00000000000..ea3eca20935 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; +import org.apache.kafka.coordinator.group.streams.StreamsGroup; +import org.apache.kafka.coordinator.group.streams.StreamsGroupMember; +import org.apache.kafka.coordinator.group.streams.TopicMetadata; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class EndpointToPartitionsManager { + + private EndpointToPartitionsManager() { + } + + public static StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions(final StreamsGroupMember streamsGroupMember, + final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint, + final StreamsGroup streamsGroup) { + StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions = new StreamsGroupHeartbeatResponseData.EndpointToPartitions(); + Map<String, Set<Integer>> activeTasks = streamsGroupMember.assignedTasks().activeTasks(); + Map<String, Set<Integer>> standbyTasks = streamsGroupMember.assignedTasks().standbyTasks(); + endpointToPartitions.setUserEndpoint(responseEndpoint); + Map<String, ConfiguredSubtopology> configuredSubtopologies = streamsGroup.configuredTopology().flatMap(ConfiguredTopology::subtopologies).get(); + List<StreamsGroupHeartbeatResponseData.TopicPartition> activeTopicPartitions = topicPartitions(activeTasks, configuredSubtopologies, streamsGroup.partitionMetadata()); + List<StreamsGroupHeartbeatResponseData.TopicPartition> standbyTopicPartitions = topicPartitions(standbyTasks, configuredSubtopologies, streamsGroup.partitionMetadata()); + endpointToPartitions.setActivePartitions(activeTopicPartitions); + endpointToPartitions.setStandbyPartitions(standbyTopicPartitions); + return endpointToPartitions; + } + + private static List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitions(final Map<String, Set<Integer>> tasks, + final Map<String, ConfiguredSubtopology> configuredSubtopologies, + final Map<String, TopicMetadata> groupTopicMetadata) { + List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitionsForTasks = new ArrayList<>(); + for (Map.Entry<String, Set<Integer>> taskEntry : tasks.entrySet()) { + String subtopologyId = taskEntry.getKey(); + ConfiguredSubtopology configuredSubtopology = configuredSubtopologies.get(subtopologyId); + Set<String> sourceTopics = configuredSubtopology.sourceTopics(); + Set<String> repartitionSourceTopics = configuredSubtopology.repartitionSourceTopics().keySet(); + Set<String> allSourceTopic = new HashSet<>(sourceTopics); + allSourceTopic.addAll(repartitionSourceTopics); + List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitionList = topicPartitionListForTask(taskEntry.getValue(), allSourceTopic, groupTopicMetadata); + topicPartitionsForTasks.addAll(topicPartitionList); + } + return topicPartitionsForTasks; + } + + private static List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitionListForTask(final Set<Integer> taskSet, + final Set<String> topicNames, + final Map<String, TopicMetadata> groupTopicMetadata) { + return topicNames.stream().map(topic -> { + int numPartitionsForTopic = groupTopicMetadata.get(topic).numPartitions(); + StreamsGroupHeartbeatResponseData.TopicPartition tp = new StreamsGroupHeartbeatResponseData.TopicPartition(); + tp.setTopic(topic); + List<Integer> tpPartitions = new ArrayList<>(taskSet); + if (numPartitionsForTopic < 
taskSet.size()) { + Collections.sort(tpPartitions); + tp.setPartitions(tpPartitions.subList(0, numPartitionsForTopic)); + } else { + tp.setPartitions(tpPartitions); + } + return tp; + }).toList(); + } +} \ No newline at end of file diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java new file mode 100644 index 00000000000..2002774b60b --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; +import org.apache.kafka.coordinator.group.streams.StreamsGroup; +import org.apache.kafka.coordinator.group.streams.StreamsGroupMember; +import org.apache.kafka.coordinator.group.streams.TasksTuple; +import org.apache.kafka.coordinator.group.streams.TopicMetadata; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class EndpointToPartitionsManagerTest { + + private StreamsGroup streamsGroup; + private StreamsGroupMember streamsGroupMember; + private ConfiguredTopology configuredTopology; + private ConfiguredSubtopology configuredSubtopologyOne; + private ConfiguredSubtopology configuredSubtopologyTwo; + private final Map<String, Set<Integer>> activeTasks = new HashMap<>(); + private final Map<String, Set<Integer>> standbyTasks = new HashMap<>(); + private TasksTuple tasksTuple; + private final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint(); + + @BeforeEach + public void setUp() { + streamsGroup = mock(StreamsGroup.class); + streamsGroupMember = mock(StreamsGroupMember.class); + configuredTopology = mock(ConfiguredTopology.class); + configuredSubtopologyOne = new 
ConfiguredSubtopology(Set.of("Topic-A"), new HashMap<>(), new HashSet<>(), new HashMap<>()); + Map<String, ConfiguredInternalTopic> repartitionSourceTopics = Map.of("Topic-B", new ConfiguredInternalTopic("Topic-B", 1, Optional.of((short) 1), Collections.emptyMap())); + configuredSubtopologyTwo = new ConfiguredSubtopology(new HashSet<>(), repartitionSourceTopics, new HashSet<>(), new HashMap<>()); + SortedMap<String, ConfiguredSubtopology> configuredSubtopologyMap = new TreeMap<>(); + configuredSubtopologyMap.put("0", configuredSubtopologyOne); + configuredSubtopologyMap.put("1", configuredSubtopologyTwo); + when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyMap)); + responseEndpoint.setHost("localhost"); + responseEndpoint.setPort(9092); + } + + @Test + void testEndpointToPartitionsWithStandbyTaskAssignments() { + Map<String, TopicMetadata> topicMetadata = new HashMap<>(); + topicMetadata.put("Topic-A", new TopicMetadata(Uuid.randomUuid(), "Topic-A", 3)); + topicMetadata.put("Topic-B", new TopicMetadata(Uuid.randomUuid(), "Topic-B", 3)); + + activeTasks.put("0", Set.of(0, 1, 2)); + standbyTasks.put("1", Set.of(0, 1, 2)); + tasksTuple = new TasksTuple(activeTasks, standbyTasks, Collections.emptyMap()); + when(streamsGroupMember.assignedTasks()).thenReturn(tasksTuple); + when(streamsGroup.partitionMetadata()).thenReturn(topicMetadata); + when(streamsGroup.configuredTopology()).thenReturn(Optional.of(configuredTopology)); + SortedMap<String, ConfiguredSubtopology> configuredSubtopologyMap = new TreeMap<>(); + configuredSubtopologyMap.put("0", configuredSubtopologyOne); + configuredSubtopologyMap.put("1", configuredSubtopologyTwo); + when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyMap)); + + StreamsGroupHeartbeatResponseData.EndpointToPartitions result = + EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember, responseEndpoint, streamsGroup); + + assertEquals(responseEndpoint, result.userEndpoint()); + assertEquals(1, result.activePartitions().size()); + assertEquals(1, result.standbyPartitions().size()); + List<StreamsGroupHeartbeatResponseData.TopicPartition> activePartitions = result.activePartitions(); + List<StreamsGroupHeartbeatResponseData.TopicPartition> standbyPartitions = result.standbyPartitions(); + activePartitions.sort(Comparator.comparing(StreamsGroupHeartbeatResponseData.TopicPartition::topic)); + standbyPartitions.sort(Comparator.comparing(StreamsGroupHeartbeatResponseData.TopicPartition::topic)); + assertTopicPartitionsAssigned(activePartitions, "Topic-A"); + assertTopicPartitionsAssigned(standbyPartitions, "Topic-B"); + } + + private static void assertTopicPartitionsAssigned(List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitions, String topicName) { + StreamsGroupHeartbeatResponseData.TopicPartition topicPartition = topicPartitions.stream().filter(tp -> tp.topic().equals(topicName)).findFirst().get(); + assertEquals(topicName, topicPartition.topic()); + assertEquals(List.of(0, 1, 2), topicPartition.partitions().stream().sorted().toList()); + } + + @ParameterizedTest(name = "{4}") + @MethodSource("argsProvider") + void testEndpointToPartitionsWithTwoTopicsAndDifferentPartitions(int
topicAPartitions, + int topicBPartitions, + List<Integer> topicAExpectedPartitions, + List<Integer> topicBExpectedPartitions, + String testName + ) { + Map<String, TopicMetadata> topicMetadata = new HashMap<>(); + topicMetadata.put("Topic-A", new TopicMetadata(Uuid.randomUuid(), "Topic-A", topicAPartitions)); + topicMetadata.put("Topic-B", new TopicMetadata(Uuid.randomUuid(), "Topic-B", topicBPartitions)); + configuredSubtopologyOne = new ConfiguredSubtopology(Set.of("Topic-A", "Topic-B"), new HashMap<>(), new HashSet<>(), new HashMap<>()); + + activeTasks.put("0", Set.of(0, 1, 2, 3, 4)); + when(streamsGroupMember.assignedTasks()).thenReturn(new TasksTuple(activeTasks, Collections.emptyMap(), Collections.emptyMap())); + when(streamsGroup.partitionMetadata()).thenReturn(topicMetadata); + when(streamsGroup.configuredTopology()).thenReturn(Optional.of(configuredTopology)); + SortedMap<String, ConfiguredSubtopology> configuredSubtopologyOneMap = new TreeMap<>(); + configuredSubtopologyOneMap.put("0", configuredSubtopologyOne); + when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyOneMap)); + + StreamsGroupHeartbeatResponseData.EndpointToPartitions result = EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember, responseEndpoint, streamsGroup); + + assertEquals(responseEndpoint, result.userEndpoint()); + assertEquals(2, result.activePartitions().size()); + + List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitions = result.activePartitions(); + topicPartitions.sort(Comparator.comparing(StreamsGroupHeartbeatResponseData.TopicPartition::topic)); + + StreamsGroupHeartbeatResponseData.TopicPartition topicAPartition = result.activePartitions().get(0); + assertEquals("Topic-A", topicAPartition.topic()); + assertEquals(topicAExpectedPartitions, topicAPartition.partitions().stream().sorted().toList()); + + StreamsGroupHeartbeatResponseData.TopicPartition topicBPartition = result.activePartitions().get(1); + assertEquals("Topic-B", topicBPartition.topic()); + assertEquals(topicBExpectedPartitions, topicBPartition.partitions().stream().sorted().toList()); + } + + static Stream<Arguments> argsProvider() { + return Stream.of( + arguments(2, 5, List.of(0, 1), List.of(0, 1, 2, 3, 4), "Should assign correct partitions when partitions differ between topics"), + arguments(3, 3, List.of(0, 1, 2), List.of(0, 1, 2), "Should assign correct partitions when partitions same between topics") + ); + } +} \ No newline at end of file diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2EndpointToPartitionsIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2EndpointToPartitionsIntegrationTest.java new file mode 100644 index 00000000000..7cad5998701 --- /dev/null +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2EndpointToPartitionsIntegrationTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.StreamsMetadata; +import org.apache.kafka.streams.ThreadMetadata; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Stream; + +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +@Timeout(600) +@Tag("integration") +public class IQv2EndpointToPartitionsIntegrationTest { + private String appId; + private String inputTopicTwoPartitions; + private String outputTopicTwoPartitions; + private Properties streamsApplicationProperties = new Properties(); + private Properties streamsSecondApplicationProperties = new Properties(); + + private static EmbeddedKafkaCluster cluster; + private static final int NUM_BROKERS = 3; + private static final String EXPECTED_STORE_NAME = "IQTest-count"; + + public void startCluster(final int standbyConfig) throws IOException { + final Properties properties = new Properties(); + properties.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, standbyConfig); + cluster = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(NUM_BROKERS, properties); + cluster.start(); + } + + public void setUp() throws InterruptedException { + appId = safeUniqueTestName("endpointIntegrationTest"); + inputTopicTwoPartitions = appId + "-input-two"; + outputTopicTwoPartitions = appId + "-output-two"; + cluster.createTopic(inputTopicTwoPartitions, 2, 1); + cluster.createTopic(outputTopicTwoPartitions, 2, 1); + } + + public void closeCluster() { + cluster.stop(); + } + + @AfterEach + public void tearDown() throws Exception { + IntegrationTestUtils.purgeLocalStreamsState(streamsApplicationProperties); + if 
(!streamsSecondApplicationProperties.isEmpty()) { + IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties); + } + } + + @ParameterizedTest(name = "{3}") + @MethodSource("groupProtocolParameters") + public void shouldGetCorrectHostPartitionInformation(final String groupProtocolConfig, + final boolean usingStandbyReplicas, + final int numStandbyReplicas, + final String testName) throws Exception { + try { + startCluster(usingStandbyReplicas ? numStandbyReplicas : 0); + setUp(); + + final Properties streamOneProperties = new Properties(); + streamOneProperties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks1"); + streamOneProperties.put(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks1"); + streamOneProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:2020"); + streamOneProperties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocolConfig); + if (usingStandbyReplicas) { + streamOneProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas); + } + streamsApplicationProperties = props(streamOneProperties); + + final Properties streamTwoProperties = new Properties(); + streamTwoProperties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks2"); + streamTwoProperties.put(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks2"); + streamTwoProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:3030"); + streamTwoProperties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocolConfig); + if (usingStandbyReplicas) { + streamTwoProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas); + } + streamsSecondApplicationProperties = props(streamTwoProperties); + + final Topology topology = complexTopology(); + try (final KafkaStreams streamsOne = new KafkaStreams(topology, streamsApplicationProperties)) { + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsOne); + waitForCondition(() -> !streamsOne.metadataForAllStreamsClients().isEmpty(), + IntegrationTestUtils.DEFAULT_TIMEOUT, + () -> "Kafka Streams didn't get metadata about the client."); + waitForCondition(() -> streamsOne.metadataForAllStreamsClients().iterator().next().topicPartitions().size() == 4, + IntegrationTestUtils.DEFAULT_TIMEOUT, + () -> "Kafka Streams one didn't get 4 tasks"); + final List<StreamsMetadata> streamsMetadataAllClients = new ArrayList<>(streamsOne.metadataForAllStreamsClients()); + assertEquals(1, streamsMetadataAllClients.size()); + final StreamsMetadata streamsOneInitialMetadata = streamsMetadataAllClients.get(0); + assertEquals(2020, streamsOneInitialMetadata.hostInfo().port()); + final Set<TopicPartition> topicPartitions = streamsOneInitialMetadata.topicPartitions(); + assertEquals(4, topicPartitions.size()); + assertEquals(0, streamsOneInitialMetadata.standbyTopicPartitions().size()); + + final long repartitionTopicTaskCount = topicPartitions.stream().filter(tp -> tp.topic().contains("-repartition")).count(); + final long sourceTopicTaskCount = topicPartitions.stream().filter(tp -> tp.topic().contains("-input-two")).count(); + assertEquals(2, repartitionTopicTaskCount); + assertEquals(2, sourceTopicTaskCount); + final int expectedStandbyCount = usingStandbyReplicas ? 
1 : 0; + + try (final KafkaStreams streamsTwo = new KafkaStreams(topology, streamsSecondApplicationProperties)) { + streamsTwo.start(); + waitForCondition(() -> KafkaStreams.State.RUNNING == streamsTwo.state() && KafkaStreams.State.RUNNING == streamsOne.state(), + IntegrationTestUtils.DEFAULT_TIMEOUT, + () -> "Kafka Streams one or two never transitioned to a RUNNING state."); + + waitForCondition(() -> { + final ThreadMetadata threadMetadata = streamsOne.metadataForLocalThreads().iterator().next(); + return threadMetadata.activeTasks().size() == 2 && threadMetadata.standbyTasks().size() == expectedStandbyCount; + }, TestUtils.DEFAULT_MAX_WAIT_MS, + "KafkaStreams one never released active tasks and received standby task"); + + waitForCondition(() -> { + final ThreadMetadata threadMetadata = streamsTwo.metadataForLocalThreads().iterator().next(); + return threadMetadata.activeTasks().size() == 2 && threadMetadata.standbyTasks().size() == expectedStandbyCount; + }, TestUtils.DEFAULT_MAX_WAIT_MS, + "KafkaStreams two never received active tasks and standby"); + + waitForCondition(() -> { + final List<StreamsMetadata> metadata = new ArrayList<>(streamsTwo.metadataForAllStreamsClients()); + return metadata.size() == 2 && + metadata.get(0).standbyTopicPartitions().size() == expectedStandbyCount && + metadata.get(1).standbyTopicPartitions().size() == expectedStandbyCount; + }, TestUtils.DEFAULT_MAX_WAIT_MS, + "Kafka Streams clients 1 and 2 never got metadata about standby tasks"); + + waitForCondition(() -> streamsOne.metadataForAllStreamsClients().iterator().next().topicPartitions().size() == 2, + IntegrationTestUtils.DEFAULT_TIMEOUT, + () -> "Kafka Streams one didn't give up active tasks"); + + final List<StreamsMetadata> allClientMetadataUpdated = new ArrayList<>(streamsTwo.metadataForAllStreamsClients()); + + final StreamsMetadata streamsOneMetadata = allClientMetadataUpdated.get(0); + final Set<TopicPartition> streamsOneActiveTopicPartitions = streamsOneMetadata.topicPartitions(); + final Set<TopicPartition> streamsOneStandbyTopicPartitions = streamsOneMetadata.standbyTopicPartitions(); + final Set<String> streamsOneStoreNames = streamsOneMetadata.stateStoreNames(); + final Set<String> streamsOneStandbyStoreNames = streamsOneMetadata.standbyStateStoreNames(); + + assertEquals(2020, streamsOneMetadata.hostInfo().port()); + assertEquals(2, streamsOneActiveTopicPartitions.size()); + assertEquals(expectedStandbyCount, streamsOneStandbyTopicPartitions.size()); + assertEquals(1, streamsOneStoreNames.size()); + assertEquals(expectedStandbyCount, streamsOneStandbyStoreNames.size()); + assertEquals(EXPECTED_STORE_NAME, streamsOneStoreNames.iterator().next()); + if (usingStandbyReplicas) { + assertEquals(EXPECTED_STORE_NAME, streamsOneStandbyStoreNames.iterator().next()); + } + + final long streamsOneRepartitionTopicCount = streamsOneActiveTopicPartitions.stream().filter(tp -> tp.topic().contains("-repartition")).count(); + final long streamsOneSourceTopicCount = streamsOneActiveTopicPartitions.stream().filter(tp -> tp.topic().contains("-input-two")).count(); + assertEquals(1, streamsOneRepartitionTopicCount); + assertEquals(1, streamsOneSourceTopicCount); + + final StreamsMetadata streamsTwoMetadata = allClientMetadataUpdated.get(1); + final Set<TopicPartition> streamsTwoActiveTopicPartitions = streamsTwoMetadata.topicPartitions(); + final Set<TopicPartition> streamsTwoStandbyTopicPartitions = streamsTwoMetadata.standbyTopicPartitions(); + final Set<String> streamsTwoStateStoreNames = 
streamsTwoMetadata.stateStoreNames(); + final Set<String> streamsTwoStandbyStateStoreNames = streamsTwoMetadata.standbyStateStoreNames(); + + assertEquals(3030, streamsTwoMetadata.hostInfo().port()); + assertEquals(2, streamsTwoActiveTopicPartitions.size()); + assertEquals(expectedStandbyCount, streamsTwoStandbyTopicPartitions.size()); + assertEquals(1, streamsTwoStateStoreNames.size()); + assertEquals(expectedStandbyCount, streamsTwoStandbyStateStoreNames.size()); + assertEquals(EXPECTED_STORE_NAME, streamsTwoStateStoreNames.iterator().next()); + if (usingStandbyReplicas) { + assertEquals(EXPECTED_STORE_NAME, streamsTwoStandbyStateStoreNames.iterator().next()); + } + + final long streamsTwoRepartitionTopicCount = streamsTwoActiveTopicPartitions.stream().filter(tp -> tp.topic().contains("-repartition")).count(); + final long streamsTwoSourceTopicCount = streamsTwoActiveTopicPartitions.stream().filter(tp -> tp.topic().contains("-input-two")).count(); + assertEquals(1, streamsTwoRepartitionTopicCount); + assertEquals(1, streamsTwoSourceTopicCount); + + if (usingStandbyReplicas) { + final TopicPartition streamsOneStandbyTopicPartition = streamsOneStandbyTopicPartitions.iterator().next(); + final TopicPartition streamsTwoStandbyTopicPartition = streamsTwoStandbyTopicPartitions.iterator().next(); + final String streamsOneStandbyTopicName = streamsOneStandbyTopicPartition.topic(); + final String streamsTwoStandbyTopicName = streamsTwoStandbyTopicPartition.topic(); + assertEquals(streamsOneStandbyTopicName, streamsTwoStandbyTopicName); + assertNotEquals(streamsOneStandbyTopicPartition.partition(), streamsTwoStandbyTopicPartition.partition()); + } + } + } + } finally { + closeCluster(); + } + } + + private static Stream<Arguments> groupProtocolParameters() { + return Stream.of(Arguments.of("streams", false, 0, "STREAMS protocol No standby"), + Arguments.of("classic", false, 0, "CLASSIC protocol No standby"), + Arguments.of("streams", true, 1, "STREAMS protocol With standby"), + Arguments.of("classic", true, 1, "CLASSIC protocol With standby")); + } + + private Properties props(final Properties extraProperties) { + final Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.putAll(extraProperties); + return streamsConfiguration; + } + + private Topology complexTopology() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream(inputTopicTwoPartitions, Consumed.with(Serdes.String(), Serdes.String())) + .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) + .groupBy((key, value) -> value, Grouped.as("IQTest")) + .count(Materialized.as(EXPECTED_STORE_NAME)) + .toStream().to(outputTopicTwoPartitions, Produced.with(Serdes.String(), Serdes.Long())); + return builder.build(); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index d5f67ff4fad..dcfca3ec2f5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1494,6 +1494,21 @@ public class StreamThread extends Thread implements ProcessingThread { shutdownErrorHook.run(); } } + + final Map<StreamsRebalanceData.HostInfo, StreamsRebalanceData.EndpointPartitions> partitionsByEndpoint = + streamsRebalanceData.get().partitionsByHost(); + final Map<HostInfo, Set<TopicPartition>> activeHostInfoMap = new HashMap<>(); + final Map<HostInfo, Set<TopicPartition>> standbyHostInfoMap = new HashMap<>(); + + partitionsByEndpoint.forEach((hostInfo, endpointPartitions) -> { + activeHostInfoMap.put(new HostInfo(hostInfo.host(), hostInfo.port()), new HashSet<>(endpointPartitions.activePartitions())); + standbyHostInfoMap.put(new HostInfo(hostInfo.host(), hostInfo.port()), new HashSet<>(endpointPartitions.standbyPartitions())); + }); + streamsMetadataState.onChange( + activeHostInfoMap, + standbyHostInfoMap, + getTopicPartitionInfo(activeHostInfoMap) + ); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 8cb2fc8cdfe..96090aa32fa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -3805,6 +3805,11 @@ public class StreamThreadTest { final Runnable shutdownErrorHook = mock(Runnable.class); final Properties props = configProps(false, false, false); + final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( + new TopologyMetadata(internalTopologyBuilder, new StreamsConfig(props)), + StreamsMetadataState.UNKNOWN_HOST, + new LogContext(String.format("stream-client [%s] ", CLIENT_ID)) + ); final StreamsConfig config = new StreamsConfig(props); thread = new StreamThread( new MockTime(1), @@ -3828,7 +3833,7 @@ public class StreamThreadTest { HANDLER, null, Optional.of(streamsRebalanceData), - null + streamsMetadataState ).updateThreadMetadata(adminClientId(CLIENT_ID)); thread.setState(State.STARTING); @@ -3860,6 +3865,11 @@ public class StreamThreadTest { final Properties props = configProps(false, false, false); final Runnable shutdownErrorHook = mock(Runnable.class); final StreamsConfig config = new StreamsConfig(props); + final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( + new TopologyMetadata(internalTopologyBuilder, config), + StreamsMetadataState.UNKNOWN_HOST, + new LogContext(String.format("stream-client [%s] ", CLIENT_ID)) + ); thread = new StreamThread( new MockTime(1), config, @@ -3882,7 +3892,7 @@ public class StreamThreadTest { HANDLER, null, Optional.of(streamsRebalanceData), - null + streamsMetadataState ).updateThreadMetadata(adminClientId(CLIENT_ID)); thread.setState(State.STARTING); diff --git a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java index 96d19dbb47e..2e13e3e6d9b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java @@ -90,7 +90,7 @@ public class TestUtils { return safeUniqueTestName(methodName); } - private static 
String safeUniqueTestName(final String testName) { + public static String safeUniqueTestName(final String testName) { return sanitize(testName + Uuid.randomUuid().toString()); }
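
For readers tracing how this change reaches applications: the EndpointToPartitions field added to the heartbeat response above is what ultimately populates the active/standby split exposed through the public StreamsMetadata API (the same calls the integration test exercises). What follows is a minimal sketch, not part of this commit, of reading that split from a running KafkaStreams instance; the class name and the printf output format are illustrative assumptions, while every API call used appears in the test code above.

    // Illustrative sketch only (not part of this commit). With KIP-1071's streams
    // group protocol, the heartbeat response carries active and standby partitions
    // per user endpoint, which Kafka Streams surfaces via StreamsMetadata.
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsMetadata;

    import java.util.Set;

    public class EndpointPartitionsSketch {

        // Print, for each Streams client in the group, the topic partitions it hosts
        // through active tasks and through standby tasks -- the information an IQ
        // router needs to pick a serving host (active) or a fallback (standby).
        public static void printEndpointPartitions(final KafkaStreams streams) {
            for (final StreamsMetadata metadata : streams.metadataForAllStreamsClients()) {
                final Set<TopicPartition> active = metadata.topicPartitions();
                final Set<TopicPartition> standby = metadata.standbyTopicPartitions();
                System.out.printf("endpoint %s:%d active=%s standby=%s stores=%s standbyStores=%s%n",
                    metadata.hostInfo().host(), metadata.hostInfo().port(),
                    active, standby,
                    metadata.stateStoreNames(), metadata.standbyStateStoreNames());
            }
        }
    }

With the new streams group protocol this metadata arrives in each StreamsGroupHeartbeatResponse (per the commit message, a follow-up PR will narrow that to assignment changes only), whereas the classic protocol distributes it during rebalances; the parameterized integration test above asserts that both paths yield the same view.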