This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit b18625b86ff9f7d688089313441f7f7c10a1fc6d
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Oct 8 17:30:48 2024 +0200

    Complete topic metadata for automatic topic creation (#17391)
    
    * KSTREAMS-6456: Complete topic metadata for automatic topic creation
    
    This change updates the Streams group RPCs and record schemas to add the
    following metadata:
    
     - copartition groups are added to the topology record and the initialize RPC
     - multiple regexes are added to the topology record and the initialize RPC
     - replication factors are added for each internal topic
    
    We also add code to populate this information correctly from the
    `InternalTopologyBuilder` object.
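    
    For illustration: copartition groups are not sent as topic names but as
    int16 indices into the (sorted) source-topic and repartition-source-topic
    lists of the subtopology. Below is a minimal, self-contained sketch of that
    encoding, condensed from the new StreamsGroupInitializeRequestManager logic;
    the topic names are hypothetical and this is not the production class:
    
        import java.util.*;
        import java.util.stream.*;
    
        public class CopartitionGroupEncodingSketch {
            public static void main(String[] args) {
                // Sorted lists as they appear in the initialize request.
                List<String> sourceTopics = Arrays.asList("sourceTopic1", "sourceTopic2");
                List<String> repartitionSourceTopics = Arrays.asList("repartitionTopic1", "repartitionTopic2");
                // One copartition group, expressed as topic names on the Streams side.
                Set<String> copartitionGroup = new HashSet<>(Arrays.asList("sourceTopic1", "repartitionTopic2"));
    
                // Map each topic name to its index in the corresponding list.
                Map<String, Short> sourceIndex = IntStream.range(0, sourceTopics.size()).boxed()
                    .collect(Collectors.toMap(sourceTopics::get, Integer::shortValue));
                Map<String, Short> repartitionIndex = IntStream.range(0, repartitionSourceTopics.size()).boxed()
                    .collect(Collectors.toMap(repartitionSourceTopics::get, Integer::shortValue));
    
                List<Short> groupSourceTopics = new ArrayList<>();
                List<Short> groupRepartitionSourceTopics = new ArrayList<>();
                for (String topic : copartitionGroup) {
                    if (sourceIndex.containsKey(topic)) {
                        groupSourceTopics.add(sourceIndex.get(topic));                 // "sourceTopic1" -> 0
                    } else if (repartitionIndex.containsKey(topic)) {
                        groupRepartitionSourceTopics.add(repartitionIndex.get(topic)); // "repartitionTopic2" -> 1
                    } else {
                        throw new IllegalStateException("Source topic not found in subtopology: " + topic);
                    }
                }
    
                System.out.println("SourceTopics=" + groupSourceTopics
                    + " RepartitionSourceTopics=" + groupRepartitionSourceTopics);
            }
        }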
    
    The fields `assignmentConfiguration` and `assignor` in the
    `StreamsAssignmentInterface` are removed because they are no longer
    needed.
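    
    A hedged sketch of the resulting construction on this branch (topic names,
    counts, and the single-subtopology layout are made up for illustration; the
    classes are the package-internal ones touched by this patch and only
    compile against this branch):
    
        import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface;
        import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.Subtopology;
        import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.TopicInfo;
    
        import java.util.*;
    
        public class StreamsAssignmentInterfaceSketch {
            public static void main(String[] args) {
                // Internal topics now carry an optional replication factor; an empty
                // Optional leaves the wire field at 0, i.e. "use the broker default".
                TopicInfo repartition = new TopicInfo(Optional.of(4), Optional.of((short) 3), Collections.emptyMap());
                TopicInfo changelog = new TopicInfo(Optional.empty(), Optional.of((short) 3), Collections.emptyMap());
    
                Subtopology subtopology = new Subtopology(
                    Collections.singleton("input"),                         // source topics
                    Collections.singleton("repartition"),                   // (repartition) sink topics
                    Collections.singletonMap("repartition", repartition),   // repartition source topics
                    Collections.singletonMap("store-changelog", changelog), // state changelog topics
                    Collections.singletonList(                              // copartition groups
                        new HashSet<>(Arrays.asList("input", "repartition")))
                );
    
                // The constructor no longer takes an assignor or an assignment configuration.
                StreamsAssignmentInterface streamsAssignmentInterface = new StreamsAssignmentInterface(
                    UUID.randomUUID(),
                    Optional.empty(),                                       // endpoint
                    Collections.singletonMap("0", subtopology),
                    Collections.emptyMap()                                  // client tags
                );
                System.out.println(streamsAssignmentInterface);
            }
        }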
    
    * fixes
---
 .../internals/StreamsAssignmentInterface.java      | 31 +++-----
 .../StreamsGroupInitializeRequestManager.java      | 71 ++++++++++++++++--
 .../message/StreamsGroupDescribeResponse.json      | 10 +--
 .../message/StreamsGroupInitializeRequest.json     | 21 ++++--
 .../StreamsGroupHeartbeatRequestManagerTest.java   | 19 ++---
 .../StreamsGroupInitializeRequestManagerTest.java  | 35 ++++++---
 .../coordinator/group/GroupMetadataManager.java    | 10 ++-
 .../streams/CoordinatorStreamsRecordHelpers.java   | 84 ++++++++++++++++------
 .../coordinator/group/streams/StreamsTopology.java | 32 +--------
 .../common/message/StreamsGroupTopologyValue.json  | 23 ++++--
 .../CoordinatorStreamsRecordHelpersTest.java       | 22 ++++++
 .../group/streams/StreamsTopologyTest.java         | 36 +++++++---
 .../streams/processor/internals/StreamThread.java  | 74 +++++++++++--------
 13 files changed, 316 insertions(+), 152 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
index 1e23233f4e6..cefe9175b68 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -41,12 +42,8 @@ public class StreamsAssignmentInterface {
 
     private Optional<HostInfo> endpoint;
 
-    private String assignor;
-
     private Map<String, Subtopology> subtopologyMap;
 
-    private Map<String, Object> assignmentConfiguration;
-
     private Map<TaskId, Long> taskLags;
 
     private AtomicBoolean shutdownRequested;
@@ -66,18 +63,10 @@ public class StreamsAssignmentInterface {
         return endpoint;
     }
 
-    public String assignor() {
-        return assignor;
-    }
-
     public Map<String, Subtopology> subtopologyMap() {
         return subtopologyMap;
     }
 
-    public Map<String, Object> assignmentConfiguration() {
-        return assignmentConfiguration;
-    }
-
     // TODO: This needs to be used somewhere
     public Map<TaskId, Long> taskLags() {
         return taskLags;
@@ -190,11 +179,14 @@ public class StreamsAssignmentInterface {
     public static class TopicInfo {
 
         public final Optional<Integer> numPartitions;
+        public final Optional<Short> replicationFactor;
         public final Map<String, String> topicConfigs;
 
         public TopicInfo(final Optional<Integer> numPartitions,
+                         final Optional<Short> replicationFactor,
                          final Map<String, String> topicConfigs) {
             this.numPartitions = numPartitions;
+            this.replicationFactor = replicationFactor;
             this.topicConfigs = topicConfigs;
         }
 
@@ -202,10 +194,10 @@ public class StreamsAssignmentInterface {
         public String toString() {
             return "TopicInfo{" +
                 "numPartitions=" + numPartitions +
+                ", replicationFactor=" + replicationFactor +
                 ", topicConfigs=" + topicConfigs +
                 '}';
         }
-
     }
 
     public static class TaskId {
@@ -241,15 +233,19 @@ public class StreamsAssignmentInterface {
         public final Set<String> sinkTopics;
         public final Map<String, TopicInfo> stateChangelogTopics;
         public final Map<String, TopicInfo> repartitionSourceTopics;
+        public final Collection<Set<String>> copartitionGroups;
 
         public Subtopology(final Set<String> sourceTopics,
                            final Set<String> sinkTopics,
                            final Map<String, TopicInfo> 
repartitionSourceTopics,
-                           final Map<String, TopicInfo> stateChangelogTopics) {
+                           final Map<String, TopicInfo> stateChangelogTopics,
+                           final Collection<Set<String>> copartitionGroups
+        ) {
             this.sourceTopics = sourceTopics;
             this.sinkTopics = sinkTopics;
             this.stateChangelogTopics = stateChangelogTopics;
             this.repartitionSourceTopics = repartitionSourceTopics;
+            this.copartitionGroups = copartitionGroups;
         }
 
         @Override
@@ -259,22 +255,19 @@ public class StreamsAssignmentInterface {
                 ", sinkTopics=" + sinkTopics +
                 ", stateChangelogTopics=" + stateChangelogTopics +
                 ", repartitionSourceTopics=" + repartitionSourceTopics +
+                ", copartitionGroups=" + copartitionGroups +
                 '}';
         }
     }
 
     public StreamsAssignmentInterface(UUID processId,
                                       Optional<HostInfo> endpoint,
-                                      String assignor,
                                       Map<String, Subtopology> subtopologyMap,
-                                      Map<String, Object> 
assignmentConfiguration,
                                       Map<String, String> clientTags
     ) {
         this.processId = processId;
         this.endpoint = endpoint;
-        this.assignor = assignor;
         this.subtopologyMap = subtopologyMap;
-        this.assignmentConfiguration = assignmentConfiguration;
         this.taskLags = new HashMap<>();
         this.shutdownRequested = new AtomicBoolean(false);
         this.clientTags = clientTags;
@@ -285,9 +278,7 @@ public class StreamsAssignmentInterface {
         return "StreamsAssignmentMetadata{" +
             "processID=" + processId +
             ", endpoint='" + endpoint + '\'' +
-            ", assignor='" + assignor + '\'' +
             ", subtopologyMap=" + subtopologyMap +
-            ", assignmentConfiguration=" + assignmentConfiguration +
             ", taskLags=" + taskLags +
             ", clientTags=" + clientTags +
             '}';
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
index b81beab78d9..8c7671efe67 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ClientResponse;
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
+import 
org.apache.kafka.common.message.StreamsGroupInitializeRequestData.CopartitionGroup;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.StreamsGroupInitializeRequest;
 import org.apache.kafka.common.requests.StreamsGroupInitializeResponse;
@@ -27,9 +28,15 @@ import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 public class StreamsGroupInitializeRequestManager implements RequestManager {
 
@@ -72,9 +79,8 @@ public class StreamsGroupInitializeRequestManager implements 
RequestManager {
         
streamsGroupInitializeRequestData.setTopologyId(streamsAssignmentInterface.topologyId());
         final List<StreamsGroupInitializeRequestData.Subtopology> topology = 
getTopologyFromStreams();
         streamsGroupInitializeRequestData.setTopology(topology);
-        final StreamsGroupInitializeRequest.Builder 
streamsGroupInitializeRequestBuilder = new 
StreamsGroupInitializeRequest.Builder(
-            streamsGroupInitializeRequestData
-        );
+        final StreamsGroupInitializeRequest.Builder 
streamsGroupInitializeRequestBuilder =
+            new 
StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequestData);
         return new NetworkClientDelegate.UnsentRequest(
             streamsGroupInitializeRequestBuilder,
             coordinatorRequestManager.coordinator()
@@ -94,21 +100,72 @@ public class StreamsGroupInitializeRequestManager 
implements RequestManager {
                                                                                
            final StreamsAssignmentInterface.Subtopology subtopology) {
         final StreamsGroupInitializeRequestData.Subtopology subtopologyData = 
new StreamsGroupInitializeRequestData.Subtopology();
         subtopologyData.setSubtopologyId(subtopologyName);
-        subtopologyData.setSourceTopics(new 
ArrayList<>(subtopology.sourceTopics));
-        subtopologyData.setRepartitionSinkTopics(new 
ArrayList<>(subtopology.sinkTopics));
+        ArrayList<String> sortedSourceTopics = new 
ArrayList<>(subtopology.sourceTopics);
+        Collections.sort(sortedSourceTopics);
+        subtopologyData.setSourceTopics(sortedSourceTopics);
+        // TODO: We should only encode the repartition sink topics here.
+        ArrayList<String> sortedSinkTopics = new 
ArrayList<>(subtopology.sinkTopics);
+        Collections.sort(sortedSinkTopics);
+        subtopologyData.setRepartitionSinkTopics(sortedSinkTopics);
         
subtopologyData.setRepartitionSourceTopics(getRepartitionTopicsInfoFromStreams(subtopology));
         
subtopologyData.setStateChangelogTopics(getChangelogTopicsInfoFromStreams(subtopology));
+        subtopologyData.setCopartitionGroups(
+            getCopartitionGroupsFromStreams(subtopology.copartitionGroups, 
subtopologyData));
         return subtopologyData;
     }
 
+    private static List<CopartitionGroup> getCopartitionGroupsFromStreams(
+        final Collection<Set<String>> copartitionGroups,
+        final StreamsGroupInitializeRequestData.Subtopology subtopologyData) {
+
+        final Map<String, Short> sourceTopicsMap =
+            IntStream.range(0, subtopologyData.sourceTopics().size())
+                .boxed()
+                .collect(Collectors.toMap(subtopologyData.sourceTopics()::get, 
Integer::shortValue));
+
+        final Map<String, Short> repartitionSourceTopics =
+            IntStream.range(0, 
subtopologyData.repartitionSourceTopics().size())
+                .boxed()
+                .collect(
+                    Collectors.toMap(x -> 
subtopologyData.repartitionSourceTopics().get(x).name(),
+                        Integer::shortValue));
+
+        return copartitionGroups.stream()
+            .map(x -> getCopartitionGroupFromStreams(x, sourceTopicsMap, 
repartitionSourceTopics))
+            .collect(Collectors.toList());
+    }
+
+    private static CopartitionGroup getCopartitionGroupFromStreams(
+        final Set<String> topicNames,
+        final Map<String, Short> sourceTopicsMap,
+        final Map<String, Short> repartitionSourceTopics) {
+        CopartitionGroup copartitionGroup = new CopartitionGroup();
+
+        topicNames.forEach(topicName -> {
+            if (sourceTopicsMap.containsKey(topicName)) {
+                
copartitionGroup.sourceTopics().add(sourceTopicsMap.get(topicName));
+            } else if (repartitionSourceTopics.containsKey(topicName)) {
+                copartitionGroup.repartitionSourceTopics()
+                    .add(repartitionSourceTopics.get(topicName));
+            } else {
+                throw new IllegalStateException(
+                    "Source topic not found in subtopology: " + topicName);
+            }
+        });
+
+        return copartitionGroup;
+    }
+
     private static List<StreamsGroupInitializeRequestData.TopicInfo> 
getRepartitionTopicsInfoFromStreams(final 
StreamsAssignmentInterface.Subtopology subtopologyDataFromStreams) {
         final List<StreamsGroupInitializeRequestData.TopicInfo> 
repartitionTopicsInfo = new ArrayList<>();
         for (final Map.Entry<String, StreamsAssignmentInterface.TopicInfo> 
repartitionTopic : 
subtopologyDataFromStreams.repartitionSourceTopics.entrySet()) {
             final StreamsGroupInitializeRequestData.TopicInfo 
repartitionTopicInfo = new StreamsGroupInitializeRequestData.TopicInfo();
             repartitionTopicInfo.setName(repartitionTopic.getKey());
             
repartitionTopic.getValue().numPartitions.ifPresent(repartitionTopicInfo::setPartitions);
+            
repartitionTopic.getValue().replicationFactor.ifPresent(repartitionTopicInfo::setReplicationFactor);
             repartitionTopicsInfo.add(repartitionTopicInfo);
         }
+        
repartitionTopicsInfo.sort(Comparator.comparing(StreamsGroupInitializeRequestData.TopicInfo::name));
         return repartitionTopicsInfo;
     }
 
@@ -117,11 +174,13 @@ public class StreamsGroupInitializeRequestManager 
implements RequestManager {
         for (final Map.Entry<String, StreamsAssignmentInterface.TopicInfo> 
changelogTopic : subtopologyDataFromStreams.stateChangelogTopics.entrySet()) {
             final StreamsGroupInitializeRequestData.TopicInfo 
changelogTopicInfo = new StreamsGroupInitializeRequestData.TopicInfo();
             changelogTopicInfo.setName(changelogTopic.getKey());
+            
changelogTopic.getValue().replicationFactor.ifPresent(changelogTopicInfo::setReplicationFactor);
             changelogTopicsInfo.add(changelogTopicInfo);
         }
+        
changelogTopicsInfo.sort(Comparator.comparing(StreamsGroupInitializeRequestData.TopicInfo::name));
         return changelogTopicsInfo;
     }
-    
+
     private void onResponse(final ClientResponse response, final Throwable 
exception) {
         if (exception != null) {
             // todo: handle error
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json 
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
index 227ac495c2d..71d74eee831 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
@@ -52,14 +52,14 @@
               "about": "String to uniquely identify the subtopology. 
Deterministically generated from the topology." },
             { "name": "SourceTopics", "type": "[]string", "versions": "0+",
               "about": "The topics the topology reads from." },
-            { "name": "SourceTopicRegex", "type": "string", "versions": "0+",
-              "about": "The regular expressions identifying topics the 
topology reads from." },
             { "name": "RepartitionSinkTopics", "type": "[]string", "versions": 
"0+",
               "about": "The repartition topics the topology writes to." },
+            { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+",
+              "about": "Regular expressions identifying topics the subtopology 
reads from." },
             { "name": "StateChangelogTopics", "type": "[]TopicInfo", 
"versions": "0+",
               "about": "The set of state changelog topics associated with this 
subtopology. Created automatically." },
             { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", 
"versions": "0+",
-              "about": "The set of source topics that are internally created 
repartition topics. Created automatically." }
+              "about": "The set of source topics that are internally created 
repartition topics." }
           ]
         },
         { "name": "Members", "type": "[]Member", "versions": "0+",
@@ -143,7 +143,9 @@
         "about": "The name of the topic." },
       { "name": "Partitions", "type": "int32", "versions": "0+",
         "about": "The number of partitions in the topic. Can be 0 if no 
specific number of partitions is enforced. Always 0 for changelog topics." },
-      { "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      { "name": "ReplicationFactor", "type": "int16", "versions": "0+",
+        "about": "The replication factor of the topic. Can be 0 if the default 
replication factor should be used." },
+      { "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+",
         "about": "Topic-level configurations as key-value pairs."
       }
     ]}
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json 
b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
index 05c4709092c..aa68fde6bf5 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
@@ -32,14 +32,25 @@
           "about": "String to uniquely identify the subtopology. 
Deterministically generated from the topology." },
         { "name": "SourceTopics", "type": "[]string", "versions": "0+",
           "about": "The topics the topology reads from." },
-        { "name": "SourceTopicRegex", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-          "about": "The regular expressions identifying topics the subtopology 
reads from." },
+        { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+",
+          "about": "Regular expressions identifying topics the subtopology 
reads from." },
         { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": 
"0+",
           "about": "The set of state changelog topics associated with this 
subtopology. Created automatically." },
         { "name": "RepartitionSinkTopics", "type": "[]string", "versions": 
"0+",
           "about": "The repartition topics the subtopology writes to." },
         { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", 
"versions": "0+",
-          "about": "The set of source topics that are internally created 
repartition topics. Created automatically." }
+          "about": "The set of source topics that are internally created 
repartition topics. Created automatically." },
+        { "name": "CopartitionGroups", "type": "[]CopartitionGroup", 
"versions": "0+",
+          "about": "A subset of source topics that must be copartitioned.",
+          "fields": [
+            { "name": "SourceTopics", "type": "[]int16", "versions": "0+",
+              "about": "The topics the topology reads from. Index into the 
array on the subtopology level." },
+            { "name": "SourceTopicRegex", "type": "[]int16", "versions": "0+",
+              "about": "Regular expressions identifying topics the subtopology 
reads from. Index into the array on the subtopology level." },
+            { "name": "RepartitionSourceTopics", "type": "[]int16", 
"versions": "0+",
+              "about": "The set of source topics that are internally created 
repartition topics. Index into the array on the subtopology level." }
+          ]
+        }
       ]
     }
   ],
@@ -56,7 +67,9 @@
         "about": "The name of the topic." },
       { "name": "Partitions", "type": "int32", "versions": "0+",
         "about": "The number of partitions in the topic. Can be 0 if no 
specific number of partitions is enforced. Always 0 for changelog topics." },
-      { "name": "TopicConfigs", "type": "[]TopicConfig", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      { "name": "ReplicationFactor", "type": "int16", "versions": "0+",
+        "about": "The replication factor of the topic. Can be 0 if the default 
replication factor should be used." },
+      { "name": "TopicConfigs", "type": "[]TopicConfig", "versions": "0+",
         "about": "Topic-level configurations as key-value pairs."
       }
     ]}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index f4e5d814a57..91b9b5ae45a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -107,12 +107,8 @@ class StreamsGroupHeartbeatRequestManagerTest {
 
     private final StreamsAssignmentInterface.HostInfo endPoint = new 
StreamsAssignmentInterface.HostInfo("localhost", 8080);
 
-    private final String assignor = "test";
-
     private final Map<String, Subtopology> subtopologyMap = new HashMap<>();
 
-    private final Map<String, Object> assignmentConfiguration = new 
HashMap<>();
-
     private final Map<String, String> clientTags = new HashMap<>();
 
     private final Node coordinatorNode = new Node(1, "localhost", 9092);
@@ -122,15 +118,12 @@ class StreamsGroupHeartbeatRequestManagerTest {
         config = config();
 
         subtopologyMap.clear();
-        assignmentConfiguration.clear();
         clientTags.clear();
         streamsAssignmentInterface =
             new StreamsAssignmentInterface(
                 processID,
                 Optional.of(endPoint),
-                assignor,
                 subtopologyMap,
-                assignmentConfiguration,
                 clientTags
             );
         LogContext logContext = new LogContext("test");
@@ -205,7 +198,6 @@ class StreamsGroupHeartbeatRequestManagerTest {
     @Test
     void testFullStaticInformationWhenJoining() {
         mockJoiningState();
-        assignmentConfiguration.put("config1", "value1");
         clientTags.put("clientTag1", "value2");
 
         NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
@@ -245,7 +237,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
         final Uuid uuid0 = Uuid.randomUuid();
         final Uuid uuid1 = Uuid.randomUuid();
 
-        final TopicInfo emptyTopicInfo = new TopicInfo(Optional.empty(), 
Collections.emptyMap());
+        final TopicInfo emptyTopicInfo = new TopicInfo(Optional.empty(), 
Optional.empty(), Collections.emptyMap());
 
         when(metadata.topicIds()).thenReturn(
             mkMap(
@@ -258,21 +250,24 @@ class StreamsGroupHeartbeatRequestManagerTest {
                 Collections.singleton("source0"),
                 Collections.singleton("sink0"),
                 Collections.singletonMap("repartition0", emptyTopicInfo),
-                Collections.singletonMap("changelog0", emptyTopicInfo)
+                Collections.singletonMap("changelog0", emptyTopicInfo),
+                Collections.singletonList(mkSet("source0", "repartition0"))
             ));
         streamsAssignmentInterface.subtopologyMap().put("1",
             new Subtopology(
                 Collections.singleton("source1"),
                 Collections.singleton("sink1"),
                 Collections.singletonMap("repartition1", emptyTopicInfo),
-                Collections.singletonMap("changelog1", emptyTopicInfo)
+                Collections.singletonMap("changelog1", emptyTopicInfo),
+                Collections.singletonList(mkSet("source1", "repartition1"))
             ));
         streamsAssignmentInterface.subtopologyMap().put("2",
             new Subtopology(
                 Collections.singleton("source2"),
                 Collections.singleton("sink2"),
                 Collections.singletonMap("repartition2", emptyTopicInfo),
-                Collections.singletonMap("changelog2", emptyTopicInfo)
+                Collections.singletonMap("changelog2", emptyTopicInfo),
+                Collections.singletonList(mkSet("source2", "repartition2"))
             ));
 
         StreamsGroupHeartbeatResponseData data = new 
StreamsGroupHeartbeatResponseData()
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
index 11c92389f6a..f5bc091e7b0 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
@@ -26,7 +26,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -73,19 +74,24 @@ class StreamsGroupInitializeRequestManagerTest {
         final Set<String> sourceTopics = mkSet("sourceTopic1", "sourceTopic2");
         final Set<String> sinkTopics = mkSet("sinkTopic1", "sinkTopic2", 
"sinkTopic3");
         final Map<String, StreamsAssignmentInterface.TopicInfo> 
repartitionTopics = mkMap(
-            mkEntry("repartitionTopic1", new 
StreamsAssignmentInterface.TopicInfo(Optional.of(2), Collections.emptyMap())),
-            mkEntry("repartitionTopic2", new 
StreamsAssignmentInterface.TopicInfo(Optional.of(3), Collections.emptyMap()))
+            mkEntry("repartitionTopic1", new 
StreamsAssignmentInterface.TopicInfo(Optional.of(2), Optional.of((short) 1), 
Collections.emptyMap())),
+            mkEntry("repartitionTopic2", new 
StreamsAssignmentInterface.TopicInfo(Optional.of(3), Optional.of((short) 3), 
Collections.emptyMap()))
         );
         final Map<String, StreamsAssignmentInterface.TopicInfo> 
changelogTopics = mkMap(
-            mkEntry("changelogTopic1", new 
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap())),
-            mkEntry("changelogTopic2", new 
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap())),
-            mkEntry("changelogTopic3", new 
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap()))
+            mkEntry("changelogTopic1", new 
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Optional.of((short) 1), 
Collections.emptyMap())),
+            mkEntry("changelogTopic2", new 
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Optional.of((short) 2), 
Collections.emptyMap())),
+            mkEntry("changelogTopic3", new 
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Optional.of((short) 3), 
Collections.emptyMap()))
+        );
+        final Collection<Set<String>> copartitionGroup = mkSet(
+            mkSet("sourceTopic1", "repartitionTopic2"),
+            mkSet("sourceTopic2", "repartitionTopic1")
         );
         final StreamsAssignmentInterface.Subtopology subtopology1 = new 
StreamsAssignmentInterface.Subtopology(
             sourceTopics,
             sinkTopics,
             repartitionTopics,
-            changelogTopics
+            changelogTopics,
+            copartitionGroup
         );
         final String subtopologyName1 = "subtopology1";
         when(streamsAssignmentInterface.subtopologyMap()).thenReturn(
@@ -116,17 +122,28 @@ class StreamsGroupInitializeRequestManagerTest {
         assertEquals(1, subtopologies.size());
         final StreamsGroupInitializeRequestData.Subtopology subtopology = 
subtopologies.get(0);
         assertEquals(subtopologyName1, subtopology.subtopologyId());
-        assertEquals(new ArrayList<>(sourceTopics), 
subtopology.sourceTopics());
-        assertEquals(new ArrayList<>(sinkTopics), 
subtopology.repartitionSinkTopics());
+        assertEquals(Arrays.asList("sourceTopic1", "sourceTopic2"), 
subtopology.sourceTopics());
+        assertEquals(Arrays.asList("sinkTopic1", "sinkTopic2", "sinkTopic3"), 
subtopology.repartitionSinkTopics());
         assertEquals(repartitionTopics.size(), 
subtopology.repartitionSourceTopics().size());
         subtopology.repartitionSourceTopics().forEach(topicInfo -> {
             final StreamsAssignmentInterface.TopicInfo repartitionTopic = 
repartitionTopics.get(topicInfo.name());
             assertEquals(repartitionTopic.numPartitions.get(), 
topicInfo.partitions());
+            assertEquals(repartitionTopic.replicationFactor.get(), 
topicInfo.replicationFactor());
         });
         assertEquals(changelogTopics.size(), 
subtopology.stateChangelogTopics().size());
         subtopology.stateChangelogTopics().forEach(topicInfo -> {
             assertTrue(changelogTopics.containsKey(topicInfo.name()));
             assertEquals(0, topicInfo.partitions());
+            final StreamsAssignmentInterface.TopicInfo changelogTopic = 
changelogTopics.get(topicInfo.name());
+            assertEquals(changelogTopic.replicationFactor.get(), 
topicInfo.replicationFactor());
         });
+
+        assertEquals(2, subtopology.copartitionGroups().size());
+        final StreamsGroupInitializeRequestData.CopartitionGroup 
copartitionGroupData1 = subtopology.copartitionGroups().get(0);
+        assertEquals(Collections.singletonList((short) 0), 
copartitionGroupData1.sourceTopics());
+        assertEquals(Collections.singletonList((short) 1), 
copartitionGroupData1.repartitionSourceTopics());
+        final StreamsGroupInitializeRequestData.CopartitionGroup 
copartitionGroupData2 = subtopology.copartitionGroups().get(1);
+        assertEquals(Collections.singletonList((short) 1), 
copartitionGroupData2.sourceTopics());
+        assertEquals(Collections.singletonList((short) 0), 
copartitionGroupData2.repartitionSourceTopics());
     }
 }
\ No newline at end of file
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index a8f742f0d26..f613d11c8f3 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -216,6 +216,7 @@ import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME;
 import static 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged;
+import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.convertToStreamsGroupTopologyRecord;
 import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentRecord;
 import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord;
 import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord;
@@ -2696,6 +2697,8 @@ public class GroupMetadataManager {
             }
         }
 
+        StreamsGroupTopologyValue recordValue = 
convertToStreamsGroupTopologyRecord(subtopologies);
+
         cancelStreamsGroupTopologyInitializationTimeout(groupId, topologyId);
 
         if (!missingTopics.isEmpty()) {
@@ -2706,9 +2709,12 @@ public class GroupMetadataManager {
 
             return new CoordinatorResult<>(records, response);
         } else {
-            records.add(newStreamsGroupTopologyRecord(groupId, subtopologies));
+            records.add(newStreamsGroupTopologyRecord(groupId, recordValue));
+
+            final Map<String, StreamsGroupTopologyValue.Subtopology> 
subtopologyMap = recordValue.topology().stream()
+                
.collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId, 
x -> x));
 
-            final StreamsTopology topology = new StreamsTopology(topologyId, 
subtopologies);
+            final StreamsTopology topology = new StreamsTopology(topologyId, 
subtopologyMap);
 
             computeFirstTargetAssignmentAfterTopologyInitialization(group, 
records, topology);
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
index 7a7ee7f2ae2..532955f87aa 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
@@ -381,28 +381,17 @@ public class CoordinatorStreamsRecordHelpers {
      */
     public static CoordinatorRecord newStreamsGroupTopologyRecord(String 
groupId,
                                                                   
List<StreamsGroupInitializeRequestData.Subtopology> subtopologies) {
-        StreamsGroupTopologyValue value = new StreamsGroupTopologyValue();
-        subtopologies.forEach(subtopology -> {
-            List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics 
= subtopology.repartitionSourceTopics().stream()
-                .map(topicInfo -> {
-                    List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = 
 topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream()
-                        .map(config -> new 
StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
-                        .collect(Collectors.toList()) : null;
-                    return new 
StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs)
-                        .setPartitions(topicInfo.partitions());
-                }).collect(Collectors.toList());
-
-            List<StreamsGroupTopologyValue.TopicInfo> stateChangelogTopics = 
subtopology.stateChangelogTopics().stream().map(topicInfo -> {
-                List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = 
topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream()
-                    .map(config -> new 
StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
-                    .collect(Collectors.toList()) : null;
-                return new 
StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs);
-            }).collect(Collectors.toList());
+        return newStreamsGroupTopologyRecord(groupId, 
convertToStreamsGroupTopologyRecord(subtopologies));
+    }
 
-            value.topology().add(new 
StreamsGroupTopologyValue.Subtopology().setSubtopologyId(subtopology.subtopologyId())
-                
.setSourceTopics(subtopology.sourceTopics()).setRepartitionSinkTopics(subtopology.repartitionSinkTopics())
-                
.setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics));
-        });
+    /**
+     * Creates a StreamsTopology record.
+     *
+     * @param groupId       The consumer group id.
+     * @param value         The encoded topology record value.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupTopologyRecord(String 
groupId, StreamsGroupTopologyValue value) {
 
         return new CoordinatorRecord(new ApiMessageAndVersion(
             new StreamsGroupTopologyKey()
@@ -411,6 +400,59 @@ public class CoordinatorStreamsRecordHelpers {
             new ApiMessageAndVersion(value, (short) 0));
     }
 
+    /**
+     * Encodes subtopologies from the initialize RPC to a StreamsTopology 
record value.
+     *
+     * @param subtopologies The subtopologies in the new topology.
+     * @return The record value.
+     */
+    public static StreamsGroupTopologyValue 
convertToStreamsGroupTopologyRecord(List<StreamsGroupInitializeRequestData.Subtopology>
 subtopologies) {
+        StreamsGroupTopologyValue value = new StreamsGroupTopologyValue();
+        subtopologies.forEach(subtopology -> {
+            List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics =
+                subtopology.repartitionSourceTopics().stream()
+                    .map(CoordinatorStreamsRecordHelpers::convertToTopicInfo)
+                    .collect(Collectors.toList());
+
+            List<StreamsGroupTopologyValue.TopicInfo> stateChangelogTopics =
+                subtopology.stateChangelogTopics().stream()
+                    .map(CoordinatorStreamsRecordHelpers::convertToTopicInfo)
+                    .collect(Collectors.toList());
+
+            List<StreamsGroupTopologyValue.CopartitionGroup> copartitionGroups 
=
+                subtopology.copartitionGroups().stream()
+                    .map(copartitionGroup -> new 
StreamsGroupTopologyValue.CopartitionGroup()
+                        .setSourceTopics(copartitionGroup.sourceTopics())
+                        
.setSourceTopicRegex(copartitionGroup.sourceTopicRegex())
+                        
.setRepartitionSourceTopics(copartitionGroup.repartitionSourceTopics())
+                    )
+                    .collect(Collectors.toList());
+
+            value.topology().add(
+                new StreamsGroupTopologyValue.Subtopology()
+                    .setSubtopologyId(subtopology.subtopologyId())
+                    .setSourceTopics(subtopology.sourceTopics())
+                    .setSourceTopicRegex(subtopology.sourceTopicRegex())
+                    
.setRepartitionSinkTopics(subtopology.repartitionSinkTopics())
+                    .setRepartitionSourceTopics(repartitionSourceTopics)
+                    .setStateChangelogTopics(stateChangelogTopics)
+                    .setCopartitionGroups(copartitionGroups)
+            );
+        });
+        return value;
+    }
+
+    private static StreamsGroupTopologyValue.TopicInfo 
convertToTopicInfo(StreamsGroupInitializeRequestData.TopicInfo topicInfo) {
+        List<StreamsGroupTopologyValue.TopicConfig> topicConfigs =  
topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream()
+            .map(config -> new 
StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
+            .collect(Collectors.toList()) : null;
+        return new StreamsGroupTopologyValue.TopicInfo()
+            .setName(topicInfo.name())
+            .setTopicConfigs(topicConfigs)
+            .setPartitions(topicInfo.partitions())
+            .setReplicationFactor(topicInfo.replicationFactor());
+    }
+
     /**
      * Creates a StreamsGroupTopology tombstone.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
index 27f138be05f..408f0bfcbb4 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
@@ -17,12 +17,10 @@
 package org.apache.kafka.coordinator.group.streams;
 
 import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
-import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -45,35 +43,6 @@ public class StreamsTopology {
         this.subtopologies = subtopologies;
     }
 
-    public StreamsTopology(final String topologyId,
-                           final 
List<StreamsGroupInitializeRequestData.Subtopology> subtopologies) {
-        this.topologyId = topologyId;
-        this.subtopologies = new HashMap<>();
-        subtopologies.forEach(subtopology -> {
-            List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics 
= subtopology.repartitionSourceTopics().stream()
-                .map(topicInfo -> {
-                    List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = 
 topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream()
-                        .map(config -> new 
StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
-                        .collect(Collectors.toList()) : null;
-                    return new 
StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs)
-                        .setPartitions(topicInfo.partitions());
-                }).collect(Collectors.toList());
-
-            List<StreamsGroupTopologyValue.TopicInfo> stateChangelogTopics = 
subtopology.stateChangelogTopics().stream().map(topicInfo -> {
-                List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = 
topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream()
-                    .map(config -> new 
StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
-                    .collect(Collectors.toList()) : null;
-                return new 
StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs);
-            }).collect(Collectors.toList());
-
-            this.subtopologies.put(
-                subtopology.subtopologyId(),
-                new 
StreamsGroupTopologyValue.Subtopology().setSubtopologyId(subtopology.subtopologyId())
-                    
.setSourceTopics(subtopology.sourceTopics()).setRepartitionSinkTopics(subtopology.repartitionSinkTopics())
-                    
.setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics));
-        });
-    }
-
     public String topologyId() {
         return topologyId;
     }
@@ -141,6 +110,7 @@ public class StreamsTopology {
             new StreamsGroupDescribeResponseData.TopicInfo()
                 .setName(x.name())
                 .setPartitions(x.partitions())
+                .setReplicationFactor(x.replicationFactor())
                 .setTopicConfigs(
                     x.topicConfigs() != null ?
                         x.topicConfigs().stream().map(
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
index af1fae06aad..d89ed3ed393 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
@@ -29,14 +29,25 @@
           "about": "String to uniquely identify the subtopology. 
Deterministically generated from the topology." },
         { "name": "SourceTopics", "type": "[]string", "versions": "0+",
           "about": "The topics the topology reads from." },
-        { "name": "SourceTopicRegex", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-          "about": "The regular expressions identifying topics the topology 
reads from. null if not provided." },
+        { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+",
+          "about": "Regular expressions identifying topics the sub-topology 
reads from." },
         { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": 
"0+",
-          "about": "The set of state changelog topics associated with this 
subtopology. " },
+          "about": "The set of state changelog topics associated with this 
sub-topology." },
         { "name": "RepartitionSinkTopics", "type": "[]string", "versions": 
"0+",
           "about": "The repartition topics the subtopology writes to." },
         { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", 
"versions": "0+",
-          "about": "The set of source topics that are internally created 
repartition topics. " }
+          "about": "The set of source topics that are internally created 
repartition topics." },
+        { "name": "CopartitionGroups", "type": "[]CopartitionGroup", 
"versions": "0+",
+          "about": "A subset of source topics that must be copartitioned.",
+          "fields": [
+            { "name": "SourceTopics", "type": "[]int16", "versions": "0+",
+              "about": "The topics the topology reads from. Index into the 
array on the subtopology level." },
+            { "name": "SourceTopicRegex", "type": "[]int16", "versions": "0+",
+              "about": "Regular expressions identifying topics the subtopology 
reads from. Index into the array on the subtopology level." },
+            { "name": "RepartitionSourceTopics", "type": "[]int16", 
"versions": "0+",
+              "about": "The set of source topics that are internally created 
repartition topics. Index into the array on the subtopology level." }
+          ]
+        }
       ]
     }
   ],
@@ -53,7 +64,9 @@
         "about": "The name of the topic." },
       { "name": "Partitions", "type": "int32", "versions": "0+",
         "about": "The number of partitions in the topic. Can be 0 if no 
specific number of partitions is enforced. Always 0 for changelog topics." },
-      { "name": "TopicConfigs", "type": "[]TopicConfig", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      { "name": "ReplicationFactor", "type": "int16", "versions": "0+",
+        "about": "The replication factor of the topic. Can be 0 if the default 
replication factor should be used." },
+      { "name": "TopicConfigs", "type": "[]TopicConfig", "versions": "0+",
         "about": "Topic-level configurations as key-value pairs."
       }
     ]}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
index 7ddbc832d80..65ad8d43fa8 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -38,11 +39,13 @@ class CoordinatorStreamsRecordHelpersTest {
                 .setSubtopologyId("subtopology-id")
                 .setRepartitionSinkTopics(Collections.singletonList("foo"))
                 .setSourceTopics(Collections.singletonList("bar"))
+                .setSourceTopicRegex(Collections.singletonList("regex"))
                 .setRepartitionSourceTopics(
                     Collections.singletonList(
                         new StreamsGroupInitializeRequestData.TopicInfo()
                             .setName("repartition")
                             .setPartitions(4)
+                            .setReplicationFactor((short) 3)
                             .setTopicConfigs(Collections.singletonList(
                                 new 
StreamsGroupInitializeRequestData.TopicConfig()
                                     .setKey("config-name1")
@@ -54,6 +57,7 @@ class CoordinatorStreamsRecordHelpersTest {
                     Collections.singletonList(
                         new StreamsGroupInitializeRequestData.TopicInfo()
                             .setName("changelog")
+                            .setReplicationFactor((short) 2)
                             .setTopicConfigs(Collections.singletonList(
                                 new 
StreamsGroupInitializeRequestData.TopicConfig()
                                     .setKey("config-name2")
@@ -61,6 +65,13 @@ class CoordinatorStreamsRecordHelpersTest {
                             ))
                     )
                 )
+                .setCopartitionGroups(Arrays.asList(
+                    new StreamsGroupInitializeRequestData.CopartitionGroup()
+                        .setSourceTopics(Collections.singletonList((short) 0))
+                        
.setRepartitionSourceTopics(Collections.singletonList((short) 0)),
+                    new StreamsGroupInitializeRequestData.CopartitionGroup()
+                        .setSourceTopicRegex(Collections.singletonList((short) 
0))
+                ))
             );
 
         List<StreamsGroupTopologyValue.Subtopology> expectedTopology =
@@ -68,11 +79,13 @@ class CoordinatorStreamsRecordHelpersTest {
                 .setSubtopologyId("subtopology-id")
                 .setRepartitionSinkTopics(Collections.singletonList("foo"))
                 .setSourceTopics(Collections.singletonList("bar"))
+                .setSourceTopicRegex(Collections.singletonList("regex"))
                 .setRepartitionSourceTopics(
                     Collections.singletonList(
                         new StreamsGroupTopologyValue.TopicInfo()
                             .setName("repartition")
                             .setPartitions(4)
+                            .setReplicationFactor((short) 3)
                             .setTopicConfigs(Collections.singletonList(
                                 new StreamsGroupTopologyValue.TopicConfig()
                                     .setKey("config-name1")
@@ -84,6 +97,7 @@ class CoordinatorStreamsRecordHelpersTest {
                     Collections.singletonList(
                         new StreamsGroupTopologyValue.TopicInfo()
                             .setName("changelog")
+                            .setReplicationFactor((short) 2)
                             .setTopicConfigs(Collections.singletonList(
                                 new StreamsGroupTopologyValue.TopicConfig()
                                     .setKey("config-name2")
@@ -91,6 +105,13 @@ class CoordinatorStreamsRecordHelpersTest {
                             ))
                     )
                 )
+                .setCopartitionGroups(Arrays.asList(
+                    new StreamsGroupTopologyValue.CopartitionGroup()
+                        .setSourceTopics(Collections.singletonList((short) 0))
+                        
.setRepartitionSourceTopics(Collections.singletonList((short) 0)),
+                    new StreamsGroupTopologyValue.CopartitionGroup()
+                        .setSourceTopicRegex(Collections.singletonList((short) 
0))
+                ))
             );
 
         CoordinatorRecord expectedRecord = new CoordinatorRecord(
@@ -108,4 +129,5 @@ class CoordinatorStreamsRecordHelpersTest {
             topology
         ));
     }
+
 }
\ No newline at end of file
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
index e83abcbfe7c..7aa303603ab 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
@@ -145,40 +145,60 @@ public class StreamsTopologyTest {
     public void 
asStreamsGroupDescribeTopologyShouldReturnCorrectSubtopologies() {
         Map<String, Subtopology> subtopologies = mkMap(
             mkEntry("subtopology-1", new Subtopology()
-                .setSourceTopicRegex("regex-1")
+                .setSourceTopicRegex(Collections.singletonList("regex-1"))
                 .setSubtopologyId("subtopology-1")
                 .setSourceTopics(Collections.singletonList("source-topic-1"))
                 
.setRepartitionSinkTopics(Collections.singletonList("sink-topic-1"))
                 .setRepartitionSourceTopics(
-                    Collections.singletonList(new 
TopicInfo().setName("repartition-topic-1")))
+                    Collections.singletonList(new TopicInfo()
+                        .setName("repartition-topic-1")
+                        .setReplicationFactor((short) 3)
+                        .setPartitions(2)))
                 .setStateChangelogTopics(
-                    Collections.singletonList(new 
TopicInfo().setName("changelog-topic-1")))
+                    Collections.singletonList(new TopicInfo()
+                        .setName("changelog-topic-1")
+                        .setReplicationFactor((short) 2)
+                        .setPartitions(1)))
             ),
             mkEntry("subtopology-2", new Subtopology()
-                .setSourceTopicRegex("regex-2")
+                .setSourceTopicRegex(Collections.singletonList("regex-2"))
                 .setSubtopologyId("subtopology-2")
                 .setSourceTopics(Collections.singletonList("source-topic-2"))
                 
.setRepartitionSinkTopics(Collections.singletonList("sink-topic-2"))
                 .setRepartitionSourceTopics(
-                    Collections.singletonList(new 
TopicInfo().setName("repartition-topic-2")))
+                    Collections.singletonList(new TopicInfo()
+                        .setName("repartition-topic-2")
+                        .setReplicationFactor((short) 3)
+                        .setPartitions(2)))
                 .setStateChangelogTopics(
-                    Collections.singletonList(new 
TopicInfo().setName("changelog-topic-2")))
+                    Collections.singletonList(new TopicInfo()
+                        .setName("changelog-topic-2")
+                        .setReplicationFactor((short) 2)
+                        .setPartitions(1)))
             )
         );
         StreamsTopology topology = new StreamsTopology("topology-id", 
subtopologies);
         List<StreamsGroupDescribeResponseData.Subtopology> result = 
topology.asStreamsGroupDescribeTopology();
         assertEquals(2, result.size());
-        assertEquals("regex-1", result.get(0).sourceTopicRegex());
+        assertEquals(Collections.singletonList("regex-1"), 
result.get(0).sourceTopicRegex());
         assertEquals("subtopology-1", result.get(0).subtopologyId());
         assertEquals(Collections.singletonList("source-topic-1"), 
result.get(0).sourceTopics());
         assertEquals(Collections.singletonList("sink-topic-1"), 
result.get(0).repartitionSinkTopics());
         assertEquals("repartition-topic-1", 
result.get(0).repartitionSourceTopics().get(0).name());
+        assertEquals((short) 3, 
result.get(0).repartitionSourceTopics().get(0).replicationFactor());
+        assertEquals(2, 
result.get(0).repartitionSourceTopics().get(0).partitions());
         assertEquals("changelog-topic-1", 
result.get(0).stateChangelogTopics().get(0).name());
-        assertEquals("regex-2", result.get(1).sourceTopicRegex());
+        assertEquals((short) 2, 
result.get(0).stateChangelogTopics().get(0).replicationFactor());
+        assertEquals(1, 
result.get(0).stateChangelogTopics().get(0).partitions());
+        assertEquals(Collections.singletonList("regex-2"), 
result.get(1).sourceTopicRegex());
         assertEquals("subtopology-2", result.get(1).subtopologyId());
         assertEquals(Collections.singletonList("source-topic-2"), result.get(1).sourceTopics());
         assertEquals(Collections.singletonList("sink-topic-2"), result.get(1).repartitionSinkTopics());
         assertEquals("repartition-topic-2", result.get(1).repartitionSourceTopics().get(0).name());
+        assertEquals((short) 3, result.get(1).repartitionSourceTopics().get(0).replicationFactor());
+        assertEquals(2, result.get(1).repartitionSourceTopics().get(0).partitions());
         assertEquals("changelog-topic-2", result.get(1).stateChangelogTopics().get(0).name());
+        assertEquals((short) 2, result.get(1).stateChangelogTopics().get(0).replicationFactor());
+        assertEquals(1, result.get(1).stateChangelogTopics().get(0).partitions());
     }
 }
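
Side note on the schema change exercised above: sourceTopicRegex is now a list of regular
expressions rather than a single string. A minimal, standalone sketch (illustrative only,
not coordinator or client code; the class and method names below are made up) of matching a
topic name against such a list:

    import java.util.List;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;

    public class SourceTopicRegexSketch {

        // True if the topic name matches any of the subtopology's source topic regexes.
        static boolean matchesAny(final String topic, final List<Pattern> sourceTopicRegexes) {
            return sourceTopicRegexes.stream().anyMatch(p -> p.matcher(topic).matches());
        }

        public static void main(final String[] args) {
            // Hypothetical regex list; "regex-1" matches only the literal name, "source-.*" is a prefix match.
            final List<Pattern> regexes = List.of("regex-1", "source-.*").stream()
                .map(Pattern::compile)
                .collect(Collectors.toList());

            System.out.println(matchesAny("source-topic-1", regexes)); // true
            System.out.println(matchesAny("other-topic", regexes));    // false
        }
    }
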
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 0b69a099663..27e709490be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -73,6 +73,7 @@ import org.slf4j.Logger;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -558,9 +559,38 @@ public class StreamThread extends Thread implements ProcessingThread {
                                                                       final TopologyMetadata topologyMetadata) {
         final InternalTopologyBuilder internalTopologyBuilder = topologyMetadata.lookupBuilderForNamedTopology(null);
 
+        final Map<String, Subtopology> subtopologyMap = initBrokerTopology(config, internalTopologyBuilder);
+
+        return new StreamsAssignmentInterface(
+            processId,
+            endpoint,
+            subtopologyMap,
+            config.getClientTags()
+        );
+    }
+
+    private static Map<String, Subtopology> initBrokerTopology(final StreamsConfig config, final InternalTopologyBuilder internalTopologyBuilder) {
+        final Map<String, String> defaultTopicConfigs = new HashMap<>();
+        for (final Map.Entry<String, Object> entry : config.originalsWithPrefix(StreamsConfig.TOPIC_PREFIX).entrySet()) {
+            if (entry.getValue() != null) {
+                defaultTopicConfigs.put(entry.getKey(), entry.getValue().toString());
+            }
+        }
+        final long windowChangeLogAdditionalRetention = config.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG);
+
         final Map<String, Subtopology> subtopologyMap = new HashMap<>();
+        final Collection<Set<String>> copartitionGroups = internalTopologyBuilder.copartitionGroups();
+
         for (final Map.Entry<TopologyMetadata.Subtopology, TopicsInfo> topicsInfoEntry : internalTopologyBuilder.subtopologyToTopicsInfo()
             .entrySet()) {
+
+            final HashSet<String> allSourceTopics = new HashSet<>(
+                topicsInfoEntry.getValue().sourceTopics);
+            topicsInfoEntry.getValue().repartitionSourceTopics.forEach(
+                (repartitionSourceTopic, repartitionTopicInfo) -> {
+                    allSourceTopics.add(repartitionSourceTopic);
+                });
+
             subtopologyMap.put(
                 String.valueOf(topicsInfoEntry.getKey().nodeGroupId),
                 new Subtopology(
@@ -569,44 +599,28 @@ public class StreamThread extends Thread implements ProcessingThread {
                     topicsInfoEntry.getValue().repartitionSourceTopics.entrySet()
                         .stream()
                         .collect(Collectors.toMap(Map.Entry::getKey, e ->
-                            new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(), e.getValue().topicConfigs))),
+                            new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(),
+                                Optional.of(config.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue()),
+                                e.getValue().properties(defaultTopicConfigs, windowChangeLogAdditionalRetention)))),
                     topicsInfoEntry.getValue().stateChangelogTopics.entrySet()
                         .stream()
                         .collect(Collectors.toMap(Map.Entry::getKey, e ->
-                            new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(), e.getValue().topicConfigs)))
-
+                            new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(),
+                                Optional.of(config.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue()),
+                                e.getValue().properties(defaultTopicConfigs, windowChangeLogAdditionalRetention)))),
+                    copartitionGroups.stream().filter(allSourceTopics::containsAll).collect(
+                        Collectors.toList())
                 )
             );
         }
 
-        // TODO: Which of these are actually needed?
-        // TODO: Maybe we want to split this into assignment properties and internal topic configuration properties
-        final HashMap<String, Object> assignmentProperties = new HashMap<>();
-        assignmentProperties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, config.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG));
-        assignmentProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG));
-        assignmentProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, config.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
-        assignmentProperties.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG,
-            config.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG));
-        assignmentProperties.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, config.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG));
-        assignmentProperties.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
-            config.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
-        assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
-            config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG));
-        assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
-            config.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG));
-        assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
-            config.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG));
-        assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
-            config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
+        if (subtopologyMap.values().stream().mapToInt(x -> x.copartitionGroups.size()).sum()
+            != copartitionGroups.size()) {
+            throw new IllegalStateException(
+                "Not all copartition groups were converted to broker topology");
+        }
 
-        return new StreamsAssignmentInterface(
-            processId,
-            endpoint,
-            null,
-            subtopologyMap,
-            assignmentProperties,
-            config.getClientTags()
-        );
+        return subtopologyMap;
     }
 
     private static DefaultTaskManager maybeCreateSchedulingTaskManager(final boolean processingThreadsEnabled,

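The copartition-group handling in initBrokerTopology above boils down to: a copartition group
is attached to a subtopology if all of its topics are among that subtopology's source topics
(including repartition source topics), and every group must end up in some subtopology. A
standalone sketch of that filtering and of the final consistency check, using plain collections
(illustrative only, with made-up input data; this is not the StreamThread code path):

    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class CopartitionGroupSketch {

        public static void main(final String[] args) {
            // All copartition groups computed for the topology (hypothetical input).
            final List<Set<String>> copartitionGroups = List.of(
                Set.of("source-topic-1", "repartition-topic-1"),
                Set.of("source-topic-2"));

            // Source topics, including repartition source topics, per subtopology (hypothetical input).
            final Map<String, Set<String>> sourceTopicsBySubtopology = Map.of(
                "0", Set.of("source-topic-1", "repartition-topic-1"),
                "1", Set.of("source-topic-2", "repartition-topic-2"));

            // A copartition group belongs to the subtopology whose source topics contain all of its topics.
            final Map<String, List<Set<String>>> groupsBySubtopology =
                sourceTopicsBySubtopology.entrySet().stream()
                    .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        e -> copartitionGroups.stream()
                            .filter(e.getValue()::containsAll)
                            .collect(Collectors.toList())));

            // Consistency check mirroring the IllegalStateException in the diff above:
            // every copartition group must have been assigned to some subtopology.
            final int assigned = groupsBySubtopology.values().stream().mapToInt(List::size).sum();
            if (assigned != copartitionGroups.size()) {
                throw new IllegalStateException("Not all copartition groups were converted to broker topology");
            }

            System.out.println(groupsBySubtopology);
        }
    }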