[
https://issues.apache.org/jira/browse/KAFKA-19571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mickael Maison resolved KAFKA-19571.
------------------------------------
Fix Version/s: 4.3.0
Resolution: Fixed
> Race condition between log segment flush and file deletion causing log dir to
> go offline
> ----------------------------------------------------------------------------------------
>
> Key: KAFKA-19571
> URL: https://issues.apache.org/jira/browse/KAFKA-19571
> Project: Kafka
> Issue Type: Bug
> Components: core, log
> Affects Versions: 3.7.1
> Reporter: Ilyas Toumlilt
> Priority: Major
> Fix For: 4.3.0
>
>
> h1. Context
> We are using Kafka v3.7.1 with ZooKeeper. Our brokers are configured with
> multiple disks in a JBOD setup, and routine intra-broker data rebalancing is
> performed with Cruise Control to manage disk utilization. During these
> rebalance operations, a race condition occurs between a log segment flush
> operation and the file deletion that is part of the replica's directory
> move. This race leads to a {{NoSuchFileException}} when the flush operation
> targets a file path that has just been deleted by the rebalance process, and
> the exception incorrectly forces the broker to take the entire log directory
> offline.
> h1. Logs / Stack trace
> {code:java}
> 2025-07-23 19:03:30,114 WARN Failed to flush file
> /var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot (org.apache.kafka.
> common.utils.Utils)
> java.nio.file.NoSuchFileException:
> /var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot
> at
> java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
> at
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
> at
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
> at
> java.base/sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:182)
> at java.base/java.nio.channels.FileChannel.open(FileChannel.java:292)
> at java.base/java.nio.channels.FileChannel.open(FileChannel.java:345)
> at
> org.apache.kafka.common.utils.Utils.flushFileIfExists(Utils.java:1029)
> at
> kafka.log.UnifiedLog.$anonfun$flushProducerStateSnapshot$2(UnifiedLog.scala:1766)
> at
> kafka.log.UnifiedLog.flushProducerStateSnapshot(UnifiedLog.scala:1915)
> at kafka.log.UnifiedLog.$anonfun$roll$2(UnifiedLog.scala:1679)
> at java.base/java.util.Optional.ifPresent(Optional.java:183)
> at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1679)
> at
> org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
>
> 2025-07-23 19:03:30,114 ERROR Error while flushing log for topic_01-12 in dir
> /var/lib/kafka-08 with offset 24420850595 (exclusive) and recovery point
> 24420850595 (org.apache.kafka.storage.internals.log.LogDirFailureChannel)
> java.nio.channels.ClosedChannelException
> at
> java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
> at
> java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
> at
> org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
> at
> org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
> at
> org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
> at com.yammer.metrics.core.Timer.time(Timer.java:91)
> at
> org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
> at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
> at java.base/java.lang.Iterable.forEach(Iterable.java:75)
> at kafka.log.LocalLog.flush(LocalLog.scala:176)
> at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
> at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
> at
> kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
> at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
> at
> org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
>
> 2025-07-23 19:03:30,115 ERROR Uncaught exception in scheduled task
> 'flush-log' (org.apache.kafka.server.util.KafkaScheduler)
> org.apache.kafka.common.errors.KafkaStorageException: Error while flushing
> log for topic_01-12 in dir /var/lib/kafka-08 with offset 24420850595
> (exclusive) and recovery point 24420850595
> Caused by: java.nio.channels.ClosedChannelException
> at
> java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
> at
> java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
> at
> org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
> at
> org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
> at
> org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
> at com.yammer.metrics.core.Timer.time(Timer.java:91)
> at
> org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
> at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
> at java.base/java.lang.Iterable.forEach(Iterable.java:75)
> at kafka.log.LocalLog.flush(LocalLog.scala:176)
> at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
> at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
> at
> kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
> at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
> at
> org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
>
> 2025-07-23 19:03:30,117 WARN [ReplicaManager broker=32] Stopping serving
> replicas in dir /var/lib/kafka-08 (kafka.server.ReplicaManager) {code}
> h1. Stack trace analysis
> The failure begins with a benign {{WARN}} when a scheduled task tries to
> flush a producer state snapshot that was moved during a disk rebalance.
> This {{NoSuchFileException}} is anticipated and handled gracefully, as
> implemented in https://issues.apache.org/jira/browse/KAFKA-13403, which
> swallows the exception.
> However, the same task then attempts to flush the actual log segment, which
> fails with a critical, unhandled {{ClosedChannelException}} because the
> file handles were invalidated by the directory move. This unhandled I/O
> error propagates up and terminates the background task, causing the
> {{KafkaScheduler}} to log it as an uncaught {{KafkaStorageException}}. As a
> direct consequence, the {{ReplicaManager}} detects this fatal storage error
> and triggers its safety mechanism, taking the entire log directory offline
> to prevent potential data corruption.
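> The {{ClosedChannelException}} path can be reproduced outside Kafka. The
> sketch below (a hypothetical standalone demo, not Kafka source code) closes
> and deletes a file behind an open {{FileChannel}}, the same state a
> directory move leaves a segment in, and shows that a later {{force()}}
> fails with {{ClosedChannelException}}:

```java
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ClosedChannelRaceDemo {

    // Recreates the post-rebalance state: the channel is closed and the file
    // deleted, then the (simulated) scheduled flush-log task calls force().
    public static boolean flushThrowsClosedChannel() {
        try {
            Path segment = Files.createTempFile("00000000000000000000", ".log");
            FileChannel channel = FileChannel.open(segment, StandardOpenOption.WRITE);
            channel.close();               // directory move invalidates the handle
            Files.deleteIfExists(segment); // and removes the file itself
            channel.force(true);           // what the segment flush ends up doing
            return false;
        } catch (ClosedChannelException e) {
            return true;                   // the exception seen in the report
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("ClosedChannelException thrown: " + flushThrowsClosedChannel());
    }
}
```

> Note that the exception comes from the channel's closed state, checked
> before the file is ever touched, which is why the {{NoSuchFileException}}
> handling from KAFKA-13403 never sees it.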
> h1. Expected Behavior
> A {{NoSuchFileException}} in this context should not cause the entire log
> directory to be marked as offline.
> h1. Workaround
> The current workaround is to manually restart the affected Kafka broker. The
> restart clears the in-memory state, and upon re-scanning the log directories,
> the broker marks the disk as healthy again.
> h1. Proposed fix
> The proposed solution is to address the race condition at the lowest
> possible level: the {{LogSegment.flush()}} method. The goal is to catch the
> {{ClosedChannelException}} that occurs during the race and differentiate it
> from a legitimate I/O error.
> The fix should be implemented within the {{catch}} block for
> {{ClosedChannelException}} in {{LogSegment.java}}. The logic would be as
> follows:
> # When a {{ClosedChannelException}} is caught, check whether the underlying
> log segment file still exists ({{log.file().exists()}}).
> # *If the file does not exist*, this confirms the race condition
> hypothesis: the replica has been moved or deleted by a rebalance operation.
> The exception is benign and should be ignored, with a {{WARN}} message
> logged for visibility.
> # *If the file still exists*, the {{ClosedChannelException}} is unexpected
> and could signal a real hardware or filesystem issue. In this case, the
> exception should be re-thrown to trigger Kafka's standard log directory
> failure-handling mechanism.
> This targeted fix would resolve the bug by gracefully handling the known race
> condition without masking other potentially critical storage errors.
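> As a rough sketch of that logic (illustrative names only, not the actual
> {{LogSegment}} internals), the catch block could look like:

```java
import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ProposedFlushFix {

    // Hypothetical shape of the proposed catch block: swallow the
    // ClosedChannelException only when the segment file is already gone,
    // otherwise rethrow so log-dir failure handling still triggers.
    static void flushSegment(File segmentFile, FileChannel channel) throws IOException {
        try {
            channel.force(true);
        } catch (ClosedChannelException e) {
            if (segmentFile.exists()) {
                throw e; // unexpected: may be a real hardware/filesystem issue
            }
            // File gone: replica moved/deleted by a rebalance; benign race.
            System.out.println("WARN segment " + segmentFile + " deleted during flush; ignoring");
        }
    }

    // Reproduces the race and returns true if the flush completed quietly.
    public static boolean raceIsSwallowed() {
        try {
            Path segment = Files.createTempFile("segment", ".log");
            FileChannel channel = FileChannel.open(segment, StandardOpenOption.WRITE);
            channel.close();               // rebalance closes the handle
            Files.deleteIfExists(segment); // and deletes the file
            flushSegment(segment.toFile(), channel);
            return true;                   // no exception escaped
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("race swallowed: " + raceIsSwallowed());
    }
}
```

> The existence check is what keeps real storage failures visible: only the
> "channel closed and file gone" combination is treated as the benign race.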
> h2. Related issues
> * https://issues.apache.org/jira/browse/KAFKA-13403 was fixed to swallow the
> {{NoSuchFileException}} behind the first WARN in the stack trace above, but
> it does not handle the subsequent {{ClosedChannelException}}.
> * https://issues.apache.org/jira/browse/KAFKA-15391 is similar but distinct:
> it swallows {{NoSuchFileException}} for the race condition on log directory
> move/delete, but not at the segment file level.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)