This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 482299c4e2c KAFKA-14462; [19/N] Add CoordinatorLoader implementation 
(#13880)
482299c4e2c is described below

commit 482299c4e2c97315feea3db5d6bf0e5c2c8b8cc7
Author: David Jacot <[email protected]>
AuthorDate: Thu Jun 29 08:12:53 2023 +0200

    KAFKA-14462; [19/N] Add CoordinatorLoader implementation (#13880)
    
    This patch adds a coordinator loader implementation.
    
    Reviewers: Jeff Kim <[email protected]>, Justine Olshan 
<[email protected]>
---
 .../coordinator/group/CoordinatorLoaderImpl.scala  | 170 +++++++++++
 .../group/CoordinatorLoaderImplTest.scala          | 315 +++++++++++++++++++++
 .../kafka/coordinator/group/RecordSerde.java       | 169 +++++++++++
 .../kafka/coordinator/group/RecordSerializer.java  |  47 ---
 .../group/runtime/CoordinatorLoader.java           |  36 ++-
 .../group/runtime/CoordinatorRuntime.java          |   1 +
 .../kafka/coordinator/group/RecordSerdeTest.java   | 260 +++++++++++++++++
 .../coordinator/group/RecordSerializerTest.java    |  82 ------
 .../group/runtime/CoordinatorRuntimeTest.java      |  11 +-
 9 files changed, 960 insertions(+), 131 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala 
b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
new file mode 100644
index 00000000000..6b67d1a7a6b
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.ReplicaManager
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, 
UnknownRecordTypeException}
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, 
CoordinatorPlayback}
+import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.storage.internals.log.FetchIsolation
+
+import java.nio.ByteBuffer
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.jdk.CollectionConverters._
+
+/**
+ * Coordinator loader which reads records from a partition and replays them
+ * to a group coordinator.
+ *
+ * @param replicaManager  The replica manager.
+ * @param deserializer    The deserializer to use.
+ * @param loadBufferSize  The load buffer size.
+ * @tparam T The record type.
+ */
+class CoordinatorLoaderImpl[T](
+  replicaManager: ReplicaManager,
+  deserializer: Deserializer[T],
+  loadBufferSize: Int
+) extends CoordinatorLoader[T] with Logging {
+  private val isRunning = new AtomicBoolean(true)
+  private val scheduler = new KafkaScheduler(1)
+  scheduler.startup()
+
+  /**
+   * Loads the coordinator by reading all the records from the TopicPartition
+   * and applying them to the CoordinatorPlayback object.
+   *
+   * @param tp          The TopicPartition to read from.
+   * @param coordinator The object to apply records to.
+   */
+  override def load(
+    tp: TopicPartition,
+    coordinator: CoordinatorPlayback[T]
+): CompletableFuture[Void] = {
+    val future = new CompletableFuture[Void]()
+    val result = scheduler.scheduleOnce(s"Load coordinator from $tp",
+      () => doLoad(tp, coordinator, future))
+    if (result.isCancelled) {
+      future.completeExceptionally(new RuntimeException("Coordinator loader is 
closed."))
+    }
+    future
+  }
+
+  private def doLoad(
+    tp: TopicPartition,
+    coordinator: CoordinatorPlayback[T],
+    future: CompletableFuture[Void]
+  ): Unit = {
+    try {
+      replicaManager.getLog(tp) match {
+        case None =>
+          future.completeExceptionally(new NotLeaderOrFollowerException(
+            s"Could not load records from $tp because the log does not 
exist."))
+
+        case Some(log) =>
+          def logEndOffset: Long = 
replicaManager.getLogEndOffset(tp).getOrElse(-1L)
+
+          // Buffer may not be needed if records are read from memory.
+          var buffer = ByteBuffer.allocate(0)
+          // Loop breaks if leader changes at any time during the load, since 
logEndOffset is -1.
+          var currentOffset = log.logStartOffset
+          // Loop breaks if no records have been read, since the end of the 
log has been reached.
+          // This is to ensure that the loop breaks even if the current offset 
remains smaller than
+          // the log end offset but the log is empty. This could happen with 
compacted topics.
+          var readAtLeastOneRecord = true
+
+          while (currentOffset < logEndOffset && readAtLeastOneRecord && 
isRunning.get) {
+            val fetchDataInfo = log.read(
+              startOffset = currentOffset,
+              maxLength = loadBufferSize,
+              isolation = FetchIsolation.LOG_END,
+              minOneMessage = true
+            )
+
+            readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
+
+            val memoryRecords = (fetchDataInfo.records: @unchecked) match {
+              case records: MemoryRecords =>
+                records
+
+              case fileRecords: FileRecords =>
+                val sizeInBytes = fileRecords.sizeInBytes
+                val bytesNeeded = Math.max(loadBufferSize, sizeInBytes)
+
+                // minOneMessage = true in the above log.read() means that 
the buffer may need to
+                // be grown to ensure progress can be made.
+                if (buffer.capacity < bytesNeeded) {
+                  if (loadBufferSize < bytesNeeded)
+                    warn(s"Loaded metadata from $tp with buffer larger 
($bytesNeeded bytes) than " +
+                      s"configured buffer size ($loadBufferSize bytes).")
+
+                  buffer = ByteBuffer.allocate(bytesNeeded)
+                } else {
+                  buffer.clear()
+                }
+
+                fileRecords.readInto(buffer, 0)
+                MemoryRecords.readableRecords(buffer)
+            }
+
+            memoryRecords.batches.forEach { batch =>
+              if (batch.isControlBatch) {
+                throw new IllegalStateException("Control batches are not 
supported yet.")
+              } else {
+                batch.asScala.foreach { record =>
+                  try {
+                    coordinator.replay(deserializer.deserialize(record.key, 
record.value))
+                  } catch {
+                    case ex: UnknownRecordTypeException =>
+                      warn(s"Unknown record type ${ex.unknownType} while 
loading offsets and group metadata " +
+                        s"from $tp. Ignoring it. It could be a left over from 
an aborted upgrade.")
+                  }
+                }
+              }
+
+              currentOffset = batch.nextOffset
+            }
+          }
+
+          if (isRunning.get) {
+            future.complete(null)
+          } else {
+            future.completeExceptionally(new RuntimeException("Coordinator 
loader is closed."))
+          }
+      }
+    } catch {
+      case ex: Throwable =>
+        future.completeExceptionally(ex)
+    }
+  }
+
+  /**
+   * Closes the loader.
+   */
+  override def close(): Unit = {
+    if (!isRunning.compareAndSet(true, false)) {
+      warn("Coordinator loader is already shutting down.")
+      return
+    }
+    scheduler.shutdown()
+  }
+}
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
new file mode 100644
index 00000000000..c71c28d47ea
--- /dev/null
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.log.UnifiedLog
+import kafka.server.ReplicaManager
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{CompressionType, FileRecords, 
MemoryRecords, SimpleRecord}
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.UnknownRecordTypeException
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, 
CoordinatorPlayback}
+import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, 
LogOffsetMetadata}
+import org.apache.kafka.test.TestUtils.assertFutureThrows
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNull}
+import org.junit.jupiter.api.{Test, Timeout}
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.{mock, verify, when}
+
+import java.nio.ByteBuffer
+import java.nio.charset.Charset
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+class StringKeyValueDeserializer extends 
CoordinatorLoader.Deserializer[(String, String)] {
+  override def deserialize(key: ByteBuffer, value: ByteBuffer): (String, 
String) = {
+    (
+      Charset.defaultCharset().decode(key).toString,
+      Charset.defaultCharset().decode(value).toString
+    )
+  }
+}
+
+@Timeout(60)
+class CoordinatorLoaderImplTest {
+  @Test
+  def testNonexistentPartition(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val replicaManager = mock(classOf[ReplicaManager])
+    val serde = mock(classOf[CoordinatorLoader.Deserializer[(String, String)]])
+    val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+    TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      replicaManager = replicaManager,
+      deserializer = serde,
+      loadBufferSize = 1000
+    )) { loader =>
+      when(replicaManager.getLog(tp)).thenReturn(None)
+
+      val result = loader.load(tp, coordinator)
+      assertFutureThrows(result, classOf[NotLeaderOrFollowerException])
+    }
+  }
+
+  @Test
+  def testLoadingIsRejectedWhenClosed(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val replicaManager = mock(classOf[ReplicaManager])
+    val serde = mock(classOf[CoordinatorLoader.Deserializer[(String, String)]])
+    val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+    TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      replicaManager = replicaManager,
+      deserializer = serde,
+      loadBufferSize = 1000
+    )) { loader =>
+      loader.close()
+
+      val result = loader.load(tp, coordinator)
+      assertFutureThrows(result, classOf[RuntimeException])
+    }
+  }
+
+  @Test
+  def testLoading(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val replicaManager = mock(classOf[ReplicaManager])
+    val serde = new StringKeyValueDeserializer
+    val log = mock(classOf[UnifiedLog])
+    val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+    TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      replicaManager = replicaManager,
+      deserializer = serde,
+      loadBufferSize = 1000
+    )) { loader =>
+      when(replicaManager.getLog(tp)).thenReturn(Some(log))
+      when(log.logStartOffset).thenReturn(0L)
+      when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L))
+
+      val readResult1 = logReadResult(startOffset = 0, records = Seq(
+        new SimpleRecord("k1".getBytes, "v1".getBytes),
+        new SimpleRecord("k2".getBytes, "v2".getBytes)
+      ))
+
+      when(log.read(
+        startOffset = 0L,
+        maxLength = 1000,
+        isolation = FetchIsolation.LOG_END,
+        minOneMessage = true
+      )).thenReturn(readResult1)
+
+      val readResult2 = logReadResult(startOffset = 2, records = Seq(
+        new SimpleRecord("k3".getBytes, "v3".getBytes),
+        new SimpleRecord("k4".getBytes, "v4".getBytes),
+        new SimpleRecord("k5".getBytes, "v5".getBytes)
+      ))
+
+      when(log.read(
+        startOffset = 2L,
+        maxLength = 1000,
+        isolation = FetchIsolation.LOG_END,
+        minOneMessage = true
+      )).thenReturn(readResult2)
+
+      assertNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+
+      verify(coordinator).replay(("k1", "v1"))
+      verify(coordinator).replay(("k2", "v2"))
+      verify(coordinator).replay(("k3", "v3"))
+      verify(coordinator).replay(("k4", "v4"))
+      verify(coordinator).replay(("k5", "v5"))
+    }
+  }
+
+  @Test
+  def testLoadingStoppedWhenClosed(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val replicaManager = mock(classOf[ReplicaManager])
+    val serde = new StringKeyValueDeserializer
+    val log = mock(classOf[UnifiedLog])
+    val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+    TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      replicaManager = replicaManager,
+      deserializer = serde,
+      loadBufferSize = 1000
+    )) { loader =>
+      when(replicaManager.getLog(tp)).thenReturn(Some(log))
+      when(log.logStartOffset).thenReturn(0L)
+      when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(100L))
+
+      val readResult = logReadResult(startOffset = 0, records = Seq(
+        new SimpleRecord("k1".getBytes, "v1".getBytes),
+        new SimpleRecord("k2".getBytes, "v2".getBytes)
+      ))
+
+      val latch = new CountDownLatch(1)
+      when(log.read(
+        startOffset = ArgumentMatchers.anyLong(),
+        maxLength = ArgumentMatchers.eq(1000),
+        isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
+        minOneMessage = ArgumentMatchers.eq(true)
+      )).thenAnswer { _ =>
+        latch.countDown()
+        readResult
+      }
+
+      val result = loader.load(tp, coordinator)
+      latch.await(10, TimeUnit.SECONDS)
+      loader.close()
+
+      val ex = assertFutureThrows(result, classOf[RuntimeException])
+      assertEquals("Coordinator loader is closed.", ex.getMessage)
+    }
+  }
+
+  @Test
+  def testUnknownRecordTypeAreIgnored(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val replicaManager = mock(classOf[ReplicaManager])
+    val serde = mock(classOf[StringKeyValueDeserializer])
+    val log = mock(classOf[UnifiedLog])
+    val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+    TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      replicaManager = replicaManager,
+      deserializer = serde,
+      loadBufferSize = 1000
+    )) { loader =>
+      when(replicaManager.getLog(tp)).thenReturn(Some(log))
+      when(log.logStartOffset).thenReturn(0L)
+      when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(2L))
+
+      val readResult = logReadResult(startOffset = 0, records = Seq(
+        new SimpleRecord("k1".getBytes, "v1".getBytes),
+        new SimpleRecord("k2".getBytes, "v2".getBytes)
+      ))
+
+      when(log.read(
+        startOffset = 0L,
+        maxLength = 1000,
+        isolation = FetchIsolation.LOG_END,
+        minOneMessage = true
+      )).thenReturn(readResult)
+
+      when(serde.deserialize(ArgumentMatchers.any(), ArgumentMatchers.any()))
+        .thenThrow(new UnknownRecordTypeException(1))
+        .thenReturn(("k2", "v2"))
+
+      loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)
+
+      verify(coordinator).replay(("k2", "v2"))
+    }
+  }
+
+  @Test
+  def testDeserializationErrorFailsTheLoading(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val replicaManager = mock(classOf[ReplicaManager])
+    val serde = mock(classOf[StringKeyValueDeserializer])
+    val log = mock(classOf[UnifiedLog])
+    val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+    TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      replicaManager = replicaManager,
+      deserializer = serde,
+      loadBufferSize = 1000
+    )) { loader =>
+      when(replicaManager.getLog(tp)).thenReturn(Some(log))
+      when(log.logStartOffset).thenReturn(0L)
+      when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(2L))
+
+      val readResult = logReadResult(startOffset = 0, records = Seq(
+        new SimpleRecord("k1".getBytes, "v1".getBytes),
+        new SimpleRecord("k2".getBytes, "v2".getBytes)
+      ))
+
+      when(log.read(
+        startOffset = 0L,
+        maxLength = 1000,
+        isolation = FetchIsolation.LOG_END,
+        minOneMessage = true
+      )).thenReturn(readResult)
+
+      when(serde.deserialize(ArgumentMatchers.any(), ArgumentMatchers.any()))
+        .thenThrow(new RuntimeException("Error!"))
+
+      val ex = assertFutureThrows(loader.load(tp, coordinator), 
classOf[RuntimeException])
+      assertEquals("Error!", ex.getMessage)
+    }
+  }
+
+  @Test
+  def testLoadGroupAndOffsetsWithCorruptedLog(): Unit = {
+    // Simulate a case where startOffset < endOffset but log is empty. This 
could theoretically happen
+    // when all the records are expired and the active segment is truncated or 
when the partition
+    // is accidentally corrupted.
+    val tp = new TopicPartition("foo", 0)
+    val replicaManager = mock(classOf[ReplicaManager])
+    val serde = mock(classOf[StringKeyValueDeserializer])
+    val log = mock(classOf[UnifiedLog])
+    val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+    TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      replicaManager = replicaManager,
+      deserializer = serde,
+      loadBufferSize = 1000
+    )) { loader =>
+      when(replicaManager.getLog(tp)).thenReturn(Some(log))
+      when(log.logStartOffset).thenReturn(0L)
+      when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(10L))
+
+      val readResult = logReadResult(startOffset = 0, records = Seq())
+
+      when(log.read(
+        startOffset = 0L,
+        maxLength = 1000,
+        isolation = FetchIsolation.LOG_END,
+        minOneMessage = true
+      )).thenReturn(readResult)
+
+      assertNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+    }
+  }
+
+  private def logReadResult(
+    startOffset: Long,
+    records: Seq[SimpleRecord]
+  ): FetchDataInfo = {
+    val fileRecords = mock(classOf[FileRecords])
+    val memoryRecords = MemoryRecords.withRecords(
+      startOffset,
+      CompressionType.NONE,
+      records: _*
+    )
+
+    when(fileRecords.sizeInBytes).thenReturn(memoryRecords.sizeInBytes)
+
+    val bufferCapture: ArgumentCaptor[ByteBuffer] = 
ArgumentCaptor.forClass(classOf[ByteBuffer])
+    when(fileRecords.readInto(
+      bufferCapture.capture(),
+      ArgumentMatchers.anyInt())
+    ).thenAnswer { _ =>
+      val buffer = bufferCapture.getValue
+      buffer.put(memoryRecords.buffer.duplicate)
+      buffer.flip()
+    }
+
+    new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
+  }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java
new file mode 100644
index 00000000000..69a4f5fff96
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer for {@link Record}. The format is defined below:
+ * <pre>
+ *     record_key   = [record_type key_message]
+ *     record_value = [value_version value_message]
+ *
+ *     record_type     : The record type is currently defined as the version of 
the key
+ *                       {@link ApiMessageAndVersion} object.
+ *     key_message     : The serialized message of the key {@link 
ApiMessageAndVersion} object.
+ *     value_version   : The value version is currently defined as the version 
of the value
+ *                       {@link ApiMessageAndVersion} object.
+ *     value_message   : The serialized message of the value {@link 
ApiMessageAndVersion} object.
+ * </pre>
+ */
+public class RecordSerde implements PartitionWriter.Serializer<Record>, 
CoordinatorLoader.Deserializer<Record> {
+    @Override
+    public byte[] serializeKey(Record record) {
+        // Record does not accept a null key.
+        return MessageUtil.toVersionPrefixedBytes(
+            record.key().version(),
+            record.key().message()
+        );
+    }
+
+    @Override
+    public byte[] serializeValue(Record record) {
+        // Tombstone is represented with a null value.
+        if (record.value() == null) {
+            return null;
+        } else {
+            return MessageUtil.toVersionPrefixedBytes(
+                record.value().version(),
+                record.value().message()
+            );
+        }
+    }
+
+    @Override
+    public Record deserialize(
+        ByteBuffer keyBuffer,
+        ByteBuffer valueBuffer
+    ) throws RuntimeException {
+        final short recordType = readVersion(keyBuffer, "key");
+        final ApiMessage keyMessage = apiMessageKeyFor(recordType);
+        readMessage(keyMessage, keyBuffer, recordType, "key");
+
+        if (valueBuffer == null) {
+            return new Record(new ApiMessageAndVersion(keyMessage, 
recordType), null);
+        }
+
+        final ApiMessage valueMessage = apiMessageValueFor(recordType);
+        final short valueVersion = readVersion(valueBuffer, "value");
+        readMessage(valueMessage, valueBuffer, valueVersion, "value");
+
+        return new Record(
+            new ApiMessageAndVersion(keyMessage, recordType),
+            new ApiMessageAndVersion(valueMessage, valueVersion)
+        );
+    }
+
+    private short readVersion(ByteBuffer buffer, String name) throws 
RuntimeException {
+        try {
+            return buffer.getShort();
+        } catch (BufferUnderflowException ex) {
+            throw new RuntimeException(String.format("Could not read version 
from %s's buffer.", name));
+        }
+    }
+
+    private void readMessage(ApiMessage message, ByteBuffer buffer, short 
version, String name) throws RuntimeException {
+        try {
+            message.read(new ByteBufferAccessor(buffer), version);
+        } catch (RuntimeException ex) {
+            throw new RuntimeException(String.format("Could not read record 
with version %d from %s's buffer due to: %s.",
+                version, name, ex.getMessage()), ex);
+        }
+    }
+
+    private ApiMessage apiMessageKeyFor(short recordType) {
+        switch (recordType) {
+            case 0:
+            case 1:
+                return new OffsetCommitKey();
+            case 2:
+                return new GroupMetadataKey();
+            case 3:
+                return new ConsumerGroupMetadataKey();
+            case 4:
+                return new ConsumerGroupPartitionMetadataKey();
+            case 5:
+                return new ConsumerGroupMemberMetadataKey();
+            case 6:
+                return new ConsumerGroupTargetAssignmentMetadataKey();
+            case 7:
+                return new ConsumerGroupTargetAssignmentMemberKey();
+            case 8:
+                return new ConsumerGroupCurrentMemberAssignmentKey();
+            default:
+                throw new 
CoordinatorLoader.UnknownRecordTypeException(recordType);
+        }
+    }
+
+    private ApiMessage apiMessageValueFor(short recordType) {
+        switch (recordType) {
+            case 0:
+            case 1:
+                return new OffsetCommitValue();
+            case 2:
+                return new GroupMetadataValue();
+            case 3:
+                return new ConsumerGroupMetadataValue();
+            case 4:
+                return new ConsumerGroupPartitionMetadataValue();
+            case 5:
+                return new ConsumerGroupMemberMetadataValue();
+            case 6:
+                return new ConsumerGroupTargetAssignmentMetadataValue();
+            case 7:
+                return new ConsumerGroupTargetAssignmentMemberValue();
+            case 8:
+                return new ConsumerGroupCurrentMemberAssignmentValue();
+            default:
+                throw new 
CoordinatorLoader.UnknownRecordTypeException(recordType);
+        }
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java
deleted file mode 100644
index 4736ebe5a4d..00000000000
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.coordinator.group;
-
-import org.apache.kafka.common.protocol.MessageUtil;
-import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
-
-/**
- * Serializer which serializes {{@link Record}} to bytes.
- */
-public class RecordSerializer implements PartitionWriter.Serializer<Record> {
-    @Override
-    public byte[] serializeKey(Record record) {
-        // Record does not accept a null key.
-        return MessageUtil.toVersionPrefixedBytes(
-            record.key().version(),
-            record.key().message()
-        );
-    }
-
-    @Override
-    public byte[] serializeValue(Record record) {
-        // Tombstone is represented with a null value.
-        if (record.value() == null) {
-            return null;
-        } else {
-            return MessageUtil.toVersionPrefixedBytes(
-                record.value().version(),
-                record.value().message()
-            );
-        }
-    }
-}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
index c5de10a313c..0fe23b5dc69 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.group.runtime;
 
 import org.apache.kafka.common.TopicPartition;
 
+import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -26,7 +27,40 @@ import java.util.concurrent.CompletableFuture;
  *
  * @param <U> The type of the record.
  */
-public interface CoordinatorLoader<U> {
+public interface CoordinatorLoader<U> extends AutoCloseable {
+
+    /**
+     * UnknownRecordTypeException is thrown when the Deserializer encounters
+     * an unknown record type.
+     */
+    class UnknownRecordTypeException extends RuntimeException {
+        private final short unknownType;
+
+        public UnknownRecordTypeException(short unknownType) {
+            super(String.format("Found an unknown record type %d", 
unknownType));
+            this.unknownType = unknownType;
+        }
+
+        public short unknownType() {
+            return unknownType;
+        }
+    }
+
+    /**
+     * Deserializer that translates bytes to T.
+     *
+     * @param <T> The record type.
+     */
+    interface Deserializer<T> {
+        /**
+         * Deserializes the key and the value.
+         *
+         * @param key   The key or null if not present.
+         * @param value The value or null if not present.
+         * @return The record.
+         */
+        T deserialize(ByteBuffer key, ByteBuffer value) throws 
RuntimeException;
+    }
 
     /**
      * Loads the coordinator by reading all the records from the TopicPartition
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 51773eb402a..a18964cc3bb 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -1096,6 +1096,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>, 
U> implements AutoClos
         }
 
         log.info("Closing coordinator runtime.");
+        loader.close();
         // This closes the processor, drains all the pending events and
         // rejects any new events.
         processor.close();
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java
new file mode 100644
index 00000000000..1f39e5ba3b6
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RecordSerdeTest {
+    @Test
+    public void testSerializeKey() {
+        RecordSerde serializer = new RecordSerde();
+        Record record = new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupMetadataKey().setGroupId("group"),
+                (short) 3
+            ),
+            new ApiMessageAndVersion(
+                new ConsumerGroupMetadataValue().setEpoch(10),
+                (short) 0
+            )
+        );
+
+        assertArrayEquals(
+            MessageUtil.toVersionPrefixedBytes(record.key().version(), 
record.key().message()),
+            serializer.serializeKey(record)
+        );
+    }
+
+    @Test
+    public void testSerializeValue() {
+        RecordSerde serializer = new RecordSerde();
+        Record record = new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupMetadataKey().setGroupId("group"),
+                (short) 3
+            ),
+            new ApiMessageAndVersion(
+                new ConsumerGroupMetadataValue().setEpoch(10),
+                (short) 0
+            )
+        );
+
+        assertArrayEquals(
+            MessageUtil.toVersionPrefixedBytes(record.value().version(), 
record.value().message()),
+            serializer.serializeValue(record)
+        );
+    }
+
+    @Test
+    public void testSerializeNullValue() {
+        RecordSerde serializer = new RecordSerde();
+        Record record = new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupMetadataKey().setGroupId("group"),
+                (short) 1
+            ),
+            null
+        );
+
+        assertNull(serializer.serializeValue(record));
+    }
+
+    @Test
+    public void testDeserialize() {
+        RecordSerde serde = new RecordSerde();
+
+        ApiMessageAndVersion key = new ApiMessageAndVersion(
+            new ConsumerGroupMetadataKey().setGroupId("foo"),
+            (short) 3
+        );
+        ByteBuffer keyBuffer = 
MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
+
+        ApiMessageAndVersion value = new ApiMessageAndVersion(
+            new ConsumerGroupMetadataValue().setEpoch(10),
+            (short) 0
+        );
+        ByteBuffer valueBuffer = 
MessageUtil.toVersionPrefixedByteBuffer(value.version(), value.message());
+
+        Record record = serde.deserialize(keyBuffer, valueBuffer);
+        assertEquals(key, record.key());
+        assertEquals(value, record.value());
+    }
+
+    @Test
+    public void testDeserializeWithTombstoneForValue() {
+        RecordSerde serde = new RecordSerde();
+
+        ApiMessageAndVersion key = new ApiMessageAndVersion(
+            new ConsumerGroupMetadataKey().setGroupId("foo"),
+            (short) 3
+        );
+        ByteBuffer keyBuffer = 
MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
+
+        Record record = serde.deserialize(keyBuffer, null);
+        assertEquals(key, record.key());
+        assertNull(record.value());
+    }
+
+    @Test
+    public void testDeserializeWithInvalidRecordType() {
+        RecordSerde serde = new RecordSerde();
+
+        ByteBuffer keyBuffer = ByteBuffer.allocate(64);
+        keyBuffer.putShort((short) 255);
+        keyBuffer.rewind();
+
+        ByteBuffer valueBuffer = ByteBuffer.allocate(64);
+
+        CoordinatorLoader.UnknownRecordTypeException ex =
+            assertThrows(CoordinatorLoader.UnknownRecordTypeException.class,
+                () -> serde.deserialize(keyBuffer, valueBuffer));
+        assertEquals((short) 255, ex.unknownType());
+    }
+
+    @Test
+    public void testDeserializeWithKeyEmptyBuffer() {
+        RecordSerde serde = new RecordSerde();
+
+        ByteBuffer keyBuffer = ByteBuffer.allocate(0);
+        ByteBuffer valueBuffer = ByteBuffer.allocate(64);
+
+        RuntimeException ex =
+            assertThrows(RuntimeException.class,
+                () -> serde.deserialize(keyBuffer, valueBuffer));
+        assertEquals("Could not read version from key's buffer.", 
ex.getMessage());
+    }
+
+    @Test
+    public void testDeserializeWithValueEmptyBuffer() {
+        RecordSerde serde = new RecordSerde();
+
+        ApiMessageAndVersion key = new ApiMessageAndVersion(
+            new ConsumerGroupMetadataKey().setGroupId("foo"),
+            (short) 3
+        );
+        ByteBuffer keyBuffer = 
MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
+
+        ByteBuffer valueBuffer = ByteBuffer.allocate(0);
+
+        RuntimeException ex =
+            assertThrows(RuntimeException.class,
+                () -> serde.deserialize(keyBuffer, valueBuffer));
+        assertEquals("Could not read version from value's buffer.", 
ex.getMessage());
+    }
+
+    @Test
+    public void testDeserializeWithInvalidKeyBytes() {
+        RecordSerde serde = new RecordSerde();
+
+        ByteBuffer keyBuffer = ByteBuffer.allocate(2);
+        keyBuffer.putShort((short) 3);
+        keyBuffer.rewind();
+
+        ByteBuffer valueBuffer = ByteBuffer.allocate(2);
+        valueBuffer.putShort((short) 0);
+        valueBuffer.rewind();
+
+        RuntimeException ex =
+            assertThrows(RuntimeException.class,
+                () -> serde.deserialize(keyBuffer, valueBuffer));
+        assertTrue(ex.getMessage().startsWith("Could not read record with 
version 3 from key's buffer due to"),
+            ex.getMessage());
+    }
+
+    @Test
+    public void testDeserializeWithInvalidValueBytes() {
+        RecordSerde serde = new RecordSerde();
+
+        ApiMessageAndVersion key = new ApiMessageAndVersion(
+            new ConsumerGroupMetadataKey().setGroupId("foo"),
+            (short) 3
+        );
+        ByteBuffer keyBuffer = 
MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());
+
+        ByteBuffer valueBuffer = ByteBuffer.allocate(2);
+        valueBuffer.putShort((short) 0);
+        valueBuffer.rewind();
+
+        RuntimeException ex =
+            assertThrows(RuntimeException.class,
+                () -> serde.deserialize(keyBuffer, valueBuffer));
+        assertTrue(ex.getMessage().startsWith("Could not read record with 
version 0 from value's buffer due to"),
+            ex.getMessage());
+    }
+
+    @Test
+    public void testDeserializeAllRecordTypes() {
+        roundTrip((short) 0, new OffsetCommitKey(), new OffsetCommitValue());
+        roundTrip((short) 1, new OffsetCommitKey(), new OffsetCommitValue());
+        roundTrip((short) 2, new GroupMetadataKey(), new GroupMetadataValue());
+        roundTrip((short) 3, new ConsumerGroupMetadataKey(), new 
ConsumerGroupMetadataValue());
+        roundTrip((short) 4, new ConsumerGroupPartitionMetadataKey(), new 
ConsumerGroupPartitionMetadataValue());
+        roundTrip((short) 5, new ConsumerGroupMemberMetadataKey(), new 
ConsumerGroupMemberMetadataValue());
+        roundTrip((short) 6, new ConsumerGroupTargetAssignmentMetadataKey(), 
new ConsumerGroupTargetAssignmentMetadataValue());
+        roundTrip((short) 7, new ConsumerGroupTargetAssignmentMemberKey(), new 
ConsumerGroupTargetAssignmentMemberValue());
+        roundTrip((short) 8, new ConsumerGroupCurrentMemberAssignmentKey(), 
new ConsumerGroupCurrentMemberAssignmentValue());
+    }
+
+    private void roundTrip(
+        short recordType,
+        ApiMessage key,
+        ApiMessage val
+    ) {
+        RecordSerde serde = new RecordSerde();
+
+        for (short version = val.lowestSupportedVersion(); version < 
val.highestSupportedVersion(); version++) {
+            ApiMessageAndVersion keyMessageAndVersion = new 
ApiMessageAndVersion(key, recordType);
+            ApiMessageAndVersion valMessageAndVersion = new 
ApiMessageAndVersion(val, version);
+
+            Record record = serde.deserialize(
+                MessageUtil.toVersionPrefixedByteBuffer(recordType, key),
+                MessageUtil.toVersionPrefixedByteBuffer(version, val)
+            );
+
+            assertEquals(keyMessageAndVersion, record.key());
+            assertEquals(valMessageAndVersion, record.value());
+        }
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerializerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerializerTest.java
deleted file mode 100644
index dfb4f375ea2..00000000000
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerializerTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.coordinator.group;
-
-import org.apache.kafka.common.protocol.MessageUtil;
-import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
-import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-
-public class RecordSerializerTest {
-    @Test
-    public void testSerializeKey() {
-        RecordSerializer serializer = new RecordSerializer();
-        Record record = new Record(
-            new ApiMessageAndVersion(
-                new ConsumerGroupMetadataKey().setGroupId("group"),
-                (short) 1
-            ),
-            new ApiMessageAndVersion(
-                new ConsumerGroupMetadataValue().setEpoch(10),
-                (short) 0
-            )
-        );
-
-        assertArrayEquals(
-            MessageUtil.toVersionPrefixedBytes(record.key().version(), 
record.key().message()),
-            serializer.serializeKey(record)
-        );
-    }
-
-    @Test
-    public void testSerializeValue() {
-        RecordSerializer serializer = new RecordSerializer();
-        Record record = new Record(
-            new ApiMessageAndVersion(
-                new ConsumerGroupMetadataKey().setGroupId("group"),
-                (short) 1
-            ),
-            new ApiMessageAndVersion(
-                new ConsumerGroupMetadataValue().setEpoch(10),
-                (short) 0
-            )
-        );
-
-        assertArrayEquals(
-            MessageUtil.toVersionPrefixedBytes(record.value().version(), 
record.value().message()),
-            serializer.serializeValue(record)
-        );
-    }
-
-    @Test
-    public void testSerializeNullValue() {
-        RecordSerializer serializer = new RecordSerializer();
-        Record record = new Record(
-            new ApiMessageAndVersion(
-                new ConsumerGroupMetadataKey().setGroupId("group"),
-                (short) 1
-            ),
-            null
-        );
-
-        assertNull(serializer.serializeValue(record));
-    }
-}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index 201e11c0005..7c40721d5ae 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -45,6 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -78,6 +79,9 @@ public class CoordinatorRuntimeTest {
         public CompletableFuture<Void> load(TopicPartition tp, 
CoordinatorPlayback<String> replayable) {
             return CompletableFuture.completedFuture(null);
         }
+
+        @Override
+        public void close() throws Exception { }
     }
 
     /**
@@ -790,9 +794,11 @@ public class CoordinatorRuntimeTest {
 
     @Test
     public void testClose() throws Exception {
+        MockCoordinatorLoader loader = spy(new MockCoordinatorLoader());
+
         CoordinatorRuntime<MockCoordinator, String> runtime =
             new CoordinatorRuntime.Builder<MockCoordinator, String>()
-                .withLoader(new MockCoordinatorLoader())
+                .withLoader(loader)
                 .withEventProcessor(new MockEventProcessor())
                 .withPartitionWriter(new MockPartitionWriter())
                 .withCoordinatorBuilderSupplier(new 
MockCoordinatorBuilderSupplier())
@@ -824,5 +830,8 @@ public class CoordinatorRuntimeTest {
         // All the pending operations are completed with 
NotCoordinatorException.
         assertFutureThrows(write1, NotCoordinatorException.class);
         assertFutureThrows(write2, NotCoordinatorException.class);
+
+        // Verify that the loader was closed.
+        verify(loader).close();
     }
 }

Reply via email to