This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit c052349b1b86a7a457322956ea865ae386856298
Author: Ilyas Toumlilt <[email protected]>
AuthorDate: Thu Jan 29 12:11:53 2026 +0100

    KAFKA-19571: Race condition between log segment flush and file deletion causing log dir to go offline (#20289)

    Reviewers: Jun Rao <[email protected]>, Luke Chen <[email protected]>, Mickael Maison <[email protected]>
---
 core/src/main/scala/kafka/log/LogManager.scala     | 10 +++--
 .../test/scala/unit/kafka/log/LogManagerTest.scala | 45 +++++++++++++++++++++-
 2 files changed, 50 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index bfee35061f8..1d9e4edffc9 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1247,9 +1247,13 @@ class LogManager(logDirs: Seq[File],
       try {
         sourceLog.foreach { srcLog =>
           srcLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), true)
-          // Now that replica in source log directory has been successfully renamed for deletion.
-          // Close the log, update checkpoint files, and enqueue this log to be deleted.
-          srcLog.close()
+          // Now that replica in source log directory has been successfully renamed for deletion,
+          // update checkpoint files and enqueue this log to be deleted.
+          // Note: We intentionally do NOT close the log here to avoid race conditions where concurrent
+          // operations (e.g., log flusher, fetch requests) might encounter ClosedChannelException.
+          // The log will be deleted asynchronously by the background delete-logs thread.
+          // File handles are intentionally left open; Unix semantics allow the renamed files
+          // to remain accessible until all handles are closed.
           val logDir = srcLog.parentDirFile
           val logsToCheckpoint = logsInDir(logDir)
           checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index a8946a3d139..7736f6736e9 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -31,7 +31,7 @@ import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
-import org.mockito.Mockito.{doAnswer, doNothing, mock, never, spy, times, verify}
+import org.mockito.Mockito.{doAnswer, doNothing, mock, never, spy, times, verify, when}
 
 import java.io._
 import java.lang.{Long => JLong}
@@ -43,7 +43,7 @@ import java.util.{Collections, Optional, OptionalLong, Properties}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.storage.log.FetchIsolation
 import org.apache.kafka.server.util.{FileLock, KafkaScheduler, MockTime, Scheduler}
-import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogMetricNames, LogManager => JLogManager, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogMetricNames, LogManager => JLogManager, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
 import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.function.Executable
@@ -1112,6 +1112,47 @@ class LogManagerTest {
     assertEquals(2, logManager.directoryIdsSet.size)
   }
 
+  /**
+   * Test that replaceCurrentWithFutureLog does not close the source log, preventing race conditions
+   * where a concurrent read/flush could fail with ClosedChannelException.
+   */
+  @Test
+  def testReplaceCurrentWithFutureLogDoesNotCloseSourceLog(): Unit = {
+    val logDir1 = TestUtils.tempDir()
+    val logDir2 = TestUtils.tempDir()
+    logManager = createLogManager(Seq(logDir1, logDir2))
+    logManager.startup(Set.empty)
+
+    val topicName = "replace-log"
+    val tp = new TopicPartition(topicName, 0)
+    val currentLog = logManager.getOrCreateLog(tp, topicId = Optional.empty)
+    // Create a future log in a different directory
+    logManager.maybeUpdatePreferredLogDir(tp, logDir2.getAbsolutePath)
+    logManager.getOrCreateLog(tp, isFuture = true, topicId = Optional.empty)
+
+    // Spy on the source log to verify close() is not called
+    val spyCurrentLog = spy(currentLog)
+    // Inject the spy into the map
+    val field = classOf[LogManager].getDeclaredField("currentLogs")
+    field.setAccessible(true)
+    val currentLogs = field.get(logManager).asInstanceOf[ConcurrentHashMap[TopicPartition, UnifiedLog]]
+    currentLogs.put(tp, spyCurrentLog)
+
+    logManager.replaceCurrentWithFutureLog(tp)
+
+    // Verify close() was NOT called on the source log
+    verify(spyCurrentLog, never()).close()
+
+    // Verify the source log was renamed to .delete
+    assertTrue(spyCurrentLog.dir.getName.endsWith(LogFileUtils.DELETE_DIR_SUFFIX))
+
+    // Verify that flush() can be called without error (no ClosedChannelException)
+    // Mock logEndOffset > 0 to trigger actual flush (flush only happens when flushOffset > recoveryPoint)
+    when(spyCurrentLog.logEndOffset()).thenReturn(100L)
+    val flushLog: Executable = () => spyCurrentLog.flush(false)
+    assertDoesNotThrow(flushLog)
+  }
+
   @Test
   def testCheckpointLogStartOffsetForRemoteTopic(): Unit = {
     logManager.shutdown()
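
For context on the race this commit removes: flushing a segment through a channel that another thread has already closed throws ClosedChannelException, an IOException that the flush path treats as a disk failure, taking the whole log directory offline. A minimal standalone Scala sketch of that failure mode, assuming nothing beyond java.nio (the object and file names are made up for illustration):

import java.nio.channels.{ClosedChannelException, FileChannel}
import java.nio.file.{Files, StandardOpenOption}

object ClosedChannelSketch {
  def main(args: Array[String]): Unit = {
    // A stand-in for a log segment file; the name is arbitrary.
    val segment = Files.createTempFile("00000000000000000000", ".log")
    val channel = FileChannel.open(segment, StandardOpenOption.WRITE)

    // Old ordering: replaceCurrentWithFutureLog closed the source log first...
    channel.close()

    // ...so a concurrently scheduled flush then hit a closed channel.
    try {
      channel.force(true) // a segment flush bottoms out in force() on its channel
    } catch {
      case _: ClosedChannelException =>
        // ClosedChannelException is an IOException, so the flush path treated
        // it as a disk error and marked the log directory offline.
        println("flush raced with close: ClosedChannelException")
    }
  }
}

With the patched ordering, close() happens only later, on the background delete-logs thread, so a flush that races with the replacement never observes a closed channel.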

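The safety argument in the new comment (open handles survive a rename on Unix) can likewise be checked in isolation: a descriptor opened before the rename keeps working, so segments renamed with the ".delete" suffix stay flushable until the background thread finally closes and removes them. A standalone sketch, again with made-up names:

import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, StandardOpenOption}

object RenameSemanticsSketch {
  def main(args: Array[String]): Unit = {
    // A stand-in for a partition directory holding one segment.
    val dir = Files.createTempDirectory("replace-log-0")
    val segment = dir.resolve("00000000000000000000.log")
    Files.write(segment, "records".getBytes(StandardCharsets.UTF_8))

    // Open a handle before the rename, as the flusher or a fetch would.
    val channel = FileChannel.open(segment, StandardOpenOption.APPEND)

    // Rename the directory for deletion, as renameDir does with the ".delete" suffix.
    val renamed = dir.resolveSibling(dir.getFileName.toString + ".delete")
    Files.move(dir, renamed)

    // The pre-existing handle is still valid: on POSIX filesystems a rename
    // does not invalidate open descriptors, so the flush succeeds.
    channel.write(ByteBuffer.wrap(" and more".getBytes(StandardCharsets.UTF_8)))
    channel.force(true)
    channel.close()
    println(s"flushed after rename: ${Files.size(renamed.resolve("00000000000000000000.log"))} bytes")
  }
}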