This is an automated email from the ASF dual-hosted git repository.
jlprat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d5dafe22fed MINOR:Fill missing parameter annotations for LogCleaner methods (#13839)
d5dafe22fed is described below
commit d5dafe22fed244d25cca8839c41221fab87d367e
Author: hudeqi <[email protected]>
AuthorDate: Wed Jun 21 21:54:32 2023 +0800
MINOR:Fill missing parameter annotations for LogCleaner methods (#13839)
Reviewers: Josep Prat <[email protected]>
---------
Co-authored-by: Deqi Hu <[email protected]>
---
core/src/main/scala/kafka/log/LogCleaner.scala | 129 ++++++++++++++++++++++---
core/src/main/scala/kafka/log/LogSegment.scala | 1 -
2 files changed, 116 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 4eef6119a80..78afdc86b40 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -91,6 +91,7 @@ import scala.util.control.ControlThrowable
* @param initialConfig Initial configuration parameters for the cleaner. Actual config may be dynamically updated.
* @param logDirs The directories where offset checkpoints reside
* @param logs The pool of logs
+ * @param logDirFailureChannel The channel through which log directories are marked offline when errors are encountered during cleaning
* @param time A way to control the passage of time
*/
class LogCleaner(initialConfig: CleanerConfig,
@@ -149,7 +150,7 @@ class LogCleaner(initialConfig: CleanerConfig,
private[log] def deadThreadCount: Int = cleaners.count(_.isThreadFailed)
/**
- * Start the background cleaning
+ * Start the background cleaner threads
*/
def startup(): Unit = {
info("Starting the log cleaner")
@@ -161,7 +162,7 @@ class LogCleaner(initialConfig: CleanerConfig,
}
/**
- * Stop the background cleaning
+ * Stop the background cleaner threads
*/
def shutdown(): Unit = {
info("Shutting down the log cleaner.")
@@ -173,14 +174,25 @@ class LogCleaner(initialConfig: CleanerConfig,
}
}
+ /**
+ * Remove the metrics registered by the log cleaner
+ */
def removeMetrics(): Unit = {
LogCleaner.MetricNames.foreach(metricsGroup.removeMetric)
}
+ /**
+ * @return The set of configs that are reconfigurable in LogCleaner
+ */
override def reconfigurableConfigs: Set[String] = {
LogCleaner.ReconfigurableConfigs
}
+ /**
+ * Validate that the proposed number of cleaner threads is reasonable
+ *
+ * @param newConfig A submitted new KafkaConfig instance that contains the new cleaner config
+ */
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
val numThreads = LogCleaner.cleanerConfig(newConfig).numThreads
val currentThreads = config.numThreads
@@ -194,11 +206,14 @@ class LogCleaner(initialConfig: CleanerConfig,
}
/**
- * Reconfigure log clean config. The will:
- * 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary
- * 2. stop current log cleaners and create new ones.
- * That ensures that if any of the cleaners had failed, new cleaners are created to match the new config.
- */
+ * Reconfigure log cleaner config. This will:
+ * 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary
+ * 2. stop current log cleaners and create new ones.
+ * That ensures that if any of the cleaners had failed, new cleaners are created to match the new config.
+ *
+ * @param oldConfig the old log cleaner config
+ * @param newConfig the new log cleaner config to apply
+ */
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
config = LogCleaner.cleanerConfig(newConfig)
@@ -215,6 +230,8 @@ class LogCleaner(initialConfig: CleanerConfig,
/**
* Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
* the partition is aborted.
+ *
+ * @param topicPartition The topic and partition for which cleaning should be aborted
*/
def abortCleaning(topicPartition: TopicPartition): Unit = {
cleanerManager.abortCleaning(topicPartition)
@@ -222,20 +239,28 @@ class LogCleaner(initialConfig: CleanerConfig,
/**
* Update checkpoint file to remove partitions if necessary.
+ *
+ * @param dataDir The data dir to be updated if necessary
+ * @param partitionToRemove The topicPartition to be removed, default none
*/
def updateCheckpoints(dataDir: File, partitionToRemove: Option[TopicPartition] = None): Unit = {
cleanerManager.updateCheckpoints(dataDir, partitionToRemove = partitionToRemove)
}
/**
- * alter the checkpoint directory for the topicPartition, to remove the data in sourceLogDir, and add the data in destLogDir
+ * Alter the checkpoint directory for the `topicPartition`, to remove the data in `sourceLogDir`, and add the data in `destLogDir`.
+ * This generally occurs when a disk rebalance completes and the future log replaces the current log.
+ *
+ * @param topicPartition The topic and partition whose checkpoint is altered
+ * @param sourceLogDir The source log dir from which the checkpoint entry is removed
+ * @param destLogDir The dest log dir to which the checkpoint entry is added
*/
def alterCheckpointDir(topicPartition: TopicPartition, sourceLogDir: File, destLogDir: File): Unit = {
cleanerManager.alterCheckpointDir(topicPartition, sourceLogDir, destLogDir)
}
/**
- * Stop cleaning logs in the provided directory
+ * Stop cleaning logs in the provided directory when handling log dir failure
*
* @param dir the absolute path of the log dir
*/
@@ -244,7 +269,11 @@ class LogCleaner(initialConfig: CleanerConfig,
}
/**
- * Truncate cleaner offset checkpoint for the given partition if its checkpointed offset is larger than the given offset
+ * Truncate cleaner offset checkpoint for the given partition if its checkpoint offset is larger than the given offset
+ *
+ * @param dataDir The data dir to be truncated if necessary
+ * @param topicPartition The topic and partition whose checkpoint offset is truncated
+ * @param offset The offset to compare the checkpointed offset against
*/
def maybeTruncateCheckpoint(dataDir: File, topicPartition: TopicPartition, offset: Long): Unit = {
cleanerManager.maybeTruncateCheckpoint(dataDir, topicPartition, offset)
@@ -253,14 +282,18 @@ class LogCleaner(initialConfig: CleanerConfig,
/**
* Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
* This call blocks until the cleaning of the partition is aborted and paused.
+ *
+ * @param topicPartition The topic and partition for which cleaning should be aborted and paused
*/
def abortAndPauseCleaning(topicPartition: TopicPartition): Unit = {
cleanerManager.abortAndPauseCleaning(topicPartition)
}
/**
- * Resume the cleaning of paused partitions.
- */
+ * Resume the cleaning of paused partitions.
+ *
+ * @param topicPartitions The collection of topicPartitions for which cleaning is resumed
+ */
def resumeCleaning(topicPartitions: Iterable[TopicPartition]): Unit = {
cleanerManager.resumeCleaning(topicPartitions)
}
@@ -289,6 +322,7 @@ class LogCleaner(initialConfig: CleanerConfig,
/**
* To prevent race between retention and compaction,
* retention threads need to make this call to obtain:
+ *
* @return A list of log partitions that retention threads can safely work on
*/
def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, UnifiedLog)] = {
@@ -327,6 +361,11 @@ class LogCleaner(initialConfig: CleanerConfig,
@volatile var lastStats: CleanerStats = new CleanerStats()
@volatile var lastPreCleanStats: PreCleanStats = new PreCleanStats()
+ /**
+ * Check if the cleaning for a partition is aborted. If so, throw an exception.
+ *
+ * @param topicPartition The topic and partition to check
+ */
private def checkDone(topicPartition: TopicPartition): Unit = {
if (!isRunning)
throw new ThreadShutdownException
@@ -347,6 +386,7 @@ class LogCleaner(initialConfig: CleanerConfig,
/**
* Cleans a log if there is a dirty log available
+ *
* @return whether a log was cleaned
*/
private def tryCleanFilthiestLog(): Boolean = {
@@ -417,6 +457,12 @@ class LogCleaner(initialConfig: CleanerConfig,
/**
* Log out statistics on a single run of the cleaner.
+ *
+ * @param id The cleaner thread id
+ * @param name The cleaned log name
+ * @param from The first dirty offset at which cleaning started
+ * @param to The first offset not cleaned, i.e. where this round of cleaning ended
+ * @param stats The statistics for this round of cleaning
*/
def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats): Unit = {
this.lastStats = stats
@@ -532,6 +578,14 @@ private[log] class Cleaner(val id: Int,
doClean(cleanable, time.milliseconds())
}
+ /**
+ * Clean the given log
+ *
+ * @param cleanable The log to be cleaned
+ * @param currentTime The current timestamp for doing cleaning
+ *
+ * @return The first offset not cleaned and the statistics for this round of cleaning
+ */
private[log] def doClean(cleanable: LogToClean, currentTime: Long): (Long, CleanerStats) = {
info("Beginning cleaning of log %s".format(cleanable.log.name))
@@ -667,6 +721,8 @@ private[log] class Cleaner(val id: Int,
* @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
* @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
* @param maxLogMessageSize The maximum message size of the corresponding topic
+ * @param transactionMetadata The state of ongoing transactions which is carried between the cleaning of the grouped segments
+ * @param lastRecordsOfActiveProducers The active producers and their last data offsets
* @param stats Collector for cleaning statistics
* @param currentTime The time at which the clean was initiated
*/
@@ -778,6 +834,11 @@ private[log] class Cleaner(val id: Int,
* 1. A compacted topic using compression may contain a message set slightly larger than max.message.bytes
* 2. max.message.bytes of a topic could have been reduced after writing larger messages
* In these cases, grow the buffer to hold the next batch.
+ *
+ * @param sourceRecords The dirty log segment records to process
+ * @param position The current position in the read buffer to read from
+ * @param maxLogMessageSize The maximum record size in bytes for the topic
+ * @param memoryRecords The memory records in the read buffer
*/
private def growBuffersOrFail(sourceRecords: FileRecords,
position: Int,
@@ -803,6 +864,14 @@ private[log] class Cleaner(val id: Int,
growBuffers(maxSize)
}
+ /**
+ * Check if a batch should be discarded based on the cleaned transaction state
+ *
+ * @param batch The batch of records to check
+ * @param transactionMetadata The transaction state maintained while cleaning
+ *
+ * @return Whether the batch can be discarded
+ */
private def shouldDiscardBatch(batch: RecordBatch,
transactionMetadata: CleanedTransactionMetadata): Boolean = {
if (batch.isControlBatch)
@@ -811,6 +880,18 @@ private[log] class Cleaner(val id: Int,
transactionMetadata.onBatchRead(batch)
}
+ /**
+ * Check if a record should be retained
+ *
+ * @param map The offset map (key => offset) to use for cleaning segments
+ * @param retainDeletesForLegacyRecords Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+ * @param batch The batch of records that the record belongs to
+ * @param record The record to check
+ * @param stats The collector for cleaning statistics
+ * @param currentTime The current time, compared against the delete horizon of the batch when judging a non-legacy record
+ *
+ * @return Whether the record can be retained
+ */
private def shouldRetainRecord(map: OffsetMap,
retainDeletesForLegacyRecords: Boolean,
batch: RecordBatch,
@@ -847,6 +928,8 @@ private[log] class Cleaner(val id: Int,
/**
* Double the I/O buffer capacity
+ *
+ * @param maxLogMessageSize The maximum allowed record size in bytes
*/
def growBuffers(maxLogMessageSize: Int): Unit = {
val maxBufferSize = math.max(maxLogMessageSize, maxIoBufferSize)
@@ -876,6 +959,7 @@ private[log] class Cleaner(val id: Int,
* @param segments The log segments to group
* @param maxSize the maximum size in bytes for the total of all log data in a group
* @param maxIndexSize the maximum size in bytes for the total of all index data in a group
+ * @param firstUncleanableOffset The upper (exclusive) offset to clean to
*
* @return A list of grouped segments
*/
@@ -915,7 +999,8 @@ private[log] class Cleaner(val id: Int,
* the base offset of the next segment in the list.
* If the next segment doesn't exist, first Uncleanable Offset will be used.
*
- * @param segs - remaining segments to group.
+ * @param segs Remaining segments to group.
+ * @param firstUncleanableOffset The upper (exclusive) offset to clean to
* @return The estimated last offset for the first segment in segs
*/
private def lastOffsetForFirstSegment(segs: List[LogSegment],
firstUncleanableOffset: Long): Long = {
@@ -972,8 +1057,13 @@ private[log] class Cleaner(val id: Int,
/**
* Add the messages in the given segment to the offset map
*
+ * @param topicPartition The topic and partition of the log segment for which the offset map is built
* @param segment The segment to index
* @param map The map in which to store the key=>offset mapping
+ * @param startOffset The offset at which dirty messages begin
+ * @param nextSegmentStartOffset The base offset of the next segment when building the current segment
+ * @param maxLogMessageSize The maximum allowed record size in bytes
+ * @param transactionMetadata The state of ongoing transactions for the offset range being built
* @param stats Collector for cleaning statistics
*
* @return If the map was filled whilst loading from this segment
@@ -1152,6 +1242,11 @@ private[log] class CleanedTransactionMetadata {
// Output cleaned index to write retained aborted transactions
var cleanedIndex: Option[TransactionIndex] = None
+ /**
+ * Update the cleaned transaction state with the newly found aborted transactions that have just been traversed.
+ *
+ * @param abortedTransactions The newly found aborted transactions to add
+ */
def addAbortedTransactions(abortedTransactions: List[AbortedTxn]): Unit = {
this.abortedTransactions ++= abortedTransactions
}
@@ -1159,6 +1254,10 @@ private[log] class CleanedTransactionMetadata {
/**
* Update the cleaned transaction state with a control batch that has just been traversed by the cleaner.
* Return true if the control batch can be discarded.
+ *
+ * @param controlBatch The control batch that has been traversed
+ *
+ * @return True if the control batch can be discarded
*/
def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
consumeAbortedTxnsUpTo(controlBatch.lastOffset)
@@ -1200,6 +1299,10 @@ private[log] class CleanedTransactionMetadata {
/**
* Update the transactional state for the incoming non-control batch. If the batch is part of
* an aborted transaction, return true to indicate that it is safe to discard.
+ *
+ * @param batch The batch to read when updating the transactional state
+ *
+ * @return Whether the batch is part of an aborted transaction or not
*/
def onBatchRead(batch: RecordBatch): Boolean = {
consumeAbortedTxnsUpTo(batch.lastOffset)
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 21a2a8c58fd..bc07df1ddc1 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -140,7 +140,6 @@ class LogSegment private[log] (val log: FileRecords,
* @param largestTimestamp The largest timestamp in the message set.
* @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
* @param records The log entries to append.
- * @return the physical position in the file of the appended records
* @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
*/
@nonthreadsafe
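
For reference, a minimal usage sketch of the abort/resume pair documented in the annotations above. It only assumes the method signatures shown in this diff; the `logCleaner` instance, the partition, and the maintenance step are hypothetical, not part of this commit.

import org.apache.kafka.common.TopicPartition

val tp = new TopicPartition("my-topic", 0)  // hypothetical partition
logCleaner.abortAndPauseCleaning(tp)        // blocks until cleaning is aborted and paused
try {
  // ... perform an operation that must not race with compaction ...
} finally {
  logCleaner.resumeCleaning(Seq(tp))        // re-enable future cleaning of this partition
}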