Luciano Sabença created KAFKA-17727:
---------------------------------------
Summary: Log dirs marked as offline incorrectly due to race
conditions on segment delete
Key: KAFKA-17727
URL: https://issues.apache.org/jira/browse/KAFKA-17727
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 3.5.2
Reporter: Luciano Sabença
We are using a Kafka cluster deployed on-premise. The brokers are JBOD with
around 5/6 disks per broker. When running an intra-broker rebalance (i.e. moving
partitions between log dirs) triggered by cruise-control, some nodes had a
log dir marked as offline. When we looked closer, the disk was healthy, and after
a broker restart the log dir became online again.
Investigating the issue, it seems very similar to KAFKA-15391 and, especially,
to KAFKA-15572. The main difference in the logs between the issue we
encountered and the one described in KAFKA-15572 is that there the exception
that marked the log dir as offline was a `java.nio.file.NoSuchFileException`.
In our case, we had a `java.nio.channels.ClosedChannelException`:
{noformat}
[2024-10-03 09:48:04,704] ERROR Error while flushing log for mytopic-20 in dir
/data/0/kafka with offset 844857727 (exclusive) and recovery point 844857727
(org.apache.kafka.storage.internals.log.LogDirFailureChannel)
java.nio.channels.ClosedChannelException
at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
at kafka.log.LogSegment.$anonfun$flush$1(LogSegment.scala:471)
at kafka.log.LogSegment.$anonfun$flush$1$adapted(LogSegment.scala:470)
at com.yammer.metrics.core.Timer.time(Timer.java:91)
at kafka.log.LogSegment.flush(LogSegment.scala:470)
at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:174)
at kafka.log.LocalLog.$anonfun$flush$1$adapted(LocalLog.scala:174)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573)
at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
at kafka.log.LocalLog.flush(LocalLog.scala:174)
at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1537)
at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1724)
at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1518)
at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1499)
at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829){noformat}
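For reference, the top of that stack trace boils down to calling FileChannel.force on a channel that has already been closed. A minimal standalone sketch (the object and file names are invented for illustration, this is not broker code) reproduces the same exception:
{code:scala}
import java.nio.channels.{ClosedChannelException, FileChannel}
import java.nio.file.{Files, StandardOpenOption}

// Hypothetical repro: force() on a closed FileChannel fails in ensureOpen(),
// which is the ClosedChannelException seen in FileRecords.flush above.
object ClosedChannelFlushRepro {
  def main(args: Array[String]): Unit = {
    val path = Files.createTempFile("segment", ".log")
    val channel = FileChannel.open(path, StandardOpenOption.WRITE)

    // Close first, as happens to the old log's segments once the log is replaced...
    channel.close()

    try {
      // ...then attempt the fsync that FileRecords.flush would perform.
      channel.force(true)
    } catch {
      case e: ClosedChannelException => println(s"flush after close: $e")
    } finally {
      Files.deleteIfExists(path)
    }
  }
}
{code}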
Investigating the Kafka code, I believe the root cause is very similar to the one
in [KAFKA-15572|https://issues.apache.org/jira/browse/KAFKA-15572]. In our case,
we can see via the logs that LogManager.scala was able to replace the old log with
the new one:
{noformat}
[2024-10-03 09:47:06,349] INFO Attempting to replace current log
Log(dir=/data/0/kafka/mytopic-20, topicId=jl0IzzqWSHedIunbTLziYg,
topic=mytopic, partition=20, highWatermark=844861246,
lastStableOffset=844861246, logStartOffset=842164116, logEndOffset=844861247)
with Log(dir=/data/5/kafka/mytopic-20.9fa1dfec1c4a4045b8806565eced19bd-future,
topicId=jl0IzzqWSHedIunbTLziYg, topic=mytopic, partition=20,
highWatermark=844861246, lastStableOffset=844861246, logStartOffset=842164116,
logEndOffset=844861247) for mytopic-20 (kafka.log.LogManager)
INFO Cleaning for partition mytopic-20 is resumed (kafka.log.LogManager)
[2024-10-03 09:47:06,364] INFO The current replica is successfully replaced
with the future replica for mytopic-20 (kafka.log.LogManager){noformat}
During this process, it closes the old log
([LogManager.scala#L1125|https://github.com/apache/kafka/blob/3.5.2/core/src/main/scala/kafka/log/LogManager.scala#L1125])
and schedules it to be deleted. Something triggers
[UnifiedLog.roll|https://github.com/apache/kafka/blob/3.5.2/core/src/main/scala/kafka/log/UnifiedLog.scala#L1499],
and anything that triggers a flush after that for this segment will eventually
call _LocalLog.flush_, which tries to flush each segment
([LocalLog.scala#L174|https://github.com/apache/kafka/blob/8f0b0b0d0466632b47c355489e7c9440f3e4c0f5/core/src/main/scala/kafka/log/LocalLog.scala#L174]).
Here, however, the segment is already closed, hence the exception. It's not
clear to me what triggers the roll: in theory the old log was already replaced,
and the only thing left for these old segments is to be deleted. As this is
a high-volume topic, however, it's not surprising to have segments rolling
frequently.
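To make the suspected interleaving concrete, here is a hypothetical, heavily simplified sketch (FlushAfterReplaceRace and the delays are invented; this is not the actual broker code): a flush is handed to a scheduler, the way roll schedules flushUptoOffsetExclusive on the kafka-scheduler, and the channel is closed before the task runs, so the scheduled flush hits the ClosedChannelException.
{code:scala}
import java.nio.channels.{ClosedChannelException, FileChannel}
import java.nio.file.{Files, StandardOpenOption}
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical reduction of the suspected race: a flush scheduled asynchronously
// (as roll does) runs only after the replace path has closed the old log's segments.
object FlushAfterReplaceRace {
  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val path = Files.createTempFile("old-segment", ".log")
    val channel = FileChannel.open(path, StandardOpenOption.WRITE)

    // Stand-in for the flush that roll schedules on the kafka-scheduler.
    scheduler.schedule(new Runnable {
      def run(): Unit =
        try channel.force(true)
        catch {
          case _: ClosedChannelException =>
            // In the broker this error path ends up in LogDirFailureChannel
            // and the whole log dir is marked offline, although the disk is fine.
            println("scheduled flush hit an already-closed segment channel")
        }
    }, 100, TimeUnit.MILLISECONDS)

    // Stand-in for the LogManager replace path closing the old log first.
    channel.close()

    scheduler.shutdown()
    scheduler.awaitTermination(1, TimeUnit.SECONDS)
    Files.deleteIfExists(path)
  }
}
{code}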