[ https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15440817#comment-15440817 ]

Tomer Kaftan commented on SPARK-17110:
--------------------------------------

Hi Miao, 

That setup wouldn't cause this bug to appear (however, I can try the most 
recent master tomorrow anyway, just in case someone else has fixed it).

I should have explicitly specified 2 slaves rather than 2 nodes, as I suppose 
"nodes" is too ambiguous; I've updated the description. It is also critical 
that each slave is set to use *only 1 worker core* (as I did specify above) 
for this example.
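
For anyone setting this up, here is a minimal sketch of how to pin each slave 
to a single worker core, assuming the standalone cluster that the ec2 scripts 
launch (paths and restart steps may differ on other deployments):

{code}
# conf/spark-env.sh on each slave (standalone mode).
# SPARK_WORKER_CORES caps the number of cores the worker advertises,
# so each slave runs at most one task at a time.
export SPARK_WORKER_CORES=1
{code}

After restarting the workers, the master web UI should show each worker with 
exactly 1 core.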

These conditions matter because this specific example & setup is designed to 
cause, non-deterministically but with high probability, a situation where one 
of the PySpark workers reads cached data non-locally, which is what I have 
consistently observed to cause this error.

To provide a mental model of how this example & code snippet force this 
situation:
1. The workers initially cache the data, forcing it to be stored in memory 
locally. Worker A holds the large number (1000); Worker B holds only small 
numbers (1).
2. The two workers each process their local numbers one at a time.
3. Once Worker A hits the large wait, Worker B continues on to process all of 
its local data.
4. Because Worker A takes so long to finish its task (1000 seconds, though it 
can be set much smaller), the spark.locality.wait timeout expires and the 
scheduler lets Worker B begin processing data that is stored on Worker A (see 
the snippet after this list for a quicker way to trigger this).
5. Worker B attempts to read non-local data that is not stored on its node, 
which triggers the StreamCorruptedException.
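
As a side note, here is a hypothetical variant of the spark-submit 
reproduction that triggers the locality fallback sooner by lowering 
spark.locality.wait (default 3s) and shrinking the outlier sleep; the 1s wait 
and the 30-second value are arbitrary choices, not the exact numbers from the 
description:

{code}
from pyspark import SparkConf, SparkContext
import time

# Shorten the delay before the scheduler gives up on node-local placement,
# so Worker B is offered Worker A's cached partitions almost immediately.
conf = SparkConf().set("spark.locality.wait", "1s")
sc = SparkContext(conf=conf)

# One large outlier keeps Worker A busy while Worker B runs out of local
# partitions (30 seconds instead of 1000 to keep the run short).
x = sc.parallelize([1, 1, 1, 1, 1, 30, 1, 1, 1], numSlices=9).cache()
x.count()

def waitMap(v):
    time.sleep(v)
    return v

# The second pass should (non-deterministically, as above) hit the
# ANY-locality path and raise the StreamCorruptedException on the
# non-local read.
x.map(waitMap).count()
{code}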

The case in which this does not happen is the occasional run where Worker A 
processes the large number (1000) last, because then there is no data left on 
Worker A for the scheduler to try launching on Worker B.

> Pyspark with locality ANY throw java.io.StreamCorruptedException
> ----------------------------------------------------------------
>
>                 Key: SPARK-17110
>                 URL: https://issues.apache.org/jira/browse/SPARK-17110
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.0.0
>         Environment: Cluster of 2 AWS r3.xlarge nodes launched via ec2 
> scripts, Spark 2.0.0, hadoop: yarn, pyspark shell
>            Reporter: Tomer Kaftan
>            Priority: Critical
>
> In Pyspark 2.0.0, any task that accesses cached data non-locally throws a 
> StreamCorruptedException like the stacktrace below:
> {noformat}
> WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80
>         at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807)
>         at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302)
>         at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
>         at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
>         at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
>         at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146)
>         at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524)
>         at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522)
>         at scala.Option.map(Option.scala:146)
>         at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522)
>         at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609)
>         at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661)
>         at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>         at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>         at org.apache.spark.scheduler.Task.run(Task.scala:85)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The simplest way I have found to reproduce this is by running the following 
> code in the pyspark shell, on a cluster of 2 slaves set to use only one 
> worker core each:
> {code}
> x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache()
> x.count()
> import time
> def waitMap(x):
>     time.sleep(x)
>     return x
> x.map(waitMap).count()
> {code}
> Or by running the following via spark-submit:
> {code}
> from pyspark import SparkContext
> sc = SparkContext()
> x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache()
> x.count()
> import time
> def waitMap(x):
>     time.sleep(x)
>     return x
> x.map(waitMap).count()
> {code}


