This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 83b7c9a0535 MINOR: Re-add action queue parameter removed from appendRecords (#14753)
83b7c9a0535 is described below
commit 83b7c9a053546e59546d8ff904025a21bd41daa7
Author: Justine Olshan <[email protected]>
AuthorDate: Tue Nov 14 15:56:42 2023 -0800
MINOR: Re-add action queue parameter removed from appendRecords (#14753)
In 91fa196, I accidentally removed the action queue parameter that was added
in 7d147cf. I also renamed the actionQueue field to defaultActionQueue so it
is not confused with the method parameter in the future.

I don't think this broke anything, since we don't use verification for group
coordinator commits, but this fixes it to be as it was before.
Reviewers: Artem Livshits <[email protected]>, Jason Gustafson <[email protected]>
---
.../main/scala/kafka/server/ReplicaManager.scala | 41 +++++++++++-----------
1 file changed, 21 insertions(+), 20 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9f0c2c05fb6..3eaef6bb444 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -740,9 +740,9 @@ class ReplicaManager(val config: KafkaConfig,
/**
* TODO: move this action queue to handle thread so we can simplify concurrency handling
*/
- private val actionQueue = new DelayedActionQueue
+ private val defaultActionQueue = new DelayedActionQueue
- def tryCompleteActions(): Unit = actionQueue.tryCompleteActions()
+ def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions()
/**
* Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
@@ -763,7 +763,7 @@ class ReplicaManager(val config: KafkaConfig,
* @param recordConversionStatsCallback callback for updating stats on record conversions
* @param requestLocal container for the stateful instances scoped to this request
* @param transactionalId transactional ID if the request is from a producer and the producer is transactional
- * @param actionQueue the action queue to use. ReplicaManager#actionQueue is used by default.
+ * @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default.
*/
def appendRecords(timeout: Long,
requiredAcks: Short,
@@ -775,7 +775,7 @@ class ReplicaManager(val config: KafkaConfig,
recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (),
requestLocal: RequestLocal = RequestLocal.NoCaching,
transactionalId: String = null,
- actionQueue: ActionQueue = this.actionQueue): Unit = {
+ actionQueue: ActionQueue = this.defaultActionQueue): Unit = {
if (isValidRequiredAcks(requiredAcks)) {
val verificationGuards: mutable.Map[TopicPartition, VerificationGuard] = mutable.Map[TopicPartition, VerificationGuard]()
@@ -792,7 +792,7 @@ class ReplicaManager(val config: KafkaConfig,
if (notYetVerifiedEntriesPerPartition.isEmpty || addPartitionsToTxnManager.isEmpty) {
appendEntries(verifiedEntriesPerPartition, internalTopicsAllowed, origin, requiredAcks, verificationGuards.toMap,
- errorsPerPartition, recordConversionStatsCallback, timeout, responseCallback, delayedProduceLock)(requestLocal, Map.empty)
+ errorsPerPartition, recordConversionStatsCallback, timeout, responseCallback, delayedProduceLock, actionQueue)(requestLocal, Map.empty)
} else {
// For unverified entries, send a request to verify. When verified, the append process will proceed via the callback.
// We verify above that all partitions use the same producer ID.
@@ -813,7 +813,8 @@ class ReplicaManager(val config: KafkaConfig,
recordConversionStatsCallback,
timeout,
responseCallback,
- delayedProduceLock
+ delayedProduceLock,
+ actionQueue
),
requestLocal)
))
@@ -846,7 +847,8 @@ class ReplicaManager(val config: KafkaConfig,
recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit,
timeout: Long,
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
- delayedProduceLock: Option[Lock])
+ delayedProduceLock: Option[Lock],
+ actionQueue: ActionQueue)
(requestLocal: RequestLocal, unverifiedEntries: Map[TopicPartition, Errors]): Unit = {
val sTime = time.milliseconds
val verifiedEntries =
@@ -898,19 +900,18 @@ class ReplicaManager(val config: KafkaConfig,
}
actionQueue.add {
- () =>
- allResults.foreach { case (topicPartition, result) =>
- val requestKey = TopicPartitionOperationKey(topicPartition)
- result.info.leaderHwChange match {
- case LeaderHwChange.INCREASED =>
- // some delayed operations may be unblocked after HW changed
- delayedProducePurgatory.checkAndComplete(requestKey)
- delayedFetchPurgatory.checkAndComplete(requestKey)
- delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
- case LeaderHwChange.SAME =>
- // probably unblock some follower fetch requests since log end offset has been updated
- delayedFetchPurgatory.checkAndComplete(requestKey)
- case LeaderHwChange.NONE =>
+ () => allResults.foreach { case (topicPartition, result) =>
+ val requestKey = TopicPartitionOperationKey(topicPartition)
+ result.info.leaderHwChange match {
+ case LeaderHwChange.INCREASED =>
+ // some delayed operations may be unblocked after HW changed
+ delayedProducePurgatory.checkAndComplete(requestKey)
+ delayedFetchPurgatory.checkAndComplete(requestKey)
+ delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
+ case LeaderHwChange.SAME =>
+ // probably unblock some follower fetch requests since log end offset has been updated
+ delayedFetchPurgatory.checkAndComplete(requestKey)
+ case LeaderHwChange.NONE =>
// nothing
}
}
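The hunk above is why the parameter has to be threaded all the way into appendEntries: the deferred purgatory completions are enqueued on whichever queue the caller handed in, rather than unconditionally on the default one. In terms of the simplified sketch near the top of this message, the difference looks roughly like this (illustrative usage only, not the real group coordinator call path):

object ManagerExample {
  def main(args: Array[String]): Unit = {
    val manager = new Manager

    // Default path: the follow-up action is queued on manager's defaultActionQueue.
    manager.appendRecords(Seq("a", "b"))
    manager.tryCompleteActions()

    // Caller-supplied queue: the caller decides when the queued actions run,
    // which is what the re-added actionQueue parameter makes possible again.
    val callerQueue = new SimpleActionQueue
    manager.appendRecords(Seq("c"), actionQueue = callerQueue)
    callerQueue.tryCompleteActions()
  }
}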