[ https://issues.apache.org/jira/browse/SPARK-2546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148657#comment-14148657 ]
Josh Rosen commented on SPARK-2546:
-----------------------------------

A synchronization wrapper (whether written by hand or generated using macros) might introduce an unwanted runtime dependency on the exact compile-time version of Hadoop that we used. For example, say we compile against Hadoop 1.x and run on Hadoop 1.y (where y > x), and the runtime version of JobConf contains methods that were not present in the version that we wrapped at compile time. What happens in this case?

Before we explore this option, I should probably revisit SPARK-2585 to see if I can understand why the patch seemed to introduce a performance regression, since that approach is Hadoop-version-agnostic.
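For concreteness, here is a rough sketch of what a hand-written synchronization wrapper could look like, and why it is pinned to the compile-time JobConf API. This is purely illustrative (SynchronizedJobConf is a hypothetical class, not anything that exists in Spark or Hadoop):

{code:scala}
import org.apache.hadoop.mapred.JobConf

// Hypothetical wrapper: funnel reads and writes through a single lock.
// Only methods overridden here, at compile time, are protected; a method
// added in a newer runtime Hadoop (the compile-on-1.x / run-on-1.y scenario
// above) would still resolve to the unsynchronized superclass implementation.
class SynchronizedJobConf(base: JobConf) extends JobConf(base) {
  private val lock = new Object

  override def get(name: String): String =
    lock.synchronized { super.get(name) }

  override def set(name: String, value: String): Unit =
    lock.synchronized { super.set(name, value) }

  // ...every other accessor and mutator known at compile time would need
  // the same treatment, which is why generating this with macros was floated.
}
{code}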
> Configuration object thread safety issue
> ----------------------------------------
>
> Key: SPARK-2546
> URL: https://issues.apache.org/jira/browse/SPARK-2546
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 0.9.1
> Reporter: Andrew Ash
> Assignee: Josh Rosen
> Priority: Critical
>
> // observed in 0.9.1 but expected to exist in 1.0.1 as well
>
> This ticket is copy-pasted from a thread on the dev@ list:
>
> {quote}
> We discovered a very interesting bug in Spark at work last week in Spark 0.9.1: the way Spark uses the Hadoop Configuration object is prone to thread safety issues. I believe it still applies in Spark 1.0.1 as well.
>
> Let me explain:
>
> Observations
> - Was running a relatively simple job (read from Avro files, do a map, do another map, write back to Avro files)
> - 412 of 413 tasks completed, but the last task was hung in RUNNING state
> - The 412 successful tasks completed in median time 3.4s
> - The last hung task didn't finish even in 20 hours
> - The executor with the hung task was responsible for 100% of one core of CPU usage
> - Jstack of the executor attached (relevant thread pasted below)
>
> Diagnosis
> After doing some code spelunking, we determined the issue was concurrent use of a Configuration object for each task on an executor. In Hadoop each task runs in its own JVM, but in Spark multiple tasks can run in the same JVM, so the single-threaded access assumptions of the Configuration object no longer hold in Spark.
>
> The specific issue is that the AvroRecordReader actually _modifies_ the JobConf it's given when it's instantiated! It adds a key for the RPC protocol engine in the process of connecting to the Hadoop FileSystem. When many tasks start at the same time (like at the start of a job), many tasks are adding this configuration item to the one Configuration object at once. Internally Configuration uses a java.util.HashMap, which isn't thread-safe…
>
> The below post is an excellent explanation of what happens in the situation where multiple threads insert into a HashMap at the same time.
> http://mailinator.blogspot.com/2009/06/beautiful-race-condition.html
>
> The gist is that you have a thread following a cycle of linked list nodes indefinitely. This exactly matches our observations of the 100% CPU core and also the final location in the stack trace.
>
> So it seems the way Spark shares a Configuration object between task threads in an executor is incorrect. We need some way to prevent concurrent access to a single Configuration object.
>
> Proposed fix
> We can clone the JobConf object in HadoopRDD.getJobConf() so each task gets its own JobConf object (and thus Configuration object). The optimization of broadcasting the Configuration object across the cluster can remain, but on the other side I think it needs to be cloned for each task to allow for concurrent access. I'm not sure of the performance implications, but the comments suggest that the Configuration object is ~10KB, so I would expect a clone of the object to be relatively speedy.
>
> Has this been observed before? Does my suggested fix make sense? I'd be happy to file a Jira ticket and continue discussion there for the right way to fix.
>
> Thanks!
> Andrew
>
> P.S. For others seeing this issue, our temporary workaround is to enable spark.speculation, which retries failed (or hung) tasks on other machines.
>
> {noformat}
> "Executor task launch worker-6" daemon prio=10 tid=0x00007f91f01fe000 nid=0x54b1 runnable [0x00007f92d74f1000]
>    java.lang.Thread.State: RUNNABLE
> at java.util.HashMap.transfer(HashMap.java:601)
> at java.util.HashMap.resize(HashMap.java:581)
> at java.util.HashMap.addEntry(HashMap.java:879)
> at java.util.HashMap.put(HashMap.java:505)
> at org.apache.hadoop.conf.Configuration.set(Configuration.java:803)
> at org.apache.hadoop.conf.Configuration.set(Configuration.java:783)
> at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:1662)
> at org.apache.hadoop.ipc.RPC.setProtocolEngine(RPC.java:193)
> at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:343)
> at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:168)
> at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:129)
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:436)
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:403)
> at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:125)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2262)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:86)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2296)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2278)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:316)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:194)
> at org.apache.avro.mapred.FsInput.<init>(FsInput.java:37)
> at org.apache.avro.mapred.AvroRecordReader.<init>(AvroRecordReader.java:43)
> at org.apache.avro.mapred.AvroInputFormat.getRecordReader(AvroInputFormat.java:52)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:156)
> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
> at org.apache.spark.scheduler.Task.run(Task.scala:53)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
> at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
> at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
> at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> {quote}
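As a footnote on the proposed fix quoted above: a minimal sketch of the per-task clone idea, under the assumption that the Configuration is broadcast once per job (JobConfCloning and jobConfForTask are hypothetical names for illustration, not the actual HadoopRDD.getJobConf() code or the SPARK-2585 patch):

{code:scala}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

object JobConfCloning {
  // The broadcast, cluster-wide Configuration is left untouched; each task
  // gets its own copy, so mutations made by e.g. AvroRecordReader stay
  // task-local instead of racing on one shared HashMap.
  def jobConfForTask(broadcastConf: Configuration): JobConf =
    // JobConf(Configuration) copies the properties of the source conf.
    new JobConf(broadcastConf)
}
{code}

Note that constructing the copy still reads the shared object, so the clone itself may need to happen under a lock (or before tasks get a chance to mutate the original) to be completely safe.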