This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2e0a005dd47 KAFKA-14367; Add internal APIs to the new
`GroupCoordinator` interface (#13112)
2e0a005dd47 is described below
commit 2e0a005dd47d922d66c1da9b20b6ab7f5d1be5a1
Author: David Jacot <[email protected]>
AuthorDate: Fri Jan 20 08:38:21 2023 +0100
KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface
(#13112)
This patch migrates all the internal APIs of the current group coordinator
to the new `GroupCoordinator` interface. It also makes the current
implementation package private to ensure that it is not used anymore.
Reviewers: Justine Olshan <[email protected]>
---
build.gradle | 1 +
checkstyle/import-control-jmh-benchmarks.xml | 1 +
.../kafka/server/builders/KafkaApisBuilder.java | 4 +-
.../kafka/coordinator/group/GroupCoordinator.scala | 46 ++++++-----
.../group/GroupCoordinatorAdapter.scala | 78 +++++++++++++++++-
.../coordinator/group/GroupMetadataManager.scala | 25 +++---
.../kafka/server/AutoTopicCreationManager.scala | 4 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 11 ++-
core/src/main/scala/kafka/server/KafkaApis.scala | 42 +++++-----
core/src/main/scala/kafka/server/KafkaBroker.scala | 2 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 11 ++-
.../scala/kafka/server/RequestHandlerHelper.scala | 6 +-
.../server/metadata/BrokerMetadataPublisher.scala | 21 +++--
.../server/DynamicBrokerReconfigurationTest.scala | 2 +-
...ListenersWithSameSecurityProtocolBaseTest.scala | 2 +-
.../coordinator/group/GroupCoordinatorTest.scala | 4 +-
.../group/GroupMetadataManagerTest.scala | 10 +--
.../server/AutoTopicCreationManagerTest.scala | 12 +--
.../scala/unit/kafka/server/KafkaApisTest.scala | 93 +++++++++++-----------
.../test/scala/unit/kafka/utils/TestUtils.scala | 4 +-
.../kafka/coordinator/group/GroupCoordinator.java | 90 +++++++++++++++++++++
.../jmh/metadata/MetadataRequestBenchmark.java | 2 +-
22 files changed, 328 insertions(+), 143 deletions(-)
diff --git a/build.gradle b/build.gradle
index 7c1eafdc0c7..e535d291269 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2428,6 +2428,7 @@ project(':jmh-benchmarks') {
}
implementation project(':server-common')
implementation project(':clients')
+ implementation project(':group-coordinator')
implementation project(':metadata')
implementation project(':storage')
implementation project(':streams')
diff --git a/checkstyle/import-control-jmh-benchmarks.xml
b/checkstyle/import-control-jmh-benchmarks.xml
index 983c526eccc..1f58dbd8ee7 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -49,6 +49,7 @@
<allow pkg="kafka.security.authorizer"/>
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.clients"/>
+ <allow pkg="org.apache.kafka.coordinator.group"/>
<allow pkg="org.apache.kafka.metadata"/>
<allow pkg="org.apache.kafka.timeline" />
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 18cd42c77cb..d5403e0c57a 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -17,8 +17,6 @@
package kafka.server.builders;
-import kafka.coordinator.group.GroupCoordinator;
-import kafka.coordinator.group.GroupCoordinatorAdapter;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
import kafka.server.ApiVersionManager;
@@ -35,6 +33,7 @@ import kafka.server.ReplicaManager;
import kafka.server.metadata.ConfigRepository;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.server.authorizer.Authorizer;
import java.util.Collections;
@@ -179,7 +178,6 @@ public class KafkaApisBuilder {
metadataSupport,
replicaManager,
groupCoordinator,
- new GroupCoordinatorAdapter(groupCoordinator,
time),
txnCoordinator,
autoTopicCreationManager,
brokerId,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 9cc46c3da15..c52d920b583 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -16,7 +16,7 @@
*/
package kafka.coordinator.group
-import java.util.Properties
+import java.util.{OptionalInt, Properties}
import java.util.concurrent.atomic.AtomicBoolean
import kafka.common.OffsetAndMetadata
import kafka.server._
@@ -48,14 +48,16 @@ import scala.math.max
* used by its callback. The delayed callback may acquire the group lock
* since the delayed operation is completed only if the group lock can be
acquired.
*/
-class GroupCoordinator(val brokerId: Int,
- val groupConfig: GroupConfig,
- val offsetConfig: OffsetConfig,
- val groupManager: GroupMetadataManager,
- val heartbeatPurgatory:
DelayedOperationPurgatory[DelayedHeartbeat],
- val rebalancePurgatory:
DelayedOperationPurgatory[DelayedRebalance],
- time: Time,
- metrics: Metrics) extends Logging {
+private[group] class GroupCoordinator(
+ val brokerId: Int,
+ val groupConfig: GroupConfig,
+ val offsetConfig: OffsetConfig,
+ val groupManager: GroupMetadataManager,
+ val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
+ val rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance],
+ time: Time,
+ metrics: Metrics
+) extends Logging {
import GroupCoordinator._
type JoinCallback = JoinGroupResult => Unit
@@ -1188,7 +1190,7 @@ class GroupCoordinator(val brokerId: Int,
*
* @param offsetTopicPartitionId The partition we are no longer leading
*/
- def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch:
Option[Int]): Unit = {
+ def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch:
OptionalInt): Unit = {
info(s"Resigned as the group coordinator for partition
$offsetTopicPartitionId in epoch $coordinatorEpoch")
groupManager.removeGroupsForPartition(offsetTopicPartitionId,
coordinatorEpoch, onGroupUnloaded)
}
@@ -1707,10 +1709,12 @@ object GroupCoordinator {
val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol,
NoMembers)
val NewMemberJoinTimeoutMs: Int = 5 * 60 * 1000
- def apply(config: KafkaConfig,
- replicaManager: ReplicaManager,
- time: Time,
- metrics: Metrics): GroupCoordinator = {
+ private[group] def apply(
+ config: KafkaConfig,
+ replicaManager: ReplicaManager,
+ time: Time,
+ metrics: Metrics
+ ): GroupCoordinator = {
val heartbeatPurgatory =
DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
val rebalancePurgatory =
DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId)
GroupCoordinator(config, replicaManager, heartbeatPurgatory,
rebalancePurgatory, time, metrics)
@@ -1729,12 +1733,14 @@ object GroupCoordinator {
offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
)
- def apply(config: KafkaConfig,
- replicaManager: ReplicaManager,
- heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
- rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance],
- time: Time,
- metrics: Metrics): GroupCoordinator = {
+ private[group] def apply(
+ config: KafkaConfig,
+ replicaManager: ReplicaManager,
+ heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
+ rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance],
+ time: Time,
+ metrics: Metrics
+ ): GroupCoordinator = {
val offsetConfig = this.offsetConfig(config)
val groupConfig = GroupConfig(groupMinSessionTimeoutMs =
config.groupMinSessionTimeoutMs,
groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs,
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 7006217e82c..660070d024c 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -17,26 +17,47 @@
package kafka.coordinator.group
import kafka.common.OffsetAndMetadata
-import kafka.server.RequestLocal
+import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils.Implicits.MapExtensionMethods
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.{DeleteGroupsResponseData,
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData,
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData,
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData,
OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData,
OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData,
SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitReq [...]
+import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
-import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext}
+import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext,
TransactionResult}
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import java.util
-import java.util.Optional
+import java.util.{Optional, OptionalInt, Properties}
import java.util.concurrent.CompletableFuture
+import java.util.function.IntSupplier
import scala.collection.{immutable, mutable}
import scala.jdk.CollectionConverters._
+object GroupCoordinatorAdapter {
+ def apply(
+ config: KafkaConfig,
+ replicaManager: ReplicaManager,
+ time: Time,
+ metrics: Metrics
+ ): GroupCoordinatorAdapter = {
+ new GroupCoordinatorAdapter(
+ GroupCoordinator(
+ config,
+ replicaManager,
+ time,
+ metrics
+ ),
+ time
+ )
+ }
+}
+
/**
* GroupCoordinatorAdapter is a thin wrapper around
kafka.coordinator.group.GroupCoordinator
* that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator
interface.
*/
-class GroupCoordinatorAdapter(
+private[group] class GroupCoordinatorAdapter(
private val coordinator: GroupCoordinator,
private val time: Time
) extends org.apache.kafka.coordinator.group.GroupCoordinator {
@@ -511,4 +532,53 @@ class GroupCoordinatorAdapter(
future
}
+
+ override def partitionFor(groupId: String): Int = {
+ coordinator.partitionFor(groupId)
+ }
+
+ override def onTransactionCompleted(
+ producerId: Long,
+ partitions: java.lang.Iterable[TopicPartition],
+ transactionResult: TransactionResult
+ ): Unit = {
+ coordinator.scheduleHandleTxnCompletion(
+ producerId,
+ partitions.asScala,
+ transactionResult
+ )
+ }
+
+ override def onPartitionsDeleted(
+ topicPartitions: util.List[TopicPartition],
+ bufferSupplier: BufferSupplier
+ ): Unit = {
+ coordinator.handleDeletedPartitions(topicPartitions.asScala,
RequestLocal(bufferSupplier))
+ }
+
+ override def onElection(
+ groupMetadataPartitionIndex: Int,
+ groupMetadataPartitionLeaderEpoch: Int
+ ): Unit = {
+ coordinator.onElection(groupMetadataPartitionIndex,
groupMetadataPartitionLeaderEpoch)
+ }
+
+ override def onResignation(
+ groupMetadataPartitionIndex: Int,
+ groupMetadataPartitionLeaderEpoch: OptionalInt
+ ): Unit = {
+ coordinator.onResignation(groupMetadataPartitionIndex,
groupMetadataPartitionLeaderEpoch)
+ }
+
+ override def groupMetadataTopicConfigs(): Properties = {
+ coordinator.offsetsTopicConfigs
+ }
+
+ override def startup(groupMetadataTopicPartitionCount: IntSupplier): Unit = {
+ coordinator.startup(() => groupMetadataTopicPartitionCount.getAsInt)
+ }
+
+ override def shutdown(): Unit = {
+ coordinator.shutdown()
+ }
}
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 53820b004e8..a90b160d831 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -20,7 +20,7 @@ package kafka.coordinator.group
import java.io.PrintStream
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
-import java.util.Optional
+import java.util.{Optional, OptionalInt}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.ConcurrentHashMap
@@ -547,7 +547,7 @@ class GroupMetadataManager(brokerId: Int,
onGroupLoaded: GroupMetadata => Unit,
startTimeMs: java.lang.Long
): Unit = {
- if (!maybeUpdateCoordinatorEpoch(topicPartition.partition,
Some(coordinatorEpoch))) {
+ if (!maybeUpdateCoordinatorEpoch(topicPartition.partition,
OptionalInt.of(coordinatorEpoch))) {
info(s"Not loading offsets and group metadata for $topicPartition " +
s"in epoch $coordinatorEpoch since current epoch is
${epochForPartitionId.get(topicPartition.partition)}")
} else if (!addLoadingPartition(topicPartition.partition)) {
@@ -763,7 +763,7 @@ class GroupMetadataManager(brokerId: Int,
* @param offsetsPartition Groups belonging to this partition of the offsets
topic will be deleted from the cache.
*/
def removeGroupsForPartition(offsetsPartition: Int,
- coordinatorEpoch: Option[Int],
+ coordinatorEpoch: OptionalInt,
onGroupUnloaded: GroupMetadata => Unit): Unit =
{
val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME,
offsetsPartition)
info(s"Scheduling unloading of offsets and group metadata from
$topicPartition")
@@ -771,7 +771,7 @@ class GroupMetadataManager(brokerId: Int,
}
private [group] def removeGroupsAndOffsets(topicPartition: TopicPartition,
- coordinatorEpoch: Option[Int],
+ coordinatorEpoch: OptionalInt,
onGroupUnloaded: GroupMetadata =>
Unit): Unit = {
val offsetsPartition = topicPartition.partition
if (maybeUpdateCoordinatorEpoch(offsetsPartition, coordinatorEpoch)) {
@@ -810,19 +810,22 @@ class GroupMetadataManager(brokerId: Int,
*/
private def maybeUpdateCoordinatorEpoch(
partitionId: Int,
- epochOpt: Option[Int]
+ epochOpt: OptionalInt
): Boolean = {
val updatedEpoch = epochForPartitionId.compute(partitionId, (_,
currentEpoch) => {
if (currentEpoch == null) {
- epochOpt.map(Int.box).orNull
+ if (epochOpt.isPresent) epochOpt.getAsInt
+ else null
} else {
- epochOpt match {
- case Some(epoch) if epoch > currentEpoch => epoch
- case _ => currentEpoch
- }
+ if (epochOpt.isPresent && epochOpt.getAsInt > currentEpoch)
epochOpt.getAsInt
+ else currentEpoch
}
})
- epochOpt.forall(_ == updatedEpoch)
+ if (epochOpt.isPresent) {
+ epochOpt.getAsInt == updatedEpoch
+ } else {
+ true
+ }
}
// visible for testing
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 60796abb51a..c9dfa019595 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicReference
import java.util.{Collections, Properties}
import kafka.controller.KafkaController
-import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.Logging
import org.apache.kafka.clients.ClientResponse
@@ -34,6 +33,7 @@ import
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic,
import
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest,
RequestContext, RequestHeader}
+import org.apache.kafka.coordinator.group.GroupCoordinator
import scala.collection.{Map, Seq, Set, mutable}
import scala.jdk.CollectionConverters._
@@ -237,7 +237,7 @@ class DefaultAutoTopicCreationManager(
.setName(topic)
.setNumPartitions(config.offsetsTopicPartitions)
.setReplicationFactor(config.offsetsTopicReplicationFactor)
-
.setConfigs(convertToTopicConfigCollections(groupCoordinator.offsetsTopicConfigs))
+
.setConfigs(convertToTopicConfigCollections(groupCoordinator.groupMetadataTopicConfigs))
case TRANSACTION_STATE_TOPIC_NAME =>
new CreatableTopic()
.setName(topic)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index c72f297b2fc..035aed7c810 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -18,7 +18,7 @@
package kafka.server
import kafka.cluster.Broker.ServerInfo
-import kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorAdapter}
+import kafka.coordinator.group.GroupCoordinatorAdapter
import kafka.coordinator.transaction.{ProducerIdManager,
TransactionCoordinator}
import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
@@ -37,6 +37,7 @@ import
org.apache.kafka.common.security.scram.internals.ScramMechanism
import
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException}
+import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.{BrokerState, VersionRange}
import org.apache.kafka.raft
@@ -283,7 +284,12 @@ class BrokerServer(
// Create group coordinator, but don't start it until we've started
replica manager.
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it
would be good to fix the underlying issue
- groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM,
metrics)
+ groupCoordinator = GroupCoordinatorAdapter(
+ config,
+ replicaManager,
+ Time.SYSTEM,
+ metrics
+ )
val producerIdManagerSupplier = () => ProducerIdManager.rpc(
config.brokerId,
@@ -409,7 +415,6 @@ class BrokerServer(
metadataSupport = raftSupport,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
- newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator,
time),
txnCoordinator = transactionCoordinator,
autoTopicCreationManager = autoTopicCreationManager,
brokerId = config.nodeId,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8bbd7ef1aee..8040d00cbe2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -20,7 +20,6 @@ package kafka.server
import kafka.admin.AdminUtils
import kafka.api.ElectLeadersRequestOps
import kafka.controller.ReplicaAssignment
-import kafka.coordinator.group._
import kafka.coordinator.transaction.{InitProducerIdResult,
TransactionCoordinator}
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
@@ -65,6 +64,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal,
SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.{DelegationToken,
TokenInformation}
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0,
IBP_2_3_IV0}
@@ -76,7 +76,7 @@ import java.nio.ByteBuffer
import java.util
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import java.util.concurrent.atomic.AtomicInteger
-import java.util.{Collections, Optional}
+import java.util.{Collections, Optional, OptionalInt}
import scala.annotation.nowarn
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, Set, mutable}
@@ -90,9 +90,6 @@ class KafkaApis(val requestChannel: RequestChannel,
val metadataSupport: MetadataSupport,
val replicaManager: ReplicaManager,
val groupCoordinator: GroupCoordinator,
- // newGroupCoordinator is temporary here. It will be removed
when
- // the migration to the new interface is completed in this
class.
- val newGroupCoordinator:
org.apache.kafka.coordinator.group.GroupCoordinator,
val txnCoordinator: TransactionCoordinator,
val autoTopicCreationManager: AutoTopicCreationManager,
val brokerId: Int,
@@ -310,9 +307,9 @@ class KafkaApis(val requestChannel: RequestChannel,
if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
&& partitionState.deletePartition) {
val leaderEpoch = if (partitionState.leaderEpoch >= 0)
- Some(partitionState.leaderEpoch)
+ OptionalInt.of(partitionState.leaderEpoch)
else
- None
+ OptionalInt.empty
groupCoordinator.onResignation(topicPartition.partition,
leaderEpoch)
} else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
&& partitionState.deletePartition) {
@@ -357,8 +354,9 @@ class KafkaApis(val requestChannel: RequestChannel,
new UpdateMetadataResponse(new
UpdateMetadataResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
} else {
val deletedPartitions =
replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
- if (deletedPartitions.nonEmpty)
- groupCoordinator.handleDeletedPartitions(deletedPartitions,
requestLocal)
+ if (deletedPartitions.nonEmpty) {
+ groupCoordinator.onPartitionsDeleted(deletedPartitions.asJava,
requestLocal.bufferSupplier)
+ }
if (zkSupport.adminManager.hasDelayedTopicOperations) {
updateMetadataRequest.partitionStates.forEach { partitionState =>
@@ -538,7 +536,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setGroupInstanceId(offsetCommitRequest.data.groupInstanceId)
.setTopics(authorizedTopicsRequest.asJava)
- newGroupCoordinator.commitOffsets(
+ groupCoordinator.commitOffsets(
request.context,
offsetCommitRequestData,
requestLocal.bufferSupplier
@@ -1437,7 +1435,7 @@ class KafkaApis(val requestChannel: RequestChannel,
groupOffsetFetch: OffsetFetchRequestData.OffsetFetchRequestGroup,
requireStable: Boolean
): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
- newGroupCoordinator.fetchAllOffsets(
+ groupCoordinator.fetchAllOffsets(
requestContext,
groupOffsetFetch.groupId,
requireStable
@@ -1475,7 +1473,7 @@ class KafkaApis(val requestChannel: RequestChannel,
groupOffsetFetch.topics.asScala
)(_.name)
- newGroupCoordinator.fetchOffsets(
+ groupCoordinator.fetchOffsets(
requestContext,
groupOffsetFetch.groupId,
authorizedTopics.asJava,
@@ -1628,7 +1626,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- newGroupCoordinator.describeGroups(
+ groupCoordinator.describeGroups(
request.context,
authorizedGroups.asJava
).handle[Unit] { (results, exception) =>
@@ -1663,7 +1661,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val listGroupsRequest = request.body[ListGroupsRequest]
val hasClusterDescribe = authHelper.authorize(request.context, DESCRIBE,
CLUSTER, CLUSTER_NAME, logIfDenied = false)
- newGroupCoordinator.listGroups(
+ groupCoordinator.listGroups(
request.context,
listGroupsRequest.data
).handle[Unit] { (response, exception) =>
@@ -1701,7 +1699,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request,
joinGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
- newGroupCoordinator.joinGroup(
+ groupCoordinator.joinGroup(
request.context,
joinGroupRequest.data,
requestLocal.bufferSupplier
@@ -1735,7 +1733,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request,
syncGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
- newGroupCoordinator.syncGroup(
+ groupCoordinator.syncGroup(
request.context,
syncGroupRequest.data,
requestLocal.bufferSupplier
@@ -1759,7 +1757,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val (authorizedGroups, unauthorizedGroups) =
authHelper.partitionSeqByAuthorized(request.context, DELETE, GROUP,
groups)(identity)
- newGroupCoordinator.deleteGroups(
+ groupCoordinator.deleteGroups(
request.context,
authorizedGroups.toList.asJava,
requestLocal.bufferSupplier
@@ -1800,7 +1798,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request,
heartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
- newGroupCoordinator.heartbeat(
+ groupCoordinator.heartbeat(
request.context,
heartbeatRequest.data
).handle[Unit] { (response, exception) =>
@@ -1820,7 +1818,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request,
leaveGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
- newGroupCoordinator.leaveGroup(
+ groupCoordinator.leaveGroup(
request.context,
leaveGroupRequest.normalizedData()
).handle[Unit] { (response, exception) =>
@@ -2312,7 +2310,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// as soon as the end transaction marker has been written for a
transactional offset commit,
// call to the group coordinator to materialize the offsets into the
cache
try {
- groupCoordinator.scheduleHandleTxnCompletion(producerId,
successfulOffsetsPartitions, result)
+ groupCoordinator.onTransactionCompleted(producerId,
successfulOffsetsPartitions.asJava, result)
} catch {
case e: Exception =>
error(s"Received an exception while trying to update the offsets
cache on transaction marker append", e)
@@ -2586,7 +2584,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setTransactionalId(txnOffsetCommitRequest.data.transactionalId)
.setTopics(authorizedTopicCommittedOffsets.asJava)
- newGroupCoordinator.commitTransactionalOffsets(
+ groupCoordinator.commitTransactionalOffsets(
request.context,
txnOffsetCommitRequestData,
requestLocal.bufferSupplier
@@ -3215,7 +3213,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setGroupId(offsetDeleteRequest.data.groupId)
.setTopics(authorizedTopicPartitions)
- newGroupCoordinator.deleteOffsets(
+ groupCoordinator.deleteOffsets(
request.context,
offsetDeleteRequestData,
requestLocal.bufferSupplier
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala
b/core/src/main/scala/kafka/server/KafkaBroker.scala
index 40247f76797..db5da9bd906 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -18,7 +18,6 @@
package kafka.server
import com.yammer.metrics.core.MetricName
-import kafka.coordinator.group.GroupCoordinator
import kafka.log.LogManager
import kafka.metrics.{KafkaMetricsGroup, LinuxIoMetricsCollector}
import kafka.network.SocketServer
@@ -28,6 +27,7 @@ import
org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.metrics.KafkaYammerMetrics
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index ea1f6b05956..892948481df 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.{AtomicBoolean,
AtomicInteger}
import kafka.cluster.{Broker, EndPoint}
import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException,
InconsistentClusterIdException}
import kafka.controller.KafkaController
-import kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorAdapter}
+import kafka.coordinator.group.GroupCoordinatorAdapter
import kafka.coordinator.transaction.{ProducerIdManager,
TransactionCoordinator}
import kafka.log.LogManager
import kafka.metrics.KafkaMetricsReporter
@@ -49,6 +49,7 @@ import
org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
import org.apache.kafka.common.{Endpoint, KafkaException, Node}
+import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.metadata.{BrokerState, MetadataRecordSerde,
VersionRange}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
@@ -444,7 +445,12 @@ class KafkaServer(
/* start group coordinator */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise,
it would be good to fix the underlying issue
- groupCoordinator = GroupCoordinator(config, replicaManager,
Time.SYSTEM, metrics)
+ groupCoordinator = GroupCoordinatorAdapter(
+ config,
+ replicaManager,
+ Time.SYSTEM,
+ metrics
+ )
groupCoordinator.startup(() =>
zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions))
/* create producer ids manager */
@@ -506,7 +512,6 @@ class KafkaServer(
metadataSupport = zkSupport,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
- newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator,
time),
txnCoordinator = transactionCoordinator,
autoTopicCreationManager = autoTopicCreationManager,
brokerId = config.brokerId,
diff --git a/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
b/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
index 75bd7a32969..dc80b514846 100644
--- a/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
+++ b/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
@@ -18,7 +18,6 @@
package kafka.server
import kafka.cluster.Partition
-import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
@@ -27,6 +26,9 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.Send
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse}
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.coordinator.group.GroupCoordinator
+
+import java.util.OptionalInt
object RequestHandlerHelper {
@@ -46,7 +48,7 @@ object RequestHandlerHelper {
updatedFollowers.foreach { partition =>
if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
- groupCoordinator.onResignation(partition.partitionId,
Some(partition.getLeaderEpoch))
+ groupCoordinator.onResignation(partition.partitionId,
OptionalInt.of(partition.getLeaderEpoch))
else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
txnCoordinator.onResignation(partition.partitionId,
Some(partition.getLeaderEpoch))
}
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 933a6bf8924..43d79e88601 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -17,21 +17,22 @@
package kafka.server.metadata
-import java.util.Properties
+import java.util.{OptionalInt, Properties}
import java.util.concurrent.atomic.AtomicLong
-import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.{LogManager, UnifiedLog}
import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta,
TopicsImage}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.fault.FaultHandler
import scala.collection.mutable
+import scala.jdk.CollectionConverters._
object BrokerMetadataPublisher extends Logging {
@@ -174,7 +175,8 @@ class BrokerMetadataPublisher(
delta,
Topic.GROUP_METADATA_TOPIC_NAME,
groupCoordinator.onElection,
- groupCoordinator.onResignation)
+ (partitionIndex, leaderEpochOpt) =>
groupCoordinator.onResignation(partitionIndex, toOptionalInt(leaderEpochOpt))
+ )
} catch {
case t: Throwable =>
metadataPublishingFaultHandler.handleFault("Error updating group " +
s"coordinator with local changes in ${deltaName}", t)
@@ -200,7 +202,7 @@ class BrokerMetadataPublisher(
}
}
if (deletedTopicPartitions.nonEmpty) {
- groupCoordinator.handleDeletedPartitions(deletedTopicPartitions,
RequestLocal.NoCaching)
+
groupCoordinator.onPartitionsDeleted(deletedTopicPartitions.asJava,
RequestLocal.NoCaching.bufferSupplier)
}
} catch {
case t: Throwable =>
metadataPublishingFaultHandler.handleFault("Error updating group " +
@@ -270,6 +272,13 @@ class BrokerMetadataPublisher(
}
}
+ private def toOptionalInt(option: Option[Int]): OptionalInt = {
+ option match {
+ case Some(leaderEpoch) => OptionalInt.of(leaderEpoch)
+ case None => OptionalInt.empty
+ }
+ }
+
override def publishedOffset: Long = publishedOffsetAtomic.get()
def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
@@ -342,8 +351,8 @@ class BrokerMetadataPublisher(
}
try {
// Start the group coordinator.
- groupCoordinator.startup(() => metadataCache.numPartitions(
-
Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(conf.offsetsTopicPartitions))
+ groupCoordinator.startup(() =>
metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME)
+ .getOrElse(conf.offsetsTopicPartitions))
} catch {
case t: Throwable => fatalFaultHandler.handleFault("Error starting
GroupCoordinator", t)
}
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 3f1f1c8054d..ba35694f782 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -158,7 +158,7 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
TestUtils.createTopicWithAdmin(adminClients.head,
Topic.GROUP_METADATA_TOPIC_NAME, servers,
numPartitions = servers.head.config.offsetsTopicPartitions,
replicationFactor = numServers,
- topicConfig = servers.head.groupCoordinator.offsetsTopicConfigs)
+ topicConfig = servers.head.groupCoordinator.groupMetadataTopicConfigs)
TestMetricsReporter.testReporters.clear()
}
diff --git
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index b908a1352b4..378f55e70b2 100644
---
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -109,7 +109,7 @@ abstract class
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
}
TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME,
OffsetConfig.DefaultOffsetsTopicNumPartitions,
- replicationFactor = 2, servers,
servers.head.groupCoordinator.offsetsTopicConfigs)
+ replicationFactor = 2, servers,
servers.head.groupCoordinator.groupMetadataTopicConfigs)
createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser,
JaasTestUtils.KafkaScramPassword)
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 98f89477b1e..6dd77d256a9 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -17,7 +17,7 @@
package kafka.coordinator.group
-import java.util.Optional
+import java.util.{Optional, OptionalInt}
import kafka.common.OffsetAndMetadata
import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig,
ReplicaManager, RequestLocal}
import kafka.utils._
@@ -190,7 +190,7 @@ class GroupCoordinatorTest {
val otherGroupMetadataTopicPartition = new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, otherGroupPartitionId)
when(replicaManager.getLog(otherGroupMetadataTopicPartition)).thenReturn(None)
// Call removeGroupsAndOffsets so that partition removed from
loadingPartitions
-
groupCoordinator.groupManager.removeGroupsAndOffsets(otherGroupMetadataTopicPartition,
Some(1), group => {})
+
groupCoordinator.groupManager.removeGroupsAndOffsets(otherGroupMetadataTopicPartition,
OptionalInt.of(1), group => {})
groupCoordinator.groupManager.loadGroupsAndOffsets(otherGroupMetadataTopicPartition,
1, group => {}, 0L)
assertEquals(Errors.NONE,
groupCoordinator.handleDescribeGroup(otherGroupId)._1)
}
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index d10c5b630fc..b4e2e519679 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -20,7 +20,7 @@ package kafka.coordinator.group
import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.concurrent.locks.ReentrantLock
-import java.util.{Collections, Optional}
+import java.util.{Collections, Optional, OptionalInt}
import com.yammer.metrics.core.Gauge
import javax.management.ObjectName
@@ -669,7 +669,7 @@ class GroupMetadataManagerTest {
loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
assertEquals(groupEpoch,
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
- groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition,
Some(groupEpoch), _ => ())
+ groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition,
OptionalInt.of(groupEpoch), _ => ())
assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
"Removed group remained in cache")
assertEquals(groupEpoch,
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
@@ -685,7 +685,7 @@ class GroupMetadataManagerTest {
val groupEpoch = 2
loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
- groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition,
Some(groupEpoch), _ => ())
+ groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition,
OptionalInt.of(groupEpoch), _ => ())
assertEquals(groupEpoch,
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
"Removed group remained in cache")
@@ -696,7 +696,7 @@ class GroupMetadataManagerTest {
val groupEpoch = 2
val initiallyLoaded = loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
- groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition,
Some(groupEpoch - 1), _ => ())
+ groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition,
OptionalInt.of(groupEpoch - 1), _ => ())
assertEquals(groupEpoch,
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
assertEquals(initiallyLoaded.groupId, group.groupId)
@@ -718,7 +718,7 @@ class GroupMetadataManagerTest {
val groupEpoch = 2
val initiallyLoaded = loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
- groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, None, _
=> ())
+ groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition,
OptionalInt.empty, _ => ())
assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
"Removed group remained in cache")
assertEquals(groupEpoch,
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()),
diff --git
a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index 8c6b922972d..11f697b232d 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.{Collections, Optional, Properties}
import kafka.controller.KafkaController
-import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.TestUtils
import org.apache.kafka.clients.{ClientResponse, NodeApiVersions,
RequestCompletionHandler}
@@ -38,6 +37,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
+import org.apache.kafka.coordinator.group.GroupCoordinator
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
@@ -82,7 +82,7 @@ class AutoTopicCreationManagerTest {
@Test
def testCreateOffsetTopic(): Unit = {
- Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new
Properties)
+ Mockito.when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new
Properties)
testCreateTopic(GROUP_METADATA_TOPIC_NAME, true, internalTopicPartitions,
internalTopicReplicationFactor)
}
@@ -159,7 +159,7 @@ class AutoTopicCreationManagerTest {
@Test
def testInvalidReplicationFactorForConsumerOffsetsTopic(): Unit = {
- Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new
Properties)
+ Mockito.when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new
Properties)
testErrorWithCreationInZk(Errors.INVALID_REPLICATION_FACTOR,
Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true)
}
@@ -177,7 +177,7 @@ class AutoTopicCreationManagerTest {
@Test
def testTopicExistsErrorSwapForConsumerOffsetsTopic(): Unit = {
- Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new
Properties)
+ Mockito.when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new
Properties)
testErrorWithCreationInZk(Errors.TOPIC_ALREADY_EXISTS,
Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true,
expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
}
@@ -197,7 +197,7 @@ class AutoTopicCreationManagerTest {
@Test
def testRequestTimeoutErrorSwapForConsumerOffsetTopic(): Unit = {
- Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new
Properties)
+ Mockito.when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new
Properties)
testErrorWithCreationInZk(Errors.REQUEST_TIMED_OUT,
Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true,
expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
}
@@ -216,7 +216,7 @@ class AutoTopicCreationManagerTest {
@Test
def testUnknownTopicPartitionForConsumerOffsetTopic(): Unit = {
- Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new
Properties)
+ Mockito.when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new
Properties)
testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION,
Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true)
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 92bbcf015a7..58c99567926 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -26,7 +26,6 @@ import java.util.{Collections, Optional, OptionalInt,
OptionalLong, Properties}
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
import kafka.controller.{ControllerContext, KafkaController}
-import kafka.coordinator.group._
import kafka.coordinator.transaction.{InitProducerIdResult,
TransactionCoordinator}
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
@@ -91,6 +90,7 @@ import scala.jdk.CollectionConverters._
import
org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import
org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition,
OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic,
OffsetDeleteResponseTopicCollection}
+import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0,
IBP_2_2_IV1}
import org.apache.kafka.server.log.internals.{AppendOrigin, FetchParams,
FetchPartitionData}
@@ -100,8 +100,6 @@ class KafkaApisTest {
private val requestChannelMetrics: RequestChannel.Metrics =
mock(classOf[RequestChannel.Metrics])
private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
private val groupCoordinator: GroupCoordinator =
mock(classOf[GroupCoordinator])
- private val newGroupCoordinator:
org.apache.kafka.coordinator.group.GroupCoordinator =
- mock(classOf[org.apache.kafka.coordinator.group.GroupCoordinator])
private val adminManager: ZkAdminManager = mock(classOf[ZkAdminManager])
private val txnCoordinator: TransactionCoordinator =
mock(classOf[TransactionCoordinator])
private val controller: KafkaController = mock(classOf[KafkaController])
@@ -191,7 +189,6 @@ class KafkaApisTest {
metadataSupport = metadataSupport,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
- newGroupCoordinator = newGroupCoordinator,
txnCoordinator = txnCoordinator,
autoTopicCreationManager = autoTopicCreationManager,
brokerId = brokerId,
@@ -1049,7 +1046,7 @@ class KafkaApisTest {
case CoordinatorType.GROUP =>
topicConfigOverride.put(KafkaConfig.OffsetsTopicPartitionsProp,
numBrokersNeeded.toString)
topicConfigOverride.put(KafkaConfig.OffsetsTopicReplicationFactorProp,
numBrokersNeeded.toString)
- when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
+ when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new
Properties)
authorizeResource(authorizer, AclOperation.DESCRIBE,
ResourceType.GROUP,
groupId, AuthorizationResult.ALLOWED)
Topic.GROUP_METADATA_TOPIC_NAME
@@ -1160,7 +1157,7 @@ class KafkaApisTest {
case Topic.GROUP_METADATA_TOPIC_NAME =>
topicConfigOverride.put(KafkaConfig.OffsetsTopicPartitionsProp,
numBrokersNeeded.toString)
topicConfigOverride.put(KafkaConfig.OffsetsTopicReplicationFactorProp,
numBrokersNeeded.toString)
- when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
+ when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new
Properties)
true
case Topic.TRANSACTION_STATE_TOPIC_NAME =>
@@ -1283,7 +1280,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new
OffsetCommitRequest.Builder(offsetCommitRequest).build())
val future = new CompletableFuture[OffsetCommitResponseData]()
- when(newGroupCoordinator.commitOffsets(
+ when(groupCoordinator.commitOffsets(
requestChannelRequest.context,
offsetCommitRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -1327,7 +1324,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new
OffsetCommitRequest.Builder(offsetCommitRequest).build())
val future = new CompletableFuture[OffsetCommitResponseData]()
- when(newGroupCoordinator.commitOffsets(
+ when(groupCoordinator.commitOffsets(
requestChannelRequest.context,
offsetCommitRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -1423,7 +1420,7 @@ class KafkaApisTest {
.setCommittedOffset(50)).asJava)).asJava)
val future = new CompletableFuture[OffsetCommitResponseData]()
- when(newGroupCoordinator.commitOffsets(
+ when(groupCoordinator.commitOffsets(
requestChannelRequest.context,
expectedOffsetCommitRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -1588,7 +1585,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new
TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build())
val future = new CompletableFuture[TxnOffsetCommitResponseData]()
- when(newGroupCoordinator.commitTransactionalOffsets(
+ when(groupCoordinator.commitTransactionalOffsets(
requestChannelRequest.context,
txnOffsetCommitRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -1632,7 +1629,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new
TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build())
val future = new CompletableFuture[TxnOffsetCommitResponseData]()
- when(newGroupCoordinator.commitTransactionalOffsets(
+ when(groupCoordinator.commitTransactionalOffsets(
requestChannelRequest.context,
txnOffsetCommitRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -1728,7 +1725,7 @@ class KafkaApisTest {
.setCommittedOffset(50)).asJava)).asJava)
val future = new CompletableFuture[TxnOffsetCommitResponseData]()
- when(newGroupCoordinator.commitTransactionalOffsets(
+ when(groupCoordinator.commitTransactionalOffsets(
requestChannelRequest.context,
expectedTnxOffsetCommitRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -1829,7 +1826,7 @@ class KafkaApisTest {
val requestLocal = RequestLocal.withThreadConfinedCaching
val future = new CompletableFuture[TxnOffsetCommitResponseData]()
- when(newGroupCoordinator.commitTransactionalOffsets(
+ when(groupCoordinator.commitTransactionalOffsets(
request.context,
offsetCommitRequest.data,
requestLocal.bufferSupplier
@@ -2364,10 +2361,10 @@ class KafkaApisTest {
if (deletePartition) {
if (leaderEpoch >= 0) {
verify(txnCoordinator).onResignation(txnStatePartition.partition,
Some(leaderEpoch))
-
verify(groupCoordinator).onResignation(groupMetadataPartition.partition,
Some(leaderEpoch))
+
verify(groupCoordinator).onResignation(groupMetadataPartition.partition,
OptionalInt.of(leaderEpoch))
} else {
verify(txnCoordinator).onResignation(txnStatePartition.partition, None)
-
verify(groupCoordinator).onResignation(groupMetadataPartition.partition, None)
+
verify(groupCoordinator).onResignation(groupMetadataPartition.partition,
OptionalInt.empty)
}
}
}
@@ -2462,7 +2459,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new
DeleteGroupsRequest.Builder(deleteGroupsRequest).build())
val future = new
CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
- when(newGroupCoordinator.deleteGroups(
+ when(groupCoordinator.deleteGroups(
requestChannelRequest.context,
List("group-1", "group-2", "group-3").asJava,
RequestLocal.NoCaching.bufferSupplier
@@ -2505,7 +2502,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new
DeleteGroupsRequest.Builder(deleteGroupsRequest).build())
val future = new
CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
- when(newGroupCoordinator.deleteGroups(
+ when(groupCoordinator.deleteGroups(
requestChannelRequest.context,
List("group-1", "group-2", "group-3").asJava,
RequestLocal.NoCaching.bufferSupplier
@@ -2564,7 +2561,7 @@ class KafkaApisTest {
}
val future = new
CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
- when(newGroupCoordinator.deleteGroups(
+ when(groupCoordinator.deleteGroups(
requestChannelRequest.context,
List("group-2", "group-3").asJava,
RequestLocal.NoCaching.bufferSupplier
@@ -2611,7 +2608,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new
DescribeGroupsRequest.Builder(describeGroupsRequest).build())
val future = new
CompletableFuture[util.List[DescribeGroupsResponseData.DescribedGroup]]()
- when(newGroupCoordinator.describeGroups(
+ when(groupCoordinator.describeGroups(
requestChannelRequest.context,
describeGroupsRequest.groups
)).thenReturn(future)
@@ -2653,7 +2650,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new
DescribeGroupsRequest.Builder(describeGroupsRequest).build())
val future = new
CompletableFuture[util.List[DescribeGroupsResponseData.DescribedGroup]]()
- when(newGroupCoordinator.describeGroups(
+ when(groupCoordinator.describeGroups(
requestChannelRequest.context,
describeGroupsRequest.groups
)).thenReturn(future)
@@ -2707,7 +2704,7 @@ class KafkaApisTest {
}
val future = new
CompletableFuture[util.List[DescribeGroupsResponseData.DescribedGroup]]()
- when(newGroupCoordinator.describeGroups(
+ when(groupCoordinator.describeGroups(
requestChannelRequest.context,
List("group-2").asJava
)).thenReturn(future)
@@ -2764,7 +2761,7 @@ class KafkaApisTest {
val requestLocal = RequestLocal.withThreadConfinedCaching
val future = new CompletableFuture[OffsetDeleteResponseData]()
- when(newGroupCoordinator.deleteOffsets(
+ when(groupCoordinator.deleteOffsets(
request.context,
offsetDeleteRequest.data,
requestLocal.bufferSupplier
@@ -2857,7 +2854,7 @@ class KafkaApisTest {
).asJava.iterator))
val future = new CompletableFuture[OffsetDeleteResponseData]()
- when(newGroupCoordinator.deleteOffsets(
+ when(groupCoordinator.deleteOffsets(
requestChannelRequest.context,
expectedOffsetDeleteRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -2962,7 +2959,7 @@ class KafkaApisTest {
// The group coordinator is called even if there are no
// topic-partitions left after the validation.
- when(newGroupCoordinator.deleteOffsets(
+ when(groupCoordinator.deleteOffsets(
request.context,
new OffsetDeleteRequestData().setGroupId(group),
RequestLocal.NoCaching.bufferSupplier
@@ -2994,7 +2991,7 @@ class KafkaApisTest {
val request = buildRequest(offsetDeleteRequest)
val future = new CompletableFuture[OffsetDeleteResponseData]()
- when(newGroupCoordinator.deleteOffsets(
+ when(groupCoordinator.deleteOffsets(
request.context,
offsetDeleteRequest.data,
RequestLocal.NoCaching.bufferSupplier
@@ -3386,7 +3383,7 @@ class KafkaApisTest {
.setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
val future = new CompletableFuture[JoinGroupResponseData]()
- when(newGroupCoordinator.joinGroup(
+ when(groupCoordinator.joinGroup(
requestChannelRequest.context,
expectedJoinGroupRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -3429,7 +3426,7 @@ class KafkaApisTest {
.setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
val future = new CompletableFuture[JoinGroupResponseData]()
- when(newGroupCoordinator.joinGroup(
+ when(groupCoordinator.joinGroup(
requestChannelRequest.context,
expectedJoinGroupRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -3448,7 +3445,7 @@ class KafkaApisTest {
val expectedJoinGroupResponse = new JoinGroupResponseData()
.setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
.setMemberId("member")
- .setProtocolName(if (version >= 7) null else GroupCoordinator.NoProtocol)
+ .setProtocolName(if (version >= 7) null else
kafka.coordinator.group.GroupCoordinator.NoProtocol)
future.complete(joinGroupResponse)
val response = verifyNoThrottling[JoinGroupResponse](requestChannelRequest)
@@ -3467,7 +3464,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new
JoinGroupRequest.Builder(joinGroupRequest).build())
val future = new CompletableFuture[JoinGroupResponseData]()
- when(newGroupCoordinator.joinGroup(
+ when(groupCoordinator.joinGroup(
requestChannelRequest.context,
joinGroupRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -3519,7 +3516,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new
JoinGroupRequest.Builder(joinGroupRequest).build())
val future = new CompletableFuture[JoinGroupResponseData]()
- when(newGroupCoordinator.joinGroup(
+ when(groupCoordinator.joinGroup(
requestChannelRequest.context,
joinGroupRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -3562,7 +3559,7 @@ class KafkaApisTest {
.setProtocolName(if (version >= 5) "range" else null)
val future = new CompletableFuture[SyncGroupResponseData]()
- when(newGroupCoordinator.syncGroup(
+ when(groupCoordinator.syncGroup(
requestChannelRequest.context,
expectedSyncGroupRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -3599,7 +3596,7 @@ class KafkaApisTest {
.setProtocolName("range")
val future = new CompletableFuture[SyncGroupResponseData]()
- when(newGroupCoordinator.syncGroup(
+ when(groupCoordinator.syncGroup(
requestChannelRequest.context,
expectedSyncGroupRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -3652,7 +3649,7 @@ class KafkaApisTest {
.setMemberId("member")
val future = new CompletableFuture[SyncGroupResponseData]()
- when(newGroupCoordinator.syncGroup(
+ when(groupCoordinator.syncGroup(
requestChannelRequest.context,
expectedSyncGroupRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -3694,7 +3691,7 @@ class KafkaApisTest {
.setGenerationId(0)
val future = new CompletableFuture[HeartbeatResponseData]()
- when(newGroupCoordinator.heartbeat(
+ when(groupCoordinator.heartbeat(
requestChannelRequest.context,
expectedHeartbeatRequest
)).thenReturn(future)
@@ -3722,7 +3719,7 @@ class KafkaApisTest {
.setGenerationId(0)
val future = new CompletableFuture[HeartbeatResponseData]()
- when(newGroupCoordinator.heartbeat(
+ when(groupCoordinator.heartbeat(
requestChannelRequest.context,
expectedHeartbeatRequest
)).thenReturn(future)
@@ -3882,7 +3879,7 @@ class KafkaApisTest {
).asJava)
val future = new CompletableFuture[LeaveGroupResponseData]()
- when(newGroupCoordinator.leaveGroup(
+ when(groupCoordinator.leaveGroup(
requestChannelRequest.context,
expectedLeaveGroupRequest
)).thenReturn(future)
@@ -3927,7 +3924,7 @@ class KafkaApisTest {
).asJava)
val future = new CompletableFuture[LeaveGroupResponseData]()
- when(newGroupCoordinator.leaveGroup(
+ when(groupCoordinator.leaveGroup(
requestChannelRequest.context,
expectedLeaveGroupRequest
)).thenReturn(future)
@@ -3980,7 +3977,7 @@ class KafkaApisTest {
).asJava)
val future = new CompletableFuture[LeaveGroupResponseData]()
- when(newGroupCoordinator.leaveGroup(
+ when(groupCoordinator.leaveGroup(
requestChannelRequest.context,
expectedLeaveGroupRequest
)).thenReturn(future)
@@ -4012,7 +4009,7 @@ class KafkaApisTest {
).asJava)
val future = new CompletableFuture[LeaveGroupResponseData]()
- when(newGroupCoordinator.leaveGroup(
+ when(groupCoordinator.leaveGroup(
requestChannelRequest.context,
expectedLeaveGroupRequest
)).thenReturn(future)
@@ -4053,7 +4050,7 @@ class KafkaApisTest {
val requestChannelRequest = makeRequest(version)
val group1Future = new
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
- when(newGroupCoordinator.fetchOffsets(
+ when(groupCoordinator.fetchOffsets(
requestChannelRequest.context,
"group-1",
List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
@@ -4064,14 +4061,14 @@ class KafkaApisTest {
)).thenReturn(group1Future)
val group2Future = new
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
- when(newGroupCoordinator.fetchAllOffsets(
+ when(groupCoordinator.fetchAllOffsets(
requestChannelRequest.context,
"group-2",
false
)).thenReturn(group2Future)
val group3Future = new
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
- when(newGroupCoordinator.fetchAllOffsets(
+ when(groupCoordinator.fetchAllOffsets(
requestChannelRequest.context,
"group-3",
false
@@ -4155,7 +4152,7 @@ class KafkaApisTest {
val requestChannelRequest = makeRequest(version)
val future = new
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
- when(newGroupCoordinator.fetchOffsets(
+ when(groupCoordinator.fetchOffsets(
requestChannelRequest.context,
"group-1",
List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
@@ -4254,7 +4251,7 @@ class KafkaApisTest {
// group-1 is allowed and bar is allowed.
val group1Future = new
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
- when(newGroupCoordinator.fetchOffsets(
+ when(groupCoordinator.fetchOffsets(
requestChannelRequest.context,
"group-1",
List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
@@ -4266,7 +4263,7 @@ class KafkaApisTest {
// group-3 is allowed and bar is allowed.
val group3Future = new
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
- when(newGroupCoordinator.fetchAllOffsets(
+ when(groupCoordinator.fetchAllOffsets(
requestChannelRequest.context,
"group-3",
false
@@ -4656,7 +4653,7 @@ class KafkaApisTest {
.setStatesFilter(if (version >= 4) List("Stable", "Empty").asJava else
List.empty.asJava)
val future = new CompletableFuture[ListGroupsResponseData]()
- when(newGroupCoordinator.listGroups(
+ when(groupCoordinator.listGroups(
requestChannelRequest.context,
expectedListGroupsRequest
)).thenReturn(future)
@@ -4691,7 +4688,7 @@ class KafkaApisTest {
.setStatesFilter(List("Stable", "Empty").asJava)
val future = new CompletableFuture[ListGroupsResponseData]()
- when(newGroupCoordinator.listGroups(
+ when(groupCoordinator.listGroups(
requestChannelRequest.context,
expectedListGroupsRequest
)).thenReturn(future)
@@ -4787,7 +4784,7 @@ class KafkaApisTest {
val expectedListGroupsRequest = new ListGroupsRequestData()
val future = new CompletableFuture[ListGroupsResponseData]()
- when(newGroupCoordinator.listGroups(
+ when(groupCoordinator.listGroups(
requestChannelRequest.context,
expectedListGroupsRequest
)).thenReturn(future)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 42d3b4f6865..58efab4b948 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -505,7 +505,7 @@ object TestUtils extends Logging {
numPartitions =
broker.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
replicationFactor =
broker.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
brokers = brokers,
- topicConfig = broker.groupCoordinator.offsetsTopicConfigs,
+ topicConfig = broker.groupCoordinator.groupMetadataTopicConfigs,
)
}
@@ -611,7 +611,7 @@ object TestUtils extends Logging {
server.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
server.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
servers,
- server.groupCoordinator.offsetsTopicConfigs)
+ server.groupCoordinator.groupMetadataTopicConfigs)
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 3a772ab3d2b..c9cfadaba2f 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
@@ -37,11 +38,18 @@ import
org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.BufferSupplier;
import java.util.List;
+import java.util.OptionalInt;
+import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import java.util.function.IntSupplier;
+/**
+ * Group Coordinator's internal API.
+ */
public interface GroupCoordinator {
/**
@@ -215,4 +223,86 @@ public interface GroupCoordinator {
OffsetDeleteRequestData request,
BufferSupplier bufferSupplier
);
+
+ /**
+ * Return the partition index for the given Group.
+ *
+ * @param groupId The group id.
+ *
+ * @return The partition index.
+ */
+ int partitionFor(String groupId);
+
+ /**
+ * Commit or abort the pending transactional offsets for the given
partitions.
+ *
+ * @param producerId The producer id.
+ * @param partitions The partitions.
+ * @param transactionResult The result of the transaction.
+ */
+ void onTransactionCompleted(
+ long producerId,
+ Iterable<TopicPartition> partitions,
+ TransactionResult transactionResult
+ );
+
+ /**
+ * Remove the provided deleted partitions offsets.
+ *
+ * @param topicPartitions The deleted partitions.
+ * @param bufferSupplier The buffer supplier tight to the request
thread.
+ */
+ void onPartitionsDeleted(
+ List<TopicPartition> topicPartitions,
+ BufferSupplier bufferSupplier
+ );
+
+ /**
+ * Group coordinator is now the leader for the given partition at the
+ * given leader epoch. It should load cached state from the partition
+ * and begin handling requests for groups mapped to it.
+ *
+ * @param groupMetadataPartitionIndex The partition index.
+ * @param groupMetadataPartitionLeaderEpoch The leader epoch of the
partition.
+ */
+ void onElection(
+ int groupMetadataPartitionIndex,
+ int groupMetadataPartitionLeaderEpoch
+ );
+
+ /**
+ * Group coordinator is no longer the leader for the given partition
+ * at the given leader epoch. It should unload cached state and stop
+ * handling requests for groups mapped to it.
+ *
+ * @param groupMetadataPartitionIndex The partition index.
+ * @param groupMetadataPartitionLeaderEpoch The leader epoch of the
partition as an
+ * optional value. An empty
value means that
+ * the topic was deleted.
+ */
+ void onResignation(
+ int groupMetadataPartitionIndex,
+ OptionalInt groupMetadataPartitionLeaderEpoch
+ );
+
+ /**
+ * Return the configuration properties of the internal group
+ * metadata topic.
+ *
+ * @return Properties of the internal topic.
+ */
+ Properties groupMetadataTopicConfigs();
+
+ /**
+ * Startup the group coordinator.
+ *
+ * @param groupMetadataTopicPartitionCount A supplier to get the number
of partitions
+ * of the consumer offsets topic.
+ */
+ void startup(IntSupplier groupMetadataTopicPartitionCount);
+
+ /**
+ * Shutdown the group coordinator.
+ */
+ void shutdown();
}
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 9b907ef7f9e..5648ce2fb12 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -18,7 +18,6 @@
package org.apache.kafka.jmh.metadata;
import kafka.controller.KafkaController;
-import kafka.coordinator.group.GroupCoordinator;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
import kafka.network.RequestConvertToJson;
@@ -60,6 +59,7 @@ import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.server.common.MetadataVersion;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;