This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new dcc6e9e2d2d KAFKA-18399 Remove ZooKeeper from KafkaApis (1/N): `LEADER_AND_ISR`, `STOP_REPLICA`, `UPDATE_METADATA` (#18417)
dcc6e9e2d2d is described below
commit dcc6e9e2d2d77e01bc29c95c425a6fa11f4ec226
Author: Ken Huang <[email protected]>
AuthorDate: Fri Jan 10 00:01:38 2025 +0800
KAFKA-18399 Remove ZooKeeper from KafkaApis (1/N): `LEADER_AND_ISR`, `STOP_REPLICA`, `UPDATE_METADATA` (#18417)
Delete the handlers for LEADER_AND_ISR, STOP_REPLICA, and UPDATE_METADATA.
Also, remove the corresponding unit tests in KafkaApisTest.
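For reference, a minimal sketch (not taken verbatim from this commit) of what the request dispatch in KafkaApis.handle looks like once these cases are gone: the three controller-to-broker keys no longer have match arms, so any such request falls through to the catch-all and fails fast.
    // Hypothetical sketch of the post-removal dispatch; the handler arms shown
    // exist in KafkaApis, but the catch-all message here is illustrative only.
    request.header.apiKey match {
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      // LEADER_AND_ISR, STOP_REPLICA and UPDATE_METADATA arms deleted by this change
      case _ => throw new IllegalStateException(s"Unknown api key ${request.header.apiKey}")
    }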
Reviewers: Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 139 +---------
.../scala/unit/kafka/server/KafkaApisTest.scala | 305 +--------------------
2 files changed, 5 insertions(+), 439 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c1e8807de9d..b11df5e16f7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -24,7 +24,7 @@ import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
import kafka.server.share.SharePartitionManager
-import kafka.utils.{CoreUtils, Logging}
+import kafka.utils.Logging
import org.apache.kafka.admin.AdminUtils
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -52,7 +52,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetFor
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection}
import org.apache.kafka.common.message._
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ListenerName}
+import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.ClientMetadata
@@ -73,7 +73,6 @@ import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.{GroupVersion, MetadataVersion, RequestLocal, TransactionVersion}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0}
-import org.apache.kafka.server.purgatory.TopicPartitionOperationKey
import org.apache.kafka.server.share.context.ShareFetchContext
import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, SharePartitionKey}
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
@@ -86,7 +85,7 @@ import java.time.Duration
import java.util
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
-import java.util.{Collections, Optional, OptionalInt}
+import java.util.{Collections, Optional}
import scala.annotation.nowarn
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, Set, mutable}
@@ -197,9 +196,6 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
- case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
- case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
- case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal)
case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal).exceptionally(handleError)
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request).exceptionally(handleError)
@@ -295,135 +291,6 @@ class KafkaApis(val requestChannel: RequestChannel,
replicaManager.tryCompleteActions()
}
- def handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit = {
- val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
- // ensureTopicExists is only for client facing requests
- // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
- // stop serving data to clients for the topic being deleted
- val correlationId = request.header.correlationId
- val leaderAndIsrRequest = request.body[LeaderAndIsrRequest]
-
- authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
- if (zkSupport.isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch, leaderAndIsrRequest.isKRaftController)) {
- // When the broker restarts very quickly, it is possible for this broker to receive request intended
- // for its previous generation so the broker should skip the stale request.
- info(s"Received LeaderAndIsr request with broker epoch ${leaderAndIsrRequest.brokerEpoch} " +
- s"smaller than the current broker epoch ${zkSupport.controller.brokerEpoch} from " +
- s"controller ${leaderAndIsrRequest.controllerId} with epoch ${leaderAndIsrRequest.controllerEpoch}.")
- requestHelper.sendResponseExemptThrottle(request, leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_BROKER_EPOCH.exception))
- } else {
- val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest,
- RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _))
- requestHelper.sendResponseExemptThrottle(request, response)
- }
- }
-
- def handleStopReplicaRequest(request: RequestChannel.Request): Unit = {
- val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
- // ensureTopicExists is only for client facing requests
- // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
- // stop serving data to clients for the topic being deleted
- val stopReplicaRequest = request.body[StopReplicaRequest]
- authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
- if (zkSupport.isBrokerEpochStale(stopReplicaRequest.brokerEpoch, stopReplicaRequest.isKRaftController)) {
- // When the broker restarts very quickly, it is possible for this broker to receive request intended
- // for its previous generation so the broker should skip the stale request.
- info(s"Received StopReplica request with broker epoch ${stopReplicaRequest.brokerEpoch} " +
- s"smaller than the current broker epoch ${zkSupport.controller.brokerEpoch} from " +
- s"controller ${stopReplicaRequest.controllerId} with epoch ${stopReplicaRequest.controllerEpoch}.")
- requestHelper.sendResponseExemptThrottle(request, new StopReplicaResponse(
- new StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
- } else {
- val partitionStates = stopReplicaRequest.partitionStates().asScala
- val (result, error) = replicaManager.stopReplicas(
- request.context.correlationId,
- stopReplicaRequest.controllerId,
- stopReplicaRequest.controllerEpoch,
- partitionStates)
- // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
- // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
- result.foreachEntry { (topicPartition, error) =>
- if (error == Errors.NONE) {
- val partitionState = partitionStates(topicPartition)
- if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
- && partitionState.deletePartition) {
- val leaderEpoch = if (partitionState.leaderEpoch >= 0)
- OptionalInt.of(partitionState.leaderEpoch)
- else
- OptionalInt.empty
- groupCoordinator.onResignation(topicPartition.partition, leaderEpoch)
- } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
- && partitionState.deletePartition) {
- val leaderEpoch = if (partitionState.leaderEpoch >= 0)
- Some(partitionState.leaderEpoch)
- else
- None
- txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
- }
- }
- }
-
- def toStopReplicaPartition(tp: TopicPartition, error: Errors) =
- new StopReplicaResponseData.StopReplicaPartitionError()
- .setTopicName(tp.topic)
- .setPartitionIndex(tp.partition)
- .setErrorCode(error.code)
-
- requestHelper.sendResponseExemptThrottle(request, new StopReplicaResponse(new StopReplicaResponseData()
- .setErrorCode(error.code)
- .setPartitionErrors(result.map {
- case (tp, error) => toStopReplicaPartition(tp, error)
- }.toBuffer.asJava)))
- }
-
- CoreUtils.swallow(replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads(), this)
- }
-
- def handleUpdateMetadataRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
- val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
- val correlationId = request.header.correlationId
- val updateMetadataRequest = request.body[UpdateMetadataRequest]
-
- authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
- if (zkSupport.isBrokerEpochStale(updateMetadataRequest.brokerEpoch, updateMetadataRequest.isKRaftController)) {
- // When the broker restarts very quickly, it is possible for this broker to receive request intended
- // for its previous generation so the broker should skip the stale request.
- info(s"Received UpdateMetadata request with broker epoch ${updateMetadataRequest.brokerEpoch} " +
- s"smaller than the current broker epoch ${zkSupport.controller.brokerEpoch} from " +
- s"controller ${updateMetadataRequest.controllerId} with epoch ${updateMetadataRequest.controllerEpoch}.")
- requestHelper.sendResponseExemptThrottle(request,
- new UpdateMetadataResponse(new UpdateMetadataResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
- } else {
- val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
- if (deletedPartitions.nonEmpty) {
- groupCoordinator.onPartitionsDeleted(deletedPartitions.asJava, requestLocal.bufferSupplier)
- }
-
- if (zkSupport.adminManager.hasDelayedTopicOperations) {
- updateMetadataRequest.partitionStates.forEach { partitionState =>
- zkSupport.adminManager.tryCompleteDelayedTopicOperations(partitionState.topicName)
- }
- }
-
- quotas.clientQuotaCallback.ifPresent { callback =>
- if (callback.updateClusterMetadata(metadataCache.getClusterMetadata(clusterId, request.context.listenerName))) {
- quotas.fetch.updateQuotaMetricConfigs()
- quotas.produce.updateQuotaMetricConfigs()
- quotas.request.updateQuotaMetricConfigs()
- quotas.controllerMutation.updateQuotaMetricConfigs()
- }
- }
- if (replicaManager.hasDelayedElectionOperations) {
- updateMetadataRequest.partitionStates.forEach { partitionState =>
- val tp = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
- replicaManager.tryCompleteElection(new TopicPartitionOperationKey(tp))
- }
- }
- requestHelper.sendResponseExemptThrottle(request, new UpdateMetadataResponse(
- new UpdateMetadataResponseData().setErrorCode(Errors.NONE.code)))
- }
- }
-
def handleControlledShutdownRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
// ensureTopicExists is only for client facing requests
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 229b1a18a19..6542749beeb 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -57,7 +57,6 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequ
import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
import org.apache.kafka.common.message.ShareFetchRequestData.{AcknowledgementBatch, ForgottenTopic}
import org.apache.kafka.common.message.ShareFetchResponseData.{AcquiredRecords, PartitionData, ShareFetchableTopicResponse}
-import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
import org.apache.kafka.common.message._
import org.apache.kafka.common.metrics.Metrics
@@ -79,7 +78,6 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorTestConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AclEntry
@@ -3034,99 +3032,7 @@ class KafkaApisTest extends Logging {
val markersResponse = capturedResponse.getValue
assertEquals(2, markersResponse.errorsByProducerId.size())
}
-
- @Test
- def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlagAndLeaderEpoch(): Unit = {
- shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(LeaderAndIsr.INITIAL_LEADER_EPOCH + 2, deletePartition = true)
- }
-
- @Test
- def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlagAndDeleteSentinel(): Unit = {
- shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(LeaderAndIsr.EPOCH_DURING_DELETE, deletePartition = true)
- }
-
- @Test
- def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlagAndNoEpochSentinel(): Unit = {
- shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(LeaderAndIsr.NO_EPOCH, deletePartition = true)
- }
-
- @Test
- def shouldNotResignCoordinatorsIfStopReplicaReceivedWithoutDeleteFlag(): Unit = {
- shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(LeaderAndIsr.INITIAL_LEADER_EPOCH + 2, deletePartition = false)
- }
-
- def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(leaderEpoch: Int,
- deletePartition: Boolean): Unit = {
- val controllerId = 0
- val controllerEpoch = 5
- val brokerEpoch = 230498320L
-
- val fooPartition = new TopicPartition("foo", 0)
- val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
- val txnStatePartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, 0)
-
- val topicStates = Seq(
- new StopReplicaTopicState()
- .setTopicName(groupMetadataPartition.topic)
- .setPartitionStates(Seq(new StopReplicaPartitionState()
- .setPartitionIndex(groupMetadataPartition.partition)
- .setLeaderEpoch(leaderEpoch)
- .setDeletePartition(deletePartition)).asJava),
- new StopReplicaTopicState()
- .setTopicName(txnStatePartition.topic)
- .setPartitionStates(Seq(new StopReplicaPartitionState()
- .setPartitionIndex(txnStatePartition.partition)
- .setLeaderEpoch(leaderEpoch)
- .setDeletePartition(deletePartition)).asJava),
- new StopReplicaTopicState()
- .setTopicName(fooPartition.topic)
- .setPartitionStates(Seq(new StopReplicaPartitionState()
- .setPartitionIndex(fooPartition.partition)
- .setLeaderEpoch(leaderEpoch)
- .setDeletePartition(deletePartition)).asJava)
- ).asJava
-
- val stopReplicaRequest = new StopReplicaRequest.Builder(
- ApiKeys.STOP_REPLICA.latestVersion,
- controllerId,
- controllerEpoch,
- brokerEpoch,
- false,
- topicStates
- ).build()
- val request = buildRequest(stopReplicaRequest)
-
- when(replicaManager.stopReplicas(
- ArgumentMatchers.eq(request.context.correlationId),
- ArgumentMatchers.eq(controllerId),
- ArgumentMatchers.eq(controllerEpoch),
- ArgumentMatchers.eq(stopReplicaRequest.partitionStates().asScala)
- )).thenReturn(
- (mutable.Map(
- groupMetadataPartition -> Errors.NONE,
- txnStatePartition -> Errors.NONE,
- fooPartition -> Errors.NONE
- ), Errors.NONE)
- )
- when(controller.brokerEpoch).thenReturn(brokerEpoch)
- kafkaApis = createKafkaApis()
- kafkaApis.handleStopReplicaRequest(request)
-
- if (deletePartition) {
- if (leaderEpoch >= 0) {
- verify(txnCoordinator).onResignation(txnStatePartition.partition, Some(leaderEpoch))
- verify(groupCoordinator).onResignation(groupMetadataPartition.partition, OptionalInt.of(leaderEpoch))
- } else {
- verify(txnCoordinator).onResignation(txnStatePartition.partition, None)
- verify(groupCoordinator).onResignation(groupMetadataPartition.partition, OptionalInt.empty)
- }
- }
- }
-
+
@Test
def shouldRespondWithUnknownTopicOrPartitionForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
val tp1 = new TopicPartition("t", 0)
@@ -9590,192 +9496,6 @@ class KafkaApisTest extends Logging {
assertEquals(records.sizeInBytes(), brokerTopicStats.allTopicsStats.replicationBytesOutRate.get.count())
}
- @Test
- def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = {
- val currentBrokerEpoch = 1239875L
- testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)
- }
-
- @Test
- def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = {
- val currentBrokerEpoch = 1239875L
- testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE)
- }
-
- @Test
- def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = {
- val currentBrokerEpoch = 1239875L
- testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
- }
-
- def testUpdateMetadataRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
- val updateMetadataRequest = createBasicMetadataRequest("topicA", 1, brokerEpochInRequest, 1)
- val request = buildRequest(updateMetadataRequest)
-
- val capturedResponse: ArgumentCaptor[UpdateMetadataResponse] = ArgumentCaptor.forClass(classOf[UpdateMetadataResponse])
-
- when(controller.brokerEpoch).thenReturn(currentBrokerEpoch)
- when(replicaManager.maybeUpdateMetadataCache(
- ArgumentMatchers.eq(request.context.correlationId),
- any()
- )).thenReturn(
- Seq()
- )
- kafkaApis = createKafkaApis()
- kafkaApis.handleUpdateMetadataRequest(request, RequestLocal.withThreadConfinedCaching)
- verify(requestChannel).sendResponse(
- ArgumentMatchers.eq(request),
- capturedResponse.capture(),
- ArgumentMatchers.eq(None)
- )
- val updateMetadataResponse = capturedResponse.getValue
- assertEquals(expectedError, updateMetadataResponse.error())
- if (expectedError == Errors.NONE) {
- verify(replicaManager).maybeUpdateMetadataCache(
- ArgumentMatchers.eq(request.context.correlationId),
- any()
- )
- }
- }
-
- @Test
- def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = {
- val currentBrokerEpoch = 1239875L
- testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)
- }
-
- @Test
- def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = {
- val currentBrokerEpoch = 1239875L
- testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE)
- }
-
- @Test
- def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = {
- val currentBrokerEpoch = 1239875L
- testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
- }
-
- def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
- val controllerId = 2
- val controllerEpoch = 6
- val capturedResponse: ArgumentCaptor[LeaderAndIsrResponse] = ArgumentCaptor.forClass(classOf[LeaderAndIsrResponse])
- val partitionStates = Seq(
- new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
- .setTopicName("topicW")
- .setPartitionIndex(1)
- .setControllerEpoch(1)
- .setLeader(0)
- .setLeaderEpoch(1)
- .setIsr(asList(0, 1))
- .setPartitionEpoch(2)
- .setReplicas(asList(0, 1, 2))
- .setIsNew(false)
- ).asJava
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
- ApiKeys.LEADER_AND_ISR.latestVersion,
- controllerId,
- controllerEpoch,
- brokerEpochInRequest,
- partitionStates,
- Collections.singletonMap("topicW", Uuid.randomUuid()),
- asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091))
- ).build()
- val request = buildRequest(leaderAndIsrRequest)
- val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
- .setErrorCode(Errors.NONE.code)
- .setPartitionErrors(asList()), leaderAndIsrRequest.version())
-
- when(controller.brokerEpoch).thenReturn(currentBrokerEpoch)
- when(replicaManager.becomeLeaderOrFollower(
- ArgumentMatchers.eq(request.context.correlationId),
- any(),
- any()
- )).thenReturn(
- response
- )
- kafkaApis = createKafkaApis()
- kafkaApis.handleLeaderAndIsrRequest(request)
- verify(requestChannel).sendResponse(
- ArgumentMatchers.eq(request),
- capturedResponse.capture(),
- ArgumentMatchers.eq(None)
- )
- val leaderAndIsrResponse = capturedResponse.getValue
- assertEquals(expectedError, leaderAndIsrResponse.error())
- }
-
- @Test
- def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = {
- val currentBrokerEpoch = 1239875L
- testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)
- }
-
- @Test
- def testStopReplicaRequestWithNewerBrokerEpochIsValid(): Unit = {
- val currentBrokerEpoch = 1239875L
- testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE)
- }
-
- @Test
- def testStopReplicaRequestWithStaleBrokerEpochIsRejected(): Unit = {
- val currentBrokerEpoch = 1239875L
- testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
- }
-
- def testStopReplicaRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
- val controllerId = 0
- val controllerEpoch = 5
- val capturedResponse: ArgumentCaptor[StopReplicaResponse] = ArgumentCaptor.forClass(classOf[StopReplicaResponse])
- val fooPartition = new TopicPartition("foo", 0)
- val topicStates = Seq(
- new StopReplicaTopicState()
- .setTopicName(fooPartition.topic)
- .setPartitionStates(Seq(new StopReplicaPartitionState()
- .setPartitionIndex(fooPartition.partition)
- .setLeaderEpoch(1)
- .setDeletePartition(false)).asJava)
- ).asJava
- val stopReplicaRequest = new StopReplicaRequest.Builder(
- ApiKeys.STOP_REPLICA.latestVersion,
- controllerId,
- controllerEpoch,
- brokerEpochInRequest,
- false,
- topicStates
- ).build()
- val request = buildRequest(stopReplicaRequest)
-
- when(controller.brokerEpoch).thenReturn(currentBrokerEpoch)
- when(replicaManager.stopReplicas(
- ArgumentMatchers.eq(request.context.correlationId),
- ArgumentMatchers.eq(controllerId),
- ArgumentMatchers.eq(controllerEpoch),
- ArgumentMatchers.eq(stopReplicaRequest.partitionStates().asScala)
- )).thenReturn(
- (mutable.Map(
- fooPartition -> Errors.NONE
- ), Errors.NONE)
- )
- kafkaApis = createKafkaApis()
- kafkaApis.handleStopReplicaRequest(request)
- verify(requestChannel).sendResponse(
- ArgumentMatchers.eq(request),
- capturedResponse.capture(),
- ArgumentMatchers.eq(None)
- )
- val stopReplicaResponse = capturedResponse.getValue
- assertEquals(expectedError, stopReplicaResponse.error())
- if (expectedError != Errors.STALE_BROKER_EPOCH) {
- verify(replicaManager).stopReplicas(
- ArgumentMatchers.eq(request.context.correlationId),
- ArgumentMatchers.eq(controllerId),
- ArgumentMatchers.eq(controllerEpoch),
- ArgumentMatchers.eq(stopReplicaRequest.partitionStates().asScala)
- )
- }
- }
-
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.LIST_GROUPS)
def testListGroupsRequest(version: Short): Unit = {
@@ -10228,7 +9948,7 @@ class KafkaApisTest extends Logging {
numPartitions: Int,
brokerEpoch: Long,
numBrokers: Int,
- topicId: Uuid = Uuid.ZERO_UUID): UpdateMetadataRequest = {
+ topicId: Uuid): UpdateMetadataRequest = {
val replicas = List(0.asInstanceOf[Integer]).asJava
def createPartitionState(partition: Int) = new UpdateMetadataPartitionState()
@@ -10796,27 +10516,6 @@ class KafkaApisTest extends Logging {
assertEquals(KafkaApis.shouldAlwaysForward(request).getMessage, e.getMessage)
}
- @Test
- def testRaftShouldNeverHandleLeaderAndIsrRequest(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
- kafkaApis = createKafkaApis(raftSupport = true)
- verifyShouldNeverHandleErrorMessage(kafkaApis.handleLeaderAndIsrRequest)
- }
-
- @Test
- def testRaftShouldNeverHandleStopReplicaRequest(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
- kafkaApis = createKafkaApis(raftSupport = true)
- verifyShouldNeverHandleErrorMessage(kafkaApis.handleStopReplicaRequest)
- }
-
- @Test
- def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
- kafkaApis = createKafkaApis(raftSupport = true)
- verifyShouldNeverHandleErrorMessage(kafkaApis.handleUpdateMetadataRequest(_, RequestLocal.withThreadConfinedCaching))
- }
-
@Test
def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)