This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6e88c10ed56 KAFKA-14483 Move LocalLog to storage module (#17587)
6e88c10ed56 is described below

commit 6e88c10ed5628ff31c0aa3096d16a1c78ef3127f
Author: Mickael Maison <[email protected]>
AuthorDate: Mon Oct 28 13:41:46 2024 +0100

    KAFKA-14483 Move LocalLog to storage module (#17587)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 core/src/main/scala/kafka/log/LocalLog.scala       | 720 -------------------
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  63 +-
 .../unit/kafka/cluster/PartitionLockTest.scala     |   2 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |   2 +-
 .../test/scala/unit/kafka/log/LocalLogTest.scala   | 108 +--
 .../unit/kafka/log/LogCleanerManagerTest.scala     |   2 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |   2 +-
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  |   2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |   4 +-
 .../scala/unit/kafka/utils/SchedulerTest.scala     |   4 +-
 gradle/spotbugs-exclude.xml                        |   1 -
 .../kafka/storage/internals/log/LocalLog.java      | 768 ++++++++++++++++++++-
 .../kafka/storage/internals/log/LogFileUtils.java  |   6 +
 .../kafka/storage/internals/log/LogTruncation.java |  38 +
 .../internals/log/SegmentDeletionReason.java       |  23 +
 16 files changed, 929 insertions(+), 818 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 492422cd564..78ab9c097d8 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -344,7 +344,7 @@
     <suppress checks="CyclomaticComplexity"
               files="(LogLoader|LogValidator|RemoteLogManagerConfig|RemoteLogManager).java"/>
     <suppress checks="NPathComplexity"
-              files="(LogLoader|LogValidator|RemoteLogManager|RemoteIndexCache).java"/>
+              files="(LocalLog|LogLoader|LogValidator|RemoteLogManager|RemoteIndexCache).java"/>
     <suppress checks="ParameterNumber"
               files="(LogAppendInfo|LogLoader|RemoteLogManagerConfig).java"/>
 
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
deleted file mode 100644
index 09aaed13d68..00000000000
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ /dev/null
@@ -1,720 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import kafka.utils.Logging
-import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException}
-import org.apache.kafka.common.message.FetchResponseData
-import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.server.util.Scheduler
-import org.apache.kafka.storage.internals.log.{AbortedTxn, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments, OffsetPosition, LocalLog => JLocalLog}
-
-import java.io.File
-import java.nio.file.Files
-import java.util
-import java.util.Collections.singletonList
-import java.util.concurrent.atomic.AtomicLong
-import java.util.regex.Pattern
-import java.util.{Collections, Optional}
-import scala.collection.mutable.ListBuffer
-import scala.collection.Seq
-import scala.jdk.CollectionConverters._
-import scala.jdk.OptionConverters.RichOptional
-
-/**
- * An append-only log for storing messages locally. The log is a sequence of LogSegments, each with a base offset.
- * New log segments are created according to a configurable policy that controls the size in bytes or time interval
- * for a given segment.
- *
- * NOTE: this class is not thread-safe, and it relies on the thread safety provided by the Log class.
- *
- * @param _dir The directory in which log segments are created.
- * @param config The log configuration settings
- * @param segments The non-empty log segments recovered from disk
- * @param recoveryPoint The offset at which to begin the next recovery i.e. the first offset which has not been flushed to disk
- * @param nextOffsetMetadata The offset where the next message could be appended
- * @param scheduler The thread pool scheduler used for background actions
- * @param time The time instance used for checking the clock
- * @param topicPartition The topic partition associated with this log
- * @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure
- */
-class LocalLog(@volatile private var _dir: File,
-               @volatile private[log] var config: LogConfig,
-               private[log] val segments: LogSegments,
-               @volatile private[log] var recoveryPoint: Long,
-               @volatile private var nextOffsetMetadata: LogOffsetMetadata,
-               private[log] val scheduler: Scheduler,
-               private[log] val time: Time,
-               private[log] val topicPartition: TopicPartition,
-               private[log] val logDirFailureChannel: LogDirFailureChannel) extends Logging {
-
-  import kafka.log.LocalLog._
-
-  this.logIdent = s"[LocalLog partition=$topicPartition, dir=${dir.getParent}] "
-
-  // The memory mapped buffer for index files of this log will be closed with 
either delete() or closeHandlers()
-  // After memory mapped buffer is closed, no disk IO operation should be 
performed for this log.
-  @volatile private[log] var isMemoryMappedBufferClosed = false
-
-  // Cache value of parent directory to avoid allocations in hot paths like 
ReplicaManager.checkpointHighWatermarks
-  @volatile private var _parentDir: String = dir.getParent
-
-  // Last time the log was flushed
-  private val lastFlushedTime = new AtomicLong(time.milliseconds)
-
-  private[log] def dir: File = _dir
-
-  private[log] def name: String = dir.getName
-
-  private[log] def parentDir: String = _parentDir
-
-  private[log] def parentDirFile: File = new File(_parentDir)
-
-  private[log] def isFuture: Boolean = 
dir.getName.endsWith(LocalLog.FutureDirSuffix)
-
-  private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
-    JLocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, () => 
msg, () => fun)
-  }
-
-  /**
-   * Rename the directory of the log
-   * @param name the new dir name
-   * @throws KafkaStorageException if rename fails
-   */
-  private[log] def renameDir(name: String): Boolean = {
-    maybeHandleIOException(s"Error while renaming dir for $topicPartition in 
log dir ${dir.getParent}") {
-      val renamedDir = new File(dir.getParent, name)
-      Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath)
-      if (renamedDir != dir) {
-        _dir = renamedDir
-        _parentDir = renamedDir.getParent
-        segments.updateParentDir(renamedDir)
-        true
-      } else {
-        false
-      }
-    }
-  }
-
-  /**
-   * Update the existing configuration to the new provided configuration.
-   * @param newConfig the new configuration to be updated to
-   */
-  private[log] def updateConfig(newConfig: LogConfig): Unit = {
-    val oldConfig = config
-    config = newConfig
-    val oldRecordVersion = oldConfig.recordVersion
-    val newRecordVersion = newConfig.recordVersion
-    if (newRecordVersion.precedes(oldRecordVersion))
-      warn(s"Record format version has been downgraded from $oldRecordVersion 
to $newRecordVersion.")
-  }
-
-  private[log] def checkIfMemoryMappedBufferClosed(): Unit = {
-    if (isMemoryMappedBufferClosed)
-      throw new KafkaStorageException(s"The memory mapped buffer for log of 
$topicPartition is already closed")
-  }
-
-  private[log] def updateRecoveryPoint(newRecoveryPoint: Long): Unit = {
-    recoveryPoint = newRecoveryPoint
-  }
-
-  /**
-   * Update recoveryPoint to provided offset and mark the log as flushed, if 
the offset is greater
-   * than the existing recoveryPoint.
-   *
-   * @param offset the offset to be updated
-   */
-  private[log] def markFlushed(offset: Long): Unit = {
-    checkIfMemoryMappedBufferClosed()
-    if (offset > recoveryPoint) {
-      updateRecoveryPoint(offset)
-      lastFlushedTime.set(time.milliseconds)
-    }
-  }
-
-  /**
-   * The number of messages appended to the log since the last flush
-   */
-  private[log] def unflushedMessages: Long = logEndOffset - recoveryPoint
-
-  /**
-   * Flush local log segments for all offsets up to offset-1.
-   * Does not update the recovery point.
-   *
-   * @param offset The offset to flush up to (non-inclusive)
-   */
-  private[log] def flush(offset: Long): Unit = {
-    val currentRecoveryPoint = recoveryPoint
-    if (currentRecoveryPoint <= offset) {
-      val segmentsToFlush = segments.values(currentRecoveryPoint, offset)
-      segmentsToFlush.forEach(_.flush())
-      // If there are any new segments, we need to flush the parent directory 
for crash consistency.
-      if (segmentsToFlush.stream().anyMatch(_.baseOffset >= 
currentRecoveryPoint)) {
-        // The directory might be renamed concurrently for topic deletion, 
which may cause NoSuchFileException here.
-        // Since the directory is to be deleted anyways, we just swallow 
NoSuchFileException and let it go.
-        Utils.flushDirIfExists(dir.toPath)
-      }
-    }
-  }
-
-  /**
-   * The time this log is last known to have been fully flushed to disk
-   */
-  private[log] def lastFlushTime: Long = lastFlushedTime.get
-
-  /**
-   * The offset metadata of the next message that will be appended to the log
-   */
-  private[log] def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
-
-  /**
-   * The offset of the next message that will be appended to the log
-   */
-  private[log] def logEndOffset: Long = nextOffsetMetadata.messageOffset
-
-  /**
-   * Update end offset of the log, and update the recoveryPoint.
-   *
-   * @param endOffset the new end offset of the log
-   */
-  private[log] def updateLogEndOffset(endOffset: Long): Unit = {
-    nextOffsetMetadata = new LogOffsetMetadata(endOffset, 
segments.activeSegment.baseOffset, segments.activeSegment.size)
-    if (recoveryPoint > endOffset) {
-      updateRecoveryPoint(endOffset)
-    }
-  }
-
-  /**
-   * Close file handlers used by log but don't write to disk.
-   * This is called if the log directory is offline.
-   */
-  private[log] def closeHandlers(): Unit = {
-    segments.closeHandlers()
-    isMemoryMappedBufferClosed = true
-  }
-
-  /**
-   * Closes the segments of the log.
-   */
-  private[log] def close(): Unit = {
-    maybeHandleIOException(s"Error while renaming dir for $topicPartition in 
dir ${dir.getParent}") {
-      checkIfMemoryMappedBufferClosed()
-      segments.close()
-    }
-  }
-
-  /**
-   * Completely delete this log directory with no delay.
-   */
-  private[log] def deleteEmptyDir(): Unit = {
-    maybeHandleIOException(s"Error while deleting dir for $topicPartition in 
dir ${dir.getParent}") {
-      if (segments.nonEmpty) {
-        throw new IllegalStateException(s"Can not delete directory when 
${segments.numberOfSegments} segments are still present")
-      }
-      if (!isMemoryMappedBufferClosed) {
-        throw new IllegalStateException(s"Can not delete directory when memory 
mapped buffer for log of $topicPartition is still open.")
-      }
-      Utils.delete(dir)
-    }
-  }
-
-  /**
-   * Completely delete all segments with no delay.
-   * @return the deleted segments
-   */
-  private[log] def deleteAllSegments(): util.List[LogSegment] = {
-    maybeHandleIOException(s"Error while deleting all segments for 
$topicPartition in dir ${dir.getParent}") {
-      val deletableSegments = new util.ArrayList(segments.values)
-      removeAndDeleteSegments(segments.values.asScala, asyncDelete = false, 
LogDeletion(this))
-      isMemoryMappedBufferClosed = true
-      deletableSegments
-    }
-  }
-
-  /**
-   * This method deletes the given log segments by doing the following for 
each of them:
-   * - It removes the segment from the segment map so that it will no longer 
be used for reads.
-   * - It renames the index and log files by appending .deleted to the 
respective file name
-   * - It can either schedule an asynchronous delete operation to occur in the 
future or perform the deletion synchronously
-   *
-   * Asynchronous deletion allows reads to happen concurrently without 
synchronization and without the possibility of
-   * physically deleting a file while it is being read.
-   *
-   * This method does not convert IOException to KafkaStorageException, the 
immediate caller
-   * is expected to catch and handle IOException.
-   *
-   * @param segmentsToDelete The log segments to schedule for deletion
-   * @param asyncDelete Whether the segment files should be deleted 
asynchronously
-   * @param reason The reason for the segment deletion
-   */
-  private[log] def removeAndDeleteSegments(segmentsToDelete: 
Iterable[LogSegment],
-                                           asyncDelete: Boolean,
-                                           reason: SegmentDeletionReason): 
Unit = {
-    if (segmentsToDelete.nonEmpty) {
-      // Most callers hold an iterator into the `segments` collection and 
`removeAndDeleteSegment` mutates it by
-      // removing the deleted segment, we should force materialization of the 
iterator here, so that results of the
-      // iteration remain valid and deterministic. We should also pass only 
the materialized view of the
-      // iterator to the logic that actually deletes the segments.
-      val toDelete = segmentsToDelete.toList
-      reason.logReason(toDelete)
-      toDelete.foreach { segment =>
-        segments.remove(segment.baseOffset)
-      }
-      JLocalLog.deleteSegmentFiles(toDelete.asJava, asyncDelete, dir, 
topicPartition, config, scheduler, logDirFailureChannel, logIdent)
-    }
-  }
-
-  /**
-   * This method deletes the given segment and creates a new segment with the 
given new base offset. It ensures an
-   * active segment exists in the log at all times during this process.
-   *
-   * Asynchronous deletion allows reads to happen concurrently without 
synchronization and without the possibility of
-   * physically deleting a file while it is being read.
-   *
-   * This method does not convert IOException to KafkaStorageException, the 
immediate caller
-   * is expected to catch and handle IOException.
-   *
-   * @param newOffset The base offset of the new segment
-   * @param segmentToDelete The old active segment to schedule for deletion
-   * @param asyncDelete Whether the segment files should be deleted 
asynchronously
-   * @param reason The reason for the segment deletion
-   */
-  private[log] def createAndDeleteSegment(newOffset: Long,
-                                          segmentToDelete: LogSegment,
-                                          asyncDelete: Boolean,
-                                          reason: SegmentDeletionReason): 
LogSegment = {
-    if (newOffset == segmentToDelete.baseOffset)
-      segmentToDelete.changeFileSuffixes("", LogFileUtils.DELETED_FILE_SUFFIX)
-
-    val newSegment = LogSegment.open(dir,
-      newOffset,
-      config,
-      time,
-      config.initFileSize,
-      config.preallocate)
-    segments.add(newSegment)
-
-    reason.logReason(List(segmentToDelete))
-    if (newOffset != segmentToDelete.baseOffset)
-      segments.remove(segmentToDelete.baseOffset)
-    JLocalLog.deleteSegmentFiles(singletonList(segmentToDelete), asyncDelete, 
dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
-
-    newSegment
-  }
-
-  /**
-   * Given a message offset, find its corresponding offset metadata in the log.
-   * If the message offset is out of range, throw an OffsetOutOfRangeException
-   */
-  private[log] def convertToOffsetMetadataOrThrow(offset: Long): 
LogOffsetMetadata = {
-    val fetchDataInfo = read(offset,
-      maxLength = 1,
-      minOneMessage = false,
-      maxOffsetMetadata = nextOffsetMetadata,
-      includeAbortedTxns = false)
-    fetchDataInfo.fetchOffsetMetadata
-  }
-
-  /**
-   * Read messages from the log.
-   *
-   * @param startOffset The offset to begin reading at
-   * @param maxLength The maximum number of bytes to read
-   * @param minOneMessage If this is true, the first message will be returned 
even if it exceeds `maxLength` (if one exists)
-   * @param maxOffsetMetadata The metadata of the maximum offset to be fetched
-   * @param includeAbortedTxns If true, aborted transactions are included
-   * @throws OffsetOutOfRangeException If startOffset is beyond the log end 
offset
-   * @return The fetch data information including fetch starting offset 
metadata and messages read.
-   */
-  def read(startOffset: Long,
-           maxLength: Int,
-           minOneMessage: Boolean,
-           maxOffsetMetadata: LogOffsetMetadata,
-           includeAbortedTxns: Boolean): FetchDataInfo = {
-    maybeHandleIOException(s"Exception while reading from $topicPartition in 
dir ${dir.getParent}") {
-      trace(s"Reading maximum $maxLength bytes at offset $startOffset from log 
with " +
-        s"total length ${segments.sizeInBytes} bytes")
-
-      val endOffsetMetadata = nextOffsetMetadata
-      val endOffset = endOffsetMetadata.messageOffset
-      var segmentOpt = segments.floorSegment(startOffset)
-
-      // return error on attempt to read beyond the log end offset
-      if (startOffset > endOffset || !segmentOpt.isPresent)
-        throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
-          s"but we only have log segments upto $endOffset.")
-
-      if (startOffset == maxOffsetMetadata.messageOffset)
-        emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-      else if (startOffset > maxOffsetMetadata.messageOffset)
-        emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
-      else {
-        // Do the read on the segment with a base offset less than the target 
offset
-        // but if that segment doesn't contain any messages with an offset 
greater than that
-        // continue to read from successive segments until we get some 
messages or we reach the end of the log
-        var fetchDataInfo: FetchDataInfo = null
-        while (fetchDataInfo == null && segmentOpt.isPresent) {
-          val segment = segmentOpt.get
-          val baseOffset = segment.baseOffset
-
-          // 1. If `maxOffsetMetadata#segmentBaseOffset < segment#baseOffset`, 
then return maxPosition as empty.
-          // 2. Use the max-offset position if it is on this segment; 
otherwise, the segment size is the limit.
-          // 3. When maxOffsetMetadata is message-offset-only, then we don't 
know the relativePositionInSegment so
-          //    return maxPosition as empty to avoid reading beyond the 
max-offset
-          val maxPositionOpt: Optional[java.lang.Long] =
-            if (segment.baseOffset < maxOffsetMetadata.segmentBaseOffset)
-              Optional.of(segment.size)
-            else if (segment.baseOffset == maxOffsetMetadata.segmentBaseOffset 
&& !maxOffsetMetadata.messageOffsetOnly())
-              Optional.of(maxOffsetMetadata.relativePositionInSegment)
-            else
-              Optional.empty()
-
-          fetchDataInfo = segment.read(startOffset, maxLength, maxPositionOpt, 
minOneMessage)
-          if (fetchDataInfo != null) {
-            if (includeAbortedTxns)
-              fetchDataInfo = addAbortedTransactions(startOffset, segment, 
fetchDataInfo)
-          } else segmentOpt = segments.higherSegment(baseOffset)
-        }
-
-        if (fetchDataInfo != null) fetchDataInfo
-        else {
-          // okay we are beyond the end of the last segment with no data 
fetched although the start offset is in range,
-          // this can happen when all messages with offset larger than start 
offsets have been deleted.
-          // In this case, we will return the empty set with log end offset 
metadata
-          new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
-        }
-      }
-    }
-  }
-
-  private[log] def append(lastOffset: Long, largestTimestamp: Long, 
shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
-    segments.activeSegment.append(lastOffset, largestTimestamp, 
shallowOffsetOfMaxTimestamp, records)
-    updateLogEndOffset(lastOffset + 1)
-  }
-
-  private def addAbortedTransactions(startOffset: Long, segment: LogSegment,
-                                     fetchInfo: FetchDataInfo): FetchDataInfo 
= {
-    val fetchSize = fetchInfo.records.sizeInBytes
-    val startOffsetPosition = new 
OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
-      fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
-    val upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, 
fetchSize).orElse(
-      segments.higherSegment(segment.baseOffset).toScala.map(s => 
s.baseOffset).getOrElse(logEndOffset))
-
-    val abortedTransactions = 
ListBuffer.empty[FetchResponseData.AbortedTransaction]
-    def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = abortedTransactions 
++= abortedTxns.map(_.asAbortedTransaction)
-    collectAbortedTransactions(startOffset, upperBoundOffset, segment, 
accumulator)
-
-    new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
-      fetchInfo.records,
-      fetchInfo.firstEntryIncomplete,
-      Optional.of(abortedTransactions.toList.asJava))
-  }
-
-  private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: 
Long,
-                                         startingSegment: LogSegment,
-                                         accumulator: Seq[AbortedTxn] => 
Unit): Unit = {
-    val higherSegments = 
segments.higherSegments(startingSegment.baseOffset).iterator
-    var segmentEntryOpt = Option(startingSegment)
-    while (segmentEntryOpt.isDefined) {
-      val segment = segmentEntryOpt.get
-      val searchResult = segment.collectAbortedTxns(startOffset, 
upperBoundOffset)
-      accumulator(searchResult.abortedTransactions.asScala)
-      if (searchResult.isComplete)
-        return
-      segmentEntryOpt = nextOption(higherSegments)
-    }
-  }
-
-  private[log] def collectAbortedTransactions(logStartOffset: Long, 
baseOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {
-    val segmentEntry = segments.floorSegment(baseOffset)
-    val allAbortedTxns = ListBuffer.empty[AbortedTxn]
-    def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = allAbortedTxns ++= 
abortedTxns
-    segmentEntry.ifPresent(segment => 
collectAbortedTransactions(logStartOffset, upperBoundOffset, segment, 
accumulator))
-    allAbortedTxns.toList
-  }
-
-  /**
-   * Roll the log over to a new active segment starting with the current 
logEndOffset.
-   * This will trim the index to the exact size of the number of entries it 
currently contains.
-   *
-   * @param expectedNextOffset The expected next offset after the segment is 
rolled
-   *
-   * @return The newly rolled segment
-   */
-  private[log] def roll(expectedNextOffset: Option[Long] = None): LogSegment = 
{
-    maybeHandleIOException(s"Error while rolling log segment for 
$topicPartition in dir ${dir.getParent}") {
-      val start = time.hiResClockMs()
-      checkIfMemoryMappedBufferClosed()
-      val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset)
-      val logFile = LogFileUtils.logFile(dir, newOffset, "")
-      val activeSegment = segments.activeSegment
-      if (segments.contains(newOffset)) {
-        // segment with the same base offset already exists and loaded
-        if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {
-          // We have seen this happen (see KAFKA-6388) after shouldRoll() 
returns true for an
-          // active segment of size zero because of one of the indexes is 
"full" (due to _maxEntries == 0).
-          warn(s"Trying to roll a new log segment with start offset $newOffset 
" +
-            s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) 
while it already " +
-            s"exists and is active with size 0. Size of time index: 
${activeSegment.timeIndex.entries}," +
-            s" size of offset index: ${activeSegment.offsetIndex.entries}.")
-          val newSegment = createAndDeleteSegment(newOffset, activeSegment, 
asyncDelete = true, LogRoll(this))
-          updateLogEndOffset(nextOffsetMetadata.messageOffset)
-          info(s"Rolled new log segment at offset $newOffset in 
${time.hiResClockMs() - start} ms.")
-          return newSegment
-        } else {
-          throw new KafkaException(s"Trying to roll a new log segment for 
topic partition $topicPartition with start offset $newOffset" +
-            s" =max(provided offset = $expectedNextOffset, LEO = 
$logEndOffset) while it already exists. Existing " +
-            s"segment is ${segments.get(newOffset)}.")
-        }
-      } else if (segments.nonEmpty && newOffset < activeSegment.baseOffset) {
-        throw new KafkaException(
-          s"Trying to roll a new log segment for topic partition 
$topicPartition with " +
-            s"start offset $newOffset =max(provided offset = 
$expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active 
segment $activeSegment")
-      } else {
-        val offsetIdxFile = LogFileUtils.offsetIndexFile(dir, newOffset)
-        val timeIdxFile = LogFileUtils.timeIndexFile(dir, newOffset)
-        val txnIdxFile = LogFileUtils.transactionIndexFile(dir, newOffset)
-
-        for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if 
file.exists) {
-          warn(s"Newly rolled segment file ${file.getAbsolutePath} already 
exists; deleting it first")
-          Files.delete(file.toPath)
-        }
-
-        segments.lastSegment.ifPresent(_.onBecomeInactiveSegment())
-      }
-
-      val newSegment = LogSegment.open(dir,
-        newOffset,
-        config,
-        time,
-        config.initFileSize,
-        config.preallocate)
-      segments.add(newSegment)
-
-      // We need to update the segment base offset and append position data of 
the metadata when log rolls.
-      // The next offset should not change.
-      updateLogEndOffset(nextOffsetMetadata.messageOffset)
-
-      info(s"Rolled new log segment at offset $newOffset in 
${time.hiResClockMs() - start} ms.")
-
-      newSegment
-    }
-  }
-
-  /**
-   *  Delete all data in the local log and start at the new offset.
-   *
-   *  @param newOffset The new offset to start the log with
-   *  @return the list of segments that were scheduled for deletion
-   */
-  private[log] def truncateFullyAndStartAt(newOffset: Long): 
Iterable[LogSegment] = {
-    maybeHandleIOException(s"Error while truncating the entire log for 
$topicPartition in dir ${dir.getParent}") {
-      debug(s"Truncate and start at offset $newOffset")
-      checkIfMemoryMappedBufferClosed()
-      val segmentsToDelete = new util.ArrayList(segments.values).asScala
-
-      if (segmentsToDelete.nonEmpty) {
-        removeAndDeleteSegments(segmentsToDelete.dropRight(1), asyncDelete = 
true, LogTruncation(this))
-        // Use createAndDeleteSegment() to create new segment first and then 
delete the old last segment to prevent missing
-        // active segment during the deletion process
-        createAndDeleteSegment(newOffset, segmentsToDelete.last, asyncDelete = 
true, LogTruncation(this))
-      }
-
-      updateLogEndOffset(newOffset)
-
-      segmentsToDelete
-    }
-  }
-
-  /**
-   * Truncate this log so that it ends with the greatest offset < targetOffset.
-   *
-   * @param targetOffset The offset to truncate to, an upper bound on all 
offsets in the log after truncation is complete.
-   * @return the list of segments that were scheduled for deletion
-   */
-  private[log] def truncateTo(targetOffset: Long): Iterable[LogSegment] = {
-    val deletableSegments = segments.filter(segment => segment.baseOffset > 
targetOffset).asScala
-    removeAndDeleteSegments(deletableSegments, asyncDelete = true, 
LogTruncation(this))
-    segments.activeSegment.truncateTo(targetOffset)
-    updateLogEndOffset(targetOffset)
-    deletableSegments
-  }
-}
-
-/**
- * Helper functions for logs
- */
-object LocalLog extends Logging {
-
-  /** a directory that is scheduled to be deleted */
-  private[log] val DeleteDirSuffix = LogFileUtils.DELETE_DIR_SUFFIX
-
-  /** a directory that is used for future partition */
-  private[log] val FutureDirSuffix = "-future"
-
-  /** a directory that is used for stray partition */
-  private[log] val StrayDirSuffix = "-stray"
-
-  private[log] val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
-  private[log] val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix")
-  private[log] val StrayDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$StrayDirSuffix")
-
-  private[log] val UnknownOffset = -1L
-
-  /**
-   * Return a directory name to rename the log directory to for async deletion.
-   * The name will be in the following format: 
"topic-partitionId.uniqueId-delete".
-   * If the topic name is too long, it will be truncated to prevent the total 
name
-   * from exceeding 255 characters.
-   */
-  private[log] def logDeleteDirName(topicPartition: TopicPartition): String = {
-    logDirNameWithSuffixCappedLength(topicPartition, DeleteDirSuffix)
-  }
-
-  /**
-   * Return a directory name to rename the log directory to for stray 
partition deletion.
-   * The name will be in the following format: 
"topic-partitionId.uniqueId-stray".
-   * If the topic name is too long, it will be truncated to prevent the total 
name
-   * from exceeding 255 characters.
-   */
-  private[log] def logStrayDirName(topicPartition: TopicPartition): String = {
-    logDirNameWithSuffixCappedLength(topicPartition, StrayDirSuffix)
-  }
-
-  /**
-   * Return a future directory name for the given topic partition. The name 
will be in the following
-   * format: topic-partition.uniqueId-future where topic, partition and 
uniqueId are variables.
-   */
-  private[log] def logFutureDirName(topicPartition: TopicPartition): String = {
-    logDirNameWithSuffix(topicPartition, FutureDirSuffix)
-  }
-
-  /**
-   * Return a new directory name in the following format: 
"${topic}-${partitionId}.${uniqueId}${suffix}".
-   * If the topic name is too long, it will be truncated to prevent the total 
name
-   * from exceeding 255 characters.
-   */
-  private[log] def logDirNameWithSuffixCappedLength(topicPartition: 
TopicPartition, suffix: String): String = {
-    val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
-    val fullSuffix = s"-${topicPartition.partition()}.$uniqueId$suffix"
-    val prefixLength = Math.min(topicPartition.topic().length, 255 - 
fullSuffix.length)
-    s"${topicPartition.topic().substring(0, prefixLength)}$fullSuffix"
-  }
-
-  private[log] def logDirNameWithSuffix(topicPartition: TopicPartition, 
suffix: String): String = {
-    val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
-    s"${logDirName(topicPartition)}.$uniqueId$suffix"
-  }
-
-  /**
-   * Return a directory name for the given topic partition. The name will be 
in the following
-   * format: topic-partition where topic, partition are variables.
-   */
-  private[log] def logDirName(topicPartition: TopicPartition): String = {
-    s"${topicPartition.topic}-${topicPartition.partition}"
-  }
-
-  /**
-   * Parse the topic and partition out of the directory name of a log
-   */
-  private[log] def parseTopicPartitionName(dir: File): TopicPartition = {
-    if (dir == null)
-      throw new KafkaException("dir should not be null")
-
-    def exception(dir: File): KafkaException = {
-      new KafkaException(s"Found directory ${dir.getCanonicalPath}, 
'${dir.getName}' is not in the form of " +
-        "topic-partition or topic-partition.uniqueId-delete (if marked for 
deletion).\n" +
-        "Kafka's log directories (and children) should only contain Kafka 
topic data.")
-    }
-
-    val dirName = dir.getName
-    if (dirName == null || dirName.isEmpty || !dirName.contains('-'))
-      throw exception(dir)
-    if (dirName.endsWith(DeleteDirSuffix) && 
!DeleteDirPattern.matcher(dirName).matches ||
-      dirName.endsWith(FutureDirSuffix) && 
!FutureDirPattern.matcher(dirName).matches ||
-      dirName.endsWith(StrayDirSuffix) && 
!StrayDirPattern.matcher(dirName).matches)
-      throw exception(dir)
-
-    val name: String =
-      if (dirName.endsWith(DeleteDirSuffix) || 
dirName.endsWith(FutureDirSuffix) || dirName.endsWith(StrayDirSuffix))
-        dirName.substring(0, dirName.lastIndexOf('.'))
-      else dirName
-
-    val index = name.lastIndexOf('-')
-    val topic = name.substring(0, index)
-    val partitionString = name.substring(index + 1)
-    if (topic.isEmpty || partitionString.isEmpty)
-      throw exception(dir)
-
-    val partition =
-      try partitionString.toInt
-      catch { case _: NumberFormatException => throw exception(dir) }
-
-    new TopicPartition(topic, partition)
-  }
-
-  private[log] def emptyFetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata,
-                                      includeAbortedTxns: Boolean): 
FetchDataInfo = {
-    val abortedTransactions: 
Optional[java.util.List[FetchResponseData.AbortedTransaction]] =
-      if (includeAbortedTxns) Optional.of(Collections.emptyList())
-      else Optional.empty()
-    new FetchDataInfo(fetchOffsetMetadata,
-      MemoryRecords.EMPTY,
-      false,
-      abortedTransactions)
-  }
-
-  /**
-   * Wraps the value of iterator.next() in an option.
-   * Note: this facility is a part of the Iterator class starting from scala 
v2.13.
-   *
-   * @param iterator
-   * @tparam T the type of object held within the iterator
-   * @return Some(iterator.next) if a next element exists, None otherwise.
-   */
-  private[log] def nextOption[T](iterator: util.Iterator[T]): Option[T] = {
-    if (iterator.hasNext)
-      Some(iterator.next())
-    else
-      None
-  }
-}
-
-trait SegmentDeletionReason {
-  def logReason(toDelete: List[LogSegment]): Unit
-}
-
-case class LogTruncation(log: LocalLog) extends SegmentDeletionReason {
-  override def logReason(toDelete: List[LogSegment]): Unit = {
-    log.info(s"Deleting segments as part of log truncation: 
${toDelete.mkString(",")}")
-  }
-}
-
-case class LogRoll(log: LocalLog) extends SegmentDeletionReason {
-  override def logReason(toDelete: List[LogSegment]): Unit = {
-    log.info(s"Deleting segments as part of log roll: 
${toDelete.mkString(",")}")
-  }
-}
-
-case class LogDeletion(log: LocalLog) extends SegmentDeletionReason {
-  override def logReason(toDelete: List[LogSegment]): Unit = {
-    log.info(s"Deleting segments as the log has been deleted: 
${toDelete.mkString(",")}")
-  }
-}
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 4747b5b0ee9..f4ef718f94f 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -18,7 +18,6 @@
 package kafka.log
 
 import kafka.common.{OffsetsOutOfOrderException, 
UnexpectedAppendOffsetException}
-import kafka.log.LocalLog.nextOption
 import kafka.log.remote.RemoteLogManager
 import kafka.utils._
 import org.apache.kafka.common.errors._
@@ -41,7 +40,7 @@ import org.apache.kafka.server.util.Scheduler
 import 
org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, 
PartitionMetadataFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
 import org.apache.kafka.storage.internals.log.LocalLog.SplitSegmentResult
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, 
LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, 
LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, 
LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, 
ProducerStateManager, ProducerStateManagerConfig, RollParams, 
VerificationGuard, LocalLog => JLocalLog, UnifiedLog => JUnifiedLog}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, 
LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, 
LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, 
LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, 
ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, 
RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog}
 import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, 
BrokerTopicStats}
 
 import java.io.{File, IOException}
@@ -1246,7 +1245,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private[log] def collectAbortedTransactions(startOffset: Long, 
upperBoundOffset: Long): List[AbortedTxn] = {
-    localLog.collectAbortedTransactions(logStartOffset, startOffset, 
upperBoundOffset)
+    localLog.collectAbortedTransactions(logStartOffset, startOffset, 
upperBoundOffset).asScala.toList
   }
 
   /**
@@ -1559,7 +1558,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           val newLocalLogStartOffset = 
localLog.segments.higherSegment(segmentsToDelete.last.baseOffset()).get.baseOffset()
           incrementStartOffset(newLocalLogStartOffset, 
LogStartOffsetIncrementReason.SegmentDeletion)
           // remove the segments for lookups
-          localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = 
true, reason)
+          localLog.removeAndDeleteSegments(segmentsToDelete.toList.asJava, 
true, reason)
         }
         deleteProducerSnapshots(deletable.toList.asJava, asyncDelete = true)
       }
@@ -1713,7 +1712,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    * @return The newly rolled segment
    */
   def roll(expectedNextOffset: Option[Long] = None): LogSegment = lock 
synchronized {
-    val newSegment = localLog.roll(expectedNextOffset)
+    val nextOffset : JLong = expectedNextOffset match {
+      case Some(offset) => offset
+      case None => 0L
+    }
+    val newSegment = localLog.roll(nextOffset)
     // Take a snapshot of the producer state to facilitate recovery. It is 
useful to have the snapshot
     // offset align with the new segment offset since this ensures we can 
recover the segment by beginning
     // with the corresponding snapshot file and scanning the segment data. 
Because the segment base offset
@@ -1848,7 +1851,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
             truncateFullyAndStartAt(targetOffset)
           } else {
             val deletedSegments = localLog.truncateTo(targetOffset)
-            deleteProducerSnapshots(deletedSegments.toList.asJava, asyncDelete 
= true)
+            deleteProducerSnapshots(deletedSegments, asyncDelete = true)
             leaderEpochCache.foreach(_.truncateFromEndAsyncFlush(targetOffset))
             logStartOffset = math.min(targetOffset, logStartOffset)
             rebuildProducerState(targetOffset, producerStateManager)
@@ -1962,7 +1965,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private[log] def addSegment(segment: LogSegment): LogSegment = 
localLog.segments.add(segment)
 
   private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
-    JLocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, () => 
msg, () => fun)
+    LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, () => 
msg, () => fun)
   }
 
   private[log] def splitOverflowedSegment(segment: LogSegment): 
List[LogSegment] = lock synchronized {
@@ -1971,7 +1974,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result.newSegments.asScala.toList
   }
 
-  private[log] def deleteProducerSnapshots(segments: util.List[LogSegment], 
asyncDelete: Boolean): Unit = {
+  private[log] def deleteProducerSnapshots(segments: 
util.Collection[LogSegment], asyncDelete: Boolean): Unit = {
     JUnifiedLog.deleteProducerSnapshots(segments, producerStateManager, 
asyncDelete, scheduler, config, logDirFailureChannel, parentDir, topicPartition)
   }
 }
@@ -1991,14 +1994,9 @@ object UnifiedLog extends Logging {
 
   val DeleteDirSuffix: String = LogFileUtils.DELETE_DIR_SUFFIX
 
-  val StrayDirSuffix: String = LocalLog.StrayDirSuffix
-
-  val FutureDirSuffix: String = LocalLog.FutureDirSuffix
-
-  private[log] val DeleteDirPattern = LocalLog.DeleteDirPattern
-  private[log] val FutureDirPattern = LocalLog.FutureDirPattern
+  val StrayDirSuffix: String = LogFileUtils.STRAY_DIR_SUFFIX
 
-  val UnknownOffset: Long = LocalLog.UnknownOffset
+  val UnknownOffset: Long = LocalLog.UNKNOWN_OFFSET
 
   def isRemoteLogEnabled(remoteStorageSystemEnable: Boolean,
                          config: LogConfig,
@@ -2139,7 +2137,7 @@ object UnifiedLog extends Logging {
                                    logDirFailureChannel: LogDirFailureChannel,
                                    logPrefix: String,
                                    isRecoveredSwapFile: Boolean = false): 
Iterable[LogSegment] = {
-    JLocalLog.replaceSegments(existingSegments,
+    LocalLog.replaceSegments(existingSegments,
       newSegments.asJava,
       oldSegments.asJava,
       dir,
@@ -2159,11 +2157,11 @@ object UnifiedLog extends Logging {
                                           scheduler: Scheduler,
                                           logDirFailureChannel: 
LogDirFailureChannel,
                                           logPrefix: String): 
SplitSegmentResult = {
-    JLocalLog.splitOverflowedSegment(segment, existingSegments, dir, 
topicPartition, config, scheduler, logDirFailureChannel, logPrefix)
+    LocalLog.splitOverflowedSegment(segment, existingSegments, dir, 
topicPartition, config, scheduler, logDirFailureChannel, logPrefix)
   }
 
   private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, 
baseOffset: Long): LogSegment = {
-    JLocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
+    LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
   }
 
   // Visible for benchmarking
@@ -2194,6 +2192,21 @@ object UnifiedLog extends Logging {
     if (remoteLogEnabledAndRemoteCopyEnabled) config.localRetentionBytes else 
config.retentionSize
   }
 
+  /**
+   * Wraps the value of iterator.next() in an option.
+   * Note: this facility is a part of the Iterator class starting from scala 
v2.13.
+   *
+   * @param iterator the iterator
+   * @tparam T the type of object held within the iterator
+   * @return Some(iterator.next) if a next element exists, None otherwise.
+   */
+  private[log] def nextOption[T](iterator: util.Iterator[T]): Option[T] = {
+    if (iterator.hasNext)
+      Some(iterator.next())
+    else
+      None
+  }
+
 }
 
 object LogMetricNames {
@@ -2208,9 +2221,9 @@ object LogMetricNames {
 }
 
 case class RetentionMsBreach(log: UnifiedLog, 
remoteLogEnabledAndRemoteCopyEnabled: Boolean) extends SegmentDeletionReason {
-  override def logReason(toDelete: List[LogSegment]): Unit = {
+  override def logReason(toDelete: util.List[LogSegment]): Unit = {
     val retentionMs = UnifiedLog.localRetentionMs(log.config, 
remoteLogEnabledAndRemoteCopyEnabled)
-    toDelete.foreach { segment =>
+    toDelete.forEach { segment =>
       if (segment.largestRecordTimestamp.isPresent)
         if (remoteLogEnabledAndRemoteCopyEnabled)
           log.info(s"Deleting segment $segment due to local log retention time 
${retentionMs}ms breach based on the largest " +
@@ -2231,9 +2244,9 @@ case class RetentionMsBreach(log: UnifiedLog, 
remoteLogEnabledAndRemoteCopyEnabl
 }
 
 case class RetentionSizeBreach(log: UnifiedLog, 
remoteLogEnabledAndRemoteCopyEnabled: Boolean) extends SegmentDeletionReason {
-  override def logReason(toDelete: List[LogSegment]): Unit = {
+  override def logReason(toDelete: util.List[LogSegment]): Unit = {
     var size = log.size
-    toDelete.foreach { segment =>
+    toDelete.forEach { segment =>
       size -= segment.size
       if (remoteLogEnabledAndRemoteCopyEnabled) log.info(s"Deleting segment 
$segment due to local log retention size 
${UnifiedLog.localRetentionSize(log.config, 
remoteLogEnabledAndRemoteCopyEnabled)} breach. " +
         s"Local log size after deletion will be $size.")
@@ -2244,10 +2257,10 @@ case class RetentionSizeBreach(log: UnifiedLog, 
remoteLogEnabledAndRemoteCopyEna
 }
 
 case class StartOffsetBreach(log: UnifiedLog, remoteLogEnabled: Boolean) 
extends SegmentDeletionReason {
-  override def logReason(toDelete: List[LogSegment]): Unit = {
+  override def logReason(toDelete: util.List[LogSegment]): Unit = {
     if (remoteLogEnabled)
-      log.info(s"Deleting segments due to local log start offset 
${log.localLogStartOffset()} breach: ${toDelete.mkString(",")}")
+      log.info(s"Deleting segments due to local log start offset 
${log.localLogStartOffset()} breach: ${toDelete.asScala.mkString(",")}")
     else
-      log.info(s"Deleting segments due to log start offset 
${log.logStartOffset} breach: ${toDelete.mkString(",")}")
+      log.info(s"Deleting segments due to log start offset 
${log.logStartOffset} breach: ${toDelete.asScala.mkString(",")}")
   }
 }
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 6d512c23853..179e098c347 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -41,7 +41,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, 
FetchParams}
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, 
LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, 
ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, 
LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, 
LogSegments, ProducerStateManager, ProducerStateManagerConfig, 
VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 1ab842e4888..befc4d5fa12 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -63,7 +63,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, 
FetchParams}
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, 
EpochEntry, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, 
LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, 
ProducerStateManagerConfig, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, 
EpochEntry, LocalLog, LogAppendInfo, LogDirFailureChannel, LogLoader, 
LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, 
ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala 
b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
index 775c86d0208..0b840ddde62 100644
--- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.record.{MemoryRecords, Record, SimpleRecord}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.server.util.{MockTime, Scheduler}
-import org.apache.kafka.storage.internals.log.{FetchDataInfo, LocalLog => 
JLocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, 
LogSegment, LogSegments}
+import org.apache.kafka.storage.internals.log.{FetchDataInfo, LocalLog, 
LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, 
LogSegments, LogTruncation, SegmentDeletionReason}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -98,10 +98,10 @@ class LocalLogTest {
   private def appendRecords(records: Iterable[SimpleRecord],
                             log: LocalLog = log,
                             initialOffset: Long = 0L): Unit = {
-    log.append(lastOffset = initialOffset + records.size - 1,
-      largestTimestamp = records.head.timestamp,
-      shallowOffsetOfMaxTimestamp = initialOffset,
-      records = MemoryRecords.withRecords(initialOffset, Compression.NONE, 0, 
records.toList : _*))
+    log.append(initialOffset + records.size - 1,
+      records.head.timestamp,
+      initialOffset,
+      MemoryRecords.withRecords(initialOffset, Compression.NONE, 0, 
records.toList : _*))
   }
 
   private def readRecords(log: LocalLog = log,
@@ -112,16 +112,16 @@ class LocalLogTest {
                           includeAbortedTxns: Boolean = false): FetchDataInfo 
= {
     log.read(startOffset,
              maxLength,
-             minOneMessage = minOneMessage,
+             minOneMessage,
              maxOffsetMetadata,
-             includeAbortedTxns = includeAbortedTxns)
+             includeAbortedTxns)
   }
 
   @Test
   def testLogDeleteSegmentsSuccess(): Unit = {
     val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
     appendRecords(List(record))
-    log.roll()
+    log.roll(0L)
     assertEquals(2, log.segments.numberOfSegments)
     assertFalse(logDir.listFiles.isEmpty)
     val segmentsBeforeDelete = new util.ArrayList(log.segments.values)
@@ -135,7 +135,7 @@ class LocalLogTest {
   @Test
   def testRollEmptyActiveSegment(): Unit = {
     val oldActiveSegment = log.segments.activeSegment
-    log.roll()
+    log.roll(0L)
     assertEquals(1, log.segments.numberOfSegments)
     assertNotEquals(oldActiveSegment, log.segments.activeSegment)
     assertFalse(logDir.listFiles.isEmpty)
@@ -146,7 +146,7 @@ class LocalLogTest {
   def testLogDeleteDirSuccessWhenEmptyAndFailureWhenNonEmpty(): Unit ={
     val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
     appendRecords(List(record))
-    log.roll()
+    log.roll(0L)
     assertEquals(2, log.segments.numberOfSegments)
     assertFalse(logDir.listFiles.isEmpty)
 
@@ -172,7 +172,7 @@ class LocalLogTest {
   def testLogDirRenameToNewDir(): Unit = {
     val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
     appendRecords(List(record))
-    log.roll()
+    log.roll(0L)
     assertEquals(2, log.segments.numberOfSegments)
     val newLogDir = TestUtils.randomPartitionLogDir(tmpDir)
     assertTrue(log.renameDir(newLogDir.getName))
@@ -198,7 +198,7 @@ class LocalLogTest {
     val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
     appendRecords(List(record))
     mockTime.sleep(1)
-    val newSegment = log.roll()
+    val newSegment = log.roll(0L)
     log.flush(newSegment.baseOffset)
     log.markFlushed(newSegment.baseOffset)
     assertEquals(1L, log.recoveryPoint)
@@ -263,29 +263,29 @@ class LocalLogTest {
     for (offset <- 0 to 8) {
       val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
       appendRecords(List(record), initialOffset = offset)
-      log.roll()
+      log.roll(0L)
     }
 
     assertEquals(10L, log.segments.numberOfSegments)
 
     class TestDeletionReason extends SegmentDeletionReason {
-      private var _deletedSegments: Iterable[LogSegment] = List[LogSegment]()
+      private var _deletedSegments: util.Collection[LogSegment] = new 
util.ArrayList()
 
-      override def logReason(toDelete: List[LogSegment]): Unit = {
-        _deletedSegments = List[LogSegment]() ++ toDelete
+      override def logReason(toDelete: util.List[LogSegment]): Unit = {
+        _deletedSegments = new util.ArrayList(toDelete)
       }
 
-      def deletedSegments: Iterable[LogSegment] = _deletedSegments
+      def deletedSegments: util.Collection[LogSegment] = _deletedSegments
     }
     val reason = new TestDeletionReason()
-    val toDelete = log.segments.values.asScala.toVector
-    log.removeAndDeleteSegments(toDelete, asyncDelete = asyncDelete, reason)
+    val toDelete = new util.ArrayList(log.segments.values)
+    log.removeAndDeleteSegments(toDelete, asyncDelete, reason)
     if (asyncDelete) {
       mockTime.sleep(log.config.fileDeleteDelayMs + 1)
     }
     assertTrue(log.segments.isEmpty)
     assertEquals(toDelete, reason.deletedSegments)
-    toDelete.foreach(segment => assertTrue(segment.deleted()))
+    toDelete.forEach(segment => assertTrue(segment.deleted()))
   }
 
   @Test
@@ -302,13 +302,13 @@ class LocalLogTest {
     for (offset <- 0 to 8) {
       val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
       appendRecords(List(record), initialOffset = offset)
-      log.roll()
+      log.roll(0L)
     }
 
     assertEquals(10L, log.segments.numberOfSegments)
 
     val toDelete = log.segments.values
-    JLocalLog.deleteSegmentFiles(toDelete, asyncDelete, log.dir, 
log.topicPartition, log.config, log.scheduler, log.logDirFailureChannel, "")
+    LocalLog.deleteSegmentFiles(toDelete, asyncDelete, log.dir, 
log.topicPartition, log.config, log.scheduler, log.logDirFailureChannel, "")
     if (asyncDelete) {
       toDelete.forEach {
         segment =>
@@ -336,7 +336,7 @@ class LocalLogTest {
     appendRecords(List(record))
     val newOffset = log.segments.activeSegment.baseOffset + 1
     val oldActiveSegment = log.segments.activeSegment
-    val newActiveSegment = log.createAndDeleteSegment(newOffset, 
log.segments.activeSegment, asyncDelete = true, LogTruncation(log))
+    val newActiveSegment = log.createAndDeleteSegment(newOffset, 
log.segments.activeSegment, true, new LogTruncation(log.logger))
     assertEquals(1, log.segments.numberOfSegments)
     assertEquals(newActiveSegment, log.segments.activeSegment)
     assertNotEquals(oldActiveSegment, log.segments.activeSegment)
@@ -354,7 +354,7 @@ class LocalLogTest {
     for (offset <- 0 to 7) {
       appendRecords(List(record), initialOffset = offset)
       if (offset % 2 != 0)
-        log.roll()
+        log.roll(0L)
     }
     for (offset <- 8 to 12) {
       val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
@@ -362,7 +362,7 @@ class LocalLogTest {
     }
     assertEquals(5, log.segments.numberOfSegments)
     assertNotEquals(10L, log.segments.activeSegment.baseOffset)
-    val expected = log.segments.values.asScala.toVector
+    val expected = new util.ArrayList(log.segments.values)
     val deleted = log.truncateFullyAndStartAt(10L)
     assertEquals(expected, deleted)
     assertEquals(1, log.segments.numberOfSegments)
@@ -379,7 +379,7 @@ class LocalLogTest {
     for (offset <- 0 to 4) {
       appendRecords(List(record), initialOffset = offset)
       if (offset % 2 != 0)
-        log.roll()
+        log.roll(0L)
     }
     assertEquals(3, log.segments.numberOfSegments)
 
@@ -416,15 +416,15 @@ class LocalLogTest {
       val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
       appendRecords(List(record), initialOffset = offset)
       if (offset % 3 == 2)
-        log.roll()
+        log.roll(0L)
     }
     assertEquals(5, log.segments.numberOfSegments)
     assertEquals(12L, log.logEndOffset)
 
-    val expected = log.segments.values(9L, log.logEndOffset + 
1).asScala.toVector
+    val expected = new util.ArrayList(log.segments.values(9L, log.logEndOffset 
+ 1))
     // Truncate to an offset before the base offset of the active segment
     val deleted = log.truncateTo(7L)
-    assertEquals(expected, deleted.toVector)
+    assertEquals(expected, deleted)
     assertEquals(3, log.segments.numberOfSegments)
     assertEquals(6L, log.segments.activeSegment.baseOffset)
     assertEquals(0L, log.recoveryPoint)
@@ -444,7 +444,7 @@ class LocalLogTest {
     for (i <- 0 until 5) {
       val keyValues = Seq(KeyValue(i.toString, i.toString))
       appendRecords(kvsToRecords(keyValues), initialOffset = i)
-      log.roll()
+      log.roll(0L)
     }
 
     def nonActiveBaseOffsetsFrom(startOffset: Long): Seq[Long] = {
@@ -506,7 +506,7 @@ class LocalLogTest {
     assertThrows(classOf[KafkaException], () => 
LocalLog.parseTopicPartitionName(dir),
       () => "KafkaException should have been thrown for dir: " + 
dir.getCanonicalPath)
     // also test the "-delete" marker case
-    val deleteMarkerDir = new File(logDir, topic + partition + "." + 
LocalLog.DeleteDirSuffix)
+    val deleteMarkerDir = new File(logDir, topic + partition + "." + 
LogFileUtils.DELETE_DIR_SUFFIX)
     assertThrows(classOf[KafkaException], () => 
LocalLog.parseTopicPartitionName(deleteMarkerDir),
       () => "KafkaException should have been thrown for dir: " + 
deleteMarkerDir.getCanonicalPath)
   }
@@ -535,7 +535,7 @@ class LocalLogTest {
       () => "KafkaException should have been thrown for dir: " + 
dir.getCanonicalPath)
 
     // also test the "-delete" marker case
-    val deleteMarkerDir = new File(logDir, topicPartitionName(topic, 
partition) + "." + LocalLog.DeleteDirSuffix)
+    val deleteMarkerDir = new File(logDir, topicPartitionName(topic, 
partition) + "." + LogFileUtils.DELETE_DIR_SUFFIX)
     assertThrows(classOf[KafkaException], () => 
LocalLog.parseTopicPartitionName(deleteMarkerDir),
       () => "KafkaException should have been thrown for dir: " + 
deleteMarkerDir.getCanonicalPath)
   }
@@ -549,7 +549,7 @@ class LocalLogTest {
       () => "KafkaException should have been thrown for dir: " + 
dir.getCanonicalPath)
 
     // also test the "-delete" marker case
-    val deleteMarkerDir = new File(logDir, topic + partition + "." + 
LocalLog.DeleteDirSuffix)
+    val deleteMarkerDir = new File(logDir, topic + partition + "." + 
LogFileUtils.DELETE_DIR_SUFFIX)
     assertThrows(classOf[KafkaException], () => 
LocalLog.parseTopicPartitionName(deleteMarkerDir),
       () => "KafkaException should have been thrown for dir: " + 
deleteMarkerDir.getCanonicalPath)
   }
@@ -569,14 +569,14 @@ class LocalLogTest {
     val name1 = LocalLog.logDeleteDirName(new TopicPartition("foo", 3))
     assertTrue(name1.length <= 255)
     
assertTrue(Pattern.compile("foo-3\\.[0-9a-z]{32}-delete").matcher(name1).matches())
-    assertTrue(LocalLog.DeleteDirPattern.matcher(name1).matches())
-    assertFalse(LocalLog.FutureDirPattern.matcher(name1).matches())
+    assertTrue(LocalLog.DELETE_DIR_PATTERN.matcher(name1).matches())
+    assertFalse(LocalLog.FUTURE_DIR_PATTERN.matcher(name1).matches())
     val name2 = LocalLog.logDeleteDirName(
       new TopicPartition("n" + String.join("", Collections.nCopies(248, "o")), 
5))
     assertEquals(255, name2.length)
     
assertTrue(Pattern.compile("n[o]{212}-5\\.[0-9a-z]{32}-delete").matcher(name2).matches())
-    assertTrue(LocalLog.DeleteDirPattern.matcher(name2).matches())
-    assertFalse(LocalLog.FutureDirPattern.matcher(name2).matches())
+    assertTrue(LocalLog.DELETE_DIR_PATTERN.matcher(name2).matches())
+    assertFalse(LocalLog.FUTURE_DIR_PATTERN.matcher(name2).matches())
   }
 
   @Test
@@ -598,7 +598,7 @@ class LocalLogTest {
     assertEquals(1, log.segments.numberOfSegments, "Log begins with a single 
empty segment.")
 
     // roll active segment with the same base offset of size zero should 
recreate the segment
-    log.roll(Some(0L))
+    log.roll(0L)
     assertEquals(1, log.segments.numberOfSegments, "Expect 1 segment after 
roll() empty segment with base offset.")
 
     // should be able to append records to active segment
@@ -614,7 +614,7 @@ class LocalLogTest {
     assertEquals(keyValues1 ++ keyValues2, 
recordsToKvs(readResult.records.records.asScala))
 
     // roll so that active segment is empty
-    log.roll()
+    log.roll(0L)
     assertEquals(2L, log.segments.activeSegment.baseOffset, "Expect base 
offset of active segment to be LEO")
     assertEquals(2, log.segments.numberOfSegments, "Expect two segments.")
     assertEquals(2L, log.logEndOffset)
@@ -626,7 +626,7 @@ class LocalLogTest {
 
     // roll active segment with the same base offset of size zero should 
recreate the segment
     {
-      val newSegment = log.roll()
+      val newSegment = log.roll(0L)
       assertEquals(0L, newSegment.baseOffset)
       assertEquals(1, log.segments.numberOfSegments)
       assertEquals(0L, log.logEndOffset)
@@ -635,7 +635,7 @@ class LocalLogTest {
     appendRecords(List(KeyValue("k1", "v1").toRecord()))
 
     {
-      val newSegment = log.roll()
+      val newSegment = log.roll(0L)
       assertEquals(1L, newSegment.baseOffset)
       assertEquals(2, log.segments.numberOfSegments)
       assertEquals(1L, log.logEndOffset)
@@ -644,7 +644,7 @@ class LocalLogTest {
     appendRecords(List(KeyValue("k2", "v2").toRecord()), initialOffset = 1L)
 
     {
-      val newSegment = log.roll(Some(1L))
+      val newSegment = log.roll(1L)
       assertEquals(2L, newSegment.baseOffset)
       assertEquals(3, log.segments.numberOfSegments)
       assertEquals(2L, log.logEndOffset)
@@ -661,7 +661,7 @@ class LocalLogTest {
     assertEquals(3, log.logEndOffset, "Expect two records in the log")
 
     // roll to create an empty active segment
-    log.roll()
+    log.roll(0L)
     assertEquals(3L, log.segments.activeSegment.baseOffset)
 
     // intentionally setup the logEndOffset to introduce an error later
@@ -669,7 +669,7 @@ class LocalLogTest {
 
     // expect an error because of attempt to roll to a new offset (1L) that's 
lower than the
     // base offset (3L) of the active segment
-    assertThrows(classOf[KafkaException], () => log.roll())
+    assertThrows(classOf[KafkaException], () => log.roll(0L))
   }
 
   @Test
@@ -679,7 +679,7 @@ class LocalLogTest {
     val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
     appendRecords(List(record))
     mockTime.sleep(1)
-    val newSegment = log.roll()
+    val newSegment = log.roll(0L)
 
     // simulate the directory is renamed concurrently
     doReturn(new File("__NON_EXISTENT__"), Nil: _*).when(spyLog).dir
@@ -701,14 +701,14 @@ class LocalLogTest {
                                  time,
                                  config.initFileSize,
                                  config.preallocate))
-    new LocalLog(_dir = dir,
-                 config = config,
-                 segments = segments,
-                 recoveryPoint = recoveryPoint,
-                 nextOffsetMetadata = nextOffsetMetadata,
-                 scheduler = scheduler,
-                 time = time,
-                 topicPartition = topicPartition,
-                 logDirFailureChannel = logDirFailureChannel)
+    new LocalLog(dir,
+                 config,
+                 segments,
+                 recoveryPoint,
+                 nextOffsetMetadata,
+                 scheduler,
+                 time,
+                 topicPartition,
+                 logDirFailureChannel)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 68abd08f22f..5b3cc00732d 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
LogDirFailureChannel, LogLoader, LogSegment, LogSegments, 
LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LocalLog, 
LogConfig, LogDirFailureChannel, LogLoader, LogSegment, LogSegments, 
LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 9086626706c..81600b0f201 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, 
LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, 
ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, 
LogFileUtils, LogLoader, LogSegment, LogSegments, 
LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, 
ProducerStateManagerConfig}
 import org.apache.kafka.storage.internals.utils.Throttler
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala 
b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index b72c1c555db..e7e99852c53 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
 import org.apache.kafka.server.util.{MockTime, Scheduler}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, 
EpochEntry, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, 
LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, 
OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, 
EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, 
LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, 
OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}
 import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, 
assertFalse, assertNotEquals, assertThrows, assertTrue}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 0642dfebd19..ddc9cac8d81 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -20,7 +20,7 @@ package kafka.server
 import com.yammer.metrics.core.{Gauge, Meter, Timer}
 import kafka.cluster.PartitionTest.MockPartitionListener
 import kafka.cluster.Partition
-import kafka.log.{LocalLog, LogManager, LogManagerTest, UnifiedLog}
+import kafka.log.{LogManager, LogManagerTest, UnifiedLog}
 import kafka.log.remote.RemoteLogManager
 import 
org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS
 import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics
@@ -70,7 +70,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, 
FetchParams, FetchPa
 import org.apache.kafka.server.util.timer.MockTimer
 import org.apache.kafka.server.util.{MockScheduler, MockTime}
 import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, 
OffsetCheckpointFile, PartitionMetadataFile}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, 
LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, 
LogOffsetSnapshot, LogSegments, LogStartOffsetIncrementReason, 
ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, 
VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, 
LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, 
LogOffsetSnapshot, LogSegments, LogStartOffsetIncrementReason, 
ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, 
VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala 
b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index fa67126d275..3aaa58641bc 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -19,11 +19,11 @@ package kafka.utils
 import java.util.Properties
 import java.util.concurrent.atomic._
 import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, 
TimeUnit}
-import kafka.log.{LocalLog, UnifiedLog}
+import kafka.log.UnifiedLog
 import kafka.utils.TestUtils.retry
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
-import org.apache.kafka.storage.internals.log.{LogConfig, 
LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, 
ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{LocalLog, LogConfig, 
LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, 
ProducerStateManagerConfig}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 0dc824d1f43..b5a3c9bd96e 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -163,7 +163,6 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
              Given that, this bug pattern doesn't make sense for Scala code. 
-->
         <Or>
             <Class name="kafka.log.Log"/>
-            <Class name="kafka.log.LocalLog$"/>
         </Or>
         <Bug pattern="REC_CATCH_EXCEPTION"/>
     </Match>
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
index a5436f448db..d2cd71addad 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
@@ -16,10 +16,16 @@
  */
 package org.apache.kafka.storage.internals.log;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.record.FileLogInputStream;
 import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.util.Scheduler;
@@ -29,25 +35,774 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.utils.Utils.require;
 import static 
org.apache.kafka.storage.internals.log.LogFileUtils.CLEANED_FILE_SUFFIX;
 import static 
org.apache.kafka.storage.internals.log.LogFileUtils.DELETED_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.DELETE_DIR_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.FUTURE_DIR_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.STRAY_DIR_SUFFIX;
 import static 
org.apache.kafka.storage.internals.log.LogFileUtils.SWAP_FILE_SUFFIX;
 import static org.apache.kafka.storage.internals.log.LogFileUtils.isLogFile;
 
+/**
+ * An append-only log for storing messages locally. The log is a sequence of 
LogSegments, each with a base offset.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ * NOTE: this class is not thread-safe, and it relies on the thread safety 
provided by the Log class.
+ */
 public class LocalLog {
 
     private static final Logger LOG = LoggerFactory.getLogger(LocalLog.class);
 
+    // Directory-name patterns: three non-whitespace groups separated by '-' and '.', followed by the
+    // corresponding LogFileUtils directory suffix.
+    // NOTE(review): the groups presumably capture topic, partition and a unique id - confirm against the callers.
+    public static final Pattern DELETE_DIR_PATTERN = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.DELETE_DIR_SUFFIX);
+    public static final Pattern FUTURE_DIR_PATTERN = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.FUTURE_DIR_SUFFIX);
+    public static final Pattern STRAY_DIR_PATTERN = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + LogFileUtils.STRAY_DIR_SUFFIX);
+    // Sentinel offset value (-1). NOTE(review): presumably stands for "offset not known" - confirm against the callers.
+    public static final long UNKNOWN_OFFSET = -1L;
+
+    // Last time the log was flushed
+    private final AtomicLong lastFlushedTime;
+    // Prefix identifying this log (partition and directory); built once in the constructor
+    private final String logIdent;
+    private final LogSegments segments;
+    private final Scheduler scheduler;
+    private final Time time;
+    private final TopicPartition topicPartition;
+    private final LogDirFailureChannel logDirFailureChannel;
+    private final Logger logger;
+
+    private volatile LogOffsetMetadata nextOffsetMetadata;
+    // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
+    // After memory mapped buffer is closed, no disk IO operation should be performed for this log.
+    private volatile boolean isMemoryMappedBufferClosed = false;
+    // Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
+    private volatile String parentDir;
+    private volatile LogConfig config;
+    private volatile long recoveryPoint;
+    // Reassigned by renameDir() in the same step that updates the volatile parentDir, and read by
+    // dir(), name() and isFuture() without any locking in this class. Declared volatile as well so that
+    // such readers cannot keep observing the directory from before a rename.
+    private volatile File dir;
+
+    /**
+     * Builds a local log on top of an already loaded set of segments.
+     *
+     * @param dir The directory in which log segments are created.
+     * @param config The log configuration settings
+     * @param segments The non-empty log segments recovered from disk
+     * @param recoveryPoint The offset at which to begin the next recovery i.e. the first offset which has not been flushed to disk
+     * @param nextOffsetMetadata The offset where the next message could be appended
+     * @param scheduler The thread pool scheduler used for background actions
+     * @param time The time instance used for checking the clock
+     * @param topicPartition The topic partition associated with this log
+     * @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure
+     */
+    public LocalLog(File dir,
+                    LogConfig config,
+                    LogSegments segments,
+                    long recoveryPoint,
+                    LogOffsetMetadata nextOffsetMetadata,
+                    Scheduler scheduler,
+                    Time time,
+                    TopicPartition topicPartition,
+                    LogDirFailureChannel logDirFailureChannel) {
+        // Collaborators that never change after construction
+        this.segments = segments;
+        this.scheduler = scheduler;
+        this.time = time;
+        this.topicPartition = topicPartition;
+        this.logDirFailureChannel = logDirFailureChannel;
+
+        // State that is updated over the lifetime of the log
+        this.dir = dir;
+        this.config = config;
+        this.recoveryPoint = recoveryPoint;
+        this.nextOffsetMetadata = nextOffsetMetadata;
+
+        // Values derived from the arguments above
+        this.logIdent = "[LocalLog partition=" + topicPartition + ", dir=" + dir + "] ";
+        this.logger = new LogContext(logIdent).logger(LocalLog.class);
+        // The flush time starts out as the construction time
+        this.lastFlushedTime = new AtomicLong(time.milliseconds());
+        this.parentDir = dir.getParent();
+    }
+
+    /** @return the current log directory; replaced when renameDir() moves the log */
+    public File dir() {
+        return dir;
+    }
+
+    /** @return the logger created in the constructor from this log's identifying prefix */
+    public Logger logger() {
+        return logger;
+    }
+
+    /** @return the current log configuration; replaced by updateConfig() */
+    public LogConfig config() {
+        return config;
+    }
+
+    /** @return the segments backing this log */
+    public LogSegments segments() {
+        return segments;
+    }
+
+    /** @return the scheduler passed to the constructor */
+    public Scheduler scheduler() {
+        return scheduler;
+    }
+
+    /** @return the offset metadata at which the next message will be appended */
+    public LogOffsetMetadata nextOffsetMetadata() {
+        return nextOffsetMetadata;
+    }
+
+    /** @return the topic partition associated with this log */
+    public TopicPartition topicPartition() {
+        return topicPartition;
+    }
+
+    /** @return the channel passed to the constructor for reporting log dir failures */
+    public LogDirFailureChannel logDirFailureChannel() {
+        return logDirFailureChannel;
+    }
+
+    /** @return the current recovery point, i.e. the first offset which has not been flushed to disk */
+    public long recoveryPoint() {
+        return recoveryPoint;
+    }
+
+    /** @return the time instance passed to the constructor */
+    public Time time() {
+        return time;
+    }
+
+    /** @return the name of the log directory (last path element of dir) */
+    public String name() {
+        return dir.getName();
+    }
+
+    /** @return the cached parent path of the log directory */
+    public String parentDir() {
+        return parentDir;
+    }
+
+    /** @return a new File created from the cached parent path on every call */
+    public File parentDirFile() {
+        return new File(parentDir);
+    }
+
+    /** @return true if the log directory name ends with LogFileUtils.FUTURE_DIR_SUFFIX */
+    public boolean isFuture() {
+        return dir.getName().endsWith(LogFileUtils.FUTURE_DIR_SUFFIX);
+    }
+
+    /**
+     * Runs the given action through the four-argument maybeHandleIOException overload, supplying this
+     * log's failure channel and cached parent directory.
+     *
+     * @param errorMsgSupplier supplies the error message to use if the action fails
+     * @param function the action to run
+     * @return whatever the four-argument overload returns for the action
+     */
+    private <T> T maybeHandleIOException(Supplier<String> errorMsgSupplier, 
StorageAction<T, IOException> function) {
+        return maybeHandleIOException(logDirFailureChannel, parentDir, 
errorMsgSupplier, function);
+    }
+
+    /**
+     * Moves the log directory to a sibling directory with the given name and, if the resulting path
+     * differs from the current one, switches this log and its segments over to the new directory.
+     *
+     * @param name the new dir name
+     * @return true if the directory reference was updated, false if the new path equals the current one
+     * @throws KafkaStorageException if rename fails
+     */
+    public boolean renameDir(String name) {
+        Supplier<String> errorMsg =
+            () -> "Error while renaming dir for " + topicPartition + " in log dir " +  dir.getParent();
+        StorageAction<Boolean, IOException> rename = () -> {
+            File target = new File(dir.getParent(), name);
+            Utils.atomicMoveWithFallback(dir.toPath(), target.toPath());
+            if (target.equals(dir)) {
+                return false;
+            }
+            dir = target;
+            parentDir = target.getParent();
+            segments.updateParentDir(target);
+            return true;
+        };
+        return maybeHandleIOException(errorMsg, rename);
+    }
+
+    /**
+     * Replaces the current configuration with the provided one and logs a warning when the record
+     * format version of the new configuration precedes that of the old one.
+     *
+     * @param newConfig the new configuration to be updated to
+     */
+    public void updateConfig(LogConfig newConfig) {
+        LogConfig previousConfig = config;
+        config = newConfig;
+        RecordVersion previousVersion = previousConfig.recordVersion();
+        RecordVersion updatedVersion = newConfig.recordVersion();
+        if (!updatedVersion.precedes(previousVersion)) {
+            return;
+        }
+        logger.warn("Record format version has been downgraded from {} to {}.", previousVersion, updatedVersion);
+    }
+
+    /**
+     * Fails if the memory mapped buffer of this log has been marked as closed.
+     *
+     * @throws KafkaStorageException if the buffer is already closed
+     */
+    public void checkIfMemoryMappedBufferClosed() {
+        if (!isMemoryMappedBufferClosed) {
+            return;
+        }
+        throw new KafkaStorageException("The memory mapped buffer for log of " + topicPartition + " is already closed");
+    }
+
+    /**
+     * Unconditionally sets the recovery point to the given offset.
+     *
+     * @param newRecoveryPoint the new recovery point
+     */
+    public void updateRecoveryPoint(long newRecoveryPoint) {
+        recoveryPoint = newRecoveryPoint;
+    }
+
+    /**
+     * Advances the recovery point to the provided offset and records the current time as the last
+     * flush time. Does nothing when the offset is not greater than the existing recovery point.
+     *
+     * @param offset the offset to be updated
+     * @throws KafkaStorageException if the memory mapped buffer is already closed
+     */
+    public void markFlushed(long offset) {
+        checkIfMemoryMappedBufferClosed();
+        if (offset <= recoveryPoint) {
+            return;
+        }
+        updateRecoveryPoint(offset);
+        lastFlushedTime.set(time.milliseconds());
+    }
+
+    /**
+     * The number of messages appended to the log since the last flush, computed as the distance
+     * between the log end offset and the recovery point.
+     */
+    public long unflushedMessages() {
+        long endOffset = logEndOffset();
+        return endOffset - recoveryPoint;
+    }
+
+    /**
+     * Flush local log segments for all offsets up to offset-1.
+     * Does not update the recovery point. Nothing is flushed when the recovery point is already
+     * past the given offset.
+     *
+     * @param offset The offset to flush up to (non-inclusive)
+     * @throws IOException if flushing a segment or the directory fails
+     */
+    public void flush(long offset) throws IOException {
+        long flushedUpTo = recoveryPoint;
+        if (flushedUpTo > offset) {
+            return;
+        }
+        Collection<LogSegment> pending = segments.values(flushedUpTo, offset);
+        for (LogSegment segment : pending) {
+            segment.flush();
+        }
+        // A segment whose base offset is at or beyond the recovery point is new since the last flush;
+        // in that case the parent directory needs to be flushed as well for crash consistency.
+        boolean hasNewSegment = false;
+        for (LogSegment segment : pending) {
+            if (segment.baseOffset() >= flushedUpTo) {
+                hasNewSegment = true;
+                break;
+            }
+        }
+        if (hasNewSegment) {
+            // The directory might be renamed concurrently for topic deletion, which may cause NoSuchFileException here.
+            // Since the directory is to be deleted anyways, we just swallow NoSuchFileException and let it go.
+            Utils.flushDirIfExists(dir.toPath());
+        }
+    }
+
+    /**
+     * The time this log is last known to have been fully flushed to disk.
+     *
+     * @return the value set at construction time and updated whenever markFlushed() advances the recovery point
+     */
+    public long lastFlushTime() {
+        return lastFlushedTime.get();
+    }
+
+    /**
+     * The offset metadata of the next message that will be appended to the log.
+     *
+     * @return the current next-offset metadata
+     */
+    public LogOffsetMetadata logEndOffsetMetadata() {
+        return nextOffsetMetadata;
+    }
+
+    /**
+     * The offset of the next message that will be appended to the log.
+     *
+     * @return the message offset of the current next-offset metadata
+     */
+    public long logEndOffset() {
+        return nextOffsetMetadata.messageOffset;
+    }
+
+    /**
+     * Points the log end offset at the given offset within the active segment and pulls the
+     * recovery point back to it if the recovery point currently lies beyond it.
+     *
+     * @param endOffset the new end offset of the log
+     */
+    public void updateLogEndOffset(long endOffset) {
+        long activeBaseOffset = segments.activeSegment().baseOffset();
+        int activeSize = segments.activeSegment().size();
+        nextOffsetMetadata = new LogOffsetMetadata(endOffset, activeBaseOffset, activeSize);
+        if (endOffset < recoveryPoint) {
+            updateRecoveryPoint(endOffset);
+        }
+    }
+
+    /**
+     * Close file handlers used by log but don't write to disk.
+     * This is called if the log directory is offline.
+     * Also marks the memory mapped buffer as closed, so later calls to
+     * checkIfMemoryMappedBufferClosed() will throw.
+     */
+    public void closeHandlers() {
+        segments.closeHandlers();
+        isMemoryMappedBufferClosed = true;
+    }
+
+    /**
+     * Closes the segments of the log.
+     * The work runs through maybeHandleIOException, using an error message that names this
+     * partition and its parent directory.
+     *
+     * @throws KafkaStorageException if the memory mapped buffer is already closed
+     */
+    public void close() {
+        maybeHandleIOException(
+            // The message names the operation actually performed (closing, not renaming)
+            () -> "Error while closing log for " + topicPartition + " in dir " + dir.getParent(),
+            () -> {
+                checkIfMemoryMappedBufferClosed();
+                segments.close();
+                return null;
+            }
+        );
+    }
+
+    /**
+     * Completely delete this log directory with no delay.
+     * The log must have no segments left and its memory mapped buffer must already be marked closed.
+     *
+     * @throws IllegalStateException if segments are still present or the memory mapped buffer is still open
+     */
+    public void deleteEmptyDir() {
+        Supplier<String> errorMsg =
+            () -> "Error while deleting dir for " + topicPartition + " in dir " + dir.getParent();
+        StorageAction<Void, IOException> deleteDir = () -> {
+            if (!segments.isEmpty()) {
+                throw new IllegalStateException("Can not delete directory when " + segments.numberOfSegments() + " segments are still present");
+            }
+            if (!isMemoryMappedBufferClosed) {
+                throw new IllegalStateException("Can not delete directory when memory mapped buffer for log of " + topicPartition + " is still open.");
+            }
+            Utils.delete(dir);
+            return null;
+        };
+        maybeHandleIOException(errorMsg, deleteDir);
+    }
+
+    /**
+     * Completely delete all segments with no delay.
+     * The segments are removed and deleted synchronously, after which the memory mapped buffer is
+     * marked as closed.
+     *
+     * @return the deleted segments
+     */
+    public List<LogSegment> deleteAllSegments() {
+        return maybeHandleIOException(
+            // Plain concatenation: Java has no string interpolation, so "$topicPartition" would be emitted literally
+            () -> "Error while deleting all segments for " + topicPartition + " in dir " + dir.getParent(),
+            () -> {
+                // Snapshot taken before removal so the deleted segments can be returned to the caller
+                List<LogSegment> deletableSegments = new ArrayList<>(segments.values());
+                removeAndDeleteSegments(
+                        segments.values(),
+                        false,
+                        toDelete -> logger.info("Deleting segments as the log has been deleted: {}", toDelete.stream()
+                            .map(LogSegment::toString)
+                            .collect(Collectors.joining(", "))));
+                isMemoryMappedBufferClosed = true;
+                return deletableSegments;
+            }
+        );
+    }
+
+    /**
+     * Deletes the given log segments. For each of them this method:
+     * <ul>
+     *  <li>removes the segment from the segment map so that it will no longer be used for reads,
+     *  <li>renames the index and log files by appending .deleted to the respective file name,
+     *  <li>either schedules an asynchronous delete operation to occur in the future or performs the deletion synchronously.
+     * </ul>
+     * Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of
+     * physically deleting a file while it is being read.
+     * This method does not convert IOException to KafkaStorageException, the immediate caller
+     * is expected to catch and handle IOException.
+     * Nothing happens when the given collection is empty.
+     *
+     * @param segmentsToDelete The log segments to schedule for deletion
+     * @param asyncDelete Whether the segment files should be deleted asynchronously
+     * @param reason The reason for the segment deletion
+     */
+    public void removeAndDeleteSegments(Collection<LogSegment> segmentsToDelete,
+                                         boolean asyncDelete,
+                                         SegmentDeletionReason reason) throws IOException {
+        if (segmentsToDelete.isEmpty()) {
+            return;
+        }
+        // Callers often pass a view into the `segments` collection, which is mutated below. Copy it
+        // first so that the iteration stays valid and deterministic, and hand only the copy to the
+        // logic that actually deletes the segments.
+        List<LogSegment> snapshot = new ArrayList<>(segmentsToDelete);
+        reason.logReason(snapshot);
+        for (LogSegment segment : snapshot) {
+            segments.remove(segment.baseOffset());
+        }
+        deleteSegmentFiles(snapshot, asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent);
+    }
+
+    /**
+     * Deletes the given segment and creates a new segment with the given new base offset. The new
+     * segment is added before the old one is removed, so an active segment exists in the log at all
+     * times during this process.
+     * <p>
+     * Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of
+     * physically deleting a file while it is being read.
+     * <p>
+     * This method does not convert IOException to KafkaStorageException, the immediate caller
+     * is expected to catch and handle IOException.
+     *
+     * @param newOffset The base offset of the new segment
+     * @param segmentToDelete The old active segment to schedule for deletion
+     * @param asyncDelete Whether the segment files should be deleted asynchronously
+     * @param reason The reason for the segment deletion
+     * @return the newly created segment
+     */
+    public LogSegment createAndDeleteSegment(long newOffset,
+                                              LogSegment segmentToDelete,
+                                              boolean asyncDelete,
+                                              SegmentDeletionReason reason) throws IOException {
+        // When the base offset is reused, the old files get the .deleted suffix before the new segment is opened
+        // (presumably so that the two sets of files do not clash - confirm).
+        if (segmentToDelete.baseOffset() == newOffset) {
+            segmentToDelete.changeFileSuffixes("", LogFileUtils.DELETED_FILE_SUFFIX);
+        }
+        LogSegment replacement = LogSegment.open(dir, newOffset, config, time, config.initFileSize(), config.preallocate);
+        segments.add(replacement);
+
+        List<LogSegment> obsolete = singletonList(segmentToDelete);
+        reason.logReason(obsolete);
+        // Only drop the old entry when it lives under a different base offset than the new segment
+        if (segmentToDelete.baseOffset() != newOffset) {
+            segments.remove(segmentToDelete.baseOffset());
+        }
+        deleteSegmentFiles(obsolete, asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent);
+        return replacement;
+    }
+
+    /**
+     * Given a message offset, find its corresponding offset metadata in the log by issuing a
+     * one-byte read bounded by the current log end offset.
+     * If the message offset is out of range, throw an OffsetOutOfRangeException
+     */
+    public LogOffsetMetadata convertToOffsetMetadataOrThrow(long offset) throws IOException {
+        return read(offset, 1, false, nextOffsetMetadata, false).fetchOffsetMetadata;
+    }
+
+    /**
+     * Read messages from the log.
+     *
+     * @param startOffset The offset to begin reading at
+     * @param maxLength The maximum number of bytes to read
+     * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
+     * @param maxOffsetMetadata The metadata of the maximum offset to be fetched
+     * @param includeAbortedTxns If true, aborted transactions are included
+     * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset, or if no floor segment
+     *                                   is found for startOffset
+     * @return The fetch data information including fetch starting offset metadata and messages read.
+     */
+    public FetchDataInfo read(long startOffset,
+                       int maxLength,
+                       boolean minOneMessage,
+                       LogOffsetMetadata maxOffsetMetadata,
+                       boolean includeAbortedTxns) throws IOException {
+        return maybeHandleIOException(
+                () -> "Exception while reading from " + topicPartition + " in dir " + dir.getParent(),
+                () -> {
+                    // maxLength is passed as a placeholder argument; a "$maxLength" inside the string would be logged literally
+                    logger.trace("Reading maximum {} bytes at offset {} from log with total length {} bytes",
+                            maxLength, startOffset, segments.sizeInBytes());
+
+                    LogOffsetMetadata endOffsetMetadata = nextOffsetMetadata;
+                    long endOffset = endOffsetMetadata.messageOffset;
+                    Optional<LogSegment> segmentOpt = segments.floorSegment(startOffset);
+                    // return error on attempt to read beyond the log end offset
+                    if (startOffset > endOffset || segmentOpt.isEmpty()) {
+                        throw new OffsetOutOfRangeException("Received request for offset " + startOffset + " for partition " + topicPartition + ", " +
+                                "but we only have log segments upto " + endOffset + ".");
+                    }
+                    // Nothing can be returned at or beyond the maximum offset: answer with an empty result
+                    if (startOffset == maxOffsetMetadata.messageOffset) return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns);
+                    if (startOffset > maxOffsetMetadata.messageOffset) return emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns);
+
+                    // Do the read on the segment with a base offset less than the target offset
+                    // but if that segment doesn't contain any messages with an offset greater than that
+                    // continue to read from successive segments until we get some messages or we reach the end of the log
+                    FetchDataInfo fetchDataInfo = null;
+                    while (fetchDataInfo == null && segmentOpt.isPresent()) {
+                        LogSegment segment = segmentOpt.get();
+                        long baseOffset = segment.baseOffset();
+
+                        // 1. If `maxOffsetMetadata#segmentBaseOffset < segment#baseOffset`, then return maxPosition as empty.
+                        // 2. Use the max-offset position if it is on this segment; otherwise, the segment size is the limit.
+                        // 3. When maxOffsetMetadata is message-offset-only, then we don't know the relativePositionInSegment so
+                        //    return maxPosition as empty to avoid reading beyond the max-offset
+                        Optional<Long> maxPositionOpt;
+                        if (segment.baseOffset() < maxOffsetMetadata.segmentBaseOffset)
+                            maxPositionOpt = Optional.of((long) segment.size());
+                        else if (segment.baseOffset() == maxOffsetMetadata.segmentBaseOffset && !maxOffsetMetadata.messageOffsetOnly())
+                            maxPositionOpt = Optional.of((long) maxOffsetMetadata.relativePositionInSegment);
+                        else
+                            maxPositionOpt = Optional.empty();
+
+                        fetchDataInfo = segment.read(startOffset, maxLength, maxPositionOpt, minOneMessage);
+                        if (fetchDataInfo != null) {
+                            if (includeAbortedTxns) {
+                                fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo);
+                            }
+                        } else {
+                            // This segment yielded nothing; move on to the next higher segment, if any
+                            segmentOpt = segments.higherSegment(baseOffset);
+                        }
+                    }
+                    if (fetchDataInfo != null) {
+                        return fetchDataInfo;
+                    } else {
+                        // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
+                        // this can happen when all messages with offset larger than start offsets have been deleted.
+                        // In this case, we will return the empty set with log end offset metadata
+                        return new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY);
+                    }
+                }
+        );
+    }
+
+    /**
+     * Appends the given records to the active segment and then advances the log end offset
+     * to one past the last appended offset.
+     *
+     * @param lastOffset the last offset in {@code records}; the log end offset becomes {@code lastOffset + 1}
+     * @param largestTimestamp forwarded unchanged to the active segment's append
+     * @param shallowOffsetOfMaxTimestamp forwarded unchanged to the active segment's append
+     * @param records the records to append
+     * @throws IOException if the segment append fails
+     */
+    public void append(long lastOffset, long largestTimestamp, long shallowOffsetOfMaxTimestamp, MemoryRecords records) throws IOException {
+        LogSegment active = segments.activeSegment();
+        active.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records);
+        long nextOffset = lastOffset + 1;
+        updateLogEndOffset(nextOffset);
+    }
+
+    /**
+     * Returns a copy of {@code fetchInfo} that additionally carries the aborted transactions
+     * overlapping the fetched range.
+     *
+     * The upper bound of the range is taken from the segment's offset lookup for the fetched bytes;
+     * when that is unavailable it falls back to the base offset of the next segment, or to the
+     * log end offset if there is no next segment.
+     *
+     * @param startOffset the offset the fetch started from
+     * @param segment the segment the records were read from
+     * @param fetchInfo the fetch result to enrich
+     * @return a new {@link FetchDataInfo} with the same metadata and records plus the aborted transactions
+     */
+    FetchDataInfo addAbortedTransactions(long startOffset, LogSegment segment, FetchDataInfo fetchInfo) throws IOException {
+        OffsetPosition fetchStart = new OffsetPosition(
+                fetchInfo.fetchOffsetMetadata.messageOffset,
+                fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
+        long upperBoundOffset = segment.fetchUpperBoundOffset(fetchStart, fetchInfo.records.sizeInBytes())
+                .orElse(segments.higherSegment(segment.baseOffset())
+                        .map(LogSegment::baseOffset).orElse(logEndOffset()));
+
+        List<FetchResponseData.AbortedTransaction> collected = new ArrayList<>();
+        // Each batch of index entries is converted to its response representation as it arrives
+        collectAbortedTransactions(startOffset, upperBoundOffset, segment,
+                batch -> batch.forEach(txn -> collected.add(txn.asAbortedTransaction())));
+
+        return new FetchDataInfo(
+                fetchInfo.fetchOffsetMetadata,
+                fetchInfo.records,
+                fetchInfo.firstEntryIncomplete,
+                Optional.of(collected));
+    }
+
+    /**
+     * Walks the segments starting at {@code startingSegment}, handing each segment's aborted
+     * transactions in the given offset range to {@code accumulator}. The walk stops as soon as a
+     * segment reports that the search is complete, or when there are no further segments.
+     */
+    private void collectAbortedTransactions(long startOffset, long upperBoundOffset,
+                                            LogSegment startingSegment,
+                                            Consumer<List<AbortedTxn>> accumulator) {
+        Iterator<LogSegment> remaining = segments.higherSegments(startingSegment.baseOffset()).iterator();
+        LogSegment current = startingSegment;
+        while (true) {
+            TxnIndexSearchResult result = current.collectAbortedTxns(startOffset, upperBoundOffset);
+            accumulator.accept(result.abortedTransactions);
+            if (result.isComplete || !remaining.hasNext()) {
+                return;
+            }
+            current = remaining.next();
+        }
+    }
+
+    /**
+     * Collects the aborted transactions found from the segment at or below {@code baseOffset} onwards.
+     *
+     * @param logStartOffset lower bound of the offset range to search
+     * @param baseOffset offset used to locate the segment where the search starts
+     * @param upperBoundOffset upper bound of the offset range to search
+     * @return the aborted transactions found; empty when no segment lies at or below {@code baseOffset}
+     */
+    public List<AbortedTxn> collectAbortedTransactions(long logStartOffset, long baseOffset, long upperBoundOffset) {
+        List<AbortedTxn> collected = new ArrayList<>();
+        Optional<LogSegment> startingSegment = segments.floorSegment(baseOffset);
+        if (startingSegment.isPresent()) {
+            collectAbortedTransactions(logStartOffset, upperBoundOffset, startingSegment.get(), collected::addAll);
+        }
+        return collected;
+    }
+
+    /**
+     * Roll the log over to a new active segment starting with the current logEndOffset.
+     * This will trim the index to the exact size of the number of entries it currently contains.
+     *
+     * The base offset actually used for the new segment is
+     * {@code max(expectedNextOffset, logEndOffset())}.
+     *
+     * @param expectedNextOffset The expected next offset after the segment is rolled. It is unboxed
+     *                           when compared with the log end offset, so it must not be null.
+     *
+     * @return The newly rolled segment
+     * @throws KafkaException if a segment with the chosen base offset already exists and is not an
+     *                        empty active segment, or if the chosen base offset is lower than the
+     *                        base offset of the active segment
+     */
+    public LogSegment roll(Long expectedNextOffset) {
+        return maybeHandleIOException(
+            () -> "Error while rolling log segment for " + topicPartition + " in dir " + dir.getParent(),
+            () -> {
+                long start = time.hiResClockMs();
+                checkIfMemoryMappedBufferClosed();
+                long newOffset = Math.max(expectedNextOffset, logEndOffset());
+                // Resolved up front, but only consulted in the last branch when stale files are cleaned up
+                File logFile = LogFileUtils.logFile(dir, newOffset, "");
+                LogSegment activeSegment = segments.activeSegment();
+                if (segments.contains(newOffset)) {
+                    // segment with the same base offset already exists and loaded
+                    if (activeSegment.baseOffset() == newOffset && activeSegment.size() == 0) {
+                        // We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an
+                        // active segment of size zero because of one of the indexes is "full" (due to _maxEntries == 0).
+                        logger.warn("Trying to roll a new log segment with start offset {}=max(provided offset = {}, LEO = {}) " +
+                                    "while it already exists and is active with size 0. Size of time index: {}, size of offset index: {}.",
+                                newOffset, expectedNextOffset, logEndOffset(), activeSegment.timeIndex().entries(), activeSegment.offsetIndex().entries());
+                        // Swap the empty active segment for a fresh one at the same base offset
+                        LogSegment newSegment = createAndDeleteSegment(
+                                newOffset,
+                                activeSegment,
+                                true,
+                                toDelete -> logger.info("Deleting segments as part of log roll: {}", toDelete.stream()
+                                    .map(LogSegment::toString)
+                                    .collect(Collectors.joining(", "))));
+                        // Same message offset as before; the call refreshes the end-offset metadata for the new segment
+                        updateLogEndOffset(nextOffsetMetadata.messageOffset);
+                        logger.info("Rolled new log segment at offset {} in {} ms.", newOffset, time.hiResClockMs() - start);
+                        return newSegment;
+                    } else {
+                        throw new KafkaException("Trying to roll a new log segment for topic partition " + topicPartition + " with start offset " + newOffset +
+                                " =max(provided offset = " + expectedNextOffset + ", LEO = " + logEndOffset() + ") while it already exists. Existing " +
+                                "segment is " + segments.get(newOffset) + ".");
+                    }
+                } else if (!segments.isEmpty() && newOffset < activeSegment.baseOffset()) {
+                    throw new KafkaException(
+                            "Trying to roll a new log segment for topic partition " + topicPartition + " with " +
+                            "start offset " + newOffset + " =max(provided offset = " + expectedNextOffset + ", LEO = " + logEndOffset() + ") lower than start offset of the active segment " + activeSegment);
+                } else {
+                    // No loaded segment uses newOffset: clear any leftover files for that offset before opening a new segment
+                    File offsetIdxFile = LogFileUtils.offsetIndexFile(dir, newOffset);
+                    File timeIdxFile = LogFileUtils.timeIndexFile(dir, newOffset);
+                    File txnIdxFile = LogFileUtils.transactionIndexFile(dir, newOffset);
+                    for (File file : Arrays.asList(logFile, offsetIdxFile, timeIdxFile, txnIdxFile)) {
+                        if (file.exists()) {
+                            logger.warn("Newly rolled segment file {} already exists; deleting it first", file.getAbsolutePath());
+                            Files.delete(file.toPath());
+                        }
+                    }
+                    // Notify the current last segment (if any) that it is about to stop being the active one
+                    if (segments.lastSegment().isPresent()) {
+                        segments.lastSegment().get().onBecomeInactiveSegment();
+                    }
+                }
+                LogSegment newSegment = LogSegment.open(dir,
+                        newOffset,
+                        config,
+                        time,
+                        config.initFileSize(),
+                        config.preallocate);
+                segments.add(newSegment);
+
+                // We need to update the segment base offset and append position data of the metadata when log rolls.
+                // The next offset should not change.
+                updateLogEndOffset(nextOffsetMetadata.messageOffset);
+                logger.info("Rolled new log segment at offset {} in {} ms.", newOffset, time.hiResClockMs() - start);
+                return newSegment;
+            }
+        );
+    }
+
+    /**
+     * Removes every segment of the local log and restarts the log at {@code newOffset}.
+     *
+     * @param newOffset the offset the emptied log starts at; it also becomes the log end offset
+     * @return the segments that were scheduled for deletion (empty if the log had no segments)
+     */
+    public List<LogSegment> truncateFullyAndStartAt(long newOffset) {
+        return maybeHandleIOException(
+            () -> "Error while truncating the entire log for " + topicPartition + " in dir " + dir.getParent(),
+            () -> {
+                logger.debug("Truncate and start at offset {}", newOffset);
+                checkIfMemoryMappedBufferClosed();
+                List<LogSegment> allSegments = new ArrayList<>(segments.values());
+                int segmentCount = allSegments.size();
+
+                if (segmentCount > 0) {
+                    int lastIndex = segmentCount - 1;
+                    List<LogSegment> allButLast = allSegments.subList(0, lastIndex);
+                    removeAndDeleteSegments(allButLast, true, new LogTruncation(logger));
+                    // The replacement segment is created before the old last segment is deleted,
+                    // so the log is never left without an active segment during the deletion
+                    LogSegment oldLast = allSegments.get(lastIndex);
+                    createAndDeleteSegment(newOffset, oldLast, true, new LogTruncation(logger));
+                }
+                updateLogEndOffset(newOffset);
+                return allSegments;
+            }
+        );
+    }
+
+    /**
+     * Truncates this log so that its last remaining offset is the greatest offset below {@code targetOffset}.
+     * Segments whose base offset lies above the target are removed entirely, the active segment is
+     * truncated to the target, and the log end offset is set to the target.
+     *
+     * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
+     * @return the segments that were scheduled for deletion
+     * @throws IOException if removing segments or truncating the active segment fails
+     */
+    public Collection<LogSegment> truncateTo(long targetOffset) throws IOException {
+        Collection<LogSegment> segmentsPastTarget = segments.filter(candidate -> candidate.baseOffset() > targetOffset);
+        removeAndDeleteSegments(segmentsPastTarget, true, new LogTruncation(logger));
+        LogSegment active = segments.activeSegment();
+        active.truncateTo(targetOffset);
+        updateLogEndOffset(targetOffset);
+        return segmentsPastTarget;
+    }
+
+    /**
+     * Builds the name a log directory is renamed to when it is queued for asynchronous deletion.
+     * The result has the form "topic-partitionId.uniqueId-delete"; an overly long topic name is
+     * shortened so that the whole name stays within 255 characters.
+     *
+     * @param topicPartition the partition whose directory is being renamed
+     * @return the directory name carrying the delete suffix
+     */
+    public static String logDeleteDirName(TopicPartition topicPartition) {
+        String deleteSuffix = LogFileUtils.DELETE_DIR_SUFFIX;
+        return logDirNameWithSuffixCappedLength(topicPartition, deleteSuffix);
+    }
+
+    /**
+     * Builds the name a log directory is renamed to when it is deleted as a stray partition.
+     * The result has the form "topic-partitionId.uniqueId-stray"; an overly long topic name is
+     * shortened so that the whole name stays within 255 characters.
+     *
+     * @param topicPartition the partition whose directory is being renamed
+     * @return the directory name carrying the stray suffix
+     */
+    public static String logStrayDirName(TopicPartition topicPartition) {
+        String straySuffix = LogFileUtils.STRAY_DIR_SUFFIX;
+        return logDirNameWithSuffixCappedLength(topicPartition, straySuffix);
+    }
+
+    /**
+     * Builds a future directory name for the given topic partition, in the form
+     * "topic-partition.uniqueId-future". Unlike the delete and stray variants, the topic name
+     * is not shortened here.
+     *
+     * @param topicPartition the partition the future directory belongs to
+     * @return the directory name carrying the future suffix
+     */
+    public static String logFutureDirName(TopicPartition topicPartition) {
+        String futureSuffix = LogFileUtils.FUTURE_DIR_SUFFIX;
+        return logDirNameWithSuffix(topicPartition, futureSuffix);
+    }
+
+    /**
+     * Builds a directory name of the form "${topic}-${partitionId}.${uniqueId}${suffix}", where
+     * uniqueId is a random UUID with its dashes removed. Only the topic part is shortened when
+     * needed, so the complete name never exceeds 255 characters.
+     */
+    private static String logDirNameWithSuffixCappedLength(TopicPartition topicPartition, String suffix) {
+        String uniqueId = UUID.randomUUID().toString().replace("-", "");
+        String tail = "-" + topicPartition.partition() + "." + uniqueId + suffix;
+        String topic = topicPartition.topic();
+        // Everything after the topic is kept intact; the topic gets whatever room is left
+        int keptTopicLength = Math.min(topic.length(), 255 - tail.length());
+        return topic.substring(0, keptTopicLength) + tail;
+    }
+
+    /**
+     * Builds a directory name of the form "${topic}-${partitionId}.${uniqueId}${suffix}" without
+     * any length cap; uniqueId is a random UUID with its dashes removed.
+     */
+    private static String logDirNameWithSuffix(TopicPartition topicPartition, String suffix) {
+        String uniqueId = UUID.randomUUID().toString().replace("-", "");
+        String baseName = logDirName(topicPartition);
+        return baseName + "." + uniqueId + suffix;
+    }
+
+    /**
+     * Builds the plain directory name of a topic partition: the topic, a dash, and the partition number.
+     *
+     * @param topicPartition the partition to name
+     * @return the name in "topic-partition" form
+     */
+    public static String logDirName(TopicPartition topicPartition) {
+        String topic = topicPartition.topic();
+        int partition = topicPartition.partition();
+        return topic + "-" + partition;
+    }
+
+    /**
+     * Creates (but does not throw) the exception reported for a directory whose name does not
+     * follow the expected log directory naming scheme.
+     */
+    private static KafkaException exception(File dir) throws IOException {
+        String message = "Found directory " + dir.getCanonicalPath() + ", '" + dir.getName() + "' is not in the form of " +
+                "topic-partition or topic-partition.uniqueId-delete (if marked for deletion).\n" +
+                "Kafka's log directories (and children) should only contain Kafka topic data.";
+        return new KafkaException(message);
+    }
+
+    /**
+     * Parses the topic and partition out of the name of a log directory.
+     *
+     * A name ending in the delete, future or stray suffix must match the corresponding pattern;
+     * its ".uniqueId-suffix" tail is dropped before parsing. The remaining name is split at its
+     * last dash into topic and partition number.
+     *
+     * @param dir the log directory
+     * @return the topic partition encoded in the directory name
+     * @throws KafkaException if {@code dir} is null or its name is not a valid log directory name
+     */
+    public static TopicPartition parseTopicPartitionName(File dir) throws IOException {
+        if (dir == null) {
+            throw new KafkaException("dir should not be null");
+        }
+        String dirName = dir.getName();
+        if (dirName.isEmpty() || !dirName.contains("-")) {
+            throw exception(dir);
+        }
+
+        boolean hasDeleteSuffix = dirName.endsWith(DELETE_DIR_SUFFIX);
+        boolean hasFutureSuffix = dirName.endsWith(FUTURE_DIR_SUFFIX);
+        boolean hasStraySuffix = dirName.endsWith(STRAY_DIR_SUFFIX);
+        // A recognised suffix is only acceptable when the whole name matches that suffix's pattern
+        boolean malformed = (hasDeleteSuffix && !DELETE_DIR_PATTERN.matcher(dirName).matches())
+                || (hasFutureSuffix && !FUTURE_DIR_PATTERN.matcher(dirName).matches())
+                || (hasStraySuffix && !STRAY_DIR_PATTERN.matcher(dirName).matches());
+        if (malformed) {
+            throw exception(dir);
+        }
+
+        String name = dirName;
+        if (hasDeleteSuffix || hasFutureSuffix || hasStraySuffix) {
+            name = dirName.substring(0, dirName.lastIndexOf('.'));
+        }
+
+        int separator = name.lastIndexOf('-');
+        String topic = name.substring(0, separator);
+        String partitionString = name.substring(separator + 1);
+        if (topic.isEmpty() || partitionString.isEmpty()) {
+            throw exception(dir);
+        }
+        try {
+            int partition = Integer.parseInt(partitionString);
+            return new TopicPartition(topic, partition);
+        } catch (NumberFormatException nfe) {
+            throw exception(dir);
+        }
+    }
+
+    /**
+     * Advances the iterator by one element, if it has one, and wraps the result in an Optional.
+     *
+     * @param iterator given iterator to iterate over
+     * @return an Optional holding the next element if one exists, Optional#empty otherwise
+     * @param <T> the type of object held within the iterator
+     */
+    public static <T> Optional<T> nextItem(Iterator<T> iterator) {
+        if (!iterator.hasNext()) {
+            return Optional.empty();
+        }
+        return Optional.of(iterator.next());
+    }
+
+    /**
+     * Builds a fetch result with no records for the given offset metadata. The aborted
+     * transaction list is present (and empty) only when {@code includeAbortedTxns} is true.
+     */
+    private static FetchDataInfo emptyFetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, boolean includeAbortedTxns) {
+        Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions;
+        if (includeAbortedTxns) {
+            abortedTransactions = Optional.of(Collections.emptyList());
+        } else {
+            abortedTransactions = Optional.empty();
+        }
+        return new FetchDataInfo(fetchOffsetMetadata, MemoryRecords.EMPTY, false, abortedTransactions);
+    }
+
     /**
      * Invokes the provided function and handles any IOException raised by the 
function by marking the
      * provided directory offline.
@@ -171,19 +926,16 @@ public class LocalLog {
         try {
             int position = 0;
             FileRecords sourceRecords = segment.log();
-
             while (position < sourceRecords.sizeInBytes()) {
                 FileLogInputStream.FileChannelRecordBatch firstBatch = 
sourceRecords.batchesFrom(position).iterator().next();
                 LogSegment newSegment = createNewCleanedSegment(dir, config, 
firstBatch.baseOffset());
                 newSegments.add(newSegment);
-
                 int bytesAppended = newSegment.appendFromFile(sourceRecords, 
position);
-                if (bytesAppended == 0)
+                if (bytesAppended == 0) {
                     throw new IllegalStateException("Failed to append records 
from position " + position + " in " + segment);
-
+                }
                 position += bytesAppended;
             }
-
             // prepare new segments
             int totalSizeOfNewSegments = 0;
             for (LogSegment splitSegment : newSegments) {
@@ -198,7 +950,7 @@ public class LocalLog {
             }
             // replace old segment with new ones
             LOG.info("{}Replacing overflowed segment $segment with split 
segments {}", logPrefix, newSegments);
-            List<LogSegment> deletedSegments = 
replaceSegments(existingSegments, newSegments, 
Collections.singletonList(segment),
+            List<LogSegment> deletedSegments = 
replaceSegments(existingSegments, newSegments, singletonList(segment),
                     dir, topicPartition, config, scheduler, 
logDirFailureChannel, logPrefix, false);
             return new SplitSegmentResult(deletedSegments, newSegments);
         } catch (Exception e) {
@@ -269,7 +1021,7 @@ public class LocalLog {
                 .sorted(Comparator.comparingLong(LogSegment::baseOffset))
                 .collect(Collectors.toList());
 
-        // need to do this in two phases to be crash safe AND do the delete 
asynchronously
+        // need to do this in two phases to be crash safe AND do the deletion 
asynchronously
         // if we crash in the middle of this we complete the swap in 
loadSegments()
         List<LogSegment> reversedSegmentsList = new 
ArrayList<>(sortedNewSegments);
         Collections.reverse(reversedSegmentsList);
@@ -290,7 +1042,7 @@ public class LocalLog {
                 existingSegments.remove(segment.baseOffset());
             }
             deleteSegmentFiles(
-                    Collections.singletonList(segment),
+                    singletonList(segment),
                     true,
                     dir,
                     topicPartition,
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java
index fd873f103ee..dd62b90f7f6 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java
@@ -60,6 +60,12 @@ public final class LogFileUtils {
     /** Suffix of a directory that is scheduled to be deleted */
     public static final String DELETE_DIR_SUFFIX = "-delete";
 
+    /** Suffix of a directory that is used for future partition */
+    public static final String FUTURE_DIR_SUFFIX = "-future";
+
+    /** Suffix of a directory that is used for stray partition */
+    public static final String STRAY_DIR_SUFFIX = "-stray";
+
     private LogFileUtils() {
     }
 
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogTruncation.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogTruncation.java
new file mode 100644
index 00000000000..1a819676b69
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogTruncation.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Segment deletion reason used when segments are removed as part of a log truncation.
+ * It reports the affected segments at INFO level through the logger it was created with.
+ */
+public class LogTruncation implements SegmentDeletionReason {
+
+    private final Logger logger;
+
+    /**
+     * @param logger the logger the deletion message is written to
+     */
+    public LogTruncation(Logger logger) {
+        this.logger = logger;
+    }
+
+    /**
+     * Logs the segments about to be deleted as a single comma-separated list.
+     *
+     * @param toDelete the segments being deleted
+     */
+    @Override
+    public void logReason(List<LogSegment> toDelete) {
+        String segmentList = toDelete.stream()
+                .map(segment -> segment.toString())
+                .collect(Collectors.joining(", "));
+        logger.info("Deleting segments as part of log truncation: {}", segmentList);
+    }
+}
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentDeletionReason.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentDeletionReason.java
new file mode 100644
index 00000000000..464ade51ee8
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentDeletionReason.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.List;
+
+/**
+ * Describes why a set of log segments is being deleted. Implementations record that reason,
+ * typically by writing a log message that names the affected segments.
+ *
+ * The interface has a single abstract method and callers may supply it as a lambda, so it is
+ * declared a functional interface; the compiler then rejects any second abstract method.
+ */
+@FunctionalInterface
+public interface SegmentDeletionReason {
+    /**
+     * Reports the reason for deleting the given segments.
+     *
+     * @param toDelete the segments that are about to be deleted
+     */
+    void logReason(List<LogSegment> toDelete);
+}

Reply via email to