This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6f0ab89 MINOR: Ensure a reason is logged for all segment deletion
operations (#9110)
6f0ab89 is described below
commit 6f0ab89b9204e86cd49d44d6b9f97ee1939c50c6
Author: Dhruvil Shah <[email protected]>
AuthorDate: Fri Aug 7 10:59:41 2020 -0700
MINOR: Ensure a reason is logged for all segment deletion operations (#9110)
This PR improves the logging for segment deletion to ensure that a reason
is logged for segment deletions via all code paths. It also updates the logging
so that we log a reason for an entire batch of deletions instead of logging one
message per segment in cases where segment-level details are not significant.
Reviewers: Ismael Juma <[email protected]>, Kowshik Prakasam
<[email protected]>, Jason Gustafson <[email protected]>
---
core/src/main/scala/kafka/log/Log.scala | 143 ++++++++++++++++---------
core/src/main/scala/kafka/log/LogSegment.scala | 2 +-
2 files changed, 95 insertions(+), 50 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala
b/core/src/main/scala/kafka/log/Log.scala
index 7dccd5f..fa78666 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -795,7 +795,9 @@ class Log(@volatile private var _dir: File,
if (truncatedBytes > 0) {
// we had an invalid message, delete all remaining log
warn(s"Corruption found in segment ${segment.baseOffset}, truncating
to offset ${segment.readNextOffset}")
- removeAndDeleteSegments(unflushed.toList, asyncDelete = true)
+ removeAndDeleteSegments(unflushed.toList,
+ asyncDelete = true,
+ reason = LogRecovery)
truncated = true
}
}
@@ -806,7 +808,9 @@ class Log(@volatile private var _dir: File,
if (logEndOffset < logStartOffset) {
warn(s"Deleting all segments because logEndOffset ($logEndOffset) is
smaller than logStartOffset ($logStartOffset). " +
"This could happen if segment files were deleted from the file
system.")
- removeAndDeleteSegments(logSegments, asyncDelete = true)
+ removeAndDeleteSegments(logSegments,
+ asyncDelete = true,
+ reason = LogRecovery)
}
}
@@ -1697,16 +1701,18 @@ class Log(@volatile private var _dir: File,
* (if there is one) and returns true iff it is deletable
* @return The number of segments deleted
*/
- private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) =>
Boolean) = {
+ private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) =>
Boolean,
+ reason: SegmentDeletionReason): Int = {
lock synchronized {
val deletable = deletableSegments(predicate)
- if (deletable.nonEmpty) {
- deleteSegments(deletable)
- } else 0
+ if (deletable.nonEmpty)
+ deleteSegments(deletable, reason)
+ else
+ 0
}
}
- private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
+ private def deleteSegments(deletable: Iterable[LogSegment], reason:
SegmentDeletionReason): Int = {
maybeHandleIOException(s"Error while deleting segments for $topicPartition
in dir ${dir.getParent}") {
val numToDelete = deletable.size
if (numToDelete > 0) {
@@ -1716,7 +1722,7 @@ class Log(@volatile private var _dir: File,
lock synchronized {
checkIfMemoryMappedBufferClosed()
// remove the segments for lookups
- removeAndDeleteSegments(deletable, asyncDelete = true)
+ removeAndDeleteSegments(deletable, asyncDelete = true, reason)
maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset,
SegmentDeletion)
}
}
@@ -1779,57 +1785,34 @@ class Log(@volatile private var _dir: File,
if (config.retentionMs < 0) return 0
val startMs = time.milliseconds
- def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment])
= {
- if (startMs - segment.largestTimestamp > config.retentionMs) {
- segment.largestRecordTimestamp match {
- case Some(ts) =>
- info(s"Segment with base offset ${segment.baseOffset} will be
deleted due to" +
- s" retention time ${config.retentionMs}ms breach based on the
largest record timestamp from the" +
- s" segment, which is $ts")
- case None =>
- info(s"Segment with base offset ${segment.baseOffset} will be
deleted due to" +
- s" retention time ${config.retentionMs}ms breach based on the
last modified timestamp from the" +
- s" segment, which is ${segment.lastModified}")
- }
- true
- } else {
- false
- }
+ def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]):
Boolean = {
+ startMs - segment.largestTimestamp > config.retentionMs
}
- deleteOldSegments(shouldDelete)
+ deleteOldSegments(shouldDelete, RetentionMsBreach)
}
private def deleteRetentionSizeBreachedSegments(): Int = {
if (config.retentionSize < 0 || size < config.retentionSize) return 0
var diff = size - config.retentionSize
- def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment])
= {
+ def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]):
Boolean = {
if (diff - segment.size >= 0) {
diff -= segment.size
- info(s"Segment with base offset ${segment.baseOffset} will be deleted
due to" +
- s" retention size ${config.retentionSize} bytes breach. Segment size
is" +
- s" ${segment.size} and total log size after deletion will be ${size
- diff}")
true
} else {
false
}
}
- deleteOldSegments(shouldDelete)
+ deleteOldSegments(shouldDelete, RetentionSizeBreach)
}
private def deleteLogStartOffsetBreachedSegments(): Int = {
- def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment])
= {
- if (nextSegmentOpt.exists(_.baseOffset <= logStartOffset)) {
- info(s"Segment with base offset ${segment.baseOffset} will be deleted
due to" +
- s" startOffset breach. logStartOffset is $logStartOffset")
- true
- } else {
- false
- }
+ def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]):
Boolean = {
+ nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
}
- deleteOldSegments(shouldDelete)
+ deleteOldSegments(shouldDelete, StartOffsetBreach)
}
def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)
@@ -1920,7 +1903,7 @@ class Log(@volatile private var _dir: File,
s"=max(provided offset = $expectedNextOffset, LEO =
$logEndOffset) while it already " +
s"exists and is active with size 0. Size of time index:
${activeSegment.timeIndex.entries}," +
s" size of offset index:
${activeSegment.offsetIndex.entries}.")
- removeAndDeleteSegments(Seq(activeSegment), asyncDelete = true)
+ removeAndDeleteSegments(Seq(activeSegment), asyncDelete = true,
LogRoll)
} else {
throw new KafkaException(s"Trying to roll a new log segment for
topic partition $topicPartition with start offset $newOffset" +
s" =max(provided offset =
$expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
@@ -2052,7 +2035,7 @@ class Log(@volatile private var _dir: File,
lock synchronized {
checkIfMemoryMappedBufferClosed()
producerExpireCheck.cancel(true)
- removeAndDeleteSegments(logSegments, asyncDelete = false)
+ removeAndDeleteSegments(logSegments, asyncDelete = false, LogDeletion)
leaderEpochCache.foreach(_.clear())
Utils.delete(dir)
// File handlers will be closed if this log is deleted
@@ -2103,7 +2086,7 @@ class Log(@volatile private var _dir: File,
truncateFullyAndStartAt(targetOffset)
} else {
val deletable = logSegments.filter(segment => segment.baseOffset >
targetOffset)
- removeAndDeleteSegments(deletable, asyncDelete = true)
+ removeAndDeleteSegments(deletable, asyncDelete = true,
LogTruncation)
activeSegment.truncateTo(targetOffset)
updateLogEndOffset(targetOffset)
updateLogStartOffset(math.min(targetOffset, this.logStartOffset))
@@ -2126,7 +2109,7 @@ class Log(@volatile private var _dir: File,
debug(s"Truncate and start at offset $newOffset")
lock synchronized {
checkIfMemoryMappedBufferClosed()
- removeAndDeleteSegments(logSegments, asyncDelete = true)
+ removeAndDeleteSegments(logSegments, asyncDelete = true, LogTruncation)
addSegment(LogSegment.open(dir,
baseOffset = newOffset,
config = config,
@@ -2227,13 +2210,16 @@ class Log(@volatile private var _dir: File,
* @param segments The log segments to schedule for deletion
* @param asyncDelete Whether the segment files should be deleted
asynchronously
*/
- private def removeAndDeleteSegments(segments: Iterable[LogSegment],
asyncDelete: Boolean): Unit = {
+ private def removeAndDeleteSegments(segments: Iterable[LogSegment],
+ asyncDelete: Boolean,
+ reason: SegmentDeletionReason): Unit = {
if (segments.nonEmpty) {
lock synchronized {
// As most callers hold an iterator into the `segments` collection and
`removeAndDeleteSegment` mutates it by
// removing the deleted segment, we should force materialization of
the iterator here, so that results of the
// iteration remain valid and deterministic.
val toDelete = segments.toList
+ reason.logReason(this, toDelete)
toDelete.foreach { segment =>
this.segments.remove(segment.baseOffset)
}
@@ -2256,18 +2242,16 @@ class Log(@volatile private var _dir: File,
segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
def deleteSegments(): Unit = {
- info(s"Deleting segments ${segments.mkString(",")}")
+ info(s"Deleting segment files ${segments.mkString(",")}")
maybeHandleIOException(s"Error while deleting segments for
$topicPartition in dir ${dir.getParent}") {
segments.foreach(_.deleteIfExists())
}
}
- if (asyncDelete) {
- info(s"Scheduling segments for deletion ${segments.mkString(",")}")
+ if (asyncDelete)
scheduler.schedule("delete-file", () => deleteSegments(), delay =
config.fileDeleteDelayMs)
- } else {
+ else
deleteSegments()
- }
}
/**
@@ -2686,3 +2670,64 @@ object LogMetricNames {
List(NumLogSegments, LogStartOffset, LogEndOffset, Size)
}
}
+
+sealed trait SegmentDeletionReason {
+ def logReason(log: Log, toDelete: List[LogSegment]): Unit
+}
+
+case object RetentionMsBreach extends SegmentDeletionReason {
+ override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
+ val retentionMs = log.config.retentionMs
+ toDelete.foreach { segment =>
+ segment.largestRecordTimestamp match {
+ case Some(_) =>
+ log.info(s"Deleting segment $segment due to retention time
${retentionMs}ms breach based on the largest " +
+ s"record timestamp in the segment")
+ case None =>
+ log.info(s"Deleting segment $segment due to retention time
${retentionMs}ms breach based on the " +
+ s"last modified time of the segment")
+ }
+ }
+ }
+}
+
+case object RetentionSizeBreach extends SegmentDeletionReason {
+ override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
+ var size = log.size
+ toDelete.foreach { segment =>
+ size -= segment.size
+ log.info(s"Deleting segment $segment due to retention size
${log.config.retentionSize} breach. Log size " +
+ s"after deletion will be $size.")
+ }
+ }
+}
+
+case object StartOffsetBreach extends SegmentDeletionReason {
+ override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
+ log.info(s"Deleting segments due to log start offset ${log.logStartOffset}
breach: ${toDelete.mkString(",")}")
+ }
+}
+
+case object LogRecovery extends SegmentDeletionReason {
+ override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
+ log.info(s"Deleting segments as part of log recovery:
${toDelete.mkString(",")}")
+ }
+}
+
+case object LogTruncation extends SegmentDeletionReason {
+ override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
+ log.info(s"Deleting segments as part of log truncation:
${toDelete.mkString(",")}")
+ }
+}
+
+case object LogRoll extends SegmentDeletionReason {
+ override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
+ log.info(s"Deleting segments as part of log roll:
${toDelete.mkString(",")}")
+ }
+}
+
+case object LogDeletion extends SegmentDeletionReason {
+ override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
+ log.info(s"Deleting segments as the log has been deleted:
${toDelete.mkString(",")}")
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala
b/core/src/main/scala/kafka/log/LogSegment.scala
index ce5d48c..6c0f8f9 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -413,7 +413,7 @@ class LogSegment private[log] (val log: FileRecords,
override def toString: String = "LogSegment(baseOffset=" + baseOffset +
", size=" + size +
", lastModifiedTime=" + lastModified +
- ", largestTime=" + largestTimestamp +
+ ", largestRecordTimestamp=" + largestRecordTimestamp +
")"
/**