This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 34fa571ba69 KAFKA-20115: Group coordinator fails to unload metadata when no longer leader or follower (#21403)
34fa571ba69 is described below

commit 34fa571ba69a725ab3cd8e12a4f2f9bc195781ca
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Tue Feb 10 01:38:25 2026 +0800

    KAFKA-20115: Group coordinator fails to unload metadata when no longer leader or follower (#21403)
    
    While the underlying cause of KAFKA-20115 was addressed by the
    resolution of KAFKA-19519, this commit introduces a dedicated
    integration test to ensure the scenario remains covered. This prevents
    future regressions and ensures the expected behavior is explicitly
    verified within the test suite.
    
    Reviewers: David Jacot <[email protected]>
    
    ---------
    
    Signed-off-by: Kuan-Po Tseng <[email protected]>
---
 .../clients/consumer/ConsumerIntegrationTest.java  | 53 ++++++++++++++++++++++
 1 file changed, 53 insertions(+)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
index 220866c240f..0b0e65031f6 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
@@ -26,7 +26,9 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -36,8 +38,10 @@ import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTests;
 import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics;
 import org.apache.kafka.test.TestUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
@@ -335,6 +339,55 @@ public class ConsumerIntegrationTest {
         }
     }
 
+    @ClusterTest(
+        brokers = 2,
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, value = 
"3000")
+        }
+    )
+    public void 
testSingleCoordinatorOwnershipAfterPartitionReassignment(ClusterInstance 
clusterInstance) throws InterruptedException, ExecutionException, 
TimeoutException {
+        try (var producer = clusterInstance.<byte[], byte[]>producer()) {
+            producer.send(new ProducerRecord<>("topic", 
"value".getBytes(StandardCharsets.UTF_8)));
+        }
+
+        try (var admin = clusterInstance.admin()) {
+            admin.createTopics(List.of(new 
NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, Map.of(0, List.of(0))))).all().get();
+        }
+
+        try (var consumer = 
clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_ID_CONFIG, "test-group"));
+            var admin = clusterInstance.admin()) {
+            consumer.subscribe(List.of("topic"));
+            TestUtils.waitForCondition(() -> 
consumer.poll(Duration.ofMillis(100)).isEmpty(), "polling to join group");
+            // Commit offsets so that records are appended to the coordinator.
+            consumer.commitSync();
+
+            var broker0Metrics = clusterInstance.brokers().get(0).metrics();
+            var broker1Metrics = clusterInstance.brokers().get(1).metrics();
+            var activeNumPartitions = broker0Metrics.metricName(
+                "num-partitions",
+                GroupCoordinatorRuntimeMetrics.METRICS_GROUP,
+                Map.of("state", "active")
+            );
+
+            assertEquals(1L, 
broker0Metrics.metric(activeNumPartitions).metricValue());
+            assertEquals(0L, 
broker1Metrics.metric(activeNumPartitions).metricValue());
+
+            // Unload the coordinator on broker 0 by moving the partition's only replica, and thus its leader, to broker 1 (0 -> 1).
+            admin.alterPartitionReassignments(
+                Map.of(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0), 
Optional.of(new NewPartitionReassignment(List.of(1))))
+            ).all().get();
+
+            // Wait until broker 0 reports zero active partitions and broker 1 reports one after the leadership 
change.
+            TestUtils.waitForCondition(() ->
+                0L == (Long) 
broker0Metrics.metric(activeNumPartitions).metricValue() &&
+                    1L == (Long) 
broker1Metrics.metric(activeNumPartitions).metricValue(),
+                "Incorrect num-partitions metric after partition reassignment 
to the new coordinator"
+            );
+        }
+    }
+
     private void sendMsg(ClusterInstance clusterInstance, String topic, int 
sendMsgNum) {
         try (var producer = clusterInstance.producer(Map.of(
                 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class,

Reply via email to