dajac commented on code in PR #12329:
URL: https://github.com/apache/kafka/pull/12329#discussion_r908155998


##########
core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala:
##########
@@ -554,6 +612,10 @@ class AlterPartitionManagerTest {
           .setPartitions(Collections.singletonList(
             new AlterPartitionResponseData.PartitionData()
               .setPartitionIndex(tp.partition())

Review Comment:
   nit: not related to your PR but we could remove the `()` after `partition`.



##########
core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala:
##########
@@ -178,6 +179,56 @@ class AlterPartitionManagerTest {
     assertEquals(request.data().topics().get(0).partitions().size(), 10)
   }
 
+  @Test
+  def testSubmitFromCallback(): Unit = {
+    // prepare a partition level retriable error response
+    val alterPartitionRespWithPartitionError = partitionResponse(tp0, 
Errors.UNKNOWN_SERVER_ERROR)
+    val errorResponse = 
makeClientResponse(alterPartitionRespWithPartitionError, 
ApiKeys.ALTER_PARTITION.latestVersion)
+
+    val leaderId = 1
+    val leaderEpoch = 1
+    val partitionEpoch = 10
+    val isr = List(1,2,3)
+    val leaderAndIsr = new LeaderAndIsr(leaderId,leaderEpoch, isr, 
LeaderRecoveryState.RECOVERED, partitionEpoch)
+    val callbackCapture = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+
+    val scheduler = new MockScheduler(time)
+    val alterPartitionManager = new 
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () 
=> 2, () => IBP_3_2_IV0)
+    alterPartitionManager.start()
+    val future = alterPartitionManager.submit(tp0, leaderAndIsr, 0)
+    val finalFuture = new CompletableFuture[LeaderAndIsr]()
+    future.whenComplete { (_, e) =>
+      if (e != null) {
+        // Retry when error.
+        alterPartitionManager.submit(tp0, leaderAndIsr, 0).whenComplete { 
(result, e) =>
+          if (e != null) {
+            finalFuture.completeExceptionally(e)
+          } else {
+            finalFuture.complete(result)
+          }
+        }
+      } else {
+        finalFuture.completeExceptionally(new AssertionError("Expected the 
future to be failed"))
+      }
+    }
+
+    verify(brokerToController).start()
+    verify(brokerToController).sendRequest(any(), callbackCapture.capture())
+    reset(brokerToController)
+    callbackCapture.getValue.onComplete(errorResponse)
+
+    // complete the retry request
+    val retryAlterPartitionResponse = partitionResponse(tp0, Errors.NONE, 
partitionEpoch, leaderId, leaderEpoch, isr)
+    val retryResponse = makeClientResponse(retryAlterPartitionResponse, 
ApiKeys.ALTER_PARTITION.latestVersion)
+
+    verify(brokerToController).sendRequest(any(), callbackCapture.capture())
+    callbackCapture.getValue.onComplete(retryResponse)
+
+    assertEquals(leaderAndIsr, finalFuture.get(200, TimeUnit.MILLISECONDS))
+    // No more items in unsentIsrUpdates

Review Comment:
   super nit: Comments sometimes start with a capital letter and sometimes not. I 
don't have a preference, but I do care about consistency. Could we use 
one way or the other?



##########
core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala:
##########
@@ -545,7 +596,14 @@ class AlterPartitionManagerTest {
     assertFutureThrows(future2, classOf[InvalidUpdateVersionException])
   }
 
-  private def partitionResponse(tp: TopicIdPartition, error: Errors): 
AlterPartitionResponse = {
+  private def partitionResponse(
+    tp: TopicIdPartition,
+    error: Errors,

Review Comment:
   nit: Should we use `Errors.NONE` as default value?



##########
core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala:
##########
@@ -178,6 +179,56 @@ class AlterPartitionManagerTest {
     assertEquals(request.data().topics().get(0).partitions().size(), 10)
   }
 
+  @Test
+  def testSubmitFromCallback(): Unit = {
+    // prepare a partition level retriable error response
+    val alterPartitionRespWithPartitionError = partitionResponse(tp0, 
Errors.UNKNOWN_SERVER_ERROR)
+    val errorResponse = 
makeClientResponse(alterPartitionRespWithPartitionError, 
ApiKeys.ALTER_PARTITION.latestVersion)
+
+    val leaderId = 1
+    val leaderEpoch = 1
+    val partitionEpoch = 10
+    val isr = List(1,2,3)
+    val leaderAndIsr = new LeaderAndIsr(leaderId,leaderEpoch, isr, 
LeaderRecoveryState.RECOVERED, partitionEpoch)

Review Comment:
   nit: A space is missing before `leaderEpoch`.



##########
core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala:
##########
@@ -178,6 +179,56 @@ class AlterPartitionManagerTest {
     assertEquals(request.data().topics().get(0).partitions().size(), 10)
   }
 
+  @Test
+  def testSubmitFromCallback(): Unit = {
+    // prepare a partition level retriable error response
+    val alterPartitionRespWithPartitionError = partitionResponse(tp0, 
Errors.UNKNOWN_SERVER_ERROR)
+    val errorResponse = 
makeClientResponse(alterPartitionRespWithPartitionError, 
ApiKeys.ALTER_PARTITION.latestVersion)
+
+    val leaderId = 1
+    val leaderEpoch = 1
+    val partitionEpoch = 10
+    val isr = List(1,2,3)

Review Comment:
   nit: We usually add a space after the comma.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to