Alexandre Dupriez created KAFKA-15486:
-----------------------------------------
Summary: Include NIO exceptions as I/O exceptions to be part of
disk failure handling
Key: KAFKA-15486
URL: https://issues.apache.org/jira/browse/KAFKA-15486
Project: Kafka
Issue Type: Improvement
Components: core, jbod
Reporter: Alexandre Dupriez
Currently, Apache Kafka offers the ability to detect and capture I/O errors
when accessing the file system via the standard {{IOException}} from the JDK.
There are cases, however, where I/O errors are reported only via exceptions such as {{BufferOverflowException}}, with no associated {{IOException}} on the produce or read path, so the data volume is not detected as unhealthy and is not added to the list of offline directories.
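The failure mode described above can be reproduced in isolation: writing past the end of a buffer (as happens when an index file backed by a memory-mapped buffer runs out of space) raises the unchecked {{BufferOverflowException}} rather than an {{IOException}}, so a handler that only catches {{IOException}} never sees it. A minimal, self-contained demonstration:

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

public class BufferDemo {
    public static void main(String[] args) {
        // An 8-byte buffer: one putLong fills it completely.
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(0L);
        try {
            // No remaining capacity: this throws BufferOverflowException,
            // an unchecked RuntimeException, not an IOException.
            buf.putLong(1L);
        } catch (BufferOverflowException e) {
            System.out.println("caught BufferOverflowException");
        }
    }
}
```

Note that {{BufferOverflowException}} extends {{RuntimeException}}, which is why it propagates past catch blocks scoped to {{IOException}}.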
Specifically, we faced the following scenario on a broker:
* The data volume hosting a log directory became saturated.
* As expected, {{IOException}}s were generated on the read/write path.
* The log directory was set as offline and since it was the only log directory
configured on the broker, Kafka automatically shut down.
* Additional space was added to the data volume.
* Kafka was then restarted.
* No more {{IOException}} occurred; however, {{BufferOverflowException}} *[*]*
were raised while trying to delete log segments in order to honour the retention
settings of a topic. The log directory was not moved to offline and the
exceptions kept recurring indefinitely.
The retention settings were therefore not applied in this case. The mitigation
was to restart Kafka.
It may be worth considering treating {{BufferOverflowException}} and
{{BufferUnderflowException}} (and any other JDK NIO exception which surfaces an
I/O error) alongside {{IOException}} as proxies for storage I/O failure. There
may be known unintended consequences in doing so, which would explain why they
were not added already; alternatively, the impact may be considered too marginal
to justify modifying the main I/O failure handling path and risking exposure to
unknown unintended consequences.
*[*]*
{code:java}
java.nio.BufferOverflowException
	at java.base/java.nio.Buffer.nextPutIndex(Buffer.java:674)
	at java.base/java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:882)
	at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:134)
	at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
	at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:506)
	at kafka.log.Log.$anonfun$roll$8(Log.scala:2066)
	at kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:2066)
	at scala.Option.foreach(Option.scala:437)
	at kafka.log.Log.$anonfun$roll$2(Log.scala:2066)
	at kafka.log.Log.roll(Log.scala:2482)
	at kafka.log.Log.maybeRoll(Log.scala:2017)
	at kafka.log.Log.append(Log.scala:1292)
	at kafka.log.Log.appendAsFollower(Log.scala:1155)
	at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:1023)
	at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:1030)
	at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:178)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:356)
	at scala.Option.foreach(Option.scala:437)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:345)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:344)
	at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
	at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
	at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
	at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:344)
	at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:141)
	at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:140)
	at scala.Option.foreach(Option.scala:437)
	at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:140)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:123)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{code}
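One possible shape for the proposal, sketched below under the assumption that storage-path code could be routed through a thin wrapper. The class and method names here are illustrative only, not actual Kafka APIs; the idea is simply to rethrow NIO buffer exceptions as {{IOException}} so the existing disk-failure handling (marking the log directory offline) applies to them unchanged:

```java
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;

// Illustrative helper, not an actual Kafka class: maps NIO buffer
// exceptions raised on the storage path to IOException so that the
// caller's existing I/O failure handling treats them as disk failures.
public final class StorageExceptionMapper {

    /** A storage operation that may fail with an I/O error. */
    @FunctionalInterface
    public interface StorageAction<T> {
        T run() throws IOException;
    }

    /**
     * Runs the action, rethrowing NIO buffer exceptions as IOException
     * with the original exception preserved as the cause.
     */
    public static <T> T runAsIo(StorageAction<T> action) throws IOException {
        try {
            return action.run();
        } catch (BufferOverflowException | BufferUnderflowException e) {
            throw new IOException("NIO buffer exception on storage path", e);
        }
    }

    private StorageExceptionMapper() {}
}
```

With such a wrapper, a {{BufferOverflowException}} thrown while appending to a time index would surface as an {{IOException}} and take the same offline-directory path as any other disk failure, though (as noted above) widening the set of exceptions treated as I/O failures may have unintended consequences that would need to be assessed.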
--
This message was sent by Atlassian Jira
(v8.20.10#820010)