This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/kip1071 by this push:
new 8c3ec029bf3 Complete topic metadata for automatic topic creation
(#17391)
8c3ec029bf3 is described below
commit 8c3ec029bf3d30256842a7ba9ac23b16cbde71d5
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Oct 8 17:30:48 2024 +0200
Complete topic metadata for automatic topic creation (#17391)
* KSTREAMS-6456: Complete topic metadata for automatic topic creation
This change updates the current RPCs and schemas to add the following
metadata:
- copartition groups are added to topology record and initialize RPC
- multiple regexes are added to topology record and initialize RPC
- replication factors are added for each internal topic
We also add code to fill this information correctly from the
`InternalTopologyBuilder` object.
The fields `assignmentConfiguration` and `assignor` in the
`StreamsAssignmentInterface` are removed, because they are not
needed anymore.
* fixes
---
.../internals/StreamsAssignmentInterface.java | 31 +++-----
.../StreamsGroupInitializeRequestManager.java | 71 ++++++++++++++++--
.../message/StreamsGroupDescribeResponse.json | 10 +--
.../message/StreamsGroupInitializeRequest.json | 21 ++++--
.../StreamsGroupHeartbeatRequestManagerTest.java | 19 ++---
.../StreamsGroupInitializeRequestManagerTest.java | 35 ++++++---
.../coordinator/group/GroupMetadataManager.java | 10 ++-
.../streams/CoordinatorStreamsRecordHelpers.java | 84 ++++++++++++++++------
.../coordinator/group/streams/StreamsTopology.java | 32 +--------
.../common/message/StreamsGroupTopologyValue.json | 23 ++++--
.../CoordinatorStreamsRecordHelpersTest.java | 22 ++++++
.../group/streams/StreamsTopologyTest.java | 36 +++++++---
.../streams/processor/internals/StreamThread.java | 74 +++++++++++--------
13 files changed, 316 insertions(+), 152 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
index 1e23233f4e6..cefe9175b68 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.TopicPartition;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -41,12 +42,8 @@ public class StreamsAssignmentInterface {
private Optional<HostInfo> endpoint;
- private String assignor;
-
private Map<String, Subtopology> subtopologyMap;
- private Map<String, Object> assignmentConfiguration;
-
private Map<TaskId, Long> taskLags;
private AtomicBoolean shutdownRequested;
@@ -66,18 +63,10 @@ public class StreamsAssignmentInterface {
return endpoint;
}
- public String assignor() {
- return assignor;
- }
-
public Map<String, Subtopology> subtopologyMap() {
return subtopologyMap;
}
- public Map<String, Object> assignmentConfiguration() {
- return assignmentConfiguration;
- }
-
// TODO: This needs to be used somewhere
public Map<TaskId, Long> taskLags() {
return taskLags;
@@ -190,11 +179,14 @@ public class StreamsAssignmentInterface {
public static class TopicInfo {
public final Optional<Integer> numPartitions;
+ public final Optional<Short> replicationFactor;
public final Map<String, String> topicConfigs;
public TopicInfo(final Optional<Integer> numPartitions,
+ final Optional<Short> replicationFactor,
final Map<String, String> topicConfigs) {
this.numPartitions = numPartitions;
+ this.replicationFactor = replicationFactor;
this.topicConfigs = topicConfigs;
}
@@ -202,10 +194,10 @@ public class StreamsAssignmentInterface {
public String toString() {
return "TopicInfo{" +
"numPartitions=" + numPartitions +
+ ", replicationFactor=" + replicationFactor +
", topicConfigs=" + topicConfigs +
'}';
}
-
}
public static class TaskId {
@@ -241,15 +233,19 @@ public class StreamsAssignmentInterface {
public final Set<String> sinkTopics;
public final Map<String, TopicInfo> stateChangelogTopics;
public final Map<String, TopicInfo> repartitionSourceTopics;
+ public final Collection<Set<String>> copartitionGroups;
public Subtopology(final Set<String> sourceTopics,
final Set<String> sinkTopics,
final Map<String, TopicInfo>
repartitionSourceTopics,
- final Map<String, TopicInfo> stateChangelogTopics) {
+ final Map<String, TopicInfo> stateChangelogTopics,
+ final Collection<Set<String>> copartitionGroups
+ ) {
this.sourceTopics = sourceTopics;
this.sinkTopics = sinkTopics;
this.stateChangelogTopics = stateChangelogTopics;
this.repartitionSourceTopics = repartitionSourceTopics;
+ this.copartitionGroups = copartitionGroups;
}
@Override
@@ -259,22 +255,19 @@ public class StreamsAssignmentInterface {
", sinkTopics=" + sinkTopics +
", stateChangelogTopics=" + stateChangelogTopics +
", repartitionSourceTopics=" + repartitionSourceTopics +
+ ", copartitionGroups=" + copartitionGroups +
'}';
}
}
public StreamsAssignmentInterface(UUID processId,
Optional<HostInfo> endpoint,
- String assignor,
Map<String, Subtopology> subtopologyMap,
- Map<String, Object>
assignmentConfiguration,
Map<String, String> clientTags
) {
this.processId = processId;
this.endpoint = endpoint;
- this.assignor = assignor;
this.subtopologyMap = subtopologyMap;
- this.assignmentConfiguration = assignmentConfiguration;
this.taskLags = new HashMap<>();
this.shutdownRequested = new AtomicBoolean(false);
this.clientTags = clientTags;
@@ -285,9 +278,7 @@ public class StreamsAssignmentInterface {
return "StreamsAssignmentMetadata{" +
"processID=" + processId +
", endpoint='" + endpoint + '\'' +
- ", assignor='" + assignor + '\'' +
", subtopologyMap=" + subtopologyMap +
- ", assignmentConfiguration=" + assignmentConfiguration +
", taskLags=" + taskLags +
", clientTags=" + clientTags +
'}';
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
index b81beab78d9..8c7671efe67 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
+import
org.apache.kafka.common.message.StreamsGroupInitializeRequestData.CopartitionGroup;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.StreamsGroupInitializeRequest;
import org.apache.kafka.common.requests.StreamsGroupInitializeResponse;
@@ -27,9 +28,15 @@ import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
public class StreamsGroupInitializeRequestManager implements RequestManager {
@@ -72,9 +79,8 @@ public class StreamsGroupInitializeRequestManager implements
RequestManager {
streamsGroupInitializeRequestData.setTopologyId(streamsAssignmentInterface.topologyId());
final List<StreamsGroupInitializeRequestData.Subtopology> topology =
getTopologyFromStreams();
streamsGroupInitializeRequestData.setTopology(topology);
- final StreamsGroupInitializeRequest.Builder
streamsGroupInitializeRequestBuilder = new
StreamsGroupInitializeRequest.Builder(
- streamsGroupInitializeRequestData
- );
+ final StreamsGroupInitializeRequest.Builder
streamsGroupInitializeRequestBuilder =
+ new
StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequestData);
return new NetworkClientDelegate.UnsentRequest(
streamsGroupInitializeRequestBuilder,
coordinatorRequestManager.coordinator()
@@ -94,21 +100,72 @@ public class StreamsGroupInitializeRequestManager
implements RequestManager {
final StreamsAssignmentInterface.Subtopology subtopology) {
final StreamsGroupInitializeRequestData.Subtopology subtopologyData =
new StreamsGroupInitializeRequestData.Subtopology();
subtopologyData.setSubtopologyId(subtopologyName);
- subtopologyData.setSourceTopics(new
ArrayList<>(subtopology.sourceTopics));
- subtopologyData.setRepartitionSinkTopics(new
ArrayList<>(subtopology.sinkTopics));
+ ArrayList<String> sortedSourceTopics = new
ArrayList<>(subtopology.sourceTopics);
+ Collections.sort(sortedSourceTopics);
+ subtopologyData.setSourceTopics(sortedSourceTopics);
+ // TODO: We should only encode the repartition sink topics here.
+ ArrayList<String> sortedSinkTopics = new
ArrayList<>(subtopology.sinkTopics);
+ Collections.sort(sortedSinkTopics);
+ subtopologyData.setRepartitionSinkTopics(sortedSinkTopics);
subtopologyData.setRepartitionSourceTopics(getRepartitionTopicsInfoFromStreams(subtopology));
subtopologyData.setStateChangelogTopics(getChangelogTopicsInfoFromStreams(subtopology));
+ subtopologyData.setCopartitionGroups(
+ getCopartitionGroupsFromStreams(subtopology.copartitionGroups,
subtopologyData));
return subtopologyData;
}
+ private static List<CopartitionGroup> getCopartitionGroupsFromStreams(
+ final Collection<Set<String>> copartitionGroups,
+ final StreamsGroupInitializeRequestData.Subtopology subtopologyData) {
+
+ final Map<String, Short> sourceTopicsMap =
+ IntStream.range(0, subtopologyData.sourceTopics().size())
+ .boxed()
+ .collect(Collectors.toMap(subtopologyData.sourceTopics()::get,
Integer::shortValue));
+
+ final Map<String, Short> repartitionSourceTopics =
+ IntStream.range(0,
subtopologyData.repartitionSourceTopics().size())
+ .boxed()
+ .collect(
+ Collectors.toMap(x ->
subtopologyData.repartitionSourceTopics().get(x).name(),
+ Integer::shortValue));
+
+ return copartitionGroups.stream()
+ .map(x -> getCopartitionGroupFromStreams(x, sourceTopicsMap,
repartitionSourceTopics))
+ .collect(Collectors.toList());
+ }
+
+ private static CopartitionGroup getCopartitionGroupFromStreams(
+ final Set<String> topicNames,
+ final Map<String, Short> sourceTopicsMap,
+ final Map<String, Short> repartitionSourceTopics) {
+ CopartitionGroup copartitionGroup = new CopartitionGroup();
+
+ topicNames.forEach(topicName -> {
+ if (sourceTopicsMap.containsKey(topicName)) {
+
copartitionGroup.sourceTopics().add(sourceTopicsMap.get(topicName));
+ } else if (repartitionSourceTopics.containsKey(topicName)) {
+ copartitionGroup.repartitionSourceTopics()
+ .add(repartitionSourceTopics.get(topicName));
+ } else {
+ throw new IllegalStateException(
+ "Source topic not found in subtopology: " + topicName);
+ }
+ });
+
+ return copartitionGroup;
+ }
+
private static List<StreamsGroupInitializeRequestData.TopicInfo>
getRepartitionTopicsInfoFromStreams(final
StreamsAssignmentInterface.Subtopology subtopologyDataFromStreams) {
final List<StreamsGroupInitializeRequestData.TopicInfo>
repartitionTopicsInfo = new ArrayList<>();
for (final Map.Entry<String, StreamsAssignmentInterface.TopicInfo>
repartitionTopic :
subtopologyDataFromStreams.repartitionSourceTopics.entrySet()) {
final StreamsGroupInitializeRequestData.TopicInfo
repartitionTopicInfo = new StreamsGroupInitializeRequestData.TopicInfo();
repartitionTopicInfo.setName(repartitionTopic.getKey());
repartitionTopic.getValue().numPartitions.ifPresent(repartitionTopicInfo::setPartitions);
+
repartitionTopic.getValue().replicationFactor.ifPresent(repartitionTopicInfo::setReplicationFactor);
repartitionTopicsInfo.add(repartitionTopicInfo);
}
+
repartitionTopicsInfo.sort(Comparator.comparing(StreamsGroupInitializeRequestData.TopicInfo::name));
return repartitionTopicsInfo;
}
@@ -117,11 +174,13 @@ public class StreamsGroupInitializeRequestManager
implements RequestManager {
for (final Map.Entry<String, StreamsAssignmentInterface.TopicInfo>
changelogTopic : subtopologyDataFromStreams.stateChangelogTopics.entrySet()) {
final StreamsGroupInitializeRequestData.TopicInfo
changelogTopicInfo = new StreamsGroupInitializeRequestData.TopicInfo();
changelogTopicInfo.setName(changelogTopic.getKey());
+
changelogTopic.getValue().replicationFactor.ifPresent(changelogTopicInfo::setReplicationFactor);
changelogTopicsInfo.add(changelogTopicInfo);
}
+
changelogTopicsInfo.sort(Comparator.comparing(StreamsGroupInitializeRequestData.TopicInfo::name));
return changelogTopicsInfo;
}
-
+
private void onResponse(final ClientResponse response, final Throwable
exception) {
if (exception != null) {
// todo: handle error
diff --git
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
index 227ac495c2d..71d74eee831 100644
---
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
+++
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
@@ -52,14 +52,14 @@
"about": "String to uniquely identify the subtopology.
Deterministically generated from the topology." },
{ "name": "SourceTopics", "type": "[]string", "versions": "0+",
"about": "The topics the topology reads from." },
- { "name": "SourceTopicRegex", "type": "string", "versions": "0+",
- "about": "The regular expressions identifying topics the
topology reads from." },
{ "name": "RepartitionSinkTopics", "type": "[]string", "versions":
"0+",
"about": "The repartition topics the topology writes to." },
+ { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+",
+ "about": "Regular expressions identifying topics the subtopology
reads from." },
{ "name": "StateChangelogTopics", "type": "[]TopicInfo",
"versions": "0+",
"about": "The set of state changelog topics associated with this
subtopology. Created automatically." },
{ "name": "RepartitionSourceTopics", "type": "[]TopicInfo",
"versions": "0+",
- "about": "The set of source topics that are internally created
repartition topics. Created automatically." }
+ "about": "The set of source topics that are internally created
repartition topics." }
]
},
{ "name": "Members", "type": "[]Member", "versions": "0+",
@@ -143,7 +143,9 @@
"about": "The name of the topic." },
{ "name": "Partitions", "type": "int32", "versions": "0+",
"about": "The number of partitions in the topic. Can be 0 if no
specific number of partitions is enforced. Always 0 for changelog topics." },
- { "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+",
"nullableVersions": "0+", "default": "null",
+ { "name": "ReplicationFactor", "type": "int16", "versions": "0+",
+ "about": "The replication factor of the topic. Can be 0 if the default
replication factor should be used." },
+ { "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+",
"about": "Topic-level configurations as key-value pairs."
}
]}
diff --git
a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
index 05c4709092c..aa68fde6bf5 100644
---
a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
+++
b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
@@ -32,14 +32,25 @@
"about": "String to uniquely identify the subtopology.
Deterministically generated from the topology." },
{ "name": "SourceTopics", "type": "[]string", "versions": "0+",
"about": "The topics the topology reads from." },
- { "name": "SourceTopicRegex", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
- "about": "The regular expressions identifying topics the subtopology
reads from." },
+ { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+",
+ "about": "Regular expressions identifying topics the subtopology
reads from." },
{ "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions":
"0+",
"about": "The set of state changelog topics associated with this
subtopology. Created automatically." },
{ "name": "RepartitionSinkTopics", "type": "[]string", "versions":
"0+",
"about": "The repartition topics the subtopology writes to." },
{ "name": "RepartitionSourceTopics", "type": "[]TopicInfo",
"versions": "0+",
- "about": "The set of source topics that are internally created
repartition topics. Created automatically." }
+ "about": "The set of source topics that are internally created
repartition topics. Created automatically." },
+ { "name": "CopartitionGroups", "type": "[]CopartitionGroup",
"versions": "0+",
+ "about": "A subset of source topics that must be copartitioned.",
+ "fields": [
+ { "name": "SourceTopics", "type": "[]int16", "versions": "0+",
+ "about": "The topics the topology reads from. Index into the
array on the subtopology level." },
+ { "name": "SourceTopicRegex", "type": "[]int16", "versions": "0+",
+ "about": "Regular expressions identifying topics the subtopology
reads from. Index into the array on the subtopology level." },
+ { "name": "RepartitionSourceTopics", "type": "[]int16",
"versions": "0+",
+ "about": "The set of source topics that are internally created
repartition topics. Index into the array on the subtopology level." }
+ ]
+ }
]
}
],
@@ -56,7 +67,9 @@
"about": "The name of the topic." },
{ "name": "Partitions", "type": "int32", "versions": "0+",
"about": "The number of partitions in the topic. Can be 0 if no
specific number of partitions is enforced. Always 0 for changelog topics." },
- { "name": "TopicConfigs", "type": "[]TopicConfig", "versions": "0+",
"nullableVersions": "0+", "default": "null",
+ { "name": "ReplicationFactor", "type": "int16", "versions": "0+",
+ "about": "The replication factor of the topic. Can be 0 if the default
replication factor should be used." },
+ { "name": "TopicConfigs", "type": "[]TopicConfig", "versions": "0+",
"about": "Topic-level configurations as key-value pairs."
}
]}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index f4e5d814a57..91b9b5ae45a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -107,12 +107,8 @@ class StreamsGroupHeartbeatRequestManagerTest {
private final StreamsAssignmentInterface.HostInfo endPoint = new
StreamsAssignmentInterface.HostInfo("localhost", 8080);
- private final String assignor = "test";
-
private final Map<String, Subtopology> subtopologyMap = new HashMap<>();
- private final Map<String, Object> assignmentConfiguration = new
HashMap<>();
-
private final Map<String, String> clientTags = new HashMap<>();
private final Node coordinatorNode = new Node(1, "localhost", 9092);
@@ -122,15 +118,12 @@ class StreamsGroupHeartbeatRequestManagerTest {
config = config();
subtopologyMap.clear();
- assignmentConfiguration.clear();
clientTags.clear();
streamsAssignmentInterface =
new StreamsAssignmentInterface(
processID,
Optional.of(endPoint),
- assignor,
subtopologyMap,
- assignmentConfiguration,
clientTags
);
LogContext logContext = new LogContext("test");
@@ -205,7 +198,6 @@ class StreamsGroupHeartbeatRequestManagerTest {
@Test
void testFullStaticInformationWhenJoining() {
mockJoiningState();
- assignmentConfiguration.put("config1", "value1");
clientTags.put("clientTag1", "value2");
NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
@@ -245,7 +237,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
final Uuid uuid0 = Uuid.randomUuid();
final Uuid uuid1 = Uuid.randomUuid();
- final TopicInfo emptyTopicInfo = new TopicInfo(Optional.empty(),
Collections.emptyMap());
+ final TopicInfo emptyTopicInfo = new TopicInfo(Optional.empty(),
Optional.empty(), Collections.emptyMap());
when(metadata.topicIds()).thenReturn(
mkMap(
@@ -258,21 +250,24 @@ class StreamsGroupHeartbeatRequestManagerTest {
Collections.singleton("source0"),
Collections.singleton("sink0"),
Collections.singletonMap("repartition0", emptyTopicInfo),
- Collections.singletonMap("changelog0", emptyTopicInfo)
+ Collections.singletonMap("changelog0", emptyTopicInfo),
+ Collections.singletonList(mkSet("source0", "repartition0"))
));
streamsAssignmentInterface.subtopologyMap().put("1",
new Subtopology(
Collections.singleton("source1"),
Collections.singleton("sink1"),
Collections.singletonMap("repartition1", emptyTopicInfo),
- Collections.singletonMap("changelog1", emptyTopicInfo)
+ Collections.singletonMap("changelog1", emptyTopicInfo),
+ Collections.singletonList(mkSet("source1", "repartition1"))
));
streamsAssignmentInterface.subtopologyMap().put("2",
new Subtopology(
Collections.singleton("source2"),
Collections.singleton("sink2"),
Collections.singletonMap("repartition2", emptyTopicInfo),
- Collections.singletonMap("changelog2", emptyTopicInfo)
+ Collections.singletonMap("changelog2", emptyTopicInfo),
+ Collections.singletonList(mkSet("source2", "repartition2"))
));
StreamsGroupHeartbeatResponseData data = new
StreamsGroupHeartbeatResponseData()
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
index 11c92389f6a..f5bc091e7b0 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
@@ -26,7 +26,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -73,19 +74,24 @@ class StreamsGroupInitializeRequestManagerTest {
final Set<String> sourceTopics = mkSet("sourceTopic1", "sourceTopic2");
final Set<String> sinkTopics = mkSet("sinkTopic1", "sinkTopic2",
"sinkTopic3");
final Map<String, StreamsAssignmentInterface.TopicInfo>
repartitionTopics = mkMap(
- mkEntry("repartitionTopic1", new
StreamsAssignmentInterface.TopicInfo(Optional.of(2), Collections.emptyMap())),
- mkEntry("repartitionTopic2", new
StreamsAssignmentInterface.TopicInfo(Optional.of(3), Collections.emptyMap()))
+ mkEntry("repartitionTopic1", new
StreamsAssignmentInterface.TopicInfo(Optional.of(2), Optional.of((short) 1),
Collections.emptyMap())),
+ mkEntry("repartitionTopic2", new
StreamsAssignmentInterface.TopicInfo(Optional.of(3), Optional.of((short) 3),
Collections.emptyMap()))
);
final Map<String, StreamsAssignmentInterface.TopicInfo>
changelogTopics = mkMap(
- mkEntry("changelogTopic1", new
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap())),
- mkEntry("changelogTopic2", new
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap())),
- mkEntry("changelogTopic3", new
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap()))
+ mkEntry("changelogTopic1", new
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Optional.of((short) 1),
Collections.emptyMap())),
+ mkEntry("changelogTopic2", new
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Optional.of((short) 2),
Collections.emptyMap())),
+ mkEntry("changelogTopic3", new
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Optional.of((short) 3),
Collections.emptyMap()))
+ );
+ final Collection<Set<String>> copartitionGroup = mkSet(
+ mkSet("sourceTopic1", "repartitionTopic2"),
+ mkSet("sourceTopic2", "repartitionTopic1")
);
final StreamsAssignmentInterface.Subtopology subtopology1 = new
StreamsAssignmentInterface.Subtopology(
sourceTopics,
sinkTopics,
repartitionTopics,
- changelogTopics
+ changelogTopics,
+ copartitionGroup
);
final String subtopologyName1 = "subtopology1";
when(streamsAssignmentInterface.subtopologyMap()).thenReturn(
@@ -116,17 +122,28 @@ class StreamsGroupInitializeRequestManagerTest {
assertEquals(1, subtopologies.size());
final StreamsGroupInitializeRequestData.Subtopology subtopology =
subtopologies.get(0);
assertEquals(subtopologyName1, subtopology.subtopologyId());
- assertEquals(new ArrayList<>(sourceTopics),
subtopology.sourceTopics());
- assertEquals(new ArrayList<>(sinkTopics),
subtopology.repartitionSinkTopics());
+ assertEquals(Arrays.asList("sourceTopic1", "sourceTopic2"),
subtopology.sourceTopics());
+ assertEquals(Arrays.asList("sinkTopic1", "sinkTopic2", "sinkTopic3"),
subtopology.repartitionSinkTopics());
assertEquals(repartitionTopics.size(),
subtopology.repartitionSourceTopics().size());
subtopology.repartitionSourceTopics().forEach(topicInfo -> {
final StreamsAssignmentInterface.TopicInfo repartitionTopic =
repartitionTopics.get(topicInfo.name());
assertEquals(repartitionTopic.numPartitions.get(),
topicInfo.partitions());
+ assertEquals(repartitionTopic.replicationFactor.get(),
topicInfo.replicationFactor());
});
assertEquals(changelogTopics.size(),
subtopology.stateChangelogTopics().size());
subtopology.stateChangelogTopics().forEach(topicInfo -> {
assertTrue(changelogTopics.containsKey(topicInfo.name()));
assertEquals(0, topicInfo.partitions());
+ final StreamsAssignmentInterface.TopicInfo changelogTopic =
changelogTopics.get(topicInfo.name());
+ assertEquals(changelogTopic.replicationFactor.get(),
topicInfo.replicationFactor());
});
+
+ assertEquals(2, subtopology.copartitionGroups().size());
+ final StreamsGroupInitializeRequestData.CopartitionGroup
copartitionGroupData1 = subtopology.copartitionGroups().get(0);
+ assertEquals(Collections.singletonList((short) 0),
copartitionGroupData1.sourceTopics());
+ assertEquals(Collections.singletonList((short) 1),
copartitionGroupData1.repartitionSourceTopics());
+ final StreamsGroupInitializeRequestData.CopartitionGroup
copartitionGroupData2 = subtopology.copartitionGroups().get(1);
+ assertEquals(Collections.singletonList((short) 1),
copartitionGroupData2.sourceTopics());
+ assertEquals(Collections.singletonList((short) 0),
copartitionGroupData2.repartitionSourceTopics());
}
}
\ No newline at end of file
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index bc1caaee815..905146f2631 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -205,6 +205,7 @@ import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME;
import static
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged;
+import static
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.convertToStreamsGroupTopologyRecord;
import static
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentRecord;
import static
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord;
import static
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord;
@@ -2597,6 +2598,8 @@ public class GroupMetadataManager {
}
}
+ StreamsGroupTopologyValue recordValue =
convertToStreamsGroupTopologyRecord(subtopologies);
+
cancelStreamsGroupTopologyInitializationTimeout(groupId, topologyId);
if (!missingTopics.isEmpty()) {
@@ -2607,9 +2610,12 @@ public class GroupMetadataManager {
return new CoordinatorResult<>(records, response);
} else {
- records.add(newStreamsGroupTopologyRecord(groupId, subtopologies));
+ records.add(newStreamsGroupTopologyRecord(groupId, recordValue));
+
+ final Map<String, StreamsGroupTopologyValue.Subtopology>
subtopologyMap = recordValue.topology().stream()
+
.collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId,
x -> x));
- final StreamsTopology topology = new StreamsTopology(topologyId,
subtopologies);
+ final StreamsTopology topology = new StreamsTopology(topologyId,
subtopologyMap);
computeFirstTargetAssignmentAfterTopologyInitialization(group,
records, topology);
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
index 7a7ee7f2ae2..532955f87aa 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
@@ -381,28 +381,17 @@ public class CoordinatorStreamsRecordHelpers {
*/
public static CoordinatorRecord newStreamsGroupTopologyRecord(String
groupId,
List<StreamsGroupInitializeRequestData.Subtopology> subtopologies) {
- StreamsGroupTopologyValue value = new StreamsGroupTopologyValue();
- subtopologies.forEach(subtopology -> {
- List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics
= subtopology.repartitionSourceTopics().stream()
- .map(topicInfo -> {
- List<StreamsGroupTopologyValue.TopicConfig> topicConfigs =
topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream()
- .map(config -> new
StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
- .collect(Collectors.toList()) : null;
- return new
StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs)
- .setPartitions(topicInfo.partitions());
- }).collect(Collectors.toList());
-
- List<StreamsGroupTopologyValue.TopicInfo> stateChangelogTopics =
subtopology.stateChangelogTopics().stream().map(topicInfo -> {
- List<StreamsGroupTopologyValue.TopicConfig> topicConfigs =
topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream()
- .map(config -> new
StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
- .collect(Collectors.toList()) : null;
- return new
StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs);
- }).collect(Collectors.toList());
+ return newStreamsGroupTopologyRecord(groupId,
convertToStreamsGroupTopologyRecord(subtopologies));
+ }
- value.topology().add(new
StreamsGroupTopologyValue.Subtopology().setSubtopologyId(subtopology.subtopologyId())
-
.setSourceTopics(subtopology.sourceTopics()).setRepartitionSinkTopics(subtopology.repartitionSinkTopics())
-
.setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics));
- });
+ /**
+ * Creates a StreamsTopology record.
+ *
+ * @param groupId The consumer group id.
+ * @param value The encoded topology record value.
+ * @return The record.
+ */
+ public static CoordinatorRecord newStreamsGroupTopologyRecord(String
groupId, StreamsGroupTopologyValue value) {
return new CoordinatorRecord(new ApiMessageAndVersion(
new StreamsGroupTopologyKey()
@@ -411,6 +400,59 @@ public class CoordinatorStreamsRecordHelpers {
new ApiMessageAndVersion(value, (short) 0));
}
+ /**
+  * Builds the value of a StreamsGroupTopology record from the subtopologies of an initialize RPC.
+  *
+  * Each request subtopology is translated into one record subtopology and appended to the
+  * record's topology list in iteration order. Topic names, regular expressions and the index
+  * lists of the copartition groups are passed through unchanged; repartition source topics and
+  * state changelog topics are converted one by one with {@code convertToTopicInfo}.
+  *
+  * @param subtopologies The subtopologies in the new topology.
+  * @return The record value.
+  */
+ public static StreamsGroupTopologyValue convertToStreamsGroupTopologyRecord(List<StreamsGroupInitializeRequestData.Subtopology> subtopologies) {
+     StreamsGroupTopologyValue value = new StreamsGroupTopologyValue();
+     for (StreamsGroupInitializeRequestData.Subtopology requested : subtopologies) {
+         // Internal topics carry partition count, replication factor and configs, so they need a deep conversion.
+         List<StreamsGroupTopologyValue.TopicInfo> repartitionSources = requested.repartitionSourceTopics().stream()
+             .map(CoordinatorStreamsRecordHelpers::convertToTopicInfo)
+             .collect(Collectors.toList());
+         List<StreamsGroupTopologyValue.TopicInfo> changelogs = requested.stateChangelogTopics().stream()
+             .map(CoordinatorStreamsRecordHelpers::convertToTopicInfo)
+             .collect(Collectors.toList());
+
+         // Copartition groups only hold index lists, which are copied as they are.
+         List<StreamsGroupTopologyValue.CopartitionGroup> groups = requested.copartitionGroups().stream()
+             .map(group -> {
+                 StreamsGroupTopologyValue.CopartitionGroup converted = new StreamsGroupTopologyValue.CopartitionGroup();
+                 converted.setSourceTopics(group.sourceTopics());
+                 converted.setSourceTopicRegex(group.sourceTopicRegex());
+                 converted.setRepartitionSourceTopics(group.repartitionSourceTopics());
+                 return converted;
+             })
+             .collect(Collectors.toList());
+
+         StreamsGroupTopologyValue.Subtopology encoded = new StreamsGroupTopologyValue.Subtopology();
+         encoded.setSubtopologyId(requested.subtopologyId());
+         encoded.setSourceTopics(requested.sourceTopics());
+         encoded.setSourceTopicRegex(requested.sourceTopicRegex());
+         encoded.setRepartitionSinkTopics(requested.repartitionSinkTopics());
+         encoded.setRepartitionSourceTopics(repartitionSources);
+         encoded.setStateChangelogTopics(changelogs);
+         encoded.setCopartitionGroups(groups);
+         value.topology().add(encoded);
+     }
+     return value;
+ }
+
+
+ /**
+  * Converts a topic description from the initialize RPC into its StreamsGroupTopology record form.
+  *
+  * Name, partition count and replication factor are copied as they are, and every topic config
+  * is copied as a key-value pair. A request without topic configs is encoded as an empty list
+  * instead of null, because TopicConfigs is not declared nullable in StreamsGroupTopologyValue.json.
+  *
+  * @param topicInfo The topic info from the initialize request.
+  * @return The topic info for the record value.
+  */
+ private static StreamsGroupTopologyValue.TopicInfo convertToTopicInfo(StreamsGroupInitializeRequestData.TopicInfo topicInfo) {
+     List<StreamsGroupTopologyValue.TopicConfig> topicConfigs;
+     if (topicInfo.topicConfigs() == null) {
+         // The record field is a non-nullable array, so "no configs" must be an empty list.
+         topicConfigs = java.util.Collections.<StreamsGroupTopologyValue.TopicConfig>emptyList();
+     } else {
+         topicConfigs = topicInfo.topicConfigs().stream()
+             .map(config -> new StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
+             .collect(Collectors.toList());
+     }
+     return new StreamsGroupTopologyValue.TopicInfo()
+         .setName(topicInfo.name())
+         .setTopicConfigs(topicConfigs)
+         .setPartitions(topicInfo.partitions())
+         .setReplicationFactor(topicInfo.replicationFactor());
+ }
+
/**
* Creates a StreamsGroupTopology tombstone.
*
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
index 27f138be05f..408f0bfcbb4 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
@@ -17,12 +17,10 @@
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
-import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -45,35 +43,6 @@ public class StreamsTopology {
this.subtopologies = subtopologies;
}
- public StreamsTopology(final String topologyId,
- final
List<StreamsGroupInitializeRequestData.Subtopology> subtopologies) {
- this.topologyId = topologyId;
- this.subtopologies = new HashMap<>();
- subtopologies.forEach(subtopology -> {
- List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics
= subtopology.repartitionSourceTopics().stream()
- .map(topicInfo -> {
- List<StreamsGroupTopologyValue.TopicConfig> topicConfigs =
topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream()
- .map(config -> new
StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
- .collect(Collectors.toList()) : null;
- return new
StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs)
- .setPartitions(topicInfo.partitions());
- }).collect(Collectors.toList());
-
- List<StreamsGroupTopologyValue.TopicInfo> stateChangelogTopics =
subtopology.stateChangelogTopics().stream().map(topicInfo -> {
- List<StreamsGroupTopologyValue.TopicConfig> topicConfigs =
topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream()
- .map(config -> new
StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
- .collect(Collectors.toList()) : null;
- return new
StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs);
- }).collect(Collectors.toList());
-
- this.subtopologies.put(
- subtopology.subtopologyId(),
- new
StreamsGroupTopologyValue.Subtopology().setSubtopologyId(subtopology.subtopologyId())
-
.setSourceTopics(subtopology.sourceTopics()).setRepartitionSinkTopics(subtopology.repartitionSinkTopics())
-
.setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics));
- });
- }
-
public String topologyId() {
return topologyId;
}
@@ -141,6 +110,7 @@ public class StreamsTopology {
new StreamsGroupDescribeResponseData.TopicInfo()
.setName(x.name())
.setPartitions(x.partitions())
+ .setReplicationFactor(x.replicationFactor())
.setTopicConfigs(
x.topicConfigs() != null ?
x.topicConfigs().stream().map(
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
index af1fae06aad..d89ed3ed393 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
@@ -29,14 +29,25 @@
"about": "String to uniquely identify the subtopology.
Deterministically generated from the topology." },
{ "name": "SourceTopics", "type": "[]string", "versions": "0+",
"about": "The topics the topology reads from." },
- { "name": "SourceTopicRegex", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
- "about": "The regular expressions identifying topics the topology
reads from. null if not provided." },
+ { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+",
+ "about": "Regular expressions identifying topics the sub-topology
reads from." },
{ "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions":
"0+",
- "about": "The set of state changelog topics associated with this
subtopology. " },
+ "about": "The set of state changelog topics associated with this
sub-topology." },
{ "name": "RepartitionSinkTopics", "type": "[]string", "versions":
"0+",
"about": "The repartition topics the subtopology writes to." },
{ "name": "RepartitionSourceTopics", "type": "[]TopicInfo",
"versions": "0+",
- "about": "The set of source topics that are internally created
repartition topics. " }
+ "about": "The set of source topics that are internally created
repartition topics." },
+ { "name": "CopartitionGroups", "type": "[]CopartitionGroup",
"versions": "0+",
+ "about": "A subset of source topics that must be copartitioned.",
+ "fields": [
+ { "name": "SourceTopics", "type": "[]int16", "versions": "0+",
+ "about": "The topics the topology reads from. Index into the
array on the subtopology level." },
+ { "name": "SourceTopicRegex", "type": "[]int16", "versions": "0+",
+ "about": "Regular expressions identifying topics the subtopology
reads from. Index into the array on the subtopology level." },
+ { "name": "RepartitionSourceTopics", "type": "[]int16",
"versions": "0+",
+ "about": "The set of source topics that are internally created
repartition topics. Index into the array on the subtopology level." }
+ ]
+ }
]
}
],
@@ -53,7 +64,9 @@
"about": "The name of the topic." },
{ "name": "Partitions", "type": "int32", "versions": "0+",
"about": "The number of partitions in the topic. Can be 0 if no
specific number of partitions is enforced. Always 0 for changelog topics." },
- { "name": "TopicConfigs", "type": "[]TopicConfig", "versions": "0+",
"nullableVersions": "0+", "default": "null",
+ { "name": "ReplicationFactor", "type": "int16", "versions": "0+",
+ "about": "The replication factor of the topic. Can be 0 if the default
replication factor should be used." },
+ { "name": "TopicConfigs", "type": "[]TopicConfig", "versions": "0+",
"about": "Topic-level configurations as key-value pairs."
}
]}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
index 7ddbc832d80..65ad8d43fa8 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -38,11 +39,13 @@ class CoordinatorStreamsRecordHelpersTest {
.setSubtopologyId("subtopology-id")
.setRepartitionSinkTopics(Collections.singletonList("foo"))
.setSourceTopics(Collections.singletonList("bar"))
+ .setSourceTopicRegex(Collections.singletonList("regex"))
.setRepartitionSourceTopics(
Collections.singletonList(
new StreamsGroupInitializeRequestData.TopicInfo()
.setName("repartition")
.setPartitions(4)
+ .setReplicationFactor((short) 3)
.setTopicConfigs(Collections.singletonList(
new
StreamsGroupInitializeRequestData.TopicConfig()
.setKey("config-name1")
@@ -54,6 +57,7 @@ class CoordinatorStreamsRecordHelpersTest {
Collections.singletonList(
new StreamsGroupInitializeRequestData.TopicInfo()
.setName("changelog")
+ .setReplicationFactor((short) 2)
.setTopicConfigs(Collections.singletonList(
new
StreamsGroupInitializeRequestData.TopicConfig()
.setKey("config-name2")
@@ -61,6 +65,13 @@ class CoordinatorStreamsRecordHelpersTest {
))
)
)
+ .setCopartitionGroups(Arrays.asList(
+ new StreamsGroupInitializeRequestData.CopartitionGroup()
+ .setSourceTopics(Collections.singletonList((short) 0))
+
.setRepartitionSourceTopics(Collections.singletonList((short) 0)),
+ new StreamsGroupInitializeRequestData.CopartitionGroup()
+ .setSourceTopicRegex(Collections.singletonList((short)
0))
+ ))
);
List<StreamsGroupTopologyValue.Subtopology> expectedTopology =
@@ -68,11 +79,13 @@ class CoordinatorStreamsRecordHelpersTest {
.setSubtopologyId("subtopology-id")
.setRepartitionSinkTopics(Collections.singletonList("foo"))
.setSourceTopics(Collections.singletonList("bar"))
+ .setSourceTopicRegex(Collections.singletonList("regex"))
.setRepartitionSourceTopics(
Collections.singletonList(
new StreamsGroupTopologyValue.TopicInfo()
.setName("repartition")
.setPartitions(4)
+ .setReplicationFactor((short) 3)
.setTopicConfigs(Collections.singletonList(
new StreamsGroupTopologyValue.TopicConfig()
.setKey("config-name1")
@@ -84,6 +97,7 @@ class CoordinatorStreamsRecordHelpersTest {
Collections.singletonList(
new StreamsGroupTopologyValue.TopicInfo()
.setName("changelog")
+ .setReplicationFactor((short) 2)
.setTopicConfigs(Collections.singletonList(
new StreamsGroupTopologyValue.TopicConfig()
.setKey("config-name2")
@@ -91,6 +105,13 @@ class CoordinatorStreamsRecordHelpersTest {
))
)
)
+ .setCopartitionGroups(Arrays.asList(
+ new StreamsGroupTopologyValue.CopartitionGroup()
+ .setSourceTopics(Collections.singletonList((short) 0))
+
.setRepartitionSourceTopics(Collections.singletonList((short) 0)),
+ new StreamsGroupTopologyValue.CopartitionGroup()
+ .setSourceTopicRegex(Collections.singletonList((short)
0))
+ ))
);
CoordinatorRecord expectedRecord = new CoordinatorRecord(
@@ -108,4 +129,5 @@ class CoordinatorStreamsRecordHelpersTest {
topology
));
}
+
}
\ No newline at end of file
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
index e83abcbfe7c..7aa303603ab 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
@@ -145,40 +145,60 @@ public class StreamsTopologyTest {
public void
asStreamsGroupDescribeTopologyShouldReturnCorrectSubtopologies() {
Map<String, Subtopology> subtopologies = mkMap(
mkEntry("subtopology-1", new Subtopology()
- .setSourceTopicRegex("regex-1")
+ .setSourceTopicRegex(Collections.singletonList("regex-1"))
.setSubtopologyId("subtopology-1")
.setSourceTopics(Collections.singletonList("source-topic-1"))
.setRepartitionSinkTopics(Collections.singletonList("sink-topic-1"))
.setRepartitionSourceTopics(
- Collections.singletonList(new
TopicInfo().setName("repartition-topic-1")))
+ Collections.singletonList(new TopicInfo()
+ .setName("repartition-topic-1")
+ .setReplicationFactor((short) 3)
+ .setPartitions(2)))
.setStateChangelogTopics(
- Collections.singletonList(new
TopicInfo().setName("changelog-topic-1")))
+ Collections.singletonList(new TopicInfo()
+ .setName("changelog-topic-1")
+ .setReplicationFactor((short) 2)
+ .setPartitions(1)))
),
mkEntry("subtopology-2", new Subtopology()
- .setSourceTopicRegex("regex-2")
+ .setSourceTopicRegex(Collections.singletonList("regex-2"))
.setSubtopologyId("subtopology-2")
.setSourceTopics(Collections.singletonList("source-topic-2"))
.setRepartitionSinkTopics(Collections.singletonList("sink-topic-2"))
.setRepartitionSourceTopics(
- Collections.singletonList(new
TopicInfo().setName("repartition-topic-2")))
+ Collections.singletonList(new TopicInfo()
+ .setName("repartition-topic-2")
+ .setReplicationFactor((short) 3)
+ .setPartitions(2)))
.setStateChangelogTopics(
- Collections.singletonList(new
TopicInfo().setName("changelog-topic-2")))
+ Collections.singletonList(new TopicInfo()
+ .setName("changelog-topic-2")
+ .setReplicationFactor((short) 2)
+ .setPartitions(1)))
)
);
StreamsTopology topology = new StreamsTopology("topology-id",
subtopologies);
List<StreamsGroupDescribeResponseData.Subtopology> result =
topology.asStreamsGroupDescribeTopology();
assertEquals(2, result.size());
- assertEquals("regex-1", result.get(0).sourceTopicRegex());
+ assertEquals(Collections.singletonList("regex-1"),
result.get(0).sourceTopicRegex());
assertEquals("subtopology-1", result.get(0).subtopologyId());
assertEquals(Collections.singletonList("source-topic-1"),
result.get(0).sourceTopics());
assertEquals(Collections.singletonList("sink-topic-1"),
result.get(0).repartitionSinkTopics());
assertEquals("repartition-topic-1",
result.get(0).repartitionSourceTopics().get(0).name());
+ assertEquals((short) 3,
result.get(0).repartitionSourceTopics().get(0).replicationFactor());
+ assertEquals(2,
result.get(0).repartitionSourceTopics().get(0).partitions());
assertEquals("changelog-topic-1",
result.get(0).stateChangelogTopics().get(0).name());
- assertEquals("regex-2", result.get(1).sourceTopicRegex());
+ assertEquals((short) 2,
result.get(0).stateChangelogTopics().get(0).replicationFactor());
+ assertEquals(1,
result.get(0).stateChangelogTopics().get(0).partitions());
+ assertEquals(Collections.singletonList("regex-2"),
result.get(1).sourceTopicRegex());
assertEquals("subtopology-2", result.get(1).subtopologyId());
assertEquals(Collections.singletonList("source-topic-2"),
result.get(1).sourceTopics());
assertEquals(Collections.singletonList("sink-topic-2"),
result.get(1).repartitionSinkTopics());
assertEquals("repartition-topic-2",
result.get(1).repartitionSourceTopics().get(0).name());
+ assertEquals((short) 3,
result.get(1).repartitionSourceTopics().get(0).replicationFactor());
+ assertEquals(2,
result.get(1).repartitionSourceTopics().get(0).partitions());
assertEquals("changelog-topic-2",
result.get(1).stateChangelogTopics().get(0).name());
+ assertEquals((short) 2,
result.get(1).stateChangelogTopics().get(0).replicationFactor());
+ assertEquals(1,
result.get(1).stateChangelogTopics().get(0).partitions());
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d2561bde898..62dbc7d052d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -72,6 +72,7 @@ import org.slf4j.Logger;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -548,9 +549,38 @@ public class StreamThread extends Thread implements
ProcessingThread {
final
TopologyMetadata topologyMetadata) {
final InternalTopologyBuilder internalTopologyBuilder =
topologyMetadata.lookupBuilderForNamedTopology(null);
+ final Map<String, Subtopology> subtopologyMap =
initBrokerTopology(config, internalTopologyBuilder);
+
+ return new StreamsAssignmentInterface(
+ processId,
+ endpoint,
+ subtopologyMap,
+ config.getClientTags()
+ );
+ }
+
+ private static Map<String, Subtopology> initBrokerTopology(final
StreamsConfig config, final InternalTopologyBuilder internalTopologyBuilder) {
+ final Map<String, String> defaultTopicConfigs = new HashMap<>();
+ for (final Map.Entry<String, Object> entry :
config.originalsWithPrefix(StreamsConfig.TOPIC_PREFIX).entrySet()) {
+ if (entry.getValue() != null) {
+ defaultTopicConfigs.put(entry.getKey(),
entry.getValue().toString());
+ }
+ }
+ final long windowChangeLogAdditionalRetention =
config.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG);
+
final Map<String, Subtopology> subtopologyMap = new HashMap<>();
+ final Collection<Set<String>> copartitionGroups =
internalTopologyBuilder.copartitionGroups();
+
for (final Map.Entry<TopologyMetadata.Subtopology, TopicsInfo>
topicsInfoEntry : internalTopologyBuilder.subtopologyToTopicsInfo()
.entrySet()) {
+
+ final HashSet<String> allSourceTopics = new HashSet<>(
+ topicsInfoEntry.getValue().sourceTopics);
+ topicsInfoEntry.getValue().repartitionSourceTopics.forEach(
+ (repartitionSourceTopic, repartitionTopicInfo) -> {
+ allSourceTopics.add(repartitionSourceTopic);
+ });
+
subtopologyMap.put(
String.valueOf(topicsInfoEntry.getKey().nodeGroupId),
new Subtopology(
@@ -559,44 +589,28 @@ public class StreamThread extends Thread implements
ProcessingThread {
topicsInfoEntry.getValue().repartitionSourceTopics.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e ->
- new
StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(),
e.getValue().topicConfigs))),
+ new
StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(),
+
Optional.of(config.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue()),
+ e.getValue().properties(defaultTopicConfigs,
windowChangeLogAdditionalRetention)))),
topicsInfoEntry.getValue().stateChangelogTopics.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e ->
- new
StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(),
e.getValue().topicConfigs)))
-
+ new
StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(),
+
Optional.of(config.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue()),
+ e.getValue().properties(defaultTopicConfigs,
windowChangeLogAdditionalRetention)))),
+
copartitionGroups.stream().filter(allSourceTopics::containsAll).collect(
+ Collectors.toList())
)
);
}
- // TODO: Which of these are actually needed?
- // TODO: Maybe we want to split this into assignment properties and
internal topic configuration properties
- final HashMap<String, Object> assignmentProperties = new HashMap<>();
- assignmentProperties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG,
config.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG));
- assignmentProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG));
- assignmentProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
config.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
- assignmentProperties.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG,
- config.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG));
- assignmentProperties.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG,
config.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG));
-
assignmentProperties.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
-
config.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
-
assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
-
config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG));
-
assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
-
config.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG));
-
assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
- config.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG));
-
assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
-
config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
+ if (subtopologyMap.values().stream().mapToInt(x ->
x.copartitionGroups.size()).sum()
+ != copartitionGroups.size()) {
+ throw new IllegalStateException(
+ "Not all copartition groups were converted to broker
topology");
+ }
- return new StreamsAssignmentInterface(
- processId,
- endpoint,
- null,
- subtopologyMap,
- assignmentProperties,
- config.getClientTags()
- );
+ return subtopologyMap;
}
private static DefaultTaskManager maybeCreateSchedulingTaskManager(final
boolean processingThreadsEnabled,