This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 84502429bbe KAFKA-19779: Introduce source topic to subtopology map
[2/N] (#20717)
84502429bbe is described below
commit 84502429bbe5ba0ca4ec1c5a9a599722334dd524
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Oct 17 13:35:09 2025 +0200
KAFKA-19779: Introduce source topic to subtopology map [2/N] (#20717)
For validating offset commits based on partitions, we need to be able to
efficiently find the subtopology from a given source topic (user input
topic or repartition topic).
We introduce this precomputed map in `StreamsTopology`, where it is
generated upon construction.
We can use the precomputed map in `StreamsGroup.isSubscribedToTopic` to
efficiently validate whether we are still subscribed to a certain
topic.
Using `StreamsTopology` here instead of `ConfiguredTopology` is an
improvement in itself, because the configured topology is soft state and
is not guaranteed to always be present.
Reviewers: Matthias J. Sax <[email protected]>
---
.../coordinator/group/streams/StreamsGroup.java | 13 ++-----
.../coordinator/group/streams/StreamsTopology.java | 32 ++++++++++++++-
.../group/streams/StreamsGroupTest.java | 14 +------
.../group/streams/StreamsTopologyTest.java | 45 ++++++++++++++++++++++
4 files changed, 80 insertions(+), 24 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index c0a4fb1f13b..0a4a739ec6d 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -31,7 +31,6 @@ import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.Utils;
-import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
@@ -50,7 +49,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.TreeMap;
import static
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
import static
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.DEAD;
@@ -774,16 +772,11 @@ public class StreamsGroup implements Group {
// This allows offsets to expire for empty groups.
return false;
}
- Optional<ConfiguredTopology> maybeConfiguredTopology =
configuredTopology.get();
- if (maybeConfiguredTopology.isEmpty() ||
!maybeConfiguredTopology.get().isReady()) {
+ Optional<StreamsTopology> maybeTopology = topology.get();
+ if (maybeTopology.isEmpty()) {
return false;
}
- for (ConfiguredSubtopology sub :
maybeConfiguredTopology.get().subtopologies().orElse(new TreeMap<>()).values())
{
- if (sub.sourceTopics().contains(topic) ||
sub.repartitionSourceTopics().containsKey(topic)) {
- return true;
- }
- }
- return false;
+ return maybeTopology.get().sourceTopicMap().containsKey(topic);
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
index 25ea4376331..9d7d097f80f 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
@@ -24,6 +24,7 @@ import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.To
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -41,15 +42,44 @@ import java.util.stream.Stream;
* @param topologyEpoch The epoch of the topology (must be non-negative).
* @param subtopologies The subtopologies of the topology containing
information about source topics,
* repartition topics, changelog topics, co-partition
groups etc. (must be non-null)
+ * @param sourceTopicMap A precomputed map of source topics to their
corresponding subtopology (must be non-null)
*/
public record StreamsTopology(int topologyEpoch,
- Map<String, Subtopology> subtopologies) {
+ Map<String, Subtopology> subtopologies,
+ Map<String, Subtopology> sourceTopicMap) {
+
+ /**
+ * Constructor that automatically computes the sourceTopicMap from
subtopologies.
+ *
+ * @param topologyEpoch The epoch of the topology (must be non-negative).
+ * @param subtopologies The subtopologies of the topology.
+ */
+ public StreamsTopology(int topologyEpoch, Map<String, Subtopology>
subtopologies) {
+ this(topologyEpoch, subtopologies,
computeSourceTopicMap(subtopologies));
+ }
public StreamsTopology {
if (topologyEpoch < 0) {
throw new IllegalArgumentException("Topology epoch must be
non-negative.");
}
subtopologies =
Collections.unmodifiableMap(Objects.requireNonNull(subtopologies,
"Subtopologies cannot be null."));
+ sourceTopicMap =
Collections.unmodifiableMap(Objects.requireNonNull(sourceTopicMap, "Source
topic map cannot be null."));
+ }
+
+ private static Map<String, Subtopology> computeSourceTopicMap(Map<String,
Subtopology> subtopologies) {
+ Objects.requireNonNull(subtopologies, "Subtopologies cannot be null.");
+ Map<String, Subtopology> computedMap = new HashMap<>();
+ for (Subtopology subtopology : subtopologies.values()) {
+ // Add regular source topics
+ for (String sourceTopic : subtopology.sourceTopics()) {
+ computedMap.put(sourceTopic, subtopology);
+ }
+ // Add repartition source topics
+ for (TopicInfo repartitionSourceTopic :
subtopology.repartitionSourceTopics()) {
+ computedMap.put(repartitionSourceTopic.name(), subtopology);
+ }
+ }
+ return computedMap;
}
/**
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index 94f78006426..1324c0679c8 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -45,7 +45,6 @@ import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
import
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
-import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -970,18 +969,7 @@ public class StreamsGroupTest {
streamsGroup.setTopology(topology);
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember("member-id"));
-
- assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
- assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
- assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
-
- MetadataImage metadataImage = new MetadataImageBuilder()
- .addTopic(Uuid.randomUuid(), "test-topic1", 1)
- .addTopic(Uuid.randomUuid(), "test-topic2", 1)
- .build();
-
-
streamsGroup.setConfiguredTopology(InternalTopicManager.configureTopics(logContext,
0, topology, new KRaftCoordinatorMetadataImage(metadataImage)));
-
+
assertTrue(streamsGroup.isSubscribedToTopic("test-topic1"));
assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
index 83ea799cdc9..751be12df86 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
@@ -204,6 +204,51 @@ public class StreamsTopologyTest {
assertEquals(0, describeTopology.subtopologies().size());
}
+ @Test
+ public void sourceTopicMapShouldBeComputedCorrectly() {
+ Map<String, Subtopology> subtopologies = mkMap(
+ mkEntry(SUBTOPOLOGY_ID_1, mkSubtopology1()),
+ mkEntry(SUBTOPOLOGY_ID_2, mkSubtopology2())
+ );
+ StreamsTopology topology = new StreamsTopology(1, subtopologies);
+
+ // Verify sourceTopicMap contains all source topics from both
subtopologies
+ Map<String, Subtopology> sourceTopicMap = topology.sourceTopicMap();
+
+ // From subtopology 1: SOURCE_TOPIC_1, SOURCE_TOPIC_2,
REPARTITION_TOPIC_1, REPARTITION_TOPIC_2
+ // From subtopology 2: SOURCE_TOPIC_3, REPARTITION_TOPIC_3
+ assertEquals(6, sourceTopicMap.size());
+
+ // Verify regular source topics
+ assertTrue(sourceTopicMap.containsKey(SOURCE_TOPIC_1));
+ assertEquals(mkSubtopology1(), sourceTopicMap.get(SOURCE_TOPIC_1));
+ assertTrue(sourceTopicMap.containsKey(SOURCE_TOPIC_2));
+ assertEquals(mkSubtopology1(), sourceTopicMap.get(SOURCE_TOPIC_2));
+ assertTrue(sourceTopicMap.containsKey(SOURCE_TOPIC_3));
+ assertEquals(mkSubtopology2(), sourceTopicMap.get(SOURCE_TOPIC_3));
+
+ // Verify repartition source topics
+ assertTrue(sourceTopicMap.containsKey(REPARTITION_TOPIC_1));
+ assertEquals(mkSubtopology1(),
sourceTopicMap.get(REPARTITION_TOPIC_1));
+ assertTrue(sourceTopicMap.containsKey(REPARTITION_TOPIC_2));
+ assertEquals(mkSubtopology1(),
sourceTopicMap.get(REPARTITION_TOPIC_2));
+ assertTrue(sourceTopicMap.containsKey(REPARTITION_TOPIC_3));
+ assertEquals(mkSubtopology2(),
sourceTopicMap.get(REPARTITION_TOPIC_3));
+ }
+
+ @Test
+ public void sourceTopicMapShouldBeImmutable() {
+ Map<String, Subtopology> subtopologies = mkMap(
+ mkEntry(SUBTOPOLOGY_ID_1, mkSubtopology1())
+ );
+ StreamsTopology topology = new StreamsTopology(1, subtopologies);
+
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> topology.sourceTopicMap().put("test-topic", mkSubtopology1())
+ );
+ }
+
private Subtopology mkSubtopology1() {
return new Subtopology()
.setSubtopologyId(SUBTOPOLOGY_ID_1)