junrao commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1161909242


##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
+import org.apache.kafka.common.{Node, TopicPartition}
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
+                                  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
+  
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
+  
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+    nodesToTransactions.synchronized {

Review Comment:
   Our long term goal is to replace the scala code with java. Could we write 
this new class and the corresponding test in java?



##########
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##########
@@ -29,7 +29,7 @@
   // The AddPartitionsToTxnRequest version 4 API is added as part of KIP-890 
and is still
   // under developement. Hence, the API is not exposed by default by brokers
   // unless explicitely enabled.
-  "latestVersionUnstable": true,
+  "latestVersionUnstable": false,

Review Comment:
   The above comment still says "is still under developement". Is the latest 
version indeed stable? Or should we change the comment accordingly?



##########
core/src/main/scala/kafka/network/RequestChannel.scala:
##########
@@ -354,6 +361,7 @@ class RequestChannel(val queueSize: Int,
   private val processors = new ConcurrentHashMap[Int, Processor]()
   val requestQueueSizeMetricName = 
metricNamePrefix.concat(RequestQueueSizeMetric)
   val responseQueueSizeMetricName = 
metricNamePrefix.concat(ResponseQueueSizeMetric)
+  private val callbackQueue = new ArrayBlockingQueue[BaseRequest](queueSize)

Review Comment:
   This seems to be a more general mechanism than ActionQueue. Could we move 
all existing ActionQueue usage to callback queue and get rid of ActionQueue? 
This could be done in a separate PR.



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
+import org.apache.kafka.common.{Node, TopicPartition}
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
+                                  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
+  
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
+  
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+    nodesToTransactions.synchronized {
+      // Check if we have already (either node or individual transaction). Add 
the Node if it isn't there.
+      val currentNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
+        new TransactionDataAndCallbacks(
+          new AddPartitionsToTxnTransactionCollection(1),
+          mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
+
+      val currentTransactionData = 
currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+
+      // Check if we already have txn ID -- if the epoch is bumped, return 
invalid producer epoch, otherwise, the client likely disconnected and 
+      // reconnected so return the retriable network exception.
+      if (currentTransactionData != null) {
+        val error = if (currentTransactionData.producerEpoch() < 
transactionData.producerEpoch())
+          Errors.INVALID_PRODUCER_EPOCH
+        else 
+          Errors.NETWORK_EXCEPTION

Review Comment:
   NETWORK_EXCEPTION is not listed in ProduceResponse. Should we add it there 
and verify that it's handled as expected by existing clients?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -616,66 +619,128 @@ class ReplicaManager(val config: KafkaConfig,
                     responseCallback: Map[TopicPartition, PartitionResponse] 
=> Unit,
                     delayedProduceLock: Option[Lock] = None,
                     recordConversionStatsCallback: Map[TopicPartition, 
RecordConversionStats] => Unit = _ => (),
-                    requestLocal: RequestLocal = RequestLocal.NoCaching): Unit 
= {
+                    requestLocal: RequestLocal = RequestLocal.NoCaching,
+                    transactionalId: String = null,
+                    transactionStatePartition: Option[Int] = None): Unit = {
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
-      val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
-        origin, entriesPerPartition, requiredAcks, requestLocal)
-      debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
-
-      val produceStatus = localProduceResults.map { case (topicPartition, 
result) =>
-        topicPartition -> ProducePartitionStatus(
-          result.info.lastOffset + 1, // required offset
-          new PartitionResponse(
-            result.error,
-            result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
-            result.info.logAppendTime,
-            result.info.logStartOffset,
-            result.info.recordErrors,
-            result.info.errorMessage
+      
+      val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition) = 
+        if (transactionStatePartition.isEmpty || 
!config.transactionPartitionVerificationEnable)
+          (entriesPerPartition, Map.empty)
+        else
+          entriesPerPartition.partition { case (topicPartition, records) =>
+            
getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId())
+          }
+
+      def appendEntries(allEntries: Map[TopicPartition, 
MemoryRecords])(unverifiedEntries: Map[TopicPartition, Errors]): Unit = {
+        val verifiedEntries = 
+          if (unverifiedEntries.isEmpty) 
+            allEntries 
+          else
+            allEntries.filter { case (tp, _) =>
+              !unverifiedEntries.contains(tp)
+            }
+        
+        val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
+          origin, verifiedEntries, requiredAcks, requestLocal)
+        debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
+        
+        val unverifiedResults = unverifiedEntries.map { case (topicPartition, 
error) =>
+          // NOTE: Older clients return INVALID_RECORD, but newer clients will 
return INVALID_TXN_STATE
+          val message = if (error.equals(Errors.INVALID_RECORD)) "Partition 
was not added to the transaction" else error.message()
+          topicPartition -> LogAppendResult(
+            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
+            Some(error.exception(message))
           )
-        ) // response status
-      }
+        }
+        
+        val allResults = localProduceResults ++ unverifiedResults
+
+        val produceStatus = allResults.map { case (topicPartition, result) =>
+          topicPartition -> ProducePartitionStatus(
+            result.info.lastOffset + 1, // required offset
+            new PartitionResponse(
+              result.error,
+              result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+              result.info.logAppendTime,
+              result.info.logStartOffset,
+              result.info.recordErrors,
+              result.info.errorMessage
+            )
+          ) // response status
+        }
 
-      actionQueue.add {
-        () =>
-          localProduceResults.foreach {
-            case (topicPartition, result) =>
-              val requestKey = TopicPartitionOperationKey(topicPartition)
-              result.info.leaderHwChange match {
-                case LeaderHwChange.INCREASED =>
-                  // some delayed operations may be unblocked after HW changed
-                  delayedProducePurgatory.checkAndComplete(requestKey)
-                  delayedFetchPurgatory.checkAndComplete(requestKey)
-                  delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
-                case LeaderHwChange.SAME =>
-                  // probably unblock some follower fetch requests since log 
end offset has been updated
-                  delayedFetchPurgatory.checkAndComplete(requestKey)
-                case LeaderHwChange.NONE =>
+        actionQueue.add {
+          () =>
+            allResults.foreach {
+              case (topicPartition, result) =>
+                val requestKey = TopicPartitionOperationKey(topicPartition)
+                result.info.leaderHwChange match {
+                  case LeaderHwChange.INCREASED =>
+                    // some delayed operations may be unblocked after HW 
changed
+                    delayedProducePurgatory.checkAndComplete(requestKey)
+                    delayedFetchPurgatory.checkAndComplete(requestKey)
+                    delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
+                  case LeaderHwChange.SAME =>
+                    // probably unblock some follower fetch requests since log 
end offset has been updated
+                    delayedFetchPurgatory.checkAndComplete(requestKey)
+                  case LeaderHwChange.NONE =>
                   // nothing
-              }
-          }
-      }
+                }
+            }
+        }
+
+        recordConversionStatsCallback(localProduceResults.map { case (k, v) => 
k -> v.info.recordConversionStats })
 
-      recordConversionStatsCallback(localProduceResults.map { case (k, v) => k 
-> v.info.recordConversionStats })
+        if (delayedProduceRequestRequired(requiredAcks, allEntries, 
allResults)) {
+          // create delayed produce operation
+          val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
+          val delayedProduce = new DelayedProduce(timeout, produceMetadata, 
this, responseCallback, delayedProduceLock)
 
-      if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, 
localProduceResults)) {
-        // create delayed produce operation
-        val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
-        val delayedProduce = new DelayedProduce(timeout, produceMetadata, 
this, responseCallback, delayedProduceLock)
+          // create a list of (topic, partition) pairs to use as keys for this 
delayed produce operation
+          val producerRequestKeys = 
allEntries.keys.map(TopicPartitionOperationKey(_)).toSeq
 
-        // create a list of (topic, partition) pairs to use as keys for this 
delayed produce operation
-        val producerRequestKeys = 
entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
+          // try to complete the request immediately, otherwise put it into 
the purgatory
+          // this is because while the delayed produce operation is being 
created, new
+          // requests may arrive and hence make this operation completable.
+          delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, 
producerRequestKeys)
 
-        // try to complete the request immediately, otherwise put it into the 
purgatory
-        // this is because while the delayed produce operation is being 
created, new
-        // requests may arrive and hence make this operation completable.
-        delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, 
producerRequestKeys)
+        } else {
+          // we can respond immediately
+          val produceResponseStatus = produceStatus.map { case (k, status) => 
k -> status.responseStatus }
+          responseCallback(produceResponseStatus)
+        }
+      }
 
+      if (notYetVerifiedEntriesPerPartition.isEmpty || 
addPartitionsToTxnManager.isEmpty) {
+        appendEntries(verifiedEntriesPerPartition)(Map.empty)
       } else {
-        // we can respond immediately
-        val produceResponseStatus = produceStatus.map { case (k, status) => k 
-> status.responseStatus }
-        responseCallback(produceResponseStatus)
+        // For unverified entries, send a request to verify. When verified, 
the append process will proceed via the callback.
+        val (error, node) = 
getTransactionCoordinator(transactionStatePartition.get)
+
+        if (error != Errors.NONE) {
+          throw error.exception() // Can throw coordinator not available -- 
which is retriable

Review Comment:
   CoordinatorNotAvailable error is not listed in ProduceResponse. Should we 
add it there and verify that it's handled as expected by existing clients?



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -69,20 +108,52 @@ class KafkaRequestHandler(id: Int,
           completeShutdown()
           return
 
+        case callback: RequestChannel.CallbackRequest =>
+          try {
+            val originalRequest = callback.originalRequest
+            
+            // If we've already executed a callback for this request, reset 
the times and subtract the callback time from the 
+            // new dequeue time. This will allow calculation of multiple 
callback times.
+            // Otherwise, set dequeue time to now.
+            if (originalRequest.callbackRequestDequeueTimeNanos.isDefined) {
+              val prevCallbacksTimeNanos = 
originalRequest.callbackRequestCompleteTimeNanos.getOrElse(0L) - 
originalRequest.callbackRequestDequeueTimeNanos.getOrElse(0L)
+              originalRequest.callbackRequestCompleteTimeNanos = None
+              originalRequest.callbackRequestDequeueTimeNanos = 
Some(time.nanoseconds() - prevCallbacksTimeNanos)
+            } else {
+              originalRequest.callbackRequestDequeueTimeNanos = 
Some(time.nanoseconds())
+            }
+            
+            currentRequest.set(originalRequest)
+            callback.fun()
+            if (originalRequest.callbackRequestCompleteTimeNanos.isEmpty)
+              originalRequest.callbackRequestCompleteTimeNanos = 
Some(time.nanoseconds())
+          } catch {
+            case e: FatalExitError =>
+              completeShutdown()
+              Exit.exit(e.statusCode)
+            case e: Throwable => error("Exception when handling request", e)
+          } finally {
+            currentRequest.remove()
+          }
+
         case request: RequestChannel.Request =>
           try {
             request.requestDequeueTimeNanos = endTime
             trace(s"Kafka request handler $id on broker $brokerId handling 
request $request")
+            currentRequest.set(request)
             apis.handle(request, requestLocal)
           } catch {
             case e: FatalExitError =>
               completeShutdown()
               Exit.exit(e.statusCode)
             case e: Throwable => error("Exception when handling request", e)
           } finally {
+            currentRequest.remove()
             request.releaseBuffer()
           }
 
+        case RequestChannel.WakeupRequest => // We should handle this in 
receiveRequest by polling callbackQueue.

Review Comment:
   Since we don't expect to see  WakeupRequest here, should we add a warning 
log?



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -35,6 +36,43 @@ trait ApiRequestHandler {
   def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit
 }
 
+object KafkaRequestHandler {
+  // Support for scheduling callbacks on a request thread.
+  private val threadRequestChannel = new ThreadLocal[RequestChannel]
+  private val currentRequest = new ThreadLocal[RequestChannel.Request]
+
+  // For testing
+  private var bypassThreadCheck = false

Review Comment:
   Does this need to be volatile?



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -69,20 +108,52 @@ class KafkaRequestHandler(id: Int,
           completeShutdown()
           return
 
+        case callback: RequestChannel.CallbackRequest =>
+          try {
+            val originalRequest = callback.originalRequest
+            
+            // If we've already executed a callback for this request, reset 
the times and subtract the callback time from the 
+            // new dequeue time. This will allow calculation of multiple 
callback times.
+            // Otherwise, set dequeue time to now.
+            if (originalRequest.callbackRequestDequeueTimeNanos.isDefined) {
+              val prevCallbacksTimeNanos = 
originalRequest.callbackRequestCompleteTimeNanos.getOrElse(0L) - 
originalRequest.callbackRequestDequeueTimeNanos.getOrElse(0L)
+              originalRequest.callbackRequestCompleteTimeNanos = None
+              originalRequest.callbackRequestDequeueTimeNanos = 
Some(time.nanoseconds() - prevCallbacksTimeNanos)
+            } else {
+              originalRequest.callbackRequestDequeueTimeNanos = 
Some(time.nanoseconds())
+            }
+            
+            currentRequest.set(originalRequest)
+            callback.fun()
+            if (originalRequest.callbackRequestCompleteTimeNanos.isEmpty)

Review Comment:
   So, `originalRequest.callbackRequestCompleteTimeNanos.isEmpty` is expected 
to be true? Should we add a warning log if it's false?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -616,66 +619,128 @@ class ReplicaManager(val config: KafkaConfig,
                     responseCallback: Map[TopicPartition, PartitionResponse] 
=> Unit,
                     delayedProduceLock: Option[Lock] = None,
                     recordConversionStatsCallback: Map[TopicPartition, 
RecordConversionStats] => Unit = _ => (),
-                    requestLocal: RequestLocal = RequestLocal.NoCaching): Unit 
= {
+                    requestLocal: RequestLocal = RequestLocal.NoCaching,
+                    transactionalId: String = null,

Review Comment:
   The params of this method is getting a bit large. Could we add the javadoc 
explaining each of the param?



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -69,20 +108,52 @@ class KafkaRequestHandler(id: Int,
           completeShutdown()
           return
 
+        case callback: RequestChannel.CallbackRequest =>
+          try {
+            val originalRequest = callback.originalRequest
+            
+            // If we've already executed a callback for this request, reset 
the times and subtract the callback time from the 
+            // new dequeue time. This will allow calculation of multiple 
callback times.

Review Comment:
   Do we have a use case where the same callback needs to be handled multiple 
times by the request thread? How do we prevent that the callback is added an 
infinite number of time to the callback queue?



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
+import org.apache.kafka.common.{Node, TopicPartition}
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
+                                  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
+  
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
+  
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+    nodesToTransactions.synchronized {
+      // Check if we have already (either node or individual transaction). Add 
the Node if it isn't there.

Review Comment:
   Incomplete sentence "Check if we have already".



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
+import org.apache.kafka.common.{Node, TopicPartition}
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
+                                  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
+  
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
+  
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+    nodesToTransactions.synchronized {
+      // Check if we have already (either node or individual transaction). Add 
the Node if it isn't there.
+      val currentNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
+        new TransactionDataAndCallbacks(
+          new AddPartitionsToTxnTransactionCollection(1),
+          mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
+
+      val currentTransactionData = 
currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+
+      // Check if we already have txn ID -- if the epoch is bumped, return 
invalid producer epoch, otherwise, the client likely disconnected and 
+      // reconnected so return the retriable network exception.
+      if (currentTransactionData != null) {
+        val error = if (currentTransactionData.producerEpoch() < 
transactionData.producerEpoch())
+          Errors.INVALID_PRODUCER_EPOCH
+        else 
+          Errors.NETWORK_EXCEPTION
+        val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
+        currentTransactionData.topics().forEach { topic =>
+          topic.partitions().forEach { partition =>
+            topicPartitionsToError.put(new TopicPartition(topic.name(), 
partition), error)
+          }
+        }
+        val oldCallback = 
currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
+        currentNodeAndTransactionData.transactionData.remove(transactionData)
+        oldCallback(topicPartitionsToError.toMap)
+      }
+      currentNodeAndTransactionData.transactionData.add(transactionData)
+      
currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), 
callback)
+      wakeup()
+    }
+  }
+
+  private class AddPartitionsToTxnHandler(node: Node, 
transactionDataAndCallbacks: TransactionDataAndCallbacks) extends 
RequestCompletionHandler {
+    override def onComplete(response: ClientResponse): Unit = {
+      // Note: Synchronization is not needed on inflightNodes since it is 
always accessed from this thread.
+      inflightNodes.remove(node)
+      if (response.authenticationException() != null) {
+        error(s"AddPartitionsToTxnRequest failed for broker ${config.brokerId} 
with an " +
+          "authentication exception.", response.authenticationException)
+        transactionDataAndCallbacks.callbacks.foreach { case (txnId, callback) 
=>
+          callback(buildErrorMap(txnId, 
transactionDataAndCallbacks.transactionData, 
Errors.forException(response.authenticationException()).code()))
+        }
+      } else if (response.versionMismatch != null) {
+        // We may see unsupported version exception if we try to send a verify 
only request to a broker that can't handle it. 
+        // In this case, skip verification.
+        error(s"AddPartitionsToTxnRequest failed for broker ${config.brokerId} 
with invalid version exception. This suggests verification is not supported." +

Review Comment:
   Since this can be skipped, should this be warn instead of error?



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
+import org.apache.kafka.common.{Node, TopicPartition}
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
+                                  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
+  
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
+  
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+    nodesToTransactions.synchronized {
+      // Check if we have already (either node or individual transaction). Add 
the Node if it isn't there.
+      val currentNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
+        new TransactionDataAndCallbacks(
+          new AddPartitionsToTxnTransactionCollection(1),
+          mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
+
+      val currentTransactionData = 
currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+
+      // Check if we already have txn ID -- if the epoch is bumped, return 
invalid producer epoch, otherwise, the client likely disconnected and 
+      // reconnected so return the retriable network exception.
+      if (currentTransactionData != null) {
+        val error = if (currentTransactionData.producerEpoch() < 
transactionData.producerEpoch())
+          Errors.INVALID_PRODUCER_EPOCH
+        else 
+          Errors.NETWORK_EXCEPTION
+        val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
+        currentTransactionData.topics().forEach { topic =>
+          topic.partitions().forEach { partition =>
+            topicPartitionsToError.put(new TopicPartition(topic.name(), 
partition), error)
+          }
+        }
+        val oldCallback = 
currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
+        currentNodeAndTransactionData.transactionData.remove(transactionData)
+        oldCallback(topicPartitionsToError.toMap)
+      }
+      currentNodeAndTransactionData.transactionData.add(transactionData)
+      
currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), 
callback)
+      wakeup()
+    }
+  }
+
+  private class AddPartitionsToTxnHandler(node: Node, 
transactionDataAndCallbacks: TransactionDataAndCallbacks) extends 
RequestCompletionHandler {
+    override def onComplete(response: ClientResponse): Unit = {
+      // Note: Synchronization is not needed on inflightNodes since it is 
always accessed from this thread.
+      inflightNodes.remove(node)
+      if (response.authenticationException() != null) {
+        error(s"AddPartitionsToTxnRequest failed for broker ${config.brokerId} 
with an " +
+          "authentication exception.", response.authenticationException)
+        transactionDataAndCallbacks.callbacks.foreach { case (txnId, callback) 
=>
+          callback(buildErrorMap(txnId, 
transactionDataAndCallbacks.transactionData, 
Errors.forException(response.authenticationException()).code()))
+        }
+      } else if (response.versionMismatch != null) {
+        // We may see unsupported version exception if we try to send a verify 
only request to a broker that can't handle it. 
+        // In this case, skip verification.
+        error(s"AddPartitionsToTxnRequest failed for broker ${config.brokerId} 
with invalid version exception. This suggests verification is not supported." +
+              s"Continuing handling the produce request.")
+        transactionDataAndCallbacks.callbacks.values.foreach(_(Map.empty))
+      } else {
+        val addPartitionsToTxnResponseData = 
response.responseBody.asInstanceOf[AddPartitionsToTxnResponse].data
+        if (addPartitionsToTxnResponseData.errorCode != 0) {
+          error(s"AddPartitionsToTxnRequest for broker ${config.brokerId}  
returned with error 
${Errors.forCode(addPartitionsToTxnResponseData.errorCode)}.")
+          // The client should not be exposed to CLUSTER_AUTHORIZATION_FAILED 
so modify the error to invalid record -- to signify the verification did not 
complete.
+          // Older clients return with INVALID_RECORD
+          val finalError = if (addPartitionsToTxnResponseData.errorCode() == 
Errors.CLUSTER_AUTHORIZATION_FAILED.code)
+            Errors.INVALID_RECORD.code
+          else 
+            addPartitionsToTxnResponseData.errorCode()
+          
+          transactionDataAndCallbacks.callbacks.foreach { case (txnId, 
callback) =>
+            callback(buildErrorMap(txnId, 
transactionDataAndCallbacks.transactionData, finalError))
+          }
+        } else {
+          addPartitionsToTxnResponseData.resultsByTransaction().forEach { 
transactionResult =>
+            val unverified = mutable.Map[TopicPartition, Errors]()
+            transactionResult.topicResults().forEach { topicResult =>
+              topicResult.resultsByPartition().forEach { partitionResult =>
+                val tp = new TopicPartition(topicResult.name(), 
partitionResult.partitionIndex())
+                if (partitionResult.partitionErrorCode() != 
Errors.NONE.code()) {
+                  // Producers expect to handle INVALID_PRODUCER_EPOCH in this 
scenario.
+                  val code = 
+                    if (partitionResult.partitionErrorCode() == 
Errors.PRODUCER_FENCED.code)
+                      Errors.INVALID_PRODUCER_EPOCH.code
+                    // Older clients return INVALID_RECORD  
+                    else if (partitionResult.partitionErrorCode() == 
Errors.INVALID_TXN_STATE.code)
+                      Errors.INVALID_RECORD.code  
+                    else 
+                      partitionResult.partitionErrorCode()
+                  unverified.put(tp, Errors.forCode(code))
+                }
+              }
+            }
+            val callback = 
transactionDataAndCallbacks.callbacks(transactionResult.transactionalId())
+            callback(unverified.toMap)
+          }
+        }
+      }
+      wakeup()
+    }
+    
+    private def buildErrorMap(transactionalId: String, 
addPartitionsToTxnCollection: AddPartitionsToTxnTransactionCollection, 
errorCode: Short): Map[TopicPartition, Errors] = {

Review Comment:
   addPartitionsToTxnCollection seems unused?



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
+import org.apache.kafka.common.{Node, TopicPartition}
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
+                                  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
+  
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
+  
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+    nodesToTransactions.synchronized {
+      // Check if we have already (either node or individual transaction). Add 
the Node if it isn't there.
+      val currentNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
+        new TransactionDataAndCallbacks(
+          new AddPartitionsToTxnTransactionCollection(1),
+          mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
+
+      val currentTransactionData = 
currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+
+      // Check if we already have txn ID -- if the epoch is bumped, return 
invalid producer epoch, otherwise, the client likely disconnected and 
+      // reconnected so return the retriable network exception.
+      if (currentTransactionData != null) {
+        val error = if (currentTransactionData.producerEpoch() < 
transactionData.producerEpoch())
+          Errors.INVALID_PRODUCER_EPOCH
+        else 
+          Errors.NETWORK_EXCEPTION
+        val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
+        currentTransactionData.topics().forEach { topic =>
+          topic.partitions().forEach { partition =>
+            topicPartitionsToError.put(new TopicPartition(topic.name(), 
partition), error)
+          }
+        }
+        val oldCallback = 
currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
+        currentNodeAndTransactionData.transactionData.remove(transactionData)
+        oldCallback(topicPartitionsToError.toMap)
+      }
+      currentNodeAndTransactionData.transactionData.add(transactionData)
+      
currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), 
callback)
+      wakeup()
+    }
+  }
+
+  private class AddPartitionsToTxnHandler(node: Node, 
transactionDataAndCallbacks: TransactionDataAndCallbacks) extends 
RequestCompletionHandler {
+    override def onComplete(response: ClientResponse): Unit = {
+      // Note: Synchronization is not needed on inflightNodes since it is 
always accessed from this thread.
+      inflightNodes.remove(node)
+      if (response.authenticationException() != null) {
+        error(s"AddPartitionsToTxnRequest failed for broker ${config.brokerId} 
with an " +
+          "authentication exception.", response.authenticationException)
+        transactionDataAndCallbacks.callbacks.foreach { case (txnId, callback) 
=>
+          callback(buildErrorMap(txnId, 
transactionDataAndCallbacks.transactionData, 
Errors.forException(response.authenticationException()).code()))
+        }
+      } else if (response.versionMismatch != null) {
+        // We may see unsupported version exception if we try to send a verify 
only request to a broker that can't handle it. 
+        // In this case, skip verification.
+        error(s"AddPartitionsToTxnRequest failed for broker ${config.brokerId} 
with invalid version exception. This suggests verification is not supported." +
+              s"Continuing handling the produce request.")
+        transactionDataAndCallbacks.callbacks.values.foreach(_(Map.empty))
+      } else {
+        val addPartitionsToTxnResponseData = 
response.responseBody.asInstanceOf[AddPartitionsToTxnResponse].data
+        if (addPartitionsToTxnResponseData.errorCode != 0) {
+          error(s"AddPartitionsToTxnRequest for broker ${config.brokerId}  
returned with error 
${Errors.forCode(addPartitionsToTxnResponseData.errorCode)}.")

Review Comment:
   extra space before returned



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
+import org.apache.kafka.common.{Node, TopicPartition}
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
+                                  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
+  
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
+  
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+    nodesToTransactions.synchronized {
+      // Check if we have already (either node or individual transaction). Add 
the Node if it isn't there.
+      val currentNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
+        new TransactionDataAndCallbacks(
+          new AddPartitionsToTxnTransactionCollection(1),
+          mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
+
+      val currentTransactionData = 
currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+
+      // Check if we already have txn ID -- if the epoch is bumped, return 
invalid producer epoch, otherwise, the client likely disconnected and 
+      // reconnected so return the retriable network exception.
+      if (currentTransactionData != null) {
+        val error = if (currentTransactionData.producerEpoch() < 
transactionData.producerEpoch())
+          Errors.INVALID_PRODUCER_EPOCH
+        else 
+          Errors.NETWORK_EXCEPTION
+        val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
+        currentTransactionData.topics().forEach { topic =>
+          topic.partitions().forEach { partition =>
+            topicPartitionsToError.put(new TopicPartition(topic.name(), 
partition), error)
+          }
+        }
+        val oldCallback = 
currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
+        currentNodeAndTransactionData.transactionData.remove(transactionData)
+        oldCallback(topicPartitionsToError.toMap)
+      }
+      currentNodeAndTransactionData.transactionData.add(transactionData)
+      
currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), 
callback)
+      wakeup()
+    }
+  }
+
+  private class AddPartitionsToTxnHandler(node: Node, 
transactionDataAndCallbacks: TransactionDataAndCallbacks) extends 
RequestCompletionHandler {
+    override def onComplete(response: ClientResponse): Unit = {
+      // Note: Synchronization is not needed on inflightNodes since it is 
always accessed from this thread.
+      inflightNodes.remove(node)
+      if (response.authenticationException() != null) {
+        error(s"AddPartitionsToTxnRequest failed for broker ${config.brokerId} 
with an " +
+          "authentication exception.", response.authenticationException)
+        transactionDataAndCallbacks.callbacks.foreach { case (txnId, callback) 
=>
+          callback(buildErrorMap(txnId, 
transactionDataAndCallbacks.transactionData, 
Errors.forException(response.authenticationException()).code()))
+        }
+      } else if (response.versionMismatch != null) {
+        // We may see unsupported version exception if we try to send a verify 
only request to a broker that can't handle it. 
+        // In this case, skip verification.
+        error(s"AddPartitionsToTxnRequest failed for broker ${config.brokerId} 
with invalid version exception. This suggests verification is not supported." +
+              s"Continuing handling the produce request.")
+        transactionDataAndCallbacks.callbacks.values.foreach(_(Map.empty))
+      } else {
+        val addPartitionsToTxnResponseData = 
response.responseBody.asInstanceOf[AddPartitionsToTxnResponse].data
+        if (addPartitionsToTxnResponseData.errorCode != 0) {
+          error(s"AddPartitionsToTxnRequest for broker ${config.brokerId}  
returned with error 
${Errors.forCode(addPartitionsToTxnResponseData.errorCode)}.")
+          // The client should not be exposed to CLUSTER_AUTHORIZATION_FAILED 
so modify the error to invalid record -- to signify the verification did not 
complete.
+          // Older clients return with INVALID_RECORD

Review Comment:
   Hmm, not quite sure that I follow. Do you mean that we return INVALID_RECORD 
to be compatible for old clients?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to