This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 482299c4e2c KAFKA-14462; [19/N] Add CoordinatorLoader implementation
(#13880)
482299c4e2c is described below
commit 482299c4e2c97315feea3db5d6bf0e5c2c8b8cc7
Author: David Jacot <[email protected]>
AuthorDate: Thu Jun 29 08:12:53 2023 +0200
KAFKA-14462; [19/N] Add CoordinatorLoader implementation (#13880)
This patch adds a coordinator loader implementation.
Reviewers: Jeff Kim <[email protected]>, Justine Olshan
<[email protected]>
---
.../coordinator/group/CoordinatorLoaderImpl.scala | 170 +++++++++++
.../group/CoordinatorLoaderImplTest.scala | 315 +++++++++++++++++++++
.../kafka/coordinator/group/RecordSerde.java | 169 +++++++++++
.../kafka/coordinator/group/RecordSerializer.java | 47 ---
.../group/runtime/CoordinatorLoader.java | 36 ++-
.../group/runtime/CoordinatorRuntime.java | 1 +
.../kafka/coordinator/group/RecordSerdeTest.java | 260 +++++++++++++++++
.../coordinator/group/RecordSerializerTest.java | 82 ------
.../group/runtime/CoordinatorRuntimeTest.java | 11 +-
9 files changed, 960 insertions(+), 131 deletions(-)
diff --git
a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
new file mode 100644
index 00000000000..6b67d1a7a6b
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.ReplicaManager
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+import
org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer,
UnknownRecordTypeException}
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader,
CoordinatorPlayback}
+import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.storage.internals.log.FetchIsolation
+
+import java.nio.ByteBuffer
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.jdk.CollectionConverters._
+
+/**
+ * Coordinator loader which reads records from a partition and replays them
+ * to a group coordinator.
+ *
+ * @param replicaManager The replica manager.
+ * @param deserializer The deserializer to use.
+ * @param loadBufferSize The load buffer size.
+ * @tparam T The record type.
+ */
+class CoordinatorLoaderImpl[T](
+ replicaManager: ReplicaManager,
+ deserializer: Deserializer[T],
+ loadBufferSize: Int
+) extends CoordinatorLoader[T] with Logging {
+ private val isRunning = new AtomicBoolean(true)
+ private val scheduler = new KafkaScheduler(1)
+ scheduler.startup()
+
+ /**
+ * Loads the coordinator by reading all the records from the TopicPartition
+ * and applying them to the Replayable object.
+ *
+ * @param tp The TopicPartition to read from.
+ * @param coordinator The object to apply records to.
+ */
+ override def load(
+ tp: TopicPartition,
+ coordinator: CoordinatorPlayback[T]
+): CompletableFuture[Void] = {
+ val future = new CompletableFuture[Void]()
+ val result = scheduler.scheduleOnce(s"Load coordinator from $tp",
+ () => doLoad(tp, coordinator, future))
+ if (result.isCancelled) {
+ future.completeExceptionally(new RuntimeException("Coordinator loader is
closed."))
+ }
+ future
+ }
+
+ private def doLoad(
+ tp: TopicPartition,
+ coordinator: CoordinatorPlayback[T],
+ future: CompletableFuture[Void]
+ ): Unit = {
+ try {
+ replicaManager.getLog(tp) match {
+ case None =>
+ future.completeExceptionally(new NotLeaderOrFollowerException(
+ s"Could not load records from $tp because the log does not
exist."))
+
+ case Some(log) =>
+ def logEndOffset: Long =
replicaManager.getLogEndOffset(tp).getOrElse(-1L)
+
+ // Buffer may not be needed if records are read from memory.
+ var buffer = ByteBuffer.allocate(0)
+ // Loop breaks if leader changes at any time during the load, since
logEndOffset is -1.
+ var currentOffset = log.logStartOffset
+ // Loop breaks if no records have been read, since the end of the
log has been reached.
+ // This is to ensure that the loop breaks even if the current offset
remains smaller than
+ // the log end offset but the log is empty. This could happen with
compacted topics.
+ var readAtLeastOneRecord = true
+
+ while (currentOffset < logEndOffset && readAtLeastOneRecord &&
isRunning.get) {
+ val fetchDataInfo = log.read(
+ startOffset = currentOffset,
+ maxLength = loadBufferSize,
+ isolation = FetchIsolation.LOG_END,
+ minOneMessage = true
+ )
+
+ readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
+
+ val memoryRecords = (fetchDataInfo.records: @unchecked) match {
+ case records: MemoryRecords =>
+ records
+
+ case fileRecords: FileRecords =>
+ val sizeInBytes = fileRecords.sizeInBytes
+ val bytesNeeded = Math.max(loadBufferSize, sizeInBytes)
+
+ // "minOneMessage = true" in the above log.read() means that
the buffer may need to
+ // be grown to ensure progress can be made.
+ if (buffer.capacity < bytesNeeded) {
+ if (loadBufferSize < bytesNeeded)
+ warn(s"Loaded metadata from $tp with buffer larger
($bytesNeeded bytes) than " +
+ s"configured buffer size ($loadBufferSize bytes).")
+
+ buffer = ByteBuffer.allocate(bytesNeeded)
+ } else {
+ buffer.clear()
+ }
+
+ fileRecords.readInto(buffer, 0)
+ MemoryRecords.readableRecords(buffer)
+ }
+
+ memoryRecords.batches.forEach { batch =>
+ if (batch.isControlBatch) {
+ throw new IllegalStateException("Control batches are not
supported yet.")
+ } else {
+ batch.asScala.foreach { record =>
+ try {
+ coordinator.replay(deserializer.deserialize(record.key,
record.value))
+ } catch {
+ case ex: UnknownRecordTypeException =>
+ warn(s"Unknown record type ${ex.unknownType} while
loading offsets and group metadata " +
+ s"from $tp. Ignoring it. It could be a left over from
an aborted upgrade.")
+ }
+ }
+ }
+
+ currentOffset = batch.nextOffset
+ }
+ }
+
+ if (isRunning.get) {
+ future.complete(null)
+ } else {
+ future.completeExceptionally(new RuntimeException("Coordinator
loader is closed."))
+ }
+ }
+ } catch {
+ case ex: Throwable =>
+ future.completeExceptionally(ex)
+ }
+ }
+
+ /**
+ * Closes the loader.
+ */
+ override def close(): Unit = {
+ if (!isRunning.compareAndSet(true, false)) {
+ warn("Coordinator loader is already shutting down.")
+ return
+ }
+ scheduler.shutdown()
+ }
+}
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
new file mode 100644
index 00000000000..c71c28d47ea
--- /dev/null
+++
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.log.UnifiedLog
+import kafka.server.ReplicaManager
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{CompressionType, FileRecords,
MemoryRecords, SimpleRecord}
+import
org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.UnknownRecordTypeException
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader,
CoordinatorPlayback}
+import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation,
LogOffsetMetadata}
+import org.apache.kafka.test.TestUtils.assertFutureThrows
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNull}
+import org.junit.jupiter.api.{Test, Timeout}
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.{mock, verify, when}
+
+import java.nio.ByteBuffer
+import java.nio.charset.Charset
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+class StringKeyValueDeserializer extends
CoordinatorLoader.Deserializer[(String, String)] {
+ override def deserialize(key: ByteBuffer, value: ByteBuffer): (String,
String) = {
+ (
+ Charset.defaultCharset().decode(key).toString,
+ Charset.defaultCharset().decode(value).toString
+ )
+ }
+}
+
+@Timeout(60)
+class CoordinatorLoaderImplTest {
+ @Test
+ def testNonexistentPartition(): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val replicaManager = mock(classOf[ReplicaManager])
+ val serde = mock(classOf[CoordinatorLoader.Deserializer[(String, String)]])
+ val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+ TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ replicaManager = replicaManager,
+ deserializer = serde,
+ loadBufferSize = 1000
+ )) { loader =>
+ when(replicaManager.getLog(tp)).thenReturn(None)
+
+ val result = loader.load(tp, coordinator)
+ assertFutureThrows(result, classOf[NotLeaderOrFollowerException])
+ }
+ }
+
+ @Test
+ def testLoadingIsRejectedWhenClosed(): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val replicaManager = mock(classOf[ReplicaManager])
+ val serde = mock(classOf[CoordinatorLoader.Deserializer[(String, String)]])
+ val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+ TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ replicaManager = replicaManager,
+ deserializer = serde,
+ loadBufferSize = 1000
+ )) { loader =>
+ loader.close()
+
+ val result = loader.load(tp, coordinator)
+ assertFutureThrows(result, classOf[RuntimeException])
+ }
+ }
+
+ @Test
+ def testLoading(): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val replicaManager = mock(classOf[ReplicaManager])
+ val serde = new StringKeyValueDeserializer
+ val log = mock(classOf[UnifiedLog])
+ val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+ TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ replicaManager = replicaManager,
+ deserializer = serde,
+ loadBufferSize = 1000
+ )) { loader =>
+ when(replicaManager.getLog(tp)).thenReturn(Some(log))
+ when(log.logStartOffset).thenReturn(0L)
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L))
+
+ val readResult1 = logReadResult(startOffset = 0, records = Seq(
+ new SimpleRecord("k1".getBytes, "v1".getBytes),
+ new SimpleRecord("k2".getBytes, "v2".getBytes)
+ ))
+
+ when(log.read(
+ startOffset = 0L,
+ maxLength = 1000,
+ isolation = FetchIsolation.LOG_END,
+ minOneMessage = true
+ )).thenReturn(readResult1)
+
+ val readResult2 = logReadResult(startOffset = 2, records = Seq(
+ new SimpleRecord("k3".getBytes, "v3".getBytes),
+ new SimpleRecord("k4".getBytes, "v4".getBytes),
+ new SimpleRecord("k5".getBytes, "v5".getBytes)
+ ))
+
+ when(log.read(
+ startOffset = 2L,
+ maxLength = 1000,
+ isolation = FetchIsolation.LOG_END,
+ minOneMessage = true
+ )).thenReturn(readResult2)
+
+ assertNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+
+ verify(coordinator).replay(("k1", "v1"))
+ verify(coordinator).replay(("k2", "v2"))
+ verify(coordinator).replay(("k3", "v3"))
+ verify(coordinator).replay(("k4", "v4"))
+ verify(coordinator).replay(("k5", "v5"))
+ }
+ }
+
+ @Test
+ def testLoadingStoppedWhenClosed(): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val replicaManager = mock(classOf[ReplicaManager])
+ val serde = new StringKeyValueDeserializer
+ val log = mock(classOf[UnifiedLog])
+ val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+ TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ replicaManager = replicaManager,
+ deserializer = serde,
+ loadBufferSize = 1000
+ )) { loader =>
+ when(replicaManager.getLog(tp)).thenReturn(Some(log))
+ when(log.logStartOffset).thenReturn(0L)
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(100L))
+
+ val readResult = logReadResult(startOffset = 0, records = Seq(
+ new SimpleRecord("k1".getBytes, "v1".getBytes),
+ new SimpleRecord("k2".getBytes, "v2".getBytes)
+ ))
+
+ val latch = new CountDownLatch(1)
+ when(log.read(
+ startOffset = ArgumentMatchers.anyLong(),
+ maxLength = ArgumentMatchers.eq(1000),
+ isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
+ minOneMessage = ArgumentMatchers.eq(true)
+ )).thenAnswer { _ =>
+ latch.countDown()
+ readResult
+ }
+
+ val result = loader.load(tp, coordinator)
+ latch.await(10, TimeUnit.SECONDS)
+ loader.close()
+
+ val ex = assertFutureThrows(result, classOf[RuntimeException])
+ assertEquals("Coordinator loader is closed.", ex.getMessage)
+ }
+ }
+
+ @Test
+ def testUnknownRecordTypeAreIgnored(): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val replicaManager = mock(classOf[ReplicaManager])
+ val serde = mock(classOf[StringKeyValueDeserializer])
+ val log = mock(classOf[UnifiedLog])
+ val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+ TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ replicaManager = replicaManager,
+ deserializer = serde,
+ loadBufferSize = 1000
+ )) { loader =>
+ when(replicaManager.getLog(tp)).thenReturn(Some(log))
+ when(log.logStartOffset).thenReturn(0L)
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(2L))
+
+ val readResult = logReadResult(startOffset = 0, records = Seq(
+ new SimpleRecord("k1".getBytes, "v1".getBytes),
+ new SimpleRecord("k2".getBytes, "v2".getBytes)
+ ))
+
+ when(log.read(
+ startOffset = 0L,
+ maxLength = 1000,
+ isolation = FetchIsolation.LOG_END,
+ minOneMessage = true
+ )).thenReturn(readResult)
+
+ when(serde.deserialize(ArgumentMatchers.any(), ArgumentMatchers.any()))
+ .thenThrow(new UnknownRecordTypeException(1))
+ .thenReturn(("k2", "v2"))
+
+ loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)
+
+ verify(coordinator).replay(("k2", "v2"))
+ }
+ }
+
+ @Test
+ def testDeserializationErrorFailsTheLoading(): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val replicaManager = mock(classOf[ReplicaManager])
+ val serde = mock(classOf[StringKeyValueDeserializer])
+ val log = mock(classOf[UnifiedLog])
+ val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+ TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ replicaManager = replicaManager,
+ deserializer = serde,
+ loadBufferSize = 1000
+ )) { loader =>
+ when(replicaManager.getLog(tp)).thenReturn(Some(log))
+ when(log.logStartOffset).thenReturn(0L)
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(2L))
+
+ val readResult = logReadResult(startOffset = 0, records = Seq(
+ new SimpleRecord("k1".getBytes, "v1".getBytes),
+ new SimpleRecord("k2".getBytes, "v2".getBytes)
+ ))
+
+ when(log.read(
+ startOffset = 0L,
+ maxLength = 1000,
+ isolation = FetchIsolation.LOG_END,
+ minOneMessage = true
+ )).thenReturn(readResult)
+
+ when(serde.deserialize(ArgumentMatchers.any(), ArgumentMatchers.any()))
+ .thenThrow(new RuntimeException("Error!"))
+
+ val ex = assertFutureThrows(loader.load(tp, coordinator),
classOf[RuntimeException])
+ assertEquals("Error!", ex.getMessage)
+ }
+ }
+
+ @Test
+ def testLoadGroupAndOffsetsWithCorruptedLog(): Unit = {
+ // Simulate a case where startOffset < endOffset but log is empty. This
could theoretically happen
+ // when all the records are expired and the active segment is truncated or
when the partition
+ // is accidentally corrupted.
+ val tp = new TopicPartition("foo", 0)
+ val replicaManager = mock(classOf[ReplicaManager])
+ val serde = mock(classOf[StringKeyValueDeserializer])
+ val log = mock(classOf[UnifiedLog])
+ val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+ TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ replicaManager = replicaManager,
+ deserializer = serde,
+ loadBufferSize = 1000
+ )) { loader =>
+ when(replicaManager.getLog(tp)).thenReturn(Some(log))
+ when(log.logStartOffset).thenReturn(0L)
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(10L))
+
+ val readResult = logReadResult(startOffset = 0, records = Seq())
+
+ when(log.read(
+ startOffset = 0L,
+ maxLength = 1000,
+ isolation = FetchIsolation.LOG_END,
+ minOneMessage = true
+ )).thenReturn(readResult)
+
+ assertNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+ }
+ }
+
+ private def logReadResult(
+ startOffset: Long,
+ records: Seq[SimpleRecord]
+ ): FetchDataInfo = {
+ val fileRecords = mock(classOf[FileRecords])
+ val memoryRecords = MemoryRecords.withRecords(
+ startOffset,
+ CompressionType.NONE,
+ records: _*
+ )
+
+ when(fileRecords.sizeInBytes).thenReturn(memoryRecords.sizeInBytes)
+
+ val bufferCapture: ArgumentCaptor[ByteBuffer] =
ArgumentCaptor.forClass(classOf[ByteBuffer])
+ when(fileRecords.readInto(
+ bufferCapture.capture(),
+ ArgumentMatchers.anyInt())
+ ).thenAnswer { _ =>
+ val buffer = bufferCapture.getValue
+ buffer.put(memoryRecords.buffer.duplicate)
+ buffer.flip()
+ }
+
+ new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
+ }
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java
new file mode 100644
index 00000000000..69a4f5fff96
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer for {@link Record}. The format is defined below:
+ * <pre>
+ * record_key = [record_type key_message]
+ * record_value = [value_version value_message]
+ *
+ * record_type : The record type is currently defined as the version of
the key
+ * {@link ApiMessageAndVersion} object.
+ * key_message : The serialized message of the key {@link
ApiMessageAndVersion} object.
+ * value_version : The value version is currently defined as the version
of the value
+ * {@link ApiMessageAndVersion} object.
+ * value_message : The serialized message of the value {@link
ApiMessageAndVersion} object.
+ * </pre>
+ */
+public class RecordSerde implements PartitionWriter.Serializer<Record>,
CoordinatorLoader.Deserializer<Record> {
+ @Override
+ public byte[] serializeKey(Record record) {
+ // Record does not accept a null key.
+ return MessageUtil.toVersionPrefixedBytes(
+ record.key().version(),
+ record.key().message()
+ );
+ }
+
+ @Override
+ public byte[] serializeValue(Record record) {
+ // Tombstone is represented with a null value.
+ if (record.value() == null) {
+ return null;
+ } else {
+ return MessageUtil.toVersionPrefixedBytes(
+ record.value().version(),
+ record.value().message()
+ );
+ }
+ }
+
+ @Override
+ public Record deserialize(
+ ByteBuffer keyBuffer,
+ ByteBuffer valueBuffer
+ ) throws RuntimeException {
+ final short recordType = readVersion(keyBuffer, "key");
+ final ApiMessage keyMessage = apiMessageKeyFor(recordType);
+ readMessage(keyMessage, keyBuffer, recordType, "key");
+
+ if (valueBuffer == null) {
+ return new Record(new ApiMessageAndVersion(keyMessage,
recordType), null);
+ }
+
+ final ApiMessage valueMessage = apiMessageValueFor(recordType);
+ final short valueVersion = readVersion(valueBuffer, "value");
+ readMessage(valueMessage, valueBuffer, valueVersion, "value");
+
+ return new Record(
+ new ApiMessageAndVersion(keyMessage, recordType),
+ new ApiMessageAndVersion(valueMessage, valueVersion)
+ );
+ }
+
+ private short readVersion(ByteBuffer buffer, String name) throws
RuntimeException {
+ try {
+ return buffer.getShort();
+ } catch (BufferUnderflowException ex) {
+ throw new RuntimeException(String.format("Could not read version
from %s's buffer.", name));
+ }
+ }
+
+ private void readMessage(ApiMessage message, ByteBuffer buffer, short
version, String name) throws RuntimeException {
+ try {
+ message.read(new ByteBufferAccessor(buffer), version);
+ } catch (RuntimeException ex) {
+ throw new RuntimeException(String.format("Could not read record
with version %d from %s's buffer due to: %s.",
+ version, name, ex.getMessage()), ex);
+ }
+ }
+
+ private ApiMessage apiMessageKeyFor(short recordType) {
+ switch (recordType) {
+ case 0:
+ case 1:
+ return new OffsetCommitKey();
+ case 2:
+ return new GroupMetadataKey();
+ case 3:
+ return new ConsumerGroupMetadataKey();
+ case 4:
+ return new ConsumerGroupPartitionMetadataKey();
+ case 5:
+ return new ConsumerGroupMemberMetadataKey();
+ case 6:
+ return new ConsumerGroupTargetAssignmentMetadataKey();
+ case 7:
+ return new ConsumerGroupTargetAssignmentMemberKey();
+ case 8:
+ return new ConsumerGroupCurrentMemberAssignmentKey();
+ default:
+ throw new
CoordinatorLoader.UnknownRecordTypeException(recordType);
+ }
+ }
+
+ private ApiMessage apiMessageValueFor(short recordType) {
+ switch (recordType) {
+ case 0:
+ case 1:
+ return new OffsetCommitValue();
+ case 2:
+ return new GroupMetadataValue();
+ case 3:
+ return new ConsumerGroupMetadataValue();
+ case 4:
+ return new ConsumerGroupPartitionMetadataValue();
+ case 5:
+ return new ConsumerGroupMemberMetadataValue();
+ case 6:
+ return new ConsumerGroupTargetAssignmentMetadataValue();
+ case 7:
+ return new ConsumerGroupTargetAssignmentMemberValue();
+ case 8:
+ return new ConsumerGroupCurrentMemberAssignmentValue();
+ default:
+ throw new
CoordinatorLoader.UnknownRecordTypeException(recordType);
+ }
+ }
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java
deleted file mode 100644
index 4736ebe5a4d..00000000000
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.coordinator.group;
-
-import org.apache.kafka.common.protocol.MessageUtil;
-import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
-
-/**
- * Serializer which serializes {{@link Record}} to bytes.
- */
-public class RecordSerializer implements PartitionWriter.Serializer<Record> {
- @Override
- public byte[] serializeKey(Record record) {
- // Record does not accept a null key.
- return MessageUtil.toVersionPrefixedBytes(
- record.key().version(),
- record.key().message()
- );
- }
-
- @Override
- public byte[] serializeValue(Record record) {
- // Tombstone is represented with a null value.
- if (record.value() == null) {
- return null;
- } else {
- return MessageUtil.toVersionPrefixedBytes(
- record.value().version(),
- record.value().message()
- );
- }
- }
-}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
index c5de10a313c..0fe23b5dc69 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.group.runtime;
import org.apache.kafka.common.TopicPartition;
+import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
/**
@@ -26,7 +27,40 @@ import java.util.concurrent.CompletableFuture;
*
* @param <U> The type of the record.
*/
-public interface CoordinatorLoader<U> {
+public interface CoordinatorLoader<U> extends AutoCloseable {
+
+ /**
+ * UnknownRecordTypeException is thrown when the Deserializer encounters
+ * an unknown record type.
+ */
+ class UnknownRecordTypeException extends RuntimeException {
+ private final short unknownType;
+
+ public UnknownRecordTypeException(short unknownType) {
+ super(String.format("Found an unknown record type %d",
unknownType));
+ this.unknownType = unknownType;
+ }
+
+ public short unknownType() {
+ return unknownType;
+ }
+ }
+
+ /**
+ * Deserializer to translate bytes to T.
+ *
+ * @param <T> The record type.
+ */
+ interface Deserializer<T> {
+ /**
+ * Deserializes the key and the value.
+ *
+ * @param key The key or null if not present.
+ * @param value The value or null if not present.
+ * @return The record.
+ */
+ T deserialize(ByteBuffer key, ByteBuffer value) throws
RuntimeException;
+ }
/**
* Loads the coordinator by reading all the records from the TopicPartition
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 51773eb402a..a18964cc3bb 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -1096,6 +1096,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>,
U> implements AutoClos
}
log.info("Closing coordinator runtime.");
+ loader.close();
// This close the processor, drain all the pending events and
// reject any new events.
processor.close();
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java
new file mode 100644
index 00000000000..1f39e5ba3b6
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.MessageUtil;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for RecordSerde: serializing the key and the value of a Record and
+ * deserializing key/value buffers back into a Record.
+ */
+public class RecordSerdeTest {
+    // The serialized key must be the version-prefixed bytes of the key message.
+    @Test
+    public void testSerializeKey() {
+        RecordSerde serializer = new RecordSerde();
+        Record record = new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupMetadataKey().setGroupId("group"),
+                (short) 3
+            ),
+            new ApiMessageAndVersion(
+                new ConsumerGroupMetadataValue().setEpoch(10),
+                (short) 0
+            )
+        );
+
+        assertArrayEquals(
+            MessageUtil.toVersionPrefixedBytes(record.key().version(), record.key().message()),
+            serializer.serializeKey(record)
+        );
+    }
+
+    // The serialized value must be the version-prefixed bytes of the value message.
+    @Test
+    public void testSerializeValue() {
+        RecordSerde serializer = new RecordSerde();
+        Record record = new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupMetadataKey().setGroupId("group"),
+                (short) 3
+            ),
+            new ApiMessageAndVersion(
+                new ConsumerGroupMetadataValue().setEpoch(10),
+                (short) 0
+            )
+        );
+
+        assertArrayEquals(
+            MessageUtil.toVersionPrefixedBytes(record.value().version(), record.value().message()),
+            serializer.serializeValue(record)
+        );
+    }
+
+    // A record without a value must serialize its value to null.
+    @Test
+    public void testSerializeNullValue() {
+        RecordSerde serializer = new RecordSerde();
+        Record record = new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupMetadataKey().setGroupId("group"),
+                (short) 1
+            ),
+            null
+        );
+
+        assertNull(serializer.serializeValue(record));
+    }
+
+    // Version-prefixed key and value buffers must deserialize to the original messages.
+    @Test
+    public void testDeserialize() {
+        RecordSerde serde = new RecordSerde();
+
+        ApiMessageAndVersion key = new ApiMessageAndVersion(
+            new ConsumerGroupMetadataKey().setGroupId("foo"),
+            (short) 3
+        );
+        ByteBuffer keyBuffer = MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
+
+        ApiMessageAndVersion value = new ApiMessageAndVersion(
+            new ConsumerGroupMetadataValue().setEpoch(10),
+            (short) 0
+        );
+        ByteBuffer valueBuffer = MessageUtil.toVersionPrefixedByteBuffer(value.version(), value.message());
+
+        Record record = serde.deserialize(keyBuffer, valueBuffer);
+        assertEquals(key, record.key());
+        assertEquals(value, record.value());
+    }
+
+    // A null value buffer (tombstone) must yield a record with a null value.
+    @Test
+    public void testDeserializeWithTombstoneForValue() {
+        RecordSerde serde = new RecordSerde();
+
+        ApiMessageAndVersion key = new ApiMessageAndVersion(
+            new ConsumerGroupMetadataKey().setGroupId("foo"),
+            (short) 3
+        );
+        ByteBuffer keyBuffer = MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
+
+        Record record = serde.deserialize(keyBuffer, null);
+        assertEquals(key, record.key());
+        assertNull(record.value());
+    }
+
+    // A key whose leading short (255) is not a known record type must be
+    // rejected with UnknownRecordTypeException carrying that type.
+    @Test
+    public void testDeserializeWithInvalidRecordType() {
+        RecordSerde serde = new RecordSerde();
+
+        ByteBuffer keyBuffer = ByteBuffer.allocate(64);
+        keyBuffer.putShort((short) 255);
+        keyBuffer.rewind();
+
+        ByteBuffer valueBuffer = ByteBuffer.allocate(64);
+
+        CoordinatorLoader.UnknownRecordTypeException ex =
+            assertThrows(CoordinatorLoader.UnknownRecordTypeException.class,
+                () -> serde.deserialize(keyBuffer, valueBuffer));
+        assertEquals((short) 255, ex.unknownType());
+    }
+
+    // An empty key buffer has no version prefix to read.
+    @Test
+    public void testDeserializeWithKeyEmptyBuffer() {
+        RecordSerde serde = new RecordSerde();
+
+        ByteBuffer keyBuffer = ByteBuffer.allocate(0);
+        ByteBuffer valueBuffer = ByteBuffer.allocate(64);
+
+        RuntimeException ex =
+            assertThrows(RuntimeException.class,
+                () -> serde.deserialize(keyBuffer, valueBuffer));
+        assertEquals("Could not read version from key's buffer.", ex.getMessage());
+    }
+
+    // An empty value buffer has no version prefix to read.
+    @Test
+    public void testDeserializeWithValueEmptyBuffer() {
+        RecordSerde serde = new RecordSerde();
+
+        ApiMessageAndVersion key = new ApiMessageAndVersion(
+            new ConsumerGroupMetadataKey().setGroupId("foo"),
+            (short) 3
+        );
+        ByteBuffer keyBuffer = MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
+
+        ByteBuffer valueBuffer = ByteBuffer.allocate(0);
+
+        RuntimeException ex =
+            assertThrows(RuntimeException.class,
+                () -> serde.deserialize(keyBuffer, valueBuffer));
+        assertEquals("Could not read version from value's buffer.", ex.getMessage());
+    }
+
+    // A key buffer holding only the version prefix (3) and no payload must be rejected.
+    @Test
+    public void testDeserializeWithInvalidKeyBytes() {
+        RecordSerde serde = new RecordSerde();
+
+        ByteBuffer keyBuffer = ByteBuffer.allocate(2);
+        keyBuffer.putShort((short) 3);
+        keyBuffer.rewind();
+
+        ByteBuffer valueBuffer = ByteBuffer.allocate(2);
+        valueBuffer.putShort((short) 0);
+        valueBuffer.rewind();
+
+        RuntimeException ex =
+            assertThrows(RuntimeException.class,
+                () -> serde.deserialize(keyBuffer, valueBuffer));
+        assertTrue(ex.getMessage().startsWith("Could not read record with version 3 from key's buffer due to"),
+            ex.getMessage());
+    }
+
+    // A value buffer holding only the version prefix (0) and no payload must be rejected.
+    @Test
+    public void testDeserializeWithInvalidValueBytes() {
+        RecordSerde serde = new RecordSerde();
+
+        ApiMessageAndVersion key = new ApiMessageAndVersion(
+            new ConsumerGroupMetadataKey().setGroupId("foo"),
+            (short) 3
+        );
+        ByteBuffer keyBuffer = MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
+
+        ByteBuffer valueBuffer = ByteBuffer.allocate(2);
+        valueBuffer.putShort((short) 0);
+        valueBuffer.rewind();
+
+        RuntimeException ex =
+            assertThrows(RuntimeException.class,
+                () -> serde.deserialize(keyBuffer, valueBuffer));
+        assertTrue(ex.getMessage().startsWith("Could not read record with version 0 from value's buffer due to"),
+            ex.getMessage());
+    }
+
+    // Round-trips a default key/value pair for each record type 0 through 8.
+    @Test
+    public void testDeserializeAllRecordTypes() {
+        roundTrip((short) 0, new OffsetCommitKey(), new OffsetCommitValue());
+        roundTrip((short) 1, new OffsetCommitKey(), new OffsetCommitValue());
+        roundTrip((short) 2, new GroupMetadataKey(), new GroupMetadataValue());
+        roundTrip((short) 3, new ConsumerGroupMetadataKey(), new ConsumerGroupMetadataValue());
+        roundTrip((short) 4, new ConsumerGroupPartitionMetadataKey(), new ConsumerGroupPartitionMetadataValue());
+        roundTrip((short) 5, new ConsumerGroupMemberMetadataKey(), new ConsumerGroupMemberMetadataValue());
+        roundTrip((short) 6, new ConsumerGroupTargetAssignmentMetadataKey(), new ConsumerGroupTargetAssignmentMetadataValue());
+        roundTrip((short) 7, new ConsumerGroupTargetAssignmentMemberKey(), new ConsumerGroupTargetAssignmentMemberValue());
+        roundTrip((short) 8, new ConsumerGroupCurrentMemberAssignmentKey(), new ConsumerGroupCurrentMemberAssignmentValue());
+    }
+
+    /**
+     * Serializes the key and the value with a version prefix, deserializes
+     * them and verifies that the resulting record matches, for every version
+     * supported by the value.
+     *
+     * @param recordType The record type; used as the version prefix of the key.
+     * @param key        The key message.
+     * @param val        The value message.
+     */
+    private void roundTrip(
+        short recordType,
+        ApiMessage key,
+        ApiMessage val
+    ) {
+        RecordSerde serde = new RecordSerde();
+
+        // The expected key does not depend on the value version.
+        ApiMessageAndVersion keyMessageAndVersion = new ApiMessageAndVersion(key, recordType);
+
+        // The upper bound is inclusive: with '<' the highest version was never
+        // exercised, and a value with a single supported version was not
+        // round-tripped at all.
+        for (short version = val.lowestSupportedVersion(); version <= val.highestSupportedVersion(); version++) {
+            ApiMessageAndVersion valMessageAndVersion = new ApiMessageAndVersion(val, version);
+
+            // Fresh buffers are built on every iteration.
+            Record record = serde.deserialize(
+                MessageUtil.toVersionPrefixedByteBuffer(recordType, key),
+                MessageUtil.toVersionPrefixedByteBuffer(version, val)
+            );
+
+            assertEquals(keyMessageAndVersion, record.key());
+            assertEquals(valMessageAndVersion, record.value());
+        }
+    }
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerializerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerializerTest.java
deleted file mode 100644
index dfb4f375ea2..00000000000
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerializerTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.coordinator.group;
-
-import org.apache.kafka.common.protocol.MessageUtil;
-import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
-import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-
-public class RecordSerializerTest {
- @Test
- public void testSerializeKey() {
- RecordSerializer serializer = new RecordSerializer();
- Record record = new Record(
- new ApiMessageAndVersion(
- new ConsumerGroupMetadataKey().setGroupId("group"),
- (short) 1
- ),
- new ApiMessageAndVersion(
- new ConsumerGroupMetadataValue().setEpoch(10),
- (short) 0
- )
- );
-
- assertArrayEquals(
- MessageUtil.toVersionPrefixedBytes(record.key().version(),
record.key().message()),
- serializer.serializeKey(record)
- );
- }
-
- @Test
- public void testSerializeValue() {
- RecordSerializer serializer = new RecordSerializer();
- Record record = new Record(
- new ApiMessageAndVersion(
- new ConsumerGroupMetadataKey().setGroupId("group"),
- (short) 1
- ),
- new ApiMessageAndVersion(
- new ConsumerGroupMetadataValue().setEpoch(10),
- (short) 0
- )
- );
-
- assertArrayEquals(
- MessageUtil.toVersionPrefixedBytes(record.value().version(),
record.value().message()),
- serializer.serializeValue(record)
- );
- }
-
- @Test
- public void testSerializeNullValue() {
- RecordSerializer serializer = new RecordSerializer();
- Record record = new Record(
- new ApiMessageAndVersion(
- new ConsumerGroupMetadataKey().setGroupId("group"),
- (short) 1
- ),
- null
- );
-
- assertNull(serializer.serializeValue(record));
- }
-}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index 201e11c0005..7c40721d5ae 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -45,6 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -78,6 +79,9 @@ public class CoordinatorRuntimeTest {
public CompletableFuture<Void> load(TopicPartition tp,
CoordinatorPlayback<String> replayable) {
return CompletableFuture.completedFuture(null);
}
+
+ @Override
+ public void close() throws Exception { }
}
/**
@@ -790,9 +794,11 @@ public class CoordinatorRuntimeTest {
@Test
public void testClose() throws Exception {
+ MockCoordinatorLoader loader = spy(new MockCoordinatorLoader());
+
CoordinatorRuntime<MockCoordinator, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinator, String>()
- .withLoader(new MockCoordinatorLoader())
+ .withLoader(loader)
.withEventProcessor(new MockEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorBuilderSupplier(new
MockCoordinatorBuilderSupplier())
@@ -824,5 +830,8 @@ public class CoordinatorRuntimeTest {
// All the pending operations are completed with
NotCoordinatorException.
assertFutureThrows(write1, NotCoordinatorException.class);
assertFutureThrows(write2, NotCoordinatorException.class);
+
+ // Verify that the loader was closed.
+ verify(loader).close();
}
}