This is an automated email from the ASF dual-hosted git repository.

jlprat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d5dafe22fed MINOR: Fill missing parameter annotations for LogCleaner methods (#13839)
d5dafe22fed is described below

commit d5dafe22fed244d25cca8839c41221fab87d367e
Author: hudeqi <[email protected]>
AuthorDate: Wed Jun 21 21:54:32 2023 +0800

    MINOR: Fill missing parameter annotations for LogCleaner methods (#13839)
    
    
    Reviewers: Josep Prat <[email protected]>
    ---------
    
    Co-authored-by: Deqi Hu <[email protected]>
---
 core/src/main/scala/kafka/log/LogCleaner.scala | 129 ++++++++++++++++++++++---
 core/src/main/scala/kafka/log/LogSegment.scala |   1 -
 2 files changed, 116 insertions(+), 14 deletions(-)
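
For context, the "parameter annotations" in the title are Scaladoc @param/@return tags. A minimal sketch of the convention the patch applies, using a hypothetical class and method rather than anything from the patch itself:

    // Illustrative only: one @param tag per parameter, plus an @return tag
    // where the method returns a value -- the style applied across LogCleaner.
    class CheckpointStore(private var checkpoints: Map[String, Long]) {
      /**
       * Truncate the checkpoint for the given partition if its checkpointed
       * offset is larger than the given offset.
       *
       * @param partition The partition whose checkpoint may be truncated
       * @param offset The offset to compare the checkpointed offset against
       * @return Whether the checkpoint was truncated
       */
      def maybeTruncate(partition: String, offset: Long): Boolean =
        checkpoints.get(partition) match {
          case Some(checkpointed) if checkpointed > offset =>
            checkpoints = checkpoints.updated(partition, offset)
            true
          case _ => false
        }
    }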

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 4eef6119a80..78afdc86b40 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -91,6 +91,7 @@ import scala.util.control.ControlThrowable
  * @param initialConfig Initial configuration parameters for the cleaner. Actual config may be dynamically updated.
  * @param logDirs The directories where offset checkpoints reside
  * @param logs The pool of logs
+ * @param logDirFailureChannel The channel used to record log directories that are found to be offline while cleaning logs
  * @param time A way to control the passage of time
  */
 class LogCleaner(initialConfig: CleanerConfig,
@@ -149,7 +150,7 @@ class LogCleaner(initialConfig: CleanerConfig,
   private[log] def deadThreadCount: Int = cleaners.count(_.isThreadFailed)
 
   /**
-   * Start the background cleaning
+   * Start the background cleaner threads
    */
   def startup(): Unit = {
     info("Starting the log cleaner")
@@ -161,7 +162,7 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   * Stop the background cleaning
+   * Stop the background cleaner threads
    */
   def shutdown(): Unit = {
     info("Shutting down the log cleaner.")
@@ -173,14 +174,25 @@ class LogCleaner(initialConfig: CleanerConfig,
     }
   }
 
+  /**
+   * Remove metrics
+   */
   def removeMetrics(): Unit = {
     LogCleaner.MetricNames.foreach(metricsGroup.removeMetric)
   }
 
+  /**
+   * @return The set of configs that are reconfigurable in LogCleaner
+   */
   override def reconfigurableConfigs: Set[String] = {
     LogCleaner.ReconfigurableConfigs
   }
 
+  /**
+   * Validate that the new number of cleaner threads is reasonable
+   *
+   * @param newConfig A submitted new KafkaConfig instance that contains the new cleaner config
+   */
   override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
     val numThreads = LogCleaner.cleanerConfig(newConfig).numThreads
     val currentThreads = config.numThreads
@@ -194,11 +206,14 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-    * Reconfigure log clean config. The will:
-    * 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary
-    * 2. stop current log cleaners and create new ones.
-    * That ensures that if any of the cleaners had failed, new cleaners are created to match the new config.
-    */
+   * Reconfigure log cleaner config. This will:
+   * 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary
+   * 2. stop current log cleaners and create new ones.
+   * That ensures that if any of the cleaners had failed, new cleaners are created to match the new config.
+   *
+   * @param oldConfig the old log cleaner config
+   * @param newConfig the new log cleaner config to apply
+   */
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
     config = LogCleaner.cleanerConfig(newConfig)
 
@@ -215,6 +230,8 @@ class LogCleaner(initialConfig: CleanerConfig,
   /**
   *  Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
    *  the partition is aborted.
+   *
+   *  @param topicPartition The topic and partition to abort cleaning
    */
   def abortCleaning(topicPartition: TopicPartition): Unit = {
     cleanerManager.abortCleaning(topicPartition)
@@ -222,20 +239,28 @@ class LogCleaner(initialConfig: CleanerConfig,
 
   /**
    * Update checkpoint file to remove partitions if necessary.
+   *
+   * @param dataDir The data dir to be updated if necessary
+   * @param partitionToRemove The topicPartition to be removed (default: none)
    */
   def updateCheckpoints(dataDir: File, partitionToRemove: Option[TopicPartition] = None): Unit = {
     cleanerManager.updateCheckpoints(dataDir, partitionToRemove = partitionToRemove)
   }
 
   /**
-   * alter the checkpoint directory for the topicPartition, to remove the data in sourceLogDir, and add the data in destLogDir
+   * Alter the checkpoint directory for the `topicPartition`, to remove the data in `sourceLogDir`, and add the data in `destLogDir`.
+   * This typically occurs when disk balancing ends and the future log replaces the previous one.
+   *
+   * @param topicPartition The topic and partition whose checkpoint is altered
+   * @param sourceLogDir The source log dir from which the checkpoint is removed
+   * @param destLogDir The dest log dir to which the checkpoint is added
    */
   def alterCheckpointDir(topicPartition: TopicPartition, sourceLogDir: File, destLogDir: File): Unit = {
     cleanerManager.alterCheckpointDir(topicPartition, sourceLogDir, destLogDir)
   }
 
   /**
-   * Stop cleaning logs in the provided directory
+   * Stop cleaning logs in the provided directory when handling log dir failure
    *
    * @param dir     the absolute path of the log dir
    */
@@ -244,7 +269,11 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   * Truncate cleaner offset checkpoint for the given partition if its checkpointed offset is larger than the given offset
+   * Truncate cleaner offset checkpoint for the given partition if its checkpoint offset is larger than the given offset
+   *
+   * @param dataDir The data dir to be truncated if necessary
+   * @param topicPartition The topic and partition to truncate the checkpoint offset for
+   * @param offset The offset to compare the checkpointed offset against
    */
   def maybeTruncateCheckpoint(dataDir: File, topicPartition: TopicPartition, offset: Long): Unit = {
     cleanerManager.maybeTruncateCheckpoint(dataDir, topicPartition, offset)
@@ -253,14 +282,18 @@ class LogCleaner(initialConfig: CleanerConfig,
   /**
   *  Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
   *  This call blocks until the cleaning of the partition is aborted and paused.
+   *
+   *  @param topicPartition The topic and partition to abort and pause cleaning
    */
   def abortAndPauseCleaning(topicPartition: TopicPartition): Unit = {
     cleanerManager.abortAndPauseCleaning(topicPartition)
   }
 
   /**
-    *  Resume the cleaning of paused partitions.
-    */
+   *  Resume the cleaning of paused partitions.
+   *
+   *  @param topicPartitions The collection of topicPartitions whose cleaning should be resumed
+   */
   def resumeCleaning(topicPartitions: Iterable[TopicPartition]): Unit = {
     cleanerManager.resumeCleaning(topicPartitions)
   }
@@ -289,6 +322,7 @@ class LogCleaner(initialConfig: CleanerConfig,
   /**
     * To prevent race between retention and compaction,
     * retention threads need to make this call to obtain:
+    *
    * @return A list of log partitions that retention threads can safely work on
     */
   def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, UnifiedLog)] = {
@@ -327,6 +361,11 @@ class LogCleaner(initialConfig: CleanerConfig,
     @volatile var lastStats: CleanerStats = new CleanerStats()
     @volatile var lastPreCleanStats: PreCleanStats = new PreCleanStats()
 
+    /**
+     *  Check if the cleaning for a partition is aborted. If so, throw an exception.
+     *
+     *  @param topicPartition The topic and partition to check
+     */
     private def checkDone(topicPartition: TopicPartition): Unit = {
       if (!isRunning)
         throw new ThreadShutdownException
@@ -347,6 +386,7 @@ class LogCleaner(initialConfig: CleanerConfig,
 
     /**
      * Cleans a log if there is a dirty log available
+     *
      * @return whether a log was cleaned
      */
     private def tryCleanFilthiestLog(): Boolean = {
@@ -417,6 +457,12 @@ class LogCleaner(initialConfig: CleanerConfig,
 
     /**
      * Log out statistics on a single run of the cleaner.
+     *
+     * @param id The cleaner thread id
+     * @param name The cleaned log name
+     * @param from The first dirty offset at which this round of cleaning began
+     * @param to The offset at which this round of cleaning ended (the first uncleaned offset)
+     * @param stats The statistics for this round of cleaning
      */
    def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats): Unit = {
       this.lastStats = stats
@@ -532,6 +578,14 @@ private[log] class Cleaner(val id: Int,
     doClean(cleanable, time.milliseconds())
   }
 
+  /**
+   * Clean the given log
+   *
+   * @param cleanable The log to be cleaned
+   * @param currentTime The current timestamp used for this round of cleaning
+   *
+   * @return The first offset not cleaned and the statistics for this round of cleaning
+   */
  private[log] def doClean(cleanable: LogToClean, currentTime: Long): (Long, CleanerStats) = {
     info("Beginning cleaning of log %s".format(cleanable.log.name))
 
@@ -667,6 +721,8 @@ private[log] class Cleaner(val id: Int,
   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
   * @param maxLogMessageSize The maximum message size of the corresponding topic
+   * @param transactionMetadata The state of ongoing transactions which is carried between the cleaning of the grouped segments
+   * @param lastRecordsOfActiveProducers The active producers and their last data offsets
    * @param stats Collector for cleaning statistics
    * @param currentTime The time at which the clean was initiated
    */
@@ -778,6 +834,11 @@ private[log] class Cleaner(val id: Int,
   *   1. A compacted topic using compression may contain a message set slightly larger than max.message.bytes
   *   2. max.message.bytes of a topic could have been reduced after writing larger messages
    * In these cases, grow the buffer to hold the next batch.
+   *
+   * @param sourceRecords The dirty log segment records to process
+   * @param position The current position in the read buffer to read from
+   * @param maxLogMessageSize The maximum record size in bytes for the topic
    */
   private def growBuffersOrFail(sourceRecords: FileRecords,
                                 position: Int,
@@ -803,6 +864,14 @@ private[log] class Cleaner(val id: Int,
     growBuffers(maxSize)
   }
 
+  /**
+   * Check if a batch should be discarded based on the cleaned transaction state
+   *
+   * @param batch The batch of records to check
+   * @param transactionMetadata The transaction state maintained during cleaning
+   *
+   * @return Whether the batch can be discarded
+   */
   private def shouldDiscardBatch(batch: RecordBatch,
                                  transactionMetadata: CleanedTransactionMetadata): Boolean = {
     if (batch.isControlBatch)
@@ -811,6 +880,18 @@ private[log] class Cleaner(val id: Int,
       transactionMetadata.onBatchRead(batch)
   }
 
+  /**
+   * Check if a record should be retained
+   *
+   * @param map The offset map (key => offset) to use for cleaning segments
+   * @param retainDeletesForLegacyRecords Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param batch The batch of records that the record belongs to
+   * @param record The record to check
+   * @param stats The collector for cleaning statistics
+   * @param currentTime The current time used to compare against the delete horizon of the batch when judging a non-legacy record
+   *
+   * @return Whether the record can be retained
+   */
   private def shouldRetainRecord(map: OffsetMap,
                                  retainDeletesForLegacyRecords: Boolean,
                                  batch: RecordBatch,
@@ -847,6 +928,8 @@ private[log] class Cleaner(val id: Int,
 
   /**
    * Double the I/O buffer capacity
+   *
+   * @param maxLogMessageSize The maximum allowed record size in bytes
    */
   def growBuffers(maxLogMessageSize: Int): Unit = {
     val maxBufferSize = math.max(maxLogMessageSize, maxIoBufferSize)
@@ -876,6 +959,7 @@ private[log] class Cleaner(val id: Int,
    * @param segments The log segments to group
   * @param maxSize the maximum size in bytes for the total of all log data in a group
   * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
+   * @param firstUncleanableOffset The upper (exclusive) offset to clean to
    *
    * @return A list of grouped segments
    */
@@ -915,7 +999,8 @@ private[log] class Cleaner(val id: Int,
     * the base offset of the next segment in the list.
     * If the next segment doesn't exist, first Uncleanable Offset will be used.
     *
-    * @param segs - remaining segments to group.
+    * @param segs Remaining segments to group.
+    * @param firstUncleanableOffset The upper (exclusive) offset to clean to
     * @return The estimated last offset for the first segment in segs
     */
  private def lastOffsetForFirstSegment(segs: List[LogSegment], firstUncleanableOffset: Long): Long = {
@@ -972,8 +1057,13 @@ private[log] class Cleaner(val id: Int,
   /**
    * Add the messages in the given segment to the offset map
    *
+   * @param topicPartition The topic and partition of the log segment to build the offset map for
   * @param segment The segment to index
   * @param map The map in which to store the key=>offset mapping
+   * @param startOffset The offset at which dirty messages begin
+   * @param nextSegmentStartOffset The base offset of the next segment when building the current one
+   * @param maxLogMessageSize The maximum allowed record size in bytes
+   * @param transactionMetadata The state of ongoing transactions within the offset range being built
    * @param stats Collector for cleaning statistics
    *
    * @return If the map was filled whilst loading from this segment
@@ -1152,6 +1242,11 @@ private[log] class CleanedTransactionMetadata {
   // Output cleaned index to write retained aborted transactions
   var cleanedIndex: Option[TransactionIndex] = None
 
+  /**
+   * Update the cleaned transaction state with the newly found aborted transactions that have just been traversed.
+   *
+   * @param abortedTransactions The newly found aborted transactions to add
+   */
   def addAbortedTransactions(abortedTransactions: List[AbortedTxn]): Unit = {
     this.abortedTransactions ++= abortedTransactions
   }
@@ -1159,6 +1254,10 @@ private[log] class CleanedTransactionMetadata {
   /**
   * Update the cleaned transaction state with a control batch that has just been traversed by the cleaner.
    * Return true if the control batch can be discarded.
+   *
+   * @param controlBatch The control batch that has just been traversed
+   *
+   * @return True if the control batch can be discarded
    */
   def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
     consumeAbortedTxnsUpTo(controlBatch.lastOffset)
@@ -1200,6 +1299,10 @@ private[log] class CleanedTransactionMetadata {
   /**
   * Update the transactional state for the incoming non-control batch. If the batch is part of
   * an aborted transaction, return true to indicate that it is safe to discard.
+   *
+   * @param batch The batch to read when updating the transactional state
+   *
+   * @return Whether the batch is part of an aborted transaction or not
    */
   def onBatchRead(batch: RecordBatch): Boolean = {
     consumeAbortedTxnsUpTo(batch.lastOffset)
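
The transaction bookkeeping documented above can be summed up in a short sketch. This is a simplified illustration of the idea only -- the types and names below are hypothetical, not Kafka's:

    import scala.collection.mutable

    // Track aborted transactions by their starting offset, and report whether
    // a batch read by the cleaner falls inside one and can be discarded.
    final case class AbortedTxn(producerId: Long, firstOffset: Long)

    final class AbortedTxnTracker {
      // Aborted transactions not yet reached, assumed queued in offset order.
      private val pending = mutable.Queue.empty[AbortedTxn]
      // Producers whose aborted transaction we are currently inside.
      private val ongoingAborted = mutable.Set.empty[Long]

      /** Add newly found aborted transactions (in increasing offset order). */
      def addAbortedTransactions(txns: List[AbortedTxn]): Unit =
        pending ++= txns

      /** True if the batch belongs to an aborted transaction and can be discarded. */
      def onBatchRead(producerId: Long, lastOffset: Long): Boolean = {
        // Start tracking every aborted transaction whose range we have entered.
        while (pending.nonEmpty && pending.head.firstOffset <= lastOffset)
          ongoingAborted += pending.dequeue().producerId
        ongoingAborted.contains(producerId)
      }

      /** A control (commit/abort) marker ends its producer's transaction;
       *  true means the marker belonged to an aborted transaction. */
      def onControlBatchRead(producerId: Long): Boolean =
        ongoingAborted.remove(producerId)
    }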
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 21a2a8c58fd..bc07df1ddc1 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -140,7 +140,6 @@ class LogSegment private[log] (val log: FileRecords,
    * @param largestTimestamp The largest timestamp in the message set.
   * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
    * @param records The log entries to append.
-   * @return the physical position in the file of the appended records
   * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
    */
   @nonthreadsafe
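
The reconfigure/validateReconfiguration pair documented in this patch follows a validate-then-replace pattern: reject an unreasonable thread count up front, then stop all current cleaners and start fresh ones so that any failed cleaner is recreated under the new config. A hedged sketch of that pattern -- the class, worker model, and exact bounds below are illustrative assumptions, not Kafka's code:

    // Validate-then-replace reconfiguration, simplified.
    class CleanerPool(initialThreads: Int) {
      @volatile private var numThreads: Int = initialThreads
      private var workers: Seq[Thread] = startWorkers(numThreads)

      private def startWorkers(n: Int): Seq[Thread] =
        (0 until n).map { i =>
          val t = new Thread(() => (), s"cleaner-thread-$i") // cleaning loop elided
          t.start()
          t
        }

      /** Reject unreasonable thread counts before they are applied. */
      def validateReconfiguration(newThreads: Int): Unit = {
        require(newThreads >= 1, s"log cleaner threads should be at least 1, got $newThreads")
        require(newThreads <= numThreads * 2,
          s"log cleaner threads cannot be more than double the current value $numThreads")
      }

      /** Stop all current workers and start fresh ones under the new config,
       *  so any worker that had died is replaced. */
      def reconfigure(newThreads: Int): Unit = {
        workers.foreach(_.interrupt())
        numThreads = newThreads
        workers = startWorkers(numThreads)
      }
    }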
