jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1120986902


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2384,68 +2385,116 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (config.interBrokerProtocolVersion.isLessThan(version))
       throw new UnsupportedVersionException(s"inter.broker.protocol.version: 
${config.interBrokerProtocolVersion.version} is less than the required version: 
${version.version}")
   }
-
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
+  def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
     ensureInterBrokerVersion(IBP_0_11_0_IV0)
-    val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
-    val transactionalId = addPartitionsToTxnRequest.data.transactionalId
-    val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-    if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, 
transactionalId))
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
-    else {
-      val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-      val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-      val authorizedPartitions = mutable.Set[TopicPartition]()
-
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, 
WRITE, TOPIC,
-        partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
-      for (topicPartition <- partitionsToAdd) {
-        if (!authorizedTopics.contains(topicPartition.topic))
-          unauthorizedTopicErrors += topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED
-        else if (!metadataCache.contains(topicPartition))
-          nonExistingTopicErrors += topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION
-        else
-          authorizedPartitions.add(topicPartition)
+    val addPartitionsToTxnRequest =
+      if (request.context.apiVersion() < 4) 
+        request.body[AddPartitionsToTxnRequest].normalizeRequest() 
+      else 
+        request.body[AddPartitionsToTxnRequest]
+    val version = addPartitionsToTxnRequest.version
+    val responses = new AddPartitionsToTxnResultCollection()
+    val partitionsByTransaction = 
addPartitionsToTxnRequest.partitionsByTransaction()
+    
+    // Newer versions of the request should only come from other brokers.
+    if (version >= 4) authHelper.authorizeClusterOperation(request, 
CLUSTER_ACTION)
+
+    // V4 requests introduced batches of transactions. We need all 
transactions to be handled before sending the 
+    // response so there are a few differences in handling errors and sending 
responses.
+    def createResponse(requestThrottleMs: Int): AbstractResponse = {
+      if (version < 4) {
+        // There will only be one response in data. Add it to the response 
data object.
+        val data = new AddPartitionsToTxnResponseData()
+        responses.forEach(result => {
+          data.setResultsByTopicV3AndBelow(result.topicResults())
+          data.setThrottleTimeMs(requestThrottleMs)
+        })
+        new AddPartitionsToTxnResponse(data)
+      } else {
+        new AddPartitionsToTxnResponse(new 
AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses))
       }
+    }
 
-      if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) 
{
-        // Any failed partition check causes the entire request to fail. We 
send the appropriate error codes for the
-        // partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error 
code for the partitions which succeeded
-        // the authorization check to indicate that they were not added to the 
transaction.
-        val partitionErrors = unauthorizedTopicErrors ++ 
nonExistingTopicErrors ++
-          authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)
-        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-          new AddPartitionsToTxnResponse(requestThrottleMs, 
partitionErrors.asJava))
+    val txns = addPartitionsToTxnRequest.data.transactions
+    def maybeSendResponse(): Unit = {
+      var canSend = false
+      responses.synchronized {
+        if (responses.size() == txns.size()) {
+          canSend = true
+        }
+      }
+      if (canSend) {
+        requestHelper.sendResponseMaybeThrottle(request, createResponse)
+      }
+    }
+
+    txns.forEach( transaction => {
+      val transactionalId = transaction.transactionalId
+      val partitionsToAdd = 
partitionsByTransaction.get(transactionalId).asScala
+      
+      // Versions < 4 come from clients and must be authorized to write for 
the given transaction and for the given topics.
+      if (version < 4 && !authHelper.authorize(request.context, WRITE, 
TRANSACTIONAL_ID, transactionalId)) {
+        
responses.add(addPartitionsToTxnRequest.errorResponseForTransaction(transactionalId,
 Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
+        maybeSendResponse()
       } else {
-        def sendResponseCallback(error: Errors): Unit = {
-          def createResponse(requestThrottleMs: Int): AbstractResponse = {
-            val finalError =
-              if (addPartitionsToTxnRequest.version < 2 && error == 
Errors.PRODUCER_FENCED) {
+        val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
+        val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
+        val authorizedPartitions = mutable.Set[TopicPartition]()
+
+        val authorizedTopics = if (version < 4) 
authHelper.filterByAuthorized(request.context, WRITE, TOPIC,
+          partitionsToAdd.filterNot(tp => 
Topic.isInternal(tp.topic)))(_.topic) else partitionsToAdd.map(_.topic).toSet
+        for (topicPartition <- partitionsToAdd) {
+          if (!authorizedTopics.contains(topicPartition.topic))
+            unauthorizedTopicErrors += topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED
+          else if (!metadataCache.contains(topicPartition))
+            nonExistingTopicErrors += topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION
+          else
+            authorizedPartitions.add(topicPartition)
+        }
+
+        if (unauthorizedTopicErrors.nonEmpty || 
nonExistingTopicErrors.nonEmpty) {
+          // Any failed partition check causes the entire transaction to fail. 
We send the appropriate error codes for the
+          // partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error 
code for the partitions which succeeded
+          // the authorization check to indicate that they were not added to 
the transaction.
+          val partitionErrors = unauthorizedTopicErrors ++ 
nonExistingTopicErrors ++
+            authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)
+          
responses.add(AddPartitionsToTxnResponse.resultForTransaction(transactionalId, 
partitionErrors.asJava))
+          maybeSendResponse()
+        } else {
+          def sendResponseCallback(error: Errors): Unit = {
+            val finalError = {
+              if (version < 2 && error == Errors.PRODUCER_FENCED) {
                 // For older clients, they could not understand the new 
PRODUCER_FENCED error code,
                 // so we need to return the old INVALID_PRODUCER_EPOCH to have 
the same client handling logic.
                 Errors.INVALID_PRODUCER_EPOCH
               } else {
                 error
               }
-
-            val responseBody: AddPartitionsToTxnResponse = new 
AddPartitionsToTxnResponse(requestThrottleMs,
-              partitionsToAdd.map{tp => (tp, finalError)}.toMap.asJava)
-            trace(s"Completed $transactionalId's AddPartitionsToTxnRequest 
with partitions $partitionsToAdd: errors: $error from client 
${request.header.clientId}")
-            responseBody
+            }
+            responses.synchronized {
+              
responses.add(addPartitionsToTxnRequest.errorResponseForTransaction(transactionalId,
 finalError))
+            }
+            maybeSendResponse()
+          }
+          
+          def sendVerifyResponseCallback(errors: Map[TopicPartition, Errors]): 
Unit = {
+            responses.synchronized {
+              
responses.add(AddPartitionsToTxnResponse.resultForTransaction(transactionalId, 
errors.asJava))
+            }
+            maybeSendResponse()
           }
 
-          requestHelper.sendResponseMaybeThrottle(request, createResponse)
+          txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
+            transaction.producerId,
+            transaction.producerEpoch,
+            authorizedPartitions,
+            transaction.verifyOnly,
+            sendResponseCallback,
+            sendVerifyResponseCallback,

Review Comment:
   Is the request to build a map from every topic partition to the same error? 
That doesn't seem very efficient, but I guess it would simplify the 
parameters. We would also need to define a new callback in 
`handleAddPartitionsToTransaction` that calls the original callback (so we 
would still have two).
   
   I suppose this is doable, though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to