This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a81486e4f82 KAFKA-14462; [18/N] Add GroupCoordinatorService (#13812)
a81486e4f82 is described below

commit a81486e4f822b646039586ed0f70f9d75ec3ae36
Author: David Jacot <[email protected]>
AuthorDate: Thu Jun 22 09:06:10 2023 +0200

    KAFKA-14462; [18/N] Add GroupCoordinatorService (#13812)
    
    This patch introduces the GroupCoordinatorService. This is the new 
(incomplete) implementation of the group coordinator based on the coordinator 
runtime introduced in https://github.com/apache/kafka/pull/13795.
    
    Reviewers: Divij Vaidya <[email protected]>, Justine Olshan 
<[email protected]>
---
 checkstyle/import-control.xml                      |   3 +
 .../coordinator/group/GroupCoordinatorConfig.java  |  74 +++
 .../coordinator/group/GroupCoordinatorService.java | 581 +++++++++++++++++++++
 .../group/ReplicatedGroupCoordinator.java          | 203 +++++++
 .../coordinator/group/runtime/Coordinator.java     |   4 +-
 .../group/runtime/CoordinatorBuilder.java          |  14 +-
 .../group/runtime/CoordinatorBuilderSupplier.java  |   2 +-
 .../group/runtime/CoordinatorLoader.java           |   2 +-
 .../group/runtime/CoordinatorRuntime.java          |  32 +-
 .../group/GroupCoordinatorConfigTest.java          |  47 ++
 .../group/GroupCoordinatorServiceTest.java         | 274 ++++++++++
 .../group/ReplicatedGroupCoordinatorTest.java      | 308 +++++++++++
 .../apache/kafka/coordinator/group/TestUtil.java   |  49 ++
 .../group/runtime/CoordinatorRuntimeTest.java      |  14 +
 14 files changed, 1600 insertions(+), 7 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 4de0671d77a..ee8355fef12 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -223,6 +223,8 @@
     <subpackage name="group">
       <allow pkg="org.apache.kafka.clients.consumer" />
       <allow pkg="org.apache.kafka.common.annotation" />
+      <allow pkg="org.apache.kafka.common.config" />
+      <allow pkg="org.apache.kafka.common.internals" />
       <allow pkg="org.apache.kafka.common.message" />
       <allow pkg="org.apache.kafka.common.metadata" />
       <allow pkg="org.apache.kafka.common.network" />
@@ -232,6 +234,7 @@
       <allow pkg="org.apache.kafka.deferred" />
       <allow pkg="org.apache.kafka.image"/>
       <allow pkg="org.apache.kafka.server.common"/>
+      <allow pkg="org.apache.kafka.server.record"/>
       <allow pkg="org.apache.kafka.server.util"/>
       <allow pkg="org.apache.kafka.test" />
       <allow pkg="org.apache.kafka.timeline" />
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
new file mode 100644
index 00000000000..f4b23a429fa
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+
+import java.util.List;
+
+/**
+ * The group coordinator configurations.
+ */
+public class GroupCoordinatorConfig {
+
+    /**
+     * The number of threads or event loops running.
+     */
+    public final int numThreads;
+
+    /**
+     * The consumer group session timeout in milliseconds.
+     */
+    public final int consumerGroupSessionTimeoutMs;
+
+    /**
+     * The consumer group heartbeat interval in milliseconds.
+     */
+    public final int consumerGroupHeartbeatIntervalMs;
+
+    /**
+     * The consumer group maximum size.
+     */
+    public final int consumerGroupMaxSize;
+
+    /**
+     * The consumer group assignors.
+     */
+    public final List<PartitionAssignor> consumerGroupAssignors;
+
+    /**
+     * The offsets topic segment bytes should be kept relatively small to 
facilitate faster
+     * log compaction and faster offset loads.
+     */
+    public final int offsetsTopicSegmentBytes;
+
+    public GroupCoordinatorConfig(
+        int numThreads,
+        int consumerGroupSessionTimeoutMs,
+        int consumerGroupHeartbeatIntervalMs,
+        int consumerGroupMaxSize,
+        List<PartitionAssignor> consumerGroupAssignors,
+        int offsetsTopicSegmentBytes
+    ) {
+        this.numThreads = numThreads;
+        this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs;
+        this.consumerGroupHeartbeatIntervalMs = 
consumerGroupHeartbeatIntervalMs;
+        this.consumerGroupMaxSize = consumerGroupMaxSize;
+        this.consumerGroupAssignors = consumerGroupAssignors;
+        this.offsetsTopicSegmentBytes = offsetsTopicSegmentBytes;
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
new file mode 100644
index 00000000000..88a70aca925
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+
+/**
+ * The group coordinator service.
+ */
+public class GroupCoordinatorService implements GroupCoordinator {
+
+    public static class Builder {
+        private final int nodeId;
+        private final GroupCoordinatorConfig config;
+        private PartitionWriter<Record> writer;
+        private CoordinatorLoader<Record> loader;
+
+        public Builder(
+            int nodeId,
+            GroupCoordinatorConfig config
+        ) {
+            this.nodeId = nodeId;
+            this.config = config;
+        }
+
+        public Builder withWriter(PartitionWriter<Record> writer) {
+            this.writer = writer;
+            return this;
+        }
+
+        public Builder withLoader(CoordinatorLoader<Record> loader) {
+            this.loader = loader;
+            return this;
+        }
+
+        /**
+         * Builds the {@link GroupCoordinatorService} together with the event processor and
+         * the coordinator runtime it relies on.
+         *
+         * @return The new service.
+         * @throws IllegalArgumentException if the config, the writer or the loader is not set.
+         */
+        public GroupCoordinatorService build() {
+            if (config == null)
+                throw new IllegalArgumentException("Config must be set.");
+            if (writer == null)
+                throw new IllegalArgumentException("Writer must be set.");
+            if (loader == null)
+                throw new IllegalArgumentException("Loader must be set.");
+
+            // Keep the prefix free of trailing whitespace and put the separator after the
+            // closing bracket, so that log lines render as "[GroupCoordinator id=N] message"
+            // rather than "[GroupCoordinator id=N  ]message".
+            String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
+            LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));
+
+            CoordinatorBuilderSupplier<ReplicatedGroupCoordinator, Record> supplier = () ->
+                new ReplicatedGroupCoordinator.Builder(config);
+
+            CoordinatorEventProcessor processor = new MultiThreadedEventProcessor(
+                logContext,
+                "group-coordinator-event-processor-",
+                config.numThreads
+            );
+
+            CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime =
+                new CoordinatorRuntime.Builder<ReplicatedGroupCoordinator, Record>()
+                    .withLogPrefix(logPrefix)
+                    .withLogContext(logContext)
+                    .withEventProcessor(processor)
+                    .withPartitionWriter(writer)
+                    .withLoader(loader)
+                    .withCoordinatorBuilderSupplier(supplier)
+                    .build();
+
+            return new GroupCoordinatorService(
+                logContext,
+                config,
+                runtime
+            );
+        }
+    }
+
+    /**
+     * The logger.
+     */
+    private final Logger log;
+
+    /**
+     * The group coordinator configurations.
+     */
+    private final GroupCoordinatorConfig config;
+
+    /**
+     * The coordinator runtime.
+     */
+    private final CoordinatorRuntime<ReplicatedGroupCoordinator, Record> 
runtime;
+
+    /**
+     * Boolean indicating whether the coordinator is active or not.
+     */
+    private final AtomicBoolean isActive = new AtomicBoolean(false);
+
+    /**
+     * The number of partitions of the __consumer_offsets topic. This is 
provided
+     * when the component is started.
+     */
+    private volatile int numPartitions = -1;
+
+    /**
+     * Creates the service. This constructor is package-private and is called by
+     * {@link Builder#build()}.
+     *
+     * @param logContext The log context used to create the logger of this service.
+     * @param config     The group coordinator configurations.
+     * @param runtime    The coordinator runtime on which operations are scheduled.
+     */
+    GroupCoordinatorService(
+        LogContext logContext,
+        GroupCoordinatorConfig config,
+        CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime
+    ) {
+        // Log under this class (not CoordinatorLoader) so messages are attributed correctly.
+        this.log = logContext.logger(GroupCoordinatorService.class);
+        this.config = config;
+        this.runtime = runtime;
+    }
+
+    /**
+     * Throws CoordinatorNotAvailableException if the coordinator is not active.
+     */
+    private void throwIfNotActive() {
+        if (!isActive.get()) {
+            throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
+        }
+    }
+
+    /**
+     * @return The topic partition for the given group.
+     */
+    private TopicPartition topicPartitionFor(
+        String groupId
+    ) {
+        return new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
partitionFor(groupId));
+    }
+
+    /**
+     * See {@link GroupCoordinator#partitionFor(String)}
+     */
+    @Override
+    public int partitionFor(
+        String groupId
+    ) {
+        throwIfNotActive();
+        return Utils.abs(groupId.hashCode()) % numPartitions;
+    }
+
+    /**
+     * See {@link GroupCoordinator#consumerGroupHeartbeat(RequestContext, 
ConsumerGroupHeartbeatRequestData)}.
+     */
+    @Override
+    public CompletableFuture<ConsumerGroupHeartbeatResponseData> 
consumerGroupHeartbeat(
+        RequestContext context,
+        ConsumerGroupHeartbeatRequestData request
+    ) {
+        if (!isActive.get()) {
+            return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+        return runtime.scheduleWriteOperation(
+            "consumer-group-heartbeat",
+            topicPartitionFor(request.groupId()),
+            coordinator -> coordinator.consumerGroupHeartbeat(context, request)
+        ).exceptionally(exception -> {
+            if (exception instanceof UnknownTopicOrPartitionException ||
+                exception instanceof NotEnoughReplicasException) {
+                return new ConsumerGroupHeartbeatResponseData()
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
+            }
+
+            if (exception instanceof NotLeaderOrFollowerException ||
+                exception instanceof KafkaStorageException) {
+                return new ConsumerGroupHeartbeatResponseData()
+                    .setErrorCode(Errors.NOT_COORDINATOR.code());
+            }
+
+            if (exception instanceof RecordTooLargeException ||
+                exception instanceof RecordBatchTooLargeException ||
+                exception instanceof InvalidFetchSizeException) {
+                return new ConsumerGroupHeartbeatResponseData()
+                    .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
+            }
+
+            return new ConsumerGroupHeartbeatResponseData()
+                .setErrorCode(Errors.forException(exception).code())
+                .setErrorMessage(exception.getMessage());
+        });
+    }
+
+    /**
+     * See {@link GroupCoordinator#joinGroup(RequestContext, 
JoinGroupRequestData, BufferSupplier)}.
+     */
+    @Override
+    public CompletableFuture<JoinGroupResponseData> joinGroup(
+        RequestContext context,
+        JoinGroupRequestData request,
+        BufferSupplier bufferSupplier
+    ) {
+        if (!isActive.get()) {
+            return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+            "This API is not implemented yet."
+        ));
+    }
+
+    /**
+     * See {@link GroupCoordinator#syncGroup(RequestContext, 
SyncGroupRequestData, BufferSupplier)}.
+     */
+    @Override
+    public CompletableFuture<SyncGroupResponseData> syncGroup(
+        RequestContext context,
+        SyncGroupRequestData request,
+        BufferSupplier bufferSupplier
+    ) {
+        if (!isActive.get()) {
+            return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+            "This API is not implemented yet."
+        ));
+    }
+
+    /**
+     * See {@link GroupCoordinator#heartbeat(RequestContext, 
HeartbeatRequestData)}.
+     */
+    @Override
+    public CompletableFuture<HeartbeatResponseData> heartbeat(
+        RequestContext context,
+        HeartbeatRequestData request
+    ) {
+        if (!isActive.get()) {
+            return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+            "This API is not implemented yet."
+        ));
+    }
+
+    /**
+     * See {@link GroupCoordinator#leaveGroup(RequestContext, 
LeaveGroupRequestData)}.
+     */
+    @Override
+    public CompletableFuture<LeaveGroupResponseData> leaveGroup(
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) {
+        if (!isActive.get()) {
+            return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+            "This API is not implemented yet."
+        ));
+    }
+
+    /**
+     * See {@link GroupCoordinator#listGroups(RequestContext, 
ListGroupsRequestData)}.
+     */
+    @Override
+    public CompletableFuture<ListGroupsResponseData> listGroups(
+        RequestContext context,
+        ListGroupsRequestData request
+    ) {
+        if (!isActive.get()) {
+            return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+            "This API is not implemented yet."
+        ));
+    }
+
+    /**
+     * See {@link GroupCoordinator#describeGroups(RequestContext, List)}.
+     */
+    @Override
+    public CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> 
describeGroups(
+        RequestContext context,
+        List<String> groupIds
+    ) {
+        if (!isActive.get()) {
+            return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+            "This API is not implemented yet."
+        ));
+    }
+
+    /**
+     * See {@link GroupCoordinator#deleteGroups(RequestContext, List, 
BufferSupplier)}.
+     */
+    @Override
+    public 
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
deleteGroups(
+        RequestContext context,
+        List<String> groupIds,
+        BufferSupplier bufferSupplier
+    ) {
+        if (!isActive.get()) {
+            return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+            "This API is not implemented yet."
+        ));
+    }
+
+    /**
+     * See {@link GroupCoordinator#fetchOffsets(RequestContext, String, List, 
boolean)}.
+     */
+    @Override
+    public 
CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> 
fetchOffsets(
+        RequestContext context,
+        String groupId,
+        List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
+        boolean requireStable
+    ) {
+        if (!isActive.get()) {
+            return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+            "This API is not implemented yet."
+        ));
+    }
+
+    /**
+     * See {@link GroupCoordinator#fetchAllOffsets(RequestContext, String, 
boolean)}.
+     */
+    @Override
+    public 
CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> 
fetchAllOffsets(
+        RequestContext context,
+        String groupId,
+        boolean requireStable
+    ) {
+        if (!isActive.get()) {
+            return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+            "This API is not implemented yet."
+        ));
+    }
+
+    /**
+     * See {@link GroupCoordinator#commitOffsets(RequestContext, 
OffsetCommitRequestData, BufferSupplier)}.
+     */
+    @Override
+    public CompletableFuture<OffsetCommitResponseData> commitOffsets(
+        RequestContext context,
+        OffsetCommitRequestData request,
+        BufferSupplier bufferSupplier
+    ) {
+        if (!isActive.get()) {
+            return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+            "This API is not implemented yet."
+        ));
+    }
+
+    /**
+     * See {@link GroupCoordinator#commitTransactionalOffsets(RequestContext, 
TxnOffsetCommitRequestData, BufferSupplier)}.
+     */
+    @Override
+    public CompletableFuture<TxnOffsetCommitResponseData> 
commitTransactionalOffsets(
+        RequestContext context,
+        TxnOffsetCommitRequestData request,
+        BufferSupplier bufferSupplier
+    ) {
+        if (!isActive.get()) {
+            return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+            "This API is not implemented yet."
+        ));
+    }
+
+    /**
+     * See {@link GroupCoordinator#deleteOffsets(RequestContext, 
OffsetDeleteRequestData, BufferSupplier)}.
+     */
+    @Override
+    public CompletableFuture<OffsetDeleteResponseData> deleteOffsets(
+        RequestContext context,
+        OffsetDeleteRequestData request,
+        BufferSupplier bufferSupplier
+    ) {
+        if (!isActive.get()) {
+            return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+            "This API is not implemented yet."
+        ));
+    }
+
+    /**
+     * See {@link GroupCoordinator#onTransactionCompleted(long, Iterable, 
TransactionResult)}.
+     */
+    @Override
+    public void onTransactionCompleted(
+        long producerId,
+        Iterable<TopicPartition> partitions,
+        TransactionResult transactionResult
+    ) {
+        throwIfNotActive();
+    }
+
+    /**
+     * See {@link GroupCoordinator#onPartitionsDeleted(List, BufferSupplier)}.
+     */
+    @Override
+    public void onPartitionsDeleted(
+        List<TopicPartition> topicPartitions,
+        BufferSupplier bufferSupplier
+    ) {
+        throwIfNotActive();
+    }
+
+    /**
+     * See {@link GroupCoordinator#onElection(int, int)}.
+     */
+    @Override
+    public void onElection(
+        int groupMetadataPartitionIndex,
+        int groupMetadataPartitionLeaderEpoch
+    ) {
+        throwIfNotActive();
+        runtime.scheduleLoadOperation(
+            new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupMetadataPartitionIndex),
+            groupMetadataPartitionLeaderEpoch
+        );
+    }
+
+    /**
+     * See {@link GroupCoordinator#onResignation(int, OptionalInt)}.
+     */
+    @Override
+    public void onResignation(
+        int groupMetadataPartitionIndex,
+        OptionalInt groupMetadataPartitionLeaderEpoch
+    ) {
+        throwIfNotActive();
+        if (!groupMetadataPartitionLeaderEpoch.isPresent()) {
+            throw new IllegalArgumentException("The leader epoch should always 
be provided in KRaft.");
+        }
+        runtime.scheduleUnloadOperation(
+            new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupMetadataPartitionIndex),
+            groupMetadataPartitionLeaderEpoch.getAsInt()
+        );
+    }
+
+    /**
+     * See {@link GroupCoordinator#onNewMetadataImage(MetadataImage, 
MetadataDelta)}.
+     */
+    @Override
+    public void onNewMetadataImage(
+        MetadataImage newImage,
+        MetadataDelta delta
+    ) {
+        throwIfNotActive();
+    }
+
+    /**
+     * See {@link GroupCoordinator#groupMetadataTopicConfigs()}.
+     */
+    @Override
+    public Properties groupMetadataTopicConfigs() {
+        Properties properties = new Properties();
+        properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_COMPACT);
+        properties.put(TopicConfig.COMPRESSION_TYPE_CONFIG, 
BrokerCompressionType.PRODUCER.name);
+        properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, 
String.valueOf(config.offsetsTopicSegmentBytes));
+        return properties;
+    }
+
+    /**
+     * See {@link GroupCoordinator#startup(IntSupplier)}.
+     */
+    @Override
+    public void startup(
+        IntSupplier groupMetadataTopicPartitionCount
+    ) {
+        // The compare-and-set both guards against a double start and performs the
+        // activation. It must not be followed by an unconditional set(true): that could
+        // re-activate the coordinator after a concurrent shutdown() has already flipped
+        // the flag and closed the runtime.
+        if (!isActive.compareAndSet(false, true)) {
+            log.warn("Group coordinator is already running.");
+            return;
+        }
+
+        log.info("Starting up.");
+        // NOTE(review): the flag is already true while numPartitions is being assigned, so a
+        // concurrent partitionFor() may briefly observe the initial -1. Confirm whether
+        // callers can race with startup.
+        numPartitions = groupMetadataTopicPartitionCount.getAsInt();
+        log.info("Startup complete.");
+    }
+
+    /**
+     * See {@link GroupCoordinator#shutdown()}.
+     */
+    @Override
+    public void shutdown() {
+        // The compare-and-set both guards against a double shutdown and performs the
+        // deactivation. It must not be followed by an unconditional set(false): that could
+        // deactivate the coordinator after a concurrent startup() has re-activated it.
+        if (!isActive.compareAndSet(true, false)) {
+            log.warn("Group coordinator is already shutting down.");
+            return;
+        }
+
+        log.info("Shutting down.");
+        Utils.closeQuietly(runtime, "coordinator runtime");
+        log.info("Shutdown complete.");
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java
new file mode 100644
index 00000000000..89d08d450a2
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.runtime.Coordinator;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilder;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+/**
+ * The group coordinator replicated state machine that manages the metadata of 
all generic and
+ * consumer groups. It holds the hard and the soft state of the groups. This 
class has two kinds
+ * of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *    mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *    handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class ReplicatedGroupCoordinator implements Coordinator<Record> {
+
+    public static class Builder implements 
CoordinatorBuilder<ReplicatedGroupCoordinator, Record> {
+        private final GroupCoordinatorConfig config;
+        private LogContext logContext;
+        private SnapshotRegistry snapshotRegistry;
+
+        public Builder(
+            GroupCoordinatorConfig config
+        ) {
+            this.config = config;
+        }
+
+        @Override
+        public CoordinatorBuilder<ReplicatedGroupCoordinator, Record> 
withLogContext(
+            LogContext logContext
+        ) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        @Override
+        public CoordinatorBuilder<ReplicatedGroupCoordinator, Record> 
withSnapshotRegistry(
+            SnapshotRegistry snapshotRegistry
+        ) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        @Override
+        public ReplicatedGroupCoordinator build() {
+            if (logContext == null) logContext = new LogContext(); // The log context is optional; fall back to a default one.
+            if (config == null) // The config and the snapshot registry are mandatory.
+                throw new IllegalArgumentException("Config must be set.");
+            if (snapshotRegistry == null)
+                throw new IllegalArgumentException("SnapshotRegistry must be 
set.");
+
+            return new ReplicatedGroupCoordinator(
+                new GroupMetadataManager.Builder()
+                    .withLogContext(logContext)
+                    .withSnapshotRegistry(snapshotRegistry)
+                    .withAssignors(config.consumerGroupAssignors)
+                    .withConsumerGroupMaxSize(config.consumerGroupMaxSize)
+                    
.withConsumerGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs)
+                    .build()
+            );
+        }
+    }
+
+    /**
+     * The group metadata manager. Every request handler and replay method delegates to it.
+     */
+    private final GroupMetadataManager groupMetadataManager;
+
+    /**
+     * Constructor.
+     *
+     * @param groupMetadataManager The group metadata manager.
+     */
+    ReplicatedGroupCoordinator(
+        GroupMetadataManager groupMetadataManager
+    ) {
+        this.groupMetadataManager = groupMetadataManager;
+    }
+
+    /**
+     * Handles a ConsumerGroupHeartbeat request.
+     *
+     * @param context The request context.
+     * @param request The actual ConsumerGroupHeartbeat request.
+     *
+     * @return A Result containing the ConsumerGroupHeartbeat response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
consumerGroupHeartbeat(
+        RequestContext context,
+        ConsumerGroupHeartbeatRequestData request
+    ) {
+        return groupMetadataManager.consumerGroupHeartbeat(context, request);
+    }
+
+    /**
+     * @return The wrapped ApiMessage, or null if apiMessageAndVersion is null.
+     */
+    private ApiMessage messageOrNull(ApiMessageAndVersion 
apiMessageAndVersion) {
+        if (apiMessageAndVersion == null) {
+            return null;
+        } else {
+            return apiMessageAndVersion.message();
+        }
+    }
+
+    /**
+     * Replays the Record to update the hard state of the group coordinator.
+     *
+     * @param record The record to apply to the state machine.
+     * @throws RuntimeException If the record cannot be applied, e.g. its type is unknown.
+     */
+    @Override
+    public void replay(Record record) throws RuntimeException {
+        ApiMessageAndVersion key = record.key();
+        ApiMessageAndVersion value = record.value(); // A null value is tolerated; see messageOrNull.
+
+        switch (key.version()) { // The key version identifies the record type and selects the casts below.
+            case 3:
+                groupMetadataManager.replay(
+                    (ConsumerGroupMetadataKey) key.message(),
+                    (ConsumerGroupMetadataValue) messageOrNull(value)
+                );
+                break;
+
+            case 4:
+                groupMetadataManager.replay(
+                    (ConsumerGroupPartitionMetadataKey) key.message(),
+                    (ConsumerGroupPartitionMetadataValue) messageOrNull(value)
+                );
+                break;
+
+            case 5:
+                groupMetadataManager.replay(
+                    (ConsumerGroupMemberMetadataKey) key.message(),
+                    (ConsumerGroupMemberMetadataValue) messageOrNull(value)
+                );
+                break;
+
+            case 6:
+                groupMetadataManager.replay(
+                    (ConsumerGroupTargetAssignmentMetadataKey) key.message(),
+                    (ConsumerGroupTargetAssignmentMetadataValue) 
messageOrNull(value)
+                );
+                break;
+
+            case 7:
+                groupMetadataManager.replay(
+                    (ConsumerGroupTargetAssignmentMemberKey) key.message(),
+                    (ConsumerGroupTargetAssignmentMemberValue) 
messageOrNull(value)
+                );
+                break;
+
+            case 8:
+                groupMetadataManager.replay(
+                    (ConsumerGroupCurrentMemberAssignmentKey) key.message(),
+                    (ConsumerGroupCurrentMemberAssignmentValue) 
messageOrNull(value)
+                );
+                break;
+
+            default:
+                throw new IllegalStateException("Received an unknown record 
type " + key.version()
+                    + " in " + record);
+        }
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java
index 0eb2499520a..5ab88a0efa6 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java
@@ -26,11 +26,11 @@ public interface Coordinator<U> extends 
CoordinatorPlayback<U> {
      * The coordinator has been loaded. This is used to apply any
      * post loading operations (e.g. registering timers).
      */
-    default void onLoaded() {};
+    default void onLoaded() {}
 
     /**
      * The coordinator has been unloaded. This is used to apply
      * any post unloading operations.
      */
-    default void onUnloaded() {};
+    default void onUnloaded() {}
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilder.java
index 4d9febc14b7..6dd2288928a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilder.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.coordinator.group.runtime;
 
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.timeline.SnapshotRegistry;
 
 /**
@@ -24,7 +25,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
  * @param <S> The type of the coordinator.
  * @param <U> The record type.
  */
-interface CoordinatorBuilder<S extends Coordinator<U>, U> {
+public interface CoordinatorBuilder<S extends Coordinator<U>, U> {
 
     /**
      * Sets the snapshot registry used to back all the timeline
@@ -38,6 +39,17 @@ interface CoordinatorBuilder<S extends Coordinator<U>, U> {
         SnapshotRegistry snapshotRegistry
     );
 
+    /**
+     * Sets the log context.
+     *
+     * @param logContext The log context.
+     *
+     * @return The builder.
+     */
+    CoordinatorBuilder<S, U> withLogContext(
+        LogContext logContext
+    );
+
     /**
      * @return The built coordinator.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilderSupplier.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilderSupplier.java
index 35048177961..98b7c54fca8 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilderSupplier.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilderSupplier.java
@@ -22,7 +22,7 @@ package org.apache.kafka.coordinator.group.runtime;
  * @param <S> The type of the coordinator.
  * @param <U> The record type.
  */
-interface CoordinatorBuilderSupplier<S extends Coordinator<U>, U> {
+public interface CoordinatorBuilderSupplier<S extends Coordinator<U>, U> {
     /**
      * @return A {@link CoordinatorBuilder}.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
index 0bef7fbc580..c5de10a313c 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
@@ -26,7 +26,7 @@ import java.util.concurrent.CompletableFuture;
  *
  * @param <U> The type of the record.
  */
-interface CoordinatorLoader<U> {
+public interface CoordinatorLoader<U> {
 
     /**
      * Loads the coordinator by reading all the records from the TopicPartition
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 4a2c0546f12..51773eb402a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -60,7 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * @param <S> The type of the state machine.
  * @param <U> The type of the record.
  */
-public class CoordinatorRuntime<S extends Coordinator<U>, U> {
+public class CoordinatorRuntime<S extends Coordinator<U>, U> implements 
AutoCloseable {
 
     /**
      * Builder to create a CoordinatorRuntime.
@@ -69,12 +69,18 @@ public class CoordinatorRuntime<S extends Coordinator<U>, 
U> {
      * @param <U> The type of the record.
      */
     public static class Builder<S extends Coordinator<U>, U> {
+        private String logPrefix;
         private LogContext logContext;
         private CoordinatorEventProcessor eventProcessor;
         private PartitionWriter<U> partitionWriter;
         private CoordinatorLoader<U> loader;
         private CoordinatorBuilderSupplier<S, U> coordinatorBuilderSupplier;
 
+        public Builder<S, U> withLogPrefix(String logPrefix) {
+            this.logPrefix = logPrefix;
+            return this;
+        }
+
         public Builder<S, U> withLogContext(LogContext logContext) {
             this.logContext = logContext;
             return this;
@@ -101,8 +107,10 @@ public class CoordinatorRuntime<S extends Coordinator<U>, 
U> {
         }
 
         public CoordinatorRuntime<S, U> build() {
+            if (logPrefix == null)
+                logPrefix = "";
             if (logContext == null)
-                logContext = new LogContext();
+                logContext = new LogContext(logPrefix);
             if (eventProcessor == null)
                 throw new IllegalArgumentException("Event processor must be 
set.");
             if (partitionWriter == null)
@@ -113,6 +121,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>, 
U> {
                 throw new IllegalArgumentException("State machine supplier 
must be set.");
 
             return new CoordinatorRuntime<>(
+                logPrefix,
                 logContext,
                 eventProcessor,
                 partitionWriter,
@@ -188,6 +197,11 @@ public class CoordinatorRuntime<S extends Coordinator<U>, 
U> {
          */
         final TopicPartition tp;
 
+        /**
+         * The log context.
+         */
+        final LogContext logContext;
+
         /**
          * The snapshot registry backing the coordinator.
          */
@@ -235,6 +249,11 @@ public class CoordinatorRuntime<S extends Coordinator<U>, 
U> {
             TopicPartition tp
         ) {
             this.tp = tp;
+            this.logContext = new LogContext(String.format("[%s topic=%s 
partition=%d] ",
+                logPrefix,
+                tp.topic(),
+                tp.partition()
+            ));
             this.snapshotRegistry = new SnapshotRegistry(logContext);
             this.deferredEventQueue = new DeferredEventQueue(logContext);
             this.state = CoordinatorState.INITIAL;
@@ -321,6 +340,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>, 
U> {
                     state = CoordinatorState.LOADING;
                     coordinator = coordinatorBuilderSupplier
                         .get()
+                        .withLogContext(logContext)
                         .withSnapshotRegistry(snapshotRegistry)
                         .build();
                     break;
@@ -735,6 +755,11 @@ public class CoordinatorRuntime<S extends Coordinator<U>, 
U> {
         }
     }
 
+    /**
+     * The log prefix.
+     */
+    private final String logPrefix;
+
     /**
      * The log context.
      */
@@ -785,6 +810,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>, 
U> {
     /**
      * Constructor.
      *
+     * @param logPrefix                     The log prefix.
      * @param logContext                    The log context.
      * @param processor                     The event processor.
      * @param partitionWriter               The partition writer.
@@ -792,12 +818,14 @@ public class CoordinatorRuntime<S extends Coordinator<U>, 
U> {
      * @param coordinatorBuilderSupplier    The coordinator builder.
      */
     private CoordinatorRuntime(
+        String logPrefix,
         LogContext logContext,
         CoordinatorEventProcessor processor,
         PartitionWriter<U> partitionWriter,
         CoordinatorLoader<U> loader,
         CoordinatorBuilderSupplier<S, U> coordinatorBuilderSupplier
     ) {
+        this.logPrefix = logPrefix;
         this.logContext = logContext;
         this.log = logContext.logger(CoordinatorRuntime.class);
         this.coordinators = new ConcurrentHashMap<>();
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
new file mode 100644
index 00000000000..e54b6229c37
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class GroupCoordinatorConfigTest {
+    @Test // Checks that the values passed to the constructor are exposed through the public fields.
+    public void testConfigs() {
+        PartitionAssignor assignor = new RangeAssignor();
+        GroupCoordinatorConfig config = new GroupCoordinatorConfig(
+            10,
+            30,
+            10, // NOTE(review): same value as the first argument, so this test cannot detect the two being swapped.
+            55,
+            Collections.singletonList(assignor),
+            2222
+        );
+
+        assertEquals(10, config.numThreads);
+        assertEquals(30, config.consumerGroupSessionTimeoutMs);
+        assertEquals(10, config.consumerGroupHeartbeatIntervalMs);
+        assertEquals(55, config.consumerGroupMaxSize);
+        assertEquals(Collections.singletonList(assignor), 
config.consumerGroupAssignors);
+        assertEquals(2222, config.offsetsTopicSegmentBytes);
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
new file mode 100644
index 00000000000..29851e4db37
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentMatchers;
+
+import java.util.Collections;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.coordinator.group.TestUtil.requestContext;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class GroupCoordinatorServiceTest {
+
+    @SuppressWarnings("unchecked") // The cast from the raw mock to the parameterized runtime type is unchecked.
+    private CoordinatorRuntime<ReplicatedGroupCoordinator, Record> 
mockRuntime() {
+        return (CoordinatorRuntime<ReplicatedGroupCoordinator, Record>) 
mock(CoordinatorRuntime.class);
+    }
+
+    private GroupCoordinatorConfig createConfig() {
+        return new GroupCoordinatorConfig(
+            1,
+            45,
+            5,
+            Integer.MAX_VALUE,
+            Collections.singletonList(new RangeAssignor()),
+            1000 // Expected back as the segment bytes in testGroupMetadataTopicConfigs.
+        );
+    }
+
+    @Test // Shutting the service down must close the runtime exactly once.
+    public void testStartupShutdown() throws Exception {
+        CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+
+        service.startup(() -> 1); // The supplier provides the partition count (see testPartitionFor).
+        service.shutdown();
+
+        verify(runtime, times(1)).close();
+    }
+
+    @Test // A heartbeat sent before startup must fail with CoordinatorNotAvailableException.
+    public void testConsumerGroupHeartbeatWhenNotStarted() {
+        CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+
+        ConsumerGroupHeartbeatRequestData request = new 
ConsumerGroupHeartbeatRequestData()
+            .setGroupId("foo");
+
+        assertFutureThrows(
+            service.consumerGroupHeartbeat(
+                requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
+                request
+            ),
+            CoordinatorNotAvailableException.class
+        );
+    }
+
+    @Test // A heartbeat is scheduled on the runtime as a write operation and its response is returned unchanged.
+    public void testConsumerGroupHeartbeat() throws ExecutionException, 
InterruptedException, TimeoutException {
+        CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+
+        ConsumerGroupHeartbeatRequestData request = new 
ConsumerGroupHeartbeatRequestData()
+            .setGroupId("foo");
+
+        service.startup(() -> 1);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("consumer-group-heartbeat"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), // Only one partition, so "foo" maps to 0.
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            new ConsumerGroupHeartbeatResponseData()
+        ));
+
+        CompletableFuture<ConsumerGroupHeartbeatResponseData> future = 
service.consumerGroupHeartbeat(
+            requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
+            request
+        );
+
+        assertEquals(new ConsumerGroupHeartbeatResponseData(), future.get(5, 
TimeUnit.SECONDS));
+    }
+
+    private static Stream<Arguments> 
testConsumerGroupHeartbeatWithExceptionSource() { // Rows: runtime exception, expected error code, expected error message.
+        return Stream.of(
+            Arguments.arguments(new UnknownTopicOrPartitionException(), 
Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+            Arguments.arguments(new NotEnoughReplicasException(), 
Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+            Arguments.arguments(new NotLeaderOrFollowerException(), 
Errors.NOT_COORDINATOR.code(), null),
+            Arguments.arguments(new KafkaStorageException(), 
Errors.NOT_COORDINATOR.code(), null),
+            Arguments.arguments(new RecordTooLargeException(), 
Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new RecordBatchTooLargeException(), 
Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new InvalidFetchSizeException(""), 
Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new InvalidRequestException("Invalid"), 
Errors.INVALID_REQUEST.code(), "Invalid")
+        );
+    }
+
+    @ParameterizedTest // A failed runtime future must surface as an error response, not as a failed future.
+    @MethodSource("testConsumerGroupHeartbeatWithExceptionSource")
+    public void testConsumerGroupHeartbeatWithException(
+        Throwable exception,
+        short expectedErrorCode,
+        String expectedErrorMessage
+    ) throws ExecutionException, InterruptedException, TimeoutException {
+        CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+
+        ConsumerGroupHeartbeatRequestData request = new 
ConsumerGroupHeartbeatRequestData()
+            .setGroupId("foo");
+
+        service.startup(() -> 1);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("consumer-group-heartbeat"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(exception));
+
+        CompletableFuture<ConsumerGroupHeartbeatResponseData> future = 
service.consumerGroupHeartbeat(
+            requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
+            request
+        );
+
+        assertEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setErrorCode(expectedErrorCode)
+                .setErrorMessage(expectedErrorMessage),
+            future.get(5, TimeUnit.SECONDS)
+        );
+    }
+
+    @Test // partitionFor fails before startup; afterwards it hashes the group id onto the supplied partition count.
+    public void testPartitionFor() {
+        CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+
+        assertThrows(CoordinatorNotAvailableException.class,
+            () -> service.partitionFor("foo"));
+
+        service.startup(() -> 10);
+
+        assertEquals(Utils.abs("foo".hashCode()) % 10, 
service.partitionFor("foo"));
+    }
+
+    @Test // The topic configs are queried here without calling startup first.
+    public void testGroupMetadataTopicConfigs() {
+        CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+
+        Properties expectedProperties = new Properties();
+        expectedProperties.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_COMPACT);
+        expectedProperties.put(TopicConfig.COMPRESSION_TYPE_CONFIG, 
BrokerCompressionType.PRODUCER.name);
+        expectedProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1000");
+
+        assertEquals(expectedProperties, service.groupMetadataTopicConfigs());
+    }
+
+    @Test // onElection fails before startup; afterwards it schedules a load for the partition, forwarding both arguments.
+    public void testOnElection() {
+        CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+
+        assertThrows(CoordinatorNotAvailableException.class,
+            () -> service.onElection(5, 10));
+
+        service.startup(() -> 1);
+        service.onElection(5, 10);
+
+        verify(runtime, times(1)).scheduleLoadOperation(
+            new TopicPartition("__consumer_offsets", 5),
+            10
+        );
+    }
+
+    @Test // onResignation fails before startup; afterwards it schedules an unload with the unwrapped OptionalInt value.
+    public void testOnResignation() {
+        CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+
+        assertThrows(CoordinatorNotAvailableException.class,
+            () -> service.onResignation(5, OptionalInt.of(10)));
+
+        service.startup(() -> 1);
+        service.onResignation(5, OptionalInt.of(10));
+
+        verify(runtime, times(1)).scheduleUnloadOperation(
+            new TopicPartition("__consumer_offsets", 5),
+            10
+        );
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinatorTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinatorTest.java
new file mode 100644
index 00000000000..0e71b80838d
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinatorTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.RequestContext;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.apache.kafka.coordinator.group.TestUtil.requestContext;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ReplicatedGroupCoordinatorTest {
+
+    /**
+     * A heartbeat handled by the ReplicatedGroupCoordinator must return the
+     * result produced by the underlying GroupMetadataManager.
+     */
+    @Test
+    public void testConsumerGroupHeartbeat() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
+            groupMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT);
+        ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData();
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = new CoordinatorResult<>(
+            Collections.emptyList(),
+            new ConsumerGroupHeartbeatResponseData()
+        );
+
+        // Stub the mock itself. Calling when(...) on the real coordinator only
+        // works by accident: Mockito attaches the stub to whichever mock was
+        // invoked last while the production method ran, which hides the
+        // collaborator being stubbed and breaks as soon as the coordinator
+        // touches another mock after delegating.
+        when(groupMetadataManager.consumerGroupHeartbeat(
+            context,
+            request
+        )).thenReturn(result);
+
+        assertEquals(result, coordinator.consumerGroupHeartbeat(context, request));
+    }
+
+    /**
+     * Replaying a record whose ConsumerGroupMetadataKey is wrapped with
+     * version 3 must hand the key and value to GroupMetadataManager#replay once.
+     */
+    @Test
+    public void testReplayConsumerGroupMetadata() {
+        GroupMetadataManager manager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator replicated = new ReplicatedGroupCoordinator(manager);
+
+        ConsumerGroupMetadataKey recordKey = new ConsumerGroupMetadataKey();
+        ConsumerGroupMetadataValue recordValue = new ConsumerGroupMetadataValue();
+        Record replayed = new Record(
+            new ApiMessageAndVersion(recordKey, (short) 3),
+            new ApiMessageAndVersion(recordValue, (short) 0)
+        );
+
+        replicated.replay(replayed);
+
+        verify(manager, times(1)).replay(recordKey, recordValue);
+    }
+
+    /**
+     * Replaying a ConsumerGroupMetadataKey record (version 3) that carries no
+     * value must hand the key and a null value to GroupMetadataManager#replay once.
+     */
+    @Test
+    public void testReplayConsumerGroupMetadataWithNullValue() {
+        GroupMetadataManager manager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator replicated = new ReplicatedGroupCoordinator(manager);
+
+        ConsumerGroupMetadataKey recordKey = new ConsumerGroupMetadataKey();
+        Record replayed = new Record(
+            new ApiMessageAndVersion(recordKey, (short) 3),
+            null
+        );
+
+        replicated.replay(replayed);
+
+        verify(manager, times(1)).replay(recordKey, null);
+    }
+
+    /**
+     * Replaying a record whose ConsumerGroupPartitionMetadataKey is wrapped with
+     * version 4 must hand the key and value to GroupMetadataManager#replay once.
+     */
+    @Test
+    public void testReplayConsumerGroupPartitionMetadata() {
+        GroupMetadataManager manager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator replicated = new ReplicatedGroupCoordinator(manager);
+
+        ConsumerGroupPartitionMetadataKey recordKey = new ConsumerGroupPartitionMetadataKey();
+        ConsumerGroupPartitionMetadataValue recordValue = new ConsumerGroupPartitionMetadataValue();
+        Record replayed = new Record(
+            new ApiMessageAndVersion(recordKey, (short) 4),
+            new ApiMessageAndVersion(recordValue, (short) 0)
+        );
+
+        replicated.replay(replayed);
+
+        verify(manager, times(1)).replay(recordKey, recordValue);
+    }
+
+    /**
+     * Replaying a ConsumerGroupPartitionMetadataKey record (version 4) that
+     * carries no value must hand the key and a null value to
+     * GroupMetadataManager#replay once.
+     */
+    @Test
+    public void testReplayConsumerGroupPartitionMetadataWithNullValue() {
+        GroupMetadataManager manager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator replicated = new ReplicatedGroupCoordinator(manager);
+
+        ConsumerGroupPartitionMetadataKey recordKey = new ConsumerGroupPartitionMetadataKey();
+        Record replayed = new Record(
+            new ApiMessageAndVersion(recordKey, (short) 4),
+            null
+        );
+
+        replicated.replay(replayed);
+
+        verify(manager, times(1)).replay(recordKey, null);
+    }
+
+    /**
+     * Replaying a record whose ConsumerGroupMemberMetadataKey is wrapped with
+     * version 5 must hand the key and value to GroupMetadataManager#replay once.
+     */
+    @Test
+    public void testReplayConsumerGroupMemberMetadata() {
+        GroupMetadataManager manager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator replicated = new ReplicatedGroupCoordinator(manager);
+
+        ConsumerGroupMemberMetadataKey recordKey = new ConsumerGroupMemberMetadataKey();
+        ConsumerGroupMemberMetadataValue recordValue = new ConsumerGroupMemberMetadataValue();
+        Record replayed = new Record(
+            new ApiMessageAndVersion(recordKey, (short) 5),
+            new ApiMessageAndVersion(recordValue, (short) 0)
+        );
+
+        replicated.replay(replayed);
+
+        verify(manager, times(1)).replay(recordKey, recordValue);
+    }
+
+    /**
+     * Replaying a ConsumerGroupMemberMetadataKey record (version 5) that carries
+     * no value must hand the key and a null value to GroupMetadataManager#replay
+     * once.
+     */
+    @Test
+    public void testReplayConsumerGroupMemberMetadataWithNullValue() {
+        GroupMetadataManager manager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator replicated = new ReplicatedGroupCoordinator(manager);
+
+        ConsumerGroupMemberMetadataKey recordKey = new ConsumerGroupMemberMetadataKey();
+        Record replayed = new Record(
+            new ApiMessageAndVersion(recordKey, (short) 5),
+            null
+        );
+
+        replicated.replay(replayed);
+
+        verify(manager, times(1)).replay(recordKey, null);
+    }
+
+    /**
+     * Replaying a record whose ConsumerGroupTargetAssignmentMetadataKey is
+     * wrapped with version 6 must hand the key and value to
+     * GroupMetadataManager#replay once.
+     */
+    @Test
+    public void testReplayConsumerGroupTargetAssignmentMetadata() {
+        GroupMetadataManager manager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator replicated = new ReplicatedGroupCoordinator(manager);
+
+        ConsumerGroupTargetAssignmentMetadataKey recordKey =
+            new ConsumerGroupTargetAssignmentMetadataKey();
+        ConsumerGroupTargetAssignmentMetadataValue recordValue =
+            new ConsumerGroupTargetAssignmentMetadataValue();
+        Record replayed = new Record(
+            new ApiMessageAndVersion(recordKey, (short) 6),
+            new ApiMessageAndVersion(recordValue, (short) 0)
+        );
+
+        replicated.replay(replayed);
+
+        verify(manager, times(1)).replay(recordKey, recordValue);
+    }
+
+    /**
+     * Replaying a ConsumerGroupTargetAssignmentMetadataKey record (version 6)
+     * that carries no value must hand the key and a null value to
+     * GroupMetadataManager#replay once.
+     */
+    @Test
+    public void testReplayConsumerGroupTargetAssignmentMetadataWithNullValue() {
+        GroupMetadataManager manager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator replicated = new ReplicatedGroupCoordinator(manager);
+
+        ConsumerGroupTargetAssignmentMetadataKey recordKey =
+            new ConsumerGroupTargetAssignmentMetadataKey();
+        Record replayed = new Record(
+            new ApiMessageAndVersion(recordKey, (short) 6),
+            null
+        );
+
+        replicated.replay(replayed);
+
+        verify(manager, times(1)).replay(recordKey, null);
+    }
+
+    /**
+     * Replaying a record whose ConsumerGroupTargetAssignmentMemberKey is wrapped
+     * with version 7 must hand the key and value to GroupMetadataManager#replay
+     * once.
+     */
+    @Test
+    public void testReplayConsumerGroupTargetAssignmentMember() {
+        GroupMetadataManager manager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator replicated = new ReplicatedGroupCoordinator(manager);
+
+        ConsumerGroupTargetAssignmentMemberKey recordKey =
+            new ConsumerGroupTargetAssignmentMemberKey();
+        ConsumerGroupTargetAssignmentMemberValue recordValue =
+            new ConsumerGroupTargetAssignmentMemberValue();
+        Record replayed = new Record(
+            new ApiMessageAndVersion(recordKey, (short) 7),
+            new ApiMessageAndVersion(recordValue, (short) 0)
+        );
+
+        replicated.replay(replayed);
+
+        verify(manager, times(1)).replay(recordKey, recordValue);
+    }
+
+    /**
+     * Replaying a ConsumerGroupTargetAssignmentMemberKey record (version 7) that
+     * carries no value must hand the key and a null value to
+     * GroupMetadataManager#replay once.
+     */
+    @Test
+    public void testReplayConsumerGroupTargetAssignmentMemberKeyWithNullValue() {
+        GroupMetadataManager manager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator replicated = new ReplicatedGroupCoordinator(manager);
+
+        ConsumerGroupTargetAssignmentMemberKey recordKey =
+            new ConsumerGroupTargetAssignmentMemberKey();
+        Record replayed = new Record(
+            new ApiMessageAndVersion(recordKey, (short) 7),
+            null
+        );
+
+        replicated.replay(replayed);
+
+        verify(manager, times(1)).replay(recordKey, null);
+    }
+
+    /**
+     * Replaying a record whose ConsumerGroupCurrentMemberAssignmentKey is
+     * wrapped with version 8 must hand the key and value to
+     * GroupMetadataManager#replay once.
+     */
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignment() {
+        GroupMetadataManager manager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator replicated = new ReplicatedGroupCoordinator(manager);
+
+        ConsumerGroupCurrentMemberAssignmentKey recordKey =
+            new ConsumerGroupCurrentMemberAssignmentKey();
+        ConsumerGroupCurrentMemberAssignmentValue recordValue =
+            new ConsumerGroupCurrentMemberAssignmentValue();
+        Record replayed = new Record(
+            new ApiMessageAndVersion(recordKey, (short) 8),
+            new ApiMessageAndVersion(recordValue, (short) 0)
+        );
+
+        replicated.replay(replayed);
+
+        verify(manager, times(1)).replay(recordKey, recordValue);
+    }
+
+    /**
+     * Replaying a ConsumerGroupCurrentMemberAssignmentKey record (version 8)
+     * that carries no value must hand the key and a null value to
+     * GroupMetadataManager#replay once.
+     */
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() {
+        GroupMetadataManager manager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator replicated = new ReplicatedGroupCoordinator(manager);
+
+        ConsumerGroupCurrentMemberAssignmentKey recordKey =
+            new ConsumerGroupCurrentMemberAssignmentKey();
+        Record replayed = new Record(
+            new ApiMessageAndVersion(recordKey, (short) 8),
+            null
+        );
+
+        replicated.replay(replayed);
+
+        verify(manager, times(1)).replay(recordKey, null);
+    }
+
+    /**
+     * Replaying a record built with a null key (and null value) must raise a
+     * NullPointerException.
+     */
+    @Test
+    public void testReplayKeyCannotBeNull() {
+        GroupMetadataManager manager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator replicated = new ReplicatedGroupCoordinator(manager);
+
+        // The record is built inside the lambda so that an exception raised
+        // while constructing it is also captured by assertThrows.
+        assertThrows(
+            NullPointerException.class,
+            () -> replicated.replay(new Record(null, null))
+        );
+    }
+
+    /**
+     * Replaying a record whose key is wrapped with version 255 must raise an
+     * IllegalStateException.
+     */
+    @Test
+    public void testReplayWithUnsupportedVersion() {
+        GroupMetadataManager manager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator replicated = new ReplicatedGroupCoordinator(manager);
+
+        ConsumerGroupCurrentMemberAssignmentKey recordKey =
+            new ConsumerGroupCurrentMemberAssignmentKey();
+        ConsumerGroupCurrentMemberAssignmentValue recordValue =
+            new ConsumerGroupCurrentMemberAssignmentValue();
+
+        // The record is built inside the lambda so that an exception raised
+        // while constructing it is also captured by assertThrows.
+        assertThrows(IllegalStateException.class, () -> {
+            Record unsupported = new Record(
+                new ApiMessageAndVersion(recordKey, (short) 255),
+                new ApiMessageAndVersion(recordValue, (short) 0)
+            );
+            replicated.replay(unsupported);
+        });
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/TestUtil.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/TestUtil.java
new file mode 100644
index 00000000000..7c36c2bf216
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/TestUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.net.InetAddress;
+
+/**
+ * Static helpers shared by the group coordinator tests.
+ */
+public class TestUtil {
+    /**
+     * Builds a RequestContext for the given API key.
+     *
+     * The request header uses the key's latest version together with the
+     * literal "client" and 0. The context itself uses "1", the loopback
+     * address, the anonymous principal, the PLAINTEXT security protocol (and
+     * the listener name derived from it), empty client information and false
+     * as its final argument.
+     *
+     * @param apiKey the API key placed in the request header
+     * @return the constructed request context
+     */
+    public static RequestContext requestContext(
+        ApiKeys apiKey
+    ) {
+        RequestHeader header = new RequestHeader(
+            apiKey,
+            apiKey.latestVersion(),
+            "client",
+            0
+        );
+        InetAddress loopback = InetAddress.getLoopbackAddress();
+        ListenerName plaintextListener =
+            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
+
+        return new RequestContext(
+            header,
+            "1",
+            loopback,
+            KafkaPrincipal.ANONYMOUS,
+            plaintextListener,
+            SecurityProtocol.PLAINTEXT,
+            ClientInformation.EMPTY,
+            false
+        );
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index 76181fc409e..201e11c0005 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.runtime;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashSet;
 import org.junit.jupiter.api.Test;
@@ -151,6 +152,13 @@ public class CoordinatorRuntimeTest {
             return this;
         }
 
+        @Override
+        public CoordinatorBuilder<MockCoordinator, String> withLogContext(
+            LogContext logContext
+        ) {
+            return this; // logContext is not used here; the builder just returns itself
+        }
+
         @Override
         public MockCoordinator build() {
             return new 
MockCoordinator(Objects.requireNonNull(this.snapshotRegistry));
@@ -184,6 +192,7 @@ public class CoordinatorRuntimeTest {
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
         when(builder.build()).thenReturn(coordinator);
         when(supplier.get()).thenReturn(builder);
         CompletableFuture<Void> future = new CompletableFuture<>();
@@ -235,6 +244,7 @@ public class CoordinatorRuntimeTest {
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
         when(builder.build()).thenReturn(coordinator);
         when(supplier.get()).thenReturn(builder);
         CompletableFuture<Void> future = new CompletableFuture<>();
@@ -279,6 +289,7 @@ public class CoordinatorRuntimeTest {
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
         when(builder.build()).thenReturn(coordinator);
         when(supplier.get()).thenReturn(builder);
         CompletableFuture<Void> future = new CompletableFuture<>();
@@ -321,6 +332,7 @@ public class CoordinatorRuntimeTest {
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
         when(builder.build()).thenReturn(coordinator);
         when(supplier.get()).thenReturn(builder);
         CompletableFuture<Void> future = new CompletableFuture<>();
@@ -380,6 +392,7 @@ public class CoordinatorRuntimeTest {
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
         when(builder.build()).thenReturn(coordinator);
         when(supplier.get()).thenReturn(builder);
 
@@ -421,6 +434,7 @@ public class CoordinatorRuntimeTest {
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
         when(builder.build()).thenReturn(coordinator);
         when(supplier.get()).thenReturn(builder);
 

Reply via email to