This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8fce97b2c35 KAFKA-19144 Move DelayedProduce to server module (#19793)
8fce97b2c35 is described below
commit 8fce97b2c3584435840ec5d8c2ad6377124d3c6f
Author: S.Y. Wang <[email protected]>
AuthorDate: Tue Mar 10 04:48:39 2026 +0900
KAFKA-19144 Move DelayedProduce to server module (#19793)
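This PR moves `DelayedProduce` from the core module to the server module
(`org.apache.kafka.server.purgatory.DelayedProduce`). One notable change
is that the type of the `responseCallback` parameter in
`ReplicaManager#appendRecords()` has been changed from a Scala `Map` to a
Java `Map` (`java.util.Map`). Other related type changes have been made
accordingly: `DelayedProduce` now receives the per-partition status map and
a partition-status validator callback instead of a `ReplicaManager`, and the
former `DelayedProduceMetrics` helpers (`recordExpiration`,
`removePartitionMetrics`) are now exposed on `DelayedProduce`.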
Reviewers: Ken Huang <[email protected]>, PoAn Yang
<[email protected]>, TengYao Chi <[email protected]>, DL1231
<[email protected]>, Kuan-Po Tseng
<[email protected]>, Christo Lolov <[email protected]>, Chia-Ping
Tsai <[email protected]>
---
core/src/main/scala/kafka/cluster/Partition.scala | 2 +-
.../transaction/TransactionStateManager.scala | 11 +-
.../main/scala/kafka/server/DelayedProduce.scala | 161 ----------------
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../main/scala/kafka/server/ReplicaManager.scala | 59 ++++--
.../kafka/server/LocalLeaderEndPointTest.scala | 6 +-
.../scala/unit/kafka/cluster/PartitionTest.scala | 2 +-
.../AbstractCoordinatorConcurrencyTest.scala | 19 +-
.../transaction/TransactionStateManagerTest.scala | 10 +-
.../unit/kafka/server/DelayedProduceTest.scala | 7 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 16 +-
.../server/ReplicaManagerConcurrencyTest.scala | 8 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 16 +-
.../kafka/server/purgatory/DelayedProduce.java | 214 +++++++++++++++++++++
14 files changed, 308 insertions(+), 225 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index 82a3f233fe2..e7a5b297674 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -44,7 +44,7 @@ import
org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.storage.internals.log.{AppendOrigin,
AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo,
LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo,
LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog,
VerificationGuard}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.partition.{AlterPartitionListener,
AssignmentState, CommittedPartitionState, OngoingReassignmentState,
PartitionListener, PartitionState, PendingExpandIsr, PendingPartitionChange,
PendingShrinkIsr, SimpleAssignmentState}
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedProduce, TopicPartitionOperationKey}
import org.apache.kafka.server.replica.Replica
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
UnexpectedAppendOffsetException}
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index e67e81e1c88..fae1c857654 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -47,6 +47,7 @@ import org.apache.kafka.storage.internals.log.AppendOrigin
import com.google.re2j.{Pattern, PatternSyntaxException}
import org.apache.kafka.common.errors.InvalidRegularExpression
+import java.util
import java.util.Optional
import scala.jdk.CollectionConverters._
import scala.collection.mutable
@@ -258,8 +259,8 @@ class TransactionStateManager(brokerId: Int,
expiredForPartition: Iterable[TransactionalIdCoordinatorEpochAndMetadata],
tombstoneRecords: MemoryRecords
): Unit = {
- def removeFromCacheCallback(responses: collection.Map[TopicIdPartition,
PartitionResponse]): Unit = {
- responses.foreachEntry { (topicPartition, response) =>
+ def removeFromCacheCallback(responses: util.Map[TopicIdPartition,
PartitionResponse]): Unit = {
+ responses.forEach { (topicPartition, response) =>
inReadLock[Exception](stateLock, () => {
transactionMetadataCache.get(topicPartition.partition).foreach {
txnMetadataCacheEntry =>
expiredForPartition.foreach { idCoordinatorEpochAndMetadata =>
@@ -670,13 +671,13 @@ class TransactionStateManager(brokerId: Int,
val recordsPerPartition = Map(transactionStateTopicIdPartition -> records)
// set the callback function to update transaction status in cache after
log append completed
- def updateCacheCallback(responseStatus: collection.Map[TopicIdPartition,
PartitionResponse]): Unit = {
+ def updateCacheCallback(responseStatus: util.Map[TopicIdPartition,
PartitionResponse]): Unit = {
// the append response should only contain the topics partition
- if (responseStatus.size != 1 ||
!responseStatus.contains(transactionStateTopicIdPartition))
+ if (responseStatus.size != 1 ||
!responseStatus.containsKey(transactionStateTopicIdPartition))
throw new IllegalStateException("Append status %s should only have one
partition %s"
.format(responseStatus, transactionStateTopicPartition))
- val status = responseStatus(transactionStateTopicIdPartition)
+ val status = responseStatus.get(transactionStateTopicIdPartition)
var responseError = if (status.error == Errors.NONE) {
Errors.NONE
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala
b/core/src/main/scala/kafka/server/DelayedProduce.scala
deleted file mode 100644
index e2c9c72b4bb..00000000000
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
-import com.typesafe.scalalogging.Logger
-import com.yammer.metrics.core.Meter
-import kafka.utils.Logging
-import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.server.metrics.KafkaMetricsGroup
-import org.apache.kafka.server.purgatory.DelayedOperation
-
-import scala.collection._
-import scala.jdk.CollectionConverters._
-
-case class ProducePartitionStatus(requiredOffset: Long, responseStatus:
PartitionResponse) {
- @volatile var acksPending = false
-
- override def toString: String = s"[acksPending: $acksPending, error:
${responseStatus.error.code}, " +
- s"startOffset: ${responseStatus.baseOffset}, requiredOffset:
$requiredOffset]"
-}
-
-/**
- * The produce metadata maintained by the delayed produce operation
- */
-case class ProduceMetadata(produceRequiredAcks: Short,
- produceStatus: Map[TopicIdPartition,
ProducePartitionStatus]) {
-
- override def toString = s"[requiredAcks: $produceRequiredAcks,
partitionStatus: $produceStatus]"
-}
-
-object DelayedProduce {
- private final val logger = Logger(classOf[DelayedProduce])
-}
-
-/**
- * A delayed produce operation that can be created by the replica manager and
watched
- * in the produce operation purgatory
- */
-class DelayedProduce(delayMs: Long,
- produceMetadata: ProduceMetadata,
- replicaManager: ReplicaManager,
- responseCallback: Map[TopicIdPartition,
PartitionResponse] => Unit)
- extends DelayedOperation(delayMs) with Logging {
-
- override lazy val logger: Logger = DelayedProduce.logger
-
- // first update the acks pending variable according to the error code
- produceMetadata.produceStatus.foreachEntry { (topicPartition, status) =>
- if (status.responseStatus.error == Errors.NONE) {
- // Timeout error state will be cleared when required acks are received
- status.acksPending = true
- status.responseStatus.error = Errors.REQUEST_TIMED_OUT
- } else {
- status.acksPending = false
- }
-
- trace(s"Initial partition status for $topicPartition is $status")
- }
-
- /**
- * The delayed produce operation can be completed if every partition
- * it produces to is satisfied by one of the following:
- *
- * Case A: Replica not assigned to partition
- * Case B: Replica is no longer the leader of this partition
- * Case C: This broker is the leader:
- * C.1 - If there was a local error thrown while checking if at least
requiredAcks
- * replicas have caught up to this operation: set an error in
response
- * C.2 - Otherwise, set the response with no error.
- */
- override def tryComplete(): Boolean = {
- // check for each partition if it still has pending acks
- produceMetadata.produceStatus.foreachEntry { (topicIdPartition, status) =>
- trace(s"Checking produce satisfaction for $topicIdPartition, current
status $status")
- // skip those partitions that have already been satisfied
- if (status.acksPending) {
- val (hasEnough, error) =
replicaManager.getPartitionOrError(topicIdPartition.topicPartition()) match {
- case Left(err) =>
- // Case A
- (false, err)
-
- case Right(partition) =>
- partition.checkEnoughReplicasReachOffset(status.requiredOffset)
- }
-
- // Case B || C.1 || C.2
- if (error != Errors.NONE || hasEnough) {
- status.acksPending = false
- status.responseStatus.error = error
- }
- }
- }
-
- // check if every partition has satisfied at least one of case A, B or C
- if (!produceMetadata.produceStatus.values.exists(_.acksPending))
- forceComplete()
- else
- false
- }
-
- override def onExpiration(): Unit = {
- produceMetadata.produceStatus.foreachEntry { (topicIdPartition, status) =>
- if (status.acksPending) {
- debug(s"Expiring produce request for partition $topicIdPartition with
status $status")
-
DelayedProduceMetrics.recordExpiration(topicIdPartition.topicPartition())
- }
- }
- }
-
- /**
- * Upon completion, return the current response status along with the error
code per partition
- */
- override def onComplete(): Unit = {
- val responseStatus = produceMetadata.produceStatus.map { case (k, status)
=> k -> status.responseStatus }
- responseCallback(responseStatus)
- }
-}
-
-object DelayedProduceMetrics {
- // Changing the package or class name may cause incompatibility with
existing code and metrics configuration
- private val metricsPackage = "kafka.server"
- private val metricsClassName = "DelayedProduceMetrics"
- private val metricsGroup = new KafkaMetricsGroup(metricsPackage,
metricsClassName)
-
- private val aggregateExpirationMeter =
metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
-
- private val partitionExpirationMeters = new
ConcurrentHashMap[TopicPartition, Meter]
-
- def recordExpiration(partition: TopicPartition): Unit = {
- aggregateExpirationMeter.mark()
- partitionExpirationMeters.computeIfAbsent(partition, key =>
metricsGroup.newMeter("ExpiresPerSec",
- "requests",
- TimeUnit.SECONDS,
- Map("topic" -> key.topic, "partition" ->
key.partition.toString).asJava)).mark()
- }
-
- def removePartitionMetrics(partition: TopicPartition): Unit = {
- if (partitionExpirationMeters.remove(partition) != null) {
- metricsGroup.removeMetric("ExpiresPerSec",
- Map("topic" -> partition.topic, "partition" ->
partition.partition.toString).asJava)
- }
- }
-}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 00b3b11495a..76a45415acc 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1830,7 +1830,7 @@ class KafkaApis(val requestChannel: RequestChannel,
entriesPerPartition = controlRecords,
requestLocal = requestLocal,
responseCallback = errors => {
- errors.foreachEntry { (topicIdPartition, partitionResponse) =>
+ errors.forEach { (topicIdPartition, partitionResponse) =>
addResultAndMaybeComplete(topicIdPartition.topicPartition(),
partitionResponse.error)
}
},
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 84c0cd593c1..8bccb4dc8e4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -49,6 +49,7 @@ import org.apache.kafka.image.{LocalReplicaChanges,
MetadataImage, TopicsDelta}
import org.apache.kafka.logger.StateChangeLogger
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.MetadataCache
+import org.apache.kafka.server.purgatory.DelayedProduce.ProducePartitionStatus
import org.apache.kafka.server.LogAppendResult.LogAppendSummary
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal,
StopPartition, TransactionVersion}
import org.apache.kafka.server.log.remote.TopicPartitionLog
@@ -57,7 +58,8 @@ import
org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.partition.PartitionListener
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets,
DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus,
TopicPartitionOperationKey}
+import
org.apache.kafka.server.purgatory.DelayedProduce.PartitionStatusValidator.Result
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedProduce, DelayedRemoteFetch,
DelayedRemoteListOffsets, DeleteRecordsPartitionStatus,
ListOffsetsPartitionStatus, TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey,
DelayedShareFetchPartitionKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
@@ -79,6 +81,7 @@ import java.util.{Collections, Optional, OptionalInt,
OptionalLong}
import java.util.function.Consumer
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.jdk.CollectionConverters._
+import scala.jdk.FunctionConverters.enrichAsJavaConsumer
import scala.jdk.OptionConverters.RichOptional
object ReplicaManager {
@@ -409,7 +412,7 @@ class ReplicaManager(val config: KafkaConfig,
completeDelayedOperationsWhenNotPartitionLeader(topicPartition, topicId)
// Clean up per-partition expiration metrics regardless of whether the
local log
// is deleted. This covers both partition deletion and reassignment
(leader -> follower).
- DelayedProduceMetrics.removePartitionMetrics(topicPartition)
+ DelayedProduce.removePartitionMetrics(topicPartition)
DelayedRemoteListOffsets.removePartitionMetrics(topicPartition)
}
@@ -636,7 +639,7 @@ class ReplicaManager(val config: KafkaConfig,
internalTopicsAllowed: Boolean,
origin: AppendOrigin,
entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
- responseCallback: Map[TopicIdPartition, PartitionResponse]
=> Unit,
+ responseCallback: util.Map[TopicIdPartition,
PartitionResponse] => Unit,
recordValidationStatsCallback: Map[TopicIdPartition,
RecordValidationStats] => Unit = _ => (),
requestLocal: RequestLocal = RequestLocal.noCaching,
verificationGuards: Map[TopicPartition, VerificationGuard]
= Map.empty,
@@ -756,8 +759,8 @@ class ReplicaManager(val config: KafkaConfig,
val preAppendPartitionResponses =
buildProducePartitionStatus(errorResults).map { case (k, status) => k ->
status.responseStatus }
- def newResponseCallback(responses: Map[TopicIdPartition,
PartitionResponse]): Unit = {
- responseCallback(preAppendPartitionResponses ++ responses)
+ def newResponseCallback(responses: util.Map[TopicIdPartition,
PartitionResponse]): Unit = {
+ responseCallback(preAppendPartitionResponses ++ responses.asScala)
}
appendRecords(
@@ -832,7 +835,7 @@ class ReplicaManager(val config: KafkaConfig,
results: Map[TopicIdPartition, LogAppendResult]
): Map[TopicIdPartition, ProducePartitionStatus] = {
results.map { case (topicIdPartition, result) =>
- topicIdPartition -> ProducePartitionStatus(
+ topicIdPartition -> new ProducePartitionStatus(
result.logAppendSummary.lastOffset + 1, // required offset
new PartitionResponse(
result.error,
@@ -877,12 +880,26 @@ class ReplicaManager(val config: KafkaConfig,
entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
initialAppendResults: Map[TopicIdPartition, LogAppendResult],
initialProduceStatus: Map[TopicIdPartition, ProducePartitionStatus],
- responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
+ responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit,
): Unit = {
if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition,
initialAppendResults)) {
- // create delayed produce operation
- val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
- val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata,
this, responseCallback)
+ // Create delayed produce operation
+ //
+ // This delegate is invoked by DelayedProduce to verify if the produce
operation can be completed.
+ // Defined here to provide access to ReplicaManager#getPartitionOrError,
which is otherwise inaccessible to the caller.
+ def delegate(tp: TopicPartition, requiredOffset: Long) : Result = {
+ val (hasEnough, error) = getPartitionOrError(tp).fold(
+ // Please refer to the documentation in
`DelayedProduce#tryComplete` for a comprehensive description of these cases.
+ // Case A or Case B
+ err => (false, err),
+
+ // Case B or Case C
+ partition =>
partition.checkEnoughReplicasReachOffset(requiredOffset))
+
+ new Result(hasEnough, error)
+ }
+
+ val delayedProduce = new DelayedProduce(timeoutMs,
initialProduceStatus.asJava, delegate, responseCallback.asJava)
// create a list of (topic, partition) pairs to use as keys for this
delayed produce operation
val producerRequestKeys = entriesPerPartition.keys.map(new
TopicPartitionOperationKey(_)).toList
@@ -893,23 +910,25 @@ class ReplicaManager(val config: KafkaConfig,
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce,
producerRequestKeys.asJava)
} else {
// we can respond immediately
- val produceResponseStatus = initialProduceStatus.map { case (k, status)
=> k -> status.responseStatus }
+ val produceResponseStatus = new util.HashMap[TopicIdPartition,
PartitionResponse]
+ initialProduceStatus.foreach { case (k, status) =>
produceResponseStatus.put(k, status.responseStatus) }
responseCallback(produceResponseStatus)
}
}
private def sendInvalidRequiredAcksResponse(
entries: Map[TopicIdPartition, MemoryRecords],
- responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit): Unit
= {
+ responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit):
Unit = {
// If required.acks is outside accepted range, something is wrong with the
client
// Just return an error and don't handle the request at all
- val responseStatus = entries.map { case (topicIdPartition, _) =>
- topicIdPartition -> new PartitionResponse(
- Errors.INVALID_REQUIRED_ACKS,
- LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
- RecordBatch.NO_TIMESTAMP,
- LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
- )
+ val responseStatus = new util.HashMap[TopicIdPartition, PartitionResponse]
+ entries.foreach { case(topicIdPartition, _) =>
+ responseStatus.put(topicIdPartition, new PartitionResponse(
+ Errors.INVALID_REQUIRED_ACKS,
+ LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
+ RecordBatch.NO_TIMESTAMP,
+ LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset)
+ )
}
responseCallback(responseStatus)
}
@@ -2519,7 +2538,7 @@ class ReplicaManager(val config: KafkaConfig,
partitionsToStartFetching.foreach{ case (topicPartition, partition) =>
completeDelayedOperationsWhenNotPartitionLeader(topicPartition,
partition.topicId)
// Clean up per-partition expiration metrics when transitioning from
leader to follower.
- DelayedProduceMetrics.removePartitionMetrics(topicPartition)
+ DelayedProduce.removePartitionMetrics(topicPartition)
DelayedRemoteListOffsets.removePartitionMetrics(topicPartition)
}
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index 70e04c94b20..b25bca7357a 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -427,10 +427,10 @@ class LocalLeaderEndPointTest extends Logging {
origin: AppendOrigin = AppendOrigin.CLIENT,
requiredAcks: Short = -1):
CallbackResult[PartitionResponse] = {
val result = new CallbackResult[PartitionResponse]()
- def appendCallback(responses: scala.collection.Map[TopicIdPartition,
PartitionResponse]): Unit = {
+ def appendCallback(responses: JMap[TopicIdPartition, PartitionResponse]):
Unit = {
val response = responses.get(partition)
- assertTrue(response.isDefined)
- result.fire(response.get)
+ assertNotNull(response)
+ result.fire(response)
}
replicaManager.appendRecords(
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 3aeb3e89b1a..2fdd251992a 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -56,7 +56,7 @@ import
org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager, RequestLocal}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.partition.{AlterPartitionListener,
OngoingReassignmentState, PartitionListener, PendingShrinkIsr,
SimpleAssignmentState}
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedProduce, TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
UnexpectedAppendOffsetException}
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
diff --git
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index a2bf4ad3537..28343beeb0d 100644
---
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -32,8 +32,9 @@ import
org.apache.kafka.common.record.internal.{MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.metadata.MetadataCache
+import org.apache.kafka.server.purgatory.DelayedProduce.ProducePartitionStatus
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets,
TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedProduce, DelayedRemoteFetch,
DelayedRemoteListOffsets, TopicPartitionOperationKey}
import
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
import org.apache.kafka.server.util.timer.{MockTimer, Timer}
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
@@ -43,6 +44,7 @@ import org.mockito.Mockito.{mock, when, withSettings}
import scala.collection._
import scala.jdk.CollectionConverters._
+import scala.jdk.FunctionConverters.enrichAsJavaConsumer
abstract class AbstractCoordinatorConcurrencyTest[M <: CoordinatorMember]
extends Logging {
val nThreads = 5
@@ -214,7 +216,7 @@ object AbstractCoordinatorConcurrencyTest {
internalTopicsAllowed: Boolean,
origin: AppendOrigin,
entriesPerPartition: Map[TopicIdPartition,
MemoryRecords],
- responseCallback: Map[TopicIdPartition,
PartitionResponse] => Unit,
+ responseCallback:
java.util.Map[TopicIdPartition, PartitionResponse] => Unit,
processingStatsCallback: Map[TopicIdPartition,
RecordValidationStats] => Unit = _ => (),
requestLocal: RequestLocal =
RequestLocal.noCaching,
verificationGuards: Map[TopicPartition,
VerificationGuard] = Map.empty,
@@ -222,11 +224,14 @@ object AbstractCoordinatorConcurrencyTest {
if (entriesPerPartition.isEmpty)
return
- val produceMetadata = ProduceMetadata(1, entriesPerPartition.map {
+ val produceStatus = entriesPerPartition.map {
case (tp, _) =>
- (tp, ProducePartitionStatus(0L, new PartitionResponse(Errors.NONE,
0L, RecordBatch.NO_TIMESTAMP, 0L)))
- })
- val delayedProduce = new DelayedProduce(5, produceMetadata, this,
responseCallback) {
+ (tp, new ProducePartitionStatus(0L, new
PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
+ }.asJava
+
+ // It is safe to set the third parameter to null because it is only used
in tryComplete().
+ // In this test, we override the original implementation and do not use
that parameter at all.
+ val delayedProduce = new DelayedProduce(5, produceStatus, null,
responseCallback.asJava) {
// Complete produce requests after a few attempts to trigger delayed
produce from different threads
val completeAttempts = new AtomicInteger
override def tryComplete(): Boolean = {
@@ -239,7 +244,7 @@ object AbstractCoordinatorConcurrencyTest {
responseCallback(entriesPerPartition.map {
case (tp, _) =>
(tp, new PartitionResponse(Errors.NONE, 0L,
RecordBatch.NO_TIMESTAMP, 0L))
- })
+ }.asJava)
}
}
val producerRequestKeys = entriesPerPartition.keys.map(new
TopicPartitionOperationKey(_))
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 3d42de28110..84c58f40c87 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -1106,7 +1106,7 @@ class TransactionStateManagerTest {
capturedAppends: mutable.Map[TopicIdPartition,
mutable.Buffer[MemoryRecords]]
): Unit = {
val recordsCapture: ArgumentCaptor[Map[TopicIdPartition, MemoryRecords]] =
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, MemoryRecords]])
- val callbackCapture: ArgumentCaptor[Map[TopicIdPartition,
PartitionResponse] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] =>
Unit])
+ val callbackCapture: ArgumentCaptor[util.Map[TopicIdPartition,
PartitionResponse] => Unit] =
ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionResponse]
=> Unit])
when(replicaManager.appendRecords(
anyLong(),
@@ -1130,7 +1130,7 @@ class TransactionStateManagerTest {
batches += records
topicPartition -> new PartitionResponse(appendError, 0L,
RecordBatch.NO_TIMESTAMP, 0L)
- }.toMap
+ }.toMap.asJava
))
}
@@ -1261,7 +1261,7 @@ class TransactionStateManagerTest {
private def prepareForTxnMessageAppend(error: Errors): Unit = {
reset(replicaManager)
- val capturedArgument: ArgumentCaptor[Map[TopicIdPartition,
PartitionResponse] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] =>
Unit])
+ val capturedArgument: ArgumentCaptor[util.Map[TopicIdPartition,
PartitionResponse] => Unit] =
ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionResponse]
=> Unit])
when(replicaManager.appendRecords(anyLong(),
anyShort(),
internalTopicsAllowed = ArgumentMatchers.eq(true),
@@ -1273,8 +1273,8 @@ class TransactionStateManagerTest {
any(),
any()
)).thenAnswer(_ => capturedArgument.getValue.apply(
- Map(new TopicIdPartition(transactionTopicId, partitionId,
TRANSACTION_STATE_TOPIC_NAME) ->
- new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
+ util.Map.of(new TopicIdPartition(transactionTopicId, partitionId,
TRANSACTION_STATE_TOPIC_NAME),
+ new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
)
when(replicaManager.topicIdPartition(new
TopicPartition(TRANSACTION_STATE_TOPIC_NAME, 0))).thenReturn(new
TopicIdPartition(transactionTopicId, 0, TRANSACTION_STATE_TOPIC_NAME))
when(replicaManager.topicIdPartition(new
TopicPartition(TRANSACTION_STATE_TOPIC_NAME, 1))).thenReturn(new
TopicIdPartition(transactionTopicId, 1, TRANSACTION_STATE_TOPIC_NAME))
diff --git a/core/src/test/scala/unit/kafka/server/DelayedProduceTest.scala
b/core/src/test/scala/unit/kafka/server/DelayedProduceTest.scala
index 1afe9b3fecd..25a044c6d08 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedProduceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedProduceTest.scala
@@ -20,6 +20,7 @@ package kafka.server
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.apache.kafka.server.purgatory.DelayedProduce
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.api.Assertions._
@@ -37,7 +38,7 @@ class DelayedProduceTest {
val partition = new TopicPartition("test-topic", 0)
// Record an expiration so the partition metric is created
- DelayedProduceMetrics.recordExpiration(partition)
+ DelayedProduce.recordExpiration(partition)
// Verify the partition metric exists in the registry
val metricsBefore =
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
@@ -53,7 +54,7 @@ class DelayedProduceTest {
!name.getMBeanName.contains("topic="))
// Remove the partition metric
- DelayedProduceMetrics.removePartitionMetrics(partition)
+ DelayedProduce.removePartitionMetrics(partition)
// Verify the partition metric is removed from the registry
val metricsAfter =
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
@@ -77,6 +78,6 @@ class DelayedProduceTest {
val partition = new TopicPartition("nonexistent-topic", 0)
// Should not throw when removing a partition that was never recorded
- DelayedProduceMetrics.removePartitionMetrics(partition)
+ DelayedProduce.removePartitionMetrics(partition)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 48268685da2..f6cc60bd46f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3205,7 +3205,7 @@ class KafkaApisTest extends Logging {
val expectedErrors = util.Map.of(tp1, Errors.UNKNOWN_TOPIC_OR_PARTITION,
tp2, Errors.NONE)
val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] =
ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
- val responseCallback: ArgumentCaptor[Map[TopicIdPartition,
PartitionResponse] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] =>
Unit])
+ val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition,
PartitionResponse] => Unit] =
ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition, PartitionResponse]
=> Unit])
when(replicaManager.onlinePartition(tp1))
.thenReturn(None)
@@ -3223,7 +3223,7 @@ class KafkaApisTest extends Logging {
ArgumentMatchers.eq(requestLocal),
any(),
any()
- )).thenAnswer(_ => responseCallback.getValue.apply(Map(new
TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE))))
+ )).thenAnswer(_ => responseCallback.getValue.apply(util.Map.of(new
TopicIdPartition(topicId,tp2), new PartitionResponse(Errors.NONE))))
kafkaApis = createKafkaApis()
kafkaApis.handleWriteTxnMarkersRequest(request, requestLocal)
verify(requestChannel).sendResponse(
@@ -3346,8 +3346,8 @@ class KafkaApisTest extends Logging {
val entriesPerPartition: ArgumentCaptor[Map[TopicIdPartition,
MemoryRecords]] =
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, MemoryRecords]])
- val responseCallback: ArgumentCaptor[Map[TopicIdPartition,
PartitionResponse] => Unit] =
- ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse]
=> Unit])
+ val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition,
PartitionResponse] => Unit] =
+ ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition,
PartitionResponse] => Unit])
when(replicaManager.appendRecords(
ArgumentMatchers.eq(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT.toLong),
@@ -3364,7 +3364,7 @@ class KafkaApisTest extends Logging {
responseCallback.getValue.apply(
entriesPerPartition.getValue.keySet.map { tp =>
tp -> new PartitionResponse(Errors.NONE)
- }.toMap
+ }.toMap.asJava
)
}
kafkaApis = createKafkaApis()
@@ -3517,8 +3517,8 @@ class KafkaApisTest extends Logging {
// Set up appendRecords to simulate epoch validation failure
val entriesPerPartition: ArgumentCaptor[Map[TopicIdPartition,
MemoryRecords]] =
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, MemoryRecords]])
- val responseCallback: ArgumentCaptor[Map[TopicIdPartition,
PartitionResponse] => Unit] =
- ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse]
=> Unit])
+ val responseCallback: ArgumentCaptor[util.Map[TopicIdPartition,
PartitionResponse] => Unit] =
+ ArgumentCaptor.forClass(classOf[util.Map[TopicIdPartition,
PartitionResponse] => Unit])
when(replicaManager.appendRecords(
ArgumentMatchers.eq(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT.toLong),
@@ -3535,7 +3535,7 @@ class KafkaApisTest extends Logging {
// Simulate epoch validation failure by calling callback with
INVALID_PRODUCER_EPOCH error
val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
responseCallback.getValue.apply(
- Map(topicIdPartition -> new
PartitionResponse(Errors.INVALID_PRODUCER_EPOCH))
+ util.Map.of(topicIdPartition, new
PartitionResponse(Errors.INVALID_PRODUCER_EPOCH))
)
}
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 0b699f9ae08..cc532d05e00 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -296,10 +296,14 @@ class ReplicaManagerConcurrencyTest extends Logging {
val future = new CompletableFuture[ProduceResponse.PartitionResponse]()
val topicIdPartition: common.TopicIdPartition =
replicaManager.topicIdPartition(topicPartition)
- def produceCallback(results: collection.Map[common.TopicIdPartition,
ProduceResponse.PartitionResponse]): Unit = {
+ def produceCallback(results: util.Map[common.TopicIdPartition,
ProduceResponse.PartitionResponse]): Unit = {
try {
assertEquals(1, results.size)
- val (topicPartition, result) = results.head
+
+ val entry = results.entrySet().iterator().next()
+ val topicPartition = entry.getKey
+ val result = entry.getValue
+
assertEquals(topicIdPartition, topicPartition)
assertEquals(Errors.NONE, result.error)
future.complete(result)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 515cb65ee02..b83495ea3db 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -66,8 +66,8 @@ import org.apache.kafka.server.log.remote.TopicPartitionLog
import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.network.BrokerEndPoint
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedProduce, DelayedRemoteFetch,
DelayedRemoteListOffsets}
import org.apache.kafka.server.{HostedPartition, PartitionFetchState}
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets}
import org.apache.kafka.server.share.SharePartitionKey
import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey,
DelayedShareFetchKey, ShareFetch}
import org.apache.kafka.server.share.metrics.ShareGroupMetrics
@@ -261,8 +261,8 @@ class ReplicaManagerTest {
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterPartitionManager)
try {
- def callback(responseStatus: Map[TopicIdPartition, PartitionResponse]):
Unit = {
- assert(responseStatus.values.head.error ==
Errors.INVALID_REQUIRED_ACKS)
+ def callback(responseStatus: util.Map[TopicIdPartition,
PartitionResponse]): Unit = {
+ assert(responseStatus.values().iterator().next().error ==
Errors.INVALID_REQUIRED_ACKS)
}
rm.appendRecords(
timeout = 0,
@@ -2459,8 +2459,8 @@ class ReplicaManagerTest {
numOfRecords: Int
): AtomicReference[PartitionResponse] = {
val produceResult = new AtomicReference[PartitionResponse]()
- def callback(response: Map[TopicIdPartition, PartitionResponse]): Unit = {
- produceResult.set(response(topicPartition))
+ def callback(response: util.Map[TopicIdPartition, PartitionResponse]):
Unit = {
+ produceResult.set(response.get(topicPartition))
}
val records = MemoryRecords.withRecords(
@@ -2706,10 +2706,10 @@ class ReplicaManagerTest {
transactionVersion: Short =
TransactionVersion.TV_UNKNOWN): CallbackResult[PartitionResponse] = {
val result = new CallbackResult[PartitionResponse]()
val topicIdPartition = new TopicIdPartition(topicId, partition)
- def appendCallback(responses: Map[TopicIdPartition, PartitionResponse]):
Unit = {
+ def appendCallback(responses: util.Map[TopicIdPartition,
PartitionResponse]): Unit = {
val response = responses.get(topicIdPartition)
- assertTrue(response.isDefined)
- result.fire(response.get)
+ assertNotNull(response)
+ result.fire(response)
}
replicaManager.appendRecords(
diff --git
a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java
b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java
new file mode 100644
index 00000000000..711c45cff73
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+
+import com.yammer.metrics.core.Meter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * A delayed produce operation that can be created by the replica manager and watched
+ * in the produce operation purgatory.
+ *
+ * <p>The operation can complete once no partition in {@code produceStatus} has
+ * acknowledgements pending, i.e. the {@link PartitionStatusValidator} reported either an
+ * error or that enough replicas reached the required offset. A partition whose
+ * acknowledgements are still pending when {@link #onComplete()} runs is reported with
+ * {@link Errors#REQUEST_TIMED_OUT}, the error provisionally assigned at construction time.
+ */
+public class DelayedProduce extends DelayedOperation {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DelayedProduce.class);
+
+    // These names keep the metrics previously registered under kafka.server / DelayedProduceMetrics.
+    // Changing the package or class name renames the exposed metrics, which may cause
+    // incompatibility with existing code and metrics configuration.
+    private static final String METRICS_PACKAGE = "kafka.server";
+    private static final String METRICS_CLASS_NAME = "DelayedProduceMetrics";
+    private static final String EXPIRES_PER_SEC = "ExpiresPerSec";
+    private static final KafkaMetricsGroup METRICS_GROUP = new KafkaMetricsGroup(METRICS_PACKAGE, METRICS_CLASS_NAME);
+    private static final Meter AGGREGATE_EXPIRATION_METER = METRICS_GROUP.newMeter(EXPIRES_PER_SEC, "requests", TimeUnit.SECONDS);
+    private static final ConcurrentHashMap<TopicPartition, Meter> PARTITION_EXPIRATION_METERS = new ConcurrentHashMap<>();
+
+    /**
+     * Per-partition state of a delayed produce: the offset replicas must reach, the response
+     * that is eventually handed to the callback, and whether acknowledgements are still pending.
+     */
+    public static final class ProducePartitionStatus {
+        private final long requiredOffset;
+        private final PartitionResponse responseStatus;
+
+        // volatile so an update made by one thread is visible to any other thread that
+        // later checks or logs this status.
+        private volatile boolean acksPending;
+
+        public ProducePartitionStatus(long requiredOffset, PartitionResponse responseStatus) {
+            this.requiredOffset = requiredOffset;
+            this.responseStatus = responseStatus;
+        }
+
+        /**
+         * @return the response for this partition; its error is updated in place while the
+         *         delayed produce is being evaluated
+         */
+        public PartitionResponse responseStatus() {
+            return responseStatus;
+        }
+
+        private void setAcksPending(boolean acksPending) {
+            this.acksPending = acksPending;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                "[acksPending: %s, error: %s, startOffset: %s, requiredOffset: %d]",
+                acksPending,
+                responseStatus.error.code(),
+                responseStatus.baseOffset,
+                requiredOffset
+            );
+        }
+    }
+
+    /**
+     * Decides whether a partition with pending acknowledgements can be resolved. It is supplied
+     * by the caller because this module cannot access the ReplicaManager directly.
+     */
+    @FunctionalInterface
+    public interface PartitionStatusValidator {
+        /**
+         * Outcome of a single validation.
+         *
+         * @param hasEnough whether enough replicas have reached the required offset
+         * @param error     the partition error, or {@link Errors#NONE} when there is none
+         */
+        record Result(boolean hasEnough, Errors error) { }
+
+        /**
+         * Validates the status of a partition and its replicas to determine
+         * if a delayed produce operation can be completed.
+         *
+         * @param topicPartition The partition to check.
+         * @param requiredOffset The offset that replicas must reach.
+         * @return a {@link Result} carrying {@code hasEnough} and the error code.
+         */
+        Result validate(TopicPartition topicPartition, long requiredOffset);
+    }
+
+    private final Map<TopicIdPartition, ProducePartitionStatus> produceStatus;
+    private final PartitionStatusValidator statusValidator;
+    private final Consumer<Map<TopicIdPartition, PartitionResponse>> responseCallback;
+
+    /**
+     * @param delayMs          delay in milliseconds, passed to {@link DelayedOperation}
+     * @param produceStatus    per-partition status; every partition whose response error is
+     *                         {@link Errors#NONE} is marked as pending and provisionally given
+     *                         {@link Errors#REQUEST_TIMED_OUT}
+     * @param statusValidator  checks whether a pending partition can be resolved
+     * @param responseCallback receives the per-partition responses from {@link #onComplete()}
+     */
+    public DelayedProduce(long delayMs,
+                          Map<TopicIdPartition, ProducePartitionStatus> produceStatus,
+                          PartitionStatusValidator statusValidator,
+                          Consumer<Map<TopicIdPartition, PartitionResponse>> responseCallback) {
+        super(delayMs);
+
+        this.produceStatus = produceStatus;
+        this.statusValidator = statusValidator;
+        this.responseCallback = responseCallback;
+
+        // first update the acks pending variable according to the error code
+        produceStatus.forEach((topicIdPartition, status) -> {
+            boolean awaitingAcks = status.responseStatus.error == Errors.NONE;
+            status.setAcksPending(awaitingAcks);
+            if (awaitingAcks) {
+                // Timeout error state will be cleared when required acks are received
+                status.responseStatus.error = Errors.REQUEST_TIMED_OUT;
+            }
+
+            LOGGER.trace("Initial partition status for {} is {}", topicIdPartition, status);
+        });
+    }
+
+    /**
+     * The delayed produce operation can be completed if every partition
+     * it produces to is satisfied by one of the following:
+     *
+     * Case A: Replica not assigned to partition
+     * Case B: Replica is no longer the leader of this partition
+     * Case C: This broker is the leader:
+     *   C.1 - If there was a local error thrown while checking if at least requiredAcks
+     *         replicas have caught up to this operation: set an error in response
+     *   C.2 - Otherwise, set the response with no error.
+     *
+     * These cases were originally validated by methods in the ReplicaManager. Because
+     * DelayedProduce now lives in the server module, it cannot access the ReplicaManager directly,
+     * so these validations are delegated to the {@link PartitionStatusValidator} built in
+     * `ReplicaManager#maybeAddDelayedProduce()`.
+     *
+     * @return true if this call completed the operation
+     */
+    @Override
+    public boolean tryComplete() {
+        // check for each partition if it still has pending acks
+        produceStatus.forEach((topicIdPartition, status) -> {
+            LOGGER.trace("Checking produce satisfaction for {}, current status {}", topicIdPartition, status);
+            // skip those partitions that have already been satisfied
+            if (status.acksPending) {
+                // Delegate the Case A, B and C checks to the supplied validator
+                PartitionStatusValidator.Result result =
+                    statusValidator.validate(topicIdPartition.topicPartition(), status.requiredOffset);
+
+                // Resolve the partition on any error (A, B, C.1) or once enough replicas caught up (C.2)
+                Errors error = result.error();
+                if (error != Errors.NONE || result.hasEnough()) {
+                    status.setAcksPending(false);
+                    status.responseStatus.error = error;
+                }
+            }
+        });
+
+        // check if every partition has satisfied at least one of case A, B or C
+        boolean anyPending = produceStatus.values().stream().anyMatch(status -> status.acksPending);
+        if (anyPending) {
+            return false;
+        }
+        return forceComplete();
+    }
+
+    /**
+     * Records an expiration metric for every partition whose acknowledgements are still pending.
+     */
+    @Override
+    public void onExpiration() {
+        produceStatus.forEach((topicIdPartition, status) -> {
+            if (status.acksPending) {
+                LOGGER.debug("Expiring produce request for partition {} with status {}", topicIdPartition, status);
+                recordExpiration(topicIdPartition.topicPartition());
+            }
+        });
+    }
+
+    /**
+     * Upon completion, return the current response status along with the error code per partition.
+     */
+    @Override
+    public void onComplete() {
+        Map<TopicIdPartition, PartitionResponse> responseStatus = new HashMap<>();
+        produceStatus.forEach((topicIdPartition, status) ->
+            responseStatus.put(topicIdPartition, status.responseStatus()));
+        responseCallback.accept(responseStatus);
+    }
+
+    /**
+     * Marks one expiration on the aggregate meter and on the given partition's meter,
+     * registering the partition meter on first use.
+     */
+    public static void recordExpiration(TopicPartition partition) {
+        AGGREGATE_EXPIRATION_METER.mark();
+        PARTITION_EXPIRATION_METERS.computeIfAbsent(partition,
+            key -> METRICS_GROUP.newMeter(EXPIRES_PER_SEC, "requests", TimeUnit.SECONDS, partitionTags(key))
+        ).mark();
+    }
+
+    /**
+     * Removes the partition's expiration meter if one was registered; does nothing otherwise.
+     */
+    public static void removePartitionMetrics(TopicPartition partition) {
+        if (PARTITION_EXPIRATION_METERS.remove(partition) != null) {
+            METRICS_GROUP.removeMetric(EXPIRES_PER_SEC, partitionTags(partition));
+        }
+    }
+
+    /**
+     * Builds the tags of a partition meter in a fixed order: topic, then partition.
+     * Map.of makes no iteration-order guarantee, so an insertion-ordered map is used to keep
+     * the generated metric name independent of hash ordering. Registration and removal share
+     * this helper so that their tags always match.
+     */
+    private static Map<String, String> partitionTags(TopicPartition partition) {
+        Map<String, String> tags = new LinkedHashMap<>();
+        tags.put("topic", partition.topic());
+        tags.put("partition", String.valueOf(partition.partition()));
+        return tags;
+    }
+}