This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit f5f901771ce1b5d53f65d76046638898b622f968 Author: Lucas Brutschy <[email protected]> AuthorDate: Tue Oct 22 15:11:40 2024 +0200 Resolve conflicts from trunk rebase - Port tools for topic configuration (#17371) * KSTREAMS-6456: Port tools for topic configuration Ports several tools for topic configuration from the client side, to the broker-side. Several things are refactored: - Decoupling. On the client side, for example, RepartitionTopics was using the CopartitionedTopicEnforcer, the InternalTopicCreator, the TopologyMetadata and the Cluster objects. All of those are mocked with Mockito during testing. This points to bad coupling. We refactored all classes to be mostly self-sufficient, only relying on themselves and simple interfaces. - Tests only use JUnit5, not hamcrast matchers and no other streams utilities, to not pollute the group coordinator module. - All classes only modify the configurations -- the code does not actually call into the AdminClient anymore. - We map all errors to new errors in the broker, in particular, the error for missing topics, inconsistent internal topics, and invalid topologies. We include the internal, mutable datastructures, that are set- and map-based for effiecient algorithms. They are distinctly different from the data represented in `StreamsGroupTopologyValue` and `StreamsGroupInitializeRequest`, since regular expressions must be resolved at this point. Both the topic creation and internal topic validation will be based on this code, the basics of this are implemented in the `InternalTopicManager`. Every time, either the broker-side topology or the topic metadata on the broker changes, we reconfigure the internal topics, check consistency with the current topics on the broker, and possibly trigger creation of the missing internal topics. These changes will be built on top of this change. * formatting fixes --- checkstyle/import-control-group-coordinator.xml | 1 + ...StreamsInconsistentInternalTopicsException.java | 23 ++ .../org/apache/kafka/common/protocol/Errors.java | 11 +- .../group/streams/topics/ChangelogTopics.java | 82 +++++++ .../streams/topics/ConfiguredInternalTopic.java | 138 ++++++++++++ .../streams/topics/ConfiguredSubtopology.java | 144 +++++++++++++ .../topics/CopartitionedTopicsEnforcer.java | 191 ++++++++++++++++ .../group/streams/topics/InternalTopicManager.java | 231 ++++++++++++++++++++ .../group/streams/topics/RepartitionTopics.java | 180 ++++++++++++++++ .../group/streams/topics/ChangelogTopicsTest.java | 136 ++++++++++++ .../topics/ConfiguredInternalTopicTest.java | 104 +++++++++ .../streams/topics/ConfiguredSubtopologyTest.java | 137 ++++++++++++ .../topics/CopartitionedTopicsEnforcerTest.java | 230 ++++++++++++++++++++ .../streams/topics/InternalTopicManagerTest.java | 178 +++++++++++++++ .../streams/topics/RepartitionTopicsTest.java | 239 +++++++++++++++++++++ 15 files changed, 2020 insertions(+), 5 deletions(-) diff --git a/checkstyle/import-control-group-coordinator.xml b/checkstyle/import-control-group-coordinator.xml index 0619ea444d5..0f54365322c 100644 --- a/checkstyle/import-control-group-coordinator.xml +++ b/checkstyle/import-control-group-coordinator.xml @@ -63,6 +63,7 @@ <allow pkg="org.apache.kafka.deferred" /> <allow pkg="org.apache.kafka.image"/> <allow pkg="org.apache.kafka.server.common"/> + <allow pkg="org.apache.kafka.server.immutable"/> <allow pkg="org.apache.kafka.server.record"/> <allow pkg="org.apache.kafka.server.util"/> <allow pkg="org.apache.kafka.storage.internals.log"/> diff --git a/clients/src/main/java/org/apache/kafka/common/errors/StreamsInconsistentInternalTopicsException.java b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInconsistentInternalTopicsException.java new file mode 100644 index 00000000000..fbf81f14260 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInconsistentInternalTopicsException.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class StreamsInconsistentInternalTopicsException extends ApiException { + public StreamsInconsistentInternalTopicsException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index db5194d4a2b..e8687314808 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -120,6 +120,7 @@ import org.apache.kafka.common.errors.SnapshotNotFoundException; import org.apache.kafka.common.errors.StaleBrokerEpochException; import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.StreamsGroupUninitializedException; +import org.apache.kafka.common.errors.StreamsInconsistentInternalTopicsException; import org.apache.kafka.common.errors.StreamsInconsistentTopologyException; import org.apache.kafka.common.errors.StreamsInvalidTopologyException; import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException; @@ -425,12 +426,12 @@ public enum Errors { StreamsInconsistentTopologyException::new), STREAMS_MISSING_SOURCE_TOPICS(132, "One or more source topics are missing.", StreamsMissingSourceTopicsException::new), - STREAMS_MISSING_INTERNAL_TOPICS(133, "One or more internal topics are missing.", - StreamsMissingInternalTopicsException::new), - STREAMS_GROUP_UNINITIALIZED(134, "The group is not (fully) initialized, broker-side topology information or internal topics are missing.", + STREAMS_GROUP_UNINITIALIZED(133, "The group is not (fully) initialized, broker-side topology information or internal topics are missing.", StreamsGroupUninitializedException::new), - STREAMS_SHUTDOWN_APPLICATION(135, "A client requested the shutdown of the whole application.", - StreamsShutdownApplicationException::new); + STREAMS_SHUTDOWN_APPLICATION(134, "A client requested the shutdown of the whole application.", + StreamsShutdownApplicationException::new), + STREAMS_INCONSISTENT_INTERNAL_TOPICS(135, "One or more internal topics are missing.", + StreamsInconsistentInternalTopicsException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java new file mode 100644 index 00000000000..298c8abbfa3 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.errors.StreamsInvalidTopologyException; +import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException; +import org.apache.kafka.common.utils.LogContext; + +import org.slf4j.Logger; + +import java.util.HashMap; +import java.util.Map; +import java.util.OptionalInt; +import java.util.function.Function; + +/** + * This class is responsible for setting up the changelog topics for a topology. + */ +public class ChangelogTopics { + + private final Map<String, ConfiguredSubtopology> subtopologies; + private final Function<String, Integer> topicPartitionCountProvider; + private final Logger log; + + public ChangelogTopics( + final LogContext logContext, + final Map<String, ConfiguredSubtopology> subtopologies, + final Function<String, Integer> topicPartitionCountProvider + ) { + this.log = logContext.logger(getClass()); + this.subtopologies = subtopologies; + this.topicPartitionCountProvider = topicPartitionCountProvider; + } + + /** + * Modifies the provided ConfiguredSubtopology to set the number of partitions for each changelog topic. + * + * @return the map of changelog topics for the requested topology that are internal and may need to be created. + */ + public Map<String, ConfiguredInternalTopic> setup() { + final Map<String, ConfiguredInternalTopic> changelogTopicMetadata = new HashMap<>(); + for (final Map.Entry<String, ConfiguredSubtopology> entry : subtopologies.entrySet()) { + final ConfiguredSubtopology configuredSubtopology = entry.getValue(); + + final OptionalInt maxNumPartitions = + configuredSubtopology.sourceTopics().stream().mapToInt(this::getPartitionCountOrFail).max(); + + if (!maxNumPartitions.isPresent()) { + throw new StreamsInvalidTopologyException("No source topics found for subtopology " + entry.getKey()); + } + for (final ConfiguredInternalTopic topicConfig : configuredSubtopology.nonSourceChangelogTopics()) { + changelogTopicMetadata.put(topicConfig.name(), topicConfig); + topicConfig.setNumberOfPartitions(maxNumPartitions.getAsInt()); + } + } + + log.debug("Expecting state changelog topics {} for the requested topology.", changelogTopicMetadata.values()); + return changelogTopicMetadata; + } + + private int getPartitionCountOrFail(String topic) { + final Integer topicPartitionCount = topicPartitionCountProvider.apply(topic); + if (topicPartitionCount == null) { + throw new StreamsMissingSourceTopicsException("No partition count for source topic " + topic); + } + return topicPartitionCount; + } +} \ No newline at end of file diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java new file mode 100644 index 00000000000..875b3192ea1 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.internals.Topic; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** + * ConfiguredInternalTopic captures the properties required for configuring the internal topics we create for change-logs and repartitioning + * etc. + * <p> + * It is derived from the topology sent by the client, and the current state of the topics inside the broker. If the topics on the broker + * changes, the internal topic may need to be reconfigured. + */ +public class ConfiguredInternalTopic { + + private final String name; + private final Map<String, String> topicConfigs; + private final Optional<Short> replicationFactor; + private final boolean enforceNumberOfPartitions; + private Optional<Integer> numberOfPartitions; + + public ConfiguredInternalTopic(final String name) { + this(name, Collections.emptyMap(), Optional.empty(), Optional.empty()); + } + + public ConfiguredInternalTopic(final String name, + final Map<String, String> topicConfigs) { + this(name, topicConfigs, Optional.empty(), Optional.empty()); + } + + public ConfiguredInternalTopic(final String name, + final Map<String, String> topicConfigs, + final Optional<Integer> numberOfPartitions, + final Optional<Short> replicationFactor) { + this.name = Objects.requireNonNull(name, "name can't be null"); + Topic.validate(name); + numberOfPartitions.ifPresent(ConfiguredInternalTopic::validateNumberOfPartitions); + this.topicConfigs = Objects.requireNonNull(topicConfigs, "topicConfigs can't be null"); + this.numberOfPartitions = numberOfPartitions; + this.replicationFactor = replicationFactor; + this.enforceNumberOfPartitions = numberOfPartitions.isPresent(); + } + + private static void validateNumberOfPartitions(final int numberOfPartitions) { + if (numberOfPartitions < 1) { + throw new IllegalArgumentException("Number of partitions must be at least 1."); + } + } + + public Map<String, String> topicConfigs() { + return topicConfigs; + } + + public boolean hasEnforcedNumberOfPartitions() { + return enforceNumberOfPartitions; + } + + public String name() { + return name; + } + + public Optional<Integer> numberOfPartitions() { + return numberOfPartitions; + } + + public Optional<Short> replicationFactor() { + return replicationFactor; + } + + public ConfiguredInternalTopic setNumberOfPartitions(final int numberOfPartitions) { + if (this.hasEnforcedNumberOfPartitions() + && this.numberOfPartitions.isPresent() + && this.numberOfPartitions.get() != numberOfPartitions) { + throw new UnsupportedOperationException( + "number of partitions are enforced on topic " + name() + " and can't be altered."); + } + + validateNumberOfPartitions(numberOfPartitions); + + this.numberOfPartitions = Optional.of(numberOfPartitions); + return this; + } + + @Override + public String toString() { + return "ConfiguredInternalTopic(" + + "name=" + name + + ", topicConfigs=" + topicConfigs + + ", numberOfPartitions=" + numberOfPartitions + + ", replicationFactor=" + replicationFactor + + ", enforceNumberOfPartitions=" + enforceNumberOfPartitions + + ")"; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ConfiguredInternalTopic that = (ConfiguredInternalTopic) o; + return enforceNumberOfPartitions == that.enforceNumberOfPartitions + && Objects.equals(name, that.name) + && Objects.equals(topicConfigs, that.topicConfigs) + && Objects.equals(numberOfPartitions, that.numberOfPartitions) + && Objects.equals(replicationFactor, that.replicationFactor); + } + + @Override + public int hashCode() { + return Objects.hash(name, + topicConfigs, + numberOfPartitions, + replicationFactor, + enforceNumberOfPartitions); + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java new file mode 100644 index 00000000000..1faba6c38ba --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Internal representation of a subtopology. + * <p> + * The subtopology is configured according to the number of partitions available in the source topics. It has regular expressions already + * resolved and defined exactly the information that is being used by streams groups assignment reconciliation. + * <p> + * Configured subtopologies may be recreated every time the input topics used by the subtopology are modified. + */ +public class ConfiguredSubtopology { + + private Set<String> repartitionSinkTopics; + private Set<String> sourceTopics; + private Map<String, ConfiguredInternalTopic> stateChangelogTopics; + private Map<String, ConfiguredInternalTopic> repartitionSourceTopics; + + public ConfiguredSubtopology() { + this.repartitionSinkTopics = new HashSet<>(); + this.sourceTopics = new HashSet<>(); + this.stateChangelogTopics = new HashMap<>(); + this.repartitionSourceTopics = new HashMap<>(); + } + + public ConfiguredSubtopology( + final Set<String> repartitionSinkTopics, + final Set<String> sourceTopics, + final Map<String, ConfiguredInternalTopic> repartitionSourceTopics, + final Map<String, ConfiguredInternalTopic> stateChangelogTopics + ) { + this.repartitionSinkTopics = repartitionSinkTopics; + this.sourceTopics = sourceTopics; + this.stateChangelogTopics = stateChangelogTopics; + this.repartitionSourceTopics = repartitionSourceTopics; + } + + public Set<String> repartitionSinkTopics() { + return repartitionSinkTopics; + } + + public Set<String> sourceTopics() { + return sourceTopics; + } + + public Map<String, ConfiguredInternalTopic> stateChangelogTopics() { + return stateChangelogTopics; + } + + public Map<String, ConfiguredInternalTopic> repartitionSourceTopics() { + return repartitionSourceTopics; + } + + public ConfiguredSubtopology setRepartitionSinkTopics(final Set<String> repartitionSinkTopics) { + this.repartitionSinkTopics = repartitionSinkTopics; + return this; + } + + public ConfiguredSubtopology setSourceTopics(final Set<String> sourceTopics) { + this.sourceTopics = sourceTopics; + return this; + } + + public ConfiguredSubtopology setStateChangelogTopics( + final Map<String, ConfiguredInternalTopic> stateChangelogTopics + ) { + this.stateChangelogTopics = stateChangelogTopics; + return this; + } + + public ConfiguredSubtopology setRepartitionSourceTopics( + final Map<String, ConfiguredInternalTopic> repartitionSourceTopics + ) { + this.repartitionSourceTopics = repartitionSourceTopics; + return this; + } + + /** + * Returns the config for any changelogs that must be prepared for this topic group, ie excluding any source topics that are reused as a + * changelog + */ + public Set<ConfiguredInternalTopic> nonSourceChangelogTopics() { + final Set<ConfiguredInternalTopic> topicConfigs = new HashSet<>(); + for (final Map.Entry<String, ConfiguredInternalTopic> changelogTopicEntry : stateChangelogTopics.entrySet()) { + if (!sourceTopics.contains(changelogTopicEntry.getKey())) { + topicConfigs.add(changelogTopicEntry.getValue()); + } + } + return topicConfigs; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ConfiguredSubtopology that = (ConfiguredSubtopology) o; + return Objects.equals(repartitionSinkTopics, that.repartitionSinkTopics) + && Objects.equals(sourceTopics, that.sourceTopics) + && Objects.equals(stateChangelogTopics, that.stateChangelogTopics) + && Objects.equals(repartitionSourceTopics, that.repartitionSourceTopics); + } + + @Override + public int hashCode() { + return Objects.hash(repartitionSinkTopics, sourceTopics, + stateChangelogTopics, repartitionSourceTopics); + } + + @Override + public String toString() { + return "ConfiguredSubtopology{" + + "repartitionSinkTopics=" + repartitionSinkTopics + + ", sourceTopics=" + sourceTopics + + ", stateChangelogTopics=" + stateChangelogTopics + + ", repartitionSourceTopics=" + repartitionSourceTopics + + '}'; + } + +} \ No newline at end of file diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcer.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcer.java new file mode 100644 index 00000000000..a74cff5d33e --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcer.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.errors.StreamsInconsistentInternalTopicsException; +import org.apache.kafka.common.errors.StreamsInvalidTopologyException; +import org.apache.kafka.common.utils.LogContext; + +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * This class is responsible for enforcing the number of partitions in copartitioned topics. + */ +public class CopartitionedTopicsEnforcer { + + private final Logger log; + private final Function<String, Integer> topicPartitionCountProvider; + + public CopartitionedTopicsEnforcer(final LogContext logContext, + final Function<String, Integer> topicPartitionCountProvider) { + this.log = logContext.logger(getClass()); + this.topicPartitionCountProvider = topicPartitionCountProvider; + } + + private static void maybeSetNumberOfPartitionsForInternalTopic(final int numPartitionsToUseForRepartitionTopics, + final ConfiguredInternalTopic config) { + if (!config.hasEnforcedNumberOfPartitions()) { + config.setNumberOfPartitions(numPartitionsToUseForRepartitionTopics); + } + } + + private static Supplier<StreamsInvalidTopologyException> emptyNumberOfPartitionsExceptionSupplier(final String topic) { + return () -> new StreamsInvalidTopologyException("Number of partitions is not set for topic: " + topic); + } + + /** + * Enforces the number of partitions for copartitioned topics. + * + * @param copartitionedTopics the set of copartitioned topics + * @param repartitionTopics a map from repartition topics to their internal topic configs + */ + public void enforce(final Set<String> copartitionedTopics, + final Map<String, ConfiguredInternalTopic> repartitionTopics) { + if (copartitionedTopics.isEmpty()) { + return; + } + + final Map<Object, ConfiguredInternalTopic> repartitionTopicConfigs = + copartitionedTopics.stream() + .filter(repartitionTopics::containsKey) + .collect(Collectors.toMap(topic -> topic, repartitionTopics::get)); + + final Map<String, Integer> nonRepartitionTopicPartitions = + copartitionedTopics.stream().filter(topic -> !repartitionTopics.containsKey(topic)) + .collect(Collectors.toMap(topic -> topic, topic -> { + final Integer topicPartitionCount = topicPartitionCountProvider.apply(topic); + if (topicPartitionCount == null) { + final String str = String.format("Topic not found: %s", topic); + log.error(str); + throw new StreamsInvalidTopologyException(str); + } else { + return topicPartitionCount; + } + })); + + final int numPartitionsToUseForRepartitionTopics; + final Collection<ConfiguredInternalTopic> configuredInternalTopics = repartitionTopicConfigs.values(); + + if (copartitionedTopics.equals(repartitionTopicConfigs.keySet())) { + final Collection<ConfiguredInternalTopic> configuredConfiguredInternalTopicsWithEnforcedNumberOfPartitions = + configuredInternalTopics + .stream() + .filter(ConfiguredInternalTopic::hasEnforcedNumberOfPartitions) + .collect(Collectors.toList()); + + // if there's at least one repartition topic with enforced number of partitions + // validate that they all have same number of partitions + if (!configuredConfiguredInternalTopicsWithEnforcedNumberOfPartitions.isEmpty()) { + numPartitionsToUseForRepartitionTopics = validateAndGetNumOfPartitions( + repartitionTopicConfigs, + configuredConfiguredInternalTopicsWithEnforcedNumberOfPartitions + ); + } else { + // If all topics for this co-partition group are repartition topics, + // then set the number of partitions to be the maximum of the number of partitions. + numPartitionsToUseForRepartitionTopics = getMaxPartitions(repartitionTopicConfigs); + } + } else { + // Otherwise, use the number of partitions from external topics (which must all be the same) + numPartitionsToUseForRepartitionTopics = getSamePartitions(nonRepartitionTopicPartitions); + } + + // coerce all the repartition topics to use the decided number of partitions. + for (final ConfiguredInternalTopic config : configuredInternalTopics) { + maybeSetNumberOfPartitionsForInternalTopic(numPartitionsToUseForRepartitionTopics, config); + + final int numberOfPartitionsOfInternalTopic = config + .numberOfPartitions() + .orElseThrow(emptyNumberOfPartitionsExceptionSupplier(config.name())); + + if (numberOfPartitionsOfInternalTopic != numPartitionsToUseForRepartitionTopics) { + final String msg = String.format("Number of partitions [%d] of repartition topic [%s] " + + "doesn't match number of partitions [%d] of the source topic.", + numberOfPartitionsOfInternalTopic, + config.name(), + numPartitionsToUseForRepartitionTopics); + throw new StreamsInconsistentInternalTopicsException(msg); + } + } + } + + private int validateAndGetNumOfPartitions(final Map<Object, ConfiguredInternalTopic> repartitionTopicConfigs, + final Collection<ConfiguredInternalTopic> configuredInternalTopics) { + final ConfiguredInternalTopic firstConfiguredInternalTopic = configuredInternalTopics.iterator().next(); + + final int firstNumberOfPartitionsOfInternalTopic = firstConfiguredInternalTopic + .numberOfPartitions() + .orElseThrow(emptyNumberOfPartitionsExceptionSupplier(firstConfiguredInternalTopic.name())); + + for (final ConfiguredInternalTopic configuredInternalTopic : configuredInternalTopics) { + final Integer numberOfPartitions = configuredInternalTopic + .numberOfPartitions() + .orElseThrow(emptyNumberOfPartitionsExceptionSupplier(configuredInternalTopic.name())); + + if (numberOfPartitions != firstNumberOfPartitionsOfInternalTopic) { + final Map<Object, Integer> repartitionTopics = repartitionTopicConfigs + .entrySet() + .stream() + .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().numberOfPartitions().get())); + + final String msg = String.format("Following topics do not have the same number of partitions: [%s]", + new TreeMap<>(repartitionTopics)); + throw new StreamsInconsistentInternalTopicsException(msg); + } + } + + return firstNumberOfPartitionsOfInternalTopic; + } + + private int getSamePartitions(final Map<String, Integer> nonRepartitionTopicsInCopartitionGroup) { + final int partitions = nonRepartitionTopicsInCopartitionGroup.values().iterator().next(); + for (final Entry<String, Integer> entry : nonRepartitionTopicsInCopartitionGroup.entrySet()) { + if (entry.getValue() != partitions) { + final TreeMap<String, Integer> sorted = new TreeMap<>(nonRepartitionTopicsInCopartitionGroup); + throw new StreamsInconsistentInternalTopicsException( + String.format("Topics not co-partitioned: [%s]", sorted) + ); + } + } + return partitions; + } + + private int getMaxPartitions(final Map<Object, ConfiguredInternalTopic> repartitionTopicsInCopartitionGroup) { + int maxPartitions = 0; + + for (final ConfiguredInternalTopic config : repartitionTopicsInCopartitionGroup.values()) { + final Optional<Integer> partitions = config.numberOfPartitions(); + maxPartitions = Integer.max(maxPartitions, partitions.orElse(maxPartitions)); + } + if (maxPartitions == 0) { + throw new StreamsInvalidTopologyException("All topics in the copartition group had undefined partition number: " + + repartitionTopicsInCopartitionGroup.keySet()); + } + return maxPartitions; + } + +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java new file mode 100644 index 00000000000..9485617261f --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.TopicImage; + +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class InternalTopicManager { + + public static Map<String, CreatableTopic> missingTopics(Map<String, ConfiguredSubtopology> subtopologyMap, + MetadataImage metadataImage) { + + final Map<String, CreatableTopic> topicsToCreate = new HashMap<>(); + for (ConfiguredSubtopology subtopology : subtopologyMap.values()) { + subtopology.repartitionSourceTopics().values() + .forEach(x -> topicsToCreate.put(x.name(), toCreatableTopic(x))); + subtopology.stateChangelogTopics().values() + .forEach(x -> topicsToCreate.put(x.name(), toCreatableTopic(x))); + } + // TODO: Validate if existing topics are compatible with the new topics + for (String topic : metadataImage.topics().topicsByName().keySet()) { + topicsToCreate.remove(topic); + } + return topicsToCreate; + } + + + public static Map<String, ConfiguredSubtopology> configureTopics(LogContext logContext, + List<StreamsGroupTopologyValue.Subtopology> subtopologyList, + MetadataImage metadataImage) { + + final Logger log = logContext.logger(InternalTopicManager.class); + + final Map<String, ConfiguredSubtopology> configuredSubtopologies = + subtopologyList.stream() + .collect(Collectors.toMap( + StreamsGroupTopologyValue.Subtopology::subtopologyId, + InternalTopicManager::fromPersistedSubtopology) + ); + + final Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology = + subtopologyList.stream() + .collect(Collectors.toMap( + StreamsGroupTopologyValue.Subtopology::subtopologyId, + InternalTopicManager::copartitionGroupsFromPersistedSubtopology) + ); + + final Map<String, ConfiguredInternalTopic> configuredInternalTopics = + configuredSubtopologies.values().stream().flatMap(x -> + Stream.concat( + x.repartitionSourceTopics().values().stream(), + x.stateChangelogTopics().values().stream() + ) + ).collect(Collectors.toMap(ConfiguredInternalTopic::name, x -> x)); + + final Function<String, Integer> topicPartitionCountProvider = + topic -> getPartitionCount(metadataImage, topic, configuredInternalTopics); + + configureRepartitionTopics(logContext, configuredSubtopologies, topicPartitionCountProvider); + enforceCopartitioning(logContext, configuredSubtopologies, copartitionGroupsBySubtopology, topicPartitionCountProvider, log); + configureChangelogTopics(logContext, configuredSubtopologies, topicPartitionCountProvider); + + return configuredSubtopologies; + } + + + private static void configureRepartitionTopics(LogContext logContext, + Map<String, ConfiguredSubtopology> configuredSubtopologies, + Function<String, Integer> topicPartitionCountProvider) { + final RepartitionTopics repartitionTopics = new RepartitionTopics(logContext, + configuredSubtopologies, + topicPartitionCountProvider); + repartitionTopics.setup(); + } + + private static void enforceCopartitioning(LogContext logContext, + Map<String, ConfiguredSubtopology> configuredSubtopologies, + Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology, + Function<String, Integer> topicPartitionCountProvider, + Logger log) { + final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer = new CopartitionedTopicsEnforcer( + logContext, topicPartitionCountProvider); + + final Map<String, ConfiguredInternalTopic> repartitionTopicConfigs = + configuredSubtopologies.values().stream().flatMap(x -> + x.repartitionSourceTopics().values().stream() + ).collect(Collectors.toMap(ConfiguredInternalTopic::name, x -> x)); + + if (repartitionTopicConfigs.isEmpty()) { + log.info("Skipping the repartition topic validation since there are no repartition topics."); + } else { + // ensure the co-partitioning topics within the group have the same number of partitions, + // and enforce the number of partitions for those repartition topics to be the same if they + // are co-partitioned as well. + for (Collection<Set<String>> copartitionGroups : copartitionGroupsBySubtopology.values()) { + for (Set<String> copartitionGroup : copartitionGroups) { + copartitionedTopicsEnforcer.enforce(copartitionGroup, repartitionTopicConfigs); + } + } + } + } + + private static void configureChangelogTopics(LogContext logContext, + Map<String, ConfiguredSubtopology> configuredSubtopologies, + Function<String, Integer> topicPartitionCountProvider) { + final ChangelogTopics changelogTopics = new ChangelogTopics(logContext, + configuredSubtopologies, topicPartitionCountProvider); + changelogTopics.setup(); + } + + private static Integer getPartitionCount(MetadataImage metadataImage, + String topic, + Map<String, ConfiguredInternalTopic> configuredInternalTopics) { + final TopicImage topicImage = metadataImage.topics().getTopic(topic); + if (topicImage == null) { + if (configuredInternalTopics.containsKey(topic) && configuredInternalTopics.get(topic).numberOfPartitions().isPresent()) { + return configuredInternalTopics.get(topic).numberOfPartitions().get(); + } else { + return null; + } + } else { + return topicImage.partitions().size(); + } + } + + private static CreatableTopic toCreatableTopic(final ConfiguredInternalTopic config) { + + final CreatableTopic creatableTopic = new CreatableTopic(); + + creatableTopic.setName(config.name()); + + if (!config.numberOfPartitions().isPresent()) { + throw new IllegalStateException( + "Number of partitions must be set for topic " + config.name()); + } else { + creatableTopic.setNumPartitions(config.numberOfPartitions().get()); + } + + if (config.replicationFactor().isPresent() && config.replicationFactor().get() != 0) { + creatableTopic.setReplicationFactor(config.replicationFactor().get()); + } else { + creatableTopic.setReplicationFactor((short) -1); + } + + final CreatableTopicConfigCollection topicConfigs = new CreatableTopicConfigCollection(); + + config.topicConfigs().forEach((k, v) -> { + final CreatableTopicConfig topicConfig = new CreatableTopicConfig(); + topicConfig.setName(k); + topicConfig.setValue(v); + topicConfigs.add(topicConfig); + }); + + creatableTopic.setConfigs(topicConfigs); + + return creatableTopic; + } + + private static ConfiguredSubtopology fromPersistedSubtopology( + final StreamsGroupTopologyValue.Subtopology subtopology) { + // TODO: Need to resolve regular expressions here. + return new ConfiguredSubtopology( + new HashSet<>(subtopology.repartitionSinkTopics()), + new HashSet<>(subtopology.sourceTopics()), + subtopology.repartitionSourceTopics().stream() + .map(InternalTopicManager::fromPersistedTopicInfo) + .collect(Collectors.toMap(ConfiguredInternalTopic::name, x -> x)), + subtopology.stateChangelogTopics().stream() + .map(InternalTopicManager::fromPersistedTopicInfo) + .collect(Collectors.toMap(ConfiguredInternalTopic::name, x -> x)) + ); + } + + private static ConfiguredInternalTopic fromPersistedTopicInfo( + final StreamsGroupTopologyValue.TopicInfo topicInfo) { + return new ConfiguredInternalTopic( + topicInfo.name(), + topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream() + .collect(Collectors.toMap(StreamsGroupTopologyValue.TopicConfig::key, + StreamsGroupTopologyValue.TopicConfig::value)) + : Collections.emptyMap(), + topicInfo.partitions() == 0 ? Optional.empty() : Optional.of(topicInfo.partitions()), + topicInfo.replicationFactor() == 0 ? Optional.empty() + : Optional.of(topicInfo.replicationFactor())); + } + + private static Collection<Set<String>> copartitionGroupsFromPersistedSubtopology( + final StreamsGroupTopologyValue.Subtopology subtopology) { + return subtopology.copartitionGroups().stream().map(copartitionGroup -> + Stream.concat( + copartitionGroup.sourceTopics().stream() + .map(i -> subtopology.sourceTopics().get(i)), + copartitionGroup.repartitionSourceTopics().stream() + .map(i -> subtopology.repartitionSourceTopics().get(i).name()) + ).collect(Collectors.toSet()) + ).collect(Collectors.toList()); + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java new file mode 100644 index 00000000000..022e7a3a5e6 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException; +import org.apache.kafka.common.utils.LogContext; + +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * This class is responsible for configuring the number of partitions in repartitioning topics. + */ +public class RepartitionTopics { + + private final Logger log; + private final Map<String, ConfiguredSubtopology> subtopologyToConfiguredSubtopology; + private final Function<String, Integer> topicPartitionCountProvider; + + private final Map<String, Set<String>> missingInputTopicsBySubtopology = new HashMap<>(); + + public RepartitionTopics(final LogContext logContext, + final Map<String, ConfiguredSubtopology> subtopologyToConfiguredSubtopology, + final Function<String, Integer> topicPartitionCountProvider) { + this.log = logContext.logger(getClass()); + this.subtopologyToConfiguredSubtopology = subtopologyToConfiguredSubtopology; + this.topicPartitionCountProvider = topicPartitionCountProvider; + } + + /** + * Modifies the provided ConfiguredSubtopology to set the number of partitions for each repartition topic. + * + * @return the map of repartition topics for the requested topology that are internal and may need to be created. + */ + public Map<String, ConfiguredInternalTopic> setup() { + final Set<String> missingSourceTopicsForTopology = new HashSet<>(); + final Map<String, ConfiguredInternalTopic> configuredRepartitionTopics = new HashMap<>(); + + for (final Map.Entry<String, ConfiguredSubtopology> subtopologyEntry : subtopologyToConfiguredSubtopology.entrySet()) { + final ConfiguredSubtopology configuredSubtopology = subtopologyEntry.getValue(); + + configuredRepartitionTopics.putAll( + configuredSubtopology.repartitionSourceTopics() + .values() + .stream() + .collect(Collectors.toMap(ConfiguredInternalTopic::name, topicConfig -> topicConfig))); + + final Set<String> missingSourceTopicsForSubtopology = computeMissingExternalSourceTopics(configuredSubtopology); + missingSourceTopicsForTopology.addAll(missingSourceTopicsForSubtopology); + if (!missingSourceTopicsForSubtopology.isEmpty()) { + final String subtopologyId = subtopologyEntry.getKey(); + missingInputTopicsBySubtopology.put(subtopologyId, missingSourceTopicsForSubtopology); + log.error("Subtopology {} has missing source topics {} and will be excluded from the current assignment, " + + "this can be due to the consumer client's metadata being stale or because they have " + + "not been created yet. Please verify that you have created all input topics; if they " + + "do exist, you just need to wait for the metadata to be updated, at which time a new " + + "rebalance will be kicked off automatically and the topology will be retried at that time.", + subtopologyId, missingSourceTopicsForSubtopology); + } + } + + if (missingSourceTopicsForTopology.isEmpty()) { + setRepartitionSourceTopicPartitionCount(configuredRepartitionTopics); + return configuredRepartitionTopics; + } else { + Set<String> missingSourceTopics = missingInputTopicsBySubtopology.values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + throw new StreamsMissingSourceTopicsException(String.format("Missing source topics: %s", + String.join(", ", missingSourceTopics))); + } + } + + private Set<String> computeMissingExternalSourceTopics(final ConfiguredSubtopology configuredSubtopology) { + final Set<String> missingExternalSourceTopics = new HashSet<>(configuredSubtopology.sourceTopics()); + missingExternalSourceTopics.removeAll(configuredSubtopology.repartitionSourceTopics().keySet()); + missingExternalSourceTopics.removeIf(x -> topicPartitionCountProvider.apply(x) != null); + return missingExternalSourceTopics; + } + + /** + * Computes the number of partitions and sets it for each repartition topic in repartitionTopicMetadata + */ + private void setRepartitionSourceTopicPartitionCount(final Map<String, ConfiguredInternalTopic> repartitionTopicMetadata) { + boolean partitionCountNeeded; + do { + partitionCountNeeded = false; + // avoid infinitely looping without making any progress on unknown repartitions + boolean progressMadeThisIteration = false; + + for (final ConfiguredSubtopology configuredSubtopology : subtopologyToConfiguredSubtopology.values()) { + for (final String repartitionSourceTopic : configuredSubtopology.repartitionSourceTopics() + .keySet()) { + final Optional<Integer> repartitionSourceTopicPartitionCount = + repartitionTopicMetadata.get(repartitionSourceTopic).numberOfPartitions(); + + if (!repartitionSourceTopicPartitionCount.isPresent()) { + final Integer numPartitions = computePartitionCount( + repartitionTopicMetadata, + repartitionSourceTopic + ); + + if (numPartitions == null) { + partitionCountNeeded = true; + log.trace("Unable to determine number of partitions for {}, another iteration is needed", + repartitionSourceTopic); + } else { + log.trace("Determined number of partitions for {} to be {}", + repartitionSourceTopic, + numPartitions); + repartitionTopicMetadata.get(repartitionSourceTopic).setNumberOfPartitions(numPartitions); + progressMadeThisIteration = true; + } + } + } + } + if (!progressMadeThisIteration && partitionCountNeeded) { + throw new StreamsMissingSourceTopicsException("Failed to compute number of partitions for all " + + "repartition topics, make sure all user input topics are created and all pattern subscriptions " + + "match at least one topic in the cluster"); + } + } while (partitionCountNeeded); + } + + private Integer computePartitionCount(final Map<String, ConfiguredInternalTopic> repartitionTopicMetadata, + final String repartitionSourceTopic) { + Integer partitionCount = null; + // try set the number of partitions for this repartition topic if it is not set yet + for (final ConfiguredSubtopology configuredSubtopology : subtopologyToConfiguredSubtopology.values()) { + final Set<String> repartitionSinkTopics = configuredSubtopology.repartitionSinkTopics(); + + if (repartitionSinkTopics.contains(repartitionSourceTopic)) { + // if this topic is one of the sink topics of this topology, + // use the maximum of all its source topic partitions as the number of partitions + for (final String upstreamSourceTopic : configuredSubtopology.sourceTopics()) { + Integer numPartitionsCandidate = null; + // It is possible the sourceTopic is another internal topic, i.e, + // map().join().join(map()) + if (repartitionTopicMetadata.containsKey(upstreamSourceTopic)) { + if (repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().isPresent()) { + numPartitionsCandidate = + repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().get(); + } + } else { + numPartitionsCandidate = topicPartitionCountProvider.apply(upstreamSourceTopic); + } + + if (numPartitionsCandidate != null) { + if (partitionCount == null || numPartitionsCandidate > partitionCount) { + partitionCount = numPartitionsCandidate; + } + } + } + } + } + return partitionCount; + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopicsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopicsTest.java new file mode 100644 index 00000000000..19584c53fef --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopicsTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.errors.StreamsInvalidTopologyException; +import org.apache.kafka.common.utils.LogContext; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ChangelogTopicsTest { + + private static final LogContext LOG_CONTEXT = new LogContext(); + private static final String SOURCE_TOPIC_NAME = "source"; + private static final String SINK_TOPIC_NAME = "sink"; + private static final String REPARTITION_TOPIC_NAME = "repartition"; + private static final String CHANGELOG_TOPIC_NAME1 = "changelog1"; + private static final Map<String, String> TOPIC_CONFIG = Collections.singletonMap("config1", "val1"); + private static final ConfiguredInternalTopic REPARTITION_TOPIC_CONFIG = + new ConfiguredInternalTopic(REPARTITION_TOPIC_NAME, TOPIC_CONFIG); + private static final ConfiguredSubtopology TOPICS_INFO_NOSOURCE = new ConfiguredSubtopology( + Collections.singleton(SINK_TOPIC_NAME), + Collections.emptySet(), + mkMap(mkEntry(REPARTITION_TOPIC_NAME, REPARTITION_TOPIC_CONFIG)), + mkMap() + ); + private static final ConfiguredSubtopology TOPICS_INFO_STATELESS = new ConfiguredSubtopology( + Collections.singleton(SINK_TOPIC_NAME), + Collections.singleton(SOURCE_TOPIC_NAME), + mkMap(mkEntry(REPARTITION_TOPIC_NAME, REPARTITION_TOPIC_CONFIG)), + mkMap() + ); + private static final ConfiguredInternalTopic CHANGELOG_TOPIC_CONFIG = + new ConfiguredInternalTopic(CHANGELOG_TOPIC_NAME1, TOPIC_CONFIG); + private static final ConfiguredSubtopology TOPICS_INFO_STATEFUL = new ConfiguredSubtopology( + Collections.singleton(SINK_TOPIC_NAME), + Collections.singleton(SOURCE_TOPIC_NAME), + mkMap(mkEntry(REPARTITION_TOPIC_NAME, REPARTITION_TOPIC_CONFIG)), + mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG)) + ); + private static final ConfiguredSubtopology TOPICS_INFO_SOURCE_CHANGELOG = new ConfiguredSubtopology( + Collections.singleton(SINK_TOPIC_NAME), + Collections.singleton(SOURCE_TOPIC_NAME), + mkMap(mkEntry(REPARTITION_TOPIC_NAME, REPARTITION_TOPIC_CONFIG)), + mkMap(mkEntry(SOURCE_TOPIC_NAME, CHANGELOG_TOPIC_CONFIG)) + ); + private static final ConfiguredSubtopology TOPICS_INFO_BOTH = new ConfiguredSubtopology( + Collections.singleton(SINK_TOPIC_NAME), + Collections.singleton(SOURCE_TOPIC_NAME), + mkMap(mkEntry(REPARTITION_TOPIC_NAME, REPARTITION_TOPIC_CONFIG)), + mkMap(mkEntry(SOURCE_TOPIC_NAME, null), mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG)) + ); + + private static Integer topicPartitionProvider(String s) { + return 3; + } + + @Test + public void shouldFailIfNoSourceTopics() { + final Map<String, ConfiguredSubtopology> subtopologies = mkMap(mkEntry("subtopology_0", TOPICS_INFO_NOSOURCE)); + + final ChangelogTopics changelogTopics = + new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider); + StreamsInvalidTopologyException e = assertThrows(StreamsInvalidTopologyException.class, () -> changelogTopics.setup()); + + assertTrue(e.getMessage().contains("No source topics found for subtopology")); + } + + @Test + public void shouldNotContainChangelogsForStatelessTasks() { + final Map<String, ConfiguredSubtopology> subtopologies = mkMap(mkEntry("subtopology_0", TOPICS_INFO_STATELESS)); + + final ChangelogTopics changelogTopics = + new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider); + Map<String, ConfiguredInternalTopic> setup = changelogTopics.setup(); + + assertEquals(Collections.emptyMap(), setup); + } + + @Test + public void shouldContainNonSourceBasedChangelogs() { + final Map<String, ConfiguredSubtopology> subtopologies = mkMap(mkEntry("subtopology_0", TOPICS_INFO_STATEFUL)); + + final ChangelogTopics changelogTopics = + new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider); + Map<String, ConfiguredInternalTopic> setup = changelogTopics.setup(); + + assertEquals(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG)), setup); + assertEquals(3, CHANGELOG_TOPIC_CONFIG.numberOfPartitions().orElse(Integer.MIN_VALUE)); + } + + @Test + public void shouldNotContainSourceBasedChangelogs() { + final Map<String, ConfiguredSubtopology> subtopologies = mkMap(mkEntry("subtopology_0", TOPICS_INFO_SOURCE_CHANGELOG)); + + final ChangelogTopics changelogTopics = + new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider); + Map<String, ConfiguredInternalTopic> setup = changelogTopics.setup(); + + assertEquals(Collections.emptyMap(), setup); + } + + @Test + public void shouldContainBothTypesOfPreExistingChangelogs() { + final Map<String, ConfiguredSubtopology> subtopologies = mkMap(mkEntry("subtopology_0", TOPICS_INFO_BOTH)); + + final ChangelogTopics changelogTopics = + new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider); + Map<String, ConfiguredInternalTopic> setup = changelogTopics.setup(); + + assertEquals(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG)), setup); + assertEquals(3, CHANGELOG_TOPIC_CONFIG.numberOfPartitions().orElse(Integer.MIN_VALUE)); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java new file mode 100644 index 00000000000..12c8adb0a3b --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.errors.InvalidTopicException; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ConfiguredInternalTopicTest { + + @Test + public void testConstructorAndGetters() { + Map<String, String> topicConfigs = new HashMap<>(); + topicConfigs.put("retention.ms", "1000"); + topicConfigs.put("message.timestamp.type", "LogAppendTime"); + + ConfiguredInternalTopic config = new ConfiguredInternalTopic("test-topic", topicConfigs, Optional.of(3), Optional.of((short) 2)); + + assertEquals("test-topic", config.name()); + assertEquals(topicConfigs, config.topicConfigs()); + assertEquals(Optional.of(3), config.numberOfPartitions()); + assertEquals(Optional.of((short) 2), config.replicationFactor()); + } + + @Test + public void testConstructorWithNullName() { + assertThrows(NullPointerException.class, () -> new ConfiguredInternalTopic(null, Collections.emptyMap())); + } + + @Test + public void testConstructorWithInvalidName() { + assertThrows(InvalidTopicException.class, () -> new ConfiguredInternalTopic("invalid topic name", Collections.emptyMap())); + } + + @Test + public void testConstructorWithNullTopicConfigs() { + assertThrows(NullPointerException.class, () -> new ConfiguredInternalTopic("test-topic", null)); + } + + @Test + public void testSetNumberOfPartitions() { + ConfiguredInternalTopic config = new ConfiguredInternalTopic("test-topic", Collections.emptyMap()); + config.setNumberOfPartitions(3); + assertEquals(Optional.of(3), config.numberOfPartitions()); + } + + @Test + public void testSetNumberOfPartitionsInvalid() { + ConfiguredInternalTopic config = new ConfiguredInternalTopic("test-topic", Collections.emptyMap()); + assertThrows(IllegalArgumentException.class, () -> config.setNumberOfPartitions(0)); + } + + @Test + public void testSetNumberOfPartitionsUnsupportedOperation() { + ConfiguredInternalTopic config = new ConfiguredInternalTopic("test-topic", Collections.emptyMap(), Optional.of(3), + Optional.empty()); + assertThrows(UnsupportedOperationException.class, () -> config.setNumberOfPartitions(4)); + } + + @Test + public void testEqualsAndHashCode() { + Map<String, String> topicConfigs = new HashMap<>(); + topicConfigs.put("retention.ms", "1000"); + + ConfiguredInternalTopic config1 = new ConfiguredInternalTopic("test-topic", topicConfigs, Optional.of(3), Optional.of((short) 2)); + ConfiguredInternalTopic config2 = new ConfiguredInternalTopic("test-topic", topicConfigs, Optional.of(3), Optional.of((short) 2)); + + assertEquals(config1, config2); + assertEquals(config1.hashCode(), config2.hashCode()); + } + + @Test + public void testToString() { + Map<String, String> topicConfigs = new HashMap<>(); + topicConfigs.put("retention.ms", "1000"); + + ConfiguredInternalTopic config = new ConfiguredInternalTopic("test-topic", topicConfigs, Optional.of(3), Optional.of((short) 2)); + String expectedString = "ConfiguredInternalTopic(name=test-topic, topicConfigs={retention.ms=1000}, numberOfPartitions=Optional[3], replicationFactor=Optional[2], enforceNumberOfPartitions=true)"; + + assertEquals(expectedString, config.toString()); + } +} \ No newline at end of file diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java new file mode 100644 index 00000000000..71d05ce3d2a --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConfiguredSubtopologyTest { + + @Test + public void testConstructorAndGetters() { + Set<String> repartitionSinkTopics = Set.of("repartitionSinkTopic1", "repartitionSinkTopic2"); + Set<String> sourceTopics = Set.of("sourceTopic1", "sourceTopic2"); + Map<String, ConfiguredInternalTopic> repartitionSourceTopics = new HashMap<>(); + Map<String, ConfiguredInternalTopic> stateChangelogTopics = new HashMap<>(); + + ConfiguredSubtopology configuredSubtopology = new ConfiguredSubtopology(repartitionSinkTopics, sourceTopics, + repartitionSourceTopics, stateChangelogTopics); + + assertEquals(repartitionSinkTopics, configuredSubtopology.repartitionSinkTopics()); + assertEquals(sourceTopics, configuredSubtopology.sourceTopics()); + assertEquals(repartitionSourceTopics, configuredSubtopology.repartitionSourceTopics()); + assertEquals(stateChangelogTopics, configuredSubtopology.stateChangelogTopics()); + } + + @Test + public void testSetters() { + ConfiguredSubtopology configuredSubtopology = new ConfiguredSubtopology(); + + Set<String> repartitionSinkTopics = Set.of("repartitionSinkTopic1", "repartitionSinkTopic2"); + configuredSubtopology.setRepartitionSinkTopics(repartitionSinkTopics); + assertEquals(repartitionSinkTopics, configuredSubtopology.repartitionSinkTopics()); + + Set<String> sourceTopics = Set.of("sourceTopic1", "sourceTopic2"); + configuredSubtopology.setSourceTopics(sourceTopics); + assertEquals(sourceTopics, configuredSubtopology.sourceTopics()); + + Map<String, ConfiguredInternalTopic> repartitionSourceTopics = new HashMap<>(); + configuredSubtopology.setRepartitionSourceTopics(repartitionSourceTopics); + assertEquals(repartitionSourceTopics, configuredSubtopology.repartitionSourceTopics()); + + Map<String, ConfiguredInternalTopic> stateChangelogTopics = new HashMap<>(); + configuredSubtopology.setStateChangelogTopics(stateChangelogTopics); + assertEquals(stateChangelogTopics, configuredSubtopology.stateChangelogTopics()); + } + + @Test + public void testNonSourceChangelogTopics() { + Map<String, ConfiguredInternalTopic> stateChangelogTopics = new HashMap<>(); + stateChangelogTopics.put("changelogTopic1", new ConfiguredInternalTopic("changelogTopic1")); + stateChangelogTopics.put("sourceTopic1", new ConfiguredInternalTopic("sourceTopic1")); + + ConfiguredSubtopology configuredSubtopology = new ConfiguredSubtopology( + Collections.emptySet(), + Collections.singleton("sourceTopic1"), + Collections.emptyMap(), + stateChangelogTopics + ); + + Set<ConfiguredInternalTopic> nonSourceChangelogTopics = configuredSubtopology.nonSourceChangelogTopics(); + assertEquals(1, nonSourceChangelogTopics.size()); + assertTrue(nonSourceChangelogTopics.contains(new ConfiguredInternalTopic("changelogTopic1"))); + } + + @Test + public void testEquals() { + Set<String> repartitionSinkTopics = new HashSet<>(Arrays.asList("repartitionSinkTopic1", "repartitionSinkTopic2")); + Set<String> sourceTopics = new HashSet<>(Arrays.asList("sourceTopic1", "sourceTopic2")); + Map<String, ConfiguredInternalTopic> repartitionSourceTopics = new HashMap<>(); + Map<String, ConfiguredInternalTopic> stateChangelogTopics = new HashMap<>(); + + ConfiguredSubtopology configuredSubtopology1 = new ConfiguredSubtopology(repartitionSinkTopics, sourceTopics, + repartitionSourceTopics, stateChangelogTopics); + ConfiguredSubtopology configuredSubtopology2 = new ConfiguredSubtopology(repartitionSinkTopics, sourceTopics, + repartitionSourceTopics, stateChangelogTopics); + + assertEquals(configuredSubtopology1, configuredSubtopology2); + } + + @Test + public void testHashCode() { + Set<String> repartitionSinkTopics = new HashSet<>(Arrays.asList("repartitionSinkTopic1", "repartitionSinkTopic2")); + Set<String> sourceTopics = new HashSet<>(Arrays.asList("sourceTopic1", "sourceTopic2")); + Map<String, ConfiguredInternalTopic> repartitionSourceTopics = new HashMap<>(); + Map<String, ConfiguredInternalTopic> stateChangelogTopics = new HashMap<>(); + + ConfiguredSubtopology configuredSubtopology1 = new ConfiguredSubtopology(repartitionSinkTopics, sourceTopics, + repartitionSourceTopics, stateChangelogTopics); + ConfiguredSubtopology configuredSubtopology2 = new ConfiguredSubtopology(repartitionSinkTopics, sourceTopics, + repartitionSourceTopics, stateChangelogTopics); + + assertEquals(configuredSubtopology1.hashCode(), configuredSubtopology2.hashCode()); + } + + @Test + public void testToString() { + Set<String> repartitionSinkTopics = new HashSet<>(Arrays.asList("repartitionSinkTopic1", "repartitionSinkTopic2")); + Set<String> sourceTopics = new HashSet<>(Arrays.asList("sourceTopic1", "sourceTopic2")); + Map<String, ConfiguredInternalTopic> repartitionSourceTopics = new HashMap<>(); + Map<String, ConfiguredInternalTopic> stateChangelogTopics = new HashMap<>(); + + ConfiguredSubtopology configuredSubtopology = new ConfiguredSubtopology(repartitionSinkTopics, sourceTopics, + repartitionSourceTopics, stateChangelogTopics); + + String expectedString = "ConfiguredSubtopology{" + + "repartitionSinkTopics=" + repartitionSinkTopics + + ", sourceTopics=" + sourceTopics + + ", stateChangelogTopics=" + stateChangelogTopics + + ", repartitionSourceTopics=" + repartitionSourceTopics + + '}'; + + assertEquals(expectedString, configuredSubtopology.toString()); + } +} \ No newline at end of file diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java new file mode 100644 index 00000000000..1e476247c49 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.errors.StreamsInconsistentInternalTopicsException; +import org.apache.kafka.common.errors.StreamsInvalidTopologyException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@SuppressWarnings("OptionalGetWithoutIsPresent") +public class CopartitionedTopicsEnforcerTest { + + private static final LogContext LOG_CONTEXT = new LogContext(); + + private static Integer emptyTopicPartitionProvider(String topic) { + return null; + } + + private static Integer firstSecondTopicConsistent(String topic) { + if (topic.equals("first") || topic.equals("second")) { + return 2; + } + return null; + } + + private static Integer firstSecondTopicInconsistent(String topic) { + if (topic.equals("first")) { + return 2; + } + if (topic.equals("second")) { + return 1; + } + return null; + } + + @Test + public void shouldThrowStreamsInconsistentInternalTopicsExceptionIfNoPartitionsFoundForCoPartitionedTopic() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, + CopartitionedTopicsEnforcerTest::emptyTopicPartitionProvider); + assertThrows(StreamsInvalidTopologyException.class, () -> validator.enforce(Collections.singleton("topic"), + Collections.emptyMap())); + } + + @Test + public void shouldThrowStreamsInconsistentInternalTopicsExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, + CopartitionedTopicsEnforcerTest::firstSecondTopicInconsistent); + assertThrows(StreamsInconsistentInternalTopicsException.class, () -> validator.enforce(Set.of("first", "second"), + Collections.emptyMap())); + } + + + @Test + public void shouldEnforceCopartitioningOnRepartitionTopics() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, + CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent); + final ConfiguredInternalTopic config = createTopicConfig("repartitioned", 10); + + validator.enforce(Set.of("first", "second", config.name()), + Collections.singletonMap(config.name(), config)); + + assertEquals(Optional.of(2), config.numberOfPartitions()); + } + + + @Test + public void shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, + CopartitionedTopicsEnforcerTest::emptyTopicPartitionProvider); + final ConfiguredInternalTopic one = createTopicConfig("one", 1); + final ConfiguredInternalTopic two = createTopicConfig("two", 15); + final ConfiguredInternalTopic three = createTopicConfig("three", 5); + final Map<String, ConfiguredInternalTopic> configuredInternalTopics = new HashMap<>(); + + configuredInternalTopics.put(one.name(), one); + configuredInternalTopics.put(two.name(), two); + configuredInternalTopics.put(three.name(), three); + + validator.enforce(Set.of( + one.name(), + two.name(), + three.name() + ), + configuredInternalTopics + ); + + assertEquals(Optional.of(15), one.numberOfPartitions()); + assertEquals(Optional.of(15), two.numberOfPartitions()); + assertEquals(Optional.of(15), three.numberOfPartitions()); + } + + @Test + public void shouldThrowAnExceptionIfConfiguredInternalTopicsWithEnforcedNumOfPartitionsHaveDifferentNumOfPartitions() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, + CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent); + final ConfiguredInternalTopic topic1 = createConfiguredInternalTopicWithEnforcedNumberOfPartitions("repartitioned-1", 10); + final ConfiguredInternalTopic topic2 = createConfiguredInternalTopicWithEnforcedNumberOfPartitions("repartitioned-2", 5); + + final StreamsInconsistentInternalTopicsException ex = assertThrows( + StreamsInconsistentInternalTopicsException.class, + () -> validator.enforce(Set.of(topic1.name(), topic2.name()), + Utils.mkMap( + Utils.mkEntry(topic1.name(), topic1), + Utils.mkEntry(topic2.name(), topic2) + ) + ) + ); + + final TreeMap<String, Integer> sorted = new TreeMap<>( + Utils.mkMap(Utils.mkEntry(topic1.name(), topic1.numberOfPartitions().get()), + Utils.mkEntry(topic2.name(), topic2.numberOfPartitions().get())) + ); + + assertEquals(String.format( + "Following topics do not have the same number of partitions: " + + "[%s]", sorted), ex.getMessage()); + } + + @Test + public void shouldNotThrowAnExceptionWhenConfiguredInternalTopicsWithEnforcedNumOfPartitionsAreValid() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, + CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent); + final ConfiguredInternalTopic topic1 = createConfiguredInternalTopicWithEnforcedNumberOfPartitions("repartitioned-1", 10); + final ConfiguredInternalTopic topic2 = createConfiguredInternalTopicWithEnforcedNumberOfPartitions("repartitioned-2", 10); + + validator.enforce(Set.of(topic1.name(), topic2.name()), + Utils.mkMap( + Utils.mkEntry(topic1.name(), topic1), + Utils.mkEntry(topic2.name(), topic2) + ) + ); + + assertEquals(Optional.of(10), topic1.numberOfPartitions()); + assertEquals(Optional.of(10), topic2.numberOfPartitions()); + } + + @Test + public void shouldThrowAnExceptionWhenNumberOfPartitionsOfNonRepartitionTopicAndRepartitionTopicWithEnforcedNumOfPartitionsDoNotMatch() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, + CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent); + final ConfiguredInternalTopic topic1 = createConfiguredInternalTopicWithEnforcedNumberOfPartitions("repartitioned-1", 10); + + final StreamsInconsistentInternalTopicsException ex = assertThrows( + StreamsInconsistentInternalTopicsException.class, + () -> validator.enforce(Set.of(topic1.name(), "second"), + Utils.mkMap(Utils.mkEntry(topic1.name(), topic1))) + ); + + assertEquals(String.format("Number of partitions [%s] " + + "of repartition topic [%s] " + + "doesn't match number of partitions [%s] of the source topic.", + topic1.numberOfPartitions().get(), topic1.name(), 2), ex.getMessage()); + } + + @Test + public void shouldNotThrowAnExceptionWhenNumberOfPartitionsOfNonRepartitionTopicAndRepartitionTopicWithEnforcedNumOfPartitionsMatch() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, + CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent); + final ConfiguredInternalTopic topic1 = createConfiguredInternalTopicWithEnforcedNumberOfPartitions("repartitioned-1", 2); + + validator.enforce(Set.of(topic1.name(), "second"), + Utils.mkMap(Utils.mkEntry(topic1.name(), topic1))); + + assertEquals(Optional.of(2), topic1.numberOfPartitions()); + } + + @Test + public void shouldDeductNumberOfPartitionsFromRepartitionTopicWithEnforcedNumberOfPartitions() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, + CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent); + final ConfiguredInternalTopic topic1 = createConfiguredInternalTopicWithEnforcedNumberOfPartitions("repartitioned-1", 2); + final ConfiguredInternalTopic topic2 = createTopicConfig("repartitioned-2", 5); + final ConfiguredInternalTopic topic3 = createConfiguredInternalTopicWithEnforcedNumberOfPartitions("repartitioned-3", 2); + + validator.enforce(Set.of(topic1.name(), topic2.name()), + Utils.mkMap( + Utils.mkEntry(topic1.name(), topic1), + Utils.mkEntry(topic2.name(), topic2), + Utils.mkEntry(topic3.name(), topic3) + ) + ); + + assertEquals(topic1.numberOfPartitions(), topic2.numberOfPartitions()); + assertEquals(topic2.numberOfPartitions(), topic3.numberOfPartitions()); + } + + private ConfiguredInternalTopic createTopicConfig(final String repartitionTopic, + final int partitions) { + final ConfiguredInternalTopic config = + new ConfiguredInternalTopic(repartitionTopic, Collections.emptyMap()); + + config.setNumberOfPartitions(partitions); + return config; + } + + private ConfiguredInternalTopic createConfiguredInternalTopicWithEnforcedNumberOfPartitions(final String repartitionTopic, + final int partitions) { + return new ConfiguredInternalTopic(repartitionTopic, + Collections.emptyMap(), + Optional.of(partitions), + Optional.empty()); + } + +} \ No newline at end of file diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java new file mode 100644 index 00000000000..e6ef6fee5d3 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.immutable.ImmutableMap; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class InternalTopicManagerTest { + + @Test + void testMissingTopics() { + MetadataImage metadataImage = mock(MetadataImage.class); + TopicsImage topicsImage = mock(TopicsImage.class); + when(metadataImage.topics()).thenReturn(topicsImage); + final TopicImage sourceTopic1 = + new TopicImage("source_topic1", Uuid.randomUuid(), mkMap(mkEntry(0, null), mkEntry(1, null))); + final TopicImage sourceTopic2 = + new TopicImage("source_topic2", Uuid.randomUuid(), mkMap(mkEntry(0, null), mkEntry(1, null))); + final TopicImage stateChangelogTopic2 = new TopicImage("state_changelog_topic2", Uuid.randomUuid(), + mkMap(mkEntry(0, null), mkEntry(1, null))); + when(topicsImage.topicsByName()).thenReturn( + ImmutableMap.singleton("source_topic1", sourceTopic1) + .updated("source_topic2", sourceTopic2) + .updated("state_changelog_topic2", stateChangelogTopic2) + ); + when(topicsImage.getTopic(eq("source_topic1"))).thenReturn(sourceTopic1); + when(topicsImage.getTopic(eq("source_topic2"))).thenReturn(sourceTopic2); + when(topicsImage.getTopic(eq("state_changelog_topic2"))).thenReturn(stateChangelogTopic2); + Map<String, ConfiguredSubtopology> subtopologyMap = makeExpectedConfiguredTopology(); + + Map<String, CreatableTopic> missingTopics = InternalTopicManager.missingTopics(subtopologyMap, metadataImage); + + assertEquals(2, missingTopics.size()); + assertEquals( + new CreatableTopic() + .setName("repartition_topic") + .setNumPartitions(2) + .setReplicationFactor((short) 3), + missingTopics.get("repartition_topic") + ); + assertEquals( + new CreatableTopic() + .setName("state_changelog_topic1") + .setNumPartitions(2) + .setReplicationFactor((short) -1) + .setConfigs( + new CreatableTopicConfigCollection( + Collections.singletonList(new CreatableTopicConfig().setName("cleanup.policy").setValue("compact")).iterator()) + ), + missingTopics.get("state_changelog_topic1")); + } + + @Test + void testConfigureTopics() { + MetadataImage metadataImage = mock(MetadataImage.class); + TopicsImage topicsImage = mock(TopicsImage.class); + when(metadataImage.topics()).thenReturn(topicsImage); + when(topicsImage.getTopic(eq("source_topic1"))).thenReturn( + new TopicImage("source_topic1", Uuid.randomUuid(), mkMap(mkEntry(0, null), mkEntry(1, null)))); + when(topicsImage.getTopic(eq("source_topic2"))).thenReturn( + new TopicImage("source_topic2", Uuid.randomUuid(), mkMap(mkEntry(0, null), mkEntry(1, null)))); + List<Subtopology> subtopologyList = makeTestTopology(); + + Map<String, ConfiguredSubtopology> configuredSubtopologies = + InternalTopicManager.configureTopics(new LogContext(), subtopologyList, metadataImage); + + Map<String, ConfiguredSubtopology> expectedConfiguredSubtopologyMap = makeExpectedConfiguredTopology(); + assertEquals(expectedConfiguredSubtopologyMap, configuredSubtopologies); + } + + private static Map<String, ConfiguredSubtopology> makeExpectedConfiguredTopology() { + return mkMap( + mkEntry("subtopology1", + new ConfiguredSubtopology() + .setSourceTopics(Set.of("source_topic1")) + .setStateChangelogTopics(Collections.singletonMap("state_changelog_topic1", + new ConfiguredInternalTopic("state_changelog_topic1", + Collections.singletonMap("cleanup.policy", "compact"), + Optional.empty(), + Optional.empty() + ).setNumberOfPartitions(2))) + .setRepartitionSinkTopics(Set.of("repartition_topic")) + ), + mkEntry("subtopology2", + new ConfiguredSubtopology() + .setSourceTopics(Set.of("source_topic2")) + .setRepartitionSourceTopics(Collections.singletonMap("repartition_topic", + new ConfiguredInternalTopic("repartition_topic", + Collections.emptyMap(), + Optional.empty(), + Optional.of((short) 3) + ).setNumberOfPartitions(2) + )) + .setStateChangelogTopics(Collections.singletonMap("state_changelog_topic2", + new ConfiguredInternalTopic("state_changelog_topic2", + Collections.emptyMap(), + Optional.empty(), + Optional.empty() + ).setNumberOfPartitions(2))) + ) + ); + } + + private static List<Subtopology> makeTestTopology() { + // Create a subtopology source -> repartition + Subtopology subtopology1 = new Subtopology() + .setSubtopologyId("subtopology1") + .setSourceTopics(Collections.singletonList("source_topic1")) + .setRepartitionSinkTopics(Collections.singletonList("repartition_topic")) + .setStateChangelogTopics(Collections.singletonList( + new StreamsGroupTopologyValue.TopicInfo() + .setName("state_changelog_topic1") + .setTopicConfigs(Collections.singletonList( + new StreamsGroupTopologyValue.TopicConfig() + .setKey("cleanup.policy") + .setValue("compact") + )) + )); + // Create a subtopology repartition/source2 -> sink (copartitioned) + Subtopology subtopology2 = new Subtopology() + .setSubtopologyId("subtopology2") + .setSourceTopics(Collections.singletonList("source_topic2")) + .setRepartitionSourceTopics(Collections.singletonList( + new StreamsGroupTopologyValue.TopicInfo() + .setName("repartition_topic") + .setReplicationFactor((short) 3) + )) + .setStateChangelogTopics(Collections.singletonList( + new StreamsGroupTopologyValue.TopicInfo() + .setName("state_changelog_topic2") + )) + .setCopartitionGroups(Collections.singletonList( + new StreamsGroupTopologyValue.CopartitionGroup() + .setSourceTopics(Collections.singletonList((short) 0)) + .setRepartitionSourceTopics(Collections.singletonList((short) 0)) + )); + return Arrays.asList(subtopology1, subtopology2); + } + +} \ No newline at end of file diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java new file mode 100644 index 00000000000..54f91508d2d --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException; +import org.apache.kafka.common.utils.LogContext; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class RepartitionTopicsTest { + + private static final LogContext LOG_CONTEXT = new LogContext(); + private static final String SOURCE_TOPIC_NAME1 = "source1"; + private static final String SOURCE_TOPIC_NAME2 = "source2"; + private static final String SINK_TOPIC_NAME1 = "sink1"; + private static final String SINK_TOPIC_NAME2 = "sink2"; + private static final String REPARTITION_TOPIC_NAME1 = "repartition1"; + private static final String REPARTITION_TOPIC_NAME2 = "repartition2"; + private static final String REPARTITION_WITHOUT_PARTITION_COUNT = "repartitionWithoutPartitionCount"; + private static final Map<String, String> TOPIC_CONFIG1 = Collections.singletonMap("config1", "val1"); + private static final Map<String, String> TOPIC_CONFIG2 = Collections.singletonMap("config2", "val2"); + private static final Map<String, String> TOPIC_CONFIG5 = Collections.singletonMap("config5", "val5"); + private static final ConfiguredInternalTopic REPARTITION_TOPIC_CONFIG1 = + new ConfiguredInternalTopic(REPARTITION_TOPIC_NAME1, TOPIC_CONFIG1, Optional.of(4), Optional.empty()); + private static final ConfiguredInternalTopic REPARTITION_TOPIC_CONFIG2 = + new ConfiguredInternalTopic(REPARTITION_TOPIC_NAME2, TOPIC_CONFIG2, Optional.of(2), Optional.empty()); + private static final ConfiguredInternalTopic REPARTITION_TOPIC_CONFIG_WITHOUT_PARTITION_COUNT = + new ConfiguredInternalTopic(REPARTITION_WITHOUT_PARTITION_COUNT, TOPIC_CONFIG5); + private static final ConfiguredSubtopology TOPICS_INFO_WITHOUT_PARTITION_COUNT = new ConfiguredSubtopology( + Collections.singleton(SINK_TOPIC_NAME2), + Set.of(REPARTITION_TOPIC_NAME1, REPARTITION_WITHOUT_PARTITION_COUNT), + mkMap( + mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1), + mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT, REPARTITION_TOPIC_CONFIG_WITHOUT_PARTITION_COUNT) + ), + Collections.emptyMap() + ); + private static final ConfiguredSubtopology TOPICS_INFO1 = new ConfiguredSubtopology( + Collections.singleton(REPARTITION_TOPIC_NAME1), + Set.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2), + mkMap( + mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1), + mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2) + ), + Collections.emptyMap() + ); + private static final ConfiguredSubtopology TOPICS_INFO2 = new ConfiguredSubtopology( + Collections.singleton(SINK_TOPIC_NAME1), + Collections.singleton(REPARTITION_TOPIC_NAME1), + mkMap(mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1)), + Collections.emptyMap() + ); + private static final Set<String> TOPICS = Set.of( + SOURCE_TOPIC_NAME1, + SOURCE_TOPIC_NAME2, + SINK_TOPIC_NAME1, + SINK_TOPIC_NAME2, + REPARTITION_TOPIC_NAME1, + REPARTITION_TOPIC_NAME2 + ); + + @Test + public void shouldSetupRepartitionTopics() { + Map<String, ConfiguredSubtopology> subtopologyToConfiguredSubtopology = mkMap(mkEntry("subtopology_0", TOPICS_INFO1), + mkEntry("subtopology_1", TOPICS_INFO2)); + Function<String, Integer> topicPartitionCountProvider = s -> TOPICS.contains(s) ? 3 : null; + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToConfiguredSubtopology, + topicPartitionCountProvider + ); + + Map<String, ConfiguredInternalTopic> setup = repartitionTopics.setup(); + + assertEquals( + mkMap( + mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1), + mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2) + ), + setup + ); + } + + @Test + public void shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics() { + Map<String, ConfiguredSubtopology> subtopologyToConfiguredSubtopology = mkMap(mkEntry("subtopology_0", TOPICS_INFO1), + mkEntry("subtopology_1", TOPICS_INFO2)); + Function<String, Integer> topicPartitionCountProvider = s -> Objects.equals(s, SOURCE_TOPIC_NAME1) ? null + : (TOPICS.contains(s) ? 3 : null); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToConfiguredSubtopology, + topicPartitionCountProvider + ); + + final StreamsMissingSourceTopicsException exception = assertThrows(StreamsMissingSourceTopicsException.class, + repartitionTopics::setup); + + assertNotNull(exception); + assertEquals("Missing source topics: source1", exception.getMessage()); + } + + @Test + public void shouldThrowStreamsMissingSourceTopicsExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopics() { + Map<String, ConfiguredSubtopology> subtopologyToConfiguredSubtopology = mkMap( + mkEntry("subtopology_0", TOPICS_INFO1), + mkEntry("subtopology_1", TOPICS_INFO_WITHOUT_PARTITION_COUNT) + ); + Function<String, Integer> topicPartitionCountProvider = s -> TOPICS.contains(s) ? 3 : null; + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToConfiguredSubtopology, + topicPartitionCountProvider + ); + + StreamsMissingSourceTopicsException exception = assertThrows(StreamsMissingSourceTopicsException.class, repartitionTopics::setup); + + assertEquals( + "Failed to compute number of partitions for all repartition topics, make sure all user input topics are created and all pattern subscriptions match at least one topic in the cluster", + exception.getMessage() + ); + } + + @Test + public void shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTopic() { + final ConfiguredSubtopology configuredSubtopology = new ConfiguredSubtopology( + Set.of(REPARTITION_TOPIC_NAME1, REPARTITION_WITHOUT_PARTITION_COUNT), + Set.of(SOURCE_TOPIC_NAME1, REPARTITION_TOPIC_NAME2), + mkMap( + mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1), + mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2), + mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT, REPARTITION_TOPIC_CONFIG_WITHOUT_PARTITION_COUNT) + ), + Collections.emptyMap() + ); + Map<String, ConfiguredSubtopology> subtopologyToConfiguredSubtopology = mkMap( + mkEntry("subtopology_0", configuredSubtopology), + mkEntry("subtopology_1", TOPICS_INFO_WITHOUT_PARTITION_COUNT) + ); + Function<String, Integer> topicPartitionCountProvider = s -> TOPICS.contains(s) ? 3 : null; + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToConfiguredSubtopology, + topicPartitionCountProvider + ); + + Map<String, ConfiguredInternalTopic> setup = repartitionTopics.setup(); + + assertEquals(mkMap( + mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1), + mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2), + mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT, REPARTITION_TOPIC_CONFIG_WITHOUT_PARTITION_COUNT) + ), setup); + } + + @Test + public void shouldSetRepartitionTopicPartitionCountFromUpstreamInternalRepartitionSourceTopic() { + final ConfiguredSubtopology configuredSubtopology = new ConfiguredSubtopology( + Set.of(REPARTITION_TOPIC_NAME2, REPARTITION_WITHOUT_PARTITION_COUNT), + Set.of(SOURCE_TOPIC_NAME1, REPARTITION_TOPIC_NAME1), + mkMap( + mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1), + mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2), + mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT, REPARTITION_TOPIC_CONFIG_WITHOUT_PARTITION_COUNT) + ), + Collections.emptyMap() + ); + Map<String, ConfiguredSubtopology> subtopologyToConfiguredSubtopology = mkMap( + mkEntry("subtopology_0", configuredSubtopology), + mkEntry("subtopology_1", TOPICS_INFO_WITHOUT_PARTITION_COUNT) + ); + Function<String, Integer> topicPartitionCountProvider = s -> TOPICS.contains(s) ? 3 : null; + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToConfiguredSubtopology, + topicPartitionCountProvider + ); + + Map<String, ConfiguredInternalTopic> setup = repartitionTopics.setup(); + + assertEquals( + mkMap( + mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1), + mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2), + mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT, REPARTITION_TOPIC_CONFIG_WITHOUT_PARTITION_COUNT) + ), + setup + ); + } + + @Test + public void shouldNotSetupRepartitionTopicsWhenTopologyDoesNotContainAnyRepartitionTopics() { + final ConfiguredSubtopology configuredSubtopology = new ConfiguredSubtopology( + Collections.singleton(SINK_TOPIC_NAME1), + Collections.singleton(SOURCE_TOPIC_NAME1), + Collections.emptyMap(), + Collections.emptyMap() + ); + Map<String, ConfiguredSubtopology> subtopologyToConfiguredSubtopology = mkMap(mkEntry("subtopology_0", configuredSubtopology)); + Function<String, Integer> topicPartitionCountProvider = s -> TOPICS.contains(s) ? 3 : null; + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToConfiguredSubtopology, + topicPartitionCountProvider + ); + + Map<String, ConfiguredInternalTopic> setup = repartitionTopics.setup(); + + assertEquals(Collections.emptyMap(), setup); + } + +} \ No newline at end of file
