[ https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16403098#comment-16403098 ]

Kevin Conaway commented on SPARK-16087:
---------------------------------------

[~srowen] I finally got around to digging into this after two of my co-workers 
ran into the issue.

I believe that this issue is isolated to the _LocalBackend_ scheduler.  It 
[hardcodes|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala#L56]
 the executor hostname to _localhost_, which is then used in 
[reviveOffers|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala#L83].
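
To make the shape of that offer concrete, here is a toy sketch (my own throwaway classes, not Spark's Scala _WorkerOffer_/_LocalBackend_) of what the single offer from the local backend effectively looks like:

{code:java}
// Toy sketch only -- hypothetical stand-in classes, not Spark's internals.
public class LocalOfferSketch {

    static final class Offer {
        final String executorId;
        final String host;
        final int cores;

        Offer(String executorId, String host, int cores) {
            this.executorId = executorId;
            this.host = host;
            this.cores = cores;
        }
    }

    public static void main(String[] args) {
        // In local mode there is exactly one "executor", and its host is the
        // literal string "localhost" rather than a resolved address.
        Offer offer = new Offer("driver", "localhost",
            Runtime.getRuntime().availableProcessors());
        System.out.println(offer.executorId + " @ " + offer.host
            + " with " + offer.cores + " cores");
    }
}
{code}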

The count action on the unioned RDD gets split into two types of tasks.  One 
group contains the tasks that compute the count of rdd2.  The other group 
contains the tasks that read the partitions of rdd1 that were cached by the 
earlier count.  All of them are _ResultTasks_.  For the rdd2 group, the task 
[preferred 
locations|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L303]
 are generated from the Hadoop input split.  In my case, the input split 
location host was _127.0.0.1_, as that is what is reported by the name node.

When these tasks get added as [pending 
tasks|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L207]
 in the _TaskSetManager_, they are added to the _pendingTasksForHost_ map, which 
maps host name -> list of tasks.  So here, the key of the map would be 
_127.0.0.1_ and the value would be the list of all pending tasks for that host.
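
In toy form (plain Java maps, not the actual _TaskSetManager_ fields), that bookkeeping ends up looking something like this:

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy sketch of the pendingTasksForHost bookkeeping described above, using
// plain collections rather than Spark's actual TaskSetManager internals.
public class PendingTasksSketch {
    public static void main(String[] args) {
        // host name -> indexes of pending tasks whose preferred location is that host
        Map<String, List<Integer>> pendingTasksForHost = new HashMap<>();

        // The HadoopRDD (rdd2) tasks report "127.0.0.1" because that is the
        // host the name node returned for the input splits.
        pendingTasksForHost.computeIfAbsent("127.0.0.1", h -> new ArrayList<>()).add(0);
        pendingTasksForHost.computeIfAbsent("127.0.0.1", h -> new ArrayList<>()).add(1);

        System.out.println(pendingTasksForHost); // {127.0.0.1=[0, 1]}
    }
}
{code}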

On the task scheduling side, _reviveOffers_ is called on the _LocalBackend_. 
This passes the list of _WorkerOffers_ (with the executor host hardcoded to 
_localhost_) to 
[TaskSchedulerImpl#resourceOffers|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L317]
 where, for each task set, it calculates the available locality levels for that 
task set.  In this case, _NODE_LOCAL_ is added because there is an executor 
alive on _localhost_ and there are tasks pending for _localhost_ (as well as for 
_127.0.0.1_).  For each available locality level, the call stack wends its way down to 
[TaskSetManager#resourceOffer|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L432]
 with the host and locality.  Even though the _maxLocality_ may be _ANY_, 
[TaskSetManager#getAllowedLocalityLevel|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L542]
 will force it to _NODE_LOCAL_ because it sees that there are tasks waiting in the 
_pendingTasksForHost_ map.
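
Roughly, the clamping behaviour described above looks like this (a simplified sketch of the logic that ignores the _spark.locality.wait_ timers, not Spark's actual code):

{code:java}
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of the NODE_LOCAL clamp: as long as any host bucket still
// holds pending tasks, the allowed level stays at NODE_LOCAL even when the
// offered maxLocality is ANY.
public class LocalityClampSketch {

    enum Locality { PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY }

    static Locality allowedLocality(Map<String, List<Integer>> pendingTasksForHost,
                                    Locality maxLocality) {
        boolean hostLevelTasksPending =
            pendingTasksForHost.values().stream().anyMatch(tasks -> !tasks.isEmpty());
        if (hostLevelTasksPending && maxLocality.ordinal() >= Locality.NODE_LOCAL.ordinal()) {
            return Locality.NODE_LOCAL;
        }
        return maxLocality;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> pending = new HashMap<>();
        pending.put("127.0.0.1", Arrays.asList(0, 1)); // tasks nobody can ever dequeue

        System.out.println(allowedLocality(pending, Locality.ANY)); // NODE_LOCAL
    }
}
{code}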

_TaskSetManager#resourceOffer_ then calls _dequeueTask_ with the host/locality 
set to _localhost/NODE_LOCAL_.  In _dequeueTask_, it tries to [find pending 
tasks|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L376]
 for _localhost_, but there aren't any left because the remaining tasks are 
actually keyed under _127.0.0.1_.  And thus, nothing happens.
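
So the mismatch boils down to a map lookup that can never succeed, roughly (again a toy sketch, not the real _dequeueTask_):

{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy sketch of the failed dequeue: the offer's host is "localhost" but the
// pending tasks for the HadoopRDD partitions are keyed under "127.0.0.1".
public class DequeueMismatchSketch {
    public static void main(String[] args) {
        Map<String, List<Integer>> pendingTasksForHost = new HashMap<>();
        pendingTasksForHost.put("127.0.0.1", Arrays.asList(0, 1));

        String offerHost = "localhost"; // hardcoded by the local backend
        List<Integer> candidates =
            pendingTasksForHost.getOrDefault(offerHost, Collections.emptyList());

        // Nothing to launch at NODE_LOCAL, and the locality level never relaxes
        // to ANY (see the previous sketch), so these tasks are never scheduled.
        System.out.println("tasks dequeued for " + offerHost + ": " + candidates); // []
    }
}
{code}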

The _localhost_ task location for the second group of tasks comes from the 
[BlockManager|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L187]
 which gets the hostname from the 
[NettyBlockTransferService|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala#L110].
  The _NettyBlockTransferService_ calls _Utils.localHostName_ to determine the 
current host.  This is normally resolved to the IP of whatever _localhost_ 
is, unless something calls _Utils.setCustomHostname_.  It won't surprise you to 
learn that the 
[Executor|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/executor/Executor.scala#L71]
 constructor body does indeed call this method with the hostname passed to it.  
As mentioned above, when using the _LocalBackend_, the executor hostname is [hard 
coded to 
localhost|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala#L59].
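
The override behaviour is essentially this (a hypothetical stand-in for _Utils.localHostName_/_Utils.setCustomHostname_, not the real implementation):

{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical stand-in for the localHostName / setCustomHostname behaviour
// described above: once a custom hostname has been set, every later caller
// (including the block manager registration) sees that value.
public class HostnameOverrideSketch {

    private static volatile String customHostname = null;

    static void setCustomHostname(String hostname) {
        customHostname = hostname;
    }

    static String localHostName() throws UnknownHostException {
        return customHostname != null
            ? customHostname
            : InetAddress.getLocalHost().getHostAddress();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(localHostName()); // the machine's resolved address
        setCustomHostname("localhost");      // what effectively happens in local mode
        System.out.println(localHostName()); // "localhost"
    }
}
{code}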

That's what's happening in 1.6.3. 

As of 2.3.0, SPARK-14437 changed the 
[NettyBlockTransferService|https://github.com/apache/spark/blob/v2.3.0/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala#L49]
 to use the [canonical IP address of the local 
host|https://github.com/apache/spark/blob/v2.3.0/core/src/main/scala/org/apache/spark/internal/config/package.scala#L282].
  This binding happens before the _Executor_ overwrites the host name.  
Therefore, the tasks in the task set will get an IP address other than 
_localhost_ and _TaskSetManager#computeValidLocalityLevels_ will not add 
_NODE_LOCAL_ to the list of valid locality levels because none of the pending 
task locations match _localhost_.

However, it's trivial to force the issue by setting 
_spark.driver.host=localhost_ because the underlying problem, as described 
above, still exists.  This could be one refactor away from occurring again.
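
For example, taking the reproducer from the issue description below and only changing the conf (an untested sketch of the idea):

{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.KryoSerializer;

// Untested sketch: same driver setup as the reproducer in the issue
// description, plus spark.driver.host=localhost, which puts the block
// transfer service back on "localhost" and re-introduces the
// localhost / 127.0.0.1 mismatch on 2.3.0.
public class SparkBugWithDriverHost {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
            new SparkConf()
                .set("spark.serializer", KryoSerializer.class.getName())
                .set("spark.driver.host", "localhost")
                .set("spark.master", "local[*]")
                .setAppName(SparkBugWithDriverHost.class.getName())
        );
        // ... then run the same persist + count + union + count sequence as
        // in the attached reproducer.
        sc.stop();
    }
}
{code}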

Thoughts on this?  I'm not sure of the best way forward.  My initial thought 
was to modify _LocalBackend_/_LocalSchedulerBackend_ to add _WorkerOffer_s for 
all possible bindings of _localhost_, but I don't know if that's the correct path.
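
By "all possible bindings" I mean something along these lines (a quick illustrative sketch, not a patch):

{code:java}
import java.net.InetAddress;
import java.util.LinkedHashSet;
import java.util.Set;

// Illustrative sketch of "all possible bindings of localhost": collect every
// name/address the loopback resolves to, and (in the idea above) make one
// WorkerOffer per entry so tasks keyed under either "localhost" or
// "127.0.0.1" would match.
public class LocalhostAliasesSketch {
    public static void main(String[] args) throws Exception {
        Set<String> hosts = new LinkedHashSet<>();
        hosts.add("localhost");
        for (InetAddress addr : InetAddress.getAllByName("localhost")) {
            hosts.add(addr.getHostAddress()); // e.g. 127.0.0.1 and ::1
        }
        hosts.add(InetAddress.getLoopbackAddress().getHostAddress());
        System.out.println(hosts);
    }
}
{code}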

[~kayousterhout], any thoughts?  I see that you do a lot of work on the 
scheduler.

> Spark Hangs When Using Union With Persisted Hadoop RDD
> ------------------------------------------------------
>
>                 Key: SPARK-16087
>                 URL: https://issues.apache.org/jira/browse/SPARK-16087
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.4.1, 1.6.1, 2.0.1
>            Reporter: Kevin Conaway
>            Priority: Critical
>         Attachments: SPARK-16087.dump.log, SPARK-16087.log, Screen Shot 
> 2016-06-21 at 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png, 
> part-00000, part-00001, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop 
> sequence file and then union-ed with a similar RDD.
> Below is a small file that exhibits the issue:
> {code:java}
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.serializer.KryoSerializer;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
> public class SparkBug {
>     public static void main(String [] args) throws Exception {
>         JavaSparkContext sc = new JavaSparkContext(
>             new SparkConf()
>                 .set("spark.serializer", KryoSerializer.class.getName())
>                 .set("spark.master", "local[*]")
>                 .setAppName(SparkBug.class.getName())
>         );
>         JavaPairRDD<LongWritable, BytesWritable> rdd1 = sc.sequenceFile(
>            "hdfs://localhost:9000/part-00000",
>             LongWritable.class,
>             BytesWritable.class
>         ).mapToPair(new PairFunction<Tuple2<LongWritable, BytesWritable>, 
> LongWritable, BytesWritable>() {
>             @Override
>             public Tuple2<LongWritable, BytesWritable> 
> call(Tuple2<LongWritable, BytesWritable> tuple) throws Exception {
>                 return new Tuple2<>(
>                     new LongWritable(tuple._1.get()),
>                     new BytesWritable(tuple._2.copyBytes())
>                 );
>             }
>         }).persist(
>             StorageLevel.MEMORY_ONLY()
>         );
>         System.out.println("Before union: " + rdd1.count());
>         JavaPairRDD<LongWritable, BytesWritable> rdd2 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-00001",
>             LongWritable.class,
>             BytesWritable.class
>         );
>         JavaPairRDD<LongWritable, BytesWritable> joined = rdd1.union(rdd2);
>         System.out.println("After union: " + joined.count());
>     }
> }
> {code}
> You'll need to upload the attached part-00000 and part-00001 to a local hdfs 
> instance (I'm just using a dummy [Single Node 
> Cluster|http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html]
>  locally).
> Some things to note:
> - It does not hang if rdd1 is not persisted
> - It does not hang if rdd1 is not materialized (via calling rdd1.count()) 
> before the union-ed RDD is materialized
> - It does not hang if the mapToPair() transformation is removed.


