dajac commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324437789


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig,
         val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
           origin, verifiedEntries, requiredAcks, requestLocal, 
verificationGuards.toMap)
         debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
-        
-        val unverifiedResults = unverifiedEntries.map { case (topicPartition, 
error) =>
-          val message = if (error == Errors.INVALID_TXN_STATE) "Partition was 
not added to the transaction" else error.message()
-          topicPartition -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(error.exception(message))
-          )
-        }
 
-        val errorResults = errorsPerPartition.map { case (topicPartition, 
error) =>
-          topicPartition -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(error.exception())
-          )
+        def produceStatusResult(appendResult: Map[TopicPartition, 
LogAppendResult],
+                                useCustomMessage: Boolean): 
Map[TopicPartition, ProducePartitionStatus] = {
+          appendResult.map { case (topicPartition, result) =>
+            topicPartition -> ProducePartitionStatus(
+              result.info.lastOffset + 1, // required offset
+              new PartitionResponse(
+                result.error,
+                result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+                result.info.lastOffset,
+                result.info.logAppendTime,
+                result.info.logStartOffset,
+                result.info.recordErrors,
+                if (useCustomMessage) result.exception.get.getMessage else 
result.info.errorMessage

Review Comment:
   It is interesting to note that `result.info.errorMessage` was only set in a 
few cases which means that the error field in the response was not always 
populated. With this change, it will always be so I wonder if the previous 
handling was done on purpose, perhaps to avoid returning the default error 
message. Thoughts?
   
   Long term, we should remove `result.info.errorMessage` and rely on the 
message provided in the exception. We could compare with the default to avoid 
sending the default message if we want to as well.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig,
         val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
           origin, verifiedEntries, requiredAcks, requestLocal, 
verificationGuards.toMap)
         debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
-        
-        val unverifiedResults = unverifiedEntries.map { case (topicPartition, 
error) =>
-          val message = if (error == Errors.INVALID_TXN_STATE) "Partition was 
not added to the transaction" else error.message()
-          topicPartition -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(error.exception(message))
-          )
-        }
 
-        val errorResults = errorsPerPartition.map { case (topicPartition, 
error) =>
-          topicPartition -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(error.exception())
-          )
+        def produceStatusResult(appendResult: Map[TopicPartition, 
LogAppendResult],
+                                useCustomMessage: Boolean): 
Map[TopicPartition, ProducePartitionStatus] = {
+          appendResult.map { case (topicPartition, result) =>
+            topicPartition -> ProducePartitionStatus(
+              result.info.lastOffset + 1, // required offset
+              new PartitionResponse(
+                result.error,
+                result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+                result.info.lastOffset,
+                result.info.logAppendTime,
+                result.info.logStartOffset,
+                result.info.recordErrors,
+                if (useCustomMessage) result.exception.get.getMessage else 
result.info.errorMessage
+              )
+            ) // response status
+          }
         }
         
-        val allResults = localProduceResults ++ unverifiedResults ++ 
errorResults
+        val unverifiedResults = unverifiedEntries.map {
+          case (topicPartition, error) =>
+            val finalError =
+              error match {
+                case Errors.CONCURRENT_TRANSACTIONS |
+                     Errors.COORDINATOR_LOAD_IN_PROGRESS |
+                     Errors.COORDINATOR_NOT_AVAILABLE |
+                     Errors.NOT_COORDINATOR => Errors.NOT_ENOUGH_REPLICAS
+                case _ => error
+            }
+            val message =
+              error match {
+                case Errors.INVALID_TXN_STATE => "Partition was not added to 
the transaction"
+                case Errors.CONCURRENT_TRANSACTIONS |
+                     Errors.COORDINATOR_LOAD_IN_PROGRESS |
+                     Errors.COORDINATOR_NOT_AVAILABLE |
+                     Errors.NOT_COORDINATOR => s"Unable to verify the 
partition has been added to the transaction. Underlying error:${error.toString}"
+                case _ => error.message()
+              }

Review Comment:
   Would it make sense to have only one match?
   
   ```
   val exception = error match {
     case Errors.INVALID_TXN_STATE => error.exception("new message")
     case ....
     case _ => error.exception()
   }
   ```



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2569,6 +2571,70 @@ class ReplicaManagerTest {
 
       assertEquals((Errors.NONE, node0), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
       assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+      // Test we convert the error correctly when trying to append and 
coordinator is not available
+      val tp0 = new TopicPartition(topic, 0)
+      val producerId = 24L
+      val producerEpoch = 0.toShort
+      val sequence = 0
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
+      val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = 
Some(txnCoordinatorPartition1))
+      val expectedError = s"Unable to verify the partition has been added to 
the transaction. Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+      assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+      assertEquals(expectedError, result.assertFired.errorMessage)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testVerificationErrorConversions(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val producerId = 24L
+    val producerEpoch = 0.toShort
+    val sequence = 0
+    val node = new Node(0, "host1", 0)
+    val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+    val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0), node)
+    try {
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
+
+      val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+
+      val transactionToAdd = new AddPartitionsToTxnTransaction()
+        .setTransactionalId(transactionalId)
+        .setProducerId(producerId)
+        .setProducerEpoch(producerEpoch)
+        .setVerifyOnly(true)
+        .setTopics(new AddPartitionsToTxnTopicCollection(
+          Seq(new 
AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+        ))
+
+      // Start verification and return the coordinator related errors.
+      var invocations = 1
+      def verifyError(error: Errors): Unit = {
+        val expectedMessage = s"Unable to verify the partition has been added 
to the transaction. Underlying error:${error.toString}"
+        val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = Some(0))
+        val appendCallback = 
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+        verify(addPartitionsToTxnManager, 
times(invocations)).addTxnData(ArgumentMatchers.eq(node), 
ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+        // Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
+        val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue()
+        callback(Map(tp0 -> error).toMap)
+        assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+        assertEquals(expectedMessage, result.assertFired.errorMessage)
+        invocations = invocations + 1
+      }
+
+      Set(Errors.NOT_COORDINATOR, Errors.CONCURRENT_TRANSACTIONS, 
Errors.COORDINATOR_LOAD_IN_PROGRESS).foreach(verifyError(_))

Review Comment:
   Should we also add `COORDINATOR_NOT_AVAILABLE` for completeness? 



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2569,6 +2571,70 @@ class ReplicaManagerTest {
 
       assertEquals((Errors.NONE, node0), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
       assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+      // Test we convert the error correctly when trying to append and 
coordinator is not available
+      val tp0 = new TopicPartition(topic, 0)
+      val producerId = 24L
+      val producerEpoch = 0.toShort
+      val sequence = 0
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
+      val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = 
Some(txnCoordinatorPartition1))
+      val expectedError = s"Unable to verify the partition has been added to 
the transaction. Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+      assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+      assertEquals(expectedError, result.assertFired.errorMessage)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testVerificationErrorConversions(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val producerId = 24L
+    val producerEpoch = 0.toShort
+    val sequence = 0
+    val node = new Node(0, "host1", 0)
+    val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+    val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0), node)
+    try {
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
+
+      val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+
+      val transactionToAdd = new AddPartitionsToTxnTransaction()
+        .setTransactionalId(transactionalId)
+        .setProducerId(producerId)
+        .setProducerEpoch(producerEpoch)
+        .setVerifyOnly(true)
+        .setTopics(new AddPartitionsToTxnTopicCollection(
+          Seq(new 
AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+        ))
+
+      // Start verification and return the coordinator related errors.
+      var invocations = 1
+      def verifyError(error: Errors): Unit = {
+        val expectedMessage = s"Unable to verify the partition has been added 
to the transaction. Underlying error:${error.toString}"
+        val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = Some(0))
+        val appendCallback = 
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+        verify(addPartitionsToTxnManager, 
times(invocations)).addTxnData(ArgumentMatchers.eq(node), 
ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+        // Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
+        val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue()
+        callback(Map(tp0 -> error).toMap)
+        assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+        assertEquals(expectedMessage, result.assertFired.errorMessage)
+        invocations = invocations + 1

Review Comment:
   It took me a while to understand why this `invocations` is doing here. Did 
you consider using a parameterized test instead? 
   
   ```
     @ParameterizedTest
     @EnumSource(value=classOf[Errors], names=Array("NONE", 
"CLUSTER_AUTHORIZATION_FAILED", "NOT_CONTROLLER"))
   ```
   



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -832,28 +856,30 @@ class ReplicaManager(val config: KafkaConfig,
         val (error, node) = 
getTransactionCoordinator(transactionStatePartition.get)
 
         if (error != Errors.NONE) {
-          throw error.exception() // Can throw coordinator not available -- 
which is retriable
-        }
+          
appendEntries(entriesPerPartition)(notYetVerifiedEntriesPerPartition.map {
+            case (tp, _) => (tp, Errors.COORDINATOR_NOT_AVAILABLE)

Review Comment:
   Wouldn't it be better to use `error` here? I understand that 
getTransactionCoordinator only returns COORDINATOR_NOT_AVAILABLE at the moment 
but this may change in the future and we would lose it here.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig,
         val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
           origin, verifiedEntries, requiredAcks, requestLocal, 
verificationGuards.toMap)
         debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
-        
-        val unverifiedResults = unverifiedEntries.map { case (topicPartition, 
error) =>
-          val message = if (error == Errors.INVALID_TXN_STATE) "Partition was 
not added to the transaction" else error.message()
-          topicPartition -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(error.exception(message))
-          )
-        }
 
-        val errorResults = errorsPerPartition.map { case (topicPartition, 
error) =>
-          topicPartition -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(error.exception())
-          )
+        def produceStatusResult(appendResult: Map[TopicPartition, 
LogAppendResult],
+                                useCustomMessage: Boolean): 
Map[TopicPartition, ProducePartitionStatus] = {
+          appendResult.map { case (topicPartition, result) =>
+            topicPartition -> ProducePartitionStatus(
+              result.info.lastOffset + 1, // required offset
+              new PartitionResponse(
+                result.error,
+                result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+                result.info.lastOffset,
+                result.info.logAppendTime,
+                result.info.logStartOffset,
+                result.info.recordErrors,
+                if (useCustomMessage) result.exception.get.getMessage else 
result.info.errorMessage
+              )
+            ) // response status
+          }
         }
         
-        val allResults = localProduceResults ++ unverifiedResults ++ 
errorResults
+        val unverifiedResults = unverifiedEntries.map {
+          case (topicPartition, error) =>
+            val finalError =
+              error match {
+                case Errors.CONCURRENT_TRANSACTIONS |
+                     Errors.COORDINATOR_LOAD_IN_PROGRESS |
+                     Errors.COORDINATOR_NOT_AVAILABLE |
+                     Errors.NOT_COORDINATOR => Errors.NOT_ENOUGH_REPLICAS
+                case _ => error
+            }
+            val message =
+              error match {
+                case Errors.INVALID_TXN_STATE => "Partition was not added to 
the transaction"
+                case Errors.CONCURRENT_TRANSACTIONS |
+                     Errors.COORDINATOR_LOAD_IN_PROGRESS |
+                     Errors.COORDINATOR_NOT_AVAILABLE |
+                     Errors.NOT_COORDINATOR => s"Unable to verify the 
partition has been added to the transaction. Underlying error:${error.toString}"

Review Comment:
   nit: I would add a space after the `:`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to