[
https://issues.apache.org/jira/browse/KAFKA-695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13565524#comment-13565524
]
Jun Rao commented on KAFKA-695:
-------------------------------
Just realized there are a couple of other corner cases that we need to handle.
First, we shut down ExpiredRequestReaper by interrupting the thread. This of
course, can close a filechannel, which can cause a KafkaStorageException during
a subsequent log append. This means that we may unnecessarily do an unclean
shutdown. Second, the replicaFetcher thread is shut down through interruption
too. When the thread is interrupted, it can be in the middle of a log append.
Similarly, this means another unnecessary unclean shutdown.
We can remove interruption during shutdown and just rely on the isRunning flag.
This is fine for the first case since ExpiredRequestReaper wakes up every
200ms. In the second case, this means that we may have to wait for the last
outstanding fetch request to complete. This can potentially make shutdown
longer. Not sure if this is a big concern.
> Broker shuts down due to attempt to read a closed index file
> ------------------------------------------------------------
>
> Key: KAFKA-695
> URL: https://issues.apache.org/jira/browse/KAFKA-695
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.8
> Reporter: Neha Narkhede
> Assignee: Jun Rao
> Priority: Blocker
> Labels: p1
> Fix For: 0.8
>
> Attachments: kafka-695.patch, kafka-695_v2.patch
>
>
> Broker shuts down with the following error message -
> 013/01/11 01:43:51.320 ERROR [KafkaApis] [request-expiration-task] [kafka] []
> [KafkaApi-277] error when processing request
> (service_metrics,2,39192,2000000)
> java.nio.channels.ClosedChannelException
> at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
> at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:613)
> at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:82)
> at kafka.log.LogSegment.translateOffset(LogSegment.scala:76)
> at kafka.log.LogSegment.read(LogSegment.scala:106)
> at kafka.log.Log.read(Log.scala:386)
> at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:369)
> at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:327)
> at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:323)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> at scala.collection.immutable.Map$Map1.map(Map.scala:93)
> at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:323)
> at
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:519)
> at
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:501)
> at
> kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:222)
> at java.lang.Thread.run(Thread.java:619)
> 2013/01/11 01:43:52.815 INFO [Processor] [kafka-processor-10251-2] [kafka] []
> Closing socket connection to /172.20.72.244.
> 2013/01/11 01:43:54.286 INFO [Processor] [kafka-processor-10251-3] [kafka] []
> Closing socket connection to /172.20.72.243.
> 2013/01/11 01:43:54.385 ERROR [LogManager] [kafka-logflusher-1] [kafka] []
> [Log Manager on Broker 277] Error flushing topic service_metrics
> java.nio.channels.ClosedChannelException
> at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
> at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:349)
> at
> kafka.log.FileMessageSet$$anonfun$flush$1.apply$mcV$sp(FileMessageSet.scala:154)
> at
> kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
> at
> kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.log.FileMessageSet.flush(FileMessageSet.scala:153)
> at kafka.log.LogSegment.flush(LogSegment.scala:151)
> at kafka.log.Log.flush(Log.scala:493)
> at
> kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:319)
> at
> kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:310)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> at
> scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:495)
> at
> kafka.log.LogManager.kafka$log$LogManager$$flushDirtyLogs(LogManager.scala:310)
> at
> kafka.log.LogManager$$anonfun$startup$2.apply$mcV$sp(LogManager.scala:144)
> at kafka.utils.Utils$$anon$2.run(Utils.scala:66)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> at
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:619)
> 2013/01/11 01:43:54.447 FATAL [LogManager] [kafka-logflusher-1] [kafka] []
> [Log Manager on Broker 277] Halting due to unrecoverable I/O error while
> flushing logs: null
> java.nio.channels.ClosedChannelException
> at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
> at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:349)
> at
> kafka.log.FileMessageSet$$anonfun$flush$1.apply$mcV$sp(FileMessageSet.scala:154)
> at
> kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
> at
> kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.log.FileMessageSet.flush(FileMessageSet.scala:153)
> at kafka.log.LogSegment.flush(LogSegment.scala:151)
> at kafka.log.Log.flush(Log.scala:493)
> at kafka.log.LogSegment.flush(LogSegment.scala:151)
> at kafka.log.Log.flush(Log.scala:493)
> at
> kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:319)
> at
> kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:310)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> at
> scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:495)
> at
> kafka.log.LogManager.kafka$log$LogManager$$flushDirtyLogs(LogManager.scala:310)
> at
> kafka.log.LogManager$$anonfun$startup$2.apply$mcV$sp(LogManager.scala:144)
> at kafka.utils.Utils$$anon$2.run(Utils.scala:66)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> at
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:619)
> 2013/01/11 01:43:54.512 INFO [ComponentsContextLoaderListener] [Thread-2]
> [kafka] [] Shutting down...
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira