This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2e0a005dd47 KAFKA-14367; Add internal APIs to the new 
`GroupCoordinator` interface (#13112)
2e0a005dd47 is described below

commit 2e0a005dd47d922d66c1da9b20b6ab7f5d1be5a1
Author: David Jacot <[email protected]>
AuthorDate: Fri Jan 20 08:38:21 2023 +0100

    KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface 
(#13112)
    
    This patch migrates all the internal APIs of the current group coordinator 
to the new `GroupCoordinator` interface. It also makes the current 
implementation package private to ensure that it is not used anymore.
    
    Reviewers: Justine Olshan <[email protected]>
---
 build.gradle                                       |  1 +
 checkstyle/import-control-jmh-benchmarks.xml       |  1 +
 .../kafka/server/builders/KafkaApisBuilder.java    |  4 +-
 .../kafka/coordinator/group/GroupCoordinator.scala | 46 ++++++-----
 .../group/GroupCoordinatorAdapter.scala            | 78 +++++++++++++++++-
 .../coordinator/group/GroupMetadataManager.scala   | 25 +++---
 .../kafka/server/AutoTopicCreationManager.scala    |  4 +-
 .../src/main/scala/kafka/server/BrokerServer.scala | 11 ++-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 42 +++++-----
 core/src/main/scala/kafka/server/KafkaBroker.scala |  2 +-
 core/src/main/scala/kafka/server/KafkaServer.scala | 11 ++-
 .../scala/kafka/server/RequestHandlerHelper.scala  |  6 +-
 .../server/metadata/BrokerMetadataPublisher.scala  | 21 +++--
 .../server/DynamicBrokerReconfigurationTest.scala  |  2 +-
 ...ListenersWithSameSecurityProtocolBaseTest.scala |  2 +-
 .../coordinator/group/GroupCoordinatorTest.scala   |  4 +-
 .../group/GroupMetadataManagerTest.scala           | 10 +--
 .../server/AutoTopicCreationManagerTest.scala      | 12 +--
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 93 +++++++++++-----------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  4 +-
 .../kafka/coordinator/group/GroupCoordinator.java  | 90 +++++++++++++++++++++
 .../jmh/metadata/MetadataRequestBenchmark.java     |  2 +-
 22 files changed, 328 insertions(+), 143 deletions(-)

diff --git a/build.gradle b/build.gradle
index 7c1eafdc0c7..e535d291269 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2428,6 +2428,7 @@ project(':jmh-benchmarks') {
     }
     implementation project(':server-common')
     implementation project(':clients')
+    implementation project(':group-coordinator')
     implementation project(':metadata')
     implementation project(':storage')
     implementation project(':streams')
diff --git a/checkstyle/import-control-jmh-benchmarks.xml 
b/checkstyle/import-control-jmh-benchmarks.xml
index 983c526eccc..1f58dbd8ee7 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -49,6 +49,7 @@
     <allow pkg="kafka.security.authorizer"/>
     <allow pkg="org.apache.kafka.server"/>
     <allow pkg="org.apache.kafka.clients"/>
+    <allow pkg="org.apache.kafka.coordinator.group"/>
     <allow pkg="org.apache.kafka.metadata"/>
     <allow pkg="org.apache.kafka.timeline" />
 
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java 
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 18cd42c77cb..d5403e0c57a 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -17,8 +17,6 @@
 
 package kafka.server.builders;
 
-import kafka.coordinator.group.GroupCoordinator;
-import kafka.coordinator.group.GroupCoordinatorAdapter;
 import kafka.coordinator.transaction.TransactionCoordinator;
 import kafka.network.RequestChannel;
 import kafka.server.ApiVersionManager;
@@ -35,6 +33,7 @@ import kafka.server.ReplicaManager;
 import kafka.server.metadata.ConfigRepository;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.GroupCoordinator;
 import org.apache.kafka.server.authorizer.Authorizer;
 
 import java.util.Collections;
@@ -179,7 +178,6 @@ public class KafkaApisBuilder {
                              metadataSupport,
                              replicaManager,
                              groupCoordinator,
-                             new GroupCoordinatorAdapter(groupCoordinator, 
time),
                              txnCoordinator,
                              autoTopicCreationManager,
                              brokerId,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 9cc46c3da15..c52d920b583 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -16,7 +16,7 @@
  */
 package kafka.coordinator.group
 
-import java.util.Properties
+import java.util.{OptionalInt, Properties}
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.OffsetAndMetadata
 import kafka.server._
@@ -48,14 +48,16 @@ import scala.math.max
  * used by its callback.  The delayed callback may acquire the group lock
  * since the delayed operation is completed only if the group lock can be 
acquired.
  */
-class GroupCoordinator(val brokerId: Int,
-                       val groupConfig: GroupConfig,
-                       val offsetConfig: OffsetConfig,
-                       val groupManager: GroupMetadataManager,
-                       val heartbeatPurgatory: 
DelayedOperationPurgatory[DelayedHeartbeat],
-                       val rebalancePurgatory: 
DelayedOperationPurgatory[DelayedRebalance],
-                       time: Time,
-                       metrics: Metrics) extends Logging {
+private[group] class GroupCoordinator(
+  val brokerId: Int,
+  val groupConfig: GroupConfig,
+  val offsetConfig: OffsetConfig,
+  val groupManager: GroupMetadataManager,
+  val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
+  val rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance],
+  time: Time,
+  metrics: Metrics
+) extends Logging {
   import GroupCoordinator._
 
   type JoinCallback = JoinGroupResult => Unit
@@ -1188,7 +1190,7 @@ class GroupCoordinator(val brokerId: Int,
    *
    * @param offsetTopicPartitionId The partition we are no longer leading
    */
-  def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: 
Option[Int]): Unit = {
+  def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: 
OptionalInt): Unit = {
     info(s"Resigned as the group coordinator for partition 
$offsetTopicPartitionId in epoch $coordinatorEpoch")
     groupManager.removeGroupsForPartition(offsetTopicPartitionId, 
coordinatorEpoch, onGroupUnloaded)
   }
@@ -1707,10 +1709,12 @@ object GroupCoordinator {
   val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, 
NoMembers)
   val NewMemberJoinTimeoutMs: Int = 5 * 60 * 1000
 
-  def apply(config: KafkaConfig,
-            replicaManager: ReplicaManager,
-            time: Time,
-            metrics: Metrics): GroupCoordinator = {
+  private[group] def apply(
+    config: KafkaConfig,
+    replicaManager: ReplicaManager,
+    time: Time,
+    metrics: Metrics
+  ): GroupCoordinator = {
     val heartbeatPurgatory = 
DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
     val rebalancePurgatory = 
DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId)
     GroupCoordinator(config, replicaManager, heartbeatPurgatory, 
rebalancePurgatory, time, metrics)
@@ -1729,12 +1733,14 @@ object GroupCoordinator {
     offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
   )
 
-  def apply(config: KafkaConfig,
-            replicaManager: ReplicaManager,
-            heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
-            rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance],
-            time: Time,
-            metrics: Metrics): GroupCoordinator = {
+  private[group] def apply(
+    config: KafkaConfig,
+    replicaManager: ReplicaManager,
+    heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
+    rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance],
+    time: Time,
+    metrics: Metrics
+  ): GroupCoordinator = {
     val offsetConfig = this.offsetConfig(config)
     val groupConfig = GroupConfig(groupMinSessionTimeoutMs = 
config.groupMinSessionTimeoutMs,
       groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs,
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 7006217e82c..660070d024c 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -17,26 +17,47 @@
 package kafka.coordinator.group
 
 import kafka.common.OffsetAndMetadata
-import kafka.server.RequestLocal
+import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
 import kafka.utils.Implicits.MapExtensionMethods
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.message.{DeleteGroupsResponseData, 
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, 
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, 
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, 
OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, 
OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, 
SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitReq [...]
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
-import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext}
+import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext, 
TransactionResult}
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
 
 import java.util
-import java.util.Optional
+import java.util.{Optional, OptionalInt, Properties}
 import java.util.concurrent.CompletableFuture
+import java.util.function.IntSupplier
 import scala.collection.{immutable, mutable}
 import scala.jdk.CollectionConverters._
 
+object GroupCoordinatorAdapter {
+  def apply(
+    config: KafkaConfig,
+    replicaManager: ReplicaManager,
+    time: Time,
+    metrics: Metrics
+  ): GroupCoordinatorAdapter = {
+    new GroupCoordinatorAdapter(
+      GroupCoordinator(
+        config,
+        replicaManager,
+        time,
+        metrics
+      ),
+      time
+    )
+  }
+}
+
 /**
  * GroupCoordinatorAdapter is a thin wrapper around 
kafka.coordinator.group.GroupCoordinator
  * that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator 
interface.
  */
-class GroupCoordinatorAdapter(
+private[group] class GroupCoordinatorAdapter(
   private val coordinator: GroupCoordinator,
   private val time: Time
 ) extends org.apache.kafka.coordinator.group.GroupCoordinator {
@@ -511,4 +532,53 @@ class GroupCoordinatorAdapter(
 
     future
   }
+
+  override def partitionFor(groupId: String): Int = {
+    coordinator.partitionFor(groupId)
+  }
+
+  override def onTransactionCompleted(
+    producerId: Long,
+    partitions: java.lang.Iterable[TopicPartition],
+    transactionResult: TransactionResult
+  ): Unit = {
+    coordinator.scheduleHandleTxnCompletion(
+      producerId,
+      partitions.asScala,
+      transactionResult
+    )
+  }
+
+  override def onPartitionsDeleted(
+    topicPartitions: util.List[TopicPartition],
+    bufferSupplier: BufferSupplier
+  ): Unit = {
+    coordinator.handleDeletedPartitions(topicPartitions.asScala, 
RequestLocal(bufferSupplier))
+  }
+
+  override def onElection(
+    groupMetadataPartitionIndex: Int,
+    groupMetadataPartitionLeaderEpoch: Int
+  ): Unit = {
+    coordinator.onElection(groupMetadataPartitionIndex, 
groupMetadataPartitionLeaderEpoch)
+  }
+
+  override def onResignation(
+    groupMetadataPartitionIndex: Int,
+    groupMetadataPartitionLeaderEpoch: OptionalInt
+  ): Unit = {
+    coordinator.onResignation(groupMetadataPartitionIndex, 
groupMetadataPartitionLeaderEpoch)
+  }
+
+  override def groupMetadataTopicConfigs(): Properties = {
+    coordinator.offsetsTopicConfigs
+  }
+
+  override def startup(groupMetadataTopicPartitionCount: IntSupplier): Unit = {
+    coordinator.startup(() => groupMetadataTopicPartitionCount.getAsInt)
+  }
+
+  override def shutdown(): Unit = {
+    coordinator.shutdown()
+  }
 }
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 53820b004e8..a90b160d831 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -20,7 +20,7 @@ package kafka.coordinator.group
 import java.io.PrintStream
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
-import java.util.Optional
+import java.util.{Optional, OptionalInt}
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.ConcurrentHashMap
@@ -547,7 +547,7 @@ class GroupMetadataManager(brokerId: Int,
     onGroupLoaded: GroupMetadata => Unit,
     startTimeMs: java.lang.Long
   ): Unit = {
-    if (!maybeUpdateCoordinatorEpoch(topicPartition.partition, 
Some(coordinatorEpoch))) {
+    if (!maybeUpdateCoordinatorEpoch(topicPartition.partition, 
OptionalInt.of(coordinatorEpoch))) {
       info(s"Not loading offsets and group metadata for $topicPartition " +
         s"in epoch $coordinatorEpoch since current epoch is 
${epochForPartitionId.get(topicPartition.partition)}")
     } else if (!addLoadingPartition(topicPartition.partition)) {
@@ -763,7 +763,7 @@ class GroupMetadataManager(brokerId: Int,
    * @param offsetsPartition Groups belonging to this partition of the offsets 
topic will be deleted from the cache.
    */
   def removeGroupsForPartition(offsetsPartition: Int,
-                               coordinatorEpoch: Option[Int],
+                               coordinatorEpoch: OptionalInt,
                                onGroupUnloaded: GroupMetadata => Unit): Unit = 
{
     val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
offsetsPartition)
     info(s"Scheduling unloading of offsets and group metadata from 
$topicPartition")
@@ -771,7 +771,7 @@ class GroupMetadataManager(brokerId: Int,
   }
 
   private [group] def removeGroupsAndOffsets(topicPartition: TopicPartition,
-                                             coordinatorEpoch: Option[Int],
+                                             coordinatorEpoch: OptionalInt,
                                              onGroupUnloaded: GroupMetadata => 
Unit): Unit = {
     val offsetsPartition = topicPartition.partition
     if (maybeUpdateCoordinatorEpoch(offsetsPartition, coordinatorEpoch)) {
@@ -810,19 +810,22 @@ class GroupMetadataManager(brokerId: Int,
    */
   private def maybeUpdateCoordinatorEpoch(
     partitionId: Int,
-    epochOpt: Option[Int]
+    epochOpt: OptionalInt
   ): Boolean = {
     val updatedEpoch = epochForPartitionId.compute(partitionId, (_, 
currentEpoch) => {
       if (currentEpoch == null) {
-        epochOpt.map(Int.box).orNull
+        if (epochOpt.isPresent) epochOpt.getAsInt
+        else null
       } else {
-        epochOpt match {
-          case Some(epoch) if epoch > currentEpoch => epoch
-          case _ => currentEpoch
-        }
+        if (epochOpt.isPresent && epochOpt.getAsInt > currentEpoch) 
epochOpt.getAsInt
+        else currentEpoch
       }
     })
-    epochOpt.forall(_ == updatedEpoch)
+    if (epochOpt.isPresent) {
+      epochOpt.getAsInt == updatedEpoch
+    } else {
+      true
+    }
   }
 
   // visible for testing
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala 
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 60796abb51a..c9dfa019595 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicReference
 import java.util.{Collections, Properties}
 
 import kafka.controller.KafkaController
-import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.utils.Logging
 import org.apache.kafka.clients.ClientResponse
@@ -34,6 +33,7 @@ import 
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic,
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, 
RequestContext, RequestHeader}
+import org.apache.kafka.coordinator.group.GroupCoordinator
 
 import scala.collection.{Map, Seq, Set, mutable}
 import scala.jdk.CollectionConverters._
@@ -237,7 +237,7 @@ class DefaultAutoTopicCreationManager(
           .setName(topic)
           .setNumPartitions(config.offsetsTopicPartitions)
           .setReplicationFactor(config.offsetsTopicReplicationFactor)
-          
.setConfigs(convertToTopicConfigCollections(groupCoordinator.offsetsTopicConfigs))
+          
.setConfigs(convertToTopicConfigCollections(groupCoordinator.groupMetadataTopicConfigs))
       case TRANSACTION_STATE_TOPIC_NAME =>
         new CreatableTopic()
           .setName(topic)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index c72f297b2fc..035aed7c810 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import kafka.cluster.Broker.ServerInfo
-import kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorAdapter}
+import kafka.coordinator.group.GroupCoordinatorAdapter
 import kafka.coordinator.transaction.{ProducerIdManager, 
TransactionCoordinator}
 import kafka.log.LogManager
 import kafka.log.remote.RemoteLogManager
@@ -37,6 +37,7 @@ import 
org.apache.kafka.common.security.scram.internals.ScramMechanism
 import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.utils.{LogContext, Time, Utils}
 import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException}
+import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.metadata.{BrokerState, VersionRange}
 import org.apache.kafka.raft
@@ -283,7 +284,12 @@ class BrokerServer(
 
       // Create group coordinator, but don't start it until we've started 
replica manager.
       // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it 
would be good to fix the underlying issue
-      groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, 
metrics)
+      groupCoordinator = GroupCoordinatorAdapter(
+        config,
+        replicaManager,
+        Time.SYSTEM,
+        metrics
+      )
 
       val producerIdManagerSupplier = () => ProducerIdManager.rpc(
         config.brokerId,
@@ -409,7 +415,6 @@ class BrokerServer(
         metadataSupport = raftSupport,
         replicaManager = replicaManager,
         groupCoordinator = groupCoordinator,
-        newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator, 
time),
         txnCoordinator = transactionCoordinator,
         autoTopicCreationManager = autoTopicCreationManager,
         brokerId = config.nodeId,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8bbd7ef1aee..8040d00cbe2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -20,7 +20,6 @@ package kafka.server
 import kafka.admin.AdminUtils
 import kafka.api.ElectLeadersRequestOps
 import kafka.controller.ReplicaAssignment
-import kafka.coordinator.group._
 import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinator}
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
@@ -65,6 +64,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, 
TokenInformation}
 import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
 import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, 
IBP_2_3_IV0}
@@ -76,7 +76,7 @@ import java.nio.ByteBuffer
 import java.util
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.{Collections, Optional}
+import java.util.{Collections, Optional, OptionalInt}
 import scala.annotation.nowarn
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Map, Seq, Set, mutable}
@@ -90,9 +90,6 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val metadataSupport: MetadataSupport,
                 val replicaManager: ReplicaManager,
                 val groupCoordinator: GroupCoordinator,
-                // newGroupCoordinator is temporary here. It will be removed 
when
-                // the migration to the new interface is completed in this 
class.
-                val newGroupCoordinator: 
org.apache.kafka.coordinator.group.GroupCoordinator,
                 val txnCoordinator: TransactionCoordinator,
                 val autoTopicCreationManager: AutoTopicCreationManager,
                 val brokerId: Int,
@@ -310,9 +307,9 @@ class KafkaApis(val requestChannel: RequestChannel,
           if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
               && partitionState.deletePartition) {
             val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-              Some(partitionState.leaderEpoch)
+              OptionalInt.of(partitionState.leaderEpoch)
             else
-              None
+              OptionalInt.empty
             groupCoordinator.onResignation(topicPartition.partition, 
leaderEpoch)
           } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
                      && partitionState.deletePartition) {
@@ -357,8 +354,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         new UpdateMetadataResponse(new 
UpdateMetadataResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
     } else {
       val deletedPartitions = 
replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
-      if (deletedPartitions.nonEmpty)
-        groupCoordinator.handleDeletedPartitions(deletedPartitions, 
requestLocal)
+      if (deletedPartitions.nonEmpty) {
+        groupCoordinator.onPartitionsDeleted(deletedPartitions.asJava, 
requestLocal.bufferSupplier)
+      }
 
       if (zkSupport.adminManager.hasDelayedTopicOperations) {
         updateMetadataRequest.partitionStates.forEach { partitionState =>
@@ -538,7 +536,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       .setGroupInstanceId(offsetCommitRequest.data.groupInstanceId)
       .setTopics(authorizedTopicsRequest.asJava)
 
-    newGroupCoordinator.commitOffsets(
+    groupCoordinator.commitOffsets(
       request.context,
       offsetCommitRequestData,
       requestLocal.bufferSupplier
@@ -1437,7 +1435,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     groupOffsetFetch: OffsetFetchRequestData.OffsetFetchRequestGroup,
     requireStable: Boolean
   ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
-    newGroupCoordinator.fetchAllOffsets(
+    groupCoordinator.fetchAllOffsets(
       requestContext,
       groupOffsetFetch.groupId,
       requireStable
@@ -1475,7 +1473,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       groupOffsetFetch.topics.asScala
     )(_.name)
 
-    newGroupCoordinator.fetchOffsets(
+    groupCoordinator.fetchOffsets(
       requestContext,
       groupOffsetFetch.groupId,
       authorizedTopics.asJava,
@@ -1628,7 +1626,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    newGroupCoordinator.describeGroups(
+    groupCoordinator.describeGroups(
       request.context,
       authorizedGroups.asJava
     ).handle[Unit] { (results, exception) =>
@@ -1663,7 +1661,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val listGroupsRequest = request.body[ListGroupsRequest]
     val hasClusterDescribe = authHelper.authorize(request.context, DESCRIBE, 
CLUSTER, CLUSTER_NAME, logIfDenied = false)
 
-    newGroupCoordinator.listGroups(
+    groupCoordinator.listGroups(
       request.context,
       listGroupsRequest.data
     ).handle[Unit] { (response, exception) =>
@@ -1701,7 +1699,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestHelper.sendMaybeThrottle(request, 
joinGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
-      newGroupCoordinator.joinGroup(
+      groupCoordinator.joinGroup(
         request.context,
         joinGroupRequest.data,
         requestLocal.bufferSupplier
@@ -1735,7 +1733,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestHelper.sendMaybeThrottle(request, 
syncGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
-      newGroupCoordinator.syncGroup(
+      groupCoordinator.syncGroup(
         request.context,
         syncGroupRequest.data,
         requestLocal.bufferSupplier
@@ -1759,7 +1757,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val (authorizedGroups, unauthorizedGroups) =
       authHelper.partitionSeqByAuthorized(request.context, DELETE, GROUP, 
groups)(identity)
 
-    newGroupCoordinator.deleteGroups(
+    groupCoordinator.deleteGroups(
       request.context,
       authorizedGroups.toList.asJava,
       requestLocal.bufferSupplier
@@ -1800,7 +1798,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestHelper.sendMaybeThrottle(request, 
heartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
-      newGroupCoordinator.heartbeat(
+      groupCoordinator.heartbeat(
         request.context,
         heartbeatRequest.data
       ).handle[Unit] { (response, exception) =>
@@ -1820,7 +1818,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestHelper.sendMaybeThrottle(request, 
leaveGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
-      newGroupCoordinator.leaveGroup(
+      groupCoordinator.leaveGroup(
         request.context,
         leaveGroupRequest.normalizedData()
       ).handle[Unit] { (response, exception) =>
@@ -2312,7 +2310,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         // as soon as the end transaction marker has been written for a 
transactional offset commit,
         // call to the group coordinator to materialize the offsets into the 
cache
         try {
-          groupCoordinator.scheduleHandleTxnCompletion(producerId, 
successfulOffsetsPartitions, result)
+          groupCoordinator.onTransactionCompleted(producerId, 
successfulOffsetsPartitions.asJava, result)
         } catch {
           case e: Exception =>
             error(s"Received an exception while trying to update the offsets 
cache on transaction marker append", e)
@@ -2586,7 +2584,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setTransactionalId(txnOffsetCommitRequest.data.transactionalId)
           .setTopics(authorizedTopicCommittedOffsets.asJava)
 
-        newGroupCoordinator.commitTransactionalOffsets(
+        groupCoordinator.commitTransactionalOffsets(
           request.context,
           txnOffsetCommitRequestData,
           requestLocal.bufferSupplier
@@ -3215,7 +3213,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         .setGroupId(offsetDeleteRequest.data.groupId)
         .setTopics(authorizedTopicPartitions)
 
-      newGroupCoordinator.deleteOffsets(
+      groupCoordinator.deleteOffsets(
         request.context,
         offsetDeleteRequestData,
         requestLocal.bufferSupplier
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala 
b/core/src/main/scala/kafka/server/KafkaBroker.scala
index 40247f76797..db5da9bd906 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import com.yammer.metrics.core.MetricName
-import kafka.coordinator.group.GroupCoordinator
 import kafka.log.LogManager
 import kafka.metrics.{KafkaMetricsGroup, LinuxIoMetricsCollector}
 import kafka.network.SocketServer
@@ -28,6 +27,7 @@ import 
org.apache.kafka.common.internals.ClusterResourceListeners
 import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.metadata.BrokerState
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index ea1f6b05956..892948481df 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, 
AtomicInteger}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException, 
InconsistentClusterIdException}
 import kafka.controller.KafkaController
-import kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorAdapter}
+import kafka.coordinator.group.GroupCoordinatorAdapter
 import kafka.coordinator.transaction.{ProducerIdManager, 
TransactionCoordinator}
 import kafka.log.LogManager
 import kafka.metrics.KafkaMetricsReporter
@@ -49,6 +49,7 @@ import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTok
 import org.apache.kafka.common.security.{JaasContext, JaasUtils}
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
 import org.apache.kafka.common.{Endpoint, KafkaException, Node}
+import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.metadata.{BrokerState, MetadataRecordSerde, 
VersionRange}
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.authorizer.Authorizer
@@ -444,7 +445,12 @@ class KafkaServer(
 
         /* start group coordinator */
         // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, 
it would be good to fix the underlying issue
-        groupCoordinator = GroupCoordinator(config, replicaManager, 
Time.SYSTEM, metrics)
+        groupCoordinator = GroupCoordinatorAdapter(
+          config,
+          replicaManager,
+          Time.SYSTEM,
+          metrics
+        )
         groupCoordinator.startup(() => 
zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions))
 
         /* create producer ids manager */
@@ -506,7 +512,6 @@ class KafkaServer(
           metadataSupport = zkSupport,
           replicaManager = replicaManager,
           groupCoordinator = groupCoordinator,
-          newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator, 
time),
           txnCoordinator = transactionCoordinator,
           autoTopicCreationManager = autoTopicCreationManager,
           brokerId = config.brokerId,
diff --git a/core/src/main/scala/kafka/server/RequestHandlerHelper.scala 
b/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
index 75bd7a32969..dc80b514846 100644
--- a/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
+++ b/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import kafka.cluster.Partition
-import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.QuotaManagers
@@ -27,6 +26,9 @@ import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.coordinator.group.GroupCoordinator
+
+import java.util.OptionalInt
 
 object RequestHandlerHelper {
 
@@ -46,7 +48,7 @@ object RequestHandlerHelper {
 
     updatedFollowers.foreach { partition =>
       if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
-        groupCoordinator.onResignation(partition.partitionId, 
Some(partition.getLeaderEpoch))
+        groupCoordinator.onResignation(partition.partitionId, 
OptionalInt.of(partition.getLeaderEpoch))
       else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
         txnCoordinator.onResignation(partition.partitionId, 
Some(partition.getLeaderEpoch))
     }
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 933a6bf8924..43d79e88601 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -17,21 +17,22 @@
 
 package kafka.server.metadata
 
-import java.util.Properties
+import java.util.{OptionalInt, Properties}
 import java.util.concurrent.atomic.AtomicLong
-import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.log.{LogManager, UnifiedLog}
 import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, 
TopicsImage}
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.fault.FaultHandler
 
 import scala.collection.mutable
+import scala.jdk.CollectionConverters._
 
 
 object BrokerMetadataPublisher extends Logging {
@@ -174,7 +175,8 @@ class BrokerMetadataPublisher(
             delta,
             Topic.GROUP_METADATA_TOPIC_NAME,
             groupCoordinator.onElection,
-            groupCoordinator.onResignation)
+            (partitionIndex, leaderEpochOpt) => 
groupCoordinator.onResignation(partitionIndex, toOptionalInt(leaderEpochOpt))
+          )
         } catch {
           case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error updating group " +
             s"coordinator with local changes in ${deltaName}", t)
@@ -200,7 +202,7 @@ class BrokerMetadataPublisher(
             }
           }
           if (deletedTopicPartitions.nonEmpty) {
-            groupCoordinator.handleDeletedPartitions(deletedTopicPartitions, 
RequestLocal.NoCaching)
+            
groupCoordinator.onPartitionsDeleted(deletedTopicPartitions.asJava, 
RequestLocal.NoCaching.bufferSupplier)
           }
         } catch {
           case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error updating group " +
@@ -270,6 +272,13 @@ class BrokerMetadataPublisher(
     }
   }
 
+  private def toOptionalInt(option: Option[Int]): OptionalInt = {
+    // Bridges the Scala Option[Int] leader epoch to the java.util.OptionalInt
+    // that groupCoordinator.onResignation is called with.
+    option.map(epoch => OptionalInt.of(epoch))
+      .getOrElse(OptionalInt.empty)
+  }
+
   override def publishedOffset: Long = publishedOffsetAtomic.get()
 
   def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
@@ -342,8 +351,8 @@ class BrokerMetadataPublisher(
     }
     try {
       // Start the group coordinator.
-      groupCoordinator.startup(() => metadataCache.numPartitions(
-        
Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(conf.offsetsTopicPartitions))
+      groupCoordinator.startup(() => 
metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME)
+        .getOrElse(conf.offsetsTopicPartitions))
     } catch {
       case t: Throwable => fatalFaultHandler.handleFault("Error starting 
GroupCoordinator", t)
     }
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 3f1f1c8054d..ba35694f782 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -158,7 +158,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     TestUtils.createTopicWithAdmin(adminClients.head, 
Topic.GROUP_METADATA_TOPIC_NAME, servers,
       numPartitions = servers.head.config.offsetsTopicPartitions,
       replicationFactor = numServers,
-      topicConfig = servers.head.groupCoordinator.offsetsTopicConfigs)
+      topicConfig = servers.head.groupCoordinator.groupMetadataTopicConfigs)
 
     TestMetricsReporter.testReporters.clear()
   }
diff --git 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index b908a1352b4..378f55e70b2 100644
--- 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -109,7 +109,7 @@ abstract class 
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
     }
 
     TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, 
OffsetConfig.DefaultOffsetsTopicNumPartitions,
-      replicationFactor = 2, servers, 
servers.head.groupCoordinator.offsetsTopicConfigs)
+      replicationFactor = 2, servers, 
servers.head.groupCoordinator.groupMetadataTopicConfigs)
 
     createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, 
JaasTestUtils.KafkaScramPassword)
 
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 98f89477b1e..6dd77d256a9 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.coordinator.group
 
-import java.util.Optional
+import java.util.{Optional, OptionalInt}
 import kafka.common.OffsetAndMetadata
 import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, 
ReplicaManager, RequestLocal}
 import kafka.utils._
@@ -190,7 +190,7 @@ class GroupCoordinatorTest {
     val otherGroupMetadataTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, otherGroupPartitionId)
     
when(replicaManager.getLog(otherGroupMetadataTopicPartition)).thenReturn(None)
     // Call removeGroupsAndOffsets so that partition removed from 
loadingPartitions
-    
groupCoordinator.groupManager.removeGroupsAndOffsets(otherGroupMetadataTopicPartition,
 Some(1), group => {})
+    
groupCoordinator.groupManager.removeGroupsAndOffsets(otherGroupMetadataTopicPartition,
 OptionalInt.of(1), group => {})
     
groupCoordinator.groupManager.loadGroupsAndOffsets(otherGroupMetadataTopicPartition,
 1, group => {}, 0L)
     assertEquals(Errors.NONE, 
groupCoordinator.handleDescribeGroup(otherGroupId)._1)
   }
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index d10c5b630fc..b4e2e519679 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -20,7 +20,7 @@ package kafka.coordinator.group
 import java.lang.management.ManagementFactory
 import java.nio.ByteBuffer
 import java.util.concurrent.locks.ReentrantLock
-import java.util.{Collections, Optional}
+import java.util.{Collections, Optional, OptionalInt}
 import com.yammer.metrics.core.Gauge
 
 import javax.management.ObjectName
@@ -669,7 +669,7 @@ class GroupMetadataManagerTest {
     loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
     assertEquals(groupEpoch, 
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
 
-    groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, 
Some(groupEpoch), _ => ())
+    groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, 
OptionalInt.of(groupEpoch), _ => ())
     assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
       "Removed group remained in cache")
     assertEquals(groupEpoch, 
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
@@ -685,7 +685,7 @@ class GroupMetadataManagerTest {
     val groupEpoch = 2
     loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
 
-    groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, 
Some(groupEpoch), _ => ())
+    groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, 
OptionalInt.of(groupEpoch), _ => ())
     assertEquals(groupEpoch, 
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
     assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
     "Removed group remained in cache")
@@ -696,7 +696,7 @@ class GroupMetadataManagerTest {
     val groupEpoch = 2
     val initiallyLoaded = loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
 
-    groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, 
Some(groupEpoch - 1), _ => ())
+    groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, 
OptionalInt.of(groupEpoch - 1), _ => ())
     assertEquals(groupEpoch, 
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new 
AssertionError("Group was not loaded into the cache"))
     assertEquals(initiallyLoaded.groupId, group.groupId)
@@ -718,7 +718,7 @@ class GroupMetadataManagerTest {
     val groupEpoch = 2
     val initiallyLoaded = loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
 
-    groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, None, _ 
=> ())
+    groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, 
OptionalInt.empty, _ => ())
     assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
       "Removed group remained in cache")
     assertEquals(groupEpoch, 
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()),
diff --git 
a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index 8c6b922972d..11f697b232d 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.util.{Collections, Optional, Properties}
 
 import kafka.controller.KafkaController
-import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.{ClientResponse, NodeApiVersions, 
RequestCompletionHandler}
@@ -38,6 +37,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.{SecurityUtils, Utils}
+import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
 import org.junit.jupiter.api.{BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
@@ -82,7 +82,7 @@ class AutoTopicCreationManagerTest {
 
   @Test
   def testCreateOffsetTopic(): Unit = {
-    Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new 
Properties)
+    Mockito.when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new 
Properties)
     testCreateTopic(GROUP_METADATA_TOPIC_NAME, true, internalTopicPartitions, 
internalTopicReplicationFactor)
   }
 
@@ -159,7 +159,7 @@ class AutoTopicCreationManagerTest {
 
   @Test
   def testInvalidReplicationFactorForConsumerOffsetsTopic(): Unit = {
-    Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new 
Properties)
+    Mockito.when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new 
Properties)
     testErrorWithCreationInZk(Errors.INVALID_REPLICATION_FACTOR, 
Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true)
   }
 
@@ -177,7 +177,7 @@ class AutoTopicCreationManagerTest {
 
   @Test
   def testTopicExistsErrorSwapForConsumerOffsetsTopic(): Unit = {
-    Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new 
Properties)
+    Mockito.when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new 
Properties)
     testErrorWithCreationInZk(Errors.TOPIC_ALREADY_EXISTS, 
Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true,
       expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
   }
@@ -197,7 +197,7 @@ class AutoTopicCreationManagerTest {
 
   @Test
   def testRequestTimeoutErrorSwapForConsumerOffsetTopic(): Unit = {
-    Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new 
Properties)
+    Mockito.when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new 
Properties)
     testErrorWithCreationInZk(Errors.REQUEST_TIMED_OUT, 
Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true,
       expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
   }
@@ -216,7 +216,7 @@ class AutoTopicCreationManagerTest {
 
   @Test
   def testUnknownTopicPartitionForConsumerOffsetTopic(): Unit = {
-    Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new 
Properties)
+    Mockito.when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new 
Properties)
     testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true)
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 92bbcf015a7..58c99567926 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -26,7 +26,6 @@ import java.util.{Collections, Optional, OptionalInt, 
OptionalLong, Properties}
 import kafka.api.LeaderAndIsr
 import kafka.cluster.Broker
 import kafka.controller.{ControllerContext, KafkaController}
-import kafka.coordinator.group._
 import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinator}
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.QuotaManagers
@@ -91,6 +90,7 @@ import scala.jdk.CollectionConverters._
 import 
org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
 import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import 
org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition,
 OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, 
OffsetDeleteResponseTopicCollection}
+import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, 
IBP_2_2_IV1}
 import org.apache.kafka.server.log.internals.{AppendOrigin, FetchParams, 
FetchPartitionData}
@@ -100,8 +100,6 @@ class KafkaApisTest {
   private val requestChannelMetrics: RequestChannel.Metrics = 
mock(classOf[RequestChannel.Metrics])
   private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
   private val groupCoordinator: GroupCoordinator = 
mock(classOf[GroupCoordinator])
-  private val newGroupCoordinator: 
org.apache.kafka.coordinator.group.GroupCoordinator =
-    mock(classOf[org.apache.kafka.coordinator.group.GroupCoordinator])
   private val adminManager: ZkAdminManager = mock(classOf[ZkAdminManager])
   private val txnCoordinator: TransactionCoordinator = 
mock(classOf[TransactionCoordinator])
   private val controller: KafkaController = mock(classOf[KafkaController])
@@ -191,7 +189,6 @@ class KafkaApisTest {
       metadataSupport = metadataSupport,
       replicaManager = replicaManager,
       groupCoordinator = groupCoordinator,
-      newGroupCoordinator = newGroupCoordinator,
       txnCoordinator = txnCoordinator,
       autoTopicCreationManager = autoTopicCreationManager,
       brokerId = brokerId,
@@ -1049,7 +1046,7 @@ class KafkaApisTest {
         case CoordinatorType.GROUP =>
           topicConfigOverride.put(KafkaConfig.OffsetsTopicPartitionsProp, 
numBrokersNeeded.toString)
           
topicConfigOverride.put(KafkaConfig.OffsetsTopicReplicationFactorProp, 
numBrokersNeeded.toString)
-          when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
+          when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new 
Properties)
           authorizeResource(authorizer, AclOperation.DESCRIBE, 
ResourceType.GROUP,
             groupId, AuthorizationResult.ALLOWED)
           Topic.GROUP_METADATA_TOPIC_NAME
@@ -1160,7 +1157,7 @@ class KafkaApisTest {
         case Topic.GROUP_METADATA_TOPIC_NAME =>
           topicConfigOverride.put(KafkaConfig.OffsetsTopicPartitionsProp, 
numBrokersNeeded.toString)
           
topicConfigOverride.put(KafkaConfig.OffsetsTopicReplicationFactorProp, 
numBrokersNeeded.toString)
-          when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
+          when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new 
Properties)
           true
 
         case Topic.TRANSACTION_STATE_TOPIC_NAME =>
@@ -1283,7 +1280,7 @@ class KafkaApisTest {
     val requestChannelRequest = buildRequest(new 
OffsetCommitRequest.Builder(offsetCommitRequest).build())
 
     val future = new CompletableFuture[OffsetCommitResponseData]()
-    when(newGroupCoordinator.commitOffsets(
+    when(groupCoordinator.commitOffsets(
       requestChannelRequest.context,
       offsetCommitRequest,
       RequestLocal.NoCaching.bufferSupplier
@@ -1327,7 +1324,7 @@ class KafkaApisTest {
     val requestChannelRequest = buildRequest(new 
OffsetCommitRequest.Builder(offsetCommitRequest).build())
 
     val future = new CompletableFuture[OffsetCommitResponseData]()
-    when(newGroupCoordinator.commitOffsets(
+    when(groupCoordinator.commitOffsets(
       requestChannelRequest.context,
       offsetCommitRequest,
       RequestLocal.NoCaching.bufferSupplier
@@ -1423,7 +1420,7 @@ class KafkaApisTest {
               .setCommittedOffset(50)).asJava)).asJava)
 
     val future = new CompletableFuture[OffsetCommitResponseData]()
-    when(newGroupCoordinator.commitOffsets(
+    when(groupCoordinator.commitOffsets(
       requestChannelRequest.context,
       expectedOffsetCommitRequest,
       RequestLocal.NoCaching.bufferSupplier
@@ -1588,7 +1585,7 @@ class KafkaApisTest {
     val requestChannelRequest = buildRequest(new 
TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build())
 
     val future = new CompletableFuture[TxnOffsetCommitResponseData]()
-    when(newGroupCoordinator.commitTransactionalOffsets(
+    when(groupCoordinator.commitTransactionalOffsets(
       requestChannelRequest.context,
       txnOffsetCommitRequest,
       RequestLocal.NoCaching.bufferSupplier
@@ -1632,7 +1629,7 @@ class KafkaApisTest {
     val requestChannelRequest = buildRequest(new 
TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build())
 
     val future = new CompletableFuture[TxnOffsetCommitResponseData]()
-    when(newGroupCoordinator.commitTransactionalOffsets(
+    when(groupCoordinator.commitTransactionalOffsets(
       requestChannelRequest.context,
       txnOffsetCommitRequest,
       RequestLocal.NoCaching.bufferSupplier
@@ -1728,7 +1725,7 @@ class KafkaApisTest {
               .setCommittedOffset(50)).asJava)).asJava)
 
     val future = new CompletableFuture[TxnOffsetCommitResponseData]()
-    when(newGroupCoordinator.commitTransactionalOffsets(
+    when(groupCoordinator.commitTransactionalOffsets(
       requestChannelRequest.context,
       expectedTnxOffsetCommitRequest,
       RequestLocal.NoCaching.bufferSupplier
@@ -1829,7 +1826,7 @@ class KafkaApisTest {
 
     val requestLocal = RequestLocal.withThreadConfinedCaching
     val future = new CompletableFuture[TxnOffsetCommitResponseData]()
-    when(newGroupCoordinator.commitTransactionalOffsets(
+    when(groupCoordinator.commitTransactionalOffsets(
       request.context,
       offsetCommitRequest.data,
       requestLocal.bufferSupplier
@@ -2364,10 +2361,10 @@ class KafkaApisTest {
     if (deletePartition) {
       if (leaderEpoch >= 0) {
         verify(txnCoordinator).onResignation(txnStatePartition.partition, 
Some(leaderEpoch))
-        
verify(groupCoordinator).onResignation(groupMetadataPartition.partition, 
Some(leaderEpoch))
+        
verify(groupCoordinator).onResignation(groupMetadataPartition.partition, 
OptionalInt.of(leaderEpoch))
       } else {
         verify(txnCoordinator).onResignation(txnStatePartition.partition, None)
-        
verify(groupCoordinator).onResignation(groupMetadataPartition.partition, None)
+        
verify(groupCoordinator).onResignation(groupMetadataPartition.partition, 
OptionalInt.empty)
       }
     }
   }
@@ -2462,7 +2459,7 @@ class KafkaApisTest {
     val requestChannelRequest = buildRequest(new 
DeleteGroupsRequest.Builder(deleteGroupsRequest).build())
 
     val future = new 
CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
-    when(newGroupCoordinator.deleteGroups(
+    when(groupCoordinator.deleteGroups(
       requestChannelRequest.context,
       List("group-1", "group-2", "group-3").asJava,
       RequestLocal.NoCaching.bufferSupplier
@@ -2505,7 +2502,7 @@ class KafkaApisTest {
     val requestChannelRequest = buildRequest(new 
DeleteGroupsRequest.Builder(deleteGroupsRequest).build())
 
     val future = new 
CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
-    when(newGroupCoordinator.deleteGroups(
+    when(groupCoordinator.deleteGroups(
       requestChannelRequest.context,
       List("group-1", "group-2", "group-3").asJava,
       RequestLocal.NoCaching.bufferSupplier
@@ -2564,7 +2561,7 @@ class KafkaApisTest {
     }
 
     val future = new 
CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
-    when(newGroupCoordinator.deleteGroups(
+    when(groupCoordinator.deleteGroups(
       requestChannelRequest.context,
       List("group-2", "group-3").asJava,
       RequestLocal.NoCaching.bufferSupplier
@@ -2611,7 +2608,7 @@ class KafkaApisTest {
     val requestChannelRequest = buildRequest(new 
DescribeGroupsRequest.Builder(describeGroupsRequest).build())
 
     val future = new 
CompletableFuture[util.List[DescribeGroupsResponseData.DescribedGroup]]()
-    when(newGroupCoordinator.describeGroups(
+    when(groupCoordinator.describeGroups(
       requestChannelRequest.context,
       describeGroupsRequest.groups
     )).thenReturn(future)
@@ -2653,7 +2650,7 @@ class KafkaApisTest {
     val requestChannelRequest = buildRequest(new 
DescribeGroupsRequest.Builder(describeGroupsRequest).build())
 
     val future = new 
CompletableFuture[util.List[DescribeGroupsResponseData.DescribedGroup]]()
-    when(newGroupCoordinator.describeGroups(
+    when(groupCoordinator.describeGroups(
       requestChannelRequest.context,
       describeGroupsRequest.groups
     )).thenReturn(future)
@@ -2707,7 +2704,7 @@ class KafkaApisTest {
     }
 
     val future = new 
CompletableFuture[util.List[DescribeGroupsResponseData.DescribedGroup]]()
-    when(newGroupCoordinator.describeGroups(
+    when(groupCoordinator.describeGroups(
       requestChannelRequest.context,
       List("group-2").asJava
     )).thenReturn(future)
@@ -2764,7 +2761,7 @@ class KafkaApisTest {
 
     val requestLocal = RequestLocal.withThreadConfinedCaching
     val future = new CompletableFuture[OffsetDeleteResponseData]()
-    when(newGroupCoordinator.deleteOffsets(
+    when(groupCoordinator.deleteOffsets(
       request.context,
       offsetDeleteRequest.data,
       requestLocal.bufferSupplier
@@ -2857,7 +2854,7 @@ class KafkaApisTest {
       ).asJava.iterator))
 
     val future = new CompletableFuture[OffsetDeleteResponseData]()
-    when(newGroupCoordinator.deleteOffsets(
+    when(groupCoordinator.deleteOffsets(
       requestChannelRequest.context,
       expectedOffsetDeleteRequest,
       RequestLocal.NoCaching.bufferSupplier
@@ -2962,7 +2959,7 @@ class KafkaApisTest {
 
       // The group coordinator is called even if there are no
       // topic-partitions left after the validation.
-      when(newGroupCoordinator.deleteOffsets(
+      when(groupCoordinator.deleteOffsets(
         request.context,
         new OffsetDeleteRequestData().setGroupId(group),
         RequestLocal.NoCaching.bufferSupplier
@@ -2994,7 +2991,7 @@ class KafkaApisTest {
     val request = buildRequest(offsetDeleteRequest)
 
     val future = new CompletableFuture[OffsetDeleteResponseData]()
-    when(newGroupCoordinator.deleteOffsets(
+    when(groupCoordinator.deleteOffsets(
       request.context,
       offsetDeleteRequest.data,
       RequestLocal.NoCaching.bufferSupplier
@@ -3386,7 +3383,7 @@ class KafkaApisTest {
       .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
     val future = new CompletableFuture[JoinGroupResponseData]()
-    when(newGroupCoordinator.joinGroup(
+    when(groupCoordinator.joinGroup(
       requestChannelRequest.context,
       expectedJoinGroupRequest,
       RequestLocal.NoCaching.bufferSupplier
@@ -3429,7 +3426,7 @@ class KafkaApisTest {
       .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
     val future = new CompletableFuture[JoinGroupResponseData]()
-    when(newGroupCoordinator.joinGroup(
+    when(groupCoordinator.joinGroup(
       requestChannelRequest.context,
       expectedJoinGroupRequest,
       RequestLocal.NoCaching.bufferSupplier
@@ -3448,7 +3445,7 @@ class KafkaApisTest {
     val expectedJoinGroupResponse = new JoinGroupResponseData()
       .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
       .setMemberId("member")
-      .setProtocolName(if (version >= 7) null else GroupCoordinator.NoProtocol)
+      .setProtocolName(if (version >= 7) null else 
kafka.coordinator.group.GroupCoordinator.NoProtocol)
 
     future.complete(joinGroupResponse)
     val response = verifyNoThrottling[JoinGroupResponse](requestChannelRequest)
@@ -3467,7 +3464,7 @@ class KafkaApisTest {
     val requestChannelRequest = buildRequest(new 
JoinGroupRequest.Builder(joinGroupRequest).build())
 
     val future = new CompletableFuture[JoinGroupResponseData]()
-    when(newGroupCoordinator.joinGroup(
+    when(groupCoordinator.joinGroup(
       requestChannelRequest.context,
       joinGroupRequest,
       RequestLocal.NoCaching.bufferSupplier
@@ -3519,7 +3516,7 @@ class KafkaApisTest {
     val requestChannelRequest = buildRequest(new 
JoinGroupRequest.Builder(joinGroupRequest).build())
 
     val future = new CompletableFuture[JoinGroupResponseData]()
-    when(newGroupCoordinator.joinGroup(
+    when(groupCoordinator.joinGroup(
       requestChannelRequest.context,
       joinGroupRequest,
       RequestLocal.NoCaching.bufferSupplier
@@ -3562,7 +3559,7 @@ class KafkaApisTest {
       .setProtocolName(if (version >= 5) "range" else null)
 
     val future = new CompletableFuture[SyncGroupResponseData]()
-    when(newGroupCoordinator.syncGroup(
+    when(groupCoordinator.syncGroup(
       requestChannelRequest.context,
       expectedSyncGroupRequest,
       RequestLocal.NoCaching.bufferSupplier
@@ -3599,7 +3596,7 @@ class KafkaApisTest {
       .setProtocolName("range")
 
     val future = new CompletableFuture[SyncGroupResponseData]()
-    when(newGroupCoordinator.syncGroup(
+    when(groupCoordinator.syncGroup(
       requestChannelRequest.context,
       expectedSyncGroupRequest,
       RequestLocal.NoCaching.bufferSupplier
@@ -3652,7 +3649,7 @@ class KafkaApisTest {
       .setMemberId("member")
 
     val future = new CompletableFuture[SyncGroupResponseData]()
-    when(newGroupCoordinator.syncGroup(
+    when(groupCoordinator.syncGroup(
       requestChannelRequest.context,
       expectedSyncGroupRequest,
       RequestLocal.NoCaching.bufferSupplier
@@ -3694,7 +3691,7 @@ class KafkaApisTest {
       .setGenerationId(0)
 
     val future = new CompletableFuture[HeartbeatResponseData]()
-    when(newGroupCoordinator.heartbeat(
+    when(groupCoordinator.heartbeat(
       requestChannelRequest.context,
       expectedHeartbeatRequest
     )).thenReturn(future)
@@ -3722,7 +3719,7 @@ class KafkaApisTest {
       .setGenerationId(0)
 
     val future = new CompletableFuture[HeartbeatResponseData]()
-    when(newGroupCoordinator.heartbeat(
+    when(groupCoordinator.heartbeat(
       requestChannelRequest.context,
       expectedHeartbeatRequest
     )).thenReturn(future)
@@ -3882,7 +3879,7 @@ class KafkaApisTest {
         ).asJava)
 
       val future = new CompletableFuture[LeaveGroupResponseData]()
-      when(newGroupCoordinator.leaveGroup(
+      when(groupCoordinator.leaveGroup(
         requestChannelRequest.context,
         expectedLeaveGroupRequest
       )).thenReturn(future)
@@ -3927,7 +3924,7 @@ class KafkaApisTest {
       ).asJava)
 
     val future = new CompletableFuture[LeaveGroupResponseData]()
-    when(newGroupCoordinator.leaveGroup(
+    when(groupCoordinator.leaveGroup(
       requestChannelRequest.context,
       expectedLeaveGroupRequest
     )).thenReturn(future)
@@ -3980,7 +3977,7 @@ class KafkaApisTest {
       ).asJava)
 
     val future = new CompletableFuture[LeaveGroupResponseData]()
-    when(newGroupCoordinator.leaveGroup(
+    when(groupCoordinator.leaveGroup(
       requestChannelRequest.context,
       expectedLeaveGroupRequest
     )).thenReturn(future)
@@ -4012,7 +4009,7 @@ class KafkaApisTest {
       ).asJava)
 
     val future = new CompletableFuture[LeaveGroupResponseData]()
-    when(newGroupCoordinator.leaveGroup(
+    when(groupCoordinator.leaveGroup(
       requestChannelRequest.context,
       expectedLeaveGroupRequest
     )).thenReturn(future)
@@ -4053,7 +4050,7 @@ class KafkaApisTest {
       val requestChannelRequest = makeRequest(version)
 
       val group1Future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
-      when(newGroupCoordinator.fetchOffsets(
+      when(groupCoordinator.fetchOffsets(
         requestChannelRequest.context,
         "group-1",
         List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
@@ -4064,14 +4061,14 @@ class KafkaApisTest {
       )).thenReturn(group1Future)
 
       val group2Future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
-      when(newGroupCoordinator.fetchAllOffsets(
+      when(groupCoordinator.fetchAllOffsets(
         requestChannelRequest.context,
         "group-2",
         false
       )).thenReturn(group2Future)
 
       val group3Future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
-      when(newGroupCoordinator.fetchAllOffsets(
+      when(groupCoordinator.fetchAllOffsets(
         requestChannelRequest.context,
         "group-3",
         false
@@ -4155,7 +4152,7 @@ class KafkaApisTest {
     val requestChannelRequest = makeRequest(version)
 
     val future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
-    when(newGroupCoordinator.fetchOffsets(
+    when(groupCoordinator.fetchOffsets(
       requestChannelRequest.context,
       "group-1",
       List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
@@ -4254,7 +4251,7 @@ class KafkaApisTest {
 
     // group-1 is allowed and bar is allowed.
     val group1Future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
-    when(newGroupCoordinator.fetchOffsets(
+    when(groupCoordinator.fetchOffsets(
       requestChannelRequest.context,
       "group-1",
       List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
@@ -4266,7 +4263,7 @@ class KafkaApisTest {
 
     // group-3 is allowed and bar is allowed.
     val group3Future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
-    when(newGroupCoordinator.fetchAllOffsets(
+    when(groupCoordinator.fetchAllOffsets(
       requestChannelRequest.context,
       "group-3",
       false
@@ -4656,7 +4653,7 @@ class KafkaApisTest {
       .setStatesFilter(if (version >= 4) List("Stable", "Empty").asJava else 
List.empty.asJava)
 
     val future = new CompletableFuture[ListGroupsResponseData]()
-    when(newGroupCoordinator.listGroups(
+    when(groupCoordinator.listGroups(
       requestChannelRequest.context,
       expectedListGroupsRequest
     )).thenReturn(future)
@@ -4691,7 +4688,7 @@ class KafkaApisTest {
       .setStatesFilter(List("Stable", "Empty").asJava)
 
     val future = new CompletableFuture[ListGroupsResponseData]()
-    when(newGroupCoordinator.listGroups(
+    when(groupCoordinator.listGroups(
       requestChannelRequest.context,
       expectedListGroupsRequest
     )).thenReturn(future)
@@ -4787,7 +4784,7 @@ class KafkaApisTest {
     val expectedListGroupsRequest = new ListGroupsRequestData()
 
     val future = new CompletableFuture[ListGroupsResponseData]()
-    when(newGroupCoordinator.listGroups(
+    when(groupCoordinator.listGroups(
       requestChannelRequest.context,
       expectedListGroupsRequest
     )).thenReturn(future)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 42d3b4f6865..58efab4b948 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -505,7 +505,7 @@ object TestUtils extends Logging {
       numPartitions = 
broker.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
       replicationFactor = 
broker.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
       brokers = brokers,
-      topicConfig = broker.groupCoordinator.offsetsTopicConfigs,
+      topicConfig = broker.groupCoordinator.groupMetadataTopicConfigs,
     )
   }
 
@@ -611,7 +611,7 @@ object TestUtils extends Logging {
       server.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
       
server.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
       servers,
-      server.groupCoordinator.offsetsTopicConfigs)
+      server.groupCoordinator.groupMetadataTopicConfigs)
   }
 
   /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 3a772ab3d2b..c9cfadaba2f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
@@ -37,11 +38,18 @@ import 
org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.utils.BufferSupplier;
 
 import java.util.List;
+import java.util.OptionalInt;
+import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.IntSupplier;
 
+/**
+ * Group Coordinator's internal API.
+ */
 public interface GroupCoordinator {
 
     /**
@@ -215,4 +223,86 @@ public interface GroupCoordinator {
         OffsetDeleteRequestData request,
         BufferSupplier bufferSupplier
     );
+
+    /**
+     * Return the partition index for the given Group.
+     *
+     * @param groupId           The group id.
+     *
+     * @return The partition index.
+     */
+    int partitionFor(String groupId);
+
+    /**
+     * Commit or abort the pending transactional offsets for the given 
partitions.
+     *
+     * @param producerId        The producer id.
+     * @param partitions        The partitions.
+     * @param transactionResult The result of the transaction.
+     */
+    void onTransactionCompleted(
+        long producerId,
+        Iterable<TopicPartition> partitions,
+        TransactionResult transactionResult
+    );
+
+    /**
+     * Remove the provided deleted partitions offsets.
+     *
+     * @param topicPartitions   The deleted partitions.
+     * @param bufferSupplier    The buffer supplier tied to the request 
thread.
+     */
+    void onPartitionsDeleted(
+        List<TopicPartition> topicPartitions,
+        BufferSupplier bufferSupplier
+    );
+
+    /**
+     * Group coordinator is now the leader for the given partition at the
+     * given leader epoch. It should load cached state from the partition
+     * and begin handling requests for groups mapped to it.
+     *
+     * @param groupMetadataPartitionIndex         The partition index.
+     * @param groupMetadataPartitionLeaderEpoch   The leader epoch of the 
partition.
+     */
+    void onElection(
+        int groupMetadataPartitionIndex,
+        int groupMetadataPartitionLeaderEpoch
+    );
+
+    /**
+     * Group coordinator is no longer the leader for the given partition
+     * at the given leader epoch. It should unload cached state and stop
+     * handling requests for groups mapped to it.
+     *
+     * @param groupMetadataPartitionIndex         The partition index.
+     * @param groupMetadataPartitionLeaderEpoch   The leader epoch of the 
partition as an
+     *                                            optional value. An empty 
value means that
+     *                                            the topic was deleted.
+     */
+    void onResignation(
+        int groupMetadataPartitionIndex,
+        OptionalInt groupMetadataPartitionLeaderEpoch
+    );
+
+    /**
+     * Return the configuration properties of the internal group
+     * metadata topic.
+     *
+     * @return Properties of the internal topic.
+     */
+    Properties groupMetadataTopicConfigs();
+
+    /**
+     * Start up the group coordinator.
+     *
+     * @param groupMetadataTopicPartitionCount  A supplier to get the number 
of partitions
+     *                                          of the consumer offsets topic.
+     */
+    void startup(IntSupplier groupMetadataTopicPartitionCount);
+
+    /**
+     * Shut down the group coordinator.
+     */
+    void shutdown();
 }
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 9b907ef7f9e..5648ce2fb12 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.jmh.metadata;
 
 import kafka.controller.KafkaController;
-import kafka.coordinator.group.GroupCoordinator;
 import kafka.coordinator.transaction.TransactionCoordinator;
 import kafka.network.RequestChannel;
 import kafka.network.RequestConvertToJson;
@@ -60,6 +59,7 @@ import org.apache.kafka.common.requests.UpdateMetadataRequest;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.GroupCoordinator;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.mockito.Mockito;
 import org.openjdk.jmh.annotations.Benchmark;

Reply via email to