This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7710d1c9511 KAFKA-14487: Move LogManager static methods/fields to storage module (#19302)
7710d1c9511 is described below

commit 7710d1c9511cc3e02ba2ecc8f48674f01ed05ae2
Author: Mickael Maison <mimai...@users.noreply.github.com>
AuthorDate: Mon Apr 21 12:03:30 2025 +0200

    KAFKA-14487: Move LogManager static methods/fields to storage module (#19302)
    
    Move the static fields and methods from the Scala kafka.log.LogManager (core module) to the new Java org.apache.kafka.storage.internals.log.LogManager class in the storage module, and update all callers accordingly.
    
    Reviewers: Luke Chen <show...@gmail.com>
---
 build.gradle                                       |   1 +
 checkstyle/import-control-storage.xml              |   2 +
 core/src/main/scala/kafka/log/LogManager.scala     |  73 +--------
 core/src/main/scala/kafka/raft/RaftManager.scala   |   5 +-
 .../server/metadata/BrokerMetadataPublisher.scala  |   3 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala | 166 ++-----------------
 .../scala/unit/kafka/raft/RaftManagerTest.scala    |   6 +-
 .../unit/kafka/server/ServerShutdownTest.scala     |   4 +-
 .../kafka/storage/internals/log/LogManager.java    |  96 +++++++++++
 .../storage/internals/log/LogManagerTest.java      | 175 +++++++++++++++++++++
 ...FetchFromLeaderWithCorruptedCheckpointTest.java |   4 +-
 11 files changed, 301 insertions(+), 234 deletions(-)
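
For context, here is a minimal usage sketch (not part of the patch) of the relocated API. The constant and method names come from the new storage-module LogManager.java added below; the class name and the "/tmp/kafka-logs" path are made up for illustration only.

    import org.apache.kafka.storage.internals.log.LogManager;

    import java.io.File;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;

    public class LogManagerUsageSketch {
        public static void main(String[] args) {
            // The lock and checkpoint file names are now public constants on the Java class.
            File lockFile = new File("/tmp/kafka-logs", LogManager.LOCK_FILE_NAME);
            File recoveryCheckpoint = new File("/tmp/kafka-logs", LogManager.RECOVERY_POINT_CHECKPOINT_FILE);
            File logStartCheckpoint = new File("/tmp/kafka-logs", LogManager.LOG_START_OFFSET_CHECKPOINT_FILE);

            // waitForAllToComplete now takes a java.util.List of futures and a Consumer<Throwable> callback.
            List<Future<?>> jobs = List.of(CompletableFuture.completedFuture(null));
            boolean allSucceeded = LogManager.waitForAllToComplete(jobs, t -> System.err.println("job failed: " + t));

            System.out.println(lockFile + " " + recoveryCheckpoint + " " + logStartCheckpoint + " ok=" + allSucceeded);
        }
    }

Scala callers in core keep using the same names but now import the Java class, typically aliased as JLogManager, as the hunks below show.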

diff --git a/build.gradle b/build.gradle
index 2e35057165c..50eec676569 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2230,6 +2230,7 @@ project(':storage') {
   }
 
   dependencies {
+    implementation project(':metadata')
     implementation project(':storage:storage-api')
     implementation project(':server-common')
     implementation project(':clients')
diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml
index 639cb6dc1d0..6628de8d453 100644
--- a/checkstyle/import-control-storage.xml
+++ b/checkstyle/import-control-storage.xml
@@ -94,6 +94,8 @@
         <allow pkg="com.yammer.metrics.core" />
         <allow pkg="org.apache.kafka.common" />
         <allow pkg="org.apache.kafka.config" />
+        <allow pkg="org.apache.kafka.image" />
+        <allow pkg="org.apache.kafka.metadata" />
         <allow pkg="org.apache.kafka.server"/>
         <allow pkg="org.apache.kafka.storage.internals"/>
         <allow pkg="org.apache.kafka.storage.log.metrics"/>
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 506e46e6ce5..e555f1b0b46 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -23,7 +23,6 @@ import java.nio.file.{Files, NoSuchFileException}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 import kafka.server.{KafkaConfig, KafkaRaftServer}
-import kafka.server.metadata.BrokerMetadataPublisher.info
 import kafka.utils.threadsafe
 import kafka.utils.{CoreUtils, Logging, Pool}
 import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid}
@@ -42,7 +41,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
 import java.util.{Collections, Optional, OptionalLong, Properties}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{FileLock, Scheduler}
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleaner, LogConfig, LogDirFailureChannel, LogOffsetsListener, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleaner, LogConfig, LogDirFailureChannel, LogManager => JLogManager, LogOffsetsListener, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
 import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
@@ -80,8 +79,6 @@ class LogManager(logDirs: Seq[File],
                  remoteStorageSystemEnable: Boolean,
                  val initialTaskDelayMs: Long) extends Logging {
 
-  import LogManager._
-
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
   private val logCreationOrDeletionLock = new Object
@@ -127,9 +124,9 @@ class LogManager(logDirs: Seq[File],
   def directoryIdsSet: Predef.Set[Uuid] = directoryIds.values.toSet
 
   @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
-    (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap
+    (dir, new OffsetCheckpointFile(new File(dir, JLogManager.RECOVERY_POINT_CHECKPOINT_FILE), logDirFailureChannel))).toMap
   @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
-    (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap
+    (dir, new OffsetCheckpointFile(new File(dir, JLogManager.LOG_START_OFFSET_CHECKPOINT_FILE), logDirFailureChannel))).toMap
 
   private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]()
 
@@ -261,7 +258,7 @@ class LogManager(logDirs: Seq[File],
   private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
     dirs.flatMap { dir =>
       try {
-        val lock = new FileLock(new File(dir, LockFileName))
+        val lock = new FileLock(new File(dir, JLogManager.LOCK_FILE_NAME))
         if (!lock.tryLock())
           throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParent +
             ". A Kafka instance in another process or thread is using this directory.")
@@ -680,7 +677,7 @@ class LogManager(logDirs: Seq[File],
 
     try {
       jobs.foreachEntry { (dir, dirJobs) =>
-        if (waitForAllToComplete(dirJobs,
+        if (JLogManager.waitForAllToComplete(dirJobs.toList.asJava,
           e => warn(s"There was an error in one of the threads during 
LogManager shutdown: ${e.getCause}"))) {
           val logs = logsInDir(localLogsByDir, dir)
 
@@ -1520,25 +1517,6 @@ class LogManager(logDirs: Seq[File],
 }
 
 object LogManager {
-  val LockFileName = ".lock"
-
-  /**
-   * Wait all jobs to complete
-   * @param jobs jobs
-   * @param callback this will be called to handle the exception caused by each Future#get
-   * @return true if all pass. Otherwise, false
-   */
-  private[log] def waitForAllToComplete(jobs: Seq[Future[_]], callback: Throwable => Unit): Boolean = {
-    jobs.count(future => Try(future.get) match {
-      case Success(_) => false
-      case Failure(e) =>
-        callback(e)
-        true
-    }) == 0
-  }
-
-  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
-  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
 
   def apply(config: KafkaConfig,
             initialOfflineDirs: Seq[String],
@@ -1575,45 +1553,4 @@ object LogManager {
       remoteStorageSystemEnable = config.remoteLogManagerConfig.isRemoteStorageSystemEnabled,
       initialTaskDelayMs = config.logInitialTaskDelayMs)
   }
-
-  /**
-   * Returns true if the given log should not be on the current broker
-   * according to the metadata image.
-   *
-   * @param brokerId       The ID of the current broker.
-   * @param newTopicsImage The new topics image after broker has been reloaded
-   * @param log            The log object to check
-   * @return true if the log should not exist on the broker, false otherwise.
-   */
-  def isStrayKraftReplica(
-   brokerId: Int,
-   newTopicsImage: TopicsImage,
-   log: UnifiedLog
-  ): Boolean = {
-    if (log.topicId.isEmpty) {
-      // Missing topic ID could result from storage failure or unclean shutdown after topic creation but before flushing
-      // data to the `partition.metadata` file. And before appending data to the log, the `partition.metadata` is always
-      // flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as
-      // a stray log.
-      info(s"The topicId does not exist in $log, treat it as a stray log")
-      return true
-    }
-
-    val topicId = log.topicId.get
-    val partitionId = log.topicPartition.partition()
-    Option(newTopicsImage.getPartition(topicId, partitionId)) match {
-      case Some(partition) =>
-        if (!partition.replicas.contains(brokerId)) {
-          info(s"Found stray log dir $log: the current replica assignment 
${partition.replicas.mkString("[", ", ", "]")} " +
-            s"does not contain the local brokerId $brokerId.")
-          true
-        } else {
-          false
-        }
-
-      case None =>
-        info(s"Found stray log dir $log: the topicId $topicId does not exist 
in the metadata image")
-        true
-    }
-  }
 }
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 84dfa5ebee0..0727c660fe4 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -24,7 +24,6 @@ import java.util.OptionalInt
 import java.util.concurrent.CompletableFuture
 import java.util.{Map => JMap}
 import java.util.{Collection => JCollection}
-import kafka.log.LogManager
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils
 import kafka.utils.Logging
@@ -48,7 +47,7 @@ import org.apache.kafka.server.common.serialization.RecordSerde
 import org.apache.kafka.server.util.{FileLock, KafkaScheduler}
 import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.server.util.timer.SystemTimer
-import org.apache.kafka.storage.internals.log.UnifiedLog
+import org.apache.kafka.storage.internals.log.{LogManager, UnifiedLog}
 
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters._
@@ -62,7 +61,7 @@ object KafkaRaftManager {
   }
 
   private def lockDataDir(dataDir: File): FileLock = {
-    val lock = new FileLock(new File(dataDir, LogManager.LockFileName))
+    val lock = new FileLock(new File(dataDir, LogManager.LOCK_FILE_NAME))
 
     if (!lock.tryLock()) {
       throw new KafkaException(
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index de8f16e1e58..d95be34ff51 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
 import org.apache.kafka.metadata.publisher.AclPublisher
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.server.fault.FaultHandler
+import org.apache.kafka.storage.internals.log.{LogManager => JLogManager}
 
 import java.util.concurrent.CompletableFuture
 import scala.collection.mutable
@@ -300,7 +301,7 @@ class BrokerMetadataPublisher(
       // recovery-from-unclean-shutdown if required.
       logManager.startup(
         metadataCache.getAllTopics().asScala,
-        isStray = log => LogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
+        isStray = log => JLogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
       )
 
       // Rename all future replicas which are in the same directory as the
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 5e721596ce0..67880e0ced5 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -23,15 +23,14 @@ import org.apache.directory.api.util.FileUtils
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{DirectoryId, KafkaException, TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.image.{TopicImage, TopicsImage}
-import org.apache.kafka.metadata.{ConfigRepository, LeaderRecoveryState, MockConfigRepository, PartitionRegistration}
+import org.apache.kafka.metadata.{ConfigRepository, MockConfigRepository}
 import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
-import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 import org.mockito.Mockito.{doAnswer, doNothing, mock, never, spy, times, verify}
 
 import java.io._
@@ -39,12 +38,12 @@ import java.lang.{Long => JLong}
 import java.nio.file.Files
 import java.nio.file.attribute.PosixFilePermission
 import java.util
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Future}
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.{Collections, Optional, OptionalLong, Properties}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.storage.log.FetchIsolation
 import org.apache.kafka.server.util.{FileLock, KafkaScheduler, MockTime, Scheduler}
-import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogMetricNames, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogMetricNames, LogManager => JLogManager, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
 import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.function.Executable
@@ -56,7 +55,6 @@ import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Try}
 
 class LogManagerTest {
-  import LogManagerTest._
 
   val time = new MockTime()
   val maxRollInterval = 100
@@ -592,7 +590,7 @@ class LogManagerTest {
     }
 
     logManager.checkpointLogRecoveryOffsets()
-    val checkpoints = new OffsetCheckpointFile(new File(logDir, LogManager.RecoveryPointCheckpointFile), null).read()
+    val checkpoints = new OffsetCheckpointFile(new File(logDir, JLogManager.RECOVERY_POINT_CHECKPOINT_FILE), null).read()
 
     topicPartitions.zip(logs).foreach { case (tp, log) =>
       assertEquals(checkpoints.get(tp), log.recoveryPoint, "Recovery point should equal checkpoint")
@@ -672,7 +670,7 @@ class LogManagerTest {
 
     logManager.checkpointRecoveryOffsetsInDir(logDir)
 
-    val checkpoints = new OffsetCheckpointFile(new File(logDir, LogManager.RecoveryPointCheckpointFile), null).read()
+    val checkpoints = new OffsetCheckpointFile(new File(logDir, JLogManager.RECOVERY_POINT_CHECKPOINT_FILE), null).read()
 
     tps.zip(allLogs).foreach { case (tp, log) =>
       assertEquals(checkpoints.get(tp), log.recoveryPoint,
@@ -1094,36 +1092,6 @@ class LogManagerTest {
     verifyMetrics(1)
   }
 
-  @Test
-  def testWaitForAllToComplete(): Unit = {
-    var invokedCount = 0
-    val success: Future[Boolean] = Mockito.mock(classOf[Future[Boolean]])
-    Mockito.when(success.get()).thenAnswer { _ =>
-      invokedCount += 1
-      true
-    }
-    val failure: Future[Boolean] = Mockito.mock(classOf[Future[Boolean]])
-    Mockito.when(failure.get()).thenAnswer{ _ =>
-      invokedCount += 1
-      throw new RuntimeException
-    }
-
-    var failureCount = 0
-    // all futures should be evaluated
-    assertFalse(LogManager.waitForAllToComplete(Seq(success, failure), _ => failureCount += 1))
-    assertEquals(2, invokedCount)
-    assertEquals(1, failureCount)
-    assertFalse(LogManager.waitForAllToComplete(Seq(failure, success), _ => failureCount += 1))
-    assertEquals(4, invokedCount)
-    assertEquals(2, failureCount)
-    assertTrue(LogManager.waitForAllToComplete(Seq(success, success), _ => failureCount += 1))
-    assertEquals(6, invokedCount)
-    assertEquals(2, failureCount)
-    assertFalse(LogManager.waitForAllToComplete(Seq(failure, failure), _ => failureCount += 1))
-    assertEquals(8, invokedCount)
-    assertEquals(4, failureCount)
-  }
-
   @Test
   def testLoadDirectoryIds(): Unit = {
     val dirs: Seq[File] = Seq.fill(5)(TestUtils.tempDir())
@@ -1161,7 +1129,7 @@ class LogManagerTest {
       remoteStorageSystemEnable = true
     )
 
-    val checkpointFile = new File(logDir, LogManager.LogStartOffsetCheckpointFile)
+    val checkpointFile = new File(logDir, JLogManager.LOG_START_OFFSET_CHECKPOINT_FILE)
     val checkpoint = new OffsetCheckpointFile(checkpointFile, null)
     val topicPartition = new TopicPartition("test", 0)
     val log = logManager.getOrCreateLog(topicPartition, topicId = Optional.empty)
@@ -1192,7 +1160,7 @@ class LogManagerTest {
 
   @Test
   def testCheckpointLogStartOffsetForNormalTopic(): Unit = {
-    val checkpointFile = new File(logDir, LogManager.LogStartOffsetCheckpointFile)
+    val checkpointFile = new File(logDir, JLogManager.LOG_START_OFFSET_CHECKPOINT_FILE)
     val checkpoint = new OffsetCheckpointFile(checkpointFile, null)
     val topicPartition = new TopicPartition("test", 0)
     val log = logManager.getOrCreateLog(topicPartition, topicId = Optional.empty)
@@ -1233,65 +1201,6 @@ class LogManagerTest {
       new File(dir, MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath, false)
   }
 
-  val foo0 = new TopicIdPartition(Uuid.fromString("Sl08ZXU2QW6uF5hIoSzc8w"), new TopicPartition("foo", 0))
-  val foo1 = new TopicIdPartition(Uuid.fromString("Sl08ZXU2QW6uF5hIoSzc8w"), new TopicPartition("foo", 1))
-  val bar0 = new TopicIdPartition(Uuid.fromString("69O438ZkTSeqqclTtZO2KA"), new TopicPartition("bar", 0))
-  val bar1 = new TopicIdPartition(Uuid.fromString("69O438ZkTSeqqclTtZO2KA"), new TopicPartition("bar", 1))
-  val baz0 = new TopicIdPartition(Uuid.fromString("2Ik9_5-oRDOKpSXd2SuG5w"), new TopicPartition("baz", 0))
-  val baz1 = new TopicIdPartition(Uuid.fromString("2Ik9_5-oRDOKpSXd2SuG5w"), new TopicPartition("baz", 1))
-  val baz2 = new TopicIdPartition(Uuid.fromString("2Ik9_5-oRDOKpSXd2SuG5w"), new TopicPartition("baz", 2))
-  val quux0 = new TopicIdPartition(Uuid.fromString("YS9owjv5TG2OlsvBM0Qw6g"), new TopicPartition("quux", 0))
-  val recreatedFoo0 = new TopicIdPartition(Uuid.fromString("_dOOzPe3TfiWV21Lh7Vmqg"), new TopicPartition("foo", 0))
-  val recreatedFoo1 = new TopicIdPartition(Uuid.fromString("_dOOzPe3TfiWV21Lh7Vmqg"), new TopicPartition("foo", 1))
-
-  @Test
-  def testIsStrayKraftReplicaWithEmptyImage(): Unit = {
-    val image: TopicsImage = topicsImage(Seq())
-    val onDisk = Seq(foo0, foo1, bar0, bar1, quux0).map(mockLog)
-    assertTrue(onDisk.forall(log => LogManager.isStrayKraftReplica(0, image, log)))
-  }
-
-  @Test
-  def testIsStrayKraftReplicaInImage(): Unit = {
-    val image: TopicsImage = topicsImage(Seq(
-      topicImage(Map(
-        foo0 -> Seq(0, 1, 2),
-      )),
-      topicImage(Map(
-        bar0 -> Seq(0, 1, 2),
-        bar1 -> Seq(0, 1, 2),
-      ))
-    ))
-    val onDisk = Seq(foo0, foo1, bar0, bar1, quux0).map(mockLog)
-    val expectedStrays = Set(foo1, quux0).map(_.topicPartition())
-
-    onDisk.foreach(log => assertEquals(expectedStrays.contains(log.topicPartition), LogManager.isStrayKraftReplica(0, image, log)))
-  }
-
-  @Test
-  def testIsStrayKraftReplicaInImageWithRemoteReplicas(): Unit = {
-    val image: TopicsImage = topicsImage(Seq(
-      topicImage(Map(
-        foo0 -> Seq(0, 1, 2),
-      )),
-      topicImage(Map(
-        bar0 -> Seq(1, 2, 3),
-        bar1 -> Seq(2, 3, 0),
-      ))
-    ))
-    val onDisk = Seq(foo0, bar0, bar1).map(mockLog)
-    val expectedStrays = Set(bar0).map(_.topicPartition)
-
-    onDisk.foreach(log => assertEquals(expectedStrays.contains(log.topicPartition), LogManager.isStrayKraftReplica(0, image, log)))
-  }
-
-  @Test
-  def testIsStrayKraftMissingTopicId(): Unit = {
-    val log = Mockito.mock(classOf[UnifiedLog])
-    Mockito.when(log.topicId).thenReturn(Optional.empty)
-    assertTrue(LogManager.isStrayKraftReplica(0, topicsImage(Seq()), log))
-  }
-
   /**
    * Test LogManager takes file lock by default and the lock is released after shutdown.
    */
@@ -1302,12 +1211,12 @@ class LogManagerTest {
 
     try {
       // ${tmpLogDir}.lock is acquired by tmpLogManager
-      val fileLock = new FileLock(new File(tmpLogDir, LogManager.LockFileName))
+      val fileLock = new FileLock(new File(tmpLogDir, JLogManager.LOCK_FILE_NAME))
       assertFalse(fileLock.tryLock())
     } finally {
       // ${tmpLogDir}.lock is removed after shutdown
       tmpLogManager.shutdown()
-      val f = new File(tmpLogDir, LogManager.LockFileName)
+      val f = new File(tmpLogDir, JLogManager.LOCK_FILE_NAME)
       assertFalse(f.exists())
     }
   }
@@ -1376,56 +1285,3 @@ class LogManagerTest {
     }
   }
 }
-
-object LogManagerTest {
-  def mockLog(
-    topicIdPartition: TopicIdPartition
-  ): UnifiedLog = {
-    val log = Mockito.mock(classOf[UnifiedLog])
-    Mockito.when(log.topicId).thenReturn(Optional.of(topicIdPartition.topicId()))
-    Mockito.when(log.topicPartition).thenReturn(topicIdPartition.topicPartition())
-    log
-  }
-
-  def topicImage(
-    partitions: Map[TopicIdPartition, Seq[Int]]
-  ): TopicImage = {
-    var topicName: String = null
-    var topicId: Uuid = null
-    partitions.keySet.foreach {
-      partition => if (topicId == null) {
-        topicId = partition.topicId()
-      } else if (!topicId.equals(partition.topicId())) {
-        throw new IllegalArgumentException("partition topic IDs did not match")
-      }
-        if (topicName == null) {
-          topicName = partition.topic()
-        } else if (!topicName.equals(partition.topic())) {
-          throw new IllegalArgumentException("partition topic names did not 
match")
-        }
-    }
-    if (topicId == null) {
-      throw new IllegalArgumentException("Invalid empty partitions map.")
-    }
-    val partitionRegistrations = partitions.map { case (partition, replicas) =>
-      Int.box(partition.partition()) -> new PartitionRegistration.Builder().
-        setReplicas(replicas.toArray).
-        setDirectories(DirectoryId.unassignedArray(replicas.size)).
-        setIsr(replicas.toArray).
-        setLeader(replicas.head).
-        setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
-        setLeaderEpoch(0).
-        setPartitionEpoch(0).
-        build()
-    }
-    new TopicImage(topicName, topicId, partitionRegistrations.asJava)
-  }
-
-  def topicsImage(
-    topics: Seq[TopicImage]
-  ): TopicsImage = {
-    var retval = TopicsImage.EMPTY
-    topics.foreach { t => retval = retval.including(t) }
-    retval
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 4255648347c..3c816f635db 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -22,7 +22,6 @@ import java.nio.channels.OverlappingFileLockException
 import java.nio.file.{Files, Path, StandardOpenOption}
 import java.util.Properties
 import java.util.concurrent.CompletableFuture
-import kafka.log.LogManager
 import kafka.server.KafkaConfig
 import kafka.tools.TestRaftServer.ByteArraySerde
 import kafka.utils.TestUtils
@@ -35,6 +34,7 @@ import org.apache.kafka.raft.{Endpoints, MetadataLogConfig, QuorumConfig}
 import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
 import org.apache.kafka.server.fault.FaultHandler
+import org.apache.kafka.storage.internals.log.LogManager
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
@@ -164,7 +164,7 @@ class RaftManagerTest {
       )
     )
 
-    val lockPath = metadataDir.getOrElse(logDir.head).resolve(LogManager.LockFileName)
+    val lockPath = metadataDir.getOrElse(logDir.head).resolve(LogManager.LOCK_FILE_NAME)
     assertTrue(fileLocked(lockPath))
 
     raftManager.shutdown()
@@ -188,7 +188,7 @@ class RaftManagerTest {
       )
     )
 
-    val lockPath = metadataDir.getOrElse(logDir.head).resolve(LogManager.LockFileName)
+    val lockPath = metadataDir.getOrElse(logDir.head).resolve(LogManager.LOCK_FILE_NAME)
     assertTrue(fileLocked(lockPath))
 
     raftManager.shutdown()
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index a6b71d912b3..39327bbeaf8 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -21,7 +21,6 @@ import kafka.utils.{CoreUtils, TestInfoUtils, TestUtils}
 import java.io.File
 import java.util.concurrent.CancellationException
 import kafka.integration.KafkaServerTestHarness
-import kafka.log.LogManager
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -29,6 +28,7 @@ import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerial
 import org.apache.kafka.common.utils.Exit
 import org.apache.kafka.metadata.BrokerState
 import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
+import org.apache.kafka.storage.internals.log.LogManager
 import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.function.Executable
@@ -105,7 +105,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
     // do a clean shutdown and check that offset checkpoint file exists
     shutdownBroker()
     for (logDir <- config.logDirs) {
-      val OffsetCheckpointFile = new File(logDir, LogManager.RecoveryPointCheckpointFile)
+      val OffsetCheckpointFile = new File(logDir, LogManager.RECOVERY_POINT_CHECKPOINT_FILE)
       assertTrue(OffsetCheckpointFile.exists)
       assertTrue(OffsetCheckpointFile.length() > 0)
     }
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
new file mode 100644
index 00000000000..cc46e4fc984
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.metadata.PartitionRegistration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class LogManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LogManager.class);
+
+    public static final String LOCK_FILE_NAME = ".lock";
+    public static final String RECOVERY_POINT_CHECKPOINT_FILE = "recovery-point-offset-checkpoint";
+    public static final String LOG_START_OFFSET_CHECKPOINT_FILE = "log-start-offset-checkpoint";
+
+    /**
+     * Wait for all jobs to complete
+     * @param jobs The jobs
+     * @param callback This will be called to handle the exception caused by each Future#get
+     * @return true if all pass. Otherwise, false
+     */
+    public static boolean waitForAllToComplete(List<Future<?>> jobs, Consumer<Throwable> callback) {
+        List<Future<?>> failed = new ArrayList<>();
+        for (Future<?> job : jobs) {
+            try {
+                job.get();
+            } catch (Exception e) {
+                callback.accept(e);
+                failed.add(job);
+            }
+        }
+        return failed.isEmpty();
+    }
+
+    /**
+     * Returns true if the given log should not be on the current broker
+     * according to the metadata image.
+     *
+     * @param brokerId       The ID of the current broker.
+     * @param newTopicsImage The new topics image after broker has been reloaded
+     * @param log            The log object to check
+     * @return true if the log should not exist on the broker, false otherwise.
+     */
+    public static boolean isStrayKraftReplica(int brokerId, TopicsImage newTopicsImage, UnifiedLog log) {
+        if (log.topicId().isEmpty()) {
+            // Missing topic ID could result from storage failure or unclean shutdown after topic creation but before flushing
+            // data to the `partition.metadata` file. And before appending data to the log, the `partition.metadata` is always
+            // flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as
+            // a stray log.
+            LOG.info("The topicId does not exist in {}, treat it as a stray log.", log);
+            return true;
+        }
+
+        Uuid topicId = log.topicId().get();
+        int partitionId = log.topicPartition().partition();
+        PartitionRegistration partition = newTopicsImage.getPartition(topicId, partitionId);
+        if (partition == null) {
+            LOG.info("Found stray log dir {}: the topicId {} does not exist in 
the metadata image.", log, topicId);
+            return true;
+        } else {
+            List<Integer> replicas = Arrays.stream(partition.replicas).boxed().toList();
+            if (!replicas.contains(brokerId)) {
+                LOG.info("Found stray log dir {}: the current replica assignment {} does not contain the local brokerId {}.",
+                        log, replicas.stream().map(String::valueOf).collect(Collectors.joining(", ", "[", "]")), brokerId);
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+}
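
As a usage illustration (not part of the patch), the hypothetical JUnit test below exercises the two public helpers added above; it only relies on classes already referenced in this commit (Mockito mocks of Future and UnifiedLog, and TopicsImage.EMPTY).

    package org.apache.kafka.storage.internals.log;

    import org.apache.kafka.image.TopicsImage;

    import org.junit.jupiter.api.Test;

    import java.util.List;
    import java.util.Optional;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    import static org.junit.jupiter.api.Assertions.assertFalse;
    import static org.junit.jupiter.api.Assertions.assertTrue;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    public class LogManagerUsageSketchTest {

        @SuppressWarnings("unchecked")
        @Test
        public void waitForAllToCompleteReportsFailures() throws ExecutionException, InterruptedException {
            // One future succeeds, one throws: every future is still evaluated and the helper returns false.
            Future<Boolean> ok = mock(Future.class);
            when(ok.get()).thenReturn(true);
            Future<Boolean> broken = mock(Future.class);
            when(broken.get()).thenThrow(new RuntimeException("boom"));

            assertTrue(LogManager.waitForAllToComplete(List.of(ok), t -> { }));
            assertFalse(LogManager.waitForAllToComplete(List.of(ok, broken), t -> { }));
        }

        @Test
        public void logWithoutTopicIdIsAlwaysStray() {
            // A log with no topic ID is treated as stray even against an empty metadata image.
            UnifiedLog log = mock(UnifiedLog.class);
            when(log.topicId()).thenReturn(Optional.empty());
            assertTrue(LogManager.isStrayKraftReplica(0, TopicsImage.EMPTY, log));
        }
    }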
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogManagerTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogManagerTest.java
new file mode 100644
index 00000000000..a86ec3691cf
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogManagerTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.DirectoryId;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.metadata.LeaderRecoveryState;
+import org.apache.kafka.metadata.PartitionRegistration;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class LogManagerTest {
+
+    private static final TopicIdPartition FOO_0 = new TopicIdPartition(Uuid.fromString("Sl08ZXU2QW6uF5hIoSzc8w"), new TopicPartition("foo", 0));
+    private static final TopicIdPartition FOO_1 = new TopicIdPartition(Uuid.fromString("Sl08ZXU2QW6uF5hIoSzc8w"), new TopicPartition("foo", 1));
+    private static final TopicIdPartition BAR_0 = new TopicIdPartition(Uuid.fromString("69O438ZkTSeqqclTtZO2KA"), new TopicPartition("bar", 0));
+    private static final TopicIdPartition BAR_1 = new TopicIdPartition(Uuid.fromString("69O438ZkTSeqqclTtZO2KA"), new TopicPartition("bar", 1));
+    private static final TopicIdPartition QUUX_0 = new TopicIdPartition(Uuid.fromString("YS9owjv5TG2OlsvBM0Qw6g"), new TopicPartition("quux", 0));
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testWaitForAllToComplete() throws ExecutionException, InterruptedException {
+        AtomicInteger invokedCount = new AtomicInteger(0);
+        Future<Boolean> success = mock(Future.class);
+        when(success.get()).thenAnswer(a -> {
+            invokedCount.incrementAndGet();
+            return true;
+        });
+        Future<Boolean> failure = mock(Future.class);
+        when(failure.get()).thenAnswer(a -> {
+            invokedCount.incrementAndGet();
+            throw new RuntimeException();
+        });
+
+        AtomicInteger failureCount = new AtomicInteger(0);
+        // all futures should be evaluated
+        assertFalse(LogManager.waitForAllToComplete(List.of(success, failure), t -> failureCount.incrementAndGet()));
+        assertEquals(2, invokedCount.get());
+        assertEquals(1, failureCount.get());
+        assertFalse(LogManager.waitForAllToComplete(List.of(failure, success), t -> failureCount.incrementAndGet()));
+        assertEquals(4, invokedCount.get());
+        assertEquals(2, failureCount.get());
+        assertTrue(LogManager.waitForAllToComplete(List.of(success, success), t -> failureCount.incrementAndGet()));
+        assertEquals(6, invokedCount.get());
+        assertEquals(2, failureCount.get());
+        assertFalse(LogManager.waitForAllToComplete(List.of(failure, failure), t -> failureCount.incrementAndGet()));
+        assertEquals(8, invokedCount.get());
+        assertEquals(4, failureCount.get());
+    }
+
+    @Test
+    public void testIsStrayKraftReplicaWithEmptyImage() {
+        TopicsImage image = topicsImage(List.of());
+        List<UnifiedLog> onDisk = Stream.of(FOO_0, FOO_1, BAR_0, BAR_1, QUUX_0).map(this::mockLog).toList();
+        assertTrue(onDisk.stream().allMatch(log -> LogManager.isStrayKraftReplica(0, image, log)));
+    }
+
+    @Test
+    public void testIsStrayKraftReplicaInImage() {
+        TopicsImage image = topicsImage(List.of(
+            topicImage(Map.of(
+                    FOO_0, List.of(0, 1, 2))),
+            topicImage(Map.of(
+                    BAR_0, List.of(0, 1, 2),
+                    BAR_1, List.of(0, 1, 2)))
+        ));
+        List<UnifiedLog> onDisk = Stream.of(FOO_0, FOO_1, BAR_0, BAR_1, QUUX_0).map(this::mockLog).toList();
+        Set<TopicPartition> expectedStrays = Stream.of(FOO_1, QUUX_0).map(TopicIdPartition::topicPartition).collect(Collectors.toSet());
+
+        onDisk.forEach(log -> assertEquals(expectedStrays.contains(log.topicPartition()), LogManager.isStrayKraftReplica(0, image, log)));
+    }
+
+    @Test
+    public void testIsStrayKraftReplicaInImageWithRemoteReplicas() {
+        TopicsImage image = topicsImage(List.of(
+            topicImage(Map.of(
+                    FOO_0, List.of(0, 1, 2))),
+            topicImage(Map.of(
+                    BAR_0, List.of(1, 2, 3),
+                    BAR_1, List.of(2, 3, 0)))
+        ));
+        List<UnifiedLog> onDisk = Stream.of(FOO_0, BAR_0, BAR_1).map(this::mockLog).toList();
+        Set<TopicPartition> expectedStrays = Stream.of(BAR_0).map(TopicIdPartition::topicPartition).collect(Collectors.toSet());
+        onDisk.forEach(log -> assertEquals(expectedStrays.contains(log.topicPartition()), LogManager.isStrayKraftReplica(0, image, log)));
+    }
+
+    @Test
+    public void testIsStrayKraftMissingTopicId() {
+        UnifiedLog log = mock(UnifiedLog.class);
+        when(log.topicId()).thenReturn(Optional.empty());
+        assertTrue(LogManager.isStrayKraftReplica(0, topicsImage(List.of()), log));
+    }
+
+    private TopicsImage topicsImage(List<TopicImage> topics) {
+        TopicsImage retval = TopicsImage.EMPTY;
+        for (TopicImage topic : topics) {
+            retval = retval.including(topic);
+        }
+        return retval;
+    }
+
+    private TopicImage topicImage(Map<TopicIdPartition, List<Integer>> partitions) {
+        String topicName = null;
+        Uuid topicId = null;
+        for (TopicIdPartition partition : partitions.keySet()) {
+            if (topicId == null) {
+                topicId = partition.topicId();
+            } else if (!topicId.equals(partition.topicId())) {
+                throw new IllegalArgumentException("partition topic IDs did 
not match");
+            }
+            if (topicName == null) {
+                topicName = partition.topic();
+            } else if (!topicName.equals(partition.topic())) {
+                throw new IllegalArgumentException("partition topic names did 
not match");
+            }
+        }
+        if (topicId == null) {
+            throw new IllegalArgumentException("Invalid empty partitions 
map.");
+        }
+        Map<Integer, PartitionRegistration> partitionRegistrations = partitions.entrySet().stream().collect(
+                Collectors.toMap(
+                        entry -> entry.getKey().partition(),
+                        entry -> new PartitionRegistration.Builder()
+                                .setReplicas(entry.getValue().stream().mapToInt(Integer::intValue).toArray())
+                                .setDirectories(DirectoryId.unassignedArray(entry.getValue().size()))
+                                .setIsr(entry.getValue().stream().mapToInt(Integer::intValue).toArray())
+                                .setLeader(entry.getValue().get(0))
+                                .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+                                .setLeaderEpoch(0)
+                                .setPartitionEpoch(0)
+                                .build()));
+        return new TopicImage(topicName, topicId, partitionRegistrations);
+    }
+
+    private UnifiedLog mockLog(TopicIdPartition topicIdPartition) {
+        UnifiedLog log = mock(UnifiedLog.class);
+        when(log.topicId()).thenReturn(Optional.of(topicIdPartition.topicId()));
+        when(log.topicPartition()).thenReturn(topicIdPartition.topicPartition());
+        return log;
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/FetchFromLeaderWithCorruptedCheckpointTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/FetchFromLeaderWithCorruptedCheckpointTest.java
index 73699bffac6..e82aeff7f8f 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/FetchFromLeaderWithCorruptedCheckpointTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/FetchFromLeaderWithCorruptedCheckpointTest.java
@@ -16,10 +16,10 @@
  */
 package org.apache.kafka.tiered.storage.integration;
 
-import kafka.log.LogManager;
 import kafka.server.ReplicaManager;
 
 import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
+import org.apache.kafka.storage.internals.log.LogManager;
 import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
 import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
 import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
@@ -50,7 +50,7 @@ public class FetchFromLeaderWithCorruptedCheckpointTest extends TieredStorageTes
         final Map<Integer, List<Integer>> assignment = mkMap(mkEntry(p0, List.of(broker0, broker1)));
         final List<String> checkpointFiles = List.of(
                 ReplicaManager.HighWatermarkFilename(),
-                LogManager.RecoveryPointCheckpointFile(),
+                LogManager.RECOVERY_POINT_CHECKPOINT_FILE,
                 CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME);
 
         builder.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, assignment,

