This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3bda9f817d2 KAFKA-18311: Configuring repartition topics (3/N) (#18395)
3bda9f817d2 is described below

commit 3bda9f817d28846e4ba7e2bd4c88398fb5ef6c74
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Jan 9 13:56:37 2025 +0100

    KAFKA-18311: Configuring repartition topics (3/N) (#18395)
    
    A simplified port of "RepartitionTopics" from the client side to the
    group coordinator.
    
    Compared to the client-side version, the implementation uses immutable
    data structures, and returns the computed number of partitions instead
    of modifying mutable data structures and calling the admin client.
    
    Reviewers: Bruno Cadonna <[email protected]>
---
 .../group/streams/topics/RepartitionTopics.java    | 178 ++++++++++++++++++
 .../streams/topics/RepartitionTopicsTest.java      | 206 +++++++++++++++++++++
 2 files changed, 384 insertions(+)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java
new file mode 100644
index 00000000000..d1fefe67864
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import org.apache.kafka.common.utils.LogContext;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Responsible for configuring the number of partitions in repartitioning 
topics. It computes a fix-point iteration, deriving the number of
+ * partitions for each repartition topic based on the number of partitions of 
the source topics of the topology, if the number of
+ * partitions is not explicitly set in the topology.
+ */
+public class RepartitionTopics {
+
+    private final Logger log;
+    private final Collection<Subtopology> subtopologies;
+    private final Function<String, OptionalInt> topicPartitionCountProvider;
+
+    /**
+     * The constructor for the class.
+     *
+     * @param logContext                   The context for emitting log 
messages.
+     * @param subtopologies                The subtopologies for the requested 
topology.
+     * @param topicPartitionCountProvider  Returns the number of partitions 
for a given topic, representing the current state of the
+     *                                     broker.
+     */
+    public RepartitionTopics(final LogContext logContext,
+                             final Collection<Subtopology> subtopologies,
+                             final Function<String, OptionalInt> 
topicPartitionCountProvider) {
+        this.log = logContext.logger(getClass());
+        this.subtopologies = subtopologies;
+        this.topicPartitionCountProvider = topicPartitionCountProvider;
+    }
+
+    /**
+     * Returns the set of the number of partitions for each repartition topic.
+     *
+     * @return the map of repartition topics for the requested topology to 
their required number of partitions.
+     *
+     * @throws TopicConfigurationException if no valid configuration can be 
found given the broker state, for example, if a source topic
+     *         is missing.
+     * @throws StreamsInvalidTopologyException if the number of partitions for 
all repartition topics cannot be determined, e.g.
+     *         because of loops, or if a repartition source topic is not a 
sink topic of any subtopology.
+     */
+    public Map<String, Integer> setup() {
+        final Set<String> missingSourceTopicsForTopology = new HashSet<>();
+
+        for (final Subtopology subtopology : subtopologies) {
+            final Set<String> missingSourceTopicsForSubtopology = 
computeMissingExternalSourceTopics(subtopology);
+            
missingSourceTopicsForTopology.addAll(missingSourceTopicsForSubtopology);
+        }
+
+        if (!missingSourceTopicsForTopology.isEmpty()) {
+            throw 
TopicConfigurationException.missingSourceTopics(String.format("Missing source 
topics: %s",
+                String.join(", ", missingSourceTopicsForTopology)));
+        }
+
+        final Map<String, Integer> repartitionTopicPartitionCount = 
computeRepartitionTopicPartitionCount();
+
+        for (final Subtopology subtopology : subtopologies) {
+            if 
(subtopology.repartitionSourceTopics().stream().anyMatch(repartitionTopic -> 
!repartitionTopicPartitionCount.containsKey(repartitionTopic.name()))) {
+                throw new StreamsInvalidTopologyException("Failed to compute 
number of partitions for all repartition topics, because "
+                    + "a repartition source topic is never used as a sink 
topic.");
+            }
+        }
+
+        return repartitionTopicPartitionCount;
+    }
+
+    private Set<String> computeMissingExternalSourceTopics(final Subtopology 
subtopology) {
+        final Set<String> missingExternalSourceTopics = new 
HashSet<>(subtopology.sourceTopics());
+        for (final TopicInfo topicInfo : 
subtopology.repartitionSourceTopics()) {
+            missingExternalSourceTopics.remove(topicInfo.name());
+        }
+        missingExternalSourceTopics.removeIf(x -> 
topicPartitionCountProvider.apply(x).isPresent());
+        return missingExternalSourceTopics;
+    }
+
+    /**
+     * Computes the number of partitions and returns it for each repartition 
topic.
+     */
+    private Map<String, Integer> computeRepartitionTopicPartitionCount() {
+        boolean partitionCountNeeded;
+        Map<String, Integer> repartitionTopicPartitionCounts = new HashMap<>();
+
+        for (final Subtopology subtopology : subtopologies) {
+            for (final TopicInfo repartitionSourceTopic : 
subtopology.repartitionSourceTopics()) {
+                if (repartitionSourceTopic.partitions() != 0) {
+                    
repartitionTopicPartitionCounts.put(repartitionSourceTopic.name(), 
repartitionSourceTopic.partitions());
+                }
+            }
+        }
+
+        do {
+            partitionCountNeeded = false;
+            // avoid infinitely looping without making any progress on unknown 
repartitions
+            boolean progressMadeThisIteration = false;
+
+            for (final Subtopology subtopology : subtopologies) {
+                for (final String repartitionSinkTopic : 
subtopology.repartitionSinkTopics()) {
+                    if 
(!repartitionTopicPartitionCounts.containsKey(repartitionSinkTopic)) {
+                        final Integer numPartitions = computePartitionCount(
+                            repartitionTopicPartitionCounts,
+                            subtopology
+                        );
+
+                        if (numPartitions == null) {
+                            partitionCountNeeded = true;
+                            log.trace("Unable to determine number of 
partitions for {}, another iteration is needed",
+                                repartitionSinkTopic);
+                        } else {
+                            log.trace("Determined number of partitions for {} 
to be {}",
+                                repartitionSinkTopic,
+                                numPartitions);
+                            
repartitionTopicPartitionCounts.put(repartitionSinkTopic, numPartitions);
+                            progressMadeThisIteration = true;
+                        }
+                    }
+                }
+            }
+            if (!progressMadeThisIteration && partitionCountNeeded) {
+                throw new StreamsInvalidTopologyException("Failed to compute 
number of partitions for all " +
+                    "repartition topics. There may be loops in the topology 
that cannot be resolved.");
+            }
+        } while (partitionCountNeeded);
+
+        return repartitionTopicPartitionCounts;
+    }
+
+    private Integer computePartitionCount(final Map<String, Integer> 
repartitionTopicPartitionCounts,
+                                          final Subtopology subtopology) {
+        Integer partitionCount = null;
+        // try set the number of partitions for this repartition topic if it 
is not set yet
+        // use the maximum of all its source topic partitions as the number of 
partitions
+
+        // It is possible that there is another internal topic, i.e,
+        // map().join().join(map())
+        for (final TopicInfo repartitionSourceTopic : 
subtopology.repartitionSourceTopics()) {
+            Integer numPartitionsCandidate = 
repartitionTopicPartitionCounts.get(repartitionSourceTopic.name());
+            if (numPartitionsCandidate != null && (partitionCount == null || 
numPartitionsCandidate > partitionCount)) {
+                partitionCount = numPartitionsCandidate;
+            }
+        }
+        for (final String externalSourceTopic : subtopology.sourceTopics()) {
+            final OptionalInt actualPartitionCount = 
topicPartitionCountProvider.apply(externalSourceTopic);
+            if (actualPartitionCount.isPresent() && (partitionCount == null || 
actualPartitionCount.getAsInt() > partitionCount)) {
+                partitionCount = actualPartitionCount.getAsInt();
+            }
+        }
+        return partitionCount;
+    }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java
new file mode 100644
index 00000000000..8257f42dbae
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.LogContext;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.function.Function;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class RepartitionTopicsTest {
+
+    private static final LogContext LOG_CONTEXT = new LogContext();
+    private static final String SOURCE_TOPIC_NAME1 = "source1";
+    private static final String SOURCE_TOPIC_NAME2 = "source2";
+    private static final TopicInfo REPARTITION_TOPIC1 = new TopicInfo().setName("repartition1").setPartitions(4);
+    private static final TopicInfo REPARTITION_TOPIC2 = new TopicInfo().setName("repartition2").setPartitions(2);
+    private static final TopicInfo REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT = new TopicInfo().setName("repartitionWithoutPartitionCount");
+
+    /**
+     * Partition count provider used by the tests: both known source topics have three partitions, every other topic
+     * is unknown.
+     */
+    private static OptionalInt sourceTopicPartitionCounts(final String topicName) {
+        if (SOURCE_TOPIC_NAME1.equals(topicName) || SOURCE_TOPIC_NAME2.equals(topicName)) {
+            return OptionalInt.of(3);
+        }
+        return OptionalInt.empty();
+    }
+
+    /**
+     * Creates the object under test for the given subtopologies, backed by the default partition count provider.
+     */
+    private static RepartitionTopics repartitionTopicsFor(final Subtopology... subtopologies) {
+        return new RepartitionTopics(
+            LOG_CONTEXT,
+            List.of(subtopologies),
+            RepartitionTopicsTest::sourceTopicPartitionCounts
+        );
+    }
+
+    @Test
+    public void shouldSetupRepartitionTopics() {
+        final Subtopology upstream = new Subtopology()
+            .setSubtopologyId("subtopology1")
+            .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2))
+            .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC1.name()));
+        final Subtopology downstream = new Subtopology()
+            .setSubtopologyId("subtopology2")
+            .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1));
+        final Map<String, Integer> expected = Map.of(REPARTITION_TOPIC1.name(), REPARTITION_TOPIC1.partitions());
+
+        final Map<String, Integer> actual = repartitionTopicsFor(upstream, downstream).setup();
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics() {
+        final Subtopology upstream = new Subtopology()
+            .setSubtopologyId("subtopology1")
+            .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2))
+            .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC1.name()));
+        final Subtopology downstream = new Subtopology()
+            .setSubtopologyId("subtopology2")
+            .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1));
+        // Pretend that the first source topic does not exist on the broker.
+        final Function<String, OptionalInt> providerWithoutFirstSourceTopic = topic -> {
+            if (Objects.equals(topic, SOURCE_TOPIC_NAME1)) {
+                return OptionalInt.empty();
+            }
+            return sourceTopicPartitionCounts(topic);
+        };
+        final RepartitionTopics underTest = new RepartitionTopics(
+            LOG_CONTEXT,
+            List.of(upstream, downstream),
+            providerWithoutFirstSourceTopic
+        );
+
+        final TopicConfigurationException thrown = assertThrows(TopicConfigurationException.class, underTest::setup);
+
+        assertEquals(Status.MISSING_SOURCE_TOPICS, thrown.status());
+        assertEquals("Missing source topics: source1", thrown.getMessage());
+    }
+
+    @Test
+    public void shouldThrowStreamsInvalidTopologyExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopicsDueToLoops() {
+        // The subtopology reads from and writes to the same repartition topic, which has no partition count.
+        final Subtopology selfLoop = new Subtopology()
+            .setSubtopologyId("subtopology1")
+            .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
+            .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name()));
+        final RepartitionTopics underTest = repartitionTopicsFor(selfLoop);
+
+        final StreamsInvalidTopologyException thrown = assertThrows(StreamsInvalidTopologyException.class, underTest::setup);
+
+        assertEquals(
+            "Failed to compute number of partitions for all repartition topics. There may be loops in the topology that cannot be resolved.",
+            thrown.getMessage()
+        );
+    }
+
+    @Test
+    public void shouldThrowStreamsInvalidTopologyExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopicsDueToMissingSinks() {
+        // The repartition topic is read but no subtopology writes to it.
+        final Subtopology readerOnly = new Subtopology()
+            .setSubtopologyId("subtopology1")
+            .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT));
+        final RepartitionTopics underTest = repartitionTopicsFor(readerOnly);
+
+        final StreamsInvalidTopologyException thrown = assertThrows(StreamsInvalidTopologyException.class, underTest::setup);
+
+        assertEquals(
+            "Failed to compute number of partitions for all repartition topics, because a repartition source topic is never used as a sink topic.",
+            thrown.getMessage()
+        );
+    }
+
+    @Test
+    public void shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTopic() {
+        final Subtopology upstream = new Subtopology()
+            .setSubtopologyId("subtopology0")
+            .setSourceTopics(List.of(SOURCE_TOPIC_NAME1))
+            .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC1.name(), REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name()))
+            .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC2));
+        final Subtopology downstream = new Subtopology()
+            .setSubtopologyId("subtopologyWithoutPartitionCount")
+            .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1, REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT));
+        final Map<String, Integer> expected = Map.of(
+            REPARTITION_TOPIC1.name(), REPARTITION_TOPIC1.partitions(),
+            REPARTITION_TOPIC2.name(), REPARTITION_TOPIC2.partitions(),
+            REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name(), sourceTopicPartitionCounts(SOURCE_TOPIC_NAME1).getAsInt()
+        );
+
+        final Map<String, Integer> actual = repartitionTopicsFor(upstream, downstream).setup();
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void shouldSetRepartitionTopicPartitionCountFromUpstreamInternalRepartitionSourceTopic() {
+        final Subtopology upstream = new Subtopology()
+            .setSubtopologyId("subtopology0")
+            .setSourceTopics(List.of(SOURCE_TOPIC_NAME1))
+            .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1))
+            .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name()));
+        final Subtopology downstream = new Subtopology()
+            .setSubtopologyId("subtopologyWithoutPartitionCount")
+            .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
+            .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC1.name()));
+        final Map<String, Integer> expected = Map.of(
+            REPARTITION_TOPIC1.name(), REPARTITION_TOPIC1.partitions(),
+            REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name(), REPARTITION_TOPIC1.partitions()
+        );
+
+        final Map<String, Integer> actual = repartitionTopicsFor(upstream, downstream).setup();
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void shouldNotSetupRepartitionTopicsWhenTopologyDoesNotContainAnyRepartitionTopics() {
+        final Subtopology sourceOnly = new Subtopology()
+            .setSubtopologyId("subtopology0")
+            .setSourceTopics(List.of(SOURCE_TOPIC_NAME1));
+
+        final Map<String, Integer> actual = repartitionTopicsFor(sourceOnly).setup();
+
+        assertEquals(Collections.emptyMap(), actual);
+    }
+
+}
\ No newline at end of file

Reply via email to