This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 25bc5f2cfa0 KAFKA-19372: StreamsGroup not subscribed to a topic when 
empty (#19901)
25bc5f2cfa0 is described below

commit 25bc5f2cfa0e42d735c6408ce66304fc3497542c
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Jun 4 20:55:14 2025 +0200

    KAFKA-19372: StreamsGroup not subscribed to a topic when empty (#19901)
    
    We should behave more like a consumer group and make sure to not be
    subscribed to the input topics anymore when the last member leaves the
    group. We don't do this right now because our topology is still
    initialized even after the last member leaves the group.
    
    This will allow:
    * Offsets to expire and be cleaned up.
    * Offsets to be deleted through admin API calls.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../org/apache/kafka/coordinator/group/streams/StreamsGroup.java  | 6 ++++++
 .../apache/kafka/coordinator/group/streams/StreamsGroupTest.java  | 8 ++++++++
 2 files changed, 14 insertions(+)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 061f816296a..afc252a7fee 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -55,6 +55,7 @@ import java.util.Set;
 import java.util.TreeMap;
 
 import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.DEAD;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.NOT_READY;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING;
@@ -765,6 +766,11 @@ public class StreamsGroup implements Group {
 
     @Override
     public boolean isSubscribedToTopic(String topic) {
+        if (state.get() == EMPTY || state.get() == DEAD) {
+            // No topic subscriptions if the group is empty.
+            // This allows offsets to expire for empty groups.
+            return false;
+        }
         Optional<ConfiguredTopology> maybeConfiguredTopology = 
configuredTopology.get();
         if (maybeConfiguredTopology.isEmpty() || 
!maybeConfiguredTopology.get().isReady()) {
             return false;
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index 0bd8caf3bbd..8966c936356 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -977,6 +977,8 @@ public class StreamsGroupTest {
             ));
         streamsGroup.setTopology(topology);
 
+        
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember("member-id"));
+
         assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
         assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
         assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
@@ -991,6 +993,12 @@ public class StreamsGroupTest {
         assertTrue(streamsGroup.isSubscribedToTopic("test-topic1"));
         assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
         assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
+
+        streamsGroup.removeMember("member-id");
+
+        assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
+        assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
+        assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
     }
 
     @Test

Reply via email to