This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new dcc6e9e2d2d KAFKA-18399 Remove ZooKeeper from KafkaApis (1/N): 
`LEADER_AND_ISR`, `STOP_REPLICA`, `UPDATE_METADATA` (#18417)
dcc6e9e2d2d is described below

commit dcc6e9e2d2d77e01bc29c95c425a6fa11f4ec226
Author: Ken Huang <[email protected]>
AuthorDate: Fri Jan 10 00:01:38 2025 +0800

    KAFKA-18399 Remove ZooKeeper from KafkaApis (1/N): `LEADER_AND_ISR`, 
`STOP_REPLICA`, `UPDATE_METADATA` (#18417)
    
    Delete the handlers for LEADER_AND_ISR, STOP_REPLICA, and UPDATE_METADATA. 
Also, remove the corresponding unit tests in KafkaApisTest.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 139 +---------
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 305 +--------------------
 2 files changed, 5 insertions(+), 439 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index c1e8807de9d..b11df5e16f7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -24,7 +24,7 @@ import kafka.server.QuotaFactory.{QuotaManagers, 
UNBOUNDED_QUOTA}
 import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
 import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
 import kafka.server.share.SharePartitionManager
-import kafka.utils.{CoreUtils, Logging}
+import kafka.utils.Logging
 import org.apache.kafka.admin.AdminUtils
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -52,7 +52,7 @@ import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetFor
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset,
 OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection}
 import org.apache.kafka.common.message._
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ListenerName}
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.replica.ClientMetadata
@@ -73,7 +73,6 @@ import org.apache.kafka.server.ClientMetricsManager
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.{GroupVersion, MetadataVersion, 
RequestLocal, TransactionVersion}
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, 
IBP_2_3_IV0}
-import org.apache.kafka.server.purgatory.TopicPartitionOperationKey
 import org.apache.kafka.server.share.context.ShareFetchContext
 import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, 
SharePartitionKey}
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
@@ -86,7 +85,7 @@ import java.time.Duration
 import java.util
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
-import java.util.{Collections, Optional, OptionalInt}
+import java.util.{Collections, Optional}
 import scala.annotation.nowarn
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Map, Seq, Set, mutable}
@@ -197,9 +196,6 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.FETCH => handleFetchRequest(request)
         case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
         case ApiKeys.METADATA => handleTopicMetadataRequest(request)
-        case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
-        case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
-        case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, 
requestLocal)
         case ApiKeys.CONTROLLED_SHUTDOWN => 
handleControlledShutdownRequest(request)
         case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, 
requestLocal).exceptionally(handleError)
         case ApiKeys.OFFSET_FETCH => 
handleOffsetFetchRequest(request).exceptionally(handleError)
@@ -295,135 +291,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     replicaManager.tryCompleteActions()
   }
 
-  def handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit = {
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
-    // ensureTopicExists is only for client facing requests
-    // We can't have the ensureTopicExists check here since the controller 
sends it as an advisory to all brokers so they
-    // stop serving data to clients for the topic being deleted
-    val correlationId = request.header.correlationId
-    val leaderAndIsrRequest = request.body[LeaderAndIsrRequest]
-
-    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-    if (zkSupport.isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch, 
leaderAndIsrRequest.isKRaftController)) {
-      // When the broker restarts very quickly, it is possible for this broker 
to receive request intended
-      // for its previous generation so the broker should skip the stale 
request.
-      info(s"Received LeaderAndIsr request with broker epoch 
${leaderAndIsrRequest.brokerEpoch} " +
-        s"smaller than the current broker epoch 
${zkSupport.controller.brokerEpoch} from " +
-        s"controller ${leaderAndIsrRequest.controllerId} with epoch 
${leaderAndIsrRequest.controllerEpoch}.")
-      requestHelper.sendResponseExemptThrottle(request, 
leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_BROKER_EPOCH.exception))
-    } else {
-      val response = replicaManager.becomeLeaderOrFollower(correlationId, 
leaderAndIsrRequest,
-        RequestHandlerHelper.onLeadershipChange(groupCoordinator, 
txnCoordinator, _, _))
-      requestHelper.sendResponseExemptThrottle(request, response)
-    }
-  }
-
-  def handleStopReplicaRequest(request: RequestChannel.Request): Unit = {
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
-    // ensureTopicExists is only for client facing requests
-    // We can't have the ensureTopicExists check here since the controller 
sends it as an advisory to all brokers so they
-    // stop serving data to clients for the topic being deleted
-    val stopReplicaRequest = request.body[StopReplicaRequest]
-    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-    if (zkSupport.isBrokerEpochStale(stopReplicaRequest.brokerEpoch, 
stopReplicaRequest.isKRaftController)) {
-      // When the broker restarts very quickly, it is possible for this broker 
to receive request intended
-      // for its previous generation so the broker should skip the stale 
request.
-      info(s"Received StopReplica request with broker epoch 
${stopReplicaRequest.brokerEpoch} " +
-        s"smaller than the current broker epoch 
${zkSupport.controller.brokerEpoch} from " +
-        s"controller ${stopReplicaRequest.controllerId} with epoch 
${stopReplicaRequest.controllerEpoch}.")
-      requestHelper.sendResponseExemptThrottle(request, new 
StopReplicaResponse(
-        new 
StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
-    } else {
-      val partitionStates = stopReplicaRequest.partitionStates().asScala
-      val (result, error) = replicaManager.stopReplicas(
-        request.context.correlationId,
-        stopReplicaRequest.controllerId,
-        stopReplicaRequest.controllerEpoch,
-        partitionStates)
-      // Clear the coordinator caches in case we were the leader. In the case 
of a reassignment, we
-      // cannot rely on the LeaderAndIsr API for this since it is only sent to 
active replicas.
-      result.foreachEntry { (topicPartition, error) =>
-        if (error == Errors.NONE) {
-          val partitionState = partitionStates(topicPartition)
-          if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
-              && partitionState.deletePartition) {
-            val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-              OptionalInt.of(partitionState.leaderEpoch)
-            else
-              OptionalInt.empty
-            groupCoordinator.onResignation(topicPartition.partition, 
leaderEpoch)
-          } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
-                     && partitionState.deletePartition) {
-            val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-              Some(partitionState.leaderEpoch)
-            else
-              None
-            txnCoordinator.onResignation(topicPartition.partition, 
coordinatorEpoch = leaderEpoch)
-          }
-        }
-      }
-
-      def toStopReplicaPartition(tp: TopicPartition, error: Errors) =
-        new StopReplicaResponseData.StopReplicaPartitionError()
-          .setTopicName(tp.topic)
-          .setPartitionIndex(tp.partition)
-          .setErrorCode(error.code)
-
-      requestHelper.sendResponseExemptThrottle(request, new 
StopReplicaResponse(new StopReplicaResponseData()
-        .setErrorCode(error.code)
-        .setPartitionErrors(result.map {
-          case (tp, error) => toStopReplicaPartition(tp, error)
-        }.toBuffer.asJava)))
-    }
-
-    
CoreUtils.swallow(replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads(),
 this)
-  }
-
-  def handleUpdateMetadataRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
-    val correlationId = request.header.correlationId
-    val updateMetadataRequest = request.body[UpdateMetadataRequest]
-
-    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-    if (zkSupport.isBrokerEpochStale(updateMetadataRequest.brokerEpoch, 
updateMetadataRequest.isKRaftController)) {
-      // When the broker restarts very quickly, it is possible for this broker 
to receive request intended
-      // for its previous generation so the broker should skip the stale 
request.
-      info(s"Received UpdateMetadata request with broker epoch 
${updateMetadataRequest.brokerEpoch} " +
-        s"smaller than the current broker epoch 
${zkSupport.controller.brokerEpoch} from " +
-        s"controller ${updateMetadataRequest.controllerId} with epoch 
${updateMetadataRequest.controllerEpoch}.")
-      requestHelper.sendResponseExemptThrottle(request,
-        new UpdateMetadataResponse(new 
UpdateMetadataResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
-    } else {
-      val deletedPartitions = 
replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
-      if (deletedPartitions.nonEmpty) {
-        groupCoordinator.onPartitionsDeleted(deletedPartitions.asJava, 
requestLocal.bufferSupplier)
-      }
-
-      if (zkSupport.adminManager.hasDelayedTopicOperations) {
-        updateMetadataRequest.partitionStates.forEach { partitionState =>
-          
zkSupport.adminManager.tryCompleteDelayedTopicOperations(partitionState.topicName)
-        }
-      }
-
-      quotas.clientQuotaCallback.ifPresent { callback =>
-        if 
(callback.updateClusterMetadata(metadataCache.getClusterMetadata(clusterId, 
request.context.listenerName))) {
-          quotas.fetch.updateQuotaMetricConfigs()
-          quotas.produce.updateQuotaMetricConfigs()
-          quotas.request.updateQuotaMetricConfigs()
-          quotas.controllerMutation.updateQuotaMetricConfigs()
-        }
-      }
-      if (replicaManager.hasDelayedElectionOperations) {
-        updateMetadataRequest.partitionStates.forEach { partitionState =>
-          val tp = new TopicPartition(partitionState.topicName, 
partitionState.partitionIndex)
-          replicaManager.tryCompleteElection(new 
TopicPartitionOperationKey(tp))
-        }
-      }
-      requestHelper.sendResponseExemptThrottle(request, new 
UpdateMetadataResponse(
-        new UpdateMetadataResponseData().setErrorCode(Errors.NONE.code)))
-    }
-  }
-
   def handleControlledShutdownRequest(request: RequestChannel.Request): Unit = 
{
     val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
     // ensureTopicExists is only for client facing requests
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 229b1a18a19..6542749beeb 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -57,7 +57,6 @@ import 
org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequ
 import 
org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition,
 OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, 
OffsetDeleteResponseTopicCollection}
 import 
org.apache.kafka.common.message.ShareFetchRequestData.{AcknowledgementBatch, 
ForgottenTopic}
 import 
org.apache.kafka.common.message.ShareFetchResponseData.{AcquiredRecords, 
PartitionData, ShareFetchableTopicResponse}
-import 
org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState,
 StopReplicaTopicState}
 import 
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker,
 UpdateMetadataEndpoint, UpdateMetadataPartitionState}
 import org.apache.kafka.common.message._
 import org.apache.kafka.common.metrics.Metrics
@@ -79,7 +78,6 @@ import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, 
GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.share.{ShareCoordinator, 
ShareCoordinatorTestConfig}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.metadata.LeaderAndIsr
 import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.authorizer.AclEntry
@@ -3034,99 +3032,7 @@ class KafkaApisTest extends Logging {
     val markersResponse = capturedResponse.getValue
     assertEquals(2, markersResponse.errorsByProducerId.size())
   }
-
-  @Test
-  def 
shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlagAndLeaderEpoch(): 
Unit = {
-    shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(
-      LeaderAndIsr.INITIAL_LEADER_EPOCH + 2, deletePartition = true)
-  }
-
-  @Test
-  def 
shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlagAndDeleteSentinel(): 
Unit = {
-    shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(
-      LeaderAndIsr.EPOCH_DURING_DELETE, deletePartition = true)
-  }
-
-  @Test
-  def 
shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlagAndNoEpochSentinel():
 Unit = {
-    shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(
-      LeaderAndIsr.NO_EPOCH, deletePartition = true)
-  }
-
-  @Test
-  def shouldNotResignCoordinatorsIfStopReplicaReceivedWithoutDeleteFlag(): 
Unit = {
-    shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(
-      LeaderAndIsr.INITIAL_LEADER_EPOCH + 2, deletePartition = false)
-  }
-
-  def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(leaderEpoch: 
Int,
-                                                                  
deletePartition: Boolean): Unit = {
-    val controllerId = 0
-    val controllerEpoch = 5
-    val brokerEpoch = 230498320L
-
-    val fooPartition = new TopicPartition("foo", 0)
-    val groupMetadataPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
-    val txnStatePartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, 0)
-
-    val topicStates = Seq(
-      new StopReplicaTopicState()
-        .setTopicName(groupMetadataPartition.topic)
-        .setPartitionStates(Seq(new StopReplicaPartitionState()
-          .setPartitionIndex(groupMetadataPartition.partition)
-          .setLeaderEpoch(leaderEpoch)
-          .setDeletePartition(deletePartition)).asJava),
-      new StopReplicaTopicState()
-        .setTopicName(txnStatePartition.topic)
-        .setPartitionStates(Seq(new StopReplicaPartitionState()
-          .setPartitionIndex(txnStatePartition.partition)
-          .setLeaderEpoch(leaderEpoch)
-          .setDeletePartition(deletePartition)).asJava),
-      new StopReplicaTopicState()
-        .setTopicName(fooPartition.topic)
-        .setPartitionStates(Seq(new StopReplicaPartitionState()
-          .setPartitionIndex(fooPartition.partition)
-          .setLeaderEpoch(leaderEpoch)
-          .setDeletePartition(deletePartition)).asJava)
-    ).asJava
-
-    val stopReplicaRequest = new StopReplicaRequest.Builder(
-      ApiKeys.STOP_REPLICA.latestVersion,
-      controllerId,
-      controllerEpoch,
-      brokerEpoch,
-      false,
-      topicStates
-    ).build()
-    val request = buildRequest(stopReplicaRequest)
-
-    when(replicaManager.stopReplicas(
-      ArgumentMatchers.eq(request.context.correlationId),
-      ArgumentMatchers.eq(controllerId),
-      ArgumentMatchers.eq(controllerEpoch),
-      ArgumentMatchers.eq(stopReplicaRequest.partitionStates().asScala)
-    )).thenReturn(
-      (mutable.Map(
-        groupMetadataPartition -> Errors.NONE,
-        txnStatePartition -> Errors.NONE,
-        fooPartition -> Errors.NONE
-      ), Errors.NONE)
-    )
-    when(controller.brokerEpoch).thenReturn(brokerEpoch)
-    kafkaApis = createKafkaApis()
-    kafkaApis.handleStopReplicaRequest(request)
-
-    if (deletePartition) {
-      if (leaderEpoch >= 0) {
-        verify(txnCoordinator).onResignation(txnStatePartition.partition, 
Some(leaderEpoch))
-        
verify(groupCoordinator).onResignation(groupMetadataPartition.partition, 
OptionalInt.of(leaderEpoch))
-      } else {
-        verify(txnCoordinator).onResignation(txnStatePartition.partition, None)
-        
verify(groupCoordinator).onResignation(groupMetadataPartition.partition, 
OptionalInt.empty)
-      }
-    }
-  }
-
+  
   @Test
   def 
shouldRespondWithUnknownTopicOrPartitionForBadPartitionAndNoErrorsForGoodPartition():
 Unit = {
     val tp1 = new TopicPartition("t", 0)
@@ -9590,192 +9496,6 @@ class KafkaApisTest extends Logging {
     assertEquals(records.sizeInBytes(), 
brokerTopicStats.allTopicsStats.replicationBytesOutRate.get.count())
   }
 
-  @Test
-  def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = {
-    val currentBrokerEpoch = 1239875L
-    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, 
Errors.NONE)
-  }
-
-  @Test
-  def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = {
-    val currentBrokerEpoch = 1239875L
-    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, 
Errors.NONE)
-  }
-
-  @Test
-  def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = {
-    val currentBrokerEpoch = 1239875L
-    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, 
Errors.STALE_BROKER_EPOCH)
-  }
-
-  def testUpdateMetadataRequest(currentBrokerEpoch: Long, 
brokerEpochInRequest: Long, expectedError: Errors): Unit = {
-    val updateMetadataRequest = createBasicMetadataRequest("topicA", 1, 
brokerEpochInRequest, 1)
-    val request = buildRequest(updateMetadataRequest)
-
-    val capturedResponse: ArgumentCaptor[UpdateMetadataResponse] = 
ArgumentCaptor.forClass(classOf[UpdateMetadataResponse])
-
-    when(controller.brokerEpoch).thenReturn(currentBrokerEpoch)
-    when(replicaManager.maybeUpdateMetadataCache(
-      ArgumentMatchers.eq(request.context.correlationId),
-      any()
-    )).thenReturn(
-      Seq()
-    )
-    kafkaApis = createKafkaApis()
-    kafkaApis.handleUpdateMetadataRequest(request, 
RequestLocal.withThreadConfinedCaching)
-    verify(requestChannel).sendResponse(
-      ArgumentMatchers.eq(request),
-      capturedResponse.capture(),
-      ArgumentMatchers.eq(None)
-    )
-    val updateMetadataResponse = capturedResponse.getValue
-    assertEquals(expectedError, updateMetadataResponse.error())
-    if (expectedError == Errors.NONE) {
-      verify(replicaManager).maybeUpdateMetadataCache(
-        ArgumentMatchers.eq(request.context.correlationId),
-        any()
-      )
-    }
-  }
-
-  @Test
-  def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = {
-    val currentBrokerEpoch = 1239875L
-    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch, 
Errors.NONE)
-  }
-
-  @Test
-  def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = {
-    val currentBrokerEpoch = 1239875L
-    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch + 1, 
Errors.NONE)
-  }
-
-  @Test
-  def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = {
-    val currentBrokerEpoch = 1239875L
-    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch - 1, 
Errors.STALE_BROKER_EPOCH)
-  }
-
-  def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: 
Long, expectedError: Errors): Unit = {
-    val controllerId = 2
-    val controllerEpoch = 6
-    val capturedResponse: ArgumentCaptor[LeaderAndIsrResponse] = 
ArgumentCaptor.forClass(classOf[LeaderAndIsrResponse])
-    val partitionStates = Seq(
-      new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
-        .setTopicName("topicW")
-        .setPartitionIndex(1)
-        .setControllerEpoch(1)
-        .setLeader(0)
-        .setLeaderEpoch(1)
-        .setIsr(asList(0, 1))
-        .setPartitionEpoch(2)
-        .setReplicas(asList(0, 1, 2))
-        .setIsNew(false)
-    ).asJava
-    val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
-      ApiKeys.LEADER_AND_ISR.latestVersion,
-      controllerId,
-      controllerEpoch,
-      brokerEpochInRequest,
-      partitionStates,
-      Collections.singletonMap("topicW", Uuid.randomUuid()),
-      asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091))
-    ).build()
-    val request = buildRequest(leaderAndIsrRequest)
-    val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
-      .setErrorCode(Errors.NONE.code)
-      .setPartitionErrors(asList()), leaderAndIsrRequest.version())
-
-    when(controller.brokerEpoch).thenReturn(currentBrokerEpoch)
-    when(replicaManager.becomeLeaderOrFollower(
-      ArgumentMatchers.eq(request.context.correlationId),
-      any(),
-      any()
-    )).thenReturn(
-      response
-    )
-    kafkaApis = createKafkaApis()
-    kafkaApis.handleLeaderAndIsrRequest(request)
-    verify(requestChannel).sendResponse(
-      ArgumentMatchers.eq(request),
-      capturedResponse.capture(),
-      ArgumentMatchers.eq(None)
-    )
-    val leaderAndIsrResponse = capturedResponse.getValue
-    assertEquals(expectedError, leaderAndIsrResponse.error())
-  }
-
-  @Test
-  def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = {
-    val currentBrokerEpoch = 1239875L
-    testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)
-  }
-
-  @Test
-  def testStopReplicaRequestWithNewerBrokerEpochIsValid(): Unit = {
-    val currentBrokerEpoch = 1239875L
-    testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch + 1, 
Errors.NONE)
-  }
-
-  @Test
-  def testStopReplicaRequestWithStaleBrokerEpochIsRejected(): Unit = {
-    val currentBrokerEpoch = 1239875L
-    testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch - 1, 
Errors.STALE_BROKER_EPOCH)
-  }
-
-  def testStopReplicaRequest(currentBrokerEpoch: Long, brokerEpochInRequest: 
Long, expectedError: Errors): Unit = {
-    val controllerId = 0
-    val controllerEpoch = 5
-    val capturedResponse: ArgumentCaptor[StopReplicaResponse] = 
ArgumentCaptor.forClass(classOf[StopReplicaResponse])
-    val fooPartition = new TopicPartition("foo", 0)
-    val topicStates = Seq(
-      new StopReplicaTopicState()
-        .setTopicName(fooPartition.topic)
-        .setPartitionStates(Seq(new StopReplicaPartitionState()
-          .setPartitionIndex(fooPartition.partition)
-          .setLeaderEpoch(1)
-          .setDeletePartition(false)).asJava)
-    ).asJava
-    val stopReplicaRequest = new StopReplicaRequest.Builder(
-      ApiKeys.STOP_REPLICA.latestVersion,
-      controllerId,
-      controllerEpoch,
-      brokerEpochInRequest,
-      false,
-      topicStates
-    ).build()
-    val request = buildRequest(stopReplicaRequest)
-
-    when(controller.brokerEpoch).thenReturn(currentBrokerEpoch)
-    when(replicaManager.stopReplicas(
-      ArgumentMatchers.eq(request.context.correlationId),
-      ArgumentMatchers.eq(controllerId),
-      ArgumentMatchers.eq(controllerEpoch),
-      ArgumentMatchers.eq(stopReplicaRequest.partitionStates().asScala)
-    )).thenReturn(
-      (mutable.Map(
-        fooPartition -> Errors.NONE
-      ), Errors.NONE)
-    )
-    kafkaApis = createKafkaApis()
-    kafkaApis.handleStopReplicaRequest(request)
-    verify(requestChannel).sendResponse(
-      ArgumentMatchers.eq(request),
-      capturedResponse.capture(),
-      ArgumentMatchers.eq(None)
-    )
-    val stopReplicaResponse = capturedResponse.getValue
-    assertEquals(expectedError, stopReplicaResponse.error())
-    if (expectedError != Errors.STALE_BROKER_EPOCH) {
-      verify(replicaManager).stopReplicas(
-        ArgumentMatchers.eq(request.context.correlationId),
-        ArgumentMatchers.eq(controllerId),
-        ArgumentMatchers.eq(controllerEpoch),
-        ArgumentMatchers.eq(stopReplicaRequest.partitionStates().asScala)
-      )
-    }
-  }
-
   @ParameterizedTest
   @ApiKeyVersionsSource(apiKey = ApiKeys.LIST_GROUPS)
   def testListGroupsRequest(version: Short): Unit = {
@@ -10228,7 +9948,7 @@ class KafkaApisTest extends Logging {
                                          numPartitions: Int,
                                          brokerEpoch: Long,
                                          numBrokers: Int,
-                                         topicId: Uuid = Uuid.ZERO_UUID): 
UpdateMetadataRequest = {
+                                         topicId: Uuid): UpdateMetadataRequest 
= {
     val replicas = List(0.asInstanceOf[Integer]).asJava
 
     def createPartitionState(partition: Int) = new 
UpdateMetadataPartitionState()
@@ -10796,27 +10516,6 @@ class KafkaApisTest extends Logging {
     assertEquals(KafkaApis.shouldAlwaysForward(request).getMessage, 
e.getMessage)
   }
 
-  @Test
-  def testRaftShouldNeverHandleLeaderAndIsrRequest(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    verifyShouldNeverHandleErrorMessage(kafkaApis.handleLeaderAndIsrRequest)
-  }
-
-  @Test
-  def testRaftShouldNeverHandleStopReplicaRequest(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    verifyShouldNeverHandleErrorMessage(kafkaApis.handleStopReplicaRequest)
-  }
-
-  @Test
-  def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    
verifyShouldNeverHandleErrorMessage(kafkaApis.handleUpdateMetadataRequest(_, 
RequestLocal.withThreadConfinedCaching))
-  }
-
   @Test
   def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)

Reply via email to