[ https://issues.apache.org/jira/browse/SPARK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14723705#comment-14723705 ]
Marcelo Vanzin commented on SPARK-10358:
----------------------------------------

This is fixed in 1.4 and later; I doubt we'll accept patches to 1.3 at this point in time.

Spark-sql throws IOException on exit when using HDFS to store event log.
------------------------------------------------------------------------

                Key: SPARK-10358
                URL: https://issues.apache.org/jira/browse/SPARK-10358
            Project: Spark
         Issue Type: Bug
         Components: SQL
   Affects Versions: 1.3.1
        Environment: * spark-1.3.1-bin-hadoop2.6
                     * hadoop-2.6.0
                     * Red Hat 2.6.32-504.el6.x86_64
           Reporter: Sioa Song
           Priority: Minor
            Fix For: 1.3.1

h2. Summary

In Spark 1.3.1, when HDFS is used to store the event log, spark-sql throws "java.io.IOException: Filesystem closed" on exit.

h2. How to reproduce

1. Enable the event log mechanism and point it at HDFS. You can do this by setting these two properties in spark-defaults.conf:

spark.eventLog.enabled true
spark.eventLog.dir hdfs://xxxxx:xxxxx/spark-events

2. Start spark-sql, and type exit once it starts.

{noformat}
spark-sql> exit;
15/08/14 06:29:20 ERROR scheduler.LiveListenerBus: Listener EventLoggingListener threw an exception
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
        at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
        at org.apache.spark.scheduler.EventLoggingListener.onApplicationEnd(EventLoggingListener.scala:181)
        at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:54)
        at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
        at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
        at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53)
        at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1678)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
Caused by: java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:795)
        at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1985)
        at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946)
        at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
        ... 19 more
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
15/08/14 06:29:20 INFO ui.SparkUI: Stopped Spark web UI at http://9.111.251.177:4040
15/08/14 06:29:20 INFO scheduler.DAGScheduler: Stopping DAGScheduler
15/08/14 06:29:20 INFO cluster.SparkDeploySchedulerBackend: Shutting down all executors
15/08/14 06:29:20 INFO cluster.SparkDeploySchedulerBackend: Asking each executor to shut down
Exception in thread "Thread-4" java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:795)
        at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1986)
        at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118)
        at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
        at org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:198)
        at org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1391)
        at org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1391)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1391)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.stop(SparkSQLEnv.scala:66)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$$anon$1.run(SparkSQLCLIDriver.scala:107)
{noformat}

h2. Analysis

With DEBUG logging enabled, we can see more information:

{noformat}
15/08/14 09:32:34 DEBUG SessionState: Removing resource dir /tmp/a9d3b5b8-3329-4e93-aee2-0a3496d661fa_resources
15/08/14 09:32:34 DEBUG SparkSQLEnv: Shutting down Spark SQL Environment
15/08/14 09:32:34 DEBUG DiskBlockManager: Shutdown hook called
15/08/14 09:32:34 DEBUG DFSClient: DFSClient writeChunk allocating new packet seqno=2, src=/spark/events/app-20150814093225-0002.snappy.inprogress, packetSize=65532, chunksPerPacket=127, bytesCurBlock=2048
15/08/14 09:32:34 DEBUG DFSClient: Queued packet 2
15/08/14 09:32:34 DEBUG DFSClient: Queued packet 3
15/08/14 09:32:34 DEBUG DFSClient: Waiting for ack for: 3
15/08/14 09:32:34 DEBUG Utils: Shutdown hook called
15/08/14 09:32:34 DEBUG DFSClient: DataStreamer block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010 sending packet packet seqno:2 offsetInBlock:2048 lastPacketInBlock:false lastByteOffsetInBlock: 2399
15/08/14 09:32:34 INFO SparkContext: END POST APPLICATION END
15/08/14 09:32:34 DEBUG DFSClient: DFSClient seqno: 2 status: SUCCESS downstreamAckTimeNanos: 0
15/08/14 09:32:34 DEBUG DFSClient: DataStreamer block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010 sending packet packet seqno:3 offsetInBlock:2399 lastPacketInBlock:true lastByteOffsetInBlock: 2399
15/08/14 09:32:34 DEBUG DFSClient: DFSClient seqno: 3 status: SUCCESS downstreamAckTimeNanos: 0
15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping org.spark-project.jetty.server.Server@489bb457
15/08/14 09:32:34 DEBUG DFSClient: Closing old block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010
15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping SelectChannelConnector@0.0.0.0:4040
15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping org.spark-project.jetty.server.nio.SelectChannelConnector$ConnectorSelectorManager@15e3d24a
15/08/14 09:32:34 DEBUG nio: Stopped Thread[qtp1717362942-45 Selector0,5,main] on org.spark-project.jetty.io.nio.SelectorManager$1@25964fe8
15/08/14 09:32:34 DEBUG Client: IPC Client (123257783) connection to /9.111.251.177:9000 from root sending #6
15/08/14 09:32:34 DEBUG Client: IPC Client (123257783) connection to /9.111.251.177:9000 from root got value #6
15/08/14 09:32:34 DEBUG ProtobufRpcEngine: Call: complete took 4ms
15/08/14 09:32:34 DEBUG nio: Stopped Thread[qtp1717362942-46 Selector1,5,main] on org.spark-project.jetty.io.nio.SelectorManager$1@2f581b9f
15/08/14 09:32:34 DEBUG Client: stopping client from cache: org.apache.hadoop.ipc.Client@3aca5e2
15/08/14 09:32:34 DEBUG Client: removing client from cache: org.apache.hadoop.ipc.Client@3aca5e2
15/08/14 09:32:34 DEBUG Client: stopping actual client because no more references remain: org.apache.hadoop.ipc.Client@3aca5e2
15/08/14 09:32:34 DEBUG Client: Stopping client
15/08/14 09:32:34 *ERROR* LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
        at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
        at scala.Option.foreach(Option.scala:236)
{noformat}

From the log we can see the "Shutdown hook called" message from org.apache.spark.util.Utils, then HDFS's own shutdown hook in which the DFSClient is stopped, and finally SparkSQLEnv.stop, where the exception is thrown.

Clearly, *Spark is trying to write new event log messages after the HDFS client has been closed, which causes the exception*.

After checking org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver, we found that SparkSQLEnv.stop() is invoked from a plain Java runtime shutdown hook:

{code}
// Clean up after we exit
Runtime.getRuntime.addShutdownHook(
  new Thread() {
    override def run() {
      SparkSQLEnv.stop()
    }
  }
)
{code}

A hook registered this way has no ordering guarantee relative to HDFS's own shutdown hook, and in practice it runs after the DFSClient has already been closed.

h2. Solution

We changed it to *use Hadoop's ShutdownHookManager and assign the hook a higher priority than HDFS's shutdown hook*, which fixed the problem.

We'll link a pull request for review; the link is [https://github.com/apache/spark/pull/8530#issuecomment-136236733]
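To illustrate the idea (this is a minimal sketch of the approach described above, not the exact patch in the linked PR, and it assumes Hadoop 2.x's org.apache.hadoop.util.ShutdownHookManager and FileSystem.SHUTDOWN_HOOK_PRIORITY are available on the classpath):

{code}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.util.ShutdownHookManager

// Sketch: register the cleanup hook through Hadoop's ShutdownHookManager
// instead of Runtime.getRuntime.addShutdownHook. Hooks registered here run
// in descending priority order, so anything above
// FileSystem.SHUTDOWN_HOOK_PRIORITY (10) executes before HDFS closes its
// cached FileSystem/DFSClient instances.
ShutdownHookManager.get().addShutdownHook(new Runnable {
  override def run(): Unit = {
    // Flush and close the event log while the HDFS client is still open.
    SparkSQLEnv.stop()
  }
}, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1)
{code}

Because ShutdownHookManager runs hooks from highest to lowest priority, any priority above FileSystem.SHUTDOWN_HOOK_PRIORITY should be enough to guarantee SparkSQLEnv.stop() finishes before HDFS tears down its cached clients.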