[ https://issues.apache.org/jira/browse/SPARK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14723757#comment-14723757 ]

Marcelo Vanzin commented on SPARK-10358:
----------------------------------------

Original issue: SPARK-6014

> Spark-sql throws IOException on exit when using HDFS to store event log.
> ------------------------------------------------------------------------
>
>                 Key: SPARK-10358
>                 URL: https://issues.apache.org/jira/browse/SPARK-10358
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.3.1
>         Environment: * spark-1.3.1-bin-hadoop2.6
> * hadoop-2.6.0
> * Red hat 2.6.32-504.el6.x86_64
>            Reporter: Sioa Song
>            Priority: Minor
>             Fix For: 1.3.1
>
>
> h2. Summary
> In Spark 1.3.1, when HDFS is used to store the event log, spark-sql throws a
> "java.io.IOException: Filesystem closed" on exit.
> h2. How to reproduce
> 1. Enable the event log mechanism and point the log directory at HDFS.
>    You can do this by setting these two properties in spark-defaults.conf:
> spark.eventLog.enabled          true
> spark.eventLog.dir              hdfs://xxxxx:xxxxx/spark-events
> 2. Start spark-sql, and type {{exit;}} once it is up.
> {noformat} 
> spark-sql> exit; 
> 15/08/14 06:29:20 ERROR scheduler.LiveListenerBus: Listener EventLoggingListener threw an exception
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
> at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
> at org.apache.spark.scheduler.EventLoggingListener.onApplicationEnd(EventLoggingListener.scala:181)
> at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:54)
> at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
> at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
> at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53)
> at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1678)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
> Caused by: java.io.IOException: Filesystem closed
> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:795)
> at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1985)
> at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946)
> at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
> ... 19 more
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
> 15/08/14 06:29:20 INFO ui.SparkUI: Stopped Spark web UI at http://9.111.251.177:4040
> 15/08/14 06:29:20 INFO scheduler.DAGScheduler: Stopping DAGScheduler
> 15/08/14 06:29:20 INFO cluster.SparkDeploySchedulerBackend: Shutting down all executors
> 15/08/14 06:29:20 INFO cluster.SparkDeploySchedulerBackend: Asking each executor to shut down
> Exception in thread "Thread-4" java.io.IOException: Filesystem closed
> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:795)
> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1986)
> at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118)
> at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
> at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
> at org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:198)
> at org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1391)
> at org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1391)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.SparkContext.stop(SparkContext.scala:1391)
> at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.stop(SparkSQLEnv.scala:66)
> at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$$anon$1.run(SparkSQLCLIDriver.scala:107)
> {noformat} 
> h2. Analysis
> With DEBUG logging enabled, we can see more detail:
> {noformat} 
> 15/08/14 09:32:34 DEBUG SessionState: Removing resource dir /tmp/a9d3b5b8-3329-4e93-aee2-0a3496d661fa_resources
> 15/08/14 09:32:34 DEBUG SparkSQLEnv: Shutting down Spark SQL Environment
> 15/08/14 09:32:34 DEBUG DiskBlockManager: Shutdown hook called
> 15/08/14 09:32:34 DEBUG DFSClient: DFSClient writeChunk allocating new packet seqno=2, src=/spark/events/app-20150814093225-0002.snappy.inprogress, packetSize=65532, chunksPerPacket=127, bytesCurBlock=2048
> 15/08/14 09:32:34 DEBUG DFSClient: Queued packet 2
> 15/08/14 09:32:34 DEBUG DFSClient: Queued packet 3
> 15/08/14 09:32:34 DEBUG DFSClient: Waiting for ack for: 3
> 15/08/14 09:32:34 DEBUG Utils: Shutdown hook called
> 15/08/14 09:32:34 DEBUG DFSClient: DataStreamer block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010 sending packet packet seqno:2 offsetInBlock:2048 lastPacketInBlock:false lastByteOffsetInBlock: 2399
> 15/08/14 09:32:34 INFO SparkContext: END POST APPLICATION END
> 15/08/14 09:32:34 DEBUG DFSClient: DFSClient seqno: 2 status: SUCCESS downstreamAckTimeNanos: 0
> 15/08/14 09:32:34 DEBUG DFSClient: DataStreamer block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010 sending packet packet seqno:3 offsetInBlock:2399 lastPacketInBlock:true lastByteOffsetInBlock: 2399
> 15/08/14 09:32:34 DEBUG DFSClient: DFSClient seqno: 3 status: SUCCESS downstreamAckTimeNanos: 0
> 15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping org.spark-project.jetty.server.Server@489bb457
> 15/08/14 09:32:34 DEBUG DFSClient: Closing old block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010
> 15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping SelectChannelConnector@0.0.0.0:4040
> 15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping org.spark-project.jetty.server.nio.SelectChannelConnector$ConnectorSelectorManager@15e3d24a
> 15/08/14 09:32:34 DEBUG nio: Stopped Thread[qtp1717362942-45 Selector0,5,main] on org.spark-project.jetty.io.nio.SelectorManager$1@25964fe8
> 15/08/14 09:32:34 DEBUG Client: IPC Client (123257783) connection to /9.111.251.177:9000 from root sending #6
> 15/08/14 09:32:34 DEBUG Client: IPC Client (123257783) connection to /9.111.251.177:9000 from root got value #6
> 15/08/14 09:32:34 DEBUG ProtobufRpcEngine: Call: complete took 4ms
> 15/08/14 09:32:34 DEBUG nio: Stopped Thread[qtp1717362942-46 Selector1,5,main] on org.spark-project.jetty.io.nio.SelectorManager$1@2f581b9f
> 15/08/14 09:32:34 DEBUG Client: stopping client from cache: org.apache.hadoop.ipc.Client@3aca5e2
> 15/08/14 09:32:34 DEBUG Client: removing client from cache: org.apache.hadoop.ipc.Client@3aca5e2
> 15/08/14 09:32:34 DEBUG Client: stopping actual client because no more references remain: org.apache.hadoop.ipc.Client@3aca5e2
> 15/08/14 09:32:34 DEBUG Client: Stopping client
> 15/08/14 09:32:34 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
> java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
> at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
> at scala.Option.foreach(Option.scala:236)
> {noformat} 
> From the log, we can see the "Shutdown hook called" message from
> org.apache.spark.util.Utils, then HDFS's shutdown hook stopping the
> DFSClient, and finally SparkSQLEnv.stop(), where the exception is thrown.
> Clearly, *Spark is trying to write new event log messages after the HDFS
> client has been closed, which causes the exception*.
> After checking org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver, we
> found that SparkSQLEnv.stop() is invoked from a plain Java runtime shutdown
> hook:
> {code} 
>     // Clean up after we exit 
>     Runtime.getRuntime.addShutdownHook( 
>       new Thread() { 
>         override def run() { 
>           SparkSQLEnv.stop() 
>         } 
>       } 
>     ) 
> {code} 
> Hooks registered this way run in no guaranteed order relative to Hadoop's
> own shutdown hook, so this one can execute after the DFSClient has already
> been closed.
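> The race is easy to see in isolation. Below is a hypothetical standalone
> sketch (not Spark code; the object name and path are made up for
> illustration) of a plain JVM shutdown hook touching a cached Hadoop
> FileSystem:
> {code}
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> 
> object ShutdownRaceDemo {
>   def main(args: Array[String]): Unit = {
>     // A cached FileSystem instance; Hadoop's own shutdown hook closes it.
>     val fs = FileSystem.get(new Configuration())
>     Runtime.getRuntime.addShutdownHook(new Thread() {
>       override def run(): Unit = {
>         // No ordering guarantee: if Hadoop's hook has already run, this
>         // call throws "java.io.IOException: Filesystem closed" -- the same
>         // failure hit by SparkSQLEnv.stop() above.
>         fs.exists(new Path("/tmp"))
>       }
>     })
>   }
> }
> {code}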
> h2. Solution
> We fixed it by *registering the shutdown hook through Hadoop's
> ShutdownHookManager with a higher priority than HDFS's own shutdown hook*.
> The pull request is [https://github.com/apache/spark/pull/8530#issuecomment-136236733].
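> For reference, a minimal sketch of the approach, assuming Hadoop 2.x's
> org.apache.hadoop.util.ShutdownHookManager API (the exact code in the pull
> request may differ):
> {code}
> import org.apache.hadoop.fs.FileSystem
> import org.apache.hadoop.util.ShutdownHookManager
> 
> // Register the cleanup through Hadoop's ShutdownHookManager instead of
> // Runtime.addShutdownHook. Hadoop runs these hooks in descending priority
> // order, so any priority above FileSystem.SHUTDOWN_HOOK_PRIORITY makes
> // SparkSQLEnv stop (and flush the event log) before HDFS closes its
> // cached clients.
> ShutdownHookManager.get().addShutdownHook(
>   new Runnable {
>     override def run(): Unit = SparkSQLEnv.stop()
>   },
>   FileSystem.SHUTDOWN_HOOK_PRIORITY + 1)
> {code}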


