[ https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425752#comment-15425752 ]

Tomer Kaftan commented on SPARK-17110:
--------------------------------------

This is with the default configuration settings, and the same code worked fine with 
Spark 1.6.0. Still, it's not unthinkable to me that the default configurations have 
changed since 1.6. 
It should happen with any wait time greater than the spark.locality.wait 
setting (which forces the task to be launched on a different node than the one the 
data was cached on).
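
For reference, a minimal sketch of how the locality wait could be tightened to make the non-local launches happen sooner (the "1s" value here is purely illustrative; spark.locality.wait is the standard setting discussed above, with a default of 3s):
{code}
from pyspark import SparkConf, SparkContext

# Illustrative only: shorten the locality wait so the scheduler falls back to
# non-local (ANY) placement almost immediately instead of after the default 3s.
conf = SparkConf().set("spark.locality.wait", "1s")
sc = SparkContext(conf=conf)
{code}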

The initial workload in which we came across this had each task taking ~20 seconds, 
and the error would consistently appear near the end of the stage, as soon as 
tasks began executing non-locally. That workload also ran fine in Spark 
1.6.

> PySpark with locality ANY throws java.io.StreamCorruptedException
> -----------------------------------------------------------------
>
>                 Key: SPARK-17110
>                 URL: https://issues.apache.org/jira/browse/SPARK-17110
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.0.0
>         Environment: Cluster of 2 AWS r3.xlarge nodes launched via ec2 scripts, Spark 2.0.0, hadoop: yarn, pyspark shell
>            Reporter: Tomer Kaftan
>            Priority: Critical
>
> In PySpark 2.0.0, any task that accesses cached data non-locally throws a StreamCorruptedException like the stack trace below:
> {noformat}
> WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80
>         at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807)
>         at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302)
>         at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
>         at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
>         at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
>         at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146)
>         at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524)
>         at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522)
>         at scala.Option.map(Option.scala:146)
>         at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522)
>         at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609)
>         at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661)
>         at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>         at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>         at org.apache.spark.scheduler.Task.run(Task.scala:85)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The simplest way I have found to reproduce this is by running the following 
> code in the pyspark shell, on a cluster of 2 nodes set to use only one worker 
> core each:
> {code}
> x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache()
> x.count()
> import time
> def waitMap(x):
>     time.sleep(x)
>     return x
> x.map(waitMap).count()
> {code}
> Or by running the following via spark-submit:
> {code}
> from pyspark import SparkContext
> sc = SparkContext()
> x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache()
> x.count()
> import time
> def waitMap(x):
>     time.sleep(x)
>     return x
> x.map(waitMap).count()
> {code}
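>
> For the "one worker core each" part, a rough sketch of one way the core limit could be applied from the application side (spark.executor.cores and spark.cores.max are standard Spark settings, but the values and the use of SparkConf here are only illustrative assumptions; the same limit could instead be applied in the cluster's own configuration):
> {code}
> from pyspark import SparkConf, SparkContext
>
> # Illustrative assumption: one core per executor and two cores in total, so
> # the node stuck on the 1000-second task cannot process its remaining cached
> # partitions and they end up scheduled non-locally on the other node.
> conf = (SparkConf()
>         .set("spark.executor.cores", "1")
>         .set("spark.cores.max", "2"))
> sc = SparkContext(conf=conf)
> {code}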


