This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 83b7c9a0535 MINOR Re-add action queue parameter removed from 
appendRecords (#14753)
83b7c9a0535 is described below

commit 83b7c9a053546e59546d8ff904025a21bd41daa7
Author: Justine Olshan <[email protected]>
AuthorDate: Tue Nov 14 15:56:42 2023 -0800

    MINOR Re-add action queue parameter removed from appendRecords (#14753)
    
    In 91fa196, I accidentally removed the action queue paramater that was 
added in 7d147cf. I also renamed the actionQueue as to not confuse this in the 
future.
    
    I don't think this broke anything since we don't use verification for group 
coordinator commits, but I should fix it to be as it was before.
    
    Reviewers:  Artem Livshits <[email protected]>, Jason Gustafson 
<[email protected]>
---
 .../main/scala/kafka/server/ReplicaManager.scala   | 41 +++++++++++-----------
 1 file changed, 21 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9f0c2c05fb6..3eaef6bb444 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -740,9 +740,9 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * TODO: move this action queue to handle thread so we can simplify 
concurrency handling
    */
-  private val actionQueue = new DelayedActionQueue
+  private val defaultActionQueue = new DelayedActionQueue
 
-  def tryCompleteActions(): Unit = actionQueue.tryCompleteActions()
+  def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions()
 
   /**
    * Append messages to leader replicas of the partition, and wait for them to 
be replicated to other replicas;
@@ -763,7 +763,7 @@ class ReplicaManager(val config: KafkaConfig,
    * @param recordConversionStatsCallback callback for updating stats on 
record conversions
    * @param requestLocal                  container for the stateful instances 
scoped to this request
    * @param transactionalId               transactional ID if the request is 
from a producer and the producer is transactional
-   * @param actionQueue                   the action queue to use. 
ReplicaManager#actionQueue is used by default.
+   * @param actionQueue                   the action queue to use. 
ReplicaManager#defaultActionQueue is used by default.
    */
   def appendRecords(timeout: Long,
                     requiredAcks: Short,
@@ -775,7 +775,7 @@ class ReplicaManager(val config: KafkaConfig,
                     recordConversionStatsCallback: Map[TopicPartition, 
RecordConversionStats] => Unit = _ => (),
                     requestLocal: RequestLocal = RequestLocal.NoCaching,
                     transactionalId: String = null,
-                    actionQueue: ActionQueue = this.actionQueue): Unit = {
+                    actionQueue: ActionQueue = this.defaultActionQueue): Unit 
= {
     if (isValidRequiredAcks(requiredAcks)) {
 
       val verificationGuards: mutable.Map[TopicPartition, VerificationGuard] = 
mutable.Map[TopicPartition, VerificationGuard]()
@@ -792,7 +792,7 @@ class ReplicaManager(val config: KafkaConfig,
 
       if (notYetVerifiedEntriesPerPartition.isEmpty || 
addPartitionsToTxnManager.isEmpty) {
         appendEntries(verifiedEntriesPerPartition, internalTopicsAllowed, 
origin, requiredAcks, verificationGuards.toMap,
-          errorsPerPartition, recordConversionStatsCallback, timeout, 
responseCallback, delayedProduceLock)(requestLocal, Map.empty)
+          errorsPerPartition, recordConversionStatsCallback, timeout, 
responseCallback, delayedProduceLock, actionQueue)(requestLocal, Map.empty)
       } else {
         // For unverified entries, send a request to verify. When verified, 
the append process will proceed via the callback.
         // We verify above that all partitions use the same producer ID.
@@ -813,7 +813,8 @@ class ReplicaManager(val config: KafkaConfig,
               recordConversionStatsCallback,
               timeout,
               responseCallback,
-              delayedProduceLock
+              delayedProduceLock,
+              actionQueue
             ),
             requestLocal)
         ))
@@ -846,7 +847,8 @@ class ReplicaManager(val config: KafkaConfig,
                             recordConversionStatsCallback: Map[TopicPartition, 
RecordConversionStats] => Unit,
                             timeout: Long,
                             responseCallback: Map[TopicPartition, 
PartitionResponse] => Unit,
-                            delayedProduceLock: Option[Lock])
+                            delayedProduceLock: Option[Lock],
+                            actionQueue: ActionQueue)
                            (requestLocal: RequestLocal, unverifiedEntries: 
Map[TopicPartition, Errors]): Unit = {
     val sTime = time.milliseconds
     val verifiedEntries =
@@ -898,19 +900,18 @@ class ReplicaManager(val config: KafkaConfig,
     }
 
     actionQueue.add {
-      () =>
-        allResults.foreach { case (topicPartition, result) =>
-          val requestKey = TopicPartitionOperationKey(topicPartition)
-          result.info.leaderHwChange match {
-            case LeaderHwChange.INCREASED =>
-              // some delayed operations may be unblocked after HW changed
-              delayedProducePurgatory.checkAndComplete(requestKey)
-              delayedFetchPurgatory.checkAndComplete(requestKey)
-              delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
-            case LeaderHwChange.SAME =>
-              // probably unblock some follower fetch requests since log end 
offset has been updated
-              delayedFetchPurgatory.checkAndComplete(requestKey)
-            case LeaderHwChange.NONE =>
+      () => allResults.foreach { case (topicPartition, result) =>
+        val requestKey = TopicPartitionOperationKey(topicPartition)
+        result.info.leaderHwChange match {
+          case LeaderHwChange.INCREASED =>
+            // some delayed operations may be unblocked after HW changed
+            delayedProducePurgatory.checkAndComplete(requestKey)
+            delayedFetchPurgatory.checkAndComplete(requestKey)
+            delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
+          case LeaderHwChange.SAME =>
+            // probably unblock some follower fetch requests since log end 
offset has been updated
+            delayedFetchPurgatory.checkAndComplete(requestKey)
+          case LeaderHwChange.NONE =>
             // nothing
           }
         }

Reply via email to