This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eaad6ed14a4 KAFKA-19571: Race condition between log segment flush and file deletion causing log dir to go offline (#20289)
eaad6ed14a4 is described below

commit eaad6ed14a458a6b19ea5d430b6369d10fc44c3e
Author: Ilyas Toumlilt <[email protected]>
AuthorDate: Thu Jan 29 12:11:53 2026 +0100

    KAFKA-19571: Race condition between log segment flush and file deletion causing log dir to go offline (#20289)
    
    
    Reviewers: Jun Rao <[email protected]>, Luke Chen <[email protected]>, Mickael Maison <[email protected]>
---
 core/src/main/scala/kafka/log/LogManager.scala     | 10 +++--
 .../test/scala/unit/kafka/log/LogManagerTest.scala | 45 +++++++++++++++++++++-
 2 files changed, 50 insertions(+), 5 deletions(-)
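
For context: before this patch, replaceCurrentWithFutureLog closed the source log immediately after renaming it, so a concurrent flush or fetch still holding the old segments could hit a ClosedChannelException, which the broker treats as an I/O failure and takes the log directory offline. Below is a minimal standalone sketch of that failure mode (illustrative only, not code from this patch; whether the exception actually fires depends on thread timing):

    import java.nio.channels.{ClosedChannelException, FileChannel}
    import java.nio.file.{Files, StandardOpenOption}

    // One thread flushes a channel while another closes it, mimicking the
    // log flusher racing the close in the old replaceCurrentWithFutureLog.
    object FlushCloseRace {
      def main(args: Array[String]): Unit = {
        val file = Files.createTempFile("segment", ".log")
        val channel = FileChannel.open(file, StandardOpenOption.WRITE)
        val flusher = new Thread(() => {
          try channel.force(true) // background flush
          catch {
            case _: ClosedChannelException =>
              println("flush lost the race: ClosedChannelException")
          }
        })
        flusher.start()
        channel.close() // concurrent close, as the removed srcLog.close() did
        flusher.join()
        Files.deleteIfExists(file)
      }
    }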

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 481fbf7eee5..efdab2a0f31 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1247,9 +1247,13 @@ class LogManager(logDirs: Seq[File],
     try {
       sourceLog.foreach { srcLog =>
         srcLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), true)
-        // Now that replica in source log directory has been successfully renamed for deletion.
-        // Close the log, update checkpoint files, and enqueue this log to be deleted.
-        srcLog.close()
+        // Now that replica in source log directory has been successfully renamed for deletion,
+        // update checkpoint files and enqueue this log to be deleted.
+        // Note: We intentionally do NOT close the log here to avoid race conditions where concurrent
+        // operations (e.g., log flusher, fetch requests) might encounter ClosedChannelException.
+        // The log will be deleted asynchronously by the background delete-logs thread.
+        // File handles are intentionally left open; Unix semantics allow the renamed files
+        // to remain accessible until all handles are closed.
         val logDir = srcLog.parentDirFile
         val logsToCheckpoint = logsInDir(logDir)
         checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
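
The new comment above relies on standard POSIX behavior: renaming a directory (or unlinking a file) does not invalidate file descriptors that are already open, so the renamed segments remain readable and flushable until the background delete-logs thread closes them. A small standalone demonstration of this, assuming a Unix filesystem (names are illustrative):

    import java.nio.ByteBuffer
    import java.nio.channels.FileChannel
    import java.nio.file.{Files, StandardOpenOption}

    // A channel opened before its parent directory is renamed keeps working
    // afterwards, so a flush cannot fail merely because the rename happened.
    object OpenHandleSurvivesRename {
      def main(args: Array[String]): Unit = {
        val dir = Files.createTempDirectory("topic-0")
        val file = dir.resolve("00000000000000000000.log")
        Files.write(file, "hello".getBytes("UTF-8"))

        // Open before the rename, as the broker does for live segments.
        val channel = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)

        // Rename the directory, mimicking renameDir(logDeleteDirName(...)).
        val renamed = dir.resolveSibling(dir.getFileName.toString + ".deleted")
        Files.move(dir, renamed)

        // The existing handle still reads and syncs through the open descriptor.
        val buf = ByteBuffer.allocate(5)
        channel.read(buf, 0L)
        channel.force(true)
        println(new String(buf.array(), "UTF-8")) // prints "hello"
        channel.close() // only now can the storage be fully reclaimed
      }
    }
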
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index a8946a3d139..7736f6736e9 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -31,7 +31,7 @@ import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
-import org.mockito.Mockito.{doAnswer, doNothing, mock, never, spy, times, verify}
+import org.mockito.Mockito.{doAnswer, doNothing, mock, never, spy, times, verify, when}
 
 import java.io._
 import java.lang.{Long => JLong}
@@ -43,7 +43,7 @@ import java.util.{Collections, Optional, OptionalLong, Properties}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.storage.log.FetchIsolation
 import org.apache.kafka.server.util.{FileLock, KafkaScheduler, MockTime, Scheduler}
-import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogMetricNames, LogManager => JLogManager, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogMetricNames, LogManager => JLogManager, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
 import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.function.Executable
@@ -1112,6 +1112,47 @@ class LogManagerTest {
     assertEquals(2, logManager.directoryIdsSet.size)
   }
 
+  /**
+   * Test that replaceCurrentWithFutureLog does not close the source log, preventing race conditions
+   * where a concurrent read/flush could fail with ClosedChannelException.
+   */
+  @Test
+  def testReplaceCurrentWithFutureLogDoesNotCloseSourceLog(): Unit = {
+    val logDir1 = TestUtils.tempDir()
+    val logDir2 = TestUtils.tempDir()
+    logManager = createLogManager(Seq(logDir1, logDir2))
+    logManager.startup(Set.empty)
+
+    val topicName = "replace-log"
+    val tp = new TopicPartition(topicName, 0)
+    val currentLog = logManager.getOrCreateLog(tp, topicId = Optional.empty)
+    // Create a future log in a different directory
+    logManager.maybeUpdatePreferredLogDir(tp, logDir2.getAbsolutePath)
+    logManager.getOrCreateLog(tp, isFuture = true, topicId = Optional.empty)
+
+    // Spy on the source log to verify close() is not called
+    val spyCurrentLog = spy(currentLog)
+    // Inject the spy into the map
+    val field = classOf[LogManager].getDeclaredField("currentLogs")
+    field.setAccessible(true)
+    val currentLogs = field.get(logManager).asInstanceOf[ConcurrentHashMap[TopicPartition, UnifiedLog]]
+    currentLogs.put(tp, spyCurrentLog)
+
+    logManager.replaceCurrentWithFutureLog(tp)
+
+    // Verify close() was NOT called on the source log
+    verify(spyCurrentLog, never()).close()
+
+    // Verify the source log was renamed to .delete
+    assertTrue(spyCurrentLog.dir.getName.endsWith(LogFileUtils.DELETE_DIR_SUFFIX))
+
+    // Verify that flush() can be called without error (no ClosedChannelException)
+    // Mock logEndOffset > 0 to trigger actual flush (flush only happens when flushOffset > recoveryPoint)
+    when(spyCurrentLog.logEndOffset()).thenReturn(100L)
+    val flushLog: Executable = () => spyCurrentLog.flush(false)
+    assertDoesNotThrow(flushLog)
+  }
+
   @Test
   def testCheckpointLogStartOffsetForRemoteTopic(): Unit = {
     logManager.shutdown()
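
One note on the new test: flush(false) is a no-op unless there is data past the recovery point, which is why the spy stubs logEndOffset to 100. A rough sketch of that guard, purely illustrative rather than the actual UnifiedLog code:

    // Why the test stubs logEndOffset: with nothing past the recovery point
    // the flush returns early and would never touch a file channel, so a
    // prematurely closed channel could go undetected.
    final class GuardedFlushSketch(initialRecoveryPoint: Long) {
      private var recoveryPoint: Long = initialRecoveryPoint

      def flush(logEndOffset: Long): Unit = {
        if (logEndOffset > recoveryPoint) {
          // force the segment file channels here; this is where a closed
          // channel would surface as ClosedChannelException
          recoveryPoint = logEndOffset
        }
      }
    }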
