Sounds good -- I added comments to the ticket.

Since SPARK-2521 is scheduled for a 1.1.0 release and we can work around the
issue with spark.speculation, I don't personally see a need for a 1.0.2 backport.

Thanks for looking through this issue!
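
For anyone else hitting this before 1.1.0, here is a minimal sketch of the
workaround (turning on speculative execution so a hung task gets re-launched
elsewhere; the app name and everything besides the spark.speculation flag are
placeholders):

    // Scala sketch: enable speculative execution via SparkConf.
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("avro-job")              // placeholder app name
      .set("spark.speculation", "true")    // re-run slow/hung tasks on other executors
    val sc = new SparkContext(conf)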


On Thu, Jul 17, 2014 at 2:14 AM, Patrick Wendell <pwend...@gmail.com> wrote:

> Hey Andrew,
>
> I think you are correct, and a follow-up to SPARK-2521 will end up
> fixing this. The design of SPARK-2521 automatically broadcasts RDD
> data in tasks, and the approach creates a new copy of the RDD and its
> associated data for each task. A natural follow-up to that patch is to
> stop handling the jobConf separately (since we will now broadcast all
> referents of the RDD itself) and just have it broadcast with the
> RDD. I'm not sure whether Reynold plans to include this in SPARK-2521 or
> afterwards, but it's likely we'd do that soon.
>
> - Patrick
>
> On Wed, Jul 16, 2014 at 10:24 PM, Andrew Ash <and...@andrewash.com> wrote:
> > Hi Patrick, thanks for taking a look.  I filed as
> > https://issues.apache.org/jira/browse/SPARK-2546
> >
> > Would you recommend I pursue the cloned Configuration object approach now
> > and send in a PR?
> >
> > Reynold's recent announcement of the broadcast RDD object patch may also
> > have implications for the right path forward here.  I'm not sure I fully
> > understand the implications though:
> > https://github.com/apache/spark/pull/1452
> >
> > "Once this is committed, we can also remove the JobConf broadcast in
> > HadoopRDD."
> >
> > Thanks!
> > Andrew
> >
> >
> > On Tue, Jul 15, 2014 at 5:20 PM, Patrick Wendell <pwend...@gmail.com> wrote:
> >
> >> Hey Andrew,
> >>
> >> Cloning the conf might be a good/simple fix for this particular
> >> problem. It's definitely worth looking into.
> >>
> >> There are a few things we can probably do in Spark to deal with
> >> non-thread-safety inside of the Hadoop FileSystem and Configuration
> >> classes. One thing we can do in general is to add barriers around the
> >> locations where we knowingly access Hadoop FileSystem and
> >> Configuration state from multiple threads (e.g. during our own calls
> >> to getRecordReader in this case). But this will only deal with
> >> "writer-writer" conflicts, where we have multiple calls mutating the same
> >> object at the same time. It won't deal with "reader-writer" conflicts, where
> >> some of our initialization code touches state that is needed during
> >> normal execution of other tasks.
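> >>
> >> A minimal sketch of what such a barrier might look like (the lock object
> >> and method below are illustrative stand-ins, not the actual HadoopRDD
> >> code):
> >>
> >>     // Scala sketch: funnel the mutating call through one process-wide
> >>     // lock so only one task thread at a time touches the shared JobConf.
> >>     import org.apache.hadoop.mapred.{InputFormat, InputSplit, JobConf, RecordReader, Reporter}
> >>
> >>     object HadoopAccessLock  // hypothetical process-wide lock object
> >>
> >>     def openReader[K, V](fmt: InputFormat[K, V],
> >>                          split: InputSplit,
> >>                          conf: JobConf): RecordReader[K, V] =
> >>       HadoopAccessLock.synchronized {
> >>         // getRecordReader can mutate the JobConf, so guard the call
> >>         fmt.getRecordReader(split, conf, Reporter.NULL)
> >>       }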
> >>
> >> - Patrick
> >>
> >> On Tue, Jul 15, 2014 at 12:56 PM, Andrew Ash <and...@andrewash.com> wrote:
> >> > Hi Shengzhe,
> >> >
> >> > Even if we did make Configuration threadsafe, it'd take quite some time
> >> > for that to trickle down to a Hadoop release that we could actually rely
> >> > on Spark users having installed.  I agree we should consider whether
> >> > making Configuration threadsafe is something that Hadoop should do, but
> >> > for the short term I think Spark needs to be able to handle the common
> >> > scenario of Configuration being single-threaded.
> >> >
> >> > Thanks!
> >> > Andrew
> >> >
> >> >
> >> > On Tue, Jul 15, 2014 at 2:43 PM, yao <yaosheng...@gmail.com> wrote:
> >> >
> >> >> Good catch, Andrew. In addition to your proposed solution, would it be
> >> >> possible to fix the Configuration class and make it thread-safe? I think
> >> >> the fix should be trivial, just use a ConcurrentHashMap, but I am not
> >> >> sure if we can push this change upstream (will the Hadoop folks accept
> >> >> it? It seems they never expected a Configuration object to be accessed
> >> >> by multiple threads).
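> >> >>
> >> >> A rough sketch of the idea only (this is not Hadoop's actual
> >> >> Configuration internals, just an illustration of backing the properties
> >> >> with a concurrent map so parallel set() calls cannot corrupt it):
> >> >>
> >> >>     // Scala sketch: a hypothetical thread-safe property holder.
> >> >>     import java.util.concurrent.ConcurrentHashMap
> >> >>
> >> >>     class ThreadSafeProps {
> >> >>       private val props = new ConcurrentHashMap[String, String]()
> >> >>       def set(key: String, value: String): Unit = props.put(key, value)
> >> >>       def get(key: String): Option[String] = Option(props.get(key))
> >> >>     }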
> >> >>
> >> >> -Shengzhe
> >> >>
> >> >>
> >> >> On Mon, Jul 14, 2014 at 10:22 PM, Andrew Ash <and...@andrewash.com> wrote:
> >> >>
> >> >> > Hi Spark devs,
> >> >> >
> >> >> > At work last week we discovered a very interesting bug in Spark 0.9.1:
> >> >> > the way Spark uses the Hadoop Configuration object is prone to thread
> >> >> > safety issues.  I believe it still applies in Spark 1.0.1 as well.
> >> >> > Let me explain:
> >> >> >
> >> >> >
> >> >> > *Observations*
> >> >> >
> >> >> >    - Was running a relatively simple job (read from Avro files, do a
> >> >> >    map, do another map, write back to Avro files)
> >> >> >    - 412 of 413 tasks completed, but the last task was hung in RUNNING
> >> >> >    state
> >> >> >    - The 412 successful tasks completed in median time 3.4s
> >> >> >    - The last hung task didn't finish even in 20 hours
> >> >> >    - The executor with the hung task was responsible for 100% of one
> >> >> >    core of CPU usage
> >> >> >    - Jstack of the executor attached (relevant thread pasted below)
> >> >> >
> >> >> >
> >> >> > *Diagnosis*
> >> >> >
> >> >> > After doing some code spelunking, we determined the issue was
> >> >> > concurrent use of a single Configuration object by the tasks on an
> >> >> > executor.  In Hadoop each task runs in its own JVM, but in Spark
> >> >> > multiple tasks can run in the same JVM, so the single-threaded access
> >> >> > assumptions of the Configuration object no longer hold in Spark.
> >> >> >
> >> >> > The specific issue is that the AvroRecordReader actually _modifies_ the
> >> >> > JobConf it's given when it's instantiated!  It adds a key for the RPC
> >> >> > protocol engine in the process of connecting to the Hadoop FileSystem.
> >> >> > When many tasks start at the same time (like at the start of a job),
> >> >> > many tasks are adding this configuration item to the one Configuration
> >> >> > object at once.  Internally Configuration uses a java.util.HashMap,
> >> >> > which isn't threadsafe... The post below is an excellent explanation of
> >> >> > what happens when multiple threads insert into a HashMap at the same
> >> >> > time.
> >> >> >
> >> >> > http://mailinator.blogspot.com/2009/06/beautiful-race-condition.html
> >> >> >
> >> >> > The gist is that you have a thread following a cycle of linked list
> >> >> > nodes indefinitely.  This exactly matches our observations of the 100%
> >> >> > CPU core and also the final location in the stack trace.
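> >> >> >
> >> >> > Here is a tiny demo of the hazard, if you want to see it locally (not a
> >> >> > fix, and deliberately nondeterministic: unsynchronized puts into one
> >> >> > java.util.HashMap can corrupt it, and can sometimes leave a thread
> >> >> > spinning forever exactly like our hung task):
> >> >> >
> >> >> >     import java.util.HashMap
> >> >> >
> >> >> >     object HashMapRaceDemo {
> >> >> >       def main(args: Array[String]): Unit = {
> >> >> >         val shared = new HashMap[Int, Int]()
> >> >> >         val threads = (1 to 8).map { t =>
> >> >> >           new Thread(new Runnable {
> >> >> >             def run(): Unit = {
> >> >> >               var i = 0
> >> >> >               // concurrent, unguarded writes to the shared map
> >> >> >               while (i < 100000) { shared.put(t * 1000000 + i, i); i += 1 }
> >> >> >             }
> >> >> >           })
> >> >> >         }
> >> >> >         threads.foreach(_.start())
> >> >> >         threads.foreach(_.join())
> >> >> >         // size is often wrong; the join may never return if a thread
> >> >> >         // gets stuck following a cycle of corrupted buckets
> >> >> >         println("size = " + shared.size)
> >> >> >       }
> >> >> >     }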
> >> >> >
> >> >> > So it seems the way Spark shares a Configuration object between task
> >> >> > threads in an executor is incorrect.  We need some way to prevent
> >> >> > concurrent access to a single Configuration object.
> >> >> >
> >> >> >
> >> >> > *Proposed fix*
> >> >> >
> >> >> > We can clone the JobConf object in HadoopRDD.getJobConf() so each task
> >> >> > gets its own JobConf object (and thus Configuration object).  The
> >> >> > optimization of broadcasting the Configuration object across the
> >> >> > cluster can remain, but on the receiving side I think it needs to be
> >> >> > cloned for each task to allow for concurrent access.  I'm not sure of
> >> >> > the performance implications, but the comments suggest that the
> >> >> > Configuration object is ~10KB, so I would expect a clone of the object
> >> >> > to be relatively speedy.
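> >> >> >
> >> >> > A rough sketch of what I have in mind (illustrative only; the method
> >> >> > below is a stand-in, not the actual HadoopRDD code):
> >> >> >
> >> >> >     // Scala sketch: copy the broadcast Configuration into a fresh
> >> >> >     // JobConf per task, so task threads never share one mutable conf.
> >> >> >     import org.apache.hadoop.conf.Configuration
> >> >> >     import org.apache.hadoop.mapred.JobConf
> >> >> >
> >> >> >     def jobConfForTask(broadcastConf: Configuration): JobConf =
> >> >> >       // JobConf's copy constructor copies the key/value entries,
> >> >> >       // giving this task a private conf it can safely mutate
> >> >> >       new JobConf(broadcastConf)
> >> >> >
> >> >> > We might also want to guard the copy itself with a lock, since cloning
> >> >> > reads the shared map while other task threads could be writing to it.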
> >> >> >
> >> >> > Has this been observed before?  Does my suggested fix make sense?
> >> >> > I'd be happy to file a Jira ticket and continue discussion there on the
> >> >> > right way to fix it.
> >> >> >
> >> >> >
> >> >> > Thanks!
> >> >> > Andrew
> >> >> >
> >> >> >
> >> >> > P.S.  For others seeing this issue, our temporary workaround is to
> >> >> > enable spark.speculation, which retries failed (or hung) tasks on other
> >> >> > machines.
> >> >> >
> >> >> >
> >> >> >
> >> >> > "Executor task launch worker-6" daemon prio=10
> tid=0x00007f91f01fe000
> >> >> > nid=0x54b1 runnable [0x00007f92d74f1000]
> >> >> >    java.lang.Thread.State: RUNNABLE
> >> >> >     at java.util.HashMap.transfer(HashMap.java:601)
> >> >> >     at java.util.HashMap.resize(HashMap.java:581)
> >> >> >     at java.util.HashMap.addEntry(HashMap.java:879)
> >> >> >     at java.util.HashMap.put(HashMap.java:505)
> >> >> >     at
> >> org.apache.hadoop.conf.Configuration.set(Configuration.java:803)
> >> >> >     at
> >> org.apache.hadoop.conf.Configuration.set(Configuration.java:783)
> >> >> >     at
> >> >> >
> org.apache.hadoop.conf.Configuration.setClass(Configuration.java:1662)
> >> >> >     at org.apache.hadoop.ipc.RPC.setProtocolEngine(RPC.java:193)
> >> >> >     at
> >> >> >
> >> >>
> >>
> org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:343)
> >> >> >     at
> >> >> >
> >> >>
> >>
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:168)
> >> >> >     at
> >> >> >
> >> >>
> >>
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:129)
> >> >> >     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:436)
> >> >> >     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:403)
> >> >> >     at
> >> >> >
> >> >>
> >>
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:125)
> >> >> >     at
> >> >> >
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2262)
> >> >> >     at
> org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:86)
> >> >> >     at
> >> >> >
> >> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2296)
> >> >> >     at
> org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2278)
> >> >> >     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:316)
> >> >> >     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:194)
> >> >> >     at org.apache.avro.mapred.FsInput.<init>(FsInput.java:37)
> >> >> >     at
> >> >> >
> >> org.apache.avro.mapred.AvroRecordReader.<init>(AvroRecordReader.java:43)
> >> >> >     at
> >> >> >
> >> >>
> >>
> org.apache.avro.mapred.AvroInputFormat.getRecordReader(AvroInputFormat.java:52)
> >> >> >     at
> >> org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:156)
> >> >> >     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
> >> >> >     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
> >> >> >     at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> >> >> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> >> >> >     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> >> >> >     at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> >> >> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> >> >> >     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> >> >> >     at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> >> >> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> >> >> >     at
> >> >> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
> >> >> >     at org.apache.spark.scheduler.Task.run(Task.scala:53)
> >> >> >     at
> >> >> >
> >> >>
> >>
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
> >> >> >     at
> >> >> >
> >> >>
> >>
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
> >> >> >     at
> >> >> >
> >> >>
> >>
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
> >> >> >     at java.security.AccessController.doPrivileged(Native Method)
> >> >> >     at javax.security.auth.Subject.doAs(Subject.java:415)
> >> >> >     at
> >> >> >
> >> >>
> >>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
> >> >> >     at
> >> >> >
> >> >>
> >>
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
> >> >> >     at
> >> >> >
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
> >> >> >     at
> >> >> >
> >> >>
> >>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >> >> >     at
> >> >> >
> >> >>
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >> >> >     at java.lang.Thread.run(Thread.java:745)
> >> >> >
> >> >> >
> >> >>
> >>
>
