This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 7710d1c9511 KAFKA-14487: Move LogManager static methods/fields to storage module (#19302)
7710d1c9511 is described below

commit 7710d1c9511cc3e02ba2ecc8f48674f01ed05ae2
Author: Mickael Maison <mimai...@users.noreply.github.com>
AuthorDate: Mon Apr 21 12:03:30 2025 +0200

    KAFKA-14487: Move LogManager static methods/fields to storage module (#19302)

    Move the static fields/methods

    Reviewers: Luke Chen <show...@gmail.com>
---
 build.gradle                                       |   1 +
 checkstyle/import-control-storage.xml              |   2 +
 core/src/main/scala/kafka/log/LogManager.scala     |  73 +--------
 core/src/main/scala/kafka/raft/RaftManager.scala   |   5 +-
 .../server/metadata/BrokerMetadataPublisher.scala  |   3 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala | 166 ++-----------------
 .../scala/unit/kafka/raft/RaftManagerTest.scala    |   6 +-
 .../unit/kafka/server/ServerShutdownTest.scala     |   4 +-
 .../kafka/storage/internals/log/LogManager.java    |  96 +++++++++++
 .../storage/internals/log/LogManagerTest.java      | 175 +++++++++++++++++++++
 ...FetchFromLeaderWithCorruptedCheckpointTest.java |   4 +-
 11 files changed, 301 insertions(+), 234 deletions(-)

diff --git a/build.gradle b/build.gradle
index 2e35057165c..50eec676569 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2230,6 +2230,7 @@ project(':storage') {
   }

   dependencies {
+    implementation project(':metadata')
     implementation project(':storage:storage-api')
     implementation project(':server-common')
     implementation project(':clients')
diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml
index 639cb6dc1d0..6628de8d453 100644
--- a/checkstyle/import-control-storage.xml
+++ b/checkstyle/import-control-storage.xml
@@ -94,6 +94,8 @@
     <allow pkg="com.yammer.metrics.core" />
     <allow pkg="org.apache.kafka.common" />
     <allow pkg="org.apache.kafka.config" />
+    <allow pkg="org.apache.kafka.image" />
+    <allow pkg="org.apache.kafka.metadata" />
     <allow pkg="org.apache.kafka.server"/>
     <allow pkg="org.apache.kafka.storage.internals"/>
     <allow pkg="org.apache.kafka.storage.log.metrics"/>
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 506e46e6ce5..e555f1b0b46 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -23,7 +23,6 @@ import java.nio.file.{Files, NoSuchFileException}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 import kafka.server.{KafkaConfig, KafkaRaftServer}
-import kafka.server.metadata.BrokerMetadataPublisher.info
 import kafka.utils.threadsafe
 import kafka.utils.{CoreUtils, Logging, Pool}
 import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid}
@@ -42,7 +41,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
 import java.util.{Collections, Optional, OptionalLong, Properties}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{FileLock, Scheduler}
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleaner, LogConfig, LogDirFailureChannel, LogOffsetsListener, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleaner, LogConfig, LogDirFailureChannel, LogManager => JLogManager, LogOffsetsListener, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
 import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -80,8 +79,6 @@ class LogManager(logDirs: Seq[File],
                  remoteStorageSystemEnable: Boolean,
                  val initialTaskDelayMs: Long) extends Logging {

-  import LogManager._
-
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)

   private val logCreationOrDeletionLock = new Object
@@ -127,9 +124,9 @@ class LogManager(logDirs: Seq[File],
   def directoryIdsSet: Predef.Set[Uuid] = directoryIds.values.toSet

   @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
-    (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap
+    (dir, new OffsetCheckpointFile(new File(dir, JLogManager.RECOVERY_POINT_CHECKPOINT_FILE), logDirFailureChannel))).toMap
   @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
-    (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap
+    (dir, new OffsetCheckpointFile(new File(dir, JLogManager.LOG_START_OFFSET_CHECKPOINT_FILE), logDirFailureChannel))).toMap

   private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]()
@@ -261,7 +258,7 @@ class LogManager(logDirs: Seq[File],
   private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
     dirs.flatMap { dir =>
       try {
-        val lock = new FileLock(new File(dir, LockFileName))
+        val lock = new FileLock(new File(dir, JLogManager.LOCK_FILE_NAME))
         if (!lock.tryLock())
           throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParent +
             ". A Kafka instance in another process or thread is using this directory.")
@@ -680,7 +677,7 @@ class LogManager(logDirs: Seq[File],

     try {
       jobs.foreachEntry { (dir, dirJobs) =>
-        if (waitForAllToComplete(dirJobs,
+        if (JLogManager.waitForAllToComplete(dirJobs.toList.asJava,
           e => warn(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}"))) {
           val logs = logsInDir(localLogsByDir, dir)
@@ -1520,25 +1517,6 @@ class LogManager(logDirs: Seq[File],
 }

 object LogManager {
-  val LockFileName = ".lock"
-
-  /**
-   * Wait all jobs to complete
-   * @param jobs jobs
-   * @param callback this will be called to handle the exception caused by each Future#get
-   * @return true if all pass. Otherwise, false
-   */
-  private[log] def waitForAllToComplete(jobs: Seq[Future[_]], callback: Throwable => Unit): Boolean = {
-    jobs.count(future => Try(future.get) match {
-      case Success(_) => false
-      case Failure(e) =>
-        callback(e)
-        true
-    }) == 0
-  }
-
-  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
-  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"

   def apply(config: KafkaConfig,
             initialOfflineDirs: Seq[String],
@@ -1575,45 +1553,4 @@ object LogManager {
       remoteStorageSystemEnable = config.remoteLogManagerConfig.isRemoteStorageSystemEnabled,
       initialTaskDelayMs = config.logInitialTaskDelayMs)
   }
-
-  /**
-   * Returns true if the given log should not be on the current broker
-   * according to the metadata image.
-   *
-   * @param brokerId The ID of the current broker.
-   * @param newTopicsImage The new topics image after broker has been reloaded
-   * @param log The log object to check
-   * @return true if the log should not exist on the broker, false otherwise.
-   */
-  def isStrayKraftReplica(
-    brokerId: Int,
-    newTopicsImage: TopicsImage,
-    log: UnifiedLog
-  ): Boolean = {
-    if (log.topicId.isEmpty) {
-      // Missing topic ID could result from storage failure or unclean shutdown after topic creation but before flushing
-      // data to the `partition.metadata` file. And before appending data to the log, the `partition.metadata` is always
-      // flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as
-      // a stray log.
-      info(s"The topicId does not exist in $log, treat it as a stray log")
-      return true
-    }
-
-    val topicId = log.topicId.get
-    val partitionId = log.topicPartition.partition()
-    Option(newTopicsImage.getPartition(topicId, partitionId)) match {
-      case Some(partition) =>
-        if (!partition.replicas.contains(brokerId)) {
-          info(s"Found stray log dir $log: the current replica assignment ${partition.replicas.mkString("[", ", ", "]")} " +
-            s"does not contain the local brokerId $brokerId.")
-          true
-        } else {
-          false
-        }
-
-      case None =>
-        info(s"Found stray log dir $log: the topicId $topicId does not exist in the metadata image")
-        true
-    }
-  }
 }
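A note on the shutdown call-site change above: LogManager.shutdown() now converts its Scala Seq of per-directory futures with .asJava and delegates to the Java helper introduced later in this commit. Its contract is easy to see in isolation. The sketch below is illustrative only; the executor and both jobs are hypothetical, not part of the commit:

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import org.apache.kafka.storage.internals.log.LogManager;

    public class WaitForAllExample {
        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(2);
            // Two hypothetical shutdown jobs: one succeeds, one fails.
            Future<?> ok = pool.submit(() -> { });
            Future<?> bad = pool.submit((Runnable) () -> {
                throw new RuntimeException("flush failed");
            });

            // Every future is awaited even after a failure; the callback receives
            // each ExecutionException, and true is returned only if all jobs passed.
            boolean allPassed = LogManager.waitForAllToComplete(
                List.of(ok, bad),
                t -> System.err.println("job failed: " + t.getCause()));
            System.out.println(allPassed); // prints: false

            pool.shutdown();
        }
    }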
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 84dfa5ebee0..0727c660fe4 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -24,7 +24,6 @@ import java.util.OptionalInt
 import java.util.concurrent.CompletableFuture
 import java.util.{Map => JMap}
 import java.util.{Collection => JCollection}
-import kafka.log.LogManager
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils
 import kafka.utils.Logging
@@ -48,7 +47,7 @@ import org.apache.kafka.server.common.serialization.RecordSerde
 import org.apache.kafka.server.util.{FileLock, KafkaScheduler}
 import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.server.util.timer.SystemTimer
-import org.apache.kafka.storage.internals.log.UnifiedLog
+import org.apache.kafka.storage.internals.log.{LogManager, UnifiedLog}

 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters._
@@ -62,7 +61,7 @@ object KafkaRaftManager {
   }

   private def lockDataDir(dataDir: File): FileLock = {
-    val lock = new FileLock(new File(dataDir, LogManager.LockFileName))
+    val lock = new FileLock(new File(dataDir, LogManager.LOCK_FILE_NAME))

     if (!lock.tryLock()) {
       throw new KafkaException(
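The .lock convention now has a single home: both the broker log dirs (lockLogDirs, earlier in this diff) and the Raft data dir guard themselves with a FileLock on LogManager.LOCK_FILE_NAME. A minimal standalone sketch of the pattern, with a hypothetical directory path; the destroy() call mirrors what LogManager does at shutdown and is an assumption here, not shown in this diff:

    import java.io.File;

    import org.apache.kafka.server.util.FileLock;
    import org.apache.kafka.storage.internals.log.LogManager;

    public class DataDirLockExample {
        public static void main(String[] args) throws Exception {
            File dataDir = new File("/tmp/kraft-data");  // hypothetical directory
            dataDir.mkdirs();

            // One ".lock" file per data directory keeps a second process out,
            // matching KafkaRaftManager.lockDataDir and LogManager.lockLogDirs.
            FileLock lock = new FileLock(new File(dataDir, LogManager.LOCK_FILE_NAME));
            if (!lock.tryLock()) {
                throw new IllegalStateException(dataDir + " is locked by another process");
            }
            try {
                // ... use the directory exclusively ...
            } finally {
                lock.destroy();  // assumed to release the lock and delete the file
            }
        }
    }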
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index de8f16e1e58..d95be34ff51 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
 import org.apache.kafka.metadata.publisher.AclPublisher
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.server.fault.FaultHandler
+import org.apache.kafka.storage.internals.log.{LogManager => JLogManager}

 import java.util.concurrent.CompletableFuture
 import scala.collection.mutable
@@ -300,7 +301,7 @@ class BrokerMetadataPublisher(
       // recovery-from-unclean-shutdown if required.
       logManager.startup(
         metadataCache.getAllTopics().asScala,
-        isStray = log => LogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
+        isStray = log => JLogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
       )

       // Rename all future replicas which are in the same directory as the
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 5e721596ce0..67880e0ced5 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -23,15 +23,14 @@ import org.apache.directory.api.util.FileUtils
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{DirectoryId, KafkaException, TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.image.{TopicImage, TopicsImage}
-import org.apache.kafka.metadata.{ConfigRepository, LeaderRecoveryState, MockConfigRepository, PartitionRegistration}
+import org.apache.kafka.metadata.{ConfigRepository, MockConfigRepository}
 import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
-import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 import org.mockito.Mockito.{doAnswer, doNothing, mock, never, spy, times, verify}

 import java.io._
@@ -39,12 +38,12 @@ import java.lang.{Long => JLong}
 import java.nio.file.Files
 import java.nio.file.attribute.PosixFilePermission
 import java.util
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Future}
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.{Collections, Optional, OptionalLong, Properties}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.storage.log.FetchIsolation
 import org.apache.kafka.server.util.{FileLock, KafkaScheduler, MockTime, Scheduler}
-import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogMetricNames, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogMetricNames, LogManager => JLogManager, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
 import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.function.Executable
@@ -56,7 +55,6 @@ import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Try}

 class LogManagerTest {
-  import LogManagerTest._

   val time = new MockTime()
   val maxRollInterval = 100
@@ -592,7 +590,7 @@ class LogManagerTest {
     }
     logManager.checkpointLogRecoveryOffsets()

-    val checkpoints = new OffsetCheckpointFile(new File(logDir, LogManager.RecoveryPointCheckpointFile), null).read()
+    val checkpoints = new OffsetCheckpointFile(new File(logDir, JLogManager.RECOVERY_POINT_CHECKPOINT_FILE), null).read()

     topicPartitions.zip(logs).foreach { case (tp, log) =>
       assertEquals(checkpoints.get(tp), log.recoveryPoint, "Recovery point should equal checkpoint")
@@ -672,7 +670,7 @@ class LogManagerTest {

     logManager.checkpointRecoveryOffsetsInDir(logDir)

-    val checkpoints = new OffsetCheckpointFile(new File(logDir, LogManager.RecoveryPointCheckpointFile), null).read()
+    val checkpoints = new OffsetCheckpointFile(new File(logDir, JLogManager.RECOVERY_POINT_CHECKPOINT_FILE), null).read()

     tps.zip(allLogs).foreach { case (tp, log) =>
       assertEquals(checkpoints.get(tp), log.recoveryPoint,
@@ -1094,36 +1092,6 @@ class LogManagerTest {
     verifyMetrics(1)
   }

-  @Test
-  def testWaitForAllToComplete(): Unit = {
-    var invokedCount = 0
-    val success: Future[Boolean] = Mockito.mock(classOf[Future[Boolean]])
-    Mockito.when(success.get()).thenAnswer { _ =>
-      invokedCount += 1
-      true
-    }
-    val failure: Future[Boolean] = Mockito.mock(classOf[Future[Boolean]])
-    Mockito.when(failure.get()).thenAnswer{ _ =>
-      invokedCount += 1
-      throw new RuntimeException
-    }
-
-    var failureCount = 0
-    // all futures should be evaluated
-    assertFalse(LogManager.waitForAllToComplete(Seq(success, failure), _ => failureCount += 1))
-    assertEquals(2, invokedCount)
-    assertEquals(1, failureCount)
-    assertFalse(LogManager.waitForAllToComplete(Seq(failure, success), _ => failureCount += 1))
-    assertEquals(4, invokedCount)
-    assertEquals(2, failureCount)
-    assertTrue(LogManager.waitForAllToComplete(Seq(success, success), _ => failureCount += 1))
-    assertEquals(6, invokedCount)
-    assertEquals(2, failureCount)
-    assertFalse(LogManager.waitForAllToComplete(Seq(failure, failure), _ => failureCount += 1))
-    assertEquals(8, invokedCount)
-    assertEquals(4, failureCount)
-  }
-
   @Test
   def testLoadDirectoryIds(): Unit = {
     val dirs: Seq[File] = Seq.fill(5)(TestUtils.tempDir())
@@ -1161,7 +1129,7 @@ class LogManagerTest {
       remoteStorageSystemEnable = true
     )

-    val checkpointFile = new File(logDir, LogManager.LogStartOffsetCheckpointFile)
+    val checkpointFile = new File(logDir, JLogManager.LOG_START_OFFSET_CHECKPOINT_FILE)
     val checkpoint = new OffsetCheckpointFile(checkpointFile, null)
     val topicPartition = new TopicPartition("test", 0)
     val log = logManager.getOrCreateLog(topicPartition, topicId = Optional.empty)
@@ -1192,7 +1160,7 @@ class LogManagerTest {

   @Test
   def testCheckpointLogStartOffsetForNormalTopic(): Unit = {
-    val checkpointFile = new File(logDir, LogManager.LogStartOffsetCheckpointFile)
+    val checkpointFile = new File(logDir, JLogManager.LOG_START_OFFSET_CHECKPOINT_FILE)
     val checkpoint = new OffsetCheckpointFile(checkpointFile, null)
     val topicPartition = new TopicPartition("test", 0)
     val log = logManager.getOrCreateLog(topicPartition, topicId = Optional.empty)
@@ -1233,65 +1201,6 @@ class LogManagerTest {
       new File(dir, MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath, false)
   }

-  val foo0 = new TopicIdPartition(Uuid.fromString("Sl08ZXU2QW6uF5hIoSzc8w"), new TopicPartition("foo", 0))
-  val foo1 = new TopicIdPartition(Uuid.fromString("Sl08ZXU2QW6uF5hIoSzc8w"), new TopicPartition("foo", 1))
-  val bar0 = new TopicIdPartition(Uuid.fromString("69O438ZkTSeqqclTtZO2KA"), new TopicPartition("bar", 0))
-  val bar1 = new TopicIdPartition(Uuid.fromString("69O438ZkTSeqqclTtZO2KA"), new TopicPartition("bar", 1))
-  val baz0 = new TopicIdPartition(Uuid.fromString("2Ik9_5-oRDOKpSXd2SuG5w"), new TopicPartition("baz", 0))
-  val baz1 = new TopicIdPartition(Uuid.fromString("2Ik9_5-oRDOKpSXd2SuG5w"), new TopicPartition("baz", 1))
-  val baz2 = new TopicIdPartition(Uuid.fromString("2Ik9_5-oRDOKpSXd2SuG5w"), new TopicPartition("baz", 2))
-  val quux0 = new TopicIdPartition(Uuid.fromString("YS9owjv5TG2OlsvBM0Qw6g"), new TopicPartition("quux", 0))
-  val recreatedFoo0 = new TopicIdPartition(Uuid.fromString("_dOOzPe3TfiWV21Lh7Vmqg"), new TopicPartition("foo", 0))
-  val recreatedFoo1 = new TopicIdPartition(Uuid.fromString("_dOOzPe3TfiWV21Lh7Vmqg"), new TopicPartition("foo", 1))
-
-  @Test
-  def testIsStrayKraftReplicaWithEmptyImage(): Unit = {
-    val image: TopicsImage = topicsImage(Seq())
-    val onDisk = Seq(foo0, foo1, bar0, bar1, quux0).map(mockLog)
-    assertTrue(onDisk.forall(log => LogManager.isStrayKraftReplica(0, image, log)))
-  }
-
-  @Test
-  def testIsStrayKraftReplicaInImage(): Unit = {
-    val image: TopicsImage = topicsImage(Seq(
-      topicImage(Map(
-        foo0 -> Seq(0, 1, 2),
-      )),
-      topicImage(Map(
-        bar0 -> Seq(0, 1, 2),
-        bar1 -> Seq(0, 1, 2),
-      ))
-    ))
-    val onDisk = Seq(foo0, foo1, bar0, bar1, quux0).map(mockLog)
-    val expectedStrays = Set(foo1, quux0).map(_.topicPartition())
-
-    onDisk.foreach(log => assertEquals(expectedStrays.contains(log.topicPartition), LogManager.isStrayKraftReplica(0, image, log)))
-  }
-
-  @Test
-  def testIsStrayKraftReplicaInImageWithRemoteReplicas(): Unit = {
-    val image: TopicsImage = topicsImage(Seq(
-      topicImage(Map(
-        foo0 -> Seq(0, 1, 2),
-      )),
-      topicImage(Map(
-        bar0 -> Seq(1, 2, 3),
-        bar1 -> Seq(2, 3, 0),
-      ))
-    ))
-    val onDisk = Seq(foo0, bar0, bar1).map(mockLog)
-    val expectedStrays = Set(bar0).map(_.topicPartition)
-
-    onDisk.foreach(log => assertEquals(expectedStrays.contains(log.topicPartition), LogManager.isStrayKraftReplica(0, image, log)))
-  }
-
-  @Test
-  def testIsStrayKraftMissingTopicId(): Unit = {
-    val log = Mockito.mock(classOf[UnifiedLog])
-    Mockito.when(log.topicId).thenReturn(Optional.empty)
-    assertTrue(LogManager.isStrayKraftReplica(0, topicsImage(Seq()), log))
-  }
-
   /**
    * Test LogManager takes file lock by default and the lock is released after shutdown.
    */
@@ -1302,12 +1211,12 @@ class LogManagerTest {

     try {
       // ${tmpLogDir}.lock is acquired by tmpLogManager
-      val fileLock = new FileLock(new File(tmpLogDir, LogManager.LockFileName))
+      val fileLock = new FileLock(new File(tmpLogDir, JLogManager.LOCK_FILE_NAME))
       assertFalse(fileLock.tryLock())
     } finally {
       // ${tmpLogDir}.lock is removed after shutdown
       tmpLogManager.shutdown()
-      val f = new File(tmpLogDir, LogManager.LockFileName)
+      val f = new File(tmpLogDir, JLogManager.LOCK_FILE_NAME)
       assertFalse(f.exists())
     }
   }
@@ -1376,56 +1285,3 @@ class LogManagerTest {
     }
   }
 }
-
-object LogManagerTest {
-  def mockLog(
-    topicIdPartition: TopicIdPartition
-  ): UnifiedLog = {
-    val log = Mockito.mock(classOf[UnifiedLog])
-    Mockito.when(log.topicId).thenReturn(Optional.of(topicIdPartition.topicId()))
-    Mockito.when(log.topicPartition).thenReturn(topicIdPartition.topicPartition())
-    log
-  }
-
-  def topicImage(
-    partitions: Map[TopicIdPartition, Seq[Int]]
-  ): TopicImage = {
-    var topicName: String = null
-    var topicId: Uuid = null
-    partitions.keySet.foreach {
-      partition => if (topicId == null) {
-        topicId = partition.topicId()
-      } else if (!topicId.equals(partition.topicId())) {
-        throw new IllegalArgumentException("partition topic IDs did not match")
-      }
-      if (topicName == null) {
-        topicName = partition.topic()
-      } else if (!topicName.equals(partition.topic())) {
-        throw new IllegalArgumentException("partition topic names did not match")
-      }
-    }
-    if (topicId == null) {
-      throw new IllegalArgumentException("Invalid empty partitions map.")
-    }
-    val partitionRegistrations = partitions.map { case (partition, replicas) =>
-      Int.box(partition.partition()) -> new PartitionRegistration.Builder().
-        setReplicas(replicas.toArray).
-        setDirectories(DirectoryId.unassignedArray(replicas.size)).
-        setIsr(replicas.toArray).
-        setLeader(replicas.head).
-        setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
-        setLeaderEpoch(0).
-        setPartitionEpoch(0).
-        build()
-    }
-    new TopicImage(topicName, topicId, partitionRegistrations.asJava)
-  }
-
-  def topicsImage(
-    topics: Seq[TopicImage]
-  ): TopicsImage = {
-    var retval = TopicsImage.EMPTY
-    topics.foreach { t => retval = retval.including(t) }
-    retval
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 4255648347c..3c816f635db 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -22,7 +22,6 @@ import java.nio.channels.OverlappingFileLockException
 import java.nio.file.{Files, Path, StandardOpenOption}
 import java.util.Properties
 import java.util.concurrent.CompletableFuture
-import kafka.log.LogManager
 import kafka.server.KafkaConfig
 import kafka.tools.TestRaftServer.ByteArraySerde
 import kafka.utils.TestUtils
@@ -35,6 +34,7 @@ import org.apache.kafka.raft.{Endpoints, MetadataLogConfig, QuorumConfig}
 import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
 import org.apache.kafka.server.fault.FaultHandler
+import org.apache.kafka.storage.internals.log.LogManager
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
@@ -164,7 +164,7 @@ class RaftManagerTest {
       )
     )

-    val lockPath = metadataDir.getOrElse(logDir.head).resolve(LogManager.LockFileName)
+    val lockPath = metadataDir.getOrElse(logDir.head).resolve(LogManager.LOCK_FILE_NAME)
     assertTrue(fileLocked(lockPath))

     raftManager.shutdown()
@@ -188,7 +188,7 @@ class RaftManagerTest {
       )
     )

-    val lockPath = metadataDir.getOrElse(logDir.head).resolve(LogManager.LockFileName)
+    val lockPath = metadataDir.getOrElse(logDir.head).resolve(LogManager.LOCK_FILE_NAME)
     assertTrue(fileLocked(lockPath))

     raftManager.shutdown()
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index a6b71d912b3..39327bbeaf8 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -21,7 +21,6 @@ import kafka.utils.{CoreUtils, TestInfoUtils, TestUtils}
 import java.io.File
 import java.util.concurrent.CancellationException
 import kafka.integration.KafkaServerTestHarness
-import kafka.log.LogManager
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -29,6 +28,7 @@ import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerial
 import org.apache.kafka.common.utils.Exit
 import org.apache.kafka.metadata.BrokerState
 import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
+import org.apache.kafka.storage.internals.log.LogManager
 import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.function.Executable
@@ -105,7 +105,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
     // do a clean shutdown and check that offset checkpoint file exists
     shutdownBroker()
     for (logDir <- config.logDirs) {
-      val OffsetCheckpointFile = new File(logDir, LogManager.RecoveryPointCheckpointFile)
+      val OffsetCheckpointFile = new File(logDir, LogManager.RECOVERY_POINT_CHECKPOINT_FILE)
       assertTrue(OffsetCheckpointFile.exists)
       assertTrue(OffsetCheckpointFile.length() > 0)
     }
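As the test updates above show, callers that used LogManager.RecoveryPointCheckpointFile and LogManager.LogStartOffsetCheckpointFile now read the SCREAMING_SNAKE_CASE constants from the Java class instead. For illustration, reading a recovery-point checkpoint outside a broker might look like the sketch below; the log-dir path is hypothetical, passing null for the LogDirFailureChannel follows the tests above, and read() is assumed to return a map keyed by partition, as the assertions above use:

    import java.io.File;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
    import org.apache.kafka.storage.internals.log.LogManager;

    public class ReadRecoveryPoints {
        public static void main(String[] args) throws Exception {
            File logDir = new File("/tmp/kafka-logs");  // hypothetical log directory

            // The constant names the per-log-dir checkpoint file; OffsetCheckpointFile
            // parses it into a recovery point (flushed offset) per partition.
            OffsetCheckpointFile file =
                new OffsetCheckpointFile(new File(logDir, LogManager.RECOVERY_POINT_CHECKPOINT_FILE), null);
            Map<TopicPartition, Long> recoveryPoints = file.read();
            recoveryPoints.forEach((tp, offset) ->
                System.out.println(tp + " flushed up to offset " + offset));
        }
    }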
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
new file mode 100644
index 00000000000..cc46e4fc984
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.metadata.PartitionRegistration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class LogManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LogManager.class);
+
+    public static final String LOCK_FILE_NAME = ".lock";
+    public static final String RECOVERY_POINT_CHECKPOINT_FILE = "recovery-point-offset-checkpoint";
+    public static final String LOG_START_OFFSET_CHECKPOINT_FILE = "log-start-offset-checkpoint";
+
+    /**
+     * Wait for all jobs to complete
+     * @param jobs The jobs
+     * @param callback This will be called to handle the exception caused by each Future#get
+     * @return true if all pass. Otherwise, false
+     */
+    public static boolean waitForAllToComplete(List<Future<?>> jobs, Consumer<Throwable> callback) {
+        List<Future<?>> failed = new ArrayList<>();
+        for (Future<?> job : jobs) {
+            try {
+                job.get();
+            } catch (Exception e) {
+                callback.accept(e);
+                failed.add(job);
+            }
+        }
+        return failed.isEmpty();
+    }
+
+    /**
+     * Returns true if the given log should not be on the current broker
+     * according to the metadata image.
+     *
+     * @param brokerId The ID of the current broker.
+     * @param newTopicsImage The new topics image after broker has been reloaded
+     * @param log The log object to check
+     * @return true if the log should not exist on the broker, false otherwise.
+     */
+    public static boolean isStrayKraftReplica(int brokerId, TopicsImage newTopicsImage, UnifiedLog log) {
+        if (log.topicId().isEmpty()) {
+            // Missing topic ID could result from storage failure or unclean shutdown after topic creation but before flushing
+            // data to the `partition.metadata` file. And before appending data to the log, the `partition.metadata` is always
+            // flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as
+            // a stray log.
+            LOG.info("The topicId does not exist in {}, treat it as a stray log.", log);
+            return true;
+        }
+
+        Uuid topicId = log.topicId().get();
+        int partitionId = log.topicPartition().partition();
+        PartitionRegistration partition = newTopicsImage.getPartition(topicId, partitionId);
+        if (partition == null) {
+            LOG.info("Found stray log dir {}: the topicId {} does not exist in the metadata image.", log, topicId);
+            return true;
+        } else {
+            List<Integer> replicas = Arrays.stream(partition.replicas).boxed().toList();
+            if (!replicas.contains(brokerId)) {
+                LOG.info("Found stray log dir {}: the current replica assignment {} does not contain the local brokerId {}.",
+                        log, replicas.stream().map(String::valueOf).collect(Collectors.joining(", ", "[", "]")), brokerId);
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+}
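With isStrayKraftReplica now public Java API, BrokerMetadataPublisher (earlier in this diff) passes it as the isStray predicate to logManager.startup. Outside the broker, sweeping on-disk logs against a metadata image reduces to a filter; findStrays below is a hypothetical helper for illustration, not part of the commit:

    import java.util.List;

    import org.apache.kafka.image.TopicsImage;
    import org.apache.kafka.storage.internals.log.LogManager;
    import org.apache.kafka.storage.internals.log.UnifiedLog;

    public class StrayReplicaSweep {
        // Hypothetical helper: keep only the logs this broker should no longer host,
        // i.e. logs whose topic ID is missing, unknown to the image, or whose
        // partition is no longer assigned to this broker.
        static List<UnifiedLog> findStrays(int brokerId, TopicsImage image, List<UnifiedLog> onDisk) {
            return onDisk.stream()
                .filter(log -> LogManager.isStrayKraftReplica(brokerId, image, log))
                .toList();
        }
    }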
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogManagerTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogManagerTest.java
new file mode 100644
index 00000000000..a86ec3691cf
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogManagerTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.DirectoryId;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.metadata.LeaderRecoveryState;
+import org.apache.kafka.metadata.PartitionRegistration;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class LogManagerTest {
+
+    private static final TopicIdPartition FOO_0 = new TopicIdPartition(Uuid.fromString("Sl08ZXU2QW6uF5hIoSzc8w"), new TopicPartition("foo", 0));
+    private static final TopicIdPartition FOO_1 = new TopicIdPartition(Uuid.fromString("Sl08ZXU2QW6uF5hIoSzc8w"), new TopicPartition("foo", 1));
+    private static final TopicIdPartition BAR_0 = new TopicIdPartition(Uuid.fromString("69O438ZkTSeqqclTtZO2KA"), new TopicPartition("bar", 0));
+    private static final TopicIdPartition BAR_1 = new TopicIdPartition(Uuid.fromString("69O438ZkTSeqqclTtZO2KA"), new TopicPartition("bar", 1));
+    private static final TopicIdPartition QUUX_0 = new TopicIdPartition(Uuid.fromString("YS9owjv5TG2OlsvBM0Qw6g"), new TopicPartition("quux", 0));
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testWaitForAllToComplete() throws ExecutionException, InterruptedException {
+        AtomicInteger invokedCount = new AtomicInteger(0);
+        Future<Boolean> success = mock(Future.class);
+        when(success.get()).thenAnswer(a -> {
+            invokedCount.incrementAndGet();
+            return true;
+        });
+        Future<Boolean> failure = mock(Future.class);
+        when(failure.get()).thenAnswer(a -> {
+            invokedCount.incrementAndGet();
+            throw new RuntimeException();
+        });
+
+        AtomicInteger failureCount = new AtomicInteger(0);
+        // all futures should be evaluated
+        assertFalse(LogManager.waitForAllToComplete(List.of(success, failure), t -> failureCount.incrementAndGet()));
+        assertEquals(2, invokedCount.get());
+        assertEquals(1, failureCount.get());
+        assertFalse(LogManager.waitForAllToComplete(List.of(failure, success), t -> failureCount.incrementAndGet()));
+        assertEquals(4, invokedCount.get());
+        assertEquals(2, failureCount.get());
+        assertTrue(LogManager.waitForAllToComplete(List.of(success, success), t -> failureCount.incrementAndGet()));
+        assertEquals(6, invokedCount.get());
+        assertEquals(2, failureCount.get());
+        assertFalse(LogManager.waitForAllToComplete(List.of(failure, failure), t -> failureCount.incrementAndGet()));
+        assertEquals(8, invokedCount.get());
+        assertEquals(4, failureCount.get());
+    }
+
+    @Test
+    public void testIsStrayKraftReplicaWithEmptyImage() {
+        TopicsImage image = topicsImage(List.of());
+        List<UnifiedLog> onDisk = Stream.of(FOO_0, FOO_1, BAR_0, BAR_1, QUUX_0).map(this::mockLog).toList();
+        assertTrue(onDisk.stream().allMatch(log -> LogManager.isStrayKraftReplica(0, image, log)));
+    }
+
+    @Test
+    public void testIsStrayKraftReplicaInImage() {
+        TopicsImage image = topicsImage(List.of(
+            topicImage(Map.of(
+                FOO_0, List.of(0, 1, 2))),
+            topicImage(Map.of(
+                BAR_0, List.of(0, 1, 2),
+                BAR_1, List.of(0, 1, 2)))
+        ));
+        List<UnifiedLog> onDisk = Stream.of(FOO_0, FOO_1, BAR_0, BAR_1, QUUX_0).map(this::mockLog).toList();
+        Set<TopicPartition> expectedStrays = Stream.of(FOO_1, QUUX_0).map(TopicIdPartition::topicPartition).collect(Collectors.toSet());
+
+        onDisk.forEach(log -> assertEquals(expectedStrays.contains(log.topicPartition()), LogManager.isStrayKraftReplica(0, image, log)));
+    }
+
+    @Test
+    public void testIsStrayKraftReplicaInImageWithRemoteReplicas() {
+        TopicsImage image = topicsImage(List.of(
+            topicImage(Map.of(
+                FOO_0, List.of(0, 1, 2))),
+            topicImage(Map.of(
+                BAR_0, List.of(1, 2, 3),
+                BAR_1, List.of(2, 3, 0)))
+        ));
+        List<UnifiedLog> onDisk = Stream.of(FOO_0, BAR_0, BAR_1).map(this::mockLog).toList();
+        Set<TopicPartition> expectedStrays = Stream.of(BAR_0).map(TopicIdPartition::topicPartition).collect(Collectors.toSet());
+
+        onDisk.forEach(log -> assertEquals(expectedStrays.contains(log.topicPartition()), LogManager.isStrayKraftReplica(0, image, log)));
+    }
+
+    @Test
+    public void testIsStrayKraftMissingTopicId() {
+        UnifiedLog log = mock(UnifiedLog.class);
+        when(log.topicId()).thenReturn(Optional.empty());
+        assertTrue(LogManager.isStrayKraftReplica(0, topicsImage(List.of()), log));
+    }
+
+    private TopicsImage topicsImage(List<TopicImage> topics) {
+        TopicsImage retval = TopicsImage.EMPTY;
+        for (TopicImage topic : topics) {
+            retval = retval.including(topic);
+        }
+        return retval;
+    }
+
+    private TopicImage topicImage(Map<TopicIdPartition, List<Integer>> partitions) {
+        String topicName = null;
+        Uuid topicId = null;
+        for (TopicIdPartition partition : partitions.keySet()) {
+            if (topicId == null) {
+                topicId = partition.topicId();
+            } else if (!topicId.equals(partition.topicId())) {
+                throw new IllegalArgumentException("partition topic IDs did not match");
+            }
+            if (topicName == null) {
+                topicName = partition.topic();
+            } else if (!topicName.equals(partition.topic())) {
+                throw new IllegalArgumentException("partition topic names did not match");
+            }
+        }
+        if (topicId == null) {
+            throw new IllegalArgumentException("Invalid empty partitions map.");
+        }
+        Map<Integer, PartitionRegistration> partitionRegistrations = partitions.entrySet().stream().collect(
+            Collectors.toMap(
+                entry -> entry.getKey().partition(),
+                entry -> new PartitionRegistration.Builder()
+                    .setReplicas(entry.getValue().stream().mapToInt(Integer::intValue).toArray())
+                    .setDirectories(DirectoryId.unassignedArray(entry.getValue().size()))
+                    .setIsr(entry.getValue().stream().mapToInt(Integer::intValue).toArray())
+                    .setLeader(entry.getValue().get(0))
+                    .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+                    .setLeaderEpoch(0)
+                    .setPartitionEpoch(0)
+                    .build()));
+        return new TopicImage(topicName, topicId, partitionRegistrations);
+    }
+
+    private UnifiedLog mockLog(TopicIdPartition topicIdPartition) {
+        UnifiedLog log = mock(UnifiedLog.class);
+        when(log.topicId()).thenReturn(Optional.of(topicIdPartition.topicId()));
+        when(log.topicPartition()).thenReturn(topicIdPartition.topicPartition());
+        return log;
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/FetchFromLeaderWithCorruptedCheckpointTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/FetchFromLeaderWithCorruptedCheckpointTest.java
index 73699bffac6..e82aeff7f8f 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/FetchFromLeaderWithCorruptedCheckpointTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/FetchFromLeaderWithCorruptedCheckpointTest.java
@@ -16,10 +16,10 @@
  */
 package org.apache.kafka.tiered.storage.integration;

-import kafka.log.LogManager;
 import kafka.server.ReplicaManager;

 import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
+import org.apache.kafka.storage.internals.log.LogManager;
 import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
 import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
 import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
@@ -50,7 +50,7 @@ public class FetchFromLeaderWithCorruptedCheckpointTest extends TieredStorageTes
         final Map<Integer, List<Integer>> assignment = mkMap(mkEntry(p0, List.of(broker0, broker1)));
         final List<String> checkpointFiles = List.of(
                 ReplicaManager.HighWatermarkFilename(),
-                LogManager.RecoveryPointCheckpointFile(),
+                LogManager.RECOVERY_POINT_CHECKPOINT_FILE,
                 CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME);

         builder.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, assignment,