This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cdf7258  KAFKA-10832: Fix Log to use the correct ProducerStateManager 
instance when updating producers (#9718)
cdf7258 is described below

commit cdf725828bc247078458de403b17190a9da07496
Author: Kowshik Prakasam <[email protected]>
AuthorDate: Fri Dec 11 16:34:46 2020 -0800

    KAFKA-10832: Fix Log to use the correct ProducerStateManager instance when 
updating producers (#9718)
    
    Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]>
---
 core/src/main/scala/kafka/log/Log.scala | 57 +++++++++++++++++----------------
 1 file changed, 30 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index 9677dc4..2a86021 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -935,7 +935,7 @@ class Log(@volatile private var _dir: File,
             maxPosition = maxPosition,
             minOneMessage = false)
           if (fetchDataInfo != null)
-            loadProducersFromLog(producerStateManager, fetchDataInfo.records)
+            loadProducersFromRecords(producerStateManager, 
fetchDataInfo.records)
         }
       }
       producerStateManager.updateMapEndOffset(lastOffset)
@@ -948,22 +948,6 @@ class Log(@volatile private var _dir: File,
     maybeIncrementFirstUnstableOffset()
   }
 
-  private def loadProducersFromLog(producerStateManager: ProducerStateManager, 
records: Records): Unit = {
-    val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
-    val completedTxns = ListBuffer.empty[CompletedTxn]
-    records.batches.forEach { batch =>
-      if (batch.hasProducerId) {
-        val maybeCompletedTxn = updateProducers(batch,
-          loadedProducers,
-          firstOffsetMetadata = None,
-          origin = AppendOrigin.Replication)
-        maybeCompletedTxn.foreach(completedTxns += _)
-      }
-    }
-    loadedProducers.values.foreach(producerStateManager.update)
-    completedTxns.foreach(producerStateManager.completeTxn)
-  }
-
   private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock 
synchronized {
     producerStateManager.activeProducers.map { case (producerId, 
producerIdEntry) =>
       (producerId, producerIdEntry.lastSeq)
@@ -1349,7 +1333,7 @@ class Log(@volatile private var _dir: File,
         else
           None
 
-        val maybeCompletedTxn = updateProducers(batch, updatedProducers, 
firstOffsetMetadata, origin)
+        val maybeCompletedTxn = updateProducers(producerStateManager, batch, 
updatedProducers, firstOffsetMetadata, origin)
         maybeCompletedTxn.foreach(completedTxns += _)
       }
 
@@ -1456,15 +1440,6 @@ class Log(@volatile private var _dir: File,
       RecordConversionStats.EMPTY, sourceCodec, targetCodec, 
shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
   }
 
-  private def updateProducers(batch: RecordBatch,
-                              producers: mutable.Map[Long, ProducerAppendInfo],
-                              firstOffsetMetadata: Option[LogOffsetMetadata],
-                              origin: AppendOrigin): Option[CompletedTxn] = {
-    val producerId = batch.producerId
-    val appendInfo = producers.getOrElseUpdate(producerId, 
producerStateManager.prepareUpdate(producerId, origin))
-    appendInfo.append(batch, firstOffsetMetadata)
-  }
-
   /**
    * Trim any invalid bytes from the end of this message set (if there are any)
    *
@@ -2670,6 +2645,34 @@ object Log {
   private def isLogFile(file: File): Boolean =
     file.getPath.endsWith(LogFileSuffix)
 
+  private def loadProducersFromRecords(producerStateManager: 
ProducerStateManager, records: Records): Unit = {
+    val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
+    val completedTxns = ListBuffer.empty[CompletedTxn]
+    records.batches.forEach { batch =>
+      if (batch.hasProducerId) {
+        val maybeCompletedTxn = updateProducers(
+          producerStateManager,
+          batch,
+          loadedProducers,
+          firstOffsetMetadata = None,
+          origin = AppendOrigin.Replication)
+        maybeCompletedTxn.foreach(completedTxns += _)
+      }
+    }
+    loadedProducers.values.foreach(producerStateManager.update)
+    completedTxns.foreach(producerStateManager.completeTxn)
+  }
+
+  private def updateProducers(producerStateManager: ProducerStateManager,
+                              batch: RecordBatch,
+                              producers: mutable.Map[Long, ProducerAppendInfo],
+                              firstOffsetMetadata: Option[LogOffsetMetadata],
+                              origin: AppendOrigin): Option[CompletedTxn] = {
+    val producerId = batch.producerId
+    val appendInfo = producers.getOrElseUpdate(producerId, 
producerStateManager.prepareUpdate(producerId, origin))
+    appendInfo.append(batch, firstOffsetMetadata)
+  }
+
 }
 
 object LogMetricNames {

Reply via email to