This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 3b1abeffae7 KAFKA-20115: Group coordinator fails to unload metadata when no longer leader or follower (#21396)
3b1abeffae7 is described below

commit 3b1abeffae72c144233139c33f42862f37706798
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Mon Feb 9 22:08:43 2026 +0800

    KAFKA-20115: Group coordinator fails to unload metadata when no longer leader or follower (#21396)
    
    When a broker loses leadership of a __consumer_offsets partition while a
    write batch is pending, the coordinator unload process fails because
    freeCurrentBatch() attempts to access the partition writer configuration,
    which throws a NOT_LEADER_OR_FOLLOWER exception.
    
    This commit fixes the issue by using the cached maxBatchSize instead of
    querying the log config's max message size in `freeCurrentBatch`.
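    
    A minimal sketch of the patched cleanup path (names are taken from the
    hunk below; the method signature and the trailing cleanup are
    simplifications, not the exact upstream code):
    
        void freeCurrentBatch() {
            // Cancel the linger timeout scheduled for the pending batch, if any.
            currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);

            // Before the fix: partitionWriter.config(tp).maxMessageSize(), which
            // throws NOT_LEADER_OR_FOLLOWER once leadership has moved away.
            // After the fix: reuse the value cached when the batch was created.
            int maxBatchSize = currentBatch.maxBatchSize;

            // Only recycle buffers that have not grown beyond the batch limit.
            if (currentBatch.builder.buffer().capacity() <= maxBatchSize) {
                bufferSupplier.release(currentBatch.builder.buffer());
            }
            // ... remaining cleanup unchanged ...
        }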
    
    Reviewers: Sean Quah <[email protected]>, Chia-Ping Tsai <[email protected]>, David Jacot <[email protected]>
---
 .../clients/consumer/ConsumerIntegrationTest.java  | 53 ++++++++++++++
 .../common/runtime/CoordinatorRuntime.java         |  5 +-
 .../common/runtime/CoordinatorRuntimeTest.java     | 80 ++++++++++++++++++++++
 3 files changed, 137 insertions(+), 1 deletion(-)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
index 220866c240f..336a37fbf05 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
@@ -26,7 +26,9 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -36,8 +38,10 @@ import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTests;
 import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics;
 import org.apache.kafka.test.TestUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
@@ -335,6 +339,55 @@ public class ConsumerIntegrationTest {
         }
     }
 
+    @ClusterTest(
+        brokers = 2,
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+            @ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, value = "3000")
+        }
+    )
+    public void testSingleCoordinatorOwnershipAfterPartitionReassignment(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException, TimeoutException {
+        try (var producer = clusterInstance.<byte[], byte[]>producer()) {
+            producer.send(new ProducerRecord<>("topic", "value".getBytes(StandardCharsets.UTF_8)));
+        }
+
+        try (var admin = clusterInstance.admin()) {
+            admin.createTopics(List.of(new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, Map.of(0, List.of(0))))).all().get();
+        }
+
+        try (var consumer = clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_ID_CONFIG, "test-group"));
+             var admin = clusterInstance.admin()) {
+            consumer.subscribe(List.of("topic"));
+            TestUtils.waitForCondition(() -> consumer.poll(Duration.ofMillis(100)).isEmpty(), "polling to join group");
+            // Append records to coordinator.
+            consumer.commitSync();
+
+            var broker0Metrics = clusterInstance.brokers().get(0).metrics();
+            var broker1Metrics = clusterInstance.brokers().get(1).metrics();
+            var activeNumPartitions = broker0Metrics.metricName(
+                "num-partitions",
+                GroupCoordinatorRuntimeMetrics.METRICS_GROUP,
+                Map.of("state", "active")
+            );
+
+            assertEquals(1L, broker0Metrics.metric(activeNumPartitions).metricValue());
+            assertEquals(0L, broker1Metrics.metric(activeNumPartitions).metricValue());
+
+            // Unload the coordinator by changing leader (0 -> 1).
+            admin.alterPartitionReassignments(
+                Map.of(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0), Optional.of(new NewPartitionReassignment(List.of(1))))
+            ).all().get();
+
+            // Wait for the coordinator metrics to update after leadership change.
+            TestUtils.waitForCondition(() ->
+                0L == (Long) broker0Metrics.metric(activeNumPartitions).metricValue() &&
+                    1L == (Long) broker1Metrics.metric(activeNumPartitions).metricValue(),
+                "Incorrect num-partitions metric after partition reassignment to the new coordinator"
+            );
+        }
+    }
+
     private void sendMsg(ClusterInstance clusterInstance, String topic, int sendMsgNum) {
         try (var producer = clusterInstance.producer(Map.of(
                 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 1d3fb49ad30..b67a71d5b43 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -773,7 +773,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
 
             // Release the buffer only if it is not larger than the maxBatchSize.
-            int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
+            // We avoid querying the log's configuration for the max message size here,
+            // because after a partition leadership change, this throws a NOT_LEADER_OR_FOLLOWER
+            // exception. Such exceptions can propagate unexpectedly and disrupt subsequent operations.
+            int maxBatchSize = currentBatch.maxBatchSize;
 
             if (currentBatch.builder.buffer().capacity() <= maxBatchSize) {
                 bufferSupplier.release(currentBatch.builder.buffer());
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index a359832756d..aeb808649f1 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -710,6 +710,86 @@ public class CoordinatorRuntimeTest {
         assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
     }
 
+    @Test
+    public void testScheduleUnloadingWithPendingBatchWhenPartitionWriterConfigThrows() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
+        MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(OptionalInt.of(10))
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.withTime(any())).thenReturn(builder);
+        when(builder.withTimer(any())).thenReturn(builder);
+        when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+        when(builder.withTopicPartition(any())).thenReturn(builder);
+        when(builder.withExecutor(any())).thenReturn(builder);
+        when(builder.build()).thenReturn(coordinator);
+        when(supplier.get()).thenReturn(builder);
+
+        // Configure the partition writer with a normal config initially.
+        LogConfig initialLogConfig = new LogConfig(
+            Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024)) // 1MB
+        );
+        when(writer.config(TP)).thenReturn(initialLogConfig);
+        when(writer.append(eq(TP), any(), any(), anyShort())).thenReturn(1L);
+
+        // Load the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+
+        // Schedule a write operation to create a pending batch.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(List.of("record1"), "response1")
+        );
+
+        // Verify that the write is not committed yet and a batch exists.
+        assertFalse(write1.isDone());
+        assertNotNull(ctx.currentBatch);
+
+        // Simulate the broker losing leadership: partitionWriter.config() now throws NOT_LEADER_OR_FOLLOWER.
+        // This is the scenario described in KAFKA-20115.
+        when(writer.config(TP)).thenThrow(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+
+        // Schedule the unloading. This should trigger the bug where freeCurrentBatch()
+        // tries to call partitionWriter.config(tp).maxMessageSize() and throws an exception.
+        // Without the fix, this would prevent the coordinator from unloading properly.
+        runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
+
+        // The unload should complete despite the NOT_LEADER_OR_FOLLOWER exception
+        // when trying to access partition writer config during buffer cleanup.
+        assertEquals(CLOSED, ctx.state);
+
+        // Verify that onUnloaded is called.
+        verify(coordinator, times(1)).onUnloaded();
+
+        // Verify that the listener is deregistered.
+        verify(writer, times(1)).deregisterListener(
+            eq(TP),
+            any(PartitionWriter.Listener.class)
+        );
+
+        // Getting the coordinator context fails because it no longer exists.
+        assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
+    }
+
     @Test
     public void testScheduleWriteOp() throws ExecutionException, InterruptedException, TimeoutException {
         MockTimer timer = new MockTimer();
