jolshan commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324792282


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2569,6 +2571,70 @@ class ReplicaManagerTest {
 
       assertEquals((Errors.NONE, node0), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
       assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+      // Test we convert the error correctly when trying to append and 
coordinator is not available
+      val tp0 = new TopicPartition(topic, 0)
+      val producerId = 24L
+      val producerEpoch = 0.toShort
+      val sequence = 0
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
+      val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = 
Some(txnCoordinatorPartition1))
+      val expectedError = s"Unable to verify the partition has been added to 
the transaction. Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+      assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+      assertEquals(expectedError, result.assertFired.errorMessage)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testVerificationErrorConversions(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val producerId = 24L
+    val producerEpoch = 0.toShort
+    val sequence = 0
+    val node = new Node(0, "host1", 0)
+    val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+    val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0), node)
+    try {
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
+
+      val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+
+      val transactionToAdd = new AddPartitionsToTxnTransaction()
+        .setTransactionalId(transactionalId)
+        .setProducerId(producerId)
+        .setProducerEpoch(producerEpoch)
+        .setVerifyOnly(true)
+        .setTopics(new AddPartitionsToTxnTopicCollection(
+          Seq(new 
AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+        ))
+
+      // Start verification and return the coordinator related errors.
+      var invocations = 1
+      def verifyError(error: Errors): Unit = {
+        val expectedMessage = s"Unable to verify the partition has been added 
to the transaction. Underlying error:${error.toString}"
+        val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = Some(0))
+        val appendCallback = 
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+        verify(addPartitionsToTxnManager, 
times(invocations)).addTxnData(ArgumentMatchers.eq(node), 
ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+        // Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
+        val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue()
+        callback(Map(tp0 -> error).toMap)
+        assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+        assertEquals(expectedMessage, result.assertFired.errorMessage)
+        invocations = invocations + 1
+      }
+
+      Set(Errors.NOT_COORDINATOR, Errors.CONCURRENT_TRANSACTIONS, 
Errors.COORDINATOR_LOAD_IN_PROGRESS).foreach(verifyError(_))

Review Comment:
   Added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to