dajac commented on code in PR #14378: URL: https://github.com/apache/kafka/pull/14378#discussion_r1324437789
########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig, val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed, origin, verifiedEntries, requiredAcks, requestLocal, verificationGuards.toMap) debug("Produce to local log in %d ms".format(time.milliseconds - sTime)) - - val unverifiedResults = unverifiedEntries.map { case (topicPartition, error) => - val message = if (error == Errors.INVALID_TXN_STATE) "Partition was not added to the transaction" else error.message() - topicPartition -> LogAppendResult( - LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, - Some(error.exception(message)) - ) - } - val errorResults = errorsPerPartition.map { case (topicPartition, error) => - topicPartition -> LogAppendResult( - LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, - Some(error.exception()) - ) + def produceStatusResult(appendResult: Map[TopicPartition, LogAppendResult], + useCustomMessage: Boolean): Map[TopicPartition, ProducePartitionStatus] = { + appendResult.map { case (topicPartition, result) => + topicPartition -> ProducePartitionStatus( + result.info.lastOffset + 1, // required offset + new PartitionResponse( + result.error, + result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L), + result.info.lastOffset, + result.info.logAppendTime, + result.info.logStartOffset, + result.info.recordErrors, + if (useCustomMessage) result.exception.get.getMessage else result.info.errorMessage Review Comment: It is interesting to note that `result.info.errorMessage` was only set in a few cases which means that the error field in the response was not always populated. With this change, it will always be populated, so I wonder if the previous handling was done on purpose, perhaps to avoid returning the default error message. Thoughts? Long term, we should remove `result.info.errorMessage` and rely on the message provided in the exception. 
We could compare with the default to avoid sending the default message if we want to as well. ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig, val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed, origin, verifiedEntries, requiredAcks, requestLocal, verificationGuards.toMap) debug("Produce to local log in %d ms".format(time.milliseconds - sTime)) - - val unverifiedResults = unverifiedEntries.map { case (topicPartition, error) => - val message = if (error == Errors.INVALID_TXN_STATE) "Partition was not added to the transaction" else error.message() - topicPartition -> LogAppendResult( - LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, - Some(error.exception(message)) - ) - } - val errorResults = errorsPerPartition.map { case (topicPartition, error) => - topicPartition -> LogAppendResult( - LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, - Some(error.exception()) - ) + def produceStatusResult(appendResult: Map[TopicPartition, LogAppendResult], + useCustomMessage: Boolean): Map[TopicPartition, ProducePartitionStatus] = { + appendResult.map { case (topicPartition, result) => + topicPartition -> ProducePartitionStatus( + result.info.lastOffset + 1, // required offset + new PartitionResponse( + result.error, + result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L), + result.info.lastOffset, + result.info.logAppendTime, + result.info.logStartOffset, + result.info.recordErrors, + if (useCustomMessage) result.exception.get.getMessage else result.info.errorMessage + ) + ) // response status + } } - val allResults = localProduceResults ++ unverifiedResults ++ errorResults + val unverifiedResults = unverifiedEntries.map { + case (topicPartition, error) => + val finalError = + error match { + case Errors.CONCURRENT_TRANSACTIONS | + Errors.COORDINATOR_LOAD_IN_PROGRESS | + Errors.COORDINATOR_NOT_AVAILABLE | + Errors.NOT_COORDINATOR => Errors.NOT_ENOUGH_REPLICAS 
+ case _ => error + } + val message = + error match { + case Errors.INVALID_TXN_STATE => "Partition was not added to the transaction" + case Errors.CONCURRENT_TRANSACTIONS | + Errors.COORDINATOR_LOAD_IN_PROGRESS | + Errors.COORDINATOR_NOT_AVAILABLE | + Errors.NOT_COORDINATOR => s"Unable to verify the partition has been added to the transaction. Underlying error:${error.toString}" + case _ => error.message() + } Review Comment: Would it make sense to have only one match? ``` val exception = error match { case Errors.INVALID_TXN_STATE => error.exception("new message") case .... case _ => error.exception() } ``` ########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -2569,6 +2571,70 @@ class ReplicaManagerTest { assertEquals((Errors.NONE, node0), replicaManager.getTransactionCoordinator(txnCoordinatorPartition0)) assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), replicaManager.getTransactionCoordinator(txnCoordinatorPartition1)) + + // Test we convert the error correctly when trying to append and coordinator is not available + val tp0 = new TopicPartition(topic, 0) + val producerId = 24L + val producerEpoch = 0.toShort + val sequence = 0 + replicaManager.becomeLeaderOrFollower(1, + makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), + (_, _) => ()) + val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, + new SimpleRecord("message".getBytes)) + val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(txnCoordinatorPartition1)) + val expectedError = s"Unable to verify the partition has been added to the transaction. 
Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}" + assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error) + assertEquals(expectedError, result.assertFired.errorMessage) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testVerificationErrorConversions(): Unit = { + val tp0 = new TopicPartition(topic, 0) + val producerId = 24L + val producerEpoch = 0.toShort + val sequence = 0 + val node = new Node(0, "host1", 0) + val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) + + val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), node) + try { + replicaManager.becomeLeaderOrFollower(1, + makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), + (_, _) => ()) + + val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, + new SimpleRecord("message".getBytes)) + + val transactionToAdd = new AddPartitionsToTxnTransaction() + .setTransactionalId(transactionalId) + .setProducerId(producerId) + .setProducerEpoch(producerEpoch) + .setVerifyOnly(true) + .setTopics(new AddPartitionsToTxnTopicCollection( + Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava + )) + + // Start verification and return the coordinator related errors. + var invocations = 1 + def verifyError(error: Errors): Unit = { + val expectedMessage = s"Unable to verify the partition has been added to the transaction. 
Underlying error:${error.toString}" + val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0)) + val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback]) + verify(addPartitionsToTxnManager, times(invocations)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), appendCallback.capture()) + + // Confirm we did not write to the log and instead returned the converted error with the correct error message. + val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue() + callback(Map(tp0 -> error).toMap) + assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error) + assertEquals(expectedMessage, result.assertFired.errorMessage) + invocations = invocations + 1 + } + + Set(Errors.NOT_COORDINATOR, Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS).foreach(verifyError(_)) Review Comment: Should we also add `COORDINATOR_NOT_AVAILABLE` for completeness? 
########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -2569,6 +2571,70 @@ class ReplicaManagerTest { assertEquals((Errors.NONE, node0), replicaManager.getTransactionCoordinator(txnCoordinatorPartition0)) assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), replicaManager.getTransactionCoordinator(txnCoordinatorPartition1)) + + // Test we convert the error correctly when trying to append and coordinator is not available + val tp0 = new TopicPartition(topic, 0) + val producerId = 24L + val producerEpoch = 0.toShort + val sequence = 0 + replicaManager.becomeLeaderOrFollower(1, + makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), + (_, _) => ()) + val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, + new SimpleRecord("message".getBytes)) + val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(txnCoordinatorPartition1)) + val expectedError = s"Unable to verify the partition has been added to the transaction. 
Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}" + assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error) + assertEquals(expectedError, result.assertFired.errorMessage) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testVerificationErrorConversions(): Unit = { + val tp0 = new TopicPartition(topic, 0) + val producerId = 24L + val producerEpoch = 0.toShort + val sequence = 0 + val node = new Node(0, "host1", 0) + val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) + + val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), node) + try { + replicaManager.becomeLeaderOrFollower(1, + makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), + (_, _) => ()) + + val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, + new SimpleRecord("message".getBytes)) + + val transactionToAdd = new AddPartitionsToTxnTransaction() + .setTransactionalId(transactionalId) + .setProducerId(producerId) + .setProducerEpoch(producerEpoch) + .setVerifyOnly(true) + .setTopics(new AddPartitionsToTxnTopicCollection( + Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava + )) + + // Start verification and return the coordinator related errors. + var invocations = 1 + def verifyError(error: Errors): Unit = { + val expectedMessage = s"Unable to verify the partition has been added to the transaction. 
Underlying error:${error.toString}" + val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0)) + val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback]) + verify(addPartitionsToTxnManager, times(invocations)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), appendCallback.capture()) + + // Confirm we did not write to the log and instead returned the converted error with the correct error message. + val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue() + callback(Map(tp0 -> error).toMap) + assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error) + assertEquals(expectedMessage, result.assertFired.errorMessage) + invocations = invocations + 1 Review Comment: It took me a while to understand what this `invocations` is doing here. Did you consider using a parameterized test instead? ``` @ParameterizedTest @EnumSource(value=classOf[Errors], names=Array("NONE", "CLUSTER_AUTHORIZATION_FAILED", "NOT_CONTROLLER")) ``` ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -832,28 +856,30 @@ class ReplicaManager(val config: KafkaConfig, val (error, node) = getTransactionCoordinator(transactionStatePartition.get) if (error != Errors.NONE) { - throw error.exception() // Can throw coordinator not available -- which is retriable - } + appendEntries(entriesPerPartition)(notYetVerifiedEntriesPerPartition.map { + case (tp, _) => (tp, Errors.COORDINATOR_NOT_AVAILABLE) Review Comment: Wouldn't it be better to use `error` here? I understand that getTransactionCoordinator only returns COORDINATOR_NOT_AVAILABLE at the moment but this may change in the future and we would lose it here. 
########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig, val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed, origin, verifiedEntries, requiredAcks, requestLocal, verificationGuards.toMap) debug("Produce to local log in %d ms".format(time.milliseconds - sTime)) - - val unverifiedResults = unverifiedEntries.map { case (topicPartition, error) => - val message = if (error == Errors.INVALID_TXN_STATE) "Partition was not added to the transaction" else error.message() - topicPartition -> LogAppendResult( - LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, - Some(error.exception(message)) - ) - } - val errorResults = errorsPerPartition.map { case (topicPartition, error) => - topicPartition -> LogAppendResult( - LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, - Some(error.exception()) - ) + def produceStatusResult(appendResult: Map[TopicPartition, LogAppendResult], + useCustomMessage: Boolean): Map[TopicPartition, ProducePartitionStatus] = { + appendResult.map { case (topicPartition, result) => + topicPartition -> ProducePartitionStatus( + result.info.lastOffset + 1, // required offset + new PartitionResponse( + result.error, + result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L), + result.info.lastOffset, + result.info.logAppendTime, + result.info.logStartOffset, + result.info.recordErrors, + if (useCustomMessage) result.exception.get.getMessage else result.info.errorMessage + ) + ) // response status + } } - val allResults = localProduceResults ++ unverifiedResults ++ errorResults + val unverifiedResults = unverifiedEntries.map { + case (topicPartition, error) => + val finalError = + error match { + case Errors.CONCURRENT_TRANSACTIONS | + Errors.COORDINATOR_LOAD_IN_PROGRESS | + Errors.COORDINATOR_NOT_AVAILABLE | + Errors.NOT_COORDINATOR => Errors.NOT_ENOUGH_REPLICAS + case _ => error + } + val message = + error match { + case Errors.INVALID_TXN_STATE => 
"Partition was not added to the transaction" + case Errors.CONCURRENT_TRANSACTIONS | + Errors.COORDINATOR_LOAD_IN_PROGRESS | + Errors.COORDINATOR_NOT_AVAILABLE | + Errors.NOT_COORDINATOR => s"Unable to verify the partition has been added to the transaction. Underlying error:${error.toString}" Review Comment: nit: I would add a space after the `:`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org