dajac commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r892157826
##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2225,194 +2223,210 @@ class KafkaController(val config: KafkaConfig,
}
}
- def alterPartitions(alterPartitionRequest: AlterPartitionRequestData,
callback: AlterPartitionResponseData => Unit): Unit = {
- val partitionsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
-
- alterPartitionRequest.topics.forEach { topicReq =>
- topicReq.partitions.forEach { partitionReq =>
- partitionsToAlter.put(
- new TopicPartition(topicReq.name, partitionReq.partitionIndex),
- LeaderAndIsr(
- alterPartitionRequest.brokerId,
- partitionReq.leaderEpoch,
- partitionReq.newIsr().asScala.toList.map(_.toInt),
- LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
- partitionReq.partitionEpoch
- )
- )
- }
- }
-
- def responseCallback(results: Either[Map[TopicPartition, Either[Errors,
LeaderAndIsr]], Errors]): Unit = {
- val resp = new AlterPartitionResponseData()
- results match {
- case Right(error) =>
- resp.setErrorCode(error.code)
- case Left(partitionResults) =>
- resp.setTopics(new util.ArrayList())
- partitionResults
- .groupBy { case (tp, _) => tp.topic } // Group by topic
- .foreach { case (topic, partitions) =>
- // Add each topic part to the response
- val topicResp = new AlterPartitionResponseData.TopicData()
- .setName(topic)
- .setPartitions(new util.ArrayList())
- resp.topics.add(topicResp)
- partitions.foreach { case (tp, errorOrIsr) =>
- // Add each partition part to the response (new ISR or error)
- errorOrIsr match {
- case Left(error) => topicResp.partitions.add(
- new AlterPartitionResponseData.PartitionData()
- .setPartitionIndex(tp.partition)
- .setErrorCode(error.code))
- case Right(leaderAndIsr) =>
- /* Setting the LeaderRecoveryState field is always safe
because it will always be the same
- * as the value set in the request. For version 0, that is
always the default RECOVERED
- * which is ignored when serializing to version 0. For any
other version, the
- * LeaderRecoveryState field is supported.
- */
- topicResp.partitions.add(
- new AlterPartitionResponseData.PartitionData()
- .setPartitionIndex(tp.partition)
- .setLeaderId(leaderAndIsr.leader)
- .setLeaderEpoch(leaderAndIsr.leaderEpoch)
- .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
- .setPartitionEpoch(leaderAndIsr.partitionEpoch)
- )
- }
- }
- }
- }
- callback.apply(resp)
- }
-
- eventManager.put(
- AlterPartitionReceived(alterPartitionRequest.brokerId,
alterPartitionRequest.brokerEpoch, partitionsToAlter, responseCallback)
- )
+ def alterPartitions(
+ alterPartitionRequest: AlterPartitionRequestData,
+ alterPartitionRequestVersion: Short,
+ callback: AlterPartitionResponseData => Unit
+ ): Unit = {
+ eventManager.put(AlterPartitionReceived(
+ alterPartitionRequest,
+ alterPartitionRequestVersion,
+ callback
+ ))
}
private def processAlterPartition(
- brokerId: Int,
- brokerEpoch: Long,
- partitionsToAlter: Map[TopicPartition, LeaderAndIsr],
- callback: AlterPartitionCallback
+ alterPartitionRequest: AlterPartitionRequestData,
+ alterPartitionRequestVersion: Short,
+ callback: AlterPartitionResponseData => Unit
): Unit = {
+ val useTopicsIds = alterPartitionRequestVersion > 1
// Handle a few short-circuits
if (!isActive) {
- callback.apply(Right(Errors.NOT_CONTROLLER))
+ callback(new
AlterPartitionResponseData().setErrorCode(Errors.NOT_CONTROLLER.code))
return
}
+ val brokerId = alterPartitionRequest.brokerId
+ val brokerEpoch = alterPartitionRequest.brokerEpoch
val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
if (brokerEpochOpt.isEmpty) {
info(s"Ignoring AlterPartition due to unknown broker $brokerId")
- callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+ callback(new
AlterPartitionResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
return
}
if (!brokerEpochOpt.contains(brokerEpoch)) {
info(s"Ignoring AlterPartition due to stale broker epoch $brokerEpoch
and local broker epoch $brokerEpochOpt for broker $brokerId")
- callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+ callback(new
AlterPartitionResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
return
}
- val response = try {
- val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors,
LeaderAndIsr]]()
+ val partitionsToAlter = new mutable.HashMap[TopicPartition, LeaderAndIsr]()
+ val alterPartitionResponse = new AlterPartitionResponseData()
+ alterPartitionRequest.topics.forEach { topicReq =>
+ val topicNameOpt = if (useTopicsIds) {
+ controllerContext.topicName(topicReq.topicId)
+ } else {
+ Some(topicReq.topicName)
+ }
+
+ topicNameOpt match {
+ case None =>
+ val topicResponse = new AlterPartitionResponseData.TopicData()
+ .setTopicId(topicReq.topicId)
+ alterPartitionResponse.topics.add(topicResponse)
+ topicReq.partitions.forEach { partitionReq =>
+ topicResponse.partitions.add(new
AlterPartitionResponseData.PartitionData()
+ .setPartitionIndex(partitionReq.partitionIndex)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))
Review Comment:
Good point. I forgot about the `UNKNOWN_TOPIC_ID` error.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]