This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a81486e4f82 KAFKA-14462; [18/N] Add GroupCoordinatorService (#13812)
a81486e4f82 is described below
commit a81486e4f822b646039586ed0f70f9d75ec3ae36
Author: David Jacot <[email protected]>
AuthorDate: Thu Jun 22 09:06:10 2023 +0200
KAFKA-14462; [18/N] Add GroupCoordinatorService (#13812)
This patch introduces the GroupCoordinatorService. This is the new
(incomplete) implementation of the group coordinator based on the coordinator
runtime introduced in https://github.com/apache/kafka/pull/13795.
Reviewers: Divij Vaidya <[email protected]>, Justine Olshan
<[email protected]>
---
checkstyle/import-control.xml | 3 +
.../coordinator/group/GroupCoordinatorConfig.java | 74 +++
.../coordinator/group/GroupCoordinatorService.java | 581 +++++++++++++++++++++
.../group/ReplicatedGroupCoordinator.java | 203 +++++++
.../coordinator/group/runtime/Coordinator.java | 4 +-
.../group/runtime/CoordinatorBuilder.java | 14 +-
.../group/runtime/CoordinatorBuilderSupplier.java | 2 +-
.../group/runtime/CoordinatorLoader.java | 2 +-
.../group/runtime/CoordinatorRuntime.java | 32 +-
.../group/GroupCoordinatorConfigTest.java | 47 ++
.../group/GroupCoordinatorServiceTest.java | 274 ++++++++++
.../group/ReplicatedGroupCoordinatorTest.java | 308 +++++++++++
.../apache/kafka/coordinator/group/TestUtil.java | 49 ++
.../group/runtime/CoordinatorRuntimeTest.java | 14 +
14 files changed, 1600 insertions(+), 7 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 4de0671d77a..ee8355fef12 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -223,6 +223,8 @@
<subpackage name="group">
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.apache.kafka.common.annotation" />
+ <allow pkg="org.apache.kafka.common.config" />
+ <allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.network" />
@@ -232,6 +234,7 @@
<allow pkg="org.apache.kafka.deferred" />
<allow pkg="org.apache.kafka.image"/>
<allow pkg="org.apache.kafka.server.common"/>
+ <allow pkg="org.apache.kafka.server.record"/>
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
new file mode 100644
index 00000000000..f4b23a429fa
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+
+import java.util.List;
+
+/**
+ * The group coordinator configurations.
+ */
+public class GroupCoordinatorConfig {
+
+ /**
+ * The number of threads or event loops running.
+ */
+ public final int numThreads;
+
+ /**
+ * The consumer group session timeout in milliseconds.
+ */
+ public final int consumerGroupSessionTimeoutMs;
+
+ /**
+ * The consumer group heartbeat interval in milliseconds.
+ */
+ public final int consumerGroupHeartbeatIntervalMs;
+
+ /**
+ * The consumer group maximum size.
+ */
+ public final int consumerGroupMaxSize;
+
+ /**
+ * The consumer group assignors.
+ */
+ public final List<PartitionAssignor> consumerGroupAssignors;
+
+    /**
+     * The offsets topic segment bytes should be kept relatively small to facilitate faster
+     * log compaction and faster offset loads.
+     */
+    public final int offsetsTopicSegmentBytes;
+
+ public GroupCoordinatorConfig(
+ int numThreads,
+ int consumerGroupSessionTimeoutMs,
+ int consumerGroupHeartbeatIntervalMs,
+ int consumerGroupMaxSize,
+ List<PartitionAssignor> consumerGroupAssignors,
+ int offsetsTopicSegmentBytes
+ ) {
+ this.numThreads = numThreads;
+ this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs;
+        this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
+ this.consumerGroupMaxSize = consumerGroupMaxSize;
+ this.consumerGroupAssignors = consumerGroupAssignors;
+ this.offsetsTopicSegmentBytes = offsetsTopicSegmentBytes;
+ }
+}
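For illustration (not part of the patch): the config is a plain immutable holder,
constructed positionally, in the same way GroupCoordinatorConfigTest below does. The
values here are illustrative only:

    GroupCoordinatorConfig config = new GroupCoordinatorConfig(
        1,                                              // numThreads
        45000,                                          // consumerGroupSessionTimeoutMs
        5000,                                           // consumerGroupHeartbeatIntervalMs
        Integer.MAX_VALUE,                              // consumerGroupMaxSize
        Collections.singletonList(new RangeAssignor()), // consumerGroupAssignors
        1000                                            // offsetsTopicSegmentBytes
    );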
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
new file mode 100644
index 00000000000..88a70aca925
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+
+/**
+ * The group coordinator service.
+ */
+public class GroupCoordinatorService implements GroupCoordinator {
+
+ public static class Builder {
+ private final int nodeId;
+ private final GroupCoordinatorConfig config;
+ private PartitionWriter<Record> writer;
+ private CoordinatorLoader<Record> loader;
+
+ public Builder(
+ int nodeId,
+ GroupCoordinatorConfig config
+ ) {
+ this.nodeId = nodeId;
+ this.config = config;
+ }
+
+ public Builder withWriter(PartitionWriter<Record> writer) {
+ this.writer = writer;
+ return this;
+ }
+
+ public Builder withLoader(CoordinatorLoader<Record> loader) {
+ this.loader = loader;
+ return this;
+ }
+
+ public GroupCoordinatorService build() {
+ if (config == null)
+ throw new IllegalArgumentException("Config must be set.");
+ if (writer == null)
+ throw new IllegalArgumentException("Writer must be set.");
+ if (loader == null)
+ throw new IllegalArgumentException("Loader must be set.");
+
+            String logPrefix = String.format("GroupCoordinator id=%d ", nodeId);
+            LogContext logContext = new LogContext(String.format("[%s ]", logPrefix));
+
+            CoordinatorBuilderSupplier<ReplicatedGroupCoordinator, Record> supplier = () ->
+                new ReplicatedGroupCoordinator.Builder(config);
+
+            CoordinatorEventProcessor processor = new MultiThreadedEventProcessor(
+                logContext,
+                "group-coordinator-event-processor-",
+                config.numThreads
+            );
+
+            CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime =
+                new CoordinatorRuntime.Builder<ReplicatedGroupCoordinator, Record>()
+ .withLogPrefix(logPrefix)
+ .withLogContext(logContext)
+ .withEventProcessor(processor)
+ .withPartitionWriter(writer)
+ .withLoader(loader)
+ .withCoordinatorBuilderSupplier(supplier)
+ .build();
+
+ return new GroupCoordinatorService(
+ logContext,
+ config,
+ runtime
+ );
+ }
+ }
+
+ /**
+ * The logger.
+ */
+ private final Logger log;
+
+ /**
+ * The group coordinator configurations.
+ */
+ private final GroupCoordinatorConfig config;
+
+ /**
+ * The coordinator runtime.
+ */
+    private final CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime;
+
+ /**
+ * Boolean indicating whether the coordinator is active or not.
+ */
+ private final AtomicBoolean isActive = new AtomicBoolean(false);
+
+    /**
+     * The number of partitions of the __consumer_offsets topic. This is provided
+     * when the component is started.
+     */
+    private volatile int numPartitions = -1;
+
+    /**
+     * Constructor.
+     *
+     * @param logContext The log context.
+     * @param config     The group coordinator config.
+     * @param runtime    The coordinator runtime.
+     */
+ GroupCoordinatorService(
+ LogContext logContext,
+ GroupCoordinatorConfig config,
+ CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime
+ ) {
+        this.log = logContext.logger(GroupCoordinatorService.class);
+ this.config = config;
+ this.runtime = runtime;
+ }
+
+    /**
+     * Throws CoordinatorNotAvailableException if the coordinator is not active.
+     */
+ private void throwIfNotActive() {
+ if (!isActive.get()) {
+ throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
+ }
+ }
+
+ /**
+ * @return The topic partition for the given group.
+ */
+ private TopicPartition topicPartitionFor(
+ String groupId
+ ) {
+        return new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(groupId));
+ }
+
+ /**
+ * See {@link GroupCoordinator#partitionFor(String)}
+ */
+ @Override
+ public int partitionFor(
+ String groupId
+ ) {
+ throwIfNotActive();
+ return Utils.abs(groupId.hashCode()) % numPartitions;
+ }
+
+    /**
+     * See {@link GroupCoordinator#consumerGroupHeartbeat(RequestContext, ConsumerGroupHeartbeatRequestData)}.
+     */
+    @Override
+    public CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
+        RequestContext context,
+        ConsumerGroupHeartbeatRequestData request
+    ) {
+        if (!isActive.get()) {
+            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+ return runtime.scheduleWriteOperation(
+ "consumer-group-heartbeat",
+ topicPartitionFor(request.groupId()),
+ coordinator -> coordinator.consumerGroupHeartbeat(context, request)
+ ).exceptionally(exception -> {
+ if (exception instanceof UnknownTopicOrPartitionException ||
+ exception instanceof NotEnoughReplicasException) {
+ return new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
+ }
+
+ if (exception instanceof NotLeaderOrFollowerException ||
+ exception instanceof KafkaStorageException) {
+ return new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.NOT_COORDINATOR.code());
+ }
+
+ if (exception instanceof RecordTooLargeException ||
+ exception instanceof RecordBatchTooLargeException ||
+ exception instanceof InvalidFetchSizeException) {
+ return new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
+ }
+
+ return new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.forException(exception).code())
+ .setErrorMessage(exception.getMessage());
+ });
+ }
+
+    /**
+     * See {@link GroupCoordinator#joinGroup(RequestContext, JoinGroupRequestData, BufferSupplier)}.
+     */
+    @Override
+    public CompletableFuture<JoinGroupResponseData> joinGroup(
+        RequestContext context,
+        JoinGroupRequestData request,
+        BufferSupplier bufferSupplier
+    ) {
+        if (!isActive.get()) {
+            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+ return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+ "This API is not implemented yet."
+ ));
+ }
+
+    /**
+     * See {@link GroupCoordinator#syncGroup(RequestContext, SyncGroupRequestData, BufferSupplier)}.
+     */
+    @Override
+    public CompletableFuture<SyncGroupResponseData> syncGroup(
+        RequestContext context,
+        SyncGroupRequestData request,
+        BufferSupplier bufferSupplier
+    ) {
+        if (!isActive.get()) {
+            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+ return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+ "This API is not implemented yet."
+ ));
+ }
+
+    /**
+     * See {@link GroupCoordinator#heartbeat(RequestContext, HeartbeatRequestData)}.
+     */
+    @Override
+    public CompletableFuture<HeartbeatResponseData> heartbeat(
+        RequestContext context,
+        HeartbeatRequestData request
+    ) {
+        if (!isActive.get()) {
+            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+ return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+ "This API is not implemented yet."
+ ));
+ }
+
+    /**
+     * See {@link GroupCoordinator#leaveGroup(RequestContext, LeaveGroupRequestData)}.
+     */
+    @Override
+    public CompletableFuture<LeaveGroupResponseData> leaveGroup(
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) {
+        if (!isActive.get()) {
+            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+ return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+ "This API is not implemented yet."
+ ));
+ }
+
+    /**
+     * See {@link GroupCoordinator#listGroups(RequestContext, ListGroupsRequestData)}.
+     */
+    @Override
+    public CompletableFuture<ListGroupsResponseData> listGroups(
+        RequestContext context,
+        ListGroupsRequestData request
+    ) {
+        if (!isActive.get()) {
+            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+ return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+ "This API is not implemented yet."
+ ));
+ }
+
+ /**
+ * See {@link GroupCoordinator#describeGroups(RequestContext, List)}.
+ */
+    @Override
+    public CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> describeGroups(
+        RequestContext context,
+        List<String> groupIds
+    ) {
+        if (!isActive.get()) {
+            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+ return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+ "This API is not implemented yet."
+ ));
+ }
+
+    /**
+     * See {@link GroupCoordinator#deleteGroups(RequestContext, List, BufferSupplier)}.
+     */
+    @Override
+    public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> deleteGroups(
+        RequestContext context,
+        List<String> groupIds,
+        BufferSupplier bufferSupplier
+    ) {
+        if (!isActive.get()) {
+            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+ return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+ "This API is not implemented yet."
+ ));
+ }
+
+    /**
+     * See {@link GroupCoordinator#fetchOffsets(RequestContext, String, List, boolean)}.
+     */
+    @Override
+    public CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> fetchOffsets(
+        RequestContext context,
+        String groupId,
+        List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
+        boolean requireStable
+    ) {
+        if (!isActive.get()) {
+            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+ return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+ "This API is not implemented yet."
+ ));
+ }
+
+    /**
+     * See {@link GroupCoordinator#fetchAllOffsets(RequestContext, String, boolean)}.
+     */
+    @Override
+    public CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> fetchAllOffsets(
+        RequestContext context,
+        String groupId,
+        boolean requireStable
+    ) {
+        if (!isActive.get()) {
+            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+ return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+ "This API is not implemented yet."
+ ));
+ }
+
+    /**
+     * See {@link GroupCoordinator#commitOffsets(RequestContext, OffsetCommitRequestData, BufferSupplier)}.
+     */
+    @Override
+    public CompletableFuture<OffsetCommitResponseData> commitOffsets(
+        RequestContext context,
+        OffsetCommitRequestData request,
+        BufferSupplier bufferSupplier
+    ) {
+        if (!isActive.get()) {
+            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+ return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+ "This API is not implemented yet."
+ ));
+ }
+
+    /**
+     * See {@link GroupCoordinator#commitTransactionalOffsets(RequestContext, TxnOffsetCommitRequestData, BufferSupplier)}.
+     */
+    @Override
+    public CompletableFuture<TxnOffsetCommitResponseData> commitTransactionalOffsets(
+        RequestContext context,
+        TxnOffsetCommitRequestData request,
+        BufferSupplier bufferSupplier
+    ) {
+        if (!isActive.get()) {
+            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+ return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+ "This API is not implemented yet."
+ ));
+ }
+
+    /**
+     * See {@link GroupCoordinator#deleteOffsets(RequestContext, OffsetDeleteRequestData, BufferSupplier)}.
+     */
+    @Override
+    public CompletableFuture<OffsetDeleteResponseData> deleteOffsets(
+        RequestContext context,
+        OffsetDeleteRequestData request,
+        BufferSupplier bufferSupplier
+    ) {
+        if (!isActive.get()) {
+            return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+ return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+ "This API is not implemented yet."
+ ));
+ }
+
+ /**
+     * See {@link GroupCoordinator#onTransactionCompleted(long, Iterable, TransactionResult)}.
+ */
+ @Override
+ public void onTransactionCompleted(
+ long producerId,
+ Iterable<TopicPartition> partitions,
+ TransactionResult transactionResult
+ ) {
+ throwIfNotActive();
+ }
+
+ /**
+ * See {@link GroupCoordinator#onPartitionsDeleted(List, BufferSupplier)}.
+ */
+ @Override
+ public void onPartitionsDeleted(
+ List<TopicPartition> topicPartitions,
+ BufferSupplier bufferSupplier
+ ) {
+ throwIfNotActive();
+ }
+
+ /**
+ * See {@link GroupCoordinator#onElection(int, int)}.
+ */
+ @Override
+ public void onElection(
+ int groupMetadataPartitionIndex,
+ int groupMetadataPartitionLeaderEpoch
+ ) {
+ throwIfNotActive();
+ runtime.scheduleLoadOperation(
+            new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataPartitionIndex),
+ groupMetadataPartitionLeaderEpoch
+ );
+ }
+
+ /**
+ * See {@link GroupCoordinator#onResignation(int, OptionalInt)}.
+ */
+ @Override
+ public void onResignation(
+ int groupMetadataPartitionIndex,
+ OptionalInt groupMetadataPartitionLeaderEpoch
+ ) {
+ throwIfNotActive();
+        if (!groupMetadataPartitionLeaderEpoch.isPresent()) {
+            throw new IllegalArgumentException("The leader epoch should always be provided in KRaft.");
+        }
+        runtime.scheduleUnloadOperation(
+            new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataPartitionIndex),
+            groupMetadataPartitionLeaderEpoch.getAsInt()
+        );
+ }
+
+ /**
+     * See {@link GroupCoordinator#onNewMetadataImage(MetadataImage, MetadataDelta)}.
+ */
+ @Override
+ public void onNewMetadataImage(
+ MetadataImage newImage,
+ MetadataDelta delta
+ ) {
+ throwIfNotActive();
+ }
+
+ /**
+ * See {@link GroupCoordinator#groupMetadataTopicConfigs()}.
+ */
+ @Override
+ public Properties groupMetadataTopicConfigs() {
+ Properties properties = new Properties();
+        properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
+        properties.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.PRODUCER.name);
+        properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(config.offsetsTopicSegmentBytes));
+ return properties;
+ }
+
+ /**
+ * See {@link GroupCoordinator#startup(IntSupplier)}.
+ */
+ @Override
+ public void startup(
+ IntSupplier groupMetadataTopicPartitionCount
+ ) {
+ if (!isActive.compareAndSet(false, true)) {
+ log.warn("Group coordinator is already running.");
+ return;
+ }
+
+ log.info("Starting up.");
+ numPartitions = groupMetadataTopicPartitionCount.getAsInt();
+ isActive.set(true);
+ log.info("Startup complete.");
+ }
+
+ /**
+ * See {@link GroupCoordinator#shutdown()}.
+ */
+ @Override
+ public void shutdown() {
+ if (!isActive.compareAndSet(true, false)) {
+ log.warn("Group coordinator is already shutting down.");
+ return;
+ }
+
+ log.info("Shutting down.");
+ isActive.set(false);
+ Utils.closeQuietly(runtime, "coordinator runtime");
+ log.info("Shutdown complete.");
+ }
+}
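As a usage note (not part of the patch): the service is inert until startup() supplies
the __consumer_offsets partition count, and partitions are loaded and unloaded through
the runtime as leadership moves. A hedged lifecycle sketch, with assumed partition and
epoch values:

    service.startup(() -> 50);                    // partition count of __consumer_offsets
    service.onElection(5, 10);                    // became leader of partition 5 at epoch 10
    service.onResignation(5, OptionalInt.of(11)); // lost leadership at epoch 11
    service.shutdown();                           // closes the coordinator runtime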
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java
new file mode 100644
index 00000000000..89d08d450a2
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.runtime.Coordinator;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilder;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+/**
+ * The group coordinator replicated state machine that manages the metadata of all generic and
+ * consumer groups. It holds the hard and the soft state of the groups. This class has two kinds
+ * of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class ReplicatedGroupCoordinator implements Coordinator<Record> {
+
+    public static class Builder implements CoordinatorBuilder<ReplicatedGroupCoordinator, Record> {
+ private final GroupCoordinatorConfig config;
+ private LogContext logContext;
+ private SnapshotRegistry snapshotRegistry;
+
+ public Builder(
+ GroupCoordinatorConfig config
+ ) {
+ this.config = config;
+ }
+
+ @Override
+        public CoordinatorBuilder<ReplicatedGroupCoordinator, Record> withLogContext(
+ LogContext logContext
+ ) {
+ this.logContext = logContext;
+ return this;
+ }
+
+ @Override
+        public CoordinatorBuilder<ReplicatedGroupCoordinator, Record> withSnapshotRegistry(
+ SnapshotRegistry snapshotRegistry
+ ) {
+ this.snapshotRegistry = snapshotRegistry;
+ return this;
+ }
+
+ @Override
+ public ReplicatedGroupCoordinator build() {
+ if (logContext == null) logContext = new LogContext();
+ if (config == null)
+ throw new IllegalArgumentException("Config must be set.");
+ if (snapshotRegistry == null)
+                throw new IllegalArgumentException("SnapshotRegistry must be set.");
+
+ return new ReplicatedGroupCoordinator(
+ new GroupMetadataManager.Builder()
+ .withLogContext(logContext)
+ .withSnapshotRegistry(snapshotRegistry)
+ .withAssignors(config.consumerGroupAssignors)
+ .withConsumerGroupMaxSize(config.consumerGroupMaxSize)
+                    .withConsumerGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs)
+ .build()
+ );
+ }
+ }
+
+ /**
+ * The group metadata manager.
+ */
+ private final GroupMetadataManager groupMetadataManager;
+
+ /**
+ * Constructor.
+ *
+ * @param groupMetadataManager The group metadata manager.
+ */
+ ReplicatedGroupCoordinator(
+ GroupMetadataManager groupMetadataManager
+ ) {
+ this.groupMetadataManager = groupMetadataManager;
+ }
+
+ /**
+ * Handles a ConsumerGroupHeartbeat request.
+ *
+ * @param context The request context.
+ * @param request The actual ConsumerGroupHeartbeat request.
+ *
+ * @return A Result containing the ConsumerGroupHeartbeat response and
+ * a list of records to update the state machine.
+ */
+    public CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupHeartbeat(
+ RequestContext context,
+ ConsumerGroupHeartbeatRequestData request
+ ) {
+ return groupMetadataManager.consumerGroupHeartbeat(context, request);
+ }
+
+ /**
+ * @return The ApiMessage or null.
+ */
+    private ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
+ if (apiMessageAndVersion == null) {
+ return null;
+ } else {
+ return apiMessageAndVersion.message();
+ }
+ }
+
+    /**
+     * Replays the Record to update the hard state of the group coordinator.
+     *
+     * @param record The record to apply to the state machine.
+     * @throws RuntimeException
+     */
+ @Override
+ public void replay(Record record) throws RuntimeException {
+ ApiMessageAndVersion key = record.key();
+ ApiMessageAndVersion value = record.value();
+
+ switch (key.version()) {
+ case 3:
+ groupMetadataManager.replay(
+ (ConsumerGroupMetadataKey) key.message(),
+ (ConsumerGroupMetadataValue) messageOrNull(value)
+ );
+ break;
+
+ case 4:
+ groupMetadataManager.replay(
+ (ConsumerGroupPartitionMetadataKey) key.message(),
+ (ConsumerGroupPartitionMetadataValue) messageOrNull(value)
+ );
+ break;
+
+ case 5:
+ groupMetadataManager.replay(
+ (ConsumerGroupMemberMetadataKey) key.message(),
+ (ConsumerGroupMemberMetadataValue) messageOrNull(value)
+ );
+ break;
+
+ case 6:
+ groupMetadataManager.replay(
+ (ConsumerGroupTargetAssignmentMetadataKey) key.message(),
+                    (ConsumerGroupTargetAssignmentMetadataValue) messageOrNull(value)
+ );
+ break;
+
+ case 7:
+ groupMetadataManager.replay(
+ (ConsumerGroupTargetAssignmentMemberKey) key.message(),
+                    (ConsumerGroupTargetAssignmentMemberValue) messageOrNull(value)
+ );
+ break;
+
+ case 8:
+ groupMetadataManager.replay(
+ (ConsumerGroupCurrentMemberAssignmentKey) key.message(),
+                    (ConsumerGroupCurrentMemberAssignmentValue) messageOrNull(value)
+ );
+ break;
+
+ default:
+                throw new IllegalStateException("Received an unknown record type " + key.version()
+                    + " in " + record);
+ }
+ }
+}
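For context: key.version() acts as the record-type discriminator, so replay() is a pure
dispatch table over the generated key/value types. A minimal sketch of the pattern the
tests below exercise; the empty key/value instances are illustrative only:

    // Dispatches to GroupMetadataManager#replay(ConsumerGroupMetadataKey, ConsumerGroupMetadataValue).
    coordinator.replay(new Record(
        new ApiMessageAndVersion(new ConsumerGroupMetadataKey(), (short) 3),
        new ApiMessageAndVersion(new ConsumerGroupMetadataValue(), (short) 0)
    ));
    // A null value is a tombstone and replays as replay(key, null).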
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java
index 0eb2499520a..5ab88a0efa6 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java
@@ -26,11 +26,11 @@ public interface Coordinator<U> extends CoordinatorPlayback<U> {
* The coordinator has been loaded. This is used to apply any
* post loading operations (e.g. registering timers).
*/
- default void onLoaded() {};
+ default void onLoaded() {}
/**
* The coordinator has been unloaded. This is used to apply
* any post unloading operations.
*/
- default void onUnloaded() {};
+ default void onUnloaded() {}
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilder.java
index 4d9febc14b7..6dd2288928a 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilder.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilder.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group.runtime;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.timeline.SnapshotRegistry;
/**
@@ -24,7 +25,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
* @param <S> The type of the coordinator.
* @param <U> The record type.
*/
-interface CoordinatorBuilder<S extends Coordinator<U>, U> {
+public interface CoordinatorBuilder<S extends Coordinator<U>, U> {
/**
* Sets the snapshot registry used to back all the timeline
@@ -38,6 +39,17 @@ interface CoordinatorBuilder<S extends Coordinator<U>, U> {
SnapshotRegistry snapshotRegistry
);
+ /**
+ * Sets the log context.
+ *
+ * @param logContext The log context.
+ *
+ * @return The builder.
+ */
+ CoordinatorBuilder<S, U> withLogContext(
+ LogContext logContext
+ );
+
/**
* @return The built coordinator.
*/
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilderSupplier.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilderSupplier.java
index 35048177961..98b7c54fca8 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilderSupplier.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilderSupplier.java
@@ -22,7 +22,7 @@ package org.apache.kafka.coordinator.group.runtime;
* @param <S> The type of the coordinator.
* @param <U> The record type.
*/
-interface CoordinatorBuilderSupplier<S extends Coordinator<U>, U> {
+public interface CoordinatorBuilderSupplier<S extends Coordinator<U>, U> {
/**
* @return A {@link CoordinatorBuilder}.
*/
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
index 0bef7fbc580..c5de10a313c 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
@@ -26,7 +26,7 @@ import java.util.concurrent.CompletableFuture;
*
* @param <U> The type of the record.
*/
-interface CoordinatorLoader<U> {
+public interface CoordinatorLoader<U> {
/**
* Loads the coordinator by reading all the records from the TopicPartition
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 4a2c0546f12..51773eb402a 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -60,7 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* @param <S> The type of the state machine.
* @param <U> The type of the record.
*/
-public class CoordinatorRuntime<S extends Coordinator<U>, U> {
+public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoCloseable {
/**
* Builder to create a CoordinatorRuntime.
@@ -69,12 +69,18 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> {
* @param <U> The type of the record.
*/
public static class Builder<S extends Coordinator<U>, U> {
+ private String logPrefix;
private LogContext logContext;
private CoordinatorEventProcessor eventProcessor;
private PartitionWriter<U> partitionWriter;
private CoordinatorLoader<U> loader;
private CoordinatorBuilderSupplier<S, U> coordinatorBuilderSupplier;
+ public Builder<S, U> withLogPrefix(String logPrefix) {
+ this.logPrefix = logPrefix;
+ return this;
+ }
+
public Builder<S, U> withLogContext(LogContext logContext) {
this.logContext = logContext;
return this;
@@ -101,8 +107,10 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> {
}
public CoordinatorRuntime<S, U> build() {
+ if (logPrefix == null)
+ logPrefix = "";
if (logContext == null)
- logContext = new LogContext();
+ logContext = new LogContext(logPrefix);
if (eventProcessor == null)
                throw new IllegalArgumentException("Event processor must be set.");
if (partitionWriter == null)
@@ -113,6 +121,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> {
                throw new IllegalArgumentException("State machine supplier must be set.");
return new CoordinatorRuntime<>(
+ logPrefix,
logContext,
eventProcessor,
partitionWriter,
@@ -188,6 +197,11 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> {
*/
final TopicPartition tp;
+ /**
+ * The log context.
+ */
+ final LogContext logContext;
+
/**
* The snapshot registry backing the coordinator.
*/
@@ -235,6 +249,11 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> {
TopicPartition tp
) {
this.tp = tp;
+        this.logContext = new LogContext(String.format("[%s topic=%s partition=%d] ",
+ logPrefix,
+ tp.topic(),
+ tp.partition()
+ ));
this.snapshotRegistry = new SnapshotRegistry(logContext);
this.deferredEventQueue = new DeferredEventQueue(logContext);
this.state = CoordinatorState.INITIAL;
@@ -321,6 +340,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> {
state = CoordinatorState.LOADING;
coordinator = coordinatorBuilderSupplier
.get()
+ .withLogContext(logContext)
.withSnapshotRegistry(snapshotRegistry)
.build();
break;
@@ -735,6 +755,11 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> {
}
}
+ /**
+ * The log prefix.
+ */
+ private final String logPrefix;
+
/**
* The log context.
*/
@@ -785,6 +810,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> {
/**
* Constructor.
*
+ * @param logPrefix The log prefix.
* @param logContext The log context.
* @param processor The event processor.
* @param partitionWriter The partition writer.
@@ -792,12 +818,14 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> {
* @param coordinatorBuilderSupplier The coordinator builder.
*/
private CoordinatorRuntime(
+ String logPrefix,
LogContext logContext,
CoordinatorEventProcessor processor,
PartitionWriter<U> partitionWriter,
CoordinatorLoader<U> loader,
CoordinatorBuilderSupplier<S, U> coordinatorBuilderSupplier
) {
+ this.logPrefix = logPrefix;
this.logContext = logContext;
this.log = logContext.logger(CoordinatorRuntime.class);
this.coordinators = new ConcurrentHashMap<>();
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
new file mode 100644
index 00000000000..e54b6229c37
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class GroupCoordinatorConfigTest {
+ @Test
+ public void testConfigs() {
+ PartitionAssignor assignor = new RangeAssignor();
+ GroupCoordinatorConfig config = new GroupCoordinatorConfig(
+ 10,
+ 30,
+ 10,
+ 55,
+ Collections.singletonList(assignor),
+ 2222
+ );
+
+ assertEquals(10, config.numThreads);
+ assertEquals(30, config.consumerGroupSessionTimeoutMs);
+ assertEquals(10, config.consumerGroupHeartbeatIntervalMs);
+ assertEquals(55, config.consumerGroupMaxSize);
+        assertEquals(Collections.singletonList(assignor), config.consumerGroupAssignors);
+ assertEquals(2222, config.offsetsTopicSegmentBytes);
+ }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
new file mode 100644
index 00000000000..29851e4db37
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentMatchers;
+
+import java.util.Collections;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.coordinator.group.TestUtil.requestContext;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class GroupCoordinatorServiceTest {
+
+ @SuppressWarnings("unchecked")
+    private CoordinatorRuntime<ReplicatedGroupCoordinator, Record> mockRuntime() {
+        return (CoordinatorRuntime<ReplicatedGroupCoordinator, Record>) mock(CoordinatorRuntime.class);
+ }
+
+ private GroupCoordinatorConfig createConfig() {
+ return new GroupCoordinatorConfig(
+ 1,
+ 45,
+ 5,
+ Integer.MAX_VALUE,
+ Collections.singletonList(new RangeAssignor()),
+ 1000
+ );
+ }
+
+ @Test
+ public void testStartupShutdown() throws Exception {
+        CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ service.startup(() -> 1);
+ service.shutdown();
+
+ verify(runtime, times(1)).close();
+ }
+
+ @Test
+ public void testConsumerGroupHeartbeatWhenNotStarted() {
+        CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+
+        ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData()
+            .setGroupId("foo");
+
+ assertFutureThrows(
+ service.consumerGroupHeartbeat(
+ requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
+ request
+ ),
+ CoordinatorNotAvailableException.class
+ );
+ }
+
+ @Test
+    public void testConsumerGroupHeartbeat() throws ExecutionException, InterruptedException, TimeoutException {
+        CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+
+        ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData()
+            .setGroupId("foo");
+
+        service.startup(() -> 1);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("consumer-group-heartbeat"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            new ConsumerGroupHeartbeatResponseData()
+        ));
+
+        CompletableFuture<ConsumerGroupHeartbeatResponseData> future = service.consumerGroupHeartbeat(
+            requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
+            request
+        );
+
+        assertEquals(new ConsumerGroupHeartbeatResponseData(), future.get(5, TimeUnit.SECONDS));
+ }
+
+    private static Stream<Arguments> testConsumerGroupHeartbeatWithExceptionSource() {
+        return Stream.of(
+            Arguments.arguments(new UnknownTopicOrPartitionException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+            Arguments.arguments(new NotEnoughReplicasException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+            Arguments.arguments(new NotLeaderOrFollowerException(), Errors.NOT_COORDINATOR.code(), null),
+            Arguments.arguments(new KafkaStorageException(), Errors.NOT_COORDINATOR.code(), null),
+            Arguments.arguments(new RecordTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new RecordBatchTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new InvalidFetchSizeException(""), Errors.UNKNOWN_SERVER_ERROR.code(), null),
+            Arguments.arguments(new InvalidRequestException("Invalid"), Errors.INVALID_REQUEST.code(), "Invalid")
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("testConsumerGroupHeartbeatWithExceptionSource")
+ public void testConsumerGroupHeartbeatWithException(
+ Throwable exception,
+ short expectedErrorCode,
+ String expectedErrorMessage
+ ) throws ExecutionException, InterruptedException, TimeoutException {
+        CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+        ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("foo");
+
+ service.startup(() -> 1);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("consumer-group-heartbeat"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(exception));
+
+        CompletableFuture<ConsumerGroupHeartbeatResponseData> future = service.consumerGroupHeartbeat(
+ requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
+ request
+ );
+
+ assertEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(expectedErrorCode)
+ .setErrorMessage(expectedErrorMessage),
+ future.get(5, TimeUnit.SECONDS)
+ );
+ }
+
+ @Test
+ public void testPartitionFor() {
+        CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ assertThrows(CoordinatorNotAvailableException.class,
+ () -> service.partitionFor("foo"));
+
+ service.startup(() -> 10);
+
+        assertEquals(Utils.abs("foo".hashCode()) % 10, service.partitionFor("foo"));
+ }
+
+ @Test
+ public void testGroupMetadataTopicConfigs() {
+        CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ Properties expectedProperties = new Properties();
+        expectedProperties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
+        expectedProperties.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.PRODUCER.name);
+ expectedProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1000");
+
+ assertEquals(expectedProperties, service.groupMetadataTopicConfigs());
+ }
+
+ @Test
+ public void testOnElection() {
+        CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ assertThrows(CoordinatorNotAvailableException.class,
+ () -> service.onElection(5, 10));
+
+ service.startup(() -> 1);
+ service.onElection(5, 10);
+
+ verify(runtime, times(1)).scheduleLoadOperation(
+ new TopicPartition("__consumer_offsets", 5),
+ 10
+ );
+ }
+
+ @Test
+ public void testOnResignation() {
+        CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ assertThrows(CoordinatorNotAvailableException.class,
+ () -> service.onResignation(5, OptionalInt.of(10)));
+
+ service.startup(() -> 1);
+ service.onResignation(5, OptionalInt.of(10));
+
+ verify(runtime, times(1)).scheduleUnloadOperation(
+ new TopicPartition("__consumer_offsets", 5),
+ 10
+ );
+ }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinatorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinatorTest.java
new file mode 100644
index 00000000000..0e71b80838d
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinatorTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.apache.kafka.coordinator.group.TestUtil.requestContext;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ReplicatedGroupCoordinatorTest {
+
+ @Test
+ public void testConsumerGroupHeartbeat() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
+            groupMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT);
+        ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData();
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = new CoordinatorResult<>(
+            Collections.emptyList(),
+            new ConsumerGroupHeartbeatResponseData()
+        );
+
+        when(coordinator.consumerGroupHeartbeat(
+            context,
+            request
+        )).thenReturn(result);
+
+        assertEquals(result, coordinator.consumerGroupHeartbeat(context, request));
+ }
+
+ @Test
+ public void testReplayConsumerGroupMetadata() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
+ groupMetadataManager
+ );
+
+ ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey();
+ ConsumerGroupMetadataValue value = new ConsumerGroupMetadataValue();
+
+ coordinator.replay(new Record(
+ new ApiMessageAndVersion(key, (short) 3),
+ new ApiMessageAndVersion(value, (short) 0)
+ ));
+
+ verify(groupMetadataManager, times(1)).replay(key, value);
+ }
+
+ @Test
+ public void testReplayConsumerGroupMetadataWithNullValue() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
+ groupMetadataManager
+ );
+
+ ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey();
+
+ coordinator.replay(new Record(
+ new ApiMessageAndVersion(key, (short) 3),
+ null
+ ));
+
+ verify(groupMetadataManager, times(1)).replay(key, null);
+ }
+
+ @Test
+ public void testReplayConsumerGroupPartitionMetadata() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
+ groupMetadataManager
+ );
+
+        ConsumerGroupPartitionMetadataKey key = new ConsumerGroupPartitionMetadataKey();
+        ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue();
+
+ coordinator.replay(new Record(
+ new ApiMessageAndVersion(key, (short) 4),
+ new ApiMessageAndVersion(value, (short) 0)
+ ));
+
+ verify(groupMetadataManager, times(1)).replay(key, value);
+ }
+
+ @Test
+ public void testReplayConsumerGroupPartitionMetadataWithNullValue() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ ReplicatedGroupCoordinator coordinator = new
ReplicatedGroupCoordinator(
+ groupMetadataManager
+ );
+
+ ConsumerGroupPartitionMetadataKey key = new
ConsumerGroupPartitionMetadataKey();
+
+ coordinator.replay(new Record(
+ new ApiMessageAndVersion(key, (short) 4),
+ null
+ ));
+
+ verify(groupMetadataManager, times(1)).replay(key, null);
+ }
+
+ @Test
+ public void testReplayConsumerGroupMemberMetadata() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ ReplicatedGroupCoordinator coordinator = new
ReplicatedGroupCoordinator(
+ groupMetadataManager
+ );
+
+ ConsumerGroupMemberMetadataKey key = new
ConsumerGroupMemberMetadataKey();
+ ConsumerGroupMemberMetadataValue value = new
ConsumerGroupMemberMetadataValue();
+
+ coordinator.replay(new Record(
+ new ApiMessageAndVersion(key, (short) 5),
+ new ApiMessageAndVersion(value, (short) 0)
+ ));
+
+ verify(groupMetadataManager, times(1)).replay(key, value);
+ }
+
+ @Test
+ public void testReplayConsumerGroupMemberMetadataWithNullValue() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ ReplicatedGroupCoordinator coordinator = new
ReplicatedGroupCoordinator(
+ groupMetadataManager
+ );
+
+ ConsumerGroupMemberMetadataKey key = new
ConsumerGroupMemberMetadataKey();
+
+ coordinator.replay(new Record(
+ new ApiMessageAndVersion(key, (short) 5),
+ null
+ ));
+
+ verify(groupMetadataManager, times(1)).replay(key, null);
+ }
+
+ @Test
+ public void testReplayConsumerGroupTargetAssignmentMetadata() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ ReplicatedGroupCoordinator coordinator = new
ReplicatedGroupCoordinator(
+ groupMetadataManager
+ );
+
+ ConsumerGroupTargetAssignmentMetadataKey key = new
ConsumerGroupTargetAssignmentMetadataKey();
+ ConsumerGroupTargetAssignmentMetadataValue value = new
ConsumerGroupTargetAssignmentMetadataValue();
+
+ coordinator.replay(new Record(
+ new ApiMessageAndVersion(key, (short) 6),
+ new ApiMessageAndVersion(value, (short) 0)
+ ));
+
+ verify(groupMetadataManager, times(1)).replay(key, value);
+ }
+
+ @Test
+ public void testReplayConsumerGroupTargetAssignmentMetadataWithNullValue()
{
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ ReplicatedGroupCoordinator coordinator = new
ReplicatedGroupCoordinator(
+ groupMetadataManager
+ );
+
+ ConsumerGroupTargetAssignmentMetadataKey key = new
ConsumerGroupTargetAssignmentMetadataKey();
+
+ coordinator.replay(new Record(
+ new ApiMessageAndVersion(key, (short) 6),
+ null
+ ));
+
+ verify(groupMetadataManager, times(1)).replay(key, null);
+ }
+
+ @Test
+ public void testReplayConsumerGroupTargetAssignmentMember() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ ReplicatedGroupCoordinator coordinator = new
ReplicatedGroupCoordinator(
+ groupMetadataManager
+ );
+
+ ConsumerGroupTargetAssignmentMemberKey key = new
ConsumerGroupTargetAssignmentMemberKey();
+ ConsumerGroupTargetAssignmentMemberValue value = new
ConsumerGroupTargetAssignmentMemberValue();
+
+ coordinator.replay(new Record(
+ new ApiMessageAndVersion(key, (short) 7),
+ new ApiMessageAndVersion(value, (short) 0)
+ ));
+
+ verify(groupMetadataManager, times(1)).replay(key, value);
+ }
+
+ @Test
+ public void
testReplayConsumerGroupTargetAssignmentMemberKeyWithNullValue() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ ReplicatedGroupCoordinator coordinator = new
ReplicatedGroupCoordinator(
+ groupMetadataManager
+ );
+
+ ConsumerGroupTargetAssignmentMemberKey key = new
ConsumerGroupTargetAssignmentMemberKey();
+
+ coordinator.replay(new Record(
+ new ApiMessageAndVersion(key, (short) 7),
+ null
+ ));
+
+ verify(groupMetadataManager, times(1)).replay(key, null);
+ }
+
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignment() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ ReplicatedGroupCoordinator coordinator = new
ReplicatedGroupCoordinator(
+ groupMetadataManager
+ );
+
+ ConsumerGroupCurrentMemberAssignmentKey key = new
ConsumerGroupCurrentMemberAssignmentKey();
+ ConsumerGroupCurrentMemberAssignmentValue value = new
ConsumerGroupCurrentMemberAssignmentValue();
+
+ coordinator.replay(new Record(
+ new ApiMessageAndVersion(key, (short) 8),
+ new ApiMessageAndVersion(value, (short) 0)
+ ));
+
+ verify(groupMetadataManager, times(1)).replay(key, value);
+ }
+
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ ReplicatedGroupCoordinator coordinator = new
ReplicatedGroupCoordinator(
+ groupMetadataManager
+ );
+
+ ConsumerGroupCurrentMemberAssignmentKey key = new
ConsumerGroupCurrentMemberAssignmentKey();
+
+ coordinator.replay(new Record(
+ new ApiMessageAndVersion(key, (short) 8),
+ null
+ ));
+
+ verify(groupMetadataManager, times(1)).replay(key, null);
+ }
+
+ @Test
+ public void testReplayKeyCannotBeNull() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ ReplicatedGroupCoordinator coordinator = new
ReplicatedGroupCoordinator(
+ groupMetadataManager
+ );
+
+ assertThrows(NullPointerException.class, () -> coordinator.replay(new
Record(null, null)));
+ }
+
+ @Test
+ public void testReplayWithUnsupportedVersion() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ ReplicatedGroupCoordinator coordinator = new
ReplicatedGroupCoordinator(
+ groupMetadataManager
+ );
+
+ ConsumerGroupCurrentMemberAssignmentKey key = new
ConsumerGroupCurrentMemberAssignmentKey();
+ ConsumerGroupCurrentMemberAssignmentValue value = new
ConsumerGroupCurrentMemberAssignmentValue();
+
+ assertThrows(IllegalStateException.class, () -> coordinator.replay(new
Record(
+ new ApiMessageAndVersion(key, (short) 255),
+ new ApiMessageAndVersion(value, (short) 0)
+ )));
+ }
+}
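
The replay tests above pin down the contract of ReplicatedGroupCoordinator.replay: the key's ApiMessageAndVersion version selects the record type (versions 3 through 8 for the consumer group record types exercised here), a null value is forwarded to GroupMetadataManager.replay as a tombstone, a null key fails with NullPointerException, and an unrecognized version fails with IllegalStateException. A minimal sketch of a dispatch satisfying these tests follows; it is reconstructed from the assertions above rather than copied from the patch, and the messageOrNull helper is hypothetical:

    // Sketch only: reconstructed from the test expectations, not the patch.
    public void replay(Record record) {
        ApiMessageAndVersion key = record.key();
        ApiMessageAndVersion value = record.value();

        // Dereferencing a null key here throws NullPointerException,
        // matching testReplayKeyCannotBeNull.
        switch (key.version()) {
            case 3:
                groupMetadataManager.replay(
                    (ConsumerGroupMetadataKey) key.message(),
                    (ConsumerGroupMetadataValue) messageOrNull(value)
                );
                break;
            // Cases 4 to 7 follow the same pattern for the partition metadata,
            // member metadata, and target assignment record types.
            case 8:
                groupMetadataManager.replay(
                    (ConsumerGroupCurrentMemberAssignmentKey) key.message(),
                    (ConsumerGroupCurrentMemberAssignmentValue) messageOrNull(value)
                );
                break;
            default:
                throw new IllegalStateException("Received an unknown record type " + key.version());
        }
    }

    // Hypothetical helper: unwraps the value, passing tombstones through as null.
    private static ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
        return apiMessageAndVersion == null ? null : apiMessageAndVersion.message();
    }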
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/TestUtil.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/TestUtil.java
new file mode 100644
index 00000000000..7c36c2bf216
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/TestUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.net.InetAddress;
+
+public class TestUtil {
+ public static RequestContext requestContext(
+ ApiKeys apiKey
+ ) {
+ return new RequestContext(
+ new RequestHeader(
+ apiKey,
+ apiKey.latestVersion(),
+ "client",
+ 0
+ ),
+ "1",
+ InetAddress.getLoopbackAddress(),
+ KafkaPrincipal.ANONYMOUS,
+ ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+ SecurityProtocol.PLAINTEXT,
+ ClientInformation.EMPTY,
+ false
+ );
+ }
+}
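
TestUtil.requestContext builds the smallest RequestContext the tests need: a header for the given API at its latest version, an anonymous principal, and a PLAINTEXT listener. Should a test ever need to pin an older request version, an overload taking the version explicitly would be the natural extension; the following is a hypothetical sketch, not part of this patch:

    // Hypothetical overload (not in the patch): identical context, but with the
    // request version supplied by the caller instead of apiKey.latestVersion().
    public static RequestContext requestContext(ApiKeys apiKey, short version) {
        return new RequestContext(
            new RequestHeader(apiKey, version, "client", 0),
            "1",
            InetAddress.getLoopbackAddress(),
            KafkaPrincipal.ANONYMOUS,
            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
            SecurityProtocol.PLAINTEXT,
            ClientInformation.EMPTY,
            false
        );
    }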
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index 76181fc409e..201e11c0005 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.runtime;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashSet;
import org.junit.jupiter.api.Test;
@@ -151,6 +152,13 @@ public class CoordinatorRuntimeTest {
return this;
}
+ @Override
+ public CoordinatorBuilder<MockCoordinator, String> withLogContext(
+ LogContext logContext
+ ) {
+ return this;
+ }
+
@Override
public MockCoordinator build() {
            return new MockCoordinator(Objects.requireNonNull(this.snapshotRegistry));
@@ -184,6 +192,7 @@ public class CoordinatorRuntimeTest {
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+ when(builder.withLogContext(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
CompletableFuture<Void> future = new CompletableFuture<>();
@@ -235,6 +244,7 @@ public class CoordinatorRuntimeTest {
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+ when(builder.withLogContext(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
CompletableFuture<Void> future = new CompletableFuture<>();
@@ -279,6 +289,7 @@ public class CoordinatorRuntimeTest {
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+ when(builder.withLogContext(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
CompletableFuture<Void> future = new CompletableFuture<>();
@@ -321,6 +332,7 @@ public class CoordinatorRuntimeTest {
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+ when(builder.withLogContext(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
CompletableFuture<Void> future = new CompletableFuture<>();
@@ -380,6 +392,7 @@ public class CoordinatorRuntimeTest {
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+ when(builder.withLogContext(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
@@ -421,6 +434,7 @@ public class CoordinatorRuntimeTest {
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+ when(builder.withLogContext(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
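
Each runtime test above gains a when(builder.withLogContext(any())).thenReturn(builder) stub alongside the existing withSnapshotRegistry one: an unstubbed mock method returns null, so every fluent setter on the mocked CoordinatorBuilder must be stubbed to return the builder for the chain to survive. If this per-test boilerplate keeps growing with each new builder method, Mockito's RETURNS_SELF default answer is one way to collapse it; a possible simplification, not what this patch does:

    // Possible simplification (not applied in this patch). RETURNS_SELF makes
    // any mock method whose return type is compatible with the mock return the
    // mock itself, so fluent setters need no individual stubbing.
    // Requires: import static org.mockito.Mockito.RETURNS_SELF;
    CoordinatorBuilder<MockCoordinator, String> builder =
        mock(CoordinatorBuilder.class, RETURNS_SELF);
    when(builder.build()).thenReturn(coordinator); // build() still needs a stub
    when(supplier.get()).thenReturn(builder);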