This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3bda9f817d2 KAFKA-18311: Configuring repartition topics (3/N) (#18395)
3bda9f817d2 is described below
commit 3bda9f817d28846e4ba7e2bd4c88398fb5ef6c74
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Jan 9 13:56:37 2025 +0100
KAFKA-18311: Configuring repartition topics (3/N) (#18395)
A simplified port of "RepartitionTopics" from the client-side to the group
coordinator.
Compared to the client-side version, the implementation uses immutable data
structures and returns the computed number of partitions instead of modifying
mutable data structures and calling the admin client.
Reviewers: Bruno Cadonna <[email protected]>
---
.../group/streams/topics/RepartitionTopics.java | 178 ++++++++++++++++++
.../streams/topics/RepartitionTopicsTest.java | 206 +++++++++++++++++++++
2 files changed, 384 insertions(+)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java
new file mode 100644
index 00000000000..d1fefe67864
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import org.apache.kafka.common.utils.LogContext;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Responsible for configuring the number of partitions in repartitioning
topics. It computes a fix-point iteration, deriving the number of
+ * partitions for each repartition topic based on the number of partitions of
the source topics of the topology, if the number of
+ * partitions is not explicitly set in the topology.
+ */
+public class RepartitionTopics {
+
+ private final Logger log;
+ private final Collection<Subtopology> subtopologies;
+ private final Function<String, OptionalInt> topicPartitionCountProvider;
+
+ /**
+ * The constructor for the class.
+ *
+ * @param logContext The context for emitting log
messages.
+ * @param subtopologies The subtopologies for the requested
topology.
+ * @param topicPartitionCountProvider Returns the number of partitions
for a given topic, representing the current state of the
+ * broker.
+ */
+ public RepartitionTopics(final LogContext logContext,
+ final Collection<Subtopology> subtopologies,
+ final Function<String, OptionalInt>
topicPartitionCountProvider) {
+ this.log = logContext.logger(getClass());
+ this.subtopologies = subtopologies;
+ this.topicPartitionCountProvider = topicPartitionCountProvider;
+ }
+
+ /**
+ * Returns the set of the number of partitions for each repartition topic.
+ *
+ * @return the map of repartition topics for the requested topology to
their required number of partitions.
+ *
+ * @throws TopicConfigurationException if no valid configuration can be
found given the broker state, for example, if a source topic
+ * is missing.
+ * @throws StreamsInvalidTopologyException if the number of partitions for
all repartition topics cannot be determined, e.g.
+ * because of loops, or if a repartition source topic is not a
sink topic of any subtopology.
+ */
+ public Map<String, Integer> setup() {
+ final Set<String> missingSourceTopicsForTopology = new HashSet<>();
+
+ for (final Subtopology subtopology : subtopologies) {
+ final Set<String> missingSourceTopicsForSubtopology =
computeMissingExternalSourceTopics(subtopology);
+
missingSourceTopicsForTopology.addAll(missingSourceTopicsForSubtopology);
+ }
+
+ if (!missingSourceTopicsForTopology.isEmpty()) {
+ throw
TopicConfigurationException.missingSourceTopics(String.format("Missing source
topics: %s",
+ String.join(", ", missingSourceTopicsForTopology)));
+ }
+
+ final Map<String, Integer> repartitionTopicPartitionCount =
computeRepartitionTopicPartitionCount();
+
+ for (final Subtopology subtopology : subtopologies) {
+ if
(subtopology.repartitionSourceTopics().stream().anyMatch(repartitionTopic ->
!repartitionTopicPartitionCount.containsKey(repartitionTopic.name()))) {
+ throw new StreamsInvalidTopologyException("Failed to compute
number of partitions for all repartition topics, because "
+ + "a repartition source topic is never used as a sink
topic.");
+ }
+ }
+
+ return repartitionTopicPartitionCount;
+ }
+
+ private Set<String> computeMissingExternalSourceTopics(final Subtopology
subtopology) {
+ final Set<String> missingExternalSourceTopics = new
HashSet<>(subtopology.sourceTopics());
+ for (final TopicInfo topicInfo :
subtopology.repartitionSourceTopics()) {
+ missingExternalSourceTopics.remove(topicInfo.name());
+ }
+ missingExternalSourceTopics.removeIf(x ->
topicPartitionCountProvider.apply(x).isPresent());
+ return missingExternalSourceTopics;
+ }
+
+ /**
+ * Computes the number of partitions and returns it for each repartition
topic.
+ */
+ private Map<String, Integer> computeRepartitionTopicPartitionCount() {
+ boolean partitionCountNeeded;
+ Map<String, Integer> repartitionTopicPartitionCounts = new HashMap<>();
+
+ for (final Subtopology subtopology : subtopologies) {
+ for (final TopicInfo repartitionSourceTopic :
subtopology.repartitionSourceTopics()) {
+ if (repartitionSourceTopic.partitions() != 0) {
+
repartitionTopicPartitionCounts.put(repartitionSourceTopic.name(),
repartitionSourceTopic.partitions());
+ }
+ }
+ }
+
+ do {
+ partitionCountNeeded = false;
+ // avoid infinitely looping without making any progress on unknown
repartitions
+ boolean progressMadeThisIteration = false;
+
+ for (final Subtopology subtopology : subtopologies) {
+ for (final String repartitionSinkTopic :
subtopology.repartitionSinkTopics()) {
+ if
(!repartitionTopicPartitionCounts.containsKey(repartitionSinkTopic)) {
+ final Integer numPartitions = computePartitionCount(
+ repartitionTopicPartitionCounts,
+ subtopology
+ );
+
+ if (numPartitions == null) {
+ partitionCountNeeded = true;
+ log.trace("Unable to determine number of
partitions for {}, another iteration is needed",
+ repartitionSinkTopic);
+ } else {
+ log.trace("Determined number of partitions for {}
to be {}",
+ repartitionSinkTopic,
+ numPartitions);
+
repartitionTopicPartitionCounts.put(repartitionSinkTopic, numPartitions);
+ progressMadeThisIteration = true;
+ }
+ }
+ }
+ }
+ if (!progressMadeThisIteration && partitionCountNeeded) {
+ throw new StreamsInvalidTopologyException("Failed to compute
number of partitions for all " +
+ "repartition topics. There may be loops in the topology
that cannot be resolved.");
+ }
+ } while (partitionCountNeeded);
+
+ return repartitionTopicPartitionCounts;
+ }
+
+ private Integer computePartitionCount(final Map<String, Integer>
repartitionTopicPartitionCounts,
+ final Subtopology subtopology) {
+ Integer partitionCount = null;
+ // try set the number of partitions for this repartition topic if it
is not set yet
+ // use the maximum of all its source topic partitions as the number of
partitions
+
+ // It is possible that there is another internal topic, i.e,
+ // map().join().join(map())
+ for (final TopicInfo repartitionSourceTopic :
subtopology.repartitionSourceTopics()) {
+ Integer numPartitionsCandidate =
repartitionTopicPartitionCounts.get(repartitionSourceTopic.name());
+ if (numPartitionsCandidate != null && (partitionCount == null ||
numPartitionsCandidate > partitionCount)) {
+ partitionCount = numPartitionsCandidate;
+ }
+ }
+ for (final String externalSourceTopic : subtopology.sourceTopics()) {
+ final OptionalInt actualPartitionCount =
topicPartitionCountProvider.apply(externalSourceTopic);
+ if (actualPartitionCount.isPresent() && (partitionCount == null ||
actualPartitionCount.getAsInt() > partitionCount)) {
+ partitionCount = actualPartitionCount.getAsInt();
+ }
+ }
+ return partitionCount;
+ }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java
new file mode 100644
index 00000000000..8257f42dbae
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.LogContext;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.function.Function;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class RepartitionTopicsTest {
+
+ private static final LogContext LOG_CONTEXT = new LogContext();
+ private static final String SOURCE_TOPIC_NAME1 = "source1";
+ private static final String SOURCE_TOPIC_NAME2 = "source2";
+ private static final TopicInfo REPARTITION_TOPIC1 = new
TopicInfo().setName("repartition1").setPartitions(4);
+ private static final TopicInfo REPARTITION_TOPIC2 = new
TopicInfo().setName("repartition2").setPartitions(2);
+ private static final TopicInfo REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT =
new TopicInfo().setName("repartitionWithoutPartitionCount");
+
+ private static OptionalInt sourceTopicPartitionCounts(final String
topicName) {
+ return SOURCE_TOPIC_NAME1.equals(topicName) ||
SOURCE_TOPIC_NAME2.equals(topicName) ? OptionalInt.of(3) : OptionalInt.empty();
+ }
+
+ @Test
+ public void shouldSetupRepartitionTopics() {
+ final Subtopology subtopology1 = new Subtopology()
+ .setSubtopologyId("subtopology1")
+ .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2))
+ .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC1.name()));
+ final Subtopology subtopology2 = new Subtopology()
+ .setSubtopologyId("subtopology2")
+ .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1));
+ final List<Subtopology> subtopologies = List.of(subtopology1,
subtopology2);
+ final RepartitionTopics repartitionTopics = new RepartitionTopics(
+ LOG_CONTEXT,
+ subtopologies,
+ RepartitionTopicsTest::sourceTopicPartitionCounts
+ );
+
+ final Map<String, Integer> setup = repartitionTopics.setup();
+
+ assertEquals(
+ Map.of(REPARTITION_TOPIC1.name(), REPARTITION_TOPIC1.partitions()),
+ setup
+ );
+ }
+
+ @Test
+ public void
shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics() {
+ final Subtopology subtopology1 = new Subtopology()
+ .setSubtopologyId("subtopology1")
+ .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2))
+ .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC1.name()));
+ final Subtopology subtopology2 = new Subtopology()
+ .setSubtopologyId("subtopology2")
+ .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1));
+ final Function<String, OptionalInt> topicPartitionCountProvider =
+ s -> Objects.equals(s, SOURCE_TOPIC_NAME1) ? OptionalInt.empty() :
sourceTopicPartitionCounts(s);
+ final RepartitionTopics repartitionTopics = new RepartitionTopics(
+ LOG_CONTEXT,
+ List.of(subtopology1, subtopology2),
+ topicPartitionCountProvider
+ );
+
+ final TopicConfigurationException exception =
assertThrows(TopicConfigurationException.class,
+ repartitionTopics::setup);
+
+ assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status());
+ assertEquals("Missing source topics: source1", exception.getMessage());
+ }
+
+ @Test
+ public void
shouldThrowStreamsInvalidTopologyExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopicsDueToLoops()
{
+ final Subtopology subtopology1 = new Subtopology()
+ .setSubtopologyId("subtopology1")
+
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
+
.setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name()));
+ final RepartitionTopics repartitionTopics = new RepartitionTopics(
+ LOG_CONTEXT,
+ List.of(subtopology1),
+ RepartitionTopicsTest::sourceTopicPartitionCounts
+ );
+
+ final StreamsInvalidTopologyException exception =
assertThrows(StreamsInvalidTopologyException.class, repartitionTopics::setup);
+
+ assertEquals(
+ "Failed to compute number of partitions for all repartition
topics. There may be loops in the topology that cannot be resolved.",
+ exception.getMessage()
+ );
+ }
+
+ @Test
+ public void
shouldThrowStreamsInvalidTopologyExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopicsDueToMissingSinks()
{
+ final Subtopology subtopology1 = new Subtopology()
+ .setSubtopologyId("subtopology1")
+
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT));
+ final RepartitionTopics repartitionTopics = new RepartitionTopics(
+ LOG_CONTEXT,
+ List.of(subtopology1),
+ RepartitionTopicsTest::sourceTopicPartitionCounts
+ );
+
+ final StreamsInvalidTopologyException exception =
assertThrows(StreamsInvalidTopologyException.class, repartitionTopics::setup);
+
+ assertEquals(
+ "Failed to compute number of partitions for all repartition
topics, because a repartition source topic is never used as a sink topic.",
+ exception.getMessage()
+ );
+ }
+
+ @Test
+ public void
shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTopic() {
+ final Subtopology subtopology = new Subtopology()
+ .setSubtopologyId("subtopology0")
+ .setSourceTopics(List.of(SOURCE_TOPIC_NAME1))
+ .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC1.name(),
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name()))
+ .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC2));
+ final Subtopology subtopologyWithoutPartitionCount = new Subtopology()
+ .setSubtopologyId("subtopologyWithoutPartitionCount")
+ .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1,
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT));
+ final RepartitionTopics repartitionTopics = new RepartitionTopics(
+ LOG_CONTEXT,
+ List.of(subtopology, subtopologyWithoutPartitionCount),
+ RepartitionTopicsTest::sourceTopicPartitionCounts
+ );
+
+ final Map<String, Integer> setup = repartitionTopics.setup();
+
+ assertEquals(Map.of(
+ REPARTITION_TOPIC1.name(), REPARTITION_TOPIC1.partitions(),
+ REPARTITION_TOPIC2.name(), REPARTITION_TOPIC2.partitions(),
+ REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name(),
sourceTopicPartitionCounts(SOURCE_TOPIC_NAME1).getAsInt()
+ ), setup);
+ }
+
+ @Test
+ public void
shouldSetRepartitionTopicPartitionCountFromUpstreamInternalRepartitionSourceTopic()
{
+ final Subtopology subtopology = new Subtopology()
+ .setSubtopologyId("subtopology0")
+ .setSourceTopics(List.of(SOURCE_TOPIC_NAME1))
+ .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1))
+
.setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name()));
+ final Subtopology subtopologyWithoutPartitionCount = new Subtopology()
+ .setSubtopologyId("subtopologyWithoutPartitionCount")
+
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
+ .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC1.name()));
+ final RepartitionTopics repartitionTopics = new RepartitionTopics(
+ LOG_CONTEXT,
+ List.of(subtopology, subtopologyWithoutPartitionCount),
+ RepartitionTopicsTest::sourceTopicPartitionCounts
+ );
+
+ final Map<String, Integer> setup = repartitionTopics.setup();
+
+ assertEquals(
+ Map.of(
+ REPARTITION_TOPIC1.name(), REPARTITION_TOPIC1.partitions(),
+ REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name(),
REPARTITION_TOPIC1.partitions()
+ ),
+ setup
+ );
+ }
+
+ @Test
+ public void
shouldNotSetupRepartitionTopicsWhenTopologyDoesNotContainAnyRepartitionTopics()
{
+ final Subtopology subtopology = new Subtopology()
+ .setSubtopologyId("subtopology0")
+ .setSourceTopics(List.of(SOURCE_TOPIC_NAME1));
+ final RepartitionTopics repartitionTopics = new RepartitionTopics(
+ LOG_CONTEXT,
+ List.of(subtopology),
+ RepartitionTopicsTest::sourceTopicPartitionCounts
+ );
+
+ final Map<String, Integer> setup = repartitionTopics.setup();
+
+ assertEquals(Collections.emptyMap(), setup);
+ }
+
+}
\ No newline at end of file