This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f5f901771ce1b5d53f65d76046638898b622f968
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Oct 22 15:11:40 2024 +0200

    Resolve conflicts from trunk rebase - Port tools for topic configuration 
(#17371)
    
    * KSTREAMS-6456: Port tools for topic configuration
    
    Ports several tools for topic configuration from the client side to the
    broker side. Several things are refactored:
    
     - Decoupling. On the client side, for example, RepartitionTopics was
       using the CopartitionedTopicEnforcer, the InternalTopicCreator, the
       TopologyMetadata and the Cluster objects. All of those are mocked
       with Mockito during testing. This points to bad coupling. We
       refactored all classes to be mostly self-sufficient, only relying on
       themselves and simple interfaces.
     - Tests only use JUnit5, not Hamcrest matchers or any other streams
       utilities, so as not to pollute the group coordinator module.
     - All classes only modify the configurations -- the code does not
       actually call into the AdminClient anymore.
     - We map all errors to new errors in the broker, in particular,
       the error for missing topics, inconsistent internal topics, and
       invalid topologies.
    
    We include the internal, mutable data structures, which are set- and
    map-based for efficient algorithms. They are distinctly different
    from the data represented in `StreamsGroupTopologyValue` and
    `StreamsGroupInitializeRequest`, since regular expressions must
    be resolved at this point.
    
    Both the topic creation and internal topic validation will be
    based on this code; the basics of this are implemented in the
    `InternalTopicManager`.
    
    Every time either the broker-side topology or the topic metadata
    on the broker changes, we reconfigure the internal topics, check
    consistency with the current topics on the broker, and possibly
    trigger creation of the missing internal topics. These changes
    will be built on top of this change.
    
    * formatting fixes
---
 checkstyle/import-control-group-coordinator.xml    |   1 +
 ...StreamsInconsistentInternalTopicsException.java |  23 ++
 .../org/apache/kafka/common/protocol/Errors.java   |  11 +-
 .../group/streams/topics/ChangelogTopics.java      |  82 +++++++
 .../streams/topics/ConfiguredInternalTopic.java    | 138 ++++++++++++
 .../streams/topics/ConfiguredSubtopology.java      | 144 +++++++++++++
 .../topics/CopartitionedTopicsEnforcer.java        | 191 ++++++++++++++++
 .../group/streams/topics/InternalTopicManager.java | 231 ++++++++++++++++++++
 .../group/streams/topics/RepartitionTopics.java    | 180 ++++++++++++++++
 .../group/streams/topics/ChangelogTopicsTest.java  | 136 ++++++++++++
 .../topics/ConfiguredInternalTopicTest.java        | 104 +++++++++
 .../streams/topics/ConfiguredSubtopologyTest.java  | 137 ++++++++++++
 .../topics/CopartitionedTopicsEnforcerTest.java    | 230 ++++++++++++++++++++
 .../streams/topics/InternalTopicManagerTest.java   | 178 +++++++++++++++
 .../streams/topics/RepartitionTopicsTest.java      | 239 +++++++++++++++++++++
 15 files changed, 2020 insertions(+), 5 deletions(-)

diff --git a/checkstyle/import-control-group-coordinator.xml 
b/checkstyle/import-control-group-coordinator.xml
index 0619ea444d5..0f54365322c 100644
--- a/checkstyle/import-control-group-coordinator.xml
+++ b/checkstyle/import-control-group-coordinator.xml
@@ -63,6 +63,7 @@
             <allow pkg="org.apache.kafka.deferred" />
             <allow pkg="org.apache.kafka.image"/>
             <allow pkg="org.apache.kafka.server.common"/>
+            <allow pkg="org.apache.kafka.server.immutable"/>
             <allow pkg="org.apache.kafka.server.record"/>
             <allow pkg="org.apache.kafka.server.util"/>
             <allow pkg="org.apache.kafka.storage.internals.log"/>
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/StreamsInconsistentInternalTopicsException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInconsistentInternalTopicsException.java
new file mode 100644
index 00000000000..fbf81f14260
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInconsistentInternalTopicsException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class StreamsInconsistentInternalTopicsException extends ApiException {
+    public StreamsInconsistentInternalTopicsException(String message) {
+        super(message);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index db5194d4a2b..e8687314808 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -120,6 +120,7 @@ import 
org.apache.kafka.common.errors.SnapshotNotFoundException;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
 import org.apache.kafka.common.errors.StaleMemberEpochException;
 import org.apache.kafka.common.errors.StreamsGroupUninitializedException;
+import 
org.apache.kafka.common.errors.StreamsInconsistentInternalTopicsException;
 import org.apache.kafka.common.errors.StreamsInconsistentTopologyException;
 import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
 import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException;
@@ -425,12 +426,12 @@ public enum Errors {
         StreamsInconsistentTopologyException::new),
     STREAMS_MISSING_SOURCE_TOPICS(132, "One or more source topics are 
missing.",
         StreamsMissingSourceTopicsException::new),
-    STREAMS_MISSING_INTERNAL_TOPICS(133, "One or more internal topics are 
missing.",
-        StreamsMissingInternalTopicsException::new),
-    STREAMS_GROUP_UNINITIALIZED(134, "The group is not (fully) initialized, 
broker-side topology information or internal topics are missing.",
+    STREAMS_GROUP_UNINITIALIZED(133, "The group is not (fully) initialized, 
broker-side topology information or internal topics are missing.",
             StreamsGroupUninitializedException::new),
-    STREAMS_SHUTDOWN_APPLICATION(135, "A client requested the shutdown of the 
whole application.",
-            StreamsShutdownApplicationException::new);
+    STREAMS_SHUTDOWN_APPLICATION(134, "A client requested the shutdown of the 
whole application.",
+            StreamsShutdownApplicationException::new),
+    STREAMS_INCONSISTENT_INTERNAL_TOPICS(135, "One or more internal topics are 
missing.",
+        StreamsInconsistentInternalTopicsException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java
new file mode 100644
index 00000000000..298c8abbfa3
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.function.Function;
+
+/**
+ * This class is responsible for setting up the changelog topics for a 
topology.
+ */
+public class ChangelogTopics {
+
+    private final Map<String, ConfiguredSubtopology> subtopologies;
+    private final Function<String, Integer> topicPartitionCountProvider;
+    private final Logger log;
+
+    public ChangelogTopics(
+        final LogContext logContext,
+        final Map<String, ConfiguredSubtopology> subtopologies,
+        final Function<String, Integer> topicPartitionCountProvider
+    ) {
+        this.log = logContext.logger(getClass());
+        this.subtopologies = subtopologies;
+        this.topicPartitionCountProvider = topicPartitionCountProvider;
+    }
+
+    /**
+     * Modifies the provided ConfiguredSubtopology to set the number of 
partitions for each changelog topic.
+     *
+     * @return the map of changelog topics for the requested topology that are 
internal and may need to be created.
+     */
+    public Map<String, ConfiguredInternalTopic> setup() {
+        final Map<String, ConfiguredInternalTopic> changelogTopicMetadata = 
new HashMap<>();
+        for (final Map.Entry<String, ConfiguredSubtopology> entry : 
subtopologies.entrySet()) {
+            final ConfiguredSubtopology configuredSubtopology = 
entry.getValue();
+
+            final OptionalInt maxNumPartitions =
+                
configuredSubtopology.sourceTopics().stream().mapToInt(this::getPartitionCountOrFail).max();
+
+            if (!maxNumPartitions.isPresent()) {
+                throw new StreamsInvalidTopologyException("No source topics 
found for subtopology " + entry.getKey());
+            }
+            for (final ConfiguredInternalTopic topicConfig : 
configuredSubtopology.nonSourceChangelogTopics()) {
+                changelogTopicMetadata.put(topicConfig.name(), topicConfig);
+                topicConfig.setNumberOfPartitions(maxNumPartitions.getAsInt());
+            }
+        }
+
+        log.debug("Expecting state changelog topics {} for the requested 
topology.", changelogTopicMetadata.values());
+        return changelogTopicMetadata;
+    }
+
+    private int getPartitionCountOrFail(String topic) {
+        final Integer topicPartitionCount = 
topicPartitionCountProvider.apply(topic);
+        if (topicPartitionCount == null) {
+            throw new StreamsMissingSourceTopicsException("No partition count 
for source topic " + topic);
+        }
+        return topicPartitionCount;
+    }
+}
\ No newline at end of file
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java
new file mode 100644
index 00000000000..875b3192ea1
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.internals.Topic;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * ConfiguredInternalTopic captures the properties required for configuring 
the internal topics we create for change-logs and repartitioning
+ * etc.
+ * <p>
+ * It is derived from the topology sent by the client, and the current state 
of the topics inside the broker. If the topics on the broker
+ * changes, the internal topic may need to be reconfigured.
+ */
+public class ConfiguredInternalTopic {
+
+    private final String name;
+    private final Map<String, String> topicConfigs;
+    private final Optional<Short> replicationFactor;
+    private final boolean enforceNumberOfPartitions;
+    private Optional<Integer> numberOfPartitions;
+
+    public ConfiguredInternalTopic(final String name) {
+        this(name, Collections.emptyMap(), Optional.empty(), Optional.empty());
+    }
+
+    public ConfiguredInternalTopic(final String name,
+                                   final Map<String, String> topicConfigs) {
+        this(name, topicConfigs, Optional.empty(), Optional.empty());
+    }
+
+    public ConfiguredInternalTopic(final String name,
+                                   final Map<String, String> topicConfigs,
+                                   final Optional<Integer> numberOfPartitions,
+                                   final Optional<Short> replicationFactor) {
+        this.name = Objects.requireNonNull(name, "name can't be null");
+        Topic.validate(name);
+        
numberOfPartitions.ifPresent(ConfiguredInternalTopic::validateNumberOfPartitions);
+        this.topicConfigs = Objects.requireNonNull(topicConfigs, "topicConfigs 
can't be null");
+        this.numberOfPartitions = numberOfPartitions;
+        this.replicationFactor = replicationFactor;
+        this.enforceNumberOfPartitions = numberOfPartitions.isPresent();
+    }
+
+    private static void validateNumberOfPartitions(final int 
numberOfPartitions) {
+        if (numberOfPartitions < 1) {
+            throw new IllegalArgumentException("Number of partitions must be 
at least 1.");
+        }
+    }
+
+    public Map<String, String> topicConfigs() {
+        return topicConfigs;
+    }
+
+    public boolean hasEnforcedNumberOfPartitions() {
+        return enforceNumberOfPartitions;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public Optional<Integer> numberOfPartitions() {
+        return numberOfPartitions;
+    }
+
+    public Optional<Short> replicationFactor() {
+        return replicationFactor;
+    }
+
+    public ConfiguredInternalTopic setNumberOfPartitions(final int 
numberOfPartitions) {
+        if (this.hasEnforcedNumberOfPartitions()
+            && this.numberOfPartitions.isPresent()
+            && this.numberOfPartitions.get() != numberOfPartitions) {
+            throw new UnsupportedOperationException(
+                "number of partitions are enforced on topic " + name() + " and 
can't be altered.");
+        }
+
+        validateNumberOfPartitions(numberOfPartitions);
+
+        this.numberOfPartitions = Optional.of(numberOfPartitions);
+        return this;
+    }
+
+    @Override
+    public String toString() {
+        return "ConfiguredInternalTopic(" +
+            "name=" + name +
+            ", topicConfigs=" + topicConfigs +
+            ", numberOfPartitions=" + numberOfPartitions +
+            ", replicationFactor=" + replicationFactor +
+            ", enforceNumberOfPartitions=" + enforceNumberOfPartitions +
+            ")";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ConfiguredInternalTopic that = (ConfiguredInternalTopic) o;
+        return enforceNumberOfPartitions == that.enforceNumberOfPartitions
+            && Objects.equals(name, that.name)
+            && Objects.equals(topicConfigs, that.topicConfigs)
+            && Objects.equals(numberOfPartitions, that.numberOfPartitions)
+            && Objects.equals(replicationFactor, that.replicationFactor);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name,
+            topicConfigs,
+            numberOfPartitions,
+            replicationFactor,
+            enforceNumberOfPartitions);
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
new file mode 100644
index 00000000000..1faba6c38ba
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Internal representation of a subtopology.
+ * <p>
+ * The subtopology is configured according to the number of partitions 
available in the source topics. It has regular expressions already
+ * resolved and defined exactly the information that is being used by streams 
groups assignment reconciliation.
+ * <p>
+ * Configured subtopologies may be recreated every time the input topics used 
by the subtopology are modified.
+ */
+public class ConfiguredSubtopology {
+
+    private Set<String> repartitionSinkTopics;
+    private Set<String> sourceTopics;
+    private Map<String, ConfiguredInternalTopic> stateChangelogTopics;
+    private Map<String, ConfiguredInternalTopic> repartitionSourceTopics;
+
+    public ConfiguredSubtopology() {
+        this.repartitionSinkTopics = new HashSet<>();
+        this.sourceTopics = new HashSet<>();
+        this.stateChangelogTopics = new HashMap<>();
+        this.repartitionSourceTopics = new HashMap<>();
+    }
+
+    public ConfiguredSubtopology(
+        final Set<String> repartitionSinkTopics,
+        final Set<String> sourceTopics,
+        final Map<String, ConfiguredInternalTopic> repartitionSourceTopics,
+        final Map<String, ConfiguredInternalTopic> stateChangelogTopics
+    ) {
+        this.repartitionSinkTopics = repartitionSinkTopics;
+        this.sourceTopics = sourceTopics;
+        this.stateChangelogTopics = stateChangelogTopics;
+        this.repartitionSourceTopics = repartitionSourceTopics;
+    }
+
+    public Set<String> repartitionSinkTopics() {
+        return repartitionSinkTopics;
+    }
+
+    public Set<String> sourceTopics() {
+        return sourceTopics;
+    }
+
+    public Map<String, ConfiguredInternalTopic> stateChangelogTopics() {
+        return stateChangelogTopics;
+    }
+
+    public Map<String, ConfiguredInternalTopic> repartitionSourceTopics() {
+        return repartitionSourceTopics;
+    }
+
+    public ConfiguredSubtopology setRepartitionSinkTopics(final Set<String> 
repartitionSinkTopics) {
+        this.repartitionSinkTopics = repartitionSinkTopics;
+        return this;
+    }
+
+    public ConfiguredSubtopology setSourceTopics(final Set<String> 
sourceTopics) {
+        this.sourceTopics = sourceTopics;
+        return this;
+    }
+
+    public ConfiguredSubtopology setStateChangelogTopics(
+        final Map<String, ConfiguredInternalTopic> stateChangelogTopics
+    ) {
+        this.stateChangelogTopics = stateChangelogTopics;
+        return this;
+    }
+
+    public ConfiguredSubtopology setRepartitionSourceTopics(
+        final Map<String, ConfiguredInternalTopic> repartitionSourceTopics
+    ) {
+        this.repartitionSourceTopics = repartitionSourceTopics;
+        return this;
+    }
+
+    /**
+     * Returns the config for any changelogs that must be prepared for this 
topic group, ie excluding any source topics that are reused as a
+     * changelog
+     */
+    public Set<ConfiguredInternalTopic> nonSourceChangelogTopics() {
+        final Set<ConfiguredInternalTopic> topicConfigs = new HashSet<>();
+        for (final Map.Entry<String, ConfiguredInternalTopic> 
changelogTopicEntry : stateChangelogTopics.entrySet()) {
+            if (!sourceTopics.contains(changelogTopicEntry.getKey())) {
+                topicConfigs.add(changelogTopicEntry.getValue());
+            }
+        }
+        return topicConfigs;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final ConfiguredSubtopology that = (ConfiguredSubtopology) o;
+        return Objects.equals(repartitionSinkTopics, 
that.repartitionSinkTopics)
+            && Objects.equals(sourceTopics, that.sourceTopics)
+            && Objects.equals(stateChangelogTopics, that.stateChangelogTopics)
+            && Objects.equals(repartitionSourceTopics, 
that.repartitionSourceTopics);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(repartitionSinkTopics, sourceTopics,
+            stateChangelogTopics, repartitionSourceTopics);
+    }
+
+    @Override
+    public String toString() {
+        return "ConfiguredSubtopology{" +
+            "repartitionSinkTopics=" + repartitionSinkTopics +
+            ", sourceTopics=" + sourceTopics +
+            ", stateChangelogTopics=" + stateChangelogTopics +
+            ", repartitionSourceTopics=" + repartitionSourceTopics +
+            '}';
+    }
+
+}
\ No newline at end of file
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcer.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcer.java
new file mode 100644
index 00000000000..a74cff5d33e
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcer.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import 
org.apache.kafka.common.errors.StreamsInconsistentInternalTopicsException;
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class is responsible for enforcing the number of partitions in 
copartitioned topics.
+ */
+public class CopartitionedTopicsEnforcer {
+
+    private final Logger log;
+    private final Function<String, Integer> topicPartitionCountProvider;
+
+    public CopartitionedTopicsEnforcer(final LogContext logContext,
+                                       final Function<String, Integer> 
topicPartitionCountProvider) {
+        this.log = logContext.logger(getClass());
+        this.topicPartitionCountProvider = topicPartitionCountProvider;
+    }
+
+    private static void maybeSetNumberOfPartitionsForInternalTopic(final int 
numPartitionsToUseForRepartitionTopics,
+                                                                   final 
ConfiguredInternalTopic config) {
+        if (!config.hasEnforcedNumberOfPartitions()) {
+            
config.setNumberOfPartitions(numPartitionsToUseForRepartitionTopics);
+        }
+    }
+
+    private static Supplier<StreamsInvalidTopologyException> 
emptyNumberOfPartitionsExceptionSupplier(final String topic) {
+        return () -> new StreamsInvalidTopologyException("Number of partitions 
is not set for topic: " + topic);
+    }
+
+    /**
+     * Enforces the number of partitions for copartitioned topics.
+     *
+     * @param copartitionedTopics the set of copartitioned topics
+     * @param repartitionTopics   a map from repartition topics to their 
internal topic configs
+     */
+    public void enforce(final Set<String> copartitionedTopics,
+                        final Map<String, ConfiguredInternalTopic> 
repartitionTopics) {
+        if (copartitionedTopics.isEmpty()) {
+            return;
+        }
+
+        final Map<Object, ConfiguredInternalTopic> repartitionTopicConfigs =
+            copartitionedTopics.stream()
+                .filter(repartitionTopics::containsKey)
+                .collect(Collectors.toMap(topic -> topic, 
repartitionTopics::get));
+
+        final Map<String, Integer> nonRepartitionTopicPartitions =
+            copartitionedTopics.stream().filter(topic -> 
!repartitionTopics.containsKey(topic))
+                .collect(Collectors.toMap(topic -> topic, topic -> {
+                    final Integer topicPartitionCount = 
topicPartitionCountProvider.apply(topic);
+                    if (topicPartitionCount == null) {
+                        final String str = String.format("Topic not found: 
%s", topic);
+                        log.error(str);
+                        throw new StreamsInvalidTopologyException(str);
+                    } else {
+                        return topicPartitionCount;
+                    }
+                }));
+
+        final int numPartitionsToUseForRepartitionTopics;
+        final Collection<ConfiguredInternalTopic> configuredInternalTopics = 
repartitionTopicConfigs.values();
+
+        if (copartitionedTopics.equals(repartitionTopicConfigs.keySet())) {
+            final Collection<ConfiguredInternalTopic> 
configuredConfiguredInternalTopicsWithEnforcedNumberOfPartitions =
+                configuredInternalTopics
+                    .stream()
+                    
.filter(ConfiguredInternalTopic::hasEnforcedNumberOfPartitions)
+                    .collect(Collectors.toList());
+
+            // if there's at least one repartition topic with enforced number 
of partitions
+            // validate that they all have same number of partitions
+            if 
(!configuredConfiguredInternalTopicsWithEnforcedNumberOfPartitions.isEmpty()) {
+                numPartitionsToUseForRepartitionTopics = 
validateAndGetNumOfPartitions(
+                    repartitionTopicConfigs,
+                    
configuredConfiguredInternalTopicsWithEnforcedNumberOfPartitions
+                );
+            } else {
+                // If all topics for this co-partition group are repartition 
topics,
+                // then set the number of partitions to be the maximum of the 
number of partitions.
+                numPartitionsToUseForRepartitionTopics = 
getMaxPartitions(repartitionTopicConfigs);
+            }
+        } else {
+            // Otherwise, use the number of partitions from external topics 
(which must all be the same)
+            numPartitionsToUseForRepartitionTopics = 
getSamePartitions(nonRepartitionTopicPartitions);
+        }
+
+        // coerce all the repartition topics to use the decided number of 
partitions.
+        for (final ConfiguredInternalTopic config : configuredInternalTopics) {
+            
maybeSetNumberOfPartitionsForInternalTopic(numPartitionsToUseForRepartitionTopics,
 config);
+
+            final int numberOfPartitionsOfInternalTopic = config
+                .numberOfPartitions()
+                
.orElseThrow(emptyNumberOfPartitionsExceptionSupplier(config.name()));
+
+            if (numberOfPartitionsOfInternalTopic != 
numPartitionsToUseForRepartitionTopics) {
+                final String msg = String.format("Number of partitions [%d] of 
repartition topic [%s] " +
+                        "doesn't match number of partitions [%d] of the source 
topic.",
+                    numberOfPartitionsOfInternalTopic,
+                    config.name(),
+                    numPartitionsToUseForRepartitionTopics);
+                throw new StreamsInconsistentInternalTopicsException(msg);
+            }
+        }
+    }
+
+    private int validateAndGetNumOfPartitions(final Map<Object, 
ConfiguredInternalTopic> repartitionTopicConfigs,
+                                              final 
Collection<ConfiguredInternalTopic> configuredInternalTopics) {
+        final ConfiguredInternalTopic firstConfiguredInternalTopic = 
configuredInternalTopics.iterator().next();
+
+        final int firstNumberOfPartitionsOfInternalTopic = 
firstConfiguredInternalTopic
+            .numberOfPartitions()
+            
.orElseThrow(emptyNumberOfPartitionsExceptionSupplier(firstConfiguredInternalTopic.name()));
+
+        for (final ConfiguredInternalTopic configuredInternalTopic : 
configuredInternalTopics) {
+            final Integer numberOfPartitions = configuredInternalTopic
+                .numberOfPartitions()
+                
.orElseThrow(emptyNumberOfPartitionsExceptionSupplier(configuredInternalTopic.name()));
+
+            if (numberOfPartitions != firstNumberOfPartitionsOfInternalTopic) {
+                final Map<Object, Integer> repartitionTopics = 
repartitionTopicConfigs
+                    .entrySet()
+                    .stream()
+                    .collect(Collectors.toMap(Entry::getKey, entry -> 
entry.getValue().numberOfPartitions().get()));
+
+                final String msg = String.format("Following topics do not have 
the same number of partitions: [%s]",
+                    new TreeMap<>(repartitionTopics));
+                throw new StreamsInconsistentInternalTopicsException(msg);
+            }
+        }
+
+        return firstNumberOfPartitionsOfInternalTopic;
+    }
+
+    private int getSamePartitions(final Map<String, Integer> 
nonRepartitionTopicsInCopartitionGroup) {
+        final int partitions = 
nonRepartitionTopicsInCopartitionGroup.values().iterator().next();
+        for (final Entry<String, Integer> entry : 
nonRepartitionTopicsInCopartitionGroup.entrySet()) {
+            if (entry.getValue() != partitions) {
+                final TreeMap<String, Integer> sorted = new 
TreeMap<>(nonRepartitionTopicsInCopartitionGroup);
+                throw new StreamsInconsistentInternalTopicsException(
+                    String.format("Topics not co-partitioned: [%s]", sorted)
+                );
+            }
+        }
+        return partitions;
+    }
+
+    private int getMaxPartitions(final Map<Object, ConfiguredInternalTopic> 
repartitionTopicsInCopartitionGroup) {
+        int maxPartitions = 0;
+
+        for (final ConfiguredInternalTopic config : 
repartitionTopicsInCopartitionGroup.values()) {
+            final Optional<Integer> partitions = config.numberOfPartitions();
+            maxPartitions = Integer.max(maxPartitions, 
partitions.orElse(maxPartitions));
+        }
+        if (maxPartitions == 0) {
+            throw new StreamsInvalidTopologyException("All topics in the 
copartition group had undefined partition number: " +
+                repartitionTopicsInCopartitionGroup.keySet());
+        }
+        return maxPartitions;
+    }
+
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
new file mode 100644
index 00000000000..9485617261f
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class InternalTopicManager {
+
+    /**
+     * Computes the internal topics that are configured for the given subtopologies but do not appear
+     * by name in the metadata image.
+     *
+     * @param subtopologyMap the configured subtopologies; every repartition source topic and state
+     *                       changelog topic must already have its number of partitions set
+     * @param metadataImage  the metadata image whose topic names are treated as already existing
+     * @return the topics to create, keyed by topic name
+     */
+    public static Map<String, CreatableTopic> missingTopics(Map<String, ConfiguredSubtopology> subtopologyMap,
+                                                            MetadataImage metadataImage) {
+        final Map<String, CreatableTopic> creatableTopicsByName = new HashMap<>();
+        for (ConfiguredSubtopology configuredSubtopology : subtopologyMap.values()) {
+            for (ConfiguredInternalTopic repartitionTopic : configuredSubtopology.repartitionSourceTopics().values()) {
+                creatableTopicsByName.put(repartitionTopic.name(), toCreatableTopic(repartitionTopic));
+            }
+            for (ConfiguredInternalTopic changelogTopic : configuredSubtopology.stateChangelogTopics().values()) {
+                creatableTopicsByName.put(changelogTopic.name(), toCreatableTopic(changelogTopic));
+            }
+        }
+        // TODO: Validate if existing topics are compatible with the new topics
+        // Anything the metadata image already knows by name does not need to be created.
+        metadataImage.topics().topicsByName().keySet().forEach(creatableTopicsByName::remove);
+        return creatableTopicsByName;
+    }
+
+
+    /**
+     * Converts the persisted subtopologies into configured subtopologies and then runs, in this order,
+     * the repartition topic configuration, the copartition enforcement and the changelog topic
+     * configuration on them.
+     *
+     * <p>Partition counts are looked up in the metadata image first; for topics unknown to the image,
+     * the partition count of a configured internal topic of the same name is used if it is set.
+     *
+     * @param logContext      the log context used to create loggers
+     * @param subtopologyList the persisted subtopologies; subtopology ids must be unique
+     * @param metadataImage   the metadata image used to look up existing topics
+     * @return the configured subtopologies, keyed by subtopology id
+     */
+    public static Map<String, ConfiguredSubtopology> configureTopics(LogContext logContext,
+                                                                     List<StreamsGroupTopologyValue.Subtopology> subtopologyList,
+                                                                     MetadataImage metadataImage) {
+
+        final Logger log = logContext.logger(InternalTopicManager.class);
+
+        final Map<String, ConfiguredSubtopology> subtopologiesById = subtopologyList.stream()
+            .collect(Collectors.toMap(
+                persisted -> persisted.subtopologyId(),
+                persisted -> fromPersistedSubtopology(persisted)));
+
+        final Map<String, Collection<Set<String>>> copartitionGroupsById = subtopologyList.stream()
+            .collect(Collectors.toMap(
+                persisted -> persisted.subtopologyId(),
+                persisted -> copartitionGroupsFromPersistedSubtopology(persisted)));
+
+        // Index every internal topic (repartition and changelog) by name for the partition count lookup.
+        final Map<String, ConfiguredInternalTopic> internalTopicsByName = subtopologiesById.values().stream()
+            .flatMap(subtopology -> Stream.concat(
+                subtopology.repartitionSourceTopics().values().stream(),
+                subtopology.stateChangelogTopics().values().stream()))
+            .collect(Collectors.toMap(ConfiguredInternalTopic::name, Function.identity()));
+
+        final Function<String, Integer> topicPartitionCountProvider =
+            topicName -> getPartitionCount(metadataImage, topicName, internalTopicsByName);
+
+        configureRepartitionTopics(logContext, subtopologiesById, topicPartitionCountProvider);
+        enforceCopartitioning(logContext, subtopologiesById, copartitionGroupsById, topicPartitionCountProvider, log);
+        configureChangelogTopics(logContext, subtopologiesById, topicPartitionCountProvider);
+
+        return subtopologiesById;
+    }
+
+
+    /**
+     * Runs {@link RepartitionTopics#setup()} on the given subtopologies; the returned map is not used.
+     */
+    private static void configureRepartitionTopics(LogContext logContext,
+                                                   Map<String, ConfiguredSubtopology> configuredSubtopologies,
+                                                   Function<String, Integer> topicPartitionCountProvider) {
+        new RepartitionTopics(logContext, configuredSubtopologies, topicPartitionCountProvider).setup();
+    }
+
+    /**
+     * Applies the {@link CopartitionedTopicsEnforcer} to every copartition group of every subtopology.
+     * Nothing is enforced when the subtopologies contain no repartition source topics at all.
+     *
+     * @param logContext                     the log context handed to the enforcer
+     * @param configuredSubtopologies        the subtopologies whose repartition source topics are collected
+     * @param copartitionGroupsBySubtopology the copartition groups, keyed by subtopology id
+     * @param topicPartitionCountProvider    the partition count lookup handed to the enforcer
+     * @param log                            the logger used for the skip message
+     */
+    private static void enforceCopartitioning(LogContext logContext,
+                                              Map<String, ConfiguredSubtopology> configuredSubtopologies,
+                                              Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology,
+                                              Function<String, Integer> topicPartitionCountProvider,
+                                              Logger log) {
+        final CopartitionedTopicsEnforcer enforcer =
+            new CopartitionedTopicsEnforcer(logContext, topicPartitionCountProvider);
+
+        final Map<String, ConfiguredInternalTopic> repartitionTopicsByName = configuredSubtopologies.values().stream()
+            .flatMap(subtopology -> subtopology.repartitionSourceTopics().values().stream())
+            .collect(Collectors.toMap(ConfiguredInternalTopic::name, Function.identity()));
+
+        if (repartitionTopicsByName.isEmpty()) {
+            log.info("Skipping the repartition topic validation since there are no repartition topics.");
+            return;
+        }
+
+        // ensure the co-partitioning topics within the group have the same number of partitions,
+        // and enforce the number of partitions for those repartition topics to be the same if they
+        // are co-partitioned as well.
+        copartitionGroupsBySubtopology.values().stream()
+            .flatMap(Collection::stream)
+            .forEach(copartitionGroup -> enforcer.enforce(copartitionGroup, repartitionTopicsByName));
+    }
+
+    /**
+     * Runs {@link ChangelogTopics#setup()} on the given subtopologies; the returned map is not used.
+     */
+    private static void configureChangelogTopics(LogContext logContext,
+                                                 Map<String, ConfiguredSubtopology> configuredSubtopologies,
+                                                 Function<String, Integer> topicPartitionCountProvider) {
+        new ChangelogTopics(logContext, configuredSubtopologies, topicPartitionCountProvider).setup();
+    }
+
+    /**
+     * Looks up the number of partitions of a topic.
+     *
+     * @param metadataImage            the metadata image consulted first
+     * @param topic                    the topic name
+     * @param configuredInternalTopics the internal topics, keyed by name, consulted when the image does
+     *                                 not contain the topic
+     * @return the partition count from the metadata image if the topic exists there; otherwise the
+     *         partition count set on the configured internal topic of that name; {@code null} if neither
+     *         is available
+     */
+    private static Integer getPartitionCount(MetadataImage metadataImage,
+                                             String topic,
+                                             Map<String, ConfiguredInternalTopic> configuredInternalTopics) {
+        final TopicImage existingTopic = metadataImage.topics().getTopic(topic);
+        if (existingTopic != null) {
+            return existingTopic.partitions().size();
+        }
+        if (!configuredInternalTopics.containsKey(topic)) {
+            return null;
+        }
+        return configuredInternalTopics.get(topic).numberOfPartitions().orElse(null);
+    }
+
+    /**
+     * Converts a configured internal topic into a {@link CreatableTopic}.
+     *
+     * @param config the configured internal topic; its number of partitions must be set
+     * @return a creatable topic carrying the name, partition count, replication factor and topic configs
+     * @throws IllegalStateException if the number of partitions is not set
+     */
+    private static CreatableTopic toCreatableTopic(final ConfiguredInternalTopic config) {
+        final CreatableTopic creatableTopic = new CreatableTopic();
+        creatableTopic.setName(config.name());
+
+        creatableTopic.setNumPartitions(
+            config.numberOfPartitions().orElseThrow(() ->
+                new IllegalStateException("Number of partitions must be set for topic " + config.name())));
+
+        // An absent replication factor and a replication factor of 0 are both passed on as -1.
+        creatableTopic.setReplicationFactor(
+            config.replicationFactor().filter(factor -> factor != 0).orElse((short) -1));
+
+        final CreatableTopicConfigCollection creatableConfigs = new CreatableTopicConfigCollection();
+        for (final Map.Entry<String, String> configEntry : config.topicConfigs().entrySet()) {
+            final CreatableTopicConfig creatableConfig = new CreatableTopicConfig();
+            creatableConfig.setName(configEntry.getKey());
+            creatableConfig.setValue(configEntry.getValue());
+            creatableConfigs.add(creatableConfig);
+        }
+        creatableTopic.setConfigs(creatableConfigs);
+
+        return creatableTopic;
+    }
+
+    private static ConfiguredSubtopology fromPersistedSubtopology(
+        final StreamsGroupTopologyValue.Subtopology subtopology) {
+        // TODO: Need to resolve regular expressions here.
+        return new ConfiguredSubtopology(
+            new HashSet<>(subtopology.repartitionSinkTopics()),
+            new HashSet<>(subtopology.sourceTopics()),
+            subtopology.repartitionSourceTopics().stream()
+                .map(InternalTopicManager::fromPersistedTopicInfo)
+                .collect(Collectors.toMap(ConfiguredInternalTopic::name, x -> 
x)),
+            subtopology.stateChangelogTopics().stream()
+                .map(InternalTopicManager::fromPersistedTopicInfo)
+                .collect(Collectors.toMap(ConfiguredInternalTopic::name, x -> 
x))
+        );
+    }
+
+    private static ConfiguredInternalTopic fromPersistedTopicInfo(
+        final StreamsGroupTopologyValue.TopicInfo topicInfo) {
+        return new ConfiguredInternalTopic(
+            topicInfo.name(),
+            topicInfo.topicConfigs() != null ? 
topicInfo.topicConfigs().stream()
+                
.collect(Collectors.toMap(StreamsGroupTopologyValue.TopicConfig::key,
+                    StreamsGroupTopologyValue.TopicConfig::value))
+                : Collections.emptyMap(),
+            topicInfo.partitions() == 0 ? Optional.empty() : 
Optional.of(topicInfo.partitions()),
+            topicInfo.replicationFactor() == 0 ? Optional.empty()
+                : Optional.of(topicInfo.replicationFactor()));
+    }
+
+    private static Collection<Set<String>> 
copartitionGroupsFromPersistedSubtopology(
+        final StreamsGroupTopologyValue.Subtopology subtopology) {
+        return subtopology.copartitionGroups().stream().map(copartitionGroup ->
+            Stream.concat(
+                copartitionGroup.sourceTopics().stream()
+                    .map(i -> subtopology.sourceTopics().get(i)),
+                copartitionGroup.repartitionSourceTopics().stream()
+                    .map(i -> 
subtopology.repartitionSourceTopics().get(i).name())
+            ).collect(Collectors.toSet())
+        ).collect(Collectors.toList());
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java
new file mode 100644
index 00000000000..022e7a3a5e6
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * This class is responsible for configuring the number of partitions in 
repartitioning topics.
+ */
+public class RepartitionTopics {
+
+    private final Logger log;
+    private final Map<String, ConfiguredSubtopology> 
subtopologyToConfiguredSubtopology;
+    private final Function<String, Integer> topicPartitionCountProvider;
+
+    private final Map<String, Set<String>> missingInputTopicsBySubtopology = 
new HashMap<>();
+
+    /**
+     * Creates a new instance. The given map is stored by reference, not copied.
+     *
+     * @param logContext                         the log context used to create this instance's logger
+     * @param subtopologyToConfiguredSubtopology the configured subtopologies, keyed by subtopology id
+     * @param topicPartitionCountProvider        returns the partition count of a topic, or {@code null}
+     *                                           if it is not known
+     */
+    public RepartitionTopics(final LogContext logContext,
+                             final Map<String, ConfiguredSubtopology> subtopologyToConfiguredSubtopology,
+                             final Function<String, Integer> topicPartitionCountProvider) {
+        this.topicPartitionCountProvider = topicPartitionCountProvider;
+        this.subtopologyToConfiguredSubtopology = subtopologyToConfiguredSubtopology;
+        this.log = logContext.logger(getClass());
+    }
+
+    /**
+     * Modifies the provided ConfiguredSubtopology to set the number of 
partitions for each repartition topic.
+     *
+     * @return the map of repartition topics for the requested topology that 
are internal and may need to be created.
+     */
+    public Map<String, ConfiguredInternalTopic> setup() {
+        final Set<String> missingSourceTopicsForTopology = new HashSet<>();
+        final Map<String, ConfiguredInternalTopic> configuredRepartitionTopics 
= new HashMap<>();
+
+        for (final Map.Entry<String, ConfiguredSubtopology> subtopologyEntry : 
subtopologyToConfiguredSubtopology.entrySet()) {
+            final ConfiguredSubtopology configuredSubtopology = 
subtopologyEntry.getValue();
+
+            configuredRepartitionTopics.putAll(
+                configuredSubtopology.repartitionSourceTopics()
+                    .values()
+                    .stream()
+                    .collect(Collectors.toMap(ConfiguredInternalTopic::name, 
topicConfig -> topicConfig)));
+
+            final Set<String> missingSourceTopicsForSubtopology = 
computeMissingExternalSourceTopics(configuredSubtopology);
+            
missingSourceTopicsForTopology.addAll(missingSourceTopicsForSubtopology);
+            if (!missingSourceTopicsForSubtopology.isEmpty()) {
+                final String subtopologyId = subtopologyEntry.getKey();
+                missingInputTopicsBySubtopology.put(subtopologyId, 
missingSourceTopicsForSubtopology);
+                log.error("Subtopology {} has missing source topics {} and 
will be excluded from the current assignment, "
+                        + "this can be due to the consumer client's metadata 
being stale or because they have "
+                        + "not been created yet. Please verify that you have 
created all input topics; if they "
+                        + "do exist, you just need to wait for the metadata to 
be updated, at which time a new "
+                        + "rebalance will be kicked off automatically and the 
topology will be retried at that time.",
+                    subtopologyId, missingSourceTopicsForSubtopology);
+            }
+        }
+
+        if (missingSourceTopicsForTopology.isEmpty()) {
+            
setRepartitionSourceTopicPartitionCount(configuredRepartitionTopics);
+            return configuredRepartitionTopics;
+        } else {
+            Set<String> missingSourceTopics = 
missingInputTopicsBySubtopology.values().stream()
+                .flatMap(Collection::stream)
+                .collect(Collectors.toSet());
+            throw new 
StreamsMissingSourceTopicsException(String.format("Missing source topics: %s",
+                String.join(", ", missingSourceTopics)));
+        }
+    }
+
+    /**
+     * Returns the source topics of the given subtopology that are not among its repartition source
+     * topics and for which the partition count provider returns {@code null}.
+     */
+    private Set<String> computeMissingExternalSourceTopics(final ConfiguredSubtopology configuredSubtopology) {
+        return configuredSubtopology.sourceTopics().stream()
+            .filter(topic -> !configuredSubtopology.repartitionSourceTopics().keySet().contains(topic))
+            .filter(topic -> topicPartitionCountProvider.apply(topic) == null)
+            .collect(Collectors.toCollection(HashSet::new));
+    }
+
+    /**
+     * Computes the number of partitions and sets it for each repartition 
topic in repartitionTopicMetadata
+     */
+    private void setRepartitionSourceTopicPartitionCount(final Map<String, 
ConfiguredInternalTopic> repartitionTopicMetadata) {
+        boolean partitionCountNeeded;
+        do {
+            partitionCountNeeded = false;
+            // avoid infinitely looping without making any progress on unknown 
repartitions
+            boolean progressMadeThisIteration = false;
+
+            for (final ConfiguredSubtopology configuredSubtopology : 
subtopologyToConfiguredSubtopology.values()) {
+                for (final String repartitionSourceTopic : 
configuredSubtopology.repartitionSourceTopics()
+                    .keySet()) {
+                    final Optional<Integer> 
repartitionSourceTopicPartitionCount =
+                        
repartitionTopicMetadata.get(repartitionSourceTopic).numberOfPartitions();
+
+                    if (!repartitionSourceTopicPartitionCount.isPresent()) {
+                        final Integer numPartitions = computePartitionCount(
+                            repartitionTopicMetadata,
+                            repartitionSourceTopic
+                        );
+
+                        if (numPartitions == null) {
+                            partitionCountNeeded = true;
+                            log.trace("Unable to determine number of 
partitions for {}, another iteration is needed",
+                                repartitionSourceTopic);
+                        } else {
+                            log.trace("Determined number of partitions for {} 
to be {}",
+                                repartitionSourceTopic,
+                                numPartitions);
+                            
repartitionTopicMetadata.get(repartitionSourceTopic).setNumberOfPartitions(numPartitions);
+                            progressMadeThisIteration = true;
+                        }
+                    }
+                }
+            }
+            if (!progressMadeThisIteration && partitionCountNeeded) {
+                throw new StreamsMissingSourceTopicsException("Failed to 
compute number of partitions for all " +
+                    "repartition topics, make sure all user input topics are 
created and all pattern subscriptions " +
+                    "match at least one topic in the cluster");
+            }
+        } while (partitionCountNeeded);
+    }
+
+    private Integer computePartitionCount(final Map<String, 
ConfiguredInternalTopic> repartitionTopicMetadata,
+                                          final String repartitionSourceTopic) 
{
+        Integer partitionCount = null;
+        // try set the number of partitions for this repartition topic if it 
is not set yet
+        for (final ConfiguredSubtopology configuredSubtopology : 
subtopologyToConfiguredSubtopology.values()) {
+            final Set<String> repartitionSinkTopics = 
configuredSubtopology.repartitionSinkTopics();
+
+            if (repartitionSinkTopics.contains(repartitionSourceTopic)) {
+                // if this topic is one of the sink topics of this topology,
+                // use the maximum of all its source topic partitions as the 
number of partitions
+                for (final String upstreamSourceTopic : 
configuredSubtopology.sourceTopics()) {
+                    Integer numPartitionsCandidate = null;
+                    // It is possible the sourceTopic is another internal 
topic, i.e,
+                    // map().join().join(map())
+                    if 
(repartitionTopicMetadata.containsKey(upstreamSourceTopic)) {
+                        if 
(repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().isPresent())
 {
+                            numPartitionsCandidate =
+                                
repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().get();
+                        }
+                    } else {
+                        numPartitionsCandidate = 
topicPartitionCountProvider.apply(upstreamSourceTopic);
+                    }
+
+                    if (numPartitionsCandidate != null) {
+                        if (partitionCount == null || numPartitionsCandidate > 
partitionCount) {
+                            partitionCount = numPartitionsCandidate;
+                        }
+                    }
+                }
+            }
+        }
+        return partitionCount;
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopicsTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopicsTest.java
new file mode 100644
index 00000000000..19584c53fef
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopicsTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ChangelogTopicsTest {
+
+    private static final LogContext LOG_CONTEXT = new LogContext();
+    private static final String SOURCE_TOPIC_NAME = "source";
+    private static final String SINK_TOPIC_NAME = "sink";
+    private static final String REPARTITION_TOPIC_NAME = "repartition";
+    private static final String CHANGELOG_TOPIC_NAME1 = "changelog1";
+    private static final Map<String, String> TOPIC_CONFIG = 
Collections.singletonMap("config1", "val1");
+    private static final ConfiguredInternalTopic REPARTITION_TOPIC_CONFIG =
+        new ConfiguredInternalTopic(REPARTITION_TOPIC_NAME, TOPIC_CONFIG);
+    private static final ConfiguredSubtopology TOPICS_INFO_NOSOURCE = new 
ConfiguredSubtopology(
+        Collections.singleton(SINK_TOPIC_NAME),
+        Collections.emptySet(),
+        mkMap(mkEntry(REPARTITION_TOPIC_NAME, REPARTITION_TOPIC_CONFIG)),
+        mkMap()
+    );
+    private static final ConfiguredSubtopology TOPICS_INFO_STATELESS = new 
ConfiguredSubtopology(
+        Collections.singleton(SINK_TOPIC_NAME),
+        Collections.singleton(SOURCE_TOPIC_NAME),
+        mkMap(mkEntry(REPARTITION_TOPIC_NAME, REPARTITION_TOPIC_CONFIG)),
+        mkMap()
+    );
+    private static final ConfiguredInternalTopic CHANGELOG_TOPIC_CONFIG =
+        new ConfiguredInternalTopic(CHANGELOG_TOPIC_NAME1, TOPIC_CONFIG);
+    private static final ConfiguredSubtopology TOPICS_INFO_STATEFUL = new 
ConfiguredSubtopology(
+        Collections.singleton(SINK_TOPIC_NAME),
+        Collections.singleton(SOURCE_TOPIC_NAME),
+        mkMap(mkEntry(REPARTITION_TOPIC_NAME, REPARTITION_TOPIC_CONFIG)),
+        mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG))
+    );
+    private static final ConfiguredSubtopology TOPICS_INFO_SOURCE_CHANGELOG = 
new ConfiguredSubtopology(
+        Collections.singleton(SINK_TOPIC_NAME),
+        Collections.singleton(SOURCE_TOPIC_NAME),
+        mkMap(mkEntry(REPARTITION_TOPIC_NAME, REPARTITION_TOPIC_CONFIG)),
+        mkMap(mkEntry(SOURCE_TOPIC_NAME, CHANGELOG_TOPIC_CONFIG))
+    );
+    private static final ConfiguredSubtopology TOPICS_INFO_BOTH = new 
ConfiguredSubtopology(
+        Collections.singleton(SINK_TOPIC_NAME),
+        Collections.singleton(SOURCE_TOPIC_NAME),
+        mkMap(mkEntry(REPARTITION_TOPIC_NAME, REPARTITION_TOPIC_CONFIG)),
+        mkMap(mkEntry(SOURCE_TOPIC_NAME, null), mkEntry(CHANGELOG_TOPIC_NAME1, 
CHANGELOG_TOPIC_CONFIG))
+    );
+
+    /** Partition count provider used by all tests: reports 3 partitions for any topic name. */
+    private static Integer topicPartitionProvider(String topicName) {
+        return 3;
+    }
+
+    @Test
+    public void shouldFailIfNoSourceTopics() {
+        // The subtopology under test has an empty set of source topics.
+        final ChangelogTopics changelogTopics = new ChangelogTopics(
+            LOG_CONTEXT,
+            mkMap(mkEntry("subtopology_0", TOPICS_INFO_NOSOURCE)),
+            ChangelogTopicsTest::topicPartitionProvider
+        );
+
+        final StreamsInvalidTopologyException exception =
+            assertThrows(StreamsInvalidTopologyException.class, changelogTopics::setup);
+
+        assertTrue(exception.getMessage().contains("No source topics found for subtopology"));
+    }
+
+    @Test
+    public void shouldNotContainChangelogsForStatelessTasks() {
+        // The subtopology under test declares no state changelog topics.
+        final ChangelogTopics changelogTopics = new ChangelogTopics(
+            LOG_CONTEXT,
+            mkMap(mkEntry("subtopology_0", TOPICS_INFO_STATELESS)),
+            ChangelogTopicsTest::topicPartitionProvider
+        );
+
+        final Map<String, ConfiguredInternalTopic> changelogs = changelogTopics.setup();
+
+        assertEquals(Collections.emptyMap(), changelogs);
+    }
+
+    @Test
+    public void shouldContainNonSourceBasedChangelogs() {
+        // The subtopology under test declares one changelog topic whose name differs from the source topic.
+        final ChangelogTopics changelogTopics = new ChangelogTopics(
+            LOG_CONTEXT,
+            mkMap(mkEntry("subtopology_0", TOPICS_INFO_STATEFUL)),
+            ChangelogTopicsTest::topicPartitionProvider
+        );
+
+        final Map<String, ConfiguredInternalTopic> changelogs = changelogTopics.setup();
+
+        final Map<String, ConfiguredInternalTopic> expected = mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG));
+        assertEquals(expected, changelogs);
+        assertEquals(3, CHANGELOG_TOPIC_CONFIG.numberOfPartitions().orElse(Integer.MIN_VALUE));
+    }
+
+    @Test
+    public void shouldNotContainSourceBasedChangelogs() {
+        // The only changelog entry of the subtopology under test is keyed by the source topic name.
+        final ChangelogTopics changelogTopics = new ChangelogTopics(
+            LOG_CONTEXT,
+            mkMap(mkEntry("subtopology_0", TOPICS_INFO_SOURCE_CHANGELOG)),
+            ChangelogTopicsTest::topicPartitionProvider
+        );
+
+        final Map<String, ConfiguredInternalTopic> changelogs = changelogTopics.setup();
+
+        assertEquals(Collections.emptyMap(), changelogs);
+    }
+
+    @Test
+    public void shouldContainBothTypesOfPreExistingChangelogs() {
+        // The subtopology under test has one changelog entry keyed by the source topic name and one
+        // keyed by a separate changelog topic name; only the latter is expected in the result.
+        final ChangelogTopics changelogTopics = new ChangelogTopics(
+            LOG_CONTEXT,
+            mkMap(mkEntry("subtopology_0", TOPICS_INFO_BOTH)),
+            ChangelogTopicsTest::topicPartitionProvider
+        );
+
+        final Map<String, ConfiguredInternalTopic> changelogs = changelogTopics.setup();
+
+        final Map<String, ConfiguredInternalTopic> expected = mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG));
+        assertEquals(expected, changelogs);
+        assertEquals(3, CHANGELOG_TOPIC_CONFIG.numberOfPartitions().orElse(Integer.MIN_VALUE));
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java
new file mode 100644
index 00000000000..12c8adb0a3b
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Unit tests for {@link ConfiguredInternalTopic}: construction, argument validation,
+ * setting the number of partitions, equality/hash code and the string representation.
+ */
+public class ConfiguredInternalTopicTest {
+
+    /** Values passed to the four-argument constructor are returned unchanged by the accessors. */
+    @Test
+    public void testConstructorAndGetters() {
+        Map<String, String> topicConfigs = new HashMap<>();
+        topicConfigs.put("retention.ms", "1000");
+        topicConfigs.put("message.timestamp.type", "LogAppendTime");
+
+        ConfiguredInternalTopic config = new 
ConfiguredInternalTopic("test-topic", topicConfigs, Optional.of(3), 
Optional.of((short) 2));
+
+        assertEquals("test-topic", config.name());
+        assertEquals(topicConfigs, config.topicConfigs());
+        assertEquals(Optional.of(3), config.numberOfPartitions());
+        assertEquals(Optional.of((short) 2), config.replicationFactor());
+    }
+
+    /** A {@code null} topic name is rejected with a {@link NullPointerException}. */
+    @Test
+    public void testConstructorWithNullName() {
+        assertThrows(NullPointerException.class, () -> new 
ConfiguredInternalTopic(null, Collections.emptyMap()));
+    }
+
+    /** A topic name containing spaces is rejected with an {@link InvalidTopicException}. */
+    @Test
+    public void testConstructorWithInvalidName() {
+        assertThrows(InvalidTopicException.class, () -> new 
ConfiguredInternalTopic("invalid topic name", Collections.emptyMap()));
+    }
+
+    /** A {@code null} topic-config map is rejected with a {@link NullPointerException}. */
+    @Test
+    public void testConstructorWithNullTopicConfigs() {
+        assertThrows(NullPointerException.class, () -> new 
ConfiguredInternalTopic("test-topic", null));
+    }
+
+    /** On a topic created without a partition count, the count can be set and read back. */
+    @Test
+    public void testSetNumberOfPartitions() {
+        ConfiguredInternalTopic config = new 
ConfiguredInternalTopic("test-topic", Collections.emptyMap());
+        config.setNumberOfPartitions(3);
+        assertEquals(Optional.of(3), config.numberOfPartitions());
+    }
+
+    /** Setting a partition count of 0 is rejected with an {@link IllegalArgumentException}. */
+    @Test
+    public void testSetNumberOfPartitionsInvalid() {
+        ConfiguredInternalTopic config = new 
ConfiguredInternalTopic("test-topic", Collections.emptyMap());
+        assertThrows(IllegalArgumentException.class, () -> 
config.setNumberOfPartitions(0));
+    }
+
+    /**
+     * When the partition count was supplied through the constructor, a later
+     * {@code setNumberOfPartitions} call throws {@link UnsupportedOperationException}.
+     */
+    @Test
+    public void testSetNumberOfPartitionsUnsupportedOperation() {
+        ConfiguredInternalTopic config = new 
ConfiguredInternalTopic("test-topic", Collections.emptyMap(), Optional.of(3),
+            Optional.empty());
+        assertThrows(UnsupportedOperationException.class, () -> 
config.setNumberOfPartitions(4));
+    }
+
+    /** Two instances built from identical arguments are equal and share a hash code. */
+    @Test
+    public void testEqualsAndHashCode() {
+        Map<String, String> topicConfigs = new HashMap<>();
+        topicConfigs.put("retention.ms", "1000");
+
+        ConfiguredInternalTopic config1 = new 
ConfiguredInternalTopic("test-topic", topicConfigs, Optional.of(3), 
Optional.of((short) 2));
+        ConfiguredInternalTopic config2 = new 
ConfiguredInternalTopic("test-topic", topicConfigs, Optional.of(3), 
Optional.of((short) 2));
+
+        assertEquals(config1, config2);
+        assertEquals(config1.hashCode(), config2.hashCode());
+    }
+
+    /**
+     * Pins the exact {@code toString()} output, including the
+     * {@code enforceNumberOfPartitions=true} field reported for a topic whose
+     * partition count was passed to the constructor.
+     */
+    @Test
+    public void testToString() {
+        Map<String, String> topicConfigs = new HashMap<>();
+        topicConfigs.put("retention.ms", "1000");
+
+        ConfiguredInternalTopic config = new 
ConfiguredInternalTopic("test-topic", topicConfigs, Optional.of(3), 
Optional.of((short) 2));
+        String expectedString = "ConfiguredInternalTopic(name=test-topic, 
topicConfigs={retention.ms=1000}, numberOfPartitions=Optional[3], 
replicationFactor=Optional[2], enforceNumberOfPartitions=true)";
+
+        assertEquals(expectedString, config.toString());
+    }
+}
\ No newline at end of file
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
new file mode 100644
index 00000000000..71d05ce3d2a
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for {@link ConfiguredSubtopology}: constructor and accessors, setters,
+ * {@code nonSourceChangelogTopics()}, equality, hash code and the string representation.
+ */
+public class ConfiguredSubtopologyTest {
+
+    /** Collections passed to the four-argument constructor are returned by the matching accessors. */
+    @Test
+    public void testConstructorAndGetters() {
+        Set<String> repartitionSinkTopics = Set.of("repartitionSinkTopic1", 
"repartitionSinkTopic2");
+        Set<String> sourceTopics = Set.of("sourceTopic1", "sourceTopic2");
+        Map<String, ConfiguredInternalTopic> repartitionSourceTopics = new 
HashMap<>();
+        Map<String, ConfiguredInternalTopic> stateChangelogTopics = new 
HashMap<>();
+
+        ConfiguredSubtopology configuredSubtopology = new 
ConfiguredSubtopology(repartitionSinkTopics, sourceTopics,
+            repartitionSourceTopics, stateChangelogTopics);
+
+        assertEquals(repartitionSinkTopics, 
configuredSubtopology.repartitionSinkTopics());
+        assertEquals(sourceTopics, configuredSubtopology.sourceTopics());
+        assertEquals(repartitionSourceTopics, 
configuredSubtopology.repartitionSourceTopics());
+        assertEquals(stateChangelogTopics, 
configuredSubtopology.stateChangelogTopics());
+    }
+
+    /** Starting from the no-argument constructor, each setter is followed by a check of its accessor. */
+    @Test
+    public void testSetters() {
+        ConfiguredSubtopology configuredSubtopology = new 
ConfiguredSubtopology();
+
+        Set<String> repartitionSinkTopics = Set.of("repartitionSinkTopic1", 
"repartitionSinkTopic2");
+        configuredSubtopology.setRepartitionSinkTopics(repartitionSinkTopics);
+        assertEquals(repartitionSinkTopics, 
configuredSubtopology.repartitionSinkTopics());
+
+        Set<String> sourceTopics = Set.of("sourceTopic1", "sourceTopic2");
+        configuredSubtopology.setSourceTopics(sourceTopics);
+        assertEquals(sourceTopics, configuredSubtopology.sourceTopics());
+
+        Map<String, ConfiguredInternalTopic> repartitionSourceTopics = new 
HashMap<>();
+        
configuredSubtopology.setRepartitionSourceTopics(repartitionSourceTopics);
+        assertEquals(repartitionSourceTopics, 
configuredSubtopology.repartitionSourceTopics());
+
+        Map<String, ConfiguredInternalTopic> stateChangelogTopics = new 
HashMap<>();
+        configuredSubtopology.setStateChangelogTopics(stateChangelogTopics);
+        assertEquals(stateChangelogTopics, 
configuredSubtopology.stateChangelogTopics());
+    }
+
+    /**
+     * With changelog entries "changelogTopic1" and "sourceTopic1" and a source-topic set
+     * containing "sourceTopic1", {@code nonSourceChangelogTopics()} returns only the
+     * "changelogTopic1" entry.
+     */
+    @Test
+    public void testNonSourceChangelogTopics() {
+        Map<String, ConfiguredInternalTopic> stateChangelogTopics = new 
HashMap<>();
+        stateChangelogTopics.put("changelogTopic1", new 
ConfiguredInternalTopic("changelogTopic1"));
+        stateChangelogTopics.put("sourceTopic1", new 
ConfiguredInternalTopic("sourceTopic1"));
+
+        ConfiguredSubtopology configuredSubtopology = new 
ConfiguredSubtopology(
+            Collections.emptySet(),
+            Collections.singleton("sourceTopic1"),
+            Collections.emptyMap(),
+            stateChangelogTopics
+        );
+
+        Set<ConfiguredInternalTopic> nonSourceChangelogTopics = 
configuredSubtopology.nonSourceChangelogTopics();
+        assertEquals(1, nonSourceChangelogTopics.size());
+        assertTrue(nonSourceChangelogTopics.contains(new 
ConfiguredInternalTopic("changelogTopic1")));
+    }
+
+    /** Two instances built from the same four collections are equal. */
+    @Test
+    public void testEquals() {
+        Set<String> repartitionSinkTopics = new 
HashSet<>(Arrays.asList("repartitionSinkTopic1", "repartitionSinkTopic2"));
+        Set<String> sourceTopics = new HashSet<>(Arrays.asList("sourceTopic1", 
"sourceTopic2"));
+        Map<String, ConfiguredInternalTopic> repartitionSourceTopics = new 
HashMap<>();
+        Map<String, ConfiguredInternalTopic> stateChangelogTopics = new 
HashMap<>();
+
+        ConfiguredSubtopology configuredSubtopology1 = new 
ConfiguredSubtopology(repartitionSinkTopics, sourceTopics,
+            repartitionSourceTopics, stateChangelogTopics);
+        ConfiguredSubtopology configuredSubtopology2 = new 
ConfiguredSubtopology(repartitionSinkTopics, sourceTopics,
+            repartitionSourceTopics, stateChangelogTopics);
+
+        assertEquals(configuredSubtopology1, configuredSubtopology2);
+    }
+
+    /** Two instances built from the same four collections have the same hash code. */
+    @Test
+    public void testHashCode() {
+        Set<String> repartitionSinkTopics = new 
HashSet<>(Arrays.asList("repartitionSinkTopic1", "repartitionSinkTopic2"));
+        Set<String> sourceTopics = new HashSet<>(Arrays.asList("sourceTopic1", 
"sourceTopic2"));
+        Map<String, ConfiguredInternalTopic> repartitionSourceTopics = new 
HashMap<>();
+        Map<String, ConfiguredInternalTopic> stateChangelogTopics = new 
HashMap<>();
+
+        ConfiguredSubtopology configuredSubtopology1 = new 
ConfiguredSubtopology(repartitionSinkTopics, sourceTopics,
+            repartitionSourceTopics, stateChangelogTopics);
+        ConfiguredSubtopology configuredSubtopology2 = new 
ConfiguredSubtopology(repartitionSinkTopics, sourceTopics,
+            repartitionSourceTopics, stateChangelogTopics);
+
+        assertEquals(configuredSubtopology1.hashCode(), 
configuredSubtopology2.hashCode());
+    }
+
+    /**
+     * Pins the {@code toString()} format. The expected string lists
+     * {@code stateChangelogTopics} before {@code repartitionSourceTopics}, which differs
+     * from the constructor's parameter order.
+     */
+    @Test
+    public void testToString() {
+        Set<String> repartitionSinkTopics = new 
HashSet<>(Arrays.asList("repartitionSinkTopic1", "repartitionSinkTopic2"));
+        Set<String> sourceTopics = new HashSet<>(Arrays.asList("sourceTopic1", 
"sourceTopic2"));
+        Map<String, ConfiguredInternalTopic> repartitionSourceTopics = new 
HashMap<>();
+        Map<String, ConfiguredInternalTopic> stateChangelogTopics = new 
HashMap<>();
+
+        ConfiguredSubtopology configuredSubtopology = new 
ConfiguredSubtopology(repartitionSinkTopics, sourceTopics,
+            repartitionSourceTopics, stateChangelogTopics);
+
+        String expectedString = "ConfiguredSubtopology{" +
+            "repartitionSinkTopics=" + repartitionSinkTopics +
+            ", sourceTopics=" + sourceTopics +
+            ", stateChangelogTopics=" + stateChangelogTopics +
+            ", repartitionSourceTopics=" + repartitionSourceTopics +
+            '}';
+
+        assertEquals(expectedString, configuredSubtopology.toString());
+    }
+}
\ No newline at end of file
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java
new file mode 100644
index 00000000000..1e476247c49
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import 
org.apache.kafka.common.errors.StreamsInconsistentInternalTopicsException;
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Unit tests for {@link CopartitionedTopicsEnforcer}.
+ *
+ * <p>The enforcer is constructed with a function that maps a topic name to its partition
+ * count; the three static helpers below play that role and return {@code null} for topics
+ * they do not know. Topics present in the map passed to {@code enforce} are the configured
+ * internal (repartition) topics of the test.
+ */
+@SuppressWarnings("OptionalGetWithoutIsPresent")
+public class CopartitionedTopicsEnforcerTest {
+
+    private static final LogContext LOG_CONTEXT = new LogContext();
+
+    /** Partition-count provider that knows no topic: always returns {@code null}. */
+    private static Integer emptyTopicPartitionProvider(String topic) {
+        return null;
+    }
+
+    /** Partition-count provider reporting 2 partitions for "first" and "second", {@code null} otherwise. */
+    private static Integer firstSecondTopicConsistent(String topic) {
+        if (topic.equals("first") || topic.equals("second")) {
+            return 2;
+        }
+        return null;
+    }
+
+    /** Partition-count provider reporting 2 partitions for "first", 1 for "second", {@code null} otherwise. */
+    private static Integer firstSecondTopicInconsistent(String topic) {
+        if (topic.equals("first")) {
+            return 2;
+        }
+        if (topic.equals("second")) {
+            return 1;
+        }
+        return null;
+    }
+
+    /**
+     * A copartitioned topic that is neither a configured internal topic nor known to the
+     * partition-count provider makes {@code enforce} throw {@link StreamsInvalidTopologyException}.
+     * The method name matches the exception that is actually asserted.
+     */
+    @Test
+    public void 
shouldThrowStreamsInvalidTopologyExceptionIfNoPartitionsFoundForCoPartitionedTopic()
 {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT,
+            CopartitionedTopicsEnforcerTest::emptyTopicPartitionProvider);
+        assertThrows(StreamsInvalidTopologyException.class, () -> 
validator.enforce(Collections.singleton("topic"),
+            Collections.emptyMap()));
+    }
+
+    /**
+     * Two copartitioned topics with different provider-reported partition counts (2 and 1)
+     * make {@code enforce} throw {@link StreamsInconsistentInternalTopicsException}.
+     */
+    @Test
+    public void 
shouldThrowStreamsInconsistentInternalTopicsExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch()
 {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT,
+            CopartitionedTopicsEnforcerTest::firstSecondTopicInconsistent);
+        assertThrows(StreamsInconsistentInternalTopicsException.class, () -> 
validator.enforce(Set.of("first", "second"),
+            Collections.emptyMap()));
+    }
+
+
+    /**
+     * A configured internal topic initially set to 10 partitions is changed to 2 when it
+     * is copartitioned with "first" and "second", which both report 2 partitions.
+     */
+    @Test
+    public void shouldEnforceCopartitioningOnRepartitionTopics() {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT,
+            CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent);
+        final ConfiguredInternalTopic config = 
createTopicConfig("repartitioned", 10);
+
+        validator.enforce(Set.of("first", "second", config.name()),
+            Collections.singletonMap(config.name(), config));
+
+        assertEquals(Optional.of(2), config.numberOfPartitions());
+    }
+
+
+    /**
+     * When every copartitioned topic is a configured internal topic (counts 1, 15 and 5)
+     * and the provider knows none of them, all three end up with the maximum, 15.
+     */
+    @Test
+    public void 
shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics() {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT,
+            CopartitionedTopicsEnforcerTest::emptyTopicPartitionProvider);
+        final ConfiguredInternalTopic one = createTopicConfig("one", 1);
+        final ConfiguredInternalTopic two = createTopicConfig("two", 15);
+        final ConfiguredInternalTopic three = createTopicConfig("three", 5);
+        final Map<String, ConfiguredInternalTopic> configuredInternalTopics = 
new HashMap<>();
+
+        configuredInternalTopics.put(one.name(), one);
+        configuredInternalTopics.put(two.name(), two);
+        configuredInternalTopics.put(three.name(), three);
+
+        validator.enforce(Set.of(
+                one.name(),
+                two.name(),
+                three.name()
+            ),
+            configuredInternalTopics
+        );
+
+        assertEquals(Optional.of(15), one.numberOfPartitions());
+        assertEquals(Optional.of(15), two.numberOfPartitions());
+        assertEquals(Optional.of(15), three.numberOfPartitions());
+    }
+
+    /**
+     * Two internal topics whose partition counts were fixed at construction (10 and 5)
+     * cannot be copartitioned: {@code enforce} throws and the message lists the topics
+     * with their counts in sorted order.
+     */
+    @Test
+    public void 
shouldThrowAnExceptionIfConfiguredInternalTopicsWithEnforcedNumOfPartitionsHaveDifferentNumOfPartitions()
 {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT,
+            CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent);
+        final ConfiguredInternalTopic topic1 = 
createConfiguredInternalTopicWithEnforcedNumberOfPartitions("repartitioned-1", 
10);
+        final ConfiguredInternalTopic topic2 = 
createConfiguredInternalTopicWithEnforcedNumberOfPartitions("repartitioned-2", 
5);
+
+        final StreamsInconsistentInternalTopicsException ex = assertThrows(
+            StreamsInconsistentInternalTopicsException.class,
+            () -> validator.enforce(Set.of(topic1.name(), topic2.name()),
+                Utils.mkMap(
+                    Utils.mkEntry(topic1.name(), topic1),
+                    Utils.mkEntry(topic2.name(), topic2)
+                )
+            )
+        );
+
+        // TreeMap gives the name-sorted rendering that the expected message is built from.
+        final TreeMap<String, Integer> sorted = new TreeMap<>(
+            Utils.mkMap(Utils.mkEntry(topic1.name(), 
topic1.numberOfPartitions().get()),
+                Utils.mkEntry(topic2.name(), 
topic2.numberOfPartitions().get()))
+        );
+
+        assertEquals(String.format(
+            "Following topics do not have the same number of partitions: " +
+                "[%s]", sorted), ex.getMessage());
+    }
+
+    /**
+     * Two internal topics whose partition counts were fixed at construction to the same
+     * value (10) pass enforcement and keep that value.
+     */
+    @Test
+    public void 
shouldNotThrowAnExceptionWhenConfiguredInternalTopicsWithEnforcedNumOfPartitionsAreValid()
 {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT,
+            CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent);
+        final ConfiguredInternalTopic topic1 = 
createConfiguredInternalTopicWithEnforcedNumberOfPartitions("repartitioned-1", 
10);
+        final ConfiguredInternalTopic topic2 = 
createConfiguredInternalTopicWithEnforcedNumberOfPartitions("repartitioned-2", 
10);
+
+        validator.enforce(Set.of(topic1.name(), topic2.name()),
+            Utils.mkMap(
+                Utils.mkEntry(topic1.name(), topic1),
+                Utils.mkEntry(topic2.name(), topic2)
+            )
+        );
+
+        assertEquals(Optional.of(10), topic1.numberOfPartitions());
+        assertEquals(Optional.of(10), topic2.numberOfPartitions());
+    }
+
+    /**
+     * An internal topic fixed at 10 partitions copartitioned with "second" (2 partitions
+     * according to the provider) makes {@code enforce} throw; the message names both counts
+     * and the internal topic.
+     */
+    @Test
+    public void 
shouldThrowAnExceptionWhenNumberOfPartitionsOfNonRepartitionTopicAndRepartitionTopicWithEnforcedNumOfPartitionsDoNotMatch()
 {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT,
+            CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent);
+        final ConfiguredInternalTopic topic1 = 
createConfiguredInternalTopicWithEnforcedNumberOfPartitions("repartitioned-1", 
10);
+
+        final StreamsInconsistentInternalTopicsException ex = assertThrows(
+            StreamsInconsistentInternalTopicsException.class,
+            () -> validator.enforce(Set.of(topic1.name(), "second"),
+                Utils.mkMap(Utils.mkEntry(topic1.name(), topic1)))
+        );
+
+        assertEquals(String.format("Number of partitions [%s] " +
+                "of repartition topic [%s] " +
+                "doesn't match number of partitions [%s] of the source topic.",
+            topic1.numberOfPartitions().get(), topic1.name(), 2), 
ex.getMessage());
+    }
+
+    /**
+     * An internal topic fixed at 2 partitions copartitioned with "second" (also 2
+     * partitions) passes enforcement and keeps its count.
+     */
+    @Test
+    public void 
shouldNotThrowAnExceptionWhenNumberOfPartitionsOfNonRepartitionTopicAndRepartitionTopicWithEnforcedNumOfPartitionsMatch()
 {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT,
+            CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent);
+        final ConfiguredInternalTopic topic1 = 
createConfiguredInternalTopicWithEnforcedNumberOfPartitions("repartitioned-1", 
2);
+
+        validator.enforce(Set.of(topic1.name(), "second"),
+            Utils.mkMap(Utils.mkEntry(topic1.name(), topic1)));
+
+        assertEquals(Optional.of(2), topic1.numberOfPartitions());
+    }
+
+    /**
+     * An internal topic with a settable count (initially 5) copartitioned with an internal
+     * topic fixed at 2 partitions takes over the fixed count.
+     *
+     * <p>Note that {@code topic3} is only part of the topic map, not of the copartition
+     * group passed to {@code enforce}; the final assertion holds because it is constructed
+     * with the same fixed count of 2.
+     */
+    @Test
+    public void 
shouldDeductNumberOfPartitionsFromRepartitionTopicWithEnforcedNumberOfPartitions()
 {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT,
+            CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent);
+        final ConfiguredInternalTopic topic1 = 
createConfiguredInternalTopicWithEnforcedNumberOfPartitions("repartitioned-1", 
2);
+        final ConfiguredInternalTopic topic2 = 
createTopicConfig("repartitioned-2", 5);
+        final ConfiguredInternalTopic topic3 = 
createConfiguredInternalTopicWithEnforcedNumberOfPartitions("repartitioned-3", 
2);
+
+        validator.enforce(Set.of(topic1.name(), topic2.name()),
+            Utils.mkMap(
+                Utils.mkEntry(topic1.name(), topic1),
+                Utils.mkEntry(topic2.name(), topic2),
+                Utils.mkEntry(topic3.name(), topic3)
+            )
+        );
+
+        assertEquals(topic1.numberOfPartitions(), topic2.numberOfPartitions());
+        assertEquals(topic2.numberOfPartitions(), topic3.numberOfPartitions());
+    }
+
+    /**
+     * Creates an internal topic with empty configs via the two-argument constructor and
+     * then sets its partition count through {@code setNumberOfPartitions}.
+     */
+    private ConfiguredInternalTopic createTopicConfig(final String 
repartitionTopic,
+                                                      final int partitions) {
+        final ConfiguredInternalTopic config =
+            new ConfiguredInternalTopic(repartitionTopic, 
Collections.emptyMap());
+
+        config.setNumberOfPartitions(partitions);
+        return config;
+    }
+
+    /**
+     * Creates an internal topic with empty configs whose partition count is supplied
+     * directly to the four-argument constructor; no replication factor is set.
+     */
+    private ConfiguredInternalTopic 
createConfiguredInternalTopicWithEnforcedNumberOfPartitions(final String 
repartitionTopic,
+                                                                               
                 final int partitions) {
+        return new ConfiguredInternalTopic(repartitionTopic,
+            Collections.emptyMap(),
+            Optional.of(partitions),
+            Optional.empty());
+    }
+
+}
\ No newline at end of file
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
new file mode 100644
index 00000000000..e6ef6fee5d3
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.server.immutable.ImmutableMap;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the static helpers {@code InternalTopicManager.missingTopics} and
+ * {@code InternalTopicManager.configureTopics}, using a Mockito-mocked
+ * {@link MetadataImage} / {@link TopicsImage} as the source of existing topics.
+ */
+class InternalTopicManagerTest {
+
+    /**
+     * The mocked image contains source_topic1, source_topic2 and state_changelog_topic2
+     * (two partitions each). For the configured topology from
+     * {@link #makeExpectedConfiguredTopology()}, exactly two topics are expected to be
+     * reported as missing: repartition_topic and state_changelog_topic1.
+     */
+    @Test
+    void testMissingTopics() {
+        MetadataImage metadataImage = mock(MetadataImage.class);
+        TopicsImage topicsImage = mock(TopicsImage.class);
+        when(metadataImage.topics()).thenReturn(topicsImage);
+        // Each topic image has partitions 0 and 1; the partition registrations themselves are null.
+        final TopicImage sourceTopic1 =
+            new TopicImage("source_topic1", Uuid.randomUuid(), 
mkMap(mkEntry(0, null), mkEntry(1, null)));
+        final TopicImage sourceTopic2 =
+            new TopicImage("source_topic2", Uuid.randomUuid(), 
mkMap(mkEntry(0, null), mkEntry(1, null)));
+        final TopicImage stateChangelogTopic2 = new 
TopicImage("state_changelog_topic2", Uuid.randomUuid(),
+            mkMap(mkEntry(0, null), mkEntry(1, null)));
+        // Stub both lookup paths: the by-name map and the single-topic getter.
+        when(topicsImage.topicsByName()).thenReturn(
+            ImmutableMap.singleton("source_topic1", sourceTopic1)
+                .updated("source_topic2", sourceTopic2)
+                .updated("state_changelog_topic2", stateChangelogTopic2)
+        );
+        
when(topicsImage.getTopic(eq("source_topic1"))).thenReturn(sourceTopic1);
+        
when(topicsImage.getTopic(eq("source_topic2"))).thenReturn(sourceTopic2);
+        
when(topicsImage.getTopic(eq("state_changelog_topic2"))).thenReturn(stateChangelogTopic2);
+        Map<String, ConfiguredSubtopology> subtopologyMap = 
makeExpectedConfiguredTopology();
+
+        Map<String, CreatableTopic> missingTopics = 
InternalTopicManager.missingTopics(subtopologyMap, metadataImage);
+
+        assertEquals(2, missingTopics.size());
+        // The repartition topic carries its configured replication factor of 3.
+        assertEquals(
+            new CreatableTopic()
+                .setName("repartition_topic")
+                .setNumPartitions(2)
+                .setReplicationFactor((short) 3),
+            missingTopics.get("repartition_topic")
+        );
+        // The changelog topic has no configured replication factor; -1 is expected in the request.
+        assertEquals(
+            new CreatableTopic()
+                .setName("state_changelog_topic1")
+                .setNumPartitions(2)
+                .setReplicationFactor((short) -1)
+                .setConfigs(
+                    new CreatableTopicConfigCollection(
+                        Collections.singletonList(new 
CreatableTopicConfig().setName("cleanup.policy").setValue("compact")).iterator())
+                ),
+            missingTopics.get("state_changelog_topic1"));
+    }
+
+    /**
+     * With both source topics present in the mocked image (two partitions each),
+     * configuring the topology from {@link #makeTestTopology()} must produce the map
+     * returned by {@link #makeExpectedConfiguredTopology()}.
+     */
+    @Test
+    void testConfigureTopics() {
+        MetadataImage metadataImage = mock(MetadataImage.class);
+        TopicsImage topicsImage = mock(TopicsImage.class);
+        when(metadataImage.topics()).thenReturn(topicsImage);
+        when(topicsImage.getTopic(eq("source_topic1"))).thenReturn(
+            new TopicImage("source_topic1", Uuid.randomUuid(), 
mkMap(mkEntry(0, null), mkEntry(1, null))));
+        when(topicsImage.getTopic(eq("source_topic2"))).thenReturn(
+            new TopicImage("source_topic2", Uuid.randomUuid(), 
mkMap(mkEntry(0, null), mkEntry(1, null))));
+        List<Subtopology> subtopologyList = makeTestTopology();
+
+        Map<String, ConfiguredSubtopology> configuredSubtopologies =
+            InternalTopicManager.configureTopics(new LogContext(), 
subtopologyList, metadataImage);
+
+        Map<String, ConfiguredSubtopology> expectedConfiguredSubtopologyMap = 
makeExpectedConfiguredTopology();
+        assertEquals(expectedConfiguredSubtopologyMap, 
configuredSubtopologies);
+    }
+
+    /**
+     * Builds the configured form of the topology from {@link #makeTestTopology()}: two
+     * subtopologies keyed by id, with every internal topic set to 2 partitions.
+     */
+    private static Map<String, ConfiguredSubtopology> 
makeExpectedConfiguredTopology() {
+        return mkMap(
+            mkEntry("subtopology1",
+                new ConfiguredSubtopology()
+                    .setSourceTopics(Set.of("source_topic1"))
+                    
.setStateChangelogTopics(Collections.singletonMap("state_changelog_topic1",
+                        new ConfiguredInternalTopic("state_changelog_topic1",
+                            Collections.singletonMap("cleanup.policy", 
"compact"),
+                            Optional.empty(),
+                            Optional.empty()
+                        ).setNumberOfPartitions(2)))
+                    .setRepartitionSinkTopics(Set.of("repartition_topic"))
+            ),
+            mkEntry("subtopology2",
+                new ConfiguredSubtopology()
+                    .setSourceTopics(Set.of("source_topic2"))
+                    
.setRepartitionSourceTopics(Collections.singletonMap("repartition_topic",
+                        new ConfiguredInternalTopic("repartition_topic",
+                            Collections.emptyMap(),
+                            Optional.empty(),
+                            Optional.of((short) 3)
+                        ).setNumberOfPartitions(2)
+                    ))
+                    
.setStateChangelogTopics(Collections.singletonMap("state_changelog_topic2",
+                        new ConfiguredInternalTopic("state_changelog_topic2",
+                            Collections.emptyMap(),
+                            Optional.empty(),
+                            Optional.empty()
+                        ).setNumberOfPartitions(2)))
+            )
+        );
+    }
+
+    /**
+     * Builds the unconfigured input topology as a list of two {@link Subtopology} records:
+     * subtopology1 reads source_topic1 and writes to repartition_topic; subtopology2 reads
+     * source_topic2 and repartition_topic and declares one copartition group.
+     */
+    private static List<Subtopology> makeTestTopology() {
+        // Create a subtopology source -> repartition
+        Subtopology subtopology1 = new Subtopology()
+            .setSubtopologyId("subtopology1")
+            .setSourceTopics(Collections.singletonList("source_topic1"))
+            
.setRepartitionSinkTopics(Collections.singletonList("repartition_topic"))
+            .setStateChangelogTopics(Collections.singletonList(
+                new StreamsGroupTopologyValue.TopicInfo()
+                    .setName("state_changelog_topic1")
+                    .setTopicConfigs(Collections.singletonList(
+                        new StreamsGroupTopologyValue.TopicConfig()
+                            .setKey("cleanup.policy")
+                            .setValue("compact")
+                    ))
+            ));
+        // Create a subtopology repartition/source2 -> sink (copartitioned)
+        Subtopology subtopology2 = new Subtopology()
+            .setSubtopologyId("subtopology2")
+            .setSourceTopics(Collections.singletonList("source_topic2"))
+            .setRepartitionSourceTopics(Collections.singletonList(
+                new StreamsGroupTopologyValue.TopicInfo()
+                    .setName("repartition_topic")
+                    .setReplicationFactor((short) 3)
+            ))
+            .setStateChangelogTopics(Collections.singletonList(
+                new StreamsGroupTopologyValue.TopicInfo()
+                    .setName("state_changelog_topic2")
+            ))
+            // NOTE(review): the (short) 0 values look like indexes into this subtopology's
+            // source-topic and repartition-source-topic lists - confirm against the schema.
+            .setCopartitionGroups(Collections.singletonList(
+                new StreamsGroupTopologyValue.CopartitionGroup()
+                    .setSourceTopics(Collections.singletonList((short) 0))
+                    
.setRepartitionSourceTopics(Collections.singletonList((short) 0))
+            ));
+        return Arrays.asList(subtopology1, subtopology2);
+    }
+
+}
\ No newline at end of file
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java
new file mode 100644
index 00000000000..54f91508d2d
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class RepartitionTopicsTest {
+
+    // Shared fixtures for all tests in this class.
+    // NOTE(review): the ConfiguredInternalTopic / ConfiguredSubtopology instances below are
+    // shared by reference between tests and are also used as the expected values in the
+    // assertions. If these types are mutable and RepartitionTopics#setup writes into them,
+    // tests could influence each other and the assertions could not detect it -- confirm.
+    private static final LogContext LOG_CONTEXT = new LogContext();
+    private static final String SOURCE_TOPIC_NAME1 = "source1";
+    private static final String SOURCE_TOPIC_NAME2 = "source2";
+    private static final String SINK_TOPIC_NAME1 = "sink1";
+    private static final String SINK_TOPIC_NAME2 = "sink2";
+    private static final String REPARTITION_TOPIC_NAME1 = "repartition1";
+    private static final String REPARTITION_TOPIC_NAME2 = "repartition2";
+    // Deliberately absent from TOPICS below, so the partition-count providers used in the
+    // tests return null for this name.
+    private static final String REPARTITION_WITHOUT_PARTITION_COUNT = 
"repartitionWithoutPartitionCount";
+    private static final Map<String, String> TOPIC_CONFIG1 = 
Collections.singletonMap("config1", "val1");
+    private static final Map<String, String> TOPIC_CONFIG2 = 
Collections.singletonMap("config2", "val2");
+    private static final Map<String, String> TOPIC_CONFIG5 = 
Collections.singletonMap("config5", "val5");
+    // Built with Optional.of(4) / Optional.empty(); presumably the number of partitions and
+    // the replication factor -- TODO confirm against the ConfiguredInternalTopic constructor.
+    private static final ConfiguredInternalTopic REPARTITION_TOPIC_CONFIG1 =
+        new ConfiguredInternalTopic(REPARTITION_TOPIC_NAME1, TOPIC_CONFIG1, 
Optional.of(4), Optional.empty());
+    // Same shape as REPARTITION_TOPIC_CONFIG1, but with Optional.of(2).
+    private static final ConfiguredInternalTopic REPARTITION_TOPIC_CONFIG2 =
+        new ConfiguredInternalTopic(REPARTITION_TOPIC_NAME2, TOPIC_CONFIG2, 
Optional.of(2), Optional.empty());
+    // Uses the two-argument constructor, i.e. no explicit Optional values are supplied.
+    private static final ConfiguredInternalTopic 
REPARTITION_TOPIC_CONFIG_WITHOUT_PARTITION_COUNT =
+        new ConfiguredInternalTopic(REPARTITION_WITHOUT_PARTITION_COUNT, 
TOPIC_CONFIG5);
+    // Subtopology fixtures. The four constructor arguments are, in order: a set of topic names,
+    // a second set of topic names, a map of name -> ConfiguredInternalTopic, and another map.
+    // NOTE(review): these look like (repartition sink topics, source topics, repartition source
+    // topics, state changelog topics) -- verify against ConfiguredSubtopology.
+    private static final ConfiguredSubtopology 
TOPICS_INFO_WITHOUT_PARTITION_COUNT = new ConfiguredSubtopology(
+        Collections.singleton(SINK_TOPIC_NAME2),
+        Set.of(REPARTITION_TOPIC_NAME1, REPARTITION_WITHOUT_PARTITION_COUNT),
+        mkMap(
+            mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1),
+            mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT, 
REPARTITION_TOPIC_CONFIG_WITHOUT_PARTITION_COUNT)
+        ),
+        Collections.emptyMap()
+    );
+    private static final ConfiguredSubtopology TOPICS_INFO1 = new 
ConfiguredSubtopology(
+        Collections.singleton(REPARTITION_TOPIC_NAME1),
+        Set.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2),
+        mkMap(
+            mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1),
+            mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2)
+        ),
+        Collections.emptyMap()
+    );
+    private static final ConfiguredSubtopology TOPICS_INFO2 = new 
ConfiguredSubtopology(
+        Collections.singleton(SINK_TOPIC_NAME1),
+        Collections.singleton(REPARTITION_TOPIC_NAME1),
+        mkMap(mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1)),
+        Collections.emptyMap()
+    );
+    // Topic names for which the tests' partition-count providers report a partition count (3).
+    private static final Set<String> TOPICS = Set.of(
+        SOURCE_TOPIC_NAME1,
+        SOURCE_TOPIC_NAME2,
+        SINK_TOPIC_NAME1,
+        SINK_TOPIC_NAME2,
+        REPARTITION_TOPIC_NAME1,
+        REPARTITION_TOPIC_NAME2
+    );
+
+    @Test
+    public void shouldSetupRepartitionTopics() {
+        // Every topic listed in TOPICS is reported with 3 partitions; any other name yields null.
+        final Function<String, Integer> partitionCounts = topic -> TOPICS.contains(topic) ? 3 : null;
+        final Map<String, ConfiguredSubtopology> subtopologies = mkMap(
+            mkEntry("subtopology_0", TOPICS_INFO1),
+            mkEntry("subtopology_1", TOPICS_INFO2)
+        );
+        final Map<String, ConfiguredInternalTopic> expected = mkMap(
+            mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1),
+            mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2)
+        );
+
+        final Map<String, ConfiguredInternalTopic> actual =
+            new RepartitionTopics(LOG_CONTEXT, subtopologies, partitionCounts).setup();
+
+        // Both repartition topics configured in the two subtopologies must be returned.
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics() {
+        // SOURCE_TOPIC_NAME1 is reported as unknown (null); all other topics in TOPICS have 3 partitions.
+        final Function<String, Integer> partitionCounts = topic -> {
+            if (Objects.equals(topic, SOURCE_TOPIC_NAME1)) {
+                return null;
+            }
+            return TOPICS.contains(topic) ? 3 : null;
+        };
+        final Map<String, ConfiguredSubtopology> subtopologies = mkMap(
+            mkEntry("subtopology_0", TOPICS_INFO1),
+            mkEntry("subtopology_1", TOPICS_INFO2)
+        );
+        final RepartitionTopics underTest = new RepartitionTopics(LOG_CONTEXT, subtopologies, partitionCounts);
+
+        final StreamsMissingSourceTopicsException thrown = assertThrows(
+            StreamsMissingSourceTopicsException.class,
+            () -> underTest.setup()
+        );
+
+        assertNotNull(thrown);
+        assertEquals("Missing source topics: source1", thrown.getMessage());
+    }
+
+    @Test
+    public void shouldThrowStreamsMissingSourceTopicsExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopics() {
+        // REPARTITION_WITHOUT_PARTITION_COUNT is not part of TOPICS, so the provider returns null for it.
+        final Function<String, Integer> partitionCounts = topic -> TOPICS.contains(topic) ? 3 : null;
+        final Map<String, ConfiguredSubtopology> subtopologies = mkMap(
+            mkEntry("subtopology_0", TOPICS_INFO1),
+            mkEntry("subtopology_1", TOPICS_INFO_WITHOUT_PARTITION_COUNT)
+        );
+        final RepartitionTopics underTest = new RepartitionTopics(LOG_CONTEXT, subtopologies, partitionCounts);
+        final String expectedMessage =
+            "Failed to compute number of partitions for all repartition topics, make sure all user input topics are created and all pattern subscriptions match at least one topic in the cluster";
+
+        final StreamsMissingSourceTopicsException thrown = assertThrows(
+            StreamsMissingSourceTopicsException.class,
+            () -> underTest.setup()
+        );
+
+        assertEquals(expectedMessage, thrown.getMessage());
+    }
+
+    @Test
+    public void shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTopic() {
+        // Topics in TOPICS report 3 partitions; REPARTITION_WITHOUT_PARTITION_COUNT is not among them.
+        final Function<String, Integer> partitionCounts = topic -> TOPICS.contains(topic) ? 3 : null;
+        // subtopology_0 names REPARTITION_WITHOUT_PARTITION_COUNT in its first topic set and
+        // SOURCE_TOPIC_NAME1 in its second; subtopology_1 names it in its second topic set.
+        final ConfiguredSubtopology upstream = new ConfiguredSubtopology(
+            Set.of(REPARTITION_TOPIC_NAME1, REPARTITION_WITHOUT_PARTITION_COUNT),
+            Set.of(SOURCE_TOPIC_NAME1, REPARTITION_TOPIC_NAME2),
+            mkMap(
+                mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1),
+                mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2),
+                mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT, REPARTITION_TOPIC_CONFIG_WITHOUT_PARTITION_COUNT)
+            ),
+            Collections.emptyMap()
+        );
+        final Map<String, ConfiguredSubtopology> subtopologies = mkMap(
+            mkEntry("subtopology_0", upstream),
+            mkEntry("subtopology_1", TOPICS_INFO_WITHOUT_PARTITION_COUNT)
+        );
+        final Map<String, ConfiguredInternalTopic> expected = mkMap(
+            mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1),
+            mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2),
+            mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT, REPARTITION_TOPIC_CONFIG_WITHOUT_PARTITION_COUNT)
+        );
+
+        final Map<String, ConfiguredInternalTopic> actual =
+            new RepartitionTopics(LOG_CONTEXT, subtopologies, partitionCounts).setup();
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void shouldSetRepartitionTopicPartitionCountFromUpstreamInternalRepartitionSourceTopic() {
+        // Topics in TOPICS report 3 partitions; REPARTITION_WITHOUT_PARTITION_COUNT is not among them.
+        final Function<String, Integer> partitionCounts = topic -> TOPICS.contains(topic) ? 3 : null;
+        // subtopology_0 names REPARTITION_WITHOUT_PARTITION_COUNT in its first topic set and
+        // REPARTITION_TOPIC_NAME1 in its second; subtopology_1 names it in its second topic set.
+        final ConfiguredSubtopology upstream = new ConfiguredSubtopology(
+            Set.of(REPARTITION_TOPIC_NAME2, REPARTITION_WITHOUT_PARTITION_COUNT),
+            Set.of(SOURCE_TOPIC_NAME1, REPARTITION_TOPIC_NAME1),
+            mkMap(
+                mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1),
+                mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2),
+                mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT, REPARTITION_TOPIC_CONFIG_WITHOUT_PARTITION_COUNT)
+            ),
+            Collections.emptyMap()
+        );
+        final Map<String, ConfiguredSubtopology> subtopologies = mkMap(
+            mkEntry("subtopology_0", upstream),
+            mkEntry("subtopology_1", TOPICS_INFO_WITHOUT_PARTITION_COUNT)
+        );
+        final Map<String, ConfiguredInternalTopic> expected = mkMap(
+            mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1),
+            mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2),
+            mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT, REPARTITION_TOPIC_CONFIG_WITHOUT_PARTITION_COUNT)
+        );
+
+        final Map<String, ConfiguredInternalTopic> actual =
+            new RepartitionTopics(LOG_CONTEXT, subtopologies, partitionCounts).setup();
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void shouldNotSetupRepartitionTopicsWhenTopologyDoesNotContainAnyRepartitionTopics() {
+        final Function<String, Integer> partitionCounts = topic -> TOPICS.contains(topic) ? 3 : null;
+        // A single subtopology whose two internal-topic maps are both empty.
+        final ConfiguredSubtopology withoutInternalTopics = new ConfiguredSubtopology(
+            Collections.singleton(SINK_TOPIC_NAME1),
+            Collections.singleton(SOURCE_TOPIC_NAME1),
+            Collections.emptyMap(),
+            Collections.emptyMap()
+        );
+        final Map<String, ConfiguredSubtopology> subtopologies =
+            mkMap(mkEntry("subtopology_0", withoutInternalTopics));
+
+        final Map<String, ConfiguredInternalTopic> actual =
+            new RepartitionTopics(LOG_CONTEXT, subtopologies, partitionCounts).setup();
+
+        // Nothing to set up, so the result must be an empty map.
+        assertEquals(Collections.emptyMap(), actual);
+    }
+
+}
\ No newline at end of file

Reply via email to