This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cdf7258 KAFKA-10832: Fix Log to use the correct ProducerStateManager
instance when updating producers (#9718)
cdf7258 is described below
commit cdf725828bc247078458de403b17190a9da07496
Author: Kowshik Prakasam <[email protected]>
AuthorDate: Fri Dec 11 16:34:46 2020 -0800
KAFKA-10832: Fix Log to use the correct ProducerStateManager instance when
updating producers (#9718)
Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]>
---
core/src/main/scala/kafka/log/Log.scala | 57 +++++++++++++++++----------------
1 file changed, 30 insertions(+), 27 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 9677dc4..2a86021 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -935,7 +935,7 @@ class Log(@volatile private var _dir: File,
maxPosition = maxPosition,
minOneMessage = false)
if (fetchDataInfo != null)
- loadProducersFromLog(producerStateManager, fetchDataInfo.records)
+ loadProducersFromRecords(producerStateManager, fetchDataInfo.records)
}
}
producerStateManager.updateMapEndOffset(lastOffset)
@@ -948,22 +948,6 @@ class Log(@volatile private var _dir: File,
maybeIncrementFirstUnstableOffset()
}
- private def loadProducersFromLog(producerStateManager: ProducerStateManager, records: Records): Unit = {
- val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
- val completedTxns = ListBuffer.empty[CompletedTxn]
- records.batches.forEach { batch =>
- if (batch.hasProducerId) {
- val maybeCompletedTxn = updateProducers(batch,
- loadedProducers,
- firstOffsetMetadata = None,
- origin = AppendOrigin.Replication)
- maybeCompletedTxn.foreach(completedTxns += _)
- }
- }
- loadedProducers.values.foreach(producerStateManager.update)
- completedTxns.foreach(producerStateManager.completeTxn)
- }
-
private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock synchronized {
producerStateManager.activeProducers.map { case (producerId, producerIdEntry) =>
(producerId, producerIdEntry.lastSeq)
@@ -1349,7 +1333,7 @@ class Log(@volatile private var _dir: File,
else
None
- val maybeCompletedTxn = updateProducers(batch, updatedProducers, firstOffsetMetadata, origin)
+ val maybeCompletedTxn = updateProducers(producerStateManager, batch, updatedProducers, firstOffsetMetadata, origin)
maybeCompletedTxn.foreach(completedTxns += _)
}
@@ -1456,15 +1440,6 @@ class Log(@volatile private var _dir: File,
RecordConversionStats.EMPTY, sourceCodec, targetCodec,
shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
}
- private def updateProducers(batch: RecordBatch,
- producers: mutable.Map[Long, ProducerAppendInfo],
- firstOffsetMetadata: Option[LogOffsetMetadata],
- origin: AppendOrigin): Option[CompletedTxn] = {
- val producerId = batch.producerId
- val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin))
- appendInfo.append(batch, firstOffsetMetadata)
- }
-
/**
* Trim any invalid bytes from the end of this message set (if there are any)
*
@@ -2670,6 +2645,34 @@ object Log {
private def isLogFile(file: File): Boolean =
file.getPath.endsWith(LogFileSuffix)
+ private def loadProducersFromRecords(producerStateManager: ProducerStateManager, records: Records): Unit = {
+ val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
+ val completedTxns = ListBuffer.empty[CompletedTxn]
+ records.batches.forEach { batch =>
+ if (batch.hasProducerId) {
+ val maybeCompletedTxn = updateProducers(
+ producerStateManager,
+ batch,
+ loadedProducers,
+ firstOffsetMetadata = None,
+ origin = AppendOrigin.Replication)
+ maybeCompletedTxn.foreach(completedTxns += _)
+ }
+ }
+ loadedProducers.values.foreach(producerStateManager.update)
+ completedTxns.foreach(producerStateManager.completeTxn)
+ }
+
+ private def updateProducers(producerStateManager: ProducerStateManager,
+ batch: RecordBatch,
+ producers: mutable.Map[Long, ProducerAppendInfo],
+ firstOffsetMetadata: Option[LogOffsetMetadata],
+ origin: AppendOrigin): Option[CompletedTxn] = {
+ val producerId = batch.producerId
+ val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin))
+ appendInfo.append(batch, firstOffsetMetadata)
+ }
+
}
object LogMetricNames {