dajac commented on code in PR #12329:
URL: https://github.com/apache/kafka/pull/12329#discussion_r905139398


##########
core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala:
##########
@@ -178,6 +178,46 @@ class AlterPartitionManagerTest {
     assertEquals(request.data().topics().get(0).partitions().size(), 10)
   }
 
+  @Test
+  def testRetryOnPartitionLevelError(): Unit = {

Review Comment:
   nit: How about `testSubmitFromCallback`?



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1936,6 +1937,108 @@ class PartitionTest extends AbstractPartitionTest {
     callback(brokerId, remoteBrokerId, partition)
   }
 
+  private def createClientResponseWithAlterPartitionResponse(
+    topicPartition: TopicPartition,
+    partitionErrorCode: Short,
+    isr: List[Int] = List.empty,
+    leaderEpoch: Int = 0,
+    partitionEpoch: Int = 0
+  ): ClientResponse = {
+    val alterPartitionResponseData = new AlterPartitionResponseData()
+    val topicResponse = new AlterPartitionResponseData.TopicData()
+      .setTopicName(topicPartition.topic)
+
+    topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData()
+      .setPartitionIndex(topicPartition.partition)
+      .setIsr(isr.map(Integer.valueOf).asJava)
+      .setLeaderEpoch(leaderEpoch)
+      .setPartitionEpoch(partitionEpoch)
+      .setErrorCode(partitionErrorCode))
+    alterPartitionResponseData.topics.add(topicResponse)
+
+    val alterPartitionResponse = new 
AlterPartitionResponse(alterPartitionResponseData)
+
+    new ClientResponse(new RequestHeader(ApiKeys.ALTER_PARTITION, 0, "client", 
1),
+      null, null, 0, 0, false, null, null, alterPartitionResponse)
+  }
+
+  @Test
+  def testPartitionShouldRetryAlterPartitionRequest(): Unit = {
+    val mockChannelManager = mock(classOf[BrokerToControllerChannelManager])
+    val alterPartitionManager = new DefaultAlterPartitionManager(
+      controllerChannelManager = mockChannelManager,
+      scheduler = mock(classOf[KafkaScheduler]),
+      time = time,
+      brokerId = brokerId,
+      brokerEpochSupplier = () => 0,
+      metadataVersionSupplier = () => MetadataVersion.IBP_3_0_IV0
+    )
+
+    partition = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = interBrokerProtocolVersion,
+      localBrokerId = brokerId,
+      time,
+      alterPartitionListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      alterPartitionManager)
+
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val follower1 = brokerId + 1
+    val follower2 = brokerId + 2
+    val follower3 = brokerId + 3
+    val replicas = Seq(brokerId, follower1, follower2, follower3)
+    val isr = Seq(brokerId, follower1, follower2)
+    val partitionEpoch = 1
+
+    doNothing().when(delayedOperations).checkAndCompleteAll()
+
+    // Fail the first alter partition request with a retryable error to 
trigger a retry from the partition callback
+    val alterPartitionResponseWithUnknownServerError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, 
Errors.UNKNOWN_SERVER_ERROR.code())

Review Comment:
   nit: We can remove the `()` here and at L2008.



##########
core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala:
##########
@@ -178,6 +178,46 @@ class AlterPartitionManagerTest {
     assertEquals(request.data().topics().get(0).partitions().size(), 10)
   }
 
+  @Test
+  def testRetryOnPartitionLevelError(): Unit = {
+    // prepare a partition level retriable error response
+    val alterPartitionRespWithPartitionError = partitionResponse(tp0, 
Errors.UNKNOWN_SERVER_ERROR)
+    val errorResponse = 
makeClientResponse(alterPartitionRespWithPartitionError, 
ApiKeys.ALTER_PARTITION.latestVersion)
+
+    val leaderAndIsr = new LeaderAndIsr(1, 1, List(1,2,3), 
LeaderRecoveryState.RECOVERED, 10)
+    val callbackCapture = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+
+    val scheduler = new MockScheduler(time)
+    val alterPartitionManager = new 
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () 
=> 2, () => IBP_3_2_IV0)
+    alterPartitionManager.start()
+    val future = alterPartitionManager.submit(tp0, leaderAndIsr, 0)
+
+    future.whenComplete { (leaderAndIsrResp, e) =>
+      // When entering callback, the `unsentIsrUpdates` element should be 
removed for possible retry
+      
assertFalse(alterPartitionManager.unsentIsrUpdates.containsKey(tp0.topicPartition))
+      assertNull(leaderAndIsrResp)
+      // When receiving retriable error, re-submit the request
+      assertNotNull(e)
+      assertEquals(classOf[UnknownServerException], e.getClass)

Review Comment:
   I ran this test without the fix and I noticed that those assertions, when 
false, goes nowhere. The only assertion which fails is 
`verify(brokerToController).sendRequest(any(), callbackCapture.capture())` at 
L214. There are a big misleading if there are not actionable. Could you double 
check?
   
   If that is true, an alternative would be to do the following:
   
   ```
       val finalFuture = new CompletableFuture[LeaderAndIsr]()
       future.whenComplete { (_, e) =>
         if (e != null) {
           alterPartitionManager.submit(tp0, leaderAndIsr, 0).whenComplete { 
(result, e) =>
             if (e != null) {
               finalFuture.completeExceptionally(e)
             } else {
               finalFuture.complete(result)
             }
           }
         } else {
           finalFuture.completeExceptionally(new AssertionError("Expected the 
future to be failed"))
         }
       }
   ```
   
   and later, we can verify that `finalFuture` contains what we expect: 
`assertEquals(..., finalFuture.get(100, TimeUnit.MILLISECONDS))`. This also 
ensures that we get the expected result from the last call to submit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to