This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit b18625b86ff9f7d688089313441f7f7c10a1fc6d Author: Lucas Brutschy <[email protected]> AuthorDate: Tue Oct 8 17:30:48 2024 +0200 Complete topic metadata for automatic topic creation (#17391) * KSTREAMS-6456: Complete topic metadata for automatic topic creation This change updates the current RPCs and schemas to add the following metadata: - copartition groups are added to topology record and initialize RPC - multiple regexes are added to topology record and initialize RPC - replication factors are added for each internal topic We also add code to fill this information correctly from the `InternalTopologyBuilder` object. The fields `assignmentConfiguration` and `assignor` in the `StreamsAssignmentInterface` are removed, because they are not needed anymore. * fixes --- .../internals/StreamsAssignmentInterface.java | 31 +++----- .../StreamsGroupInitializeRequestManager.java | 71 ++++++++++++++++-- .../message/StreamsGroupDescribeResponse.json | 10 +-- .../message/StreamsGroupInitializeRequest.json | 21 ++++-- .../StreamsGroupHeartbeatRequestManagerTest.java | 19 ++--- .../StreamsGroupInitializeRequestManagerTest.java | 35 ++++++--- .../coordinator/group/GroupMetadataManager.java | 10 ++- .../streams/CoordinatorStreamsRecordHelpers.java | 84 ++++++++++++++++------ .../coordinator/group/streams/StreamsTopology.java | 32 +-------- .../common/message/StreamsGroupTopologyValue.json | 23 ++++-- .../CoordinatorStreamsRecordHelpersTest.java | 22 ++++++ .../group/streams/StreamsTopologyTest.java | 36 +++++++--- .../streams/processor/internals/StreamThread.java | 74 +++++++++++-------- 13 files changed, 316 insertions(+), 152 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java index 1e23233f4e6..cefe9175b68 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.common.TopicPartition; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -41,12 +42,8 @@ public class StreamsAssignmentInterface { private Optional<HostInfo> endpoint; - private String assignor; - private Map<String, Subtopology> subtopologyMap; - private Map<String, Object> assignmentConfiguration; - private Map<TaskId, Long> taskLags; private AtomicBoolean shutdownRequested; @@ -66,18 +63,10 @@ public class StreamsAssignmentInterface { return endpoint; } - public String assignor() { - return assignor; - } - public Map<String, Subtopology> subtopologyMap() { return subtopologyMap; } - public Map<String, Object> assignmentConfiguration() { - return assignmentConfiguration; - } - // TODO: This needs to be used somewhere public Map<TaskId, Long> taskLags() { return taskLags; @@ -190,11 +179,14 @@ public class StreamsAssignmentInterface { public static class TopicInfo { public final Optional<Integer> numPartitions; + public final Optional<Short> replicationFactor; public final Map<String, String> topicConfigs; public TopicInfo(final Optional<Integer> numPartitions, + final Optional<Short> replicationFactor, final Map<String, String> topicConfigs) { this.numPartitions = numPartitions; + this.replicationFactor = replicationFactor; this.topicConfigs = topicConfigs; } @@ -202,10 +194,10 @@ public class StreamsAssignmentInterface { public String toString() { return "TopicInfo{" + "numPartitions=" + numPartitions + + ", replicationFactor=" + replicationFactor + ", topicConfigs=" + topicConfigs + '}'; } - } public static class TaskId { @@ -241,15 +233,19 @@ public class StreamsAssignmentInterface { public final 
Set<String> sinkTopics; public final Map<String, TopicInfo> stateChangelogTopics; public final Map<String, TopicInfo> repartitionSourceTopics; + public final Collection<Set<String>> copartitionGroups; public Subtopology(final Set<String> sourceTopics, final Set<String> sinkTopics, final Map<String, TopicInfo> repartitionSourceTopics, - final Map<String, TopicInfo> stateChangelogTopics) { + final Map<String, TopicInfo> stateChangelogTopics, + final Collection<Set<String>> copartitionGroups + ) { this.sourceTopics = sourceTopics; this.sinkTopics = sinkTopics; this.stateChangelogTopics = stateChangelogTopics; this.repartitionSourceTopics = repartitionSourceTopics; + this.copartitionGroups = copartitionGroups; } @Override @@ -259,22 +255,19 @@ public class StreamsAssignmentInterface { ", sinkTopics=" + sinkTopics + ", stateChangelogTopics=" + stateChangelogTopics + ", repartitionSourceTopics=" + repartitionSourceTopics + + ", copartitionGroups=" + copartitionGroups + '}'; } } public StreamsAssignmentInterface(UUID processId, Optional<HostInfo> endpoint, - String assignor, Map<String, Subtopology> subtopologyMap, - Map<String, Object> assignmentConfiguration, Map<String, String> clientTags ) { this.processId = processId; this.endpoint = endpoint; - this.assignor = assignor; this.subtopologyMap = subtopologyMap; - this.assignmentConfiguration = assignmentConfiguration; this.taskLags = new HashMap<>(); this.shutdownRequested = new AtomicBoolean(false); this.clientTags = clientTags; @@ -285,9 +278,7 @@ public class StreamsAssignmentInterface { return "StreamsAssignmentMetadata{" + "processID=" + processId + ", endpoint='" + endpoint + '\'' + - ", assignor='" + assignor + '\'' + ", subtopologyMap=" + subtopologyMap + - ", assignmentConfiguration=" + assignmentConfiguration + ", taskLags=" + taskLags + ", clientTags=" + clientTags + '}'; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java index b81beab78d9..8c7671efe67 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java @@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; import org.apache.kafka.common.message.StreamsGroupInitializeRequestData; +import org.apache.kafka.common.message.StreamsGroupInitializeRequestData.CopartitionGroup; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.StreamsGroupInitializeRequest; import org.apache.kafka.common.requests.StreamsGroupInitializeResponse; @@ -27,9 +28,15 @@ import org.apache.kafka.common.utils.LogContext; import org.slf4j.Logger; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; public class StreamsGroupInitializeRequestManager implements RequestManager { @@ -72,9 +79,8 @@ public class StreamsGroupInitializeRequestManager implements RequestManager { streamsGroupInitializeRequestData.setTopologyId(streamsAssignmentInterface.topologyId()); final List<StreamsGroupInitializeRequestData.Subtopology> topology = getTopologyFromStreams(); streamsGroupInitializeRequestData.setTopology(topology); - final StreamsGroupInitializeRequest.Builder streamsGroupInitializeRequestBuilder = new StreamsGroupInitializeRequest.Builder( - streamsGroupInitializeRequestData - ); + final StreamsGroupInitializeRequest.Builder streamsGroupInitializeRequestBuilder = + new 
StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequestData); return new NetworkClientDelegate.UnsentRequest( streamsGroupInitializeRequestBuilder, coordinatorRequestManager.coordinator() @@ -94,21 +100,72 @@ public class StreamsGroupInitializeRequestManager implements RequestManager { final StreamsAssignmentInterface.Subtopology subtopology) { final StreamsGroupInitializeRequestData.Subtopology subtopologyData = new StreamsGroupInitializeRequestData.Subtopology(); subtopologyData.setSubtopologyId(subtopologyName); - subtopologyData.setSourceTopics(new ArrayList<>(subtopology.sourceTopics)); - subtopologyData.setRepartitionSinkTopics(new ArrayList<>(subtopology.sinkTopics)); + ArrayList<String> sortedSourceTopics = new ArrayList<>(subtopology.sourceTopics); + Collections.sort(sortedSourceTopics); + subtopologyData.setSourceTopics(sortedSourceTopics); + // TODO: We should only encode the repartition sink topics here. + ArrayList<String> sortedSinkTopics = new ArrayList<>(subtopology.sinkTopics); + Collections.sort(sortedSinkTopics); + subtopologyData.setRepartitionSinkTopics(sortedSinkTopics); subtopologyData.setRepartitionSourceTopics(getRepartitionTopicsInfoFromStreams(subtopology)); subtopologyData.setStateChangelogTopics(getChangelogTopicsInfoFromStreams(subtopology)); + subtopologyData.setCopartitionGroups( + getCopartitionGroupsFromStreams(subtopology.copartitionGroups, subtopologyData)); return subtopologyData; } + private static List<CopartitionGroup> getCopartitionGroupsFromStreams( + final Collection<Set<String>> copartitionGroups, + final StreamsGroupInitializeRequestData.Subtopology subtopologyData) { + + final Map<String, Short> sourceTopicsMap = + IntStream.range(0, subtopologyData.sourceTopics().size()) + .boxed() + .collect(Collectors.toMap(subtopologyData.sourceTopics()::get, Integer::shortValue)); + + final Map<String, Short> repartitionSourceTopics = + IntStream.range(0, subtopologyData.repartitionSourceTopics().size()) + .boxed() + 
.collect( + Collectors.toMap(x -> subtopologyData.repartitionSourceTopics().get(x).name(), + Integer::shortValue)); + + return copartitionGroups.stream() + .map(x -> getCopartitionGroupFromStreams(x, sourceTopicsMap, repartitionSourceTopics)) + .collect(Collectors.toList()); + } + + private static CopartitionGroup getCopartitionGroupFromStreams( + final Set<String> topicNames, + final Map<String, Short> sourceTopicsMap, + final Map<String, Short> repartitionSourceTopics) { + CopartitionGroup copartitionGroup = new CopartitionGroup(); + + topicNames.forEach(topicName -> { + if (sourceTopicsMap.containsKey(topicName)) { + copartitionGroup.sourceTopics().add(sourceTopicsMap.get(topicName)); + } else if (repartitionSourceTopics.containsKey(topicName)) { + copartitionGroup.repartitionSourceTopics() + .add(repartitionSourceTopics.get(topicName)); + } else { + throw new IllegalStateException( + "Source topic not found in subtopology: " + topicName); + } + }); + + return copartitionGroup; + } + private static List<StreamsGroupInitializeRequestData.TopicInfo> getRepartitionTopicsInfoFromStreams(final StreamsAssignmentInterface.Subtopology subtopologyDataFromStreams) { final List<StreamsGroupInitializeRequestData.TopicInfo> repartitionTopicsInfo = new ArrayList<>(); for (final Map.Entry<String, StreamsAssignmentInterface.TopicInfo> repartitionTopic : subtopologyDataFromStreams.repartitionSourceTopics.entrySet()) { final StreamsGroupInitializeRequestData.TopicInfo repartitionTopicInfo = new StreamsGroupInitializeRequestData.TopicInfo(); repartitionTopicInfo.setName(repartitionTopic.getKey()); repartitionTopic.getValue().numPartitions.ifPresent(repartitionTopicInfo::setPartitions); + repartitionTopic.getValue().replicationFactor.ifPresent(repartitionTopicInfo::setReplicationFactor); repartitionTopicsInfo.add(repartitionTopicInfo); } + repartitionTopicsInfo.sort(Comparator.comparing(StreamsGroupInitializeRequestData.TopicInfo::name)); return repartitionTopicsInfo; } @@ -117,11 
+174,13 @@ public class StreamsGroupInitializeRequestManager implements RequestManager { for (final Map.Entry<String, StreamsAssignmentInterface.TopicInfo> changelogTopic : subtopologyDataFromStreams.stateChangelogTopics.entrySet()) { final StreamsGroupInitializeRequestData.TopicInfo changelogTopicInfo = new StreamsGroupInitializeRequestData.TopicInfo(); changelogTopicInfo.setName(changelogTopic.getKey()); + changelogTopic.getValue().replicationFactor.ifPresent(changelogTopicInfo::setReplicationFactor); changelogTopicsInfo.add(changelogTopicInfo); } + changelogTopicsInfo.sort(Comparator.comparing(StreamsGroupInitializeRequestData.TopicInfo::name)); return changelogTopicsInfo; } - + private void onResponse(final ClientResponse response, final Throwable exception) { if (exception != null) { // todo: handle error diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json index 227ac495c2d..71d74eee831 100644 --- a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json @@ -52,14 +52,14 @@ "about": "String to uniquely identify the subtopology. Deterministically generated from the topology." }, { "name": "SourceTopics", "type": "[]string", "versions": "0+", "about": "The topics the topology reads from." }, - { "name": "SourceTopicRegex", "type": "string", "versions": "0+", - "about": "The regular expressions identifying topics the topology reads from." }, { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+", "about": "The repartition topics the topology writes to." }, + { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+", + "about": "Regular expressions identifying topics the subtopology reads from." 
}, { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+", "about": "The set of state changelog topics associated with this subtopology. Created automatically." }, { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+", - "about": "The set of source topics that are internally created repartition topics. Created automatically." } + "about": "The set of source topics that are internally created repartition topics." } ] }, { "name": "Members", "type": "[]Member", "versions": "0+", @@ -143,7 +143,9 @@ "about": "The name of the topic." }, { "name": "Partitions", "type": "int32", "versions": "0+", "about": "The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced. Always 0 for changelog topics." }, - { "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+", "nullableVersions": "0+", "default": "null", + { "name": "ReplicationFactor", "type": "int16", "versions": "0+", + "about": "The replication factor of the topic. Can be 0 if the default replication factor should be used." }, + { "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+", "about": "Topic-level configurations as key-value pairs." } ]} diff --git a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json index 05c4709092c..aa68fde6bf5 100644 --- a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json +++ b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json @@ -32,14 +32,25 @@ "about": "String to uniquely identify the subtopology. Deterministically generated from the topology." }, { "name": "SourceTopics", "type": "[]string", "versions": "0+", "about": "The topics the topology reads from." 
}, - { "name": "SourceTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", - "about": "The regular expressions identifying topics the subtopology reads from." }, + { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+", + "about": "Regular expressions identifying topics the subtopology reads from." }, { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+", "about": "The set of state changelog topics associated with this subtopology. Created automatically." }, { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+", "about": "The repartition topics the subtopology writes to." }, { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+", - "about": "The set of source topics that are internally created repartition topics. Created automatically." } + "about": "The set of source topics that are internally created repartition topics. Created automatically." }, + { "name": "CopartitionGroups", "type": "[]CopartitionGroup", "versions": "0+", + "about": "A subset of source topics that must be copartitioned.", + "fields": [ + { "name": "SourceTopics", "type": "[]int16", "versions": "0+", + "about": "The topics the topology reads from. Index into the array on the subtopology level." }, + { "name": "SourceTopicRegex", "type": "[]int16", "versions": "0+", + "about": "Regular expressions identifying topics the subtopology reads from. Index into the array on the subtopology level." }, + { "name": "RepartitionSourceTopics", "type": "[]int16", "versions": "0+", + "about": "The set of source topics that are internally created repartition topics. Index into the array on the subtopology level." } + ] + } ] } ], @@ -56,7 +67,9 @@ "about": "The name of the topic." }, { "name": "Partitions", "type": "int32", "versions": "0+", "about": "The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced. Always 0 for changelog topics." 
}, - { "name": "TopicConfigs", "type": "[]TopicConfig", "versions": "0+", "nullableVersions": "0+", "default": "null", + { "name": "ReplicationFactor", "type": "int16", "versions": "0+", + "about": "The replication factor of the topic. Can be 0 if the default replication factor should be used." }, + { "name": "TopicConfigs", "type": "[]TopicConfig", "versions": "0+", "about": "Topic-level configurations as key-value pairs." } ]} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java index f4e5d814a57..91b9b5ae45a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java @@ -107,12 +107,8 @@ class StreamsGroupHeartbeatRequestManagerTest { private final StreamsAssignmentInterface.HostInfo endPoint = new StreamsAssignmentInterface.HostInfo("localhost", 8080); - private final String assignor = "test"; - private final Map<String, Subtopology> subtopologyMap = new HashMap<>(); - private final Map<String, Object> assignmentConfiguration = new HashMap<>(); - private final Map<String, String> clientTags = new HashMap<>(); private final Node coordinatorNode = new Node(1, "localhost", 9092); @@ -122,15 +118,12 @@ class StreamsGroupHeartbeatRequestManagerTest { config = config(); subtopologyMap.clear(); - assignmentConfiguration.clear(); clientTags.clear(); streamsAssignmentInterface = new StreamsAssignmentInterface( processID, Optional.of(endPoint), - assignor, subtopologyMap, - assignmentConfiguration, clientTags ); LogContext logContext = new LogContext("test"); @@ -205,7 +198,6 @@ class StreamsGroupHeartbeatRequestManagerTest { @Test void testFullStaticInformationWhenJoining() { mockJoiningState(); - 
assignmentConfiguration.put("config1", "value1"); clientTags.put("clientTag1", "value2"); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); @@ -245,7 +237,7 @@ class StreamsGroupHeartbeatRequestManagerTest { final Uuid uuid0 = Uuid.randomUuid(); final Uuid uuid1 = Uuid.randomUuid(); - final TopicInfo emptyTopicInfo = new TopicInfo(Optional.empty(), Collections.emptyMap()); + final TopicInfo emptyTopicInfo = new TopicInfo(Optional.empty(), Optional.empty(), Collections.emptyMap()); when(metadata.topicIds()).thenReturn( mkMap( @@ -258,21 +250,24 @@ class StreamsGroupHeartbeatRequestManagerTest { Collections.singleton("source0"), Collections.singleton("sink0"), Collections.singletonMap("repartition0", emptyTopicInfo), - Collections.singletonMap("changelog0", emptyTopicInfo) + Collections.singletonMap("changelog0", emptyTopicInfo), + Collections.singletonList(mkSet("source0", "repartition0")) )); streamsAssignmentInterface.subtopologyMap().put("1", new Subtopology( Collections.singleton("source1"), Collections.singleton("sink1"), Collections.singletonMap("repartition1", emptyTopicInfo), - Collections.singletonMap("changelog1", emptyTopicInfo) + Collections.singletonMap("changelog1", emptyTopicInfo), + Collections.singletonList(mkSet("source1", "repartition1")) )); streamsAssignmentInterface.subtopologyMap().put("2", new Subtopology( Collections.singleton("source2"), Collections.singleton("sink2"), Collections.singletonMap("repartition2", emptyTopicInfo), - Collections.singletonMap("changelog2", emptyTopicInfo) + Collections.singletonMap("changelog2", emptyTopicInfo), + Collections.singletonList(mkSet("source2", "repartition2")) )); StreamsGroupHeartbeatResponseData data = new StreamsGroupHeartbeatResponseData() diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java index 11c92389f6a..f5bc091e7b0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java @@ -26,7 +26,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; -import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -73,19 +74,24 @@ class StreamsGroupInitializeRequestManagerTest { final Set<String> sourceTopics = mkSet("sourceTopic1", "sourceTopic2"); final Set<String> sinkTopics = mkSet("sinkTopic1", "sinkTopic2", "sinkTopic3"); final Map<String, StreamsAssignmentInterface.TopicInfo> repartitionTopics = mkMap( - mkEntry("repartitionTopic1", new StreamsAssignmentInterface.TopicInfo(Optional.of(2), Collections.emptyMap())), - mkEntry("repartitionTopic2", new StreamsAssignmentInterface.TopicInfo(Optional.of(3), Collections.emptyMap())) + mkEntry("repartitionTopic1", new StreamsAssignmentInterface.TopicInfo(Optional.of(2), Optional.of((short) 1), Collections.emptyMap())), + mkEntry("repartitionTopic2", new StreamsAssignmentInterface.TopicInfo(Optional.of(3), Optional.of((short) 3), Collections.emptyMap())) ); final Map<String, StreamsAssignmentInterface.TopicInfo> changelogTopics = mkMap( - mkEntry("changelogTopic1", new StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap())), - mkEntry("changelogTopic2", new StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap())), - mkEntry("changelogTopic3", new StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap())) + mkEntry("changelogTopic1", new 
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Optional.of((short) 1), Collections.emptyMap())), + mkEntry("changelogTopic2", new StreamsAssignmentInterface.TopicInfo(Optional.empty(), Optional.of((short) 2), Collections.emptyMap())), + mkEntry("changelogTopic3", new StreamsAssignmentInterface.TopicInfo(Optional.empty(), Optional.of((short) 3), Collections.emptyMap())) + ); + final Collection<Set<String>> copartitionGroup = mkSet( + mkSet("sourceTopic1", "repartitionTopic2"), + mkSet("sourceTopic2", "repartitionTopic1") ); final StreamsAssignmentInterface.Subtopology subtopology1 = new StreamsAssignmentInterface.Subtopology( sourceTopics, sinkTopics, repartitionTopics, - changelogTopics + changelogTopics, + copartitionGroup ); final String subtopologyName1 = "subtopology1"; when(streamsAssignmentInterface.subtopologyMap()).thenReturn( @@ -116,17 +122,28 @@ class StreamsGroupInitializeRequestManagerTest { assertEquals(1, subtopologies.size()); final StreamsGroupInitializeRequestData.Subtopology subtopology = subtopologies.get(0); assertEquals(subtopologyName1, subtopology.subtopologyId()); - assertEquals(new ArrayList<>(sourceTopics), subtopology.sourceTopics()); - assertEquals(new ArrayList<>(sinkTopics), subtopology.repartitionSinkTopics()); + assertEquals(Arrays.asList("sourceTopic1", "sourceTopic2"), subtopology.sourceTopics()); + assertEquals(Arrays.asList("sinkTopic1", "sinkTopic2", "sinkTopic3"), subtopology.repartitionSinkTopics()); assertEquals(repartitionTopics.size(), subtopology.repartitionSourceTopics().size()); subtopology.repartitionSourceTopics().forEach(topicInfo -> { final StreamsAssignmentInterface.TopicInfo repartitionTopic = repartitionTopics.get(topicInfo.name()); assertEquals(repartitionTopic.numPartitions.get(), topicInfo.partitions()); + assertEquals(repartitionTopic.replicationFactor.get(), topicInfo.replicationFactor()); }); assertEquals(changelogTopics.size(), subtopology.stateChangelogTopics().size()); 
subtopology.stateChangelogTopics().forEach(topicInfo -> { assertTrue(changelogTopics.containsKey(topicInfo.name())); assertEquals(0, topicInfo.partitions()); + final StreamsAssignmentInterface.TopicInfo changelogTopic = changelogTopics.get(topicInfo.name()); + assertEquals(changelogTopic.replicationFactor.get(), topicInfo.replicationFactor()); }); + + assertEquals(2, subtopology.copartitionGroups().size()); + final StreamsGroupInitializeRequestData.CopartitionGroup copartitionGroupData1 = subtopology.copartitionGroups().get(0); + assertEquals(Collections.singletonList((short) 0), copartitionGroupData1.sourceTopics()); + assertEquals(Collections.singletonList((short) 1), copartitionGroupData1.repartitionSourceTopics()); + final StreamsGroupInitializeRequestData.CopartitionGroup copartitionGroupData2 = subtopology.copartitionGroups().get(1); + assertEquals(Collections.singletonList((short) 1), copartitionGroupData2.sourceTopics()); + assertEquals(Collections.singletonList((short) 0), copartitionGroupData2.repartitionSourceTopics()); } } \ No newline at end of file diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index a8f742f0d26..f613d11c8f3 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -216,6 +216,7 @@ import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged; +import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.convertToStreamsGroupTopologyRecord; import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentRecord; import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord; import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord; @@ -2696,6 +2697,8 @@ public class GroupMetadataManager { } } + StreamsGroupTopologyValue recordValue = convertToStreamsGroupTopologyRecord(subtopologies); + cancelStreamsGroupTopologyInitializationTimeout(groupId, topologyId); if (!missingTopics.isEmpty()) { @@ -2706,9 +2709,12 @@ public class GroupMetadataManager { return new CoordinatorResult<>(records, response); } else { - records.add(newStreamsGroupTopologyRecord(groupId, subtopologies)); + records.add(newStreamsGroupTopologyRecord(groupId, recordValue)); + + final Map<String, StreamsGroupTopologyValue.Subtopology> subtopologyMap = recordValue.topology().stream() + .collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId, x -> x)); - final StreamsTopology topology = new StreamsTopology(topologyId, subtopologies); + final StreamsTopology topology = new StreamsTopology(topologyId, subtopologyMap); computeFirstTargetAssignmentAfterTopologyInitialization(group, records, topology); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java index 7a7ee7f2ae2..532955f87aa 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java @@ -381,28 +381,17 @@ 
public class CoordinatorStreamsRecordHelpers { */ public static CoordinatorRecord newStreamsGroupTopologyRecord(String groupId, List<StreamsGroupInitializeRequestData.Subtopology> subtopologies) { - StreamsGroupTopologyValue value = new StreamsGroupTopologyValue(); - subtopologies.forEach(subtopology -> { - List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics = subtopology.repartitionSourceTopics().stream() - .map(topicInfo -> { - List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream() - .map(config -> new StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value())) - .collect(Collectors.toList()) : null; - return new StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs) - .setPartitions(topicInfo.partitions()); - }).collect(Collectors.toList()); - - List<StreamsGroupTopologyValue.TopicInfo> stateChangelogTopics = subtopology.stateChangelogTopics().stream().map(topicInfo -> { - List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream() - .map(config -> new StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value())) - .collect(Collectors.toList()) : null; - return new StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs); - }).collect(Collectors.toList()); + return newStreamsGroupTopologyRecord(groupId, convertToStreamsGroupTopologyRecord(subtopologies)); + } - value.topology().add(new StreamsGroupTopologyValue.Subtopology().setSubtopologyId(subtopology.subtopologyId()) - .setSourceTopics(subtopology.sourceTopics()).setRepartitionSinkTopics(subtopology.repartitionSinkTopics()) - .setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics)); - }); + /** + * Creates a StreamsTopology record. + * + * @param groupId The consumer group id. 
+ * @param value The encoded topology record value. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTopologyRecord(String groupId, StreamsGroupTopologyValue value) { return new CoordinatorRecord(new ApiMessageAndVersion( new StreamsGroupTopologyKey() @@ -411,6 +400,59 @@ public class CoordinatorStreamsRecordHelpers { new ApiMessageAndVersion(value, (short) 0)); } + /** + * Encodes subtopologies from the initialize RPC to a StreamsTopology record value. + * + * @param subtopologies The subtopologies in the new topology. + * @return The record value. + */ + public static StreamsGroupTopologyValue convertToStreamsGroupTopologyRecord(List<StreamsGroupInitializeRequestData.Subtopology> subtopologies) { + StreamsGroupTopologyValue value = new StreamsGroupTopologyValue(); + subtopologies.forEach(subtopology -> { + List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics = + subtopology.repartitionSourceTopics().stream() + .map(CoordinatorStreamsRecordHelpers::convertToTopicInfo) + .collect(Collectors.toList()); + + List<StreamsGroupTopologyValue.TopicInfo> stateChangelogTopics = + subtopology.stateChangelogTopics().stream() + .map(CoordinatorStreamsRecordHelpers::convertToTopicInfo) + .collect(Collectors.toList()); + + List<StreamsGroupTopologyValue.CopartitionGroup> copartitionGroups = + subtopology.copartitionGroups().stream() + .map(copartitionGroup -> new StreamsGroupTopologyValue.CopartitionGroup() + .setSourceTopics(copartitionGroup.sourceTopics()) + .setSourceTopicRegex(copartitionGroup.sourceTopicRegex()) + .setRepartitionSourceTopics(copartitionGroup.repartitionSourceTopics()) + ) + .collect(Collectors.toList()); + + value.topology().add( + new StreamsGroupTopologyValue.Subtopology() + .setSubtopologyId(subtopology.subtopologyId()) + .setSourceTopics(subtopology.sourceTopics()) + .setSourceTopicRegex(subtopology.sourceTopicRegex()) + .setRepartitionSinkTopics(subtopology.repartitionSinkTopics()) + 
.setRepartitionSourceTopics(repartitionSourceTopics) + .setStateChangelogTopics(stateChangelogTopics) + .setCopartitionGroups(copartitionGroups) + ); + }); + return value; + } + + private static StreamsGroupTopologyValue.TopicInfo convertToTopicInfo(StreamsGroupInitializeRequestData.TopicInfo topicInfo) { + List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream() + .map(config -> new StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value())) + .collect(Collectors.toList()) : null; + return new StreamsGroupTopologyValue.TopicInfo() + .setName(topicInfo.name()) + .setTopicConfigs(topicConfigs) + .setPartitions(topicInfo.partitions()) + .setReplicationFactor(topicInfo.replicationFactor()); + } + /** * Creates a StreamsGroupTopology tombstone. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java index 27f138be05f..408f0bfcbb4 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java @@ -17,12 +17,10 @@ package org.apache.kafka.coordinator.group.streams; import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; -import org.apache.kafka.common.message.StreamsGroupInitializeRequestData; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -45,35 +43,6 @@ public class StreamsTopology { this.subtopologies = subtopologies; } - public StreamsTopology(final 
String topologyId, - final List<StreamsGroupInitializeRequestData.Subtopology> subtopologies) { - this.topologyId = topologyId; - this.subtopologies = new HashMap<>(); - subtopologies.forEach(subtopology -> { - List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics = subtopology.repartitionSourceTopics().stream() - .map(topicInfo -> { - List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream() - .map(config -> new StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value())) - .collect(Collectors.toList()) : null; - return new StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs) - .setPartitions(topicInfo.partitions()); - }).collect(Collectors.toList()); - - List<StreamsGroupTopologyValue.TopicInfo> stateChangelogTopics = subtopology.stateChangelogTopics().stream().map(topicInfo -> { - List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = topicInfo.topicConfigs() != null ? 
topicInfo.topicConfigs().stream() - .map(config -> new StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value())) - .collect(Collectors.toList()) : null; - return new StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs); - }).collect(Collectors.toList()); - - this.subtopologies.put( - subtopology.subtopologyId(), - new StreamsGroupTopologyValue.Subtopology().setSubtopologyId(subtopology.subtopologyId()) - .setSourceTopics(subtopology.sourceTopics()).setRepartitionSinkTopics(subtopology.repartitionSinkTopics()) - .setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics)); - }); - } - public String topologyId() { return topologyId; } @@ -141,6 +110,7 @@ public class StreamsTopology { new StreamsGroupDescribeResponseData.TopicInfo() .setName(x.name()) .setPartitions(x.partitions()) + .setReplicationFactor(x.replicationFactor()) .setTopicConfigs( x.topicConfigs() != null ? x.topicConfigs().stream().map( diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json index af1fae06aad..d89ed3ed393 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json @@ -29,14 +29,25 @@ "about": "String to uniquely identify the subtopology. Deterministically generated from the topology." }, { "name": "SourceTopics", "type": "[]string", "versions": "0+", "about": "The topics the topology reads from." }, - { "name": "SourceTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", - "about": "The regular expressions identifying topics the topology reads from. null if not provided." 
}, + { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+", + "about": "Regular expressions identifying topics the sub-topology reads from." }, { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+", - "about": "The set of state changelog topics associated with this subtopology. " }, + "about": "The set of state changelog topics associated with this sub-topology." }, { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+", "about": "The repartition topics the subtopology writes to." }, { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+", - "about": "The set of source topics that are internally created repartition topics. " } + "about": "The set of source topics that are internally created repartition topics." }, + { "name": "CopartitionGroups", "type": "[]CopartitionGroup", "versions": "0+", + "about": "A subset of source topics that must be copartitioned.", + "fields": [ + { "name": "SourceTopics", "type": "[]int16", "versions": "0+", + "about": "The topics the topology reads from. Index into the array on the subtopology level." }, + { "name": "SourceTopicRegex", "type": "[]int16", "versions": "0+", + "about": "Regular expressions identifying topics the subtopology reads from. Index into the array on the subtopology level." }, + { "name": "RepartitionSourceTopics", "type": "[]int16", "versions": "0+", + "about": "The set of source topics that are internally created repartition topics. Index into the array on the subtopology level." } + ] + } ] } ], @@ -53,7 +64,9 @@ "about": "The name of the topic." }, { "name": "Partitions", "type": "int32", "versions": "0+", "about": "The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced. Always 0 for changelog topics." 
}, - { "name": "TopicConfigs", "type": "[]TopicConfig", "versions": "0+", "nullableVersions": "0+", "default": "null", + { "name": "ReplicationFactor", "type": "int16", "versions": "0+", + "about": "The replication factor of the topic. Can be 0 if the default replication factor should be used." }, + { "name": "TopicConfigs", "type": "[]TopicConfig", "versions": "0+", "about": "Topic-level configurations as key-value pairs." } ]} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java index 7ddbc832d80..65ad8d43fa8 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Test; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -38,11 +39,13 @@ class CoordinatorStreamsRecordHelpersTest { .setSubtopologyId("subtopology-id") .setRepartitionSinkTopics(Collections.singletonList("foo")) .setSourceTopics(Collections.singletonList("bar")) + .setSourceTopicRegex(Collections.singletonList("regex")) .setRepartitionSourceTopics( Collections.singletonList( new StreamsGroupInitializeRequestData.TopicInfo() .setName("repartition") .setPartitions(4) + .setReplicationFactor((short) 3) .setTopicConfigs(Collections.singletonList( new StreamsGroupInitializeRequestData.TopicConfig() .setKey("config-name1") @@ -54,6 +57,7 @@ class CoordinatorStreamsRecordHelpersTest { Collections.singletonList( new StreamsGroupInitializeRequestData.TopicInfo() .setName("changelog") + .setReplicationFactor((short) 2) .setTopicConfigs(Collections.singletonList( new 
StreamsGroupInitializeRequestData.TopicConfig() .setKey("config-name2") @@ -61,6 +65,13 @@ class CoordinatorStreamsRecordHelpersTest { )) ) ) + .setCopartitionGroups(Arrays.asList( + new StreamsGroupInitializeRequestData.CopartitionGroup() + .setSourceTopics(Collections.singletonList((short) 0)) + .setRepartitionSourceTopics(Collections.singletonList((short) 0)), + new StreamsGroupInitializeRequestData.CopartitionGroup() + .setSourceTopicRegex(Collections.singletonList((short) 0)) + )) ); List<StreamsGroupTopologyValue.Subtopology> expectedTopology = @@ -68,11 +79,13 @@ class CoordinatorStreamsRecordHelpersTest { .setSubtopologyId("subtopology-id") .setRepartitionSinkTopics(Collections.singletonList("foo")) .setSourceTopics(Collections.singletonList("bar")) + .setSourceTopicRegex(Collections.singletonList("regex")) .setRepartitionSourceTopics( Collections.singletonList( new StreamsGroupTopologyValue.TopicInfo() .setName("repartition") .setPartitions(4) + .setReplicationFactor((short) 3) .setTopicConfigs(Collections.singletonList( new StreamsGroupTopologyValue.TopicConfig() .setKey("config-name1") @@ -84,6 +97,7 @@ class CoordinatorStreamsRecordHelpersTest { Collections.singletonList( new StreamsGroupTopologyValue.TopicInfo() .setName("changelog") + .setReplicationFactor((short) 2) .setTopicConfigs(Collections.singletonList( new StreamsGroupTopologyValue.TopicConfig() .setKey("config-name2") @@ -91,6 +105,13 @@ class CoordinatorStreamsRecordHelpersTest { )) ) ) + .setCopartitionGroups(Arrays.asList( + new StreamsGroupTopologyValue.CopartitionGroup() + .setSourceTopics(Collections.singletonList((short) 0)) + .setRepartitionSourceTopics(Collections.singletonList((short) 0)), + new StreamsGroupTopologyValue.CopartitionGroup() + .setSourceTopicRegex(Collections.singletonList((short) 0)) + )) ); CoordinatorRecord expectedRecord = new CoordinatorRecord( @@ -108,4 +129,5 @@ class CoordinatorStreamsRecordHelpersTest { topology )); } + } \ No newline at end of file diff 
--git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java index e83abcbfe7c..7aa303603ab 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java @@ -145,40 +145,60 @@ public class StreamsTopologyTest { public void asStreamsGroupDescribeTopologyShouldReturnCorrectSubtopologies() { Map<String, Subtopology> subtopologies = mkMap( mkEntry("subtopology-1", new Subtopology() - .setSourceTopicRegex("regex-1") + .setSourceTopicRegex(Collections.singletonList("regex-1")) .setSubtopologyId("subtopology-1") .setSourceTopics(Collections.singletonList("source-topic-1")) .setRepartitionSinkTopics(Collections.singletonList("sink-topic-1")) .setRepartitionSourceTopics( - Collections.singletonList(new TopicInfo().setName("repartition-topic-1"))) + Collections.singletonList(new TopicInfo() + .setName("repartition-topic-1") + .setReplicationFactor((short) 3) + .setPartitions(2))) .setStateChangelogTopics( - Collections.singletonList(new TopicInfo().setName("changelog-topic-1"))) + Collections.singletonList(new TopicInfo() + .setName("changelog-topic-1") + .setReplicationFactor((short) 2) + .setPartitions(1))) ), mkEntry("subtopology-2", new Subtopology() - .setSourceTopicRegex("regex-2") + .setSourceTopicRegex(Collections.singletonList("regex-2")) .setSubtopologyId("subtopology-2") .setSourceTopics(Collections.singletonList("source-topic-2")) .setRepartitionSinkTopics(Collections.singletonList("sink-topic-2")) .setRepartitionSourceTopics( - Collections.singletonList(new TopicInfo().setName("repartition-topic-2"))) + Collections.singletonList(new TopicInfo() + .setName("repartition-topic-2") + .setReplicationFactor((short) 3) + .setPartitions(2))) .setStateChangelogTopics( - 
Collections.singletonList(new TopicInfo().setName("changelog-topic-2"))) + Collections.singletonList(new TopicInfo() + .setName("changelog-topic-2") + .setReplicationFactor((short) 2) + .setPartitions(1))) ) ); StreamsTopology topology = new StreamsTopology("topology-id", subtopologies); List<StreamsGroupDescribeResponseData.Subtopology> result = topology.asStreamsGroupDescribeTopology(); assertEquals(2, result.size()); - assertEquals("regex-1", result.get(0).sourceTopicRegex()); + assertEquals(Collections.singletonList("regex-1"), result.get(0).sourceTopicRegex()); assertEquals("subtopology-1", result.get(0).subtopologyId()); assertEquals(Collections.singletonList("source-topic-1"), result.get(0).sourceTopics()); assertEquals(Collections.singletonList("sink-topic-1"), result.get(0).repartitionSinkTopics()); assertEquals("repartition-topic-1", result.get(0).repartitionSourceTopics().get(0).name()); + assertEquals((short) 3, result.get(0).repartitionSourceTopics().get(0).replicationFactor()); + assertEquals(2, result.get(0).repartitionSourceTopics().get(0).partitions()); assertEquals("changelog-topic-1", result.get(0).stateChangelogTopics().get(0).name()); - assertEquals("regex-2", result.get(1).sourceTopicRegex()); + assertEquals((short) 2, result.get(0).stateChangelogTopics().get(0).replicationFactor()); + assertEquals(1, result.get(0).stateChangelogTopics().get(0).partitions()); + assertEquals(Collections.singletonList("regex-2"), result.get(1).sourceTopicRegex()); assertEquals("subtopology-2", result.get(1).subtopologyId()); assertEquals(Collections.singletonList("source-topic-2"), result.get(1).sourceTopics()); assertEquals(Collections.singletonList("sink-topic-2"), result.get(1).repartitionSinkTopics()); assertEquals("repartition-topic-2", result.get(1).repartitionSourceTopics().get(0).name()); + assertEquals((short) 3, result.get(1).repartitionSourceTopics().get(0).replicationFactor()); + assertEquals(2, 
result.get(1).repartitionSourceTopics().get(0).partitions()); assertEquals("changelog-topic-2", result.get(1).stateChangelogTopics().get(0).name()); + assertEquals((short) 2, result.get(1).stateChangelogTopics().get(0).replicationFactor()); + assertEquals(1, result.get(1).stateChangelogTopics().get(0).partitions()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 0b69a099663..27e709490be 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -73,6 +73,7 @@ import org.slf4j.Logger; import java.time.Duration; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -558,9 +559,38 @@ public class StreamThread extends Thread implements ProcessingThread { final TopologyMetadata topologyMetadata) { final InternalTopologyBuilder internalTopologyBuilder = topologyMetadata.lookupBuilderForNamedTopology(null); + final Map<String, Subtopology> subtopologyMap = initBrokerTopology(config, internalTopologyBuilder); + + return new StreamsAssignmentInterface( + processId, + endpoint, + subtopologyMap, + config.getClientTags() + ); + } + + private static Map<String, Subtopology> initBrokerTopology(final StreamsConfig config, final InternalTopologyBuilder internalTopologyBuilder) { + final Map<String, String> defaultTopicConfigs = new HashMap<>(); + for (final Map.Entry<String, Object> entry : config.originalsWithPrefix(StreamsConfig.TOPIC_PREFIX).entrySet()) { + if (entry.getValue() != null) { + defaultTopicConfigs.put(entry.getKey(), entry.getValue().toString()); + } + } + final long windowChangeLogAdditionalRetention = config.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG); + final 
Map<String, Subtopology> subtopologyMap = new HashMap<>(); + final Collection<Set<String>> copartitionGroups = internalTopologyBuilder.copartitionGroups(); + for (final Map.Entry<TopologyMetadata.Subtopology, TopicsInfo> topicsInfoEntry : internalTopologyBuilder.subtopologyToTopicsInfo() .entrySet()) { + + final HashSet<String> allSourceTopics = new HashSet<>( + topicsInfoEntry.getValue().sourceTopics); + topicsInfoEntry.getValue().repartitionSourceTopics.forEach( + (repartitionSourceTopic, repartitionTopicInfo) -> { + allSourceTopics.add(repartitionSourceTopic); + }); + subtopologyMap.put( String.valueOf(topicsInfoEntry.getKey().nodeGroupId), new Subtopology( @@ -569,44 +599,28 @@ public class StreamThread extends Thread implements ProcessingThread { topicsInfoEntry.getValue().repartitionSourceTopics.entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> - new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(), e.getValue().topicConfigs))), + new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(), + Optional.of(config.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue()), + e.getValue().properties(defaultTopicConfigs, windowChangeLogAdditionalRetention)))), topicsInfoEntry.getValue().stateChangelogTopics.entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> - new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(), e.getValue().topicConfigs))) - + new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(), + Optional.of(config.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue()), + e.getValue().properties(defaultTopicConfigs, windowChangeLogAdditionalRetention)))), + copartitionGroups.stream().filter(allSourceTopics::containsAll).collect( + Collectors.toList()) ) ); } - // TODO: Which of these are actually needed? 
- // TODO: Maybe we want to split this into assignment properties and internal topic configuration properties - final HashMap<String, Object> assignmentProperties = new HashMap<>(); - assignmentProperties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, config.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG)); - assignmentProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)); - assignmentProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, config.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG)); - assignmentProperties.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, - config.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG)); - assignmentProperties.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, config.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG)); - assignmentProperties.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, - config.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); - assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, - config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG)); - assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, - config.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)); - assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, - config.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG)); - assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, - config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)); + if (subtopologyMap.values().stream().mapToInt(x -> x.copartitionGroups.size()).sum() + != copartitionGroups.size()) { + throw new IllegalStateException( + "Not all copartition groups were converted to broker topology"); + } - return new StreamsAssignmentInterface( - processId, - endpoint, - null, - subtopologyMap, - assignmentProperties, - 
config.getClientTags() - ); + return subtopologyMap; } private static DefaultTaskManager maybeCreateSchedulingTaskManager(final boolean processingThreadsEnabled,
