[ https://issues.apache.org/jira/browse/KAFKA-15486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexandre Dupriez updated KAFKA-15486:
--------------------------------------
    Description: 
Currently, Apache Kafka offers the ability to detect and capture I/O errors 
when accessing the file system via the standard {{IOException}} from the JDK. 
There are cases, however, where I/O errors are only reported via exceptions such 
as {{BufferOverflowException}}, without an associated {{IOException}} on the 
produce or read path, so that the data volume is not detected as unhealthy and 
is not included in the list of offline directories.

Specifically, we faced the following scenario on a broker:
 * The data volume hosting a log directory became saturated.
 * As expected, {{IOException}} were generated on the read/write path.
 * The log directory was set as offline and since it was the only log directory 
configured on the broker, Kafka automatically shut down.
 * Additional space was added to the data volume.
 * Kafka was then restarted.
 * No more {{IOException}} occurred; however, {{BufferOverflowException}} *[*]* 
were raised while trying to delete log segments in order to honour the retention 
settings of a topic. The log directory was not moved to offline and the 
exceptions kept recurring indefinitely.

The retention settings were therefore not applied in this case. The mitigation 
consisted in restarting Kafka.
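
As a minimal sketch of why the failure above can go unnoticed: the trace referenced by *[*]* ends in {{ByteBuffer.putLong}}, which throws an unchecked {{BufferOverflowException}} when fewer than eight bytes remain. A handler that only catches {{IOException}} never sees it. The class and method names below are hypothetical and are not taken from Kafka; the eight-byte direct buffer merely stands in for a full index file.

```java
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

// Hypothetical demo class; none of these names come from Kafka.
final class BufferOverflowDemo {

    // Stand-in for an index append: writes one 8-byte entry into the buffer.
    // Declares IOException only to mirror a typical I/O method signature.
    static void appendEntry(ByteBuffer index, long value) throws IOException {
        // Throws BufferOverflowException if fewer than 8 bytes remain.
        index.putLong(value);
    }

    // Returns a label describing which handler observed the outcome.
    static String appendAndClassify(ByteBuffer index, long value) {
        try {
            appendEntry(index, value);
            return "ok";
        } catch (IOException e) {
            // The only branch an IOException-based disk failure handler reacts to.
            return "io-failure";
        } catch (BufferOverflowException e) {
            // Unchecked, so it bypasses any handler written for IOException alone.
            return "unchecked-overflow";
        }
    }

    public static void main(String[] args) {
        ByteBuffer index = ByteBuffer.allocateDirect(8); // room for exactly one long
        System.out.println(appendAndClassify(index, 1L)); // prints "ok"
        System.out.println(appendAndClassify(index, 2L)); // prints "unchecked-overflow"
    }
}
```

The second append never reaches the {{IOException}} branch, which is consistent with the log directory staying online in the scenario described.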

It may be worth considering adding {{BufferOverflowException}} and 
{{BufferUnderflowException}} (and any other related exception from the JDK NIO 
library which surfaces an I/O error) to the current {{IOException}} as a proxy 
of storage I/O failure. However, there may be known unintended consequences in 
doing so, which would explain why they have not been added already. 
Alternatively, the impact may be too marginal to justify modifying the main I/O 
failure handling path and exposing it to such unknown unintended consequences.
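
To make the proposal concrete, here is a rough sketch of what such a classification could look like. {{StorageFailureClassifier}} and {{indicatesStorageFailure}} are hypothetical names introduced for illustration only; they are not Kafka APIs and this is not how Kafka currently decides to take a log directory offline.

```java
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;

// Hypothetical helper; not part of Kafka's code base.
final class StorageFailureClassifier {

    // Returns true if the throwable is one of the exception types the proposal
    // would treat as a proxy for a storage I/O failure.
    static boolean indicatesStorageFailure(Throwable t) {
        return t instanceof IOException
                || t instanceof BufferOverflowException
                || t instanceof BufferUnderflowException;
    }

    public static void main(String[] args) {
        System.out.println(indicatesStorageFailure(new IOException("disk full")));     // true
        System.out.println(indicatesStorageFailure(new BufferOverflowException()));    // true
        System.out.println(indicatesStorageFailure(new IllegalStateException("bug"))); // false
    }
}
```

One possible downside of a check this broad is that the two buffer exceptions can also be raised by an ordinary sizing bug unrelated to the disk, in which case a healthy log directory could be marked offline.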

*[*]*
{code:java}
java.nio.BufferOverflowException
        at java.base/java.nio.Buffer.nextPutIndex(Buffer.java:674)
        at java.base/java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:882)
        at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:134)
        at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
        at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:506)
        at kafka.log.Log.$anonfun$roll$8(Log.scala:2066)
        at kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:2066)
        at scala.Option.foreach(Option.scala:437)
        at kafka.log.Log.$anonfun$roll$2(Log.scala:2066)
        at kafka.log.Log.roll(Log.scala:2482)
        at kafka.log.Log.maybeRoll(Log.scala:2017)
        at kafka.log.Log.append(Log.scala:1292)
        at kafka.log.Log.appendAsFollower(Log.scala:1155)
        at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:1023)
        at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:1030)
        at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:178)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:356)
        at scala.Option.foreach(Option.scala:437)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:345)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:344)
        at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
        at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
        at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
        at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:344)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:141)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:140)
        at scala.Option.foreach(Option.scala:437)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:140)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:123)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{code}

> Include NIO exceptions as I/O exceptions to be part of the disk failure 
> handling mechanism
> ------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15486
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15486
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core, jbod
>            Reporter: Alexandre Dupriez
>            Priority: Minor
>



