This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 3b1abeffae7 KAFKA-20115: Group coordinator fails to unload metadata when no longer leader or follower (#21396)
3b1abeffae7 is described below
commit 3b1abeffae72c144233139c33f42862f37706798
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Mon Feb 9 22:08:43 2026 +0800
KAFKA-20115: Group coordinator fails to unload metadata when no longer leader or follower (#21396)
When a broker loses leadership of a __consumer_offsets partition while a
write batch is pending, the coordinator unload process fails because
freeCurrentBatch() attempts to access the partition writer configuration,
which throws a NOT_LEADER_OR_FOLLOWER exception.

This commit fixes the issue by using the cached maxBatchSize in
`freeCurrentBatch` instead of querying the log config's max message size.
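In short, the change amounts to the following (a simplified excerpt
paraphrasing the CoordinatorRuntime.java hunk below; surrounding code
is elided):

    // Inside freeCurrentBatch(), simplified:
    currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);

    // Before the fix: re-reads the partition's log config, which throws
    // NOT_LEADER_OR_FOLLOWER once the broker no longer leads __consumer_offsets.
    //     int maxBatchSize = partitionWriter.config(tp).maxMessageSize();

    // After the fix: reuse the max batch size cached when the batch was created,
    // so freeing the batch never calls back into the partition writer.
    int maxBatchSize = currentBatch.maxBatchSize;

    // Release the buffer only if it is not larger than the maxBatchSize.
    if (currentBatch.builder.buffer().capacity() <= maxBatchSize) {
        bufferSupplier.release(currentBatch.builder.buffer());
    }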
Reviewers: Sean Quah <[email protected]>, Chia-Ping Tsai
<[email protected]>, David Jacot <[email protected]>
---
.../clients/consumer/ConsumerIntegrationTest.java | 53 ++++++++++++++
.../common/runtime/CoordinatorRuntime.java | 5 +-
.../common/runtime/CoordinatorRuntimeTest.java | 80 ++++++++++++++++++++++
3 files changed, 137 insertions(+), 1 deletion(-)
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
index 220866c240f..336a37fbf05 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
@@ -26,7 +26,9 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -36,8 +38,10 @@ import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics;
import org.apache.kafka.test.TestUtils;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
@@ -335,6 +339,55 @@ public class ConsumerIntegrationTest {
}
}
+    @ClusterTest(
+        brokers = 2,
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+            @ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, value = "3000")
+        }
+    )
+    public void testSingleCoordinatorOwnershipAfterPartitionReassignment(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException, TimeoutException {
+        try (var producer = clusterInstance.<byte[], byte[]>producer()) {
+            producer.send(new ProducerRecord<>("topic", "value".getBytes(StandardCharsets.UTF_8)));
+        }
+
+        try (var admin = clusterInstance.admin()) {
+            admin.createTopics(List.of(new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, Map.of(0, List.of(0))))).all().get();
+        }
+
+        try (var consumer = clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_ID_CONFIG, "test-group"));
+             var admin = clusterInstance.admin()) {
+            consumer.subscribe(List.of("topic"));
+            TestUtils.waitForCondition(() -> consumer.poll(Duration.ofMillis(100)).isEmpty(), "polling to join group");
+            // Append records to coordinator.
+            consumer.commitSync();
+
+            var broker0Metrics = clusterInstance.brokers().get(0).metrics();
+            var broker1Metrics = clusterInstance.brokers().get(1).metrics();
+            var activeNumPartitions = broker0Metrics.metricName(
+                "num-partitions",
+                GroupCoordinatorRuntimeMetrics.METRICS_GROUP,
+                Map.of("state", "active")
+            );
+
+            assertEquals(1L, broker0Metrics.metric(activeNumPartitions).metricValue());
+            assertEquals(0L, broker1Metrics.metric(activeNumPartitions).metricValue());
+
+            // Unload the coordinator by changing leader (0 -> 1).
+            admin.alterPartitionReassignments(
+                Map.of(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0), Optional.of(new NewPartitionReassignment(List.of(1))))
+            ).all().get();
+
+            // Wait for the coordinator metrics to update after leadership change.
+            TestUtils.waitForCondition(() ->
+                0L == (Long) broker0Metrics.metric(activeNumPartitions).metricValue() &&
+                    1L == (Long) broker1Metrics.metric(activeNumPartitions).metricValue(),
+                "Incorrect num-partitions metric after partition reassignment to the new coordinator"
+            );
+        }
+    }
+
private void sendMsg(ClusterInstance clusterInstance, String topic, int sendMsgNum) {
try (var producer = clusterInstance.producer(Map.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 1d3fb49ad30..b67a71d5b43 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -773,7 +773,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
// Release the buffer only if it is not larger than the maxBatchSize.
- int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
+ // We avoid querying the log's configuration for the max message size here,
+ // because after a partition leadership change, this throws a NOT_LEADER_OR_FOLLOWER
+ // exception. Such exceptions can propagate unexpectedly and disrupt subsequent operations.
+ int maxBatchSize = currentBatch.maxBatchSize;
if (currentBatch.builder.buffer().capacity() <= maxBatchSize) {
bufferSupplier.release(currentBatch.builder.buffer());
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index a359832756d..aeb808649f1 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -710,6 +710,86 @@ public class CoordinatorRuntimeTest {
assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
}
+    @Test
+    public void testScheduleUnloadingWithPendingBatchWhenPartitionWriterConfigThrows() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
+        MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(OptionalInt.of(10))
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.withTime(any())).thenReturn(builder);
+        when(builder.withTimer(any())).thenReturn(builder);
+        when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+        when(builder.withTopicPartition(any())).thenReturn(builder);
+        when(builder.withExecutor(any())).thenReturn(builder);
+        when(builder.build()).thenReturn(coordinator);
+        when(supplier.get()).thenReturn(builder);
+
+        // Configure the partition writer with a normal config initially.
+        LogConfig initialLogConfig = new LogConfig(
+            Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024)) // 1MB
+        );
+        when(writer.config(TP)).thenReturn(initialLogConfig);
+        when(writer.append(eq(TP), any(), any(), anyShort())).thenReturn(1L);
+
+        // Load the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+
+        // Schedule a write operation to create a pending batch.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(List.of("record1"), "response1")
+        );
+
+        // Verify that the write is not committed yet and a batch exists.
+        assertFalse(write1.isDone());
+        assertNotNull(ctx.currentBatch);
+
+        // Simulate the broker losing leadership: partitionWriter.config() now throws NOT_LEADER_OR_FOLLOWER.
+        // This is the scenario described in KAFKA-20115.
+        when(writer.config(TP)).thenThrow(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+
+        // Schedule the unloading. This should trigger the bug where freeCurrentBatch()
+        // tries to call partitionWriter.config(tp).maxMessageSize() and throws an exception.
+        // Without the fix, this would prevent the coordinator from unloading properly.
+        runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
+
+        // The unload should complete despite the NOT_LEADER_OR_FOLLOWER exception
+        // when trying to access partition writer config during buffer cleanup.
+        assertEquals(CLOSED, ctx.state);
+
+        // Verify that onUnloaded is called.
+        verify(coordinator, times(1)).onUnloaded();
+
+        // Verify that the listener is deregistered.
+        verify(writer, times(1)).deregisterListener(
+            eq(TP),
+            any(PartitionWriter.Listener.class)
+        );
+
+        // Getting the coordinator context fails because it no longer exists.
+        assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
+    }
+
@Test
public void testScheduleWriteOp() throws ExecutionException, InterruptedException, TimeoutException {
MockTimer timer = new MockTimer();