[
https://issues.apache.org/jira/browse/SPARK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sioa Song resolved SPARK-10358.
-------------------------------
Resolution: Fixed
> Spark-sql throws IOException on exit when using HDFS to store event log.
> ------------------------------------------------------------------------
>
> Key: SPARK-10358
> URL: https://issues.apache.org/jira/browse/SPARK-10358
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.3.1
> Environment: * spark-1.3.1-bin-hadoop2.6
> * hadoop-2.6.0
> * Red Hat (kernel 2.6.32-504.el6.x86_64)
> Reporter: Sioa Song
> Priority: Minor
> Fix For: 1.3.1
>
>
> h2. Summary
> In Spark 1.3.1, if HDFS is used to store the event log, spark-sql throws a
> "java.io.IOException: Filesystem closed" on exit.
> h2. How to reproduce
> 1. Enable the event log mechanism and point it at HDFS by setting these two
> properties in spark-defaults.conf:
> {noformat}
> spark.eventLog.enabled true
> spark.eventLog.dir hdfs://xxxxx:xxxxx/spark-events
> {noformat}
> 2. Start spark-sql, and type exit once it is up:
> {noformat}
> spark-sql> exit;
> 15/08/14 06:29:20 ERROR scheduler.LiveListenerBus: Listener EventLoggingListener threw an exception
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
> at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
> at org.apache.spark.scheduler.EventLoggingListener.onApplicationEnd(EventLoggingListener.scala:181)
> at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:54)
> at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
> at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
> at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53)
> at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1678)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
> Caused by: java.io.IOException: Filesystem closed
> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:795)
> at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1985)
> at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946)
> at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
> ... 19 more
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
> 15/08/14 06:29:20 INFO ui.SparkUI: Stopped Spark web UI at http://9.111.251.177:4040
> 15/08/14 06:29:20 INFO scheduler.DAGScheduler: Stopping DAGScheduler
> 15/08/14 06:29:20 INFO cluster.SparkDeploySchedulerBackend: Shutting down all executors
> 15/08/14 06:29:20 INFO cluster.SparkDeploySchedulerBackend: Asking each executor to shut down
> Exception in thread "Thread-4" java.io.IOException: Filesystem closed
> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:795)
> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1986)
> at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118)
> at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
> at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
> at org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:198)
> at org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1391)
> at org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1391)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.SparkContext.stop(SparkContext.scala:1391)
> at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.stop(SparkSQLEnv.scala:66)
> at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$$anon$1.run(SparkSQLCLIDriver.scala:107)
> {noformat}
> h2. Analysis
> With DEBUG logging enabled, we can see more information:
> {noformat}
> 15/08/14 09:32:34 DEBUG SessionState: Removing resource dir /tmp/a9d3b5b8-3329-4e93-aee2-0a3496d661fa_resources
> 15/08/14 09:32:34 DEBUG SparkSQLEnv: Shutting down Spark SQL Environment
> 15/08/14 09:32:34 DEBUG DiskBlockManager: Shutdown hook called
> 15/08/14 09:32:34 DEBUG DFSClient: DFSClient writeChunk allocating new packet seqno=2, src=/spark/events/app-20150814093225-0002.snappy.inprogress, packetSize=65532, chunksPerPacket=127, bytesCurBlock=2048
> 15/08/14 09:32:34 DEBUG DFSClient: Queued packet 2
> 15/08/14 09:32:34 DEBUG DFSClient: Queued packet 3
> 15/08/14 09:32:34 DEBUG DFSClient: Waiting for ack for: 3
> 15/08/14 09:32:34 DEBUG Utils: Shutdown hook called
> 15/08/14 09:32:34 DEBUG DFSClient: DataStreamer block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010 sending packet packet seqno:2 offsetInBlock:2048 lastPacketInBlock:false lastByteOffsetInBlock: 2399
> 15/08/14 09:32:34 INFO SparkContext: END POST APPLICATION END
> 15/08/14 09:32:34 DEBUG DFSClient: DFSClient seqno: 2 status: SUCCESS downstreamAckTimeNanos: 0
> 15/08/14 09:32:34 DEBUG DFSClient: DataStreamer block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010 sending packet packet seqno:3 offsetInBlock:2399 lastPacketInBlock:true lastByteOffsetInBlock: 2399
> 15/08/14 09:32:34 DEBUG DFSClient: DFSClient seqno: 3 status: SUCCESS downstreamAckTimeNanos: 0
> 15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping org.spark-project.jetty.server.Server@489bb457
> 15/08/14 09:32:34 DEBUG DFSClient: Closing old block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010
> 15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping [email protected]:4040
> 15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping org.spark-project.jetty.server.nio.SelectChannelConnector$ConnectorSelectorManager@15e3d24a
> 15/08/14 09:32:34 DEBUG nio: Stopped Thread[qtp1717362942-45 Selector0,5,main] on org.spark-project.jetty.io.nio.SelectorManager$1@25964fe8
> 15/08/14 09:32:34 DEBUG Client: IPC Client (123257783) connection to /9.111.251.177:9000 from root sending #6
> 15/08/14 09:32:34 DEBUG Client: IPC Client (123257783) connection to /9.111.251.177:9000 from root got value #6
> 15/08/14 09:32:34 DEBUG ProtobufRpcEngine: Call: complete took 4ms
> 15/08/14 09:32:34 DEBUG nio: Stopped Thread[qtp1717362942-46 Selector1,5,main] on org.spark-project.jetty.io.nio.SelectorManager$1@2f581b9f
> 15/08/14 09:32:34 DEBUG Client: stopping client from cache: org.apache.hadoop.ipc.Client@3aca5e2
> 15/08/14 09:32:34 DEBUG Client: removing client from cache: org.apache.hadoop.ipc.Client@3aca5e2
> 15/08/14 09:32:34 DEBUG Client: stopping actual client because no more references remain: org.apache.hadoop.ipc.Client@3aca5e2
> 15/08/14 09:32:34 DEBUG Client: Stopping client
> 15/08/14 09:32:34 *ERROR* LiveListenerBus: Listener EventLoggingListener threw an exception
> java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
> at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
> at scala.Option.foreach(Option.scala:236)
> {noformat}
> In the log we can see the "Shutdown hook called" message from
> org.apache.spark.util.Utils, then HDFS's own shutdown hook, in which the
> DFSClient is stopped, and finally SparkSQLEnv.stop, where the exception is
> thrown.
> Clearly, *Spark tries to write a new event log message after the HDFS client
> has been closed, which causes the exception*.
> After checking org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver, we
> found that SparkSQLEnv.stop() is invoked from a plain Java runtime shutdown
> hook:
> {code}
> // Clean up after we exit
> Runtime.getRuntime.addShutdownHook(
>   new Thread() {
>     override def run() {
>       SparkSQLEnv.stop()
>     }
>   }
> )
> {code}
> A plain Runtime shutdown hook carries no ordering guarantee relative to
> HDFS's own shutdown hook, so it can run after the DFSClient has already been
> closed.
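> A minimal illustration (not from the Spark source) of the race: two plain
> Runtime shutdown hooks are started by the JVM in an unspecified order, so
> nothing prevents the "close filesystem" hook from running first:
> {code}
> // Hypothetical demo: the JVM starts all registered shutdown hooks
> // concurrently, in an unspecified order.
> object HookOrderDemo extends App {
>   Runtime.getRuntime.addShutdownHook(new Thread() {
>     // Stands in for SparkSQLEnv.stop(), which needs a live DFSClient.
>     override def run() { println("flush event log") }
>   })
>   Runtime.getRuntime.addShutdownHook(new Thread() {
>     // Stands in for Hadoop's FileSystem shutdown hook.
>     override def run() { println("close DFSClient") }
>   })
>   // Either line may print first; when "close DFSClient" wins,
>   // the event log flush fails with "Filesystem closed".
> }
> {code}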
> h2. Solution
> We fixed it by *registering the hook through Hadoop's ShutdownHookManager
> and assigning it a higher priority than HDFS's shutdown hook*, so that
> SparkSQLEnv is stopped before the DFSClient is closed.
> The pull request is [https://github.com/apache/spark/pull/8530#issuecomment-136236733]
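> For reference, a sketch of the kind of change involved (the priority value
> here is illustrative; Hadoop's FileSystem hook runs at
> FileSystem.SHUTDOWN_HOOK_PRIORITY, which is 10):
> {code}
> import org.apache.hadoop.util.ShutdownHookManager
>
> // Register the stop hook with a priority above the FileSystem shutdown
> // hook, so SparkSQLEnv.stop() runs while the DFSClient is still open.
> ShutdownHookManager.get().addShutdownHook(new Runnable {
>   override def run() { SparkSQLEnv.stop() }
> }, 50) // 50 > FileSystem.SHUTDOWN_HOOK_PRIORITY (10)
> {code}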
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)