junrao commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r919529062
##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -366,25 +392,30 @@ class LogManager(logDirs: Seq[File],
        val numLogsLoaded = new AtomicInteger(0)
        numTotalLogs += logsToLoad.length
+       updateNumRemainingLogs(dir.getAbsolutePath, logsToLoad.length)
+
        val jobsForDir = logsToLoad.map { logDir =>
          val runnable: Runnable = () => {
            try {
              debug(s"Loading log $logDir")
              val logLoadStartMs = time.hiResClockMs()
              val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets,
-               defaultConfig, topicConfigOverrides)
+               defaultConfig, topicConfigOverrides, numRemainingSegments)
              val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
              val currentNumLoaded = numLogsLoaded.incrementAndGet()
              info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " +
                s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
+             println("!!! numRemainingSegments:" + numRemainingSegments + ";" + log)
            } catch {
              case e: IOException =>
                handleIOException(logDirAbsolutePath, e)
              case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] =>
                // KafkaStorageException might be thrown, ex: during writing LeaderEpochFileCache
                // And while converting IOException to KafkaStorageException, we've already handled the exception. So we can ignore it here.
+           } finally {
+             updateNumRemainingLogs(dir.getAbsolutePath, logsToLoad.length - numLogsLoaded.get())
Review Comment:
Hmm, how do we handle concurrency here? It's possible that one thread
completes and calls numLogsLoaded.get(), but before it can call
updateNumRemainingLogs(), another thread completes and calls
updateNumRemainingLogs(). The first thread then proceeds to call
updateNumRemainingLogs() anyway, overwriting numRemainingLogs with an outdated count.
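One possible way out, sketched below (illustrative only, not necessarily the fix the PR should take): have each finishing thread atomically decrement the per-dir entry via ConcurrentMap.compute, so no thread ever derives the remainder from a separate numLogsLoaded.get() read. decNumRemainingLogs is a hypothetical helper:

    // Sketch: compute() runs atomically per key, so two threads finishing
    // close together each subtract exactly one and can never publish a
    // stale remainder. Assumes java.util.concurrent.ConcurrentMap.
    private def decNumRemainingLogs(remaining: ConcurrentMap[String, Int], path: String): Unit =
      remaining.compute(path, (_, oldVal) => oldVal - 1)

    // in the per-log Runnable:
    // } finally {
    //   decNumRemainingLogs(numRemainingLogs, dir.getAbsolutePath)
    // }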
##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -90,6 +90,10 @@ class LogManager(logDirs: Seq[File],
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
   @volatile private var _currentDefaultConfig = initialDefaultConfig
   @volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir
+  // log dir path -> number of Remaining logs map for remainingLogsToRecover metric
+  private val numRemainingLogs: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]
+  // log recovery thread name -> number of remaining segments map for remainingSegmentsToRecover metric
+  private val numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]
Review Comment:
Could we make these two maps local in loadLogs()?
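A rough sketch of what that could look like, assuming the gauges can be registered from inside loadLogs() (addLogRecoveryMetrics/removeLogRecoveryMetrics are hypothetical helpers):

    // Sketch: scope the maps to log recovery instead of the LogManager lifetime.
    private[log] def loadLogs(defaultConfig: LogConfig,
                              topicConfigOverrides: Map[String, LogConfig]): Unit = {
      // live only while recovery is in flight
      val numRemainingLogs = new ConcurrentHashMap[String, Int]
      val numRemainingSegments = new ConcurrentHashMap[String, Int]
      addLogRecoveryMetrics(numRemainingLogs, numRemainingSegments) // hypothetical
      try {
        // ... existing per-dir recovery jobs, updating the two maps ...
      } finally {
        removeLogRecoveryMetrics() // hypothetical: drop the gauges once recovery completes
      }
    }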
##########
core/src/main/scala/kafka/log/LogLoader.scala:
##########
@@ -424,9 +432,11 @@ class LogLoader(
             // we had an invalid message, delete all remaining log
             warn(s"Corruption found in segment ${segment.baseOffset}," +
               s" truncating to offset ${segment.readNextOffset}")
-            removeAndDeleteSegmentsAsync(unflushed.toList)
+            removeAndDeleteSegmentsAsync(unflushedIter.toList)
Review Comment:
This call will consume unflushedIter. If we don't account for that, the
remaining-segments count won't be accurate.
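A self-contained toy illustration of the hazard (not the PR's code): each next() on the counting iterator updates the metric, so draining it with toList makes the corrupt tail look recovered:

    object IteratorDrainDemo extends App {
      var remaining = 4
      // models unflushedIter: decrement the metric as each segment is "recovered"
      val counting = Iterator("s0", "s1", "s2", "s3").map { seg =>
        remaining -= 1
        seg
      }
      counting.next()                // one segment actually recovered: remaining == 3
      val toDelete = counting.toList // drains the rest: remaining drops to 0
      println(s"remaining=$remaining, toDelete=${toDelete.size}") // remaining=0, toDelete=3
    }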
##########
core/src/main/scala/kafka/log/LogLoader.scala:
##########
@@ -404,12 +406,18 @@ class LogLoader(
     // If we have the clean shutdown marker, skip recovery.
     if (!hadCleanShutdown) {
-      val unflushed = segments.values(recoveryPointCheckpoint, Long.MaxValue).iterator
+      val unflushed = segments.values(recoveryPointCheckpoint, Long.MaxValue)
+      val unflushedSize = unflushed.size
Review Comment:
To be consistent with numFlushed, this probably should be named numUnflushed?
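In code the suggestion is just the rename (sketch, assuming a numFlushed counter elsewhere in this method):

    val numUnflushed = unflushed.size // parallels numFlushed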
##########
core/src/main/scala/kafka/log/LogLoader.scala:
##########
@@ -77,7 +78,8 @@ class LogLoader(
logStartOffsetCheckpoint: Long,
recoveryPointCheckpoint: Long,
leaderEpochCache: Option[LeaderEpochFileCache],
- producerStateManager: ProducerStateManager
+ producerStateManager: ProducerStateManager,
+ numRemainingSegments: ConcurrentMap[String, Int] = new
ConcurrentHashMap[String, Int]
Review Comment:
Could we add the new param to javadoc?
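Something along these lines in the scaladoc, say (wording illustrative):

    /**
     * ...
     * @param numRemainingSegments Map of recovery thread name -> number of segments remaining
     *                             to be recovered; updated by this loader as recovery proceeds
     */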
##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -366,25 +392,30 @@ class LogManager(logDirs: Seq[File],
        val numLogsLoaded = new AtomicInteger(0)
        numTotalLogs += logsToLoad.length
+       updateNumRemainingLogs(dir.getAbsolutePath, logsToLoad.length)
+
        val jobsForDir = logsToLoad.map { logDir =>
          val runnable: Runnable = () => {
            try {
              debug(s"Loading log $logDir")
              val logLoadStartMs = time.hiResClockMs()
              val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets,
-               defaultConfig, topicConfigOverrides)
+               defaultConfig, topicConfigOverrides, numRemainingSegments)
              val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
              val currentNumLoaded = numLogsLoaded.incrementAndGet()
              info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " +
                s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
+             println("!!! numRemainingSegments:" + numRemainingSegments + ";" + log)
Review Comment:
Is this needed?