This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new a5845b20550 KAFKA-19571: Race condition between log segment flush and file deletion causing log dir to go offline (#20289)
a5845b20550 is described below
commit a5845b205503f6bb59409aaa584b304a0e0801d4
Author: Ilyas Toumlilt <[email protected]>
AuthorDate: Thu Jan 29 12:11:53 2026 +0100
KAFKA-19571: Race condition between log segment flush and file deletion causing log dir to go offline (#20289)

Reviewers: Jun Rao <[email protected]>, Luke Chen <[email protected]>, Mickael Maison <[email protected]>
---
core/src/main/scala/kafka/log/LogManager.scala | 10 +++--
.../test/scala/unit/kafka/log/LogManagerTest.scala | 45 +++++++++++++++++++++-
2 files changed, 50 insertions(+), 5 deletions(-)
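For context on the failure mode this patch removes, here is a minimal standalone sketch (illustrative only, not Kafka code; the object and thread names are invented): one thread forces a FileChannel to disk while another closes it, producing the ClosedChannelException that, per the commit title, was previously treated as a log-dir failure and took the directory offline.

import java.nio.channels.{ClosedChannelException, FileChannel}
import java.nio.file.{Files, StandardOpenOption}

object FlushCloseRace {
  def main(args: Array[String]): Unit = {
    val path = Files.createTempFile("segment", ".log")
    val channel = FileChannel.open(path, StandardOpenOption.WRITE)

    // Thread A: the "flusher", forcing data to disk in a loop,
    // standing in for the periodic log flusher.
    val flusher = new Thread(() => {
      try {
        while (true) channel.force(true)
      } catch {
        case _: ClosedChannelException =>
          // The race: close() won between two force() calls.
          println("flush raced with close: ClosedChannelException")
      }
    })
    flusher.start()

    // Thread B: the "deleter", closing the channel as part of cleanup,
    // standing in for the old close-on-replace behavior.
    Thread.sleep(10)
    channel.close()
    flusher.join()
  }
}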
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index d9da339e215..b7284dc6f08 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1242,9 +1242,13 @@ class LogManager(logDirs: Seq[File],
     try {
       sourceLog.foreach { srcLog =>
         srcLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), true)
-        // Now that replica in source log directory has been successfully renamed for deletion.
-        // Close the log, update checkpoint files, and enqueue this log to be deleted.
-        srcLog.close()
+        // Now that replica in source log directory has been successfully renamed for deletion,
+        // update checkpoint files and enqueue this log to be deleted.
+        // Note: We intentionally do NOT close the log here to avoid race conditions where concurrent
+        // operations (e.g., log flusher, fetch requests) might encounter ClosedChannelException.
+        // The log will be deleted asynchronously by the background delete-logs thread.
+        // File handles are intentionally left open; Unix semantics allow the renamed files
+        // to remain accessible until all handles are closed.
         val logDir = srcLog.parentDirFile
         val logsToCheckpoint = logsInDir(logDir)
         checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
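The new comment's claim about Unix semantics can be checked with a small self-contained sketch (illustrative only, not part of the patch): on POSIX file systems a rename affects the path, not the open file description, so a channel opened before the rename can still be flushed and closed afterwards.

import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Files, StandardCopyOption, StandardOpenOption}

object RenameWithOpenHandle {
  def main(args: Array[String]): Unit = {
    val src = Files.createTempFile("00000000000000000000", ".log")
    val channel = FileChannel.open(src, StandardOpenOption.WRITE)
    channel.write(ByteBuffer.wrap("records".getBytes))

    // Rename the file out from under the open channel, loosely analogous
    // to renameDir moving the old replica to a "-delete" directory.
    val renamed = src.resolveSibling(src.getFileName.toString + "-delete")
    Files.move(src, renamed, StandardCopyOption.ATOMIC_MOVE)

    // The handle tracks the inode, not the path, so the channel can still
    // be flushed and closed safely after the rename (POSIX behavior; this
    // does not hold on Windows).
    channel.force(true)
    channel.close()
    Files.deleteIfExists(renamed)
  }
}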
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index a8946a3d139..7736f6736e9 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -31,7 +31,7 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
-import org.mockito.Mockito.{doAnswer, doNothing, mock, never, spy, times, verify}
+import org.mockito.Mockito.{doAnswer, doNothing, mock, never, spy, times, verify, when}
import java.io._
import java.lang.{Long => JLong}
@@ -43,7 +43,7 @@ import java.util.{Collections, Optional, OptionalLong, Properties}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.{FileLock, KafkaScheduler, MockTime, Scheduler}
-import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogMetricNames, LogManager => JLogManager, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogMetricNames, LogManager => JLogManager, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.function.Executable
@@ -1112,6 +1112,47 @@ class LogManagerTest {
assertEquals(2, logManager.directoryIdsSet.size)
}
+  /**
+   * Test that replaceCurrentWithFutureLog does not close the source log, preventing race conditions
+   * where a concurrent read/flush could fail with ClosedChannelException.
+   */
+  @Test
+  def testReplaceCurrentWithFutureLogDoesNotCloseSourceLog(): Unit = {
+    val logDir1 = TestUtils.tempDir()
+    val logDir2 = TestUtils.tempDir()
+    logManager = createLogManager(Seq(logDir1, logDir2))
+    logManager.startup(Set.empty)
+
+    val topicName = "replace-log"
+    val tp = new TopicPartition(topicName, 0)
+    val currentLog = logManager.getOrCreateLog(tp, topicId = Optional.empty)
+    // Create a future log in a different directory
+    logManager.maybeUpdatePreferredLogDir(tp, logDir2.getAbsolutePath)
+    logManager.getOrCreateLog(tp, isFuture = true, topicId = Optional.empty)
+
+    // Spy on the source log to verify close() is not called
+    val spyCurrentLog = spy(currentLog)
+    // Inject the spy into the map
+    val field = classOf[LogManager].getDeclaredField("currentLogs")
+    field.setAccessible(true)
+    val currentLogs = field.get(logManager).asInstanceOf[ConcurrentHashMap[TopicPartition, UnifiedLog]]
+    currentLogs.put(tp, spyCurrentLog)
+
+    logManager.replaceCurrentWithFutureLog(tp)
+
+    // Verify close() was NOT called on the source log
+    verify(spyCurrentLog, never()).close()
+
+    // Verify the source log was renamed to .delete
+    assertTrue(spyCurrentLog.dir.getName.endsWith(LogFileUtils.DELETE_DIR_SUFFIX))
+
+    // Verify that flush() can be called without error (no ClosedChannelException)
+    // Mock logEndOffset > 0 to trigger actual flush (flush only happens when flushOffset > recoveryPoint)
+    when(spyCurrentLog.logEndOffset()).thenReturn(100L)
+    val flushLog: Executable = () => spyCurrentLog.flush(false)
+    assertDoesNotThrow(flushLog)
+  }
+
@Test
def testCheckpointLogStartOffsetForRemoteTopic(): Unit = {
logManager.shutdown()