This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 25bc5f2cfa0 KAFKA-19372: StreamsGroup not subscribed to a topic when
empty (#19901)
25bc5f2cfa0 is described below
commit 25bc5f2cfa0e42d735c6408ce66304fc3497542c
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Jun 4 20:55:14 2025 +0200
KAFKA-19372: StreamsGroup not subscribed to a topic when empty (#19901)
A streams group should behave like a consumer group: once the last
member leaves, the group must no longer count as subscribed to its input
topics. This does not happen today because the topology stays
initialized even after the last member leaves the group.
Treating an empty group as unsubscribed will allow:
* Offsets to expire and be cleaned up.
* Offsets to be deleted through admin API calls.
Reviewers: Bill Bejeck <[email protected]>
---
.../org/apache/kafka/coordinator/group/streams/StreamsGroup.java | 6 ++++++
.../apache/kafka/coordinator/group/streams/StreamsGroupTest.java | 8 ++++++++
2 files changed, 14 insertions(+)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 061f816296a..afc252a7fee 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -55,6 +55,7 @@ import java.util.Set;
import java.util.TreeMap;
import static
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
+import static
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.DEAD;
import static
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY;
import static
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.NOT_READY;
import static
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING;
@@ -765,6 +766,11 @@ public class StreamsGroup implements Group {
@Override
public boolean isSubscribedToTopic(String topic) {
+ if (state.get() == EMPTY || state.get() == DEAD) {
+ // No topic subscriptions if the group is empty.
+ // This allows offsets to expire for empty groups.
+ return false;
+ }
Optional<ConfiguredTopology> maybeConfiguredTopology =
configuredTopology.get();
if (maybeConfiguredTopology.isEmpty() ||
!maybeConfiguredTopology.get().isReady()) {
return false;
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index 0bd8caf3bbd..8966c936356 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -977,6 +977,8 @@ public class StreamsGroupTest {
));
streamsGroup.setTopology(topology);
+
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember("member-id"));
+
assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
@@ -991,6 +993,12 @@ public class StreamsGroupTest {
assertTrue(streamsGroup.isSubscribedToTopic("test-topic1"));
assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
+
+ streamsGroup.removeMember("member-id");
+
+ assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
+ assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
+ assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
}
@Test