[ https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin resolved KAFKA-7464.
-----------------------------
    Resolution: Fixed

> Fail to shutdown ReplicaManager during broker cleaned shutdown
> --------------------------------------------------------------
>
>                 Key: KAFKA-7464
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7464
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Zhanxiang (Patrick) Huang
>            Assignee: Zhanxiang (Patrick) Huang
>            Priority: Critical
>             Fix For: 2.1.0
>
>
> In 2.0 deployment, we saw the following log when shutting down the 
> ReplicaManager in broker cleaned shutdown:
> {noformat}
> 2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
> java.lang.IllegalArgumentException: null
>         at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
>         at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
>         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[?:1.8.0_121]
>         at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.Selector.doClose(Selector.java:751) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.Selector.close(Selector.java:739) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.Selector.close(Selector.java:701) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.Selector.close(Selector.java:315) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) ~[kafka-clients-2.0.0.22.jar:?]
>         at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) ~[scala-library-2.11.12.jar:?]
>         at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
> {noformat}
> After that, we noticed that some of the replica fetcher threads failed to 
> shut down:
> {noformat}
> 2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
> java.nio.channels.ClosedChannelException: null
>         at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) ~[?:1.8.0_121]
>         at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) ~[?:1.8.0_121]
>         at org.apache.kafka.common.record.FileRecords.truncateTo(FileRecords.java:244) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.record.FileRecords.trim(FileRecords.java:206) ~[kafka-clients-2.0.0.22.jar:?]
>         at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:512) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
>         at scala.Option.foreach(Option.scala:257) ~[scala-library-2.11.12.jar:?]
>         at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1479) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1856) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.roll(Log.scala:1479) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1465) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log$$anonfun$append$2.apply(Log.scala:868) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log$$anonfun$append$2.apply(Log.scala:762) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1856) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.append(Log.scala:762) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.log.Log.appendAsFollower(Log.scala:743) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.cluster.Partition$$anonfun$doAppendRecordsToFollowerOrFutureReplica$1.apply(Partition.scala:601) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:588) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:608) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:130) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:43) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:188) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174) ~[kafka_2.11-2.0.0.22.jar:?]
>         at scala.Option.foreach(Option.scala:257) ~[scala-library-2.11.12.jar:?]
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) ~[scala-library-2.11.12.jar:?]
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:115) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) ~[kafka_2.11-2.0.0.22.jar:?]{noformat}
> Worse still, we found that if an exception is thrown during 
> ReplicaFetcherManager shutdown, we skip the purgatory shutdowns and the HW 
> checkpoint entirely; in our case we never saw the "Shut down completely" log:
> {code:java}
>  def shutdown(checkpointHW: Boolean = true) {
>     info("Shutting down")
>     removeMetrics()
>     if (logDirFailureHandler != null)
>       logDirFailureHandler.shutdown()
>     replicaFetcherManager.shutdown()
>     replicaAlterLogDirsManager.shutdown()
>     delayedFetchPurgatory.shutdown()
>     delayedProducePurgatory.shutdown()
>     delayedDeleteRecordsPurgatory.shutdown()
>     if (checkpointHW)
>       checkpointHighWatermarks()
>     info("Shut down completely")
>   }
> {code}
> The reason we see this is that since KAFKA-6051, we close the leaderEndpoint 
> in the replica fetcher thread's initiateShutdown to preempt the in-progress 
> fetch request and accelerate replica fetcher thread shutdown. However, closing 
> the leaderEndpoint can throw an exception while the replica fetcher thread is 
> still actively fetching.
>  
> I am wondering whether we should catch the exception thrown by 
> "leaderEndpoint.close()" instead of letting it propagate up the call stack. In 
> my opinion, it is safe to do so because ReplicaFetcherThread.initiateShutdown 
> will be called when:
>  # Server shutdown – In this case we will shut down the process anyway, so 
> even if we fail to close the leader endpoint cleanly there is no harm.
>  # shutdownIdleFetcherThread – In this case the fetcher thread is idle and we 
> will not use it again, so there is no harm either.
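> The suggestion above can be sketched in plain Java (all names here are 
> hypothetical illustrations, not the actual Kafka patch): swallowing the 
> exception at the close site means the rest of the shutdown sequence still 
> runs and "Shut down completely" is still logged.
> {code:java}
> // Hypothetical sketch: a best-effort close that swallows exceptions so the
> // remaining shutdown steps (purgatory shutdowns, HW checkpoint) still run.
> public class ShutdownSketch {
>
>     // Returns true if close succeeded, false if the exception was swallowed.
>     static boolean closeQuietly(Runnable close, String name) {
>         try {
>             close.run();
>             return true;
>         } catch (Exception e) {
>             System.out.println("Failed to close " + name + ": " + e.getMessage());
>             return false;
>         }
>     }
>
>     public static void main(String[] args) {
>         // Simulate leaderEndpoint.close() throwing while a fetch is in flight.
>         closeQuietly(() -> { throw new IllegalArgumentException("in-flight fetch"); },
>                      "leaderEndpoint");
>         // Shutdown continues regardless of the close failure.
>         System.out.println("Shut down completely");
>     }
> }
> {code}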



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
