hachikuji commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r896305171


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -846,6 +862,135 @@ class ControllerIntegrationTest extends QuorumTestHarness 
{
     }
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testAlterPartitionSource"))
+  def testAlterPartition(metadataVersion: MetadataVersion, 
alterPartitionVersion: Short): Unit = {
+    if (!metadataVersion.isTopicIdsSupported && alterPartitionVersion > 1) {
+      // This combination is not valid. We cannot use alter partition version 
> 1
+      // if the broker is on an IBP < 2.8 because topics don't have id in this 
case.
+      return
+    }
+
+    servers = makeServers(1, interBrokerProtocolVersion = 
Some(metadataVersion))
+
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = 
assignment, servers = servers)
+
+    val controller = getController().kafkaController
+    val leaderIsrAndControllerEpochMap = 
zkClient.getTopicPartitionStates(Seq(tp))
+    val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val topicId = controller.controllerContext.topicIds.getOrElse(tp.topic, 
Uuid.ZERO_UUID)
+    val brokerId = controllerId
+    val brokerEpoch = 
controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
+
+    // The caller of the AlterPartition API can only use topics ids iif 1) the 
controller is

Review Comment:
   nit: typo in the comment above, `iif` should be `iff`?



##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2225,197 +2223,226 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
-  def alterPartitions(alterPartitionRequest: AlterPartitionRequestData, 
callback: AlterPartitionResponseData => Unit): Unit = {
-    val partitionsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+  def alterPartitions(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
+  ): Unit = {
+    eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      alterPartitionRequestVersion,
+      callback
+    ))
+  }
 
-    alterPartitionRequest.topics.forEach { topicReq =>
-      topicReq.partitions.forEach { partitionReq =>
-        partitionsToAlter.put(
-          new TopicPartition(topicReq.name, partitionReq.partitionIndex),
-          LeaderAndIsr(
-            alterPartitionRequest.brokerId,
-            partitionReq.leaderEpoch,
-            partitionReq.newIsr().asScala.toList.map(_.toInt),
-            LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
-            partitionReq.partitionEpoch
-          )
-        )
-      }
+  private def processAlterPartition(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
+  ): Unit = {
+    val partitionResponses = try {
+      doProcessAlterPartition(
+        alterPartitionRequest,
+        alterPartitionRequestVersion,
+        callback
+      )
+    } catch {
+      case e: Throwable =>
+        error(s"Error when processing AlterPartition: $alterPartitionRequest", 
e)
+        callback(new 
AlterPartitionResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code))
+        mutable.Map.empty
     }
 
-    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
-      val resp = new AlterPartitionResponseData()
-      results match {
-        case Right(error) =>
-          resp.setErrorCode(error.code)
-        case Left(partitionResults) =>
-          resp.setTopics(new util.ArrayList())
-          partitionResults
-            .groupBy { case (tp, _) => tp.topic }   // Group by topic
-            .foreach { case (topic, partitions) =>
-              // Add each topic part to the response
-              val topicResp = new AlterPartitionResponseData.TopicData()
-                .setName(topic)
-                .setPartitions(new util.ArrayList())
-              resp.topics.add(topicResp)
-              partitions.foreach { case (tp, errorOrIsr) =>
-                // Add each partition part to the response (new ISR or error)
-                errorOrIsr match {
-                  case Left(error) => topicResp.partitions.add(
-                    new AlterPartitionResponseData.PartitionData()
-                      .setPartitionIndex(tp.partition)
-                      .setErrorCode(error.code))
-                  case Right(leaderAndIsr) =>
-                    /* Setting the LeaderRecoveryState field is always safe 
because it will always be the same
-                     * as the value set in the request. For version 0, that is 
always the default RECOVERED
-                     * which is ignored when serializing to version 0. For any 
other version, the
-                     * LeaderRecoveryState field is supported.
-                     */
-                    topicResp.partitions.add(
-                      new AlterPartitionResponseData.PartitionData()
-                        .setPartitionIndex(tp.partition)
-                        .setLeaderId(leaderAndIsr.leader)
-                        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
-                        .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                        
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
-                        .setPartitionEpoch(leaderAndIsr.partitionEpoch)
-                    )
-                }
-            }
-          }
+    // After we have returned the result of the `AlterPartition` request, we 
should check whether
+    // there are any reassignments which can be completed by a successful ISR 
expansion.
+    partitionResponses.forKeyValue { (topicPartition, partitionResponse) =>
+      if 
(controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
+        val isSuccessfulUpdate = partitionResponse.isRight
+        if (isSuccessfulUpdate) {
+          maybeCompleteReassignment(topicPartition)
+        }
       }
-      callback.apply(resp)
     }
-
-    eventManager.put(
-      AlterPartitionReceived(alterPartitionRequest.brokerId, 
alterPartitionRequest.brokerEpoch, partitionsToAlter, responseCallback)
-    )
   }
 
-  private def processAlterPartition(
-    brokerId: Int,
-    brokerEpoch: Long,
-    partitionsToAlter: Map[TopicPartition, LeaderAndIsr],
-    callback: AlterPartitionCallback
-  ): Unit = {
+  private def doProcessAlterPartition(

Review Comment:
   nit: how about `tryProcessAlterPartition` since we're changing this anyway?



##########
metadata/src/test/java/org/apache/kafka/controller/ControllerRequestContextTest.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.OptionalLong;
+import org.apache.kafka.common.message.RequestHeaderData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+public class ControllerRequestContextTest {

Review Comment:
   nit: perhaps this is more of a Util than a Test? It looks like a helper class rather than a test class.



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +237,75 @@ class DefaultAlterPartitionManager(
       })
   }
 
-  private def buildRequest(inflightAlterPartitionItems: 
Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+  /**
+   * Builds an AlterPartition request.
+   *
+   * While building the request, we don't know which version of the 
AlterPartition API is
+   * supported by the controller. The final decision is taken when the 
AlterPartitionRequest
+   * is built in the network client based on the advertised api versions of 
the controller.
+   *
+   * We could use version 2 or above if all the pending changes have an topic 
id defined;
+   * otherwise we must use version 1 or below.
+   *
+   * @return A tuple containing the AlterPartitionRequest.Builder and a 
mapping from
+   *         topic id to topic name. This mapping is used in the response 
handling.
+   */
+  private def buildRequest(
+    inflightAlterPartitionItems: Seq[AlterPartitionItem],
+    brokerEpoch: Long
+  ): (AlterPartitionRequest.Builder, mutable.Map[Uuid, String]) = {
+    val metadataVersion = metadataVersionSupplier()
+    // We build this mapping in order to map topic id back to their name when 
we
+    // receive the response. We cannot rely on the metadata cache for this 
because
+    // the metadata cache is updated after the partition state so it might not 
know
+    // yet about a topic id already used here.
+    val topicNamesByIds = mutable.HashMap[Uuid, String]()
+    // We can use topic ids only if all the pending changed have one defined 
and
+    // we use IBP 2.8 or above.
+    var canUseTopicIds = metadataVersion.isTopicIdsSupported
+
     val message = new AlterPartitionRequestData()
       .setBrokerId(brokerId)
-      .setBrokerEpoch(brokerEpochSupplier.apply())
+      .setBrokerEpoch(brokerEpoch)
 
-      inflightAlterPartitionItems.groupBy(_.topicPartition.topic).foreach { 
case (topic, items) =>
+    inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { 
case (topicName, items) =>
+      val topicId = items.head.topicIdPartition.topicId
+      canUseTopicIds &= topicId != Uuid.ZERO_UUID
+      topicNamesByIds(topicId) = topicName
+
+      // Both the topic name and the topic id are set here because at this 
stage
+      // we don't know which version of the request will be used.
       val topicData = new AlterPartitionRequestData.TopicData()
-        .setName(topic)
+        .setTopicName(topicName)
+        .setTopicId(topicId)
       message.topics.add(topicData)
+
       items.foreach { item =>
         val partitionData = new AlterPartitionRequestData.PartitionData()
-          .setPartitionIndex(item.topicPartition.partition)
+          .setPartitionIndex(item.topicIdPartition.partition)
           .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
           .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
           .setPartitionEpoch(item.leaderAndIsr.partitionEpoch)
 
-        if (metadataVersionSupplier().isAtLeast(MetadataVersion.IBP_3_2_IV0)) {
+        if (metadataVersion.isLeaderRecoverySupported) {
           
partitionData.setLeaderRecoveryState(item.leaderAndIsr.leaderRecoveryState.value)
         }
 
         topicData.partitions.add(partitionData)
       }
     }
-    message
+
+    // If we cannot use topic ids, the builder will ensure that version 1 is 
used

Review Comment:
   nit: maybe rephrase as:
   > If we cannot use topic ids, the builder will ensure that no version higher than 1 is used



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to