This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 34fa571ba69 KAFKA-20115: Group coordinator fails to unload metadata
when no longer leader or follower (#21403)
34fa571ba69 is described below
commit 34fa571ba69a725ab3cd8e12a4f2f9bc195781ca
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Tue Feb 10 01:38:25 2026 +0800
KAFKA-20115: Group coordinator fails to unload metadata when no longer
leader or follower (#21403)
While the underlying cause of KAFKA-20115 was addressed by the
resolution of KAFKA-19519, this commit introduces a dedicated
integration test to ensure the scenario remains covered. This prevents
future regressions and ensures the expected behavior is explicitly
verified within the test suite.
Reviewers: David Jacot <[email protected]>
---------
Signed-off-by: Kuan-Po Tseng <[email protected]>
---
.../clients/consumer/ConsumerIntegrationTest.java | 53 ++++++++++++++++++++++
1 file changed, 53 insertions(+)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
index 220866c240f..0b0e65031f6 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
@@ -26,7 +26,9 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -36,8 +38,10 @@ import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics;
import org.apache.kafka.test.TestUtils;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
@@ -335,6 +339,55 @@ public class ConsumerIntegrationTest {
}
}
+ @ClusterTest(
+ brokers = 2,
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, value =
"3000")
+ }
+ )
+ public void
testSingleCoordinatorOwnershipAfterPartitionReassignment(ClusterInstance
clusterInstance) throws InterruptedException, ExecutionException,
TimeoutException {
+ try (var producer = clusterInstance.<byte[], byte[]>producer()) {
+ producer.send(new ProducerRecord<>("topic",
"value".getBytes(StandardCharsets.UTF_8)));
+ }
+
+ try (var admin = clusterInstance.admin()) {
+ admin.createTopics(List.of(new
NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, Map.of(0, List.of(0))))).all().get();
+ }
+
+ try (var consumer =
clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_ID_CONFIG, "test-group"));
+ var admin = clusterInstance.admin()) {
+ consumer.subscribe(List.of("topic"));
+ TestUtils.waitForCondition(() ->
consumer.poll(Duration.ofMillis(100)).isEmpty(), "polling to join group");
+ // Append records to coordinator.
+ consumer.commitSync();
+
+ var broker0Metrics = clusterInstance.brokers().get(0).metrics();
+ var broker1Metrics = clusterInstance.brokers().get(1).metrics();
+ var activeNumPartitions = broker0Metrics.metricName(
+ "num-partitions",
+ GroupCoordinatorRuntimeMetrics.METRICS_GROUP,
+ Map.of("state", "active")
+ );
+
+ assertEquals(1L,
broker0Metrics.metric(activeNumPartitions).metricValue());
+ assertEquals(0L,
broker1Metrics.metric(activeNumPartitions).metricValue());
+
+ // Unload the coordinator by changing leader (0 -> 1).
+ admin.alterPartitionReassignments(
+ Map.of(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0),
Optional.of(new NewPartitionReassignment(List.of(1))))
+ ).all().get();
+
+ // Wait for the coordinator metrics to update after leadership
change.
+ TestUtils.waitForCondition(() ->
+ 0L == (Long)
broker0Metrics.metric(activeNumPartitions).metricValue() &&
+ 1L == (Long)
broker1Metrics.metric(activeNumPartitions).metricValue(),
+ "Incorrect num-partitions metric after partition reassignment
to the new coordinator"
+ );
+ }
+ }
+
private void sendMsg(ClusterInstance clusterInstance, String topic, int
sendMsgNum) {
try (var producer = clusterInstance.producer(Map.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class,