This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 84502429bbe KAFKA-19779: Introduce source topic to subtopology map 
[2/N] (#20717)
84502429bbe is described below

commit 84502429bbe5ba0ca4ec1c5a9a599722334dd524
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Oct 17 13:35:09 2025 +0200

    KAFKA-19779: Introduce source topic to subtopology map [2/N] (#20717)
    
    For validating offset commits based on partitions, we need to be able to
    efficiently find the subtopology from a given source topic (user input
    topic or repartition topic).
    
    We introduce this precomputed map in `StreamsTopology`, where it is
    generated upon construction.
    
    We can use the precomputed map in `StreamsGroup.isSubscribedToTopic` to
    efficiently validate whether we are still subscribed to a certain
    topic.
    
    Using `StreamsTopology` here instead of `ConfiguredTopology` is an
    improvement in itself, because the configured topology is soft state
    and is not guaranteed to always be present.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../coordinator/group/streams/StreamsGroup.java    | 13 ++-----
 .../coordinator/group/streams/StreamsTopology.java | 32 ++++++++++++++-
 .../group/streams/StreamsGroupTest.java            | 14 +------
 .../group/streams/StreamsTopologyTest.java         | 45 ++++++++++++++++++++++
 4 files changed, 80 insertions(+), 24 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index c0a4fb1f13b..0a4a739ec6d 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -31,7 +31,6 @@ import org.apache.kafka.coordinator.group.Group;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
 import org.apache.kafka.coordinator.group.Utils;
-import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
@@ -50,7 +49,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.TreeMap;
 
 import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.DEAD;
@@ -774,16 +772,11 @@ public class StreamsGroup implements Group {
             // This allows offsets to expire for empty groups.
             return false;
         }
-        Optional<ConfiguredTopology> maybeConfiguredTopology = 
configuredTopology.get();
-        if (maybeConfiguredTopology.isEmpty() || 
!maybeConfiguredTopology.get().isReady()) {
+        Optional<StreamsTopology> maybeTopology = topology.get();
+        if (maybeTopology.isEmpty()) {
             return false;
         }
-        for (ConfiguredSubtopology sub : 
maybeConfiguredTopology.get().subtopologies().orElse(new TreeMap<>()).values()) 
{
-            if (sub.sourceTopics().contains(topic) || 
sub.repartitionSourceTopics().containsKey(topic)) {
-                return true;
-            }
-        }
-        return false;
+        return maybeTopology.get().sourceTopicMap().containsKey(topic);
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
index 25ea4376331..9d7d097f80f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
@@ -24,6 +24,7 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.To
 
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -41,15 +42,44 @@ import java.util.stream.Stream;
  * @param topologyEpoch The epoch of the topology (must be non-negative).
  * @param subtopologies The subtopologies of the topology containing 
information about source topics,
  *                      repartition topics, changelog topics, co-partition 
groups etc. (must be non-null)
+ * @param sourceTopicMap A precomputed map of source topics to their 
corresponding subtopology (must be non-null)
  */
 public record StreamsTopology(int topologyEpoch,
-                              Map<String, Subtopology> subtopologies) {
+                              Map<String, Subtopology> subtopologies,
+                              Map<String, Subtopology> sourceTopicMap) {
+
+    /**
+     * Constructor that automatically computes the sourceTopicMap from 
subtopologies.
+     *
+     * @param topologyEpoch The epoch of the topology (must be non-negative).
+     * @param subtopologies The subtopologies of the topology.
+     */
+    public StreamsTopology(int topologyEpoch, Map<String, Subtopology> 
subtopologies) {
+        this(topologyEpoch, subtopologies, 
computeSourceTopicMap(subtopologies));
+    }
 
     public StreamsTopology {
         if (topologyEpoch < 0) {
             throw new IllegalArgumentException("Topology epoch must be 
non-negative.");
         }
         subtopologies = 
Collections.unmodifiableMap(Objects.requireNonNull(subtopologies, 
"Subtopologies cannot be null."));
+        sourceTopicMap = 
Collections.unmodifiableMap(Objects.requireNonNull(sourceTopicMap, "Source 
topic map cannot be null."));
+    }
+
+    private static Map<String, Subtopology> computeSourceTopicMap(Map<String, 
Subtopology> subtopologies) {
+        Objects.requireNonNull(subtopologies, "Subtopologies cannot be null.");
+        Map<String, Subtopology> computedMap = new HashMap<>();
+        for (Subtopology subtopology : subtopologies.values()) {
+            // Add regular source topics
+            for (String sourceTopic : subtopology.sourceTopics()) {
+                computedMap.put(sourceTopic, subtopology);
+            }
+            // Add repartition source topics
+            for (TopicInfo repartitionSourceTopic : 
subtopology.repartitionSourceTopics()) {
+                computedMap.put(repartitionSourceTopic.name(), subtopology);
+            }
+        }
+        return computedMap;
     }
 
     /**
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index 94f78006426..1324c0679c8 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -45,7 +45,6 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
 import 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
-import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.timeline.SnapshotRegistry;
 
@@ -970,18 +969,7 @@ public class StreamsGroupTest {
         streamsGroup.setTopology(topology);
 
         
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember("member-id"));
-
-        assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
-        assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
-        assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
-
-        MetadataImage metadataImage = new MetadataImageBuilder()
-            .addTopic(Uuid.randomUuid(), "test-topic1", 1)
-            .addTopic(Uuid.randomUuid(), "test-topic2", 1)
-            .build();
-
-        
streamsGroup.setConfiguredTopology(InternalTopicManager.configureTopics(logContext,
 0, topology, new KRaftCoordinatorMetadataImage(metadataImage)));
-
+        
         assertTrue(streamsGroup.isSubscribedToTopic("test-topic1"));
         assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
         assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
index 83ea799cdc9..751be12df86 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
@@ -204,6 +204,51 @@ public class StreamsTopologyTest {
         assertEquals(0, describeTopology.subtopologies().size());
     }
 
+    @Test
+    public void sourceTopicMapShouldBeComputedCorrectly() {
+        Map<String, Subtopology> subtopologies = mkMap(
+            mkEntry(SUBTOPOLOGY_ID_1, mkSubtopology1()),
+            mkEntry(SUBTOPOLOGY_ID_2, mkSubtopology2())
+        );
+        StreamsTopology topology = new StreamsTopology(1, subtopologies);
+        
+        // Verify sourceTopicMap contains all source topics from both 
subtopologies
+        Map<String, Subtopology> sourceTopicMap = topology.sourceTopicMap();
+        
+        // From subtopology 1: SOURCE_TOPIC_1, SOURCE_TOPIC_2, 
REPARTITION_TOPIC_1, REPARTITION_TOPIC_2
+        // From subtopology 2: SOURCE_TOPIC_3, REPARTITION_TOPIC_3
+        assertEquals(6, sourceTopicMap.size());
+        
+        // Verify regular source topics
+        assertTrue(sourceTopicMap.containsKey(SOURCE_TOPIC_1));
+        assertEquals(mkSubtopology1(), sourceTopicMap.get(SOURCE_TOPIC_1));
+        assertTrue(sourceTopicMap.containsKey(SOURCE_TOPIC_2));
+        assertEquals(mkSubtopology1(), sourceTopicMap.get(SOURCE_TOPIC_2));
+        assertTrue(sourceTopicMap.containsKey(SOURCE_TOPIC_3));
+        assertEquals(mkSubtopology2(), sourceTopicMap.get(SOURCE_TOPIC_3));
+        
+        // Verify repartition source topics
+        assertTrue(sourceTopicMap.containsKey(REPARTITION_TOPIC_1));
+        assertEquals(mkSubtopology1(), 
sourceTopicMap.get(REPARTITION_TOPIC_1));
+        assertTrue(sourceTopicMap.containsKey(REPARTITION_TOPIC_2));
+        assertEquals(mkSubtopology1(), 
sourceTopicMap.get(REPARTITION_TOPIC_2));
+        assertTrue(sourceTopicMap.containsKey(REPARTITION_TOPIC_3));
+        assertEquals(mkSubtopology2(), 
sourceTopicMap.get(REPARTITION_TOPIC_3));
+    }
+
+    @Test
+    public void sourceTopicMapShouldBeImmutable() {
+        Map<String, Subtopology> subtopologies = mkMap(
+            mkEntry(SUBTOPOLOGY_ID_1, mkSubtopology1())
+        );
+        StreamsTopology topology = new StreamsTopology(1, subtopologies);
+        
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> topology.sourceTopicMap().put("test-topic", mkSubtopology1())
+        );
+    }
+
     private Subtopology mkSubtopology1() {
         return new Subtopology()
             .setSubtopologyId(SUBTOPOLOGY_ID_1)

Reply via email to