This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6f0ab89  MINOR: Ensure a reason is logged for all segment deletion operations (#9110)
6f0ab89 is described below

commit 6f0ab89b9204e86cd49d44d6b9f97ee1939c50c6
Author: Dhruvil Shah <[email protected]>
AuthorDate: Fri Aug 7 10:59:41 2020 -0700

    MINOR: Ensure a reason is logged for all segment deletion operations (#9110)
    
    This PR improves the logging for segment deletion to ensure that a reason
    is logged for segment deletions via all code paths. It also updates the
    logging so that we log a reason for an entire batch of deletions instead of
    logging one message per segment in cases when segment-level details are not
    significant.
    
    Reviewers: Ismael Juma <[email protected]>, Kowshik Prakasam <[email protected]>, Jason Gustafson <[email protected]>
---
 core/src/main/scala/kafka/log/Log.scala        | 143 ++++++++++++++++---------
 core/src/main/scala/kafka/log/LogSegment.scala |   2 +-
 2 files changed, 95 insertions(+), 50 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index 7dccd5f..fa78666 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -795,7 +795,9 @@ class Log(@volatile private var _dir: File,
         if (truncatedBytes > 0) {
           // we had an invalid message, delete all remaining log
           warn(s"Corruption found in segment ${segment.baseOffset}, truncating 
to offset ${segment.readNextOffset}")
-          removeAndDeleteSegments(unflushed.toList, asyncDelete = true)
+          removeAndDeleteSegments(unflushed.toList,
+            asyncDelete = true,
+            reason = LogRecovery)
           truncated = true
         }
       }
@@ -806,7 +808,9 @@ class Log(@volatile private var _dir: File,
       if (logEndOffset < logStartOffset) {
         warn(s"Deleting all segments because logEndOffset ($logEndOffset) is 
smaller than logStartOffset ($logStartOffset). " +
           "This could happen if segment files were deleted from the file 
system.")
-        removeAndDeleteSegments(logSegments, asyncDelete = true)
+        removeAndDeleteSegments(logSegments,
+          asyncDelete = true,
+          reason = LogRecovery)
       }
     }
 
@@ -1697,16 +1701,18 @@ class Log(@volatile private var _dir: File,
    *                  (if there is one) and returns true iff it is deletable
    * @return The number of segments deleted
    */
-  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean) = {
+  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean,
+                                reason: SegmentDeletionReason): Int = {
     lock synchronized {
       val deletable = deletableSegments(predicate)
-      if (deletable.nonEmpty) {
-        deleteSegments(deletable)
-      } else 0
+      if (deletable.nonEmpty)
+        deleteSegments(deletable, reason)
+      else
+        0
     }
   }
 
-  private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
+  private def deleteSegments(deletable: Iterable[LogSegment], reason: 
SegmentDeletionReason): Int = {
     maybeHandleIOException(s"Error while deleting segments for $topicPartition 
in dir ${dir.getParent}") {
       val numToDelete = deletable.size
       if (numToDelete > 0) {
@@ -1716,7 +1722,7 @@ class Log(@volatile private var _dir: File,
         lock synchronized {
           checkIfMemoryMappedBufferClosed()
           // remove the segments for lookups
-          removeAndDeleteSegments(deletable, asyncDelete = true)
+          removeAndDeleteSegments(deletable, asyncDelete = true, reason)
           
maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset, 
SegmentDeletion)
         }
       }
@@ -1779,57 +1785,34 @@ class Log(@volatile private var _dir: File,
     if (config.retentionMs < 0) return 0
     val startMs = time.milliseconds
 
-    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) 
= {
-      if (startMs - segment.largestTimestamp > config.retentionMs) {
-        segment.largestRecordTimestamp match {
-          case Some(ts) =>
-            info(s"Segment with base offset ${segment.baseOffset} will be 
deleted due to" +
-              s" retention time ${config.retentionMs}ms breach based on the 
largest record timestamp from the" +
-              s" segment, which is $ts")
-          case None =>
-            info(s"Segment with base offset ${segment.baseOffset} will be 
deleted due to" +
-              s" retention time ${config.retentionMs}ms breach based on the 
last modified timestamp from the" +
-              s" segment, which is ${segment.lastModified}")
-        }
-        true
-      } else {
-        false
-      }
+    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
+      startMs - segment.largestTimestamp > config.retentionMs
     }
 
-    deleteOldSegments(shouldDelete)
+    deleteOldSegments(shouldDelete, RetentionMsBreach)
   }
 
   private def deleteRetentionSizeBreachedSegments(): Int = {
     if (config.retentionSize < 0 || size < config.retentionSize) return 0
     var diff = size - config.retentionSize
-    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) 
= {
+    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
       if (diff - segment.size >= 0) {
         diff -= segment.size
-        info(s"Segment with base offset ${segment.baseOffset} will be deleted 
due to" +
-          s" retention size ${config.retentionSize} bytes breach. Segment size 
is" +
-          s" ${segment.size} and total log size after deletion will be ${size 
- diff}")
         true
       } else {
         false
       }
     }
 
-    deleteOldSegments(shouldDelete)
+    deleteOldSegments(shouldDelete, RetentionSizeBreach)
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
-    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) 
= {
-      if (nextSegmentOpt.exists(_.baseOffset <= logStartOffset)) {
-        info(s"Segment with base offset ${segment.baseOffset} will be deleted 
due to" +
-          s" startOffset breach. logStartOffset is $logStartOffset")
-        true
-      } else {
-        false
-      }
+    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
+      nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
     }
 
-    deleteOldSegments(shouldDelete)
+    deleteOldSegments(shouldDelete, StartOffsetBreach)
   }
 
   def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)
@@ -1920,7 +1903,7 @@ class Log(@volatile private var _dir: File,
                  s"=max(provided offset = $expectedNextOffset, LEO = 
$logEndOffset) while it already " +
                  s"exists and is active with size 0. Size of time index: 
${activeSegment.timeIndex.entries}," +
                  s" size of offset index: 
${activeSegment.offsetIndex.entries}.")
-            removeAndDeleteSegments(Seq(activeSegment), asyncDelete = true)
+            removeAndDeleteSegments(Seq(activeSegment), asyncDelete = true, 
LogRoll)
           } else {
             throw new KafkaException(s"Trying to roll a new log segment for 
topic partition $topicPartition with start offset $newOffset" +
                                      s" =max(provided offset = 
$expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
@@ -2052,7 +2035,7 @@ class Log(@volatile private var _dir: File,
       lock synchronized {
         checkIfMemoryMappedBufferClosed()
         producerExpireCheck.cancel(true)
-        removeAndDeleteSegments(logSegments, asyncDelete = false)
+        removeAndDeleteSegments(logSegments, asyncDelete = false, LogDeletion)
         leaderEpochCache.foreach(_.clear())
         Utils.delete(dir)
         // File handlers will be closed if this log is deleted
@@ -2103,7 +2086,7 @@ class Log(@volatile private var _dir: File,
             truncateFullyAndStartAt(targetOffset)
           } else {
             val deletable = logSegments.filter(segment => segment.baseOffset > 
targetOffset)
-            removeAndDeleteSegments(deletable, asyncDelete = true)
+            removeAndDeleteSegments(deletable, asyncDelete = true, 
LogTruncation)
             activeSegment.truncateTo(targetOffset)
             updateLogEndOffset(targetOffset)
             updateLogStartOffset(math.min(targetOffset, this.logStartOffset))
@@ -2126,7 +2109,7 @@ class Log(@volatile private var _dir: File,
       debug(s"Truncate and start at offset $newOffset")
       lock synchronized {
         checkIfMemoryMappedBufferClosed()
-        removeAndDeleteSegments(logSegments, asyncDelete = true)
+        removeAndDeleteSegments(logSegments, asyncDelete = true, LogTruncation)
         addSegment(LogSegment.open(dir,
           baseOffset = newOffset,
           config = config,
@@ -2227,13 +2210,16 @@ class Log(@volatile private var _dir: File,
    * @param segments The log segments to schedule for deletion
    * @param asyncDelete Whether the segment files should be deleted 
asynchronously
    */
-  private def removeAndDeleteSegments(segments: Iterable[LogSegment], 
asyncDelete: Boolean): Unit = {
+  private def removeAndDeleteSegments(segments: Iterable[LogSegment],
+                                      asyncDelete: Boolean,
+                                      reason: SegmentDeletionReason): Unit = {
     if (segments.nonEmpty) {
       lock synchronized {
         // As most callers hold an iterator into the `segments` collection and 
`removeAndDeleteSegment` mutates it by
         // removing the deleted segment, we should force materialization of 
the iterator here, so that results of the
         // iteration remain valid and deterministic.
         val toDelete = segments.toList
+        reason.logReason(this, toDelete)
         toDelete.foreach { segment =>
           this.segments.remove(segment.baseOffset)
         }
@@ -2256,18 +2242,16 @@ class Log(@volatile private var _dir: File,
     segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
 
     def deleteSegments(): Unit = {
-      info(s"Deleting segments ${segments.mkString(",")}")
+      info(s"Deleting segment files ${segments.mkString(",")}")
       maybeHandleIOException(s"Error while deleting segments for 
$topicPartition in dir ${dir.getParent}") {
         segments.foreach(_.deleteIfExists())
       }
     }
 
-    if (asyncDelete) {
-      info(s"Scheduling segments for deletion ${segments.mkString(",")}")
+    if (asyncDelete)
       scheduler.schedule("delete-file", () => deleteSegments(), delay = 
config.fileDeleteDelayMs)
-    } else {
+    else
       deleteSegments()
-    }
   }
 
   /**
@@ -2686,3 +2670,64 @@ object LogMetricNames {
     List(NumLogSegments, LogStartOffset, LogEndOffset, Size)
   }
 }
+
+sealed trait SegmentDeletionReason {
+  def logReason(log: Log, toDelete: List[LogSegment]): Unit
+}
+
+case object RetentionMsBreach extends SegmentDeletionReason {
+  override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
+    val retentionMs = log.config.retentionMs
+    toDelete.foreach { segment =>
+      segment.largestRecordTimestamp match {
+        case Some(_) =>
+          log.info(s"Deleting segment $segment due to retention time 
${retentionMs}ms breach based on the largest " +
+            s"record timestamp in the segment")
+        case None =>
+          log.info(s"Deleting segment $segment due to retention time 
${retentionMs}ms breach based on the " +
+            s"last modified time of the segment")
+      }
+    }
+  }
+}
+
+case object RetentionSizeBreach extends SegmentDeletionReason {
+  override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
+    var size = log.size
+    toDelete.foreach { segment =>
+      size -= segment.size
+      log.info(s"Deleting segment $segment due to retention size 
${log.config.retentionSize} breach. Log size " +
+        s"after deletion will be $size.")
+    }
+  }
+}
+
+case object StartOffsetBreach extends SegmentDeletionReason {
+  override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
+    log.info(s"Deleting segments due to log start offset ${log.logStartOffset} 
breach: ${toDelete.mkString(",")}")
+  }
+}
+
+case object LogRecovery extends SegmentDeletionReason {
+  override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
+    log.info(s"Deleting segments as part of log recovery: 
${toDelete.mkString(",")}")
+  }
+}
+
+case object LogTruncation extends SegmentDeletionReason {
+  override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
+    log.info(s"Deleting segments as part of log truncation: 
${toDelete.mkString(",")}")
+  }
+}
+
+case object LogRoll extends SegmentDeletionReason {
+  override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
+    log.info(s"Deleting segments as part of log roll: 
${toDelete.mkString(",")}")
+  }
+}
+
+case object LogDeletion extends SegmentDeletionReason {
+  override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
+    log.info(s"Deleting segments as the log has been deleted: 
${toDelete.mkString(",")}")
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala 
b/core/src/main/scala/kafka/log/LogSegment.scala
index ce5d48c..6c0f8f9 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -413,7 +413,7 @@ class LogSegment private[log] (val log: FileRecords,
   override def toString: String = "LogSegment(baseOffset=" + baseOffset +
     ", size=" + size +
     ", lastModifiedTime=" + lastModified +
-    ", largestTime=" + largestTimestamp +
+    ", largestRecordTimestamp=" + largestRecordTimestamp +
     ")"
 
   /**

Reply via email to