dajac commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r893250615


##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2225,194 +2223,228 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
-  def alterPartitions(alterPartitionRequest: AlterPartitionRequestData, 
callback: AlterPartitionResponseData => Unit): Unit = {
-    val partitionsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
-
-    alterPartitionRequest.topics.forEach { topicReq =>
-      topicReq.partitions.forEach { partitionReq =>
-        partitionsToAlter.put(
-          new TopicPartition(topicReq.name, partitionReq.partitionIndex),
-          LeaderAndIsr(
-            alterPartitionRequest.brokerId,
-            partitionReq.leaderEpoch,
-            partitionReq.newIsr().asScala.toList.map(_.toInt),
-            LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
-            partitionReq.partitionEpoch
-          )
-        )
-      }
-    }
+  def alterPartitions(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
+  ): Unit = {
+    eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      alterPartitionRequestVersion,
+      callback
+    ))
+  }
 
-    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
-      val resp = new AlterPartitionResponseData()
-      results match {
-        case Right(error) =>
-          resp.setErrorCode(error.code)
-        case Left(partitionResults) =>
-          resp.setTopics(new util.ArrayList())
-          partitionResults
-            .groupBy { case (tp, _) => tp.topic }   // Group by topic
-            .foreach { case (topic, partitions) =>
-              // Add each topic part to the response
-              val topicResp = new AlterPartitionResponseData.TopicData()
-                .setName(topic)
-                .setPartitions(new util.ArrayList())
-              resp.topics.add(topicResp)
-              partitions.foreach { case (tp, errorOrIsr) =>
-                // Add each partition part to the response (new ISR or error)
-                errorOrIsr match {
-                  case Left(error) => topicResp.partitions.add(
-                    new AlterPartitionResponseData.PartitionData()
-                      .setPartitionIndex(tp.partition)
-                      .setErrorCode(error.code))
-                  case Right(leaderAndIsr) =>
-                    /* Setting the LeaderRecoveryState field is always safe 
because it will always be the same
-                     * as the value set in the request. For version 0, that is 
always the default RECOVERED
-                     * which is ignored when serializing to version 0. For any 
other version, the
-                     * LeaderRecoveryState field is supported.
-                     */
-                    topicResp.partitions.add(
-                      new AlterPartitionResponseData.PartitionData()
-                        .setPartitionIndex(tp.partition)
-                        .setLeaderId(leaderAndIsr.leader)
-                        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
-                        .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                        
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
-                        .setPartitionEpoch(leaderAndIsr.partitionEpoch)
-                    )
-                }
-            }
-          }
-      }
-      callback.apply(resp)
+  private def processAlterPartition(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
+  ): Unit = {
+    try {
+      doProcessAlterPartition(
+        alterPartitionRequest,
+        alterPartitionRequestVersion,
+        callback
+      )
+    } catch {
+      case e: Throwable =>
+        error(s"Error when processing AlterPartition: $alterPartitionRequest", 
e)
+        callback(new AlterPartitionResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code))
     }
-
-    eventManager.put(
-      AlterPartitionReceived(alterPartitionRequest.brokerId, 
alterPartitionRequest.brokerEpoch, partitionsToAlter, responseCallback)
-    )
   }
 
-  private def processAlterPartition(
-    brokerId: Int,
-    brokerEpoch: Long,
-    partitionsToAlter: Map[TopicPartition, LeaderAndIsr],
-    callback: AlterPartitionCallback
+  private def doProcessAlterPartition(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
   ): Unit = {
+    val useTopicsIds = alterPartitionRequestVersion > 1
 
     // Handle a few short-circuits
     if (!isActive) {
-      callback.apply(Right(Errors.NOT_CONTROLLER))
+      callback(new AlterPartitionResponseData().setErrorCode(Errors.NOT_CONTROLLER.code))
       return
     }
 
+    val brokerId = alterPartitionRequest.brokerId
+    val brokerEpoch = alterPartitionRequest.brokerEpoch
     val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
     if (brokerEpochOpt.isEmpty) {
       info(s"Ignoring AlterPartition due to unknown broker $brokerId")
-      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      callback(new AlterPartitionResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
       return
     }
 
     if (!brokerEpochOpt.contains(brokerEpoch)) {
       info(s"Ignoring AlterPartition due to stale broker epoch $brokerEpoch 
and local broker epoch $brokerEpochOpt for broker $brokerId")
-      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      callback(new AlterPartitionResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
       return
     }
 
-    val response = try {
-      val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
+    val partitionsToAlter = new mutable.HashMap[TopicPartition, LeaderAndIsr]()
+    val alterPartitionResponse = new AlterPartitionResponseData()
 
+    alterPartitionRequest.topics.forEach { topicReq =>
+      val topicNameOpt = if (useTopicsIds) {
+        controllerContext.topicName(topicReq.topicId)
+      } else {
+        Some(topicReq.topicName)
+      }
+
+      topicNameOpt match {
+        case None =>
+          val topicResponse = new AlterPartitionResponseData.TopicData()
+            .setTopicId(topicReq.topicId)
+          alterPartitionResponse.topics.add(topicResponse)
+          topicReq.partitions.forEach { partitionReq =>
+            topicResponse.partitions.add(new 
AlterPartitionResponseData.PartitionData()
+              .setPartitionIndex(partitionReq.partitionIndex)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))
+          }
+
+        case Some(topicName) =>
+          topicReq.partitions.forEach { partitionReq =>
+            partitionsToAlter.put(
+              new TopicPartition(topicName, partitionReq.partitionIndex),
+              LeaderAndIsr(
+                alterPartitionRequest.brokerId,
+                partitionReq.leaderEpoch,
+                partitionReq.newIsr.asScala.toList.map(_.toInt),
+                LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
+                partitionReq.partitionEpoch
+              )
+            )
+          }
+      }
+    }
+
+    val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
+    try {
       // Determine which partitions we will accept the new ISR for
-      val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = 
partitionsToAlter.flatMap {
-        case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
-          controllerContext.partitionLeadershipInfo(tp) match {
-            case Some(leaderIsrAndControllerEpoch) =>
-              val currentLeaderAndIsr = 
leaderIsrAndControllerEpoch.leaderAndIsr
-              if (newLeaderAndIsr.leaderEpoch != 
currentLeaderAndIsr.leaderEpoch) {
-                partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
-                None
-              } else if (newLeaderAndIsr.partitionEpoch < 
currentLeaderAndIsr.partitionEpoch) {
-                partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
-                None
-              } else if 
(newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
-                // If a partition is already in the desired state, just return 
it
-                partitionResponses(tp) = Right(currentLeaderAndIsr)
-                None
-              } else if (newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
-                partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
-                info(
-                  s"Rejecting AlterPartition from node $brokerId for $tp 
because leader is recovering and ISR is greater than 1: " +
-                  s"$newLeaderAndIsr"
-                )
-                None
-              } else if (currentLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERED &&
-                newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING) {
-
-                partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
-                info(
-                  s"Rejecting AlterPartition from node $brokerId for $tp 
because the leader recovery state cannot change from " +
-                  s"RECOVERED to RECOVERING: $newLeaderAndIsr"
-                )
-                None
-              } else {
-                Some(tp -> newLeaderAndIsr)
-              }
-            case None =>
-              partitionResponses(tp) = Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+      val adjustedIsrs = partitionsToAlter.flatMap { case (tp, 
newLeaderAndIsr) =>
+        controllerContext.partitionLeadershipInfo(tp) match {
+          case Some(leaderIsrAndControllerEpoch) =>
+            val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+            if (newLeaderAndIsr.leaderEpoch != 
currentLeaderAndIsr.leaderEpoch) {
+              partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
               None
-          }
+            } else if (newLeaderAndIsr.partitionEpoch < 
currentLeaderAndIsr.partitionEpoch) {
+              partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
+              None
+            } else if 
(newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
+              // If a partition is already in the desired state, just return it
+              partitionResponses(tp) = Right(currentLeaderAndIsr)
+              None
+            } else if (newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
+              partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
+              info(
+                s"Rejecting AlterPartition from node $brokerId for $tp because 
leader is recovering and ISR is greater than 1: " +
+                s"$newLeaderAndIsr"
+              )
+              None
+            } else if (currentLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERED &&
+              newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING) {
+
+              partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
+              info(
+                s"Rejecting AlterPartition from node $brokerId for $tp because 
the leader recovery state cannot change from " +
+                s"RECOVERED to RECOVERING: $newLeaderAndIsr"
+              )
+              None
+            } else {
+              Some(tp -> newLeaderAndIsr)
+            }
+
+          case None =>
+            partitionResponses(tp) = Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+            None
+        }
       }
 
       // Do the updates in ZK
       debug(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
       val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = 
zkClient.updateLeaderAndIsr(
         adjustedIsrs, controllerContext.epoch, 
controllerContext.epochZkVersion)
 
-      val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = 
finishedUpdates.flatMap {
-        case (partition: TopicPartition, isrOrError: Either[Throwable, 
LeaderAndIsr]) =>
-          isrOrError match {
-            case Right(updatedIsr) =>
-              debug(s"ISR for partition $partition updated to 
[${updatedIsr.isr.mkString(",")}] and zkVersion updated to 
[${updatedIsr.partitionEpoch}]")
-              partitionResponses(partition) = Right(updatedIsr)
-              Some(partition -> updatedIsr)
-            case Left(e) =>
-              error(s"Failed to update ISR for partition $partition", e)
-              partitionResponses(partition) = Left(Errors.forException(e))
-              None
-          }
+      val successfulUpdates = finishedUpdates.flatMap { case (partition, 
isrOrError) =>
+        isrOrError match {
+          case Right(updatedIsr) =>
+            debug(s"ISR for partition $partition updated to 
[${updatedIsr.isr.mkString(",")}] and zkVersion updated to 
[${updatedIsr.partitionEpoch}]")
+            partitionResponses(partition) = Right(updatedIsr)
+            Some(partition -> updatedIsr)
+          case Left(e) =>
+            error(s"Failed to update ISR for partition $partition", e)
+            partitionResponses(partition) = Left(Errors.forException(e))
+            None
+        }
       }
 
       badVersionUpdates.foreach { partition =>
         info(s"Failed to update ISR to ${adjustedIsrs(partition)} for 
partition $partition, bad ZK version.")
         partitionResponses(partition) = Left(Errors.INVALID_UPDATE_VERSION)
       }
 
-      def processUpdateNotifications(partitions: Seq[TopicPartition]): Unit = {
-        val liveBrokers: Seq[Int] = 
controllerContext.liveOrShuttingDownBrokerIds.toSeq
-        sendUpdateMetadataRequest(liveBrokers, partitions.toSet)
-      }
-
       // Update our cache and send out metadata updates
       updateLeaderAndIsrCache(successfulUpdates.keys.toSeq)
-      processUpdateNotifications(partitionsToAlter.keys.toSeq)
+      sendUpdateMetadataRequest(
+        controllerContext.liveOrShuttingDownBrokerIds.toSeq,
+        partitionsToAlter.keySet
+      )
+
+      partitionResponses.groupBy(_._1.topic).forKeyValue { (topicName, 
partitionResponses) =>
+        // Add each topic part to the response
+        val topicResponse = if (useTopicsIds) {
+          new AlterPartitionResponseData.TopicData()
+            .setTopicId(controllerContext.topicIds.getOrElse(topicName, 
Uuid.ZERO_UUID))
+        } else {
+          new AlterPartitionResponseData.TopicData()
+            .setTopicName(topicName)
+        }
+        alterPartitionResponse.topics.add(topicResponse)
+
+        partitionResponses.forKeyValue { (tp, errorOrIsr) =>
+          // Add each partition part to the response (new ISR or error)
+          errorOrIsr match {
+            case Left(error) =>
+              topicResponse.partitions.add(
+                new AlterPartitionResponseData.PartitionData()
+                  .setPartitionIndex(tp.partition)
+                  .setErrorCode(error.code))
+            case Right(leaderAndIsr) =>
+              /* Setting the LeaderRecoveryState field is always safe because 
it will always be the same
+               * as the value set in the request. For version 0, that is 
always the default RECOVERED
+               * which is ignored when serializing to version 0. For any other 
version, the
+               * LeaderRecoveryState field is supported.
+               */
+              topicResponse.partitions.add(
+                new AlterPartitionResponseData.PartitionData()
+                  .setPartitionIndex(tp.partition)
+                  .setLeaderId(leaderAndIsr.leader)
+                  .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+                  .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+                  .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
+                  .setPartitionEpoch(leaderAndIsr.partitionEpoch)
+              )
+          }
+        }
+      }
 
-      Left(partitionResponses)
+      callback(alterPartitionResponse)
     } catch {
       case e: Throwable =>
         error(s"Error when processing AlterPartition for partitions: 
${partitionsToAlter.keys.toSeq}", e)
-        Right(Errors.UNKNOWN_SERVER_ERROR)
+        callback(new AlterPartitionResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code))

Review Comment:
   Yeah.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to