[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException

2016-09-06 Thread Tomer Kaftan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469295#comment-15469295
 ] 

Tomer Kaftan commented on SPARK-17110:
--

Thanks all who helped out with this!

> Pyspark with locality ANY throw java.io.StreamCorruptedException
> 
>
> Key: SPARK-17110
> URL: https://issues.apache.org/jira/browse/SPARK-17110
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.0.0
> Environment: Cluster of 2 AWS r3.xlarge slaves launched via ec2 scripts, Spark 2.0.0, hadoop: yarn, pyspark shell
> Reporter: Tomer Kaftan
> Assignee: Josh Rosen
> Priority: Critical
> Fix For: 2.0.1, 2.1.0
>
>
> In Pyspark 2.0.0, any task that accesses cached data non-locally throws a 
> StreamCorruptedException like the one in the stack trace below:
> {noformat}
> WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80
>     at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807)
>     at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302)
>     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
>     at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
>     at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
>     at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146)
>     at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524)
>     at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522)
>     at scala.Option.map(Option.scala:146)
>     at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522)
>     at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609)
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>     at org.apache.spark.scheduler.Task.run(Task.scala:85)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The simplest way I have found to reproduce this is by running the following 
> code in the pyspark shell, on a cluster of 2 slaves set to use only one 
> worker core each:
> {code}
> x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache()
> x.count()
>
> import time
> def waitMap(x):
>     time.sleep(x)
>     return x
>
> x.map(waitMap).count()
> {code}
> Or by running the following via spark-submit:
> {code}
> from pyspark import SparkContext
> sc = SparkContext()
> x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache()
> x.count()
>
> import time
> def waitMap(x):
>     time.sleep(x)
>     return x
>
> x.map(waitMap).count()
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException

2016-09-03 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15461441#comment-15461441
 ] 

Apache Spark commented on SPARK-17110:
--

User 'JoshRosen' has created a pull request for this issue:
https://github.com/apache/spark/pull/14952




[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException

2016-08-29 Thread Tomer Kaftan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1544#comment-1544
 ] 

Tomer Kaftan commented on SPARK-17110:
--

Hi Miao, with otherwise fully default configurations and 2 slaves, all that 
is needed is to set the number of worker cores per slave to 1.

That is, put the following in /spark/conf/spark-env.sh:
{code}
SPARK_WORKER_CORES=1
{code}


More concretely (if you’re looking for exact steps), the way I start up the 
cluster & reproduce this example is as follows.

I use the Spark EC2 scripts from the PR here:
https://github.com/amplab/spark-ec2/pull/46

I launch the cluster on 2 r3.xlarge machines (any machine type should work, 
though you may need to change the sed command further down):

{code}
./spark-ec2 -k ec2_ssh_key -i path_to_key_here -s 2 -t r3.xlarge launch \
  temp-cluster --spot-price=1.00 --spark-version=2.0.0 --region=us-west-2 \
  --hadoop-major-version=yarn
{code}

I update the number of worker cores and launch the pyspark shell:
{code}
sed -i'f' 's/SPARK_WORKER_CORES=4/SPARK_WORKER_CORES=1/g' \
  /root/spark/conf/spark-env.sh
~/spark-ec2/copy-dir ~/spark/conf/
~/spark/sbin/stop-all.sh
~/spark/sbin/start-all.sh
~/spark/bin/pyspark
{code}

And then I run the example I included at the start:
{code}
x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache()
x.count()

import time
def waitMap(x):
    time.sleep(x)
    return x

x.map(waitMap).count()
{code}
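
For anyone who does not want to wait out the full 1000-second task, here is a minimal sketch of the same reproduction with a configurable delay. The helper names (make_delays, wait_map, reproduce) and the 30-second default are illustrative assumptions, not part of the original report; the slow task only needs to outlast the scheduler's locality wait (spark.locality.wait, 3 seconds by default).

```python
import time


def make_delays(slow_seconds=30, num_tasks=9, slow_index=5):
    """Build the list of per-partition sleep times: all 1s except one slow task."""
    delays = [1] * num_tasks
    delays[slow_index] = slow_seconds
    return delays


def wait_map(seconds):
    """Sleep for the given number of seconds and return the value unchanged."""
    time.sleep(seconds)
    return seconds


def reproduce(sc, slow_seconds=30):
    """Run the reproduction against an existing SparkContext `sc`.

    `sc` is expected to be the SparkContext provided by the pyspark shell
    (or one created in a spark-submit script).
    """
    delays = make_delays(slow_seconds)
    x = sc.parallelize(delays, numSlices=len(delays)).cache()
    x.count()  # first action materializes the cached partitions
    return x.map(wait_map).count()  # second action may schedule non-local reads
```

In the pyspark shell this would be called as reproduce(sc). Whether a given delay is long enough depends on the cluster's locality settings, so treat the default as a starting point.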


[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException

2016-08-29 Thread Gen TANG (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15445591#comment-15445591
 ] 

Gen TANG commented on SPARK-17110:
--

Hi,

I tried similar code in Python, Scala, and Java, with different 
StorageLevels. Only PySpark threw the error, and only when Spark launched 
tasks without node locality. Since stored objects in Python are always 
serialized with the Pickle library, I guess this bug is related to pickle 
serialization.
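
As a small illustration of what "invalid stream header" means in the stack trace: java.io.ObjectInputStream expects a stream to begin with the magic number 0xACED followed by version 0x0005. The sketch below (the helper name has_java_stream_header is made up for this example) checks that the reported header bytes, and the first bytes of a pickled Python object, do not match. It only shows that the bytes being read were not produced by Java serialization; it does not establish where in Spark the mismatch comes from.

```python
import pickle

# Magic number (0xACED) plus stream version (0x0005) that
# java.io.ObjectInputStream expects at the start of a serialized stream.
JAVA_STREAM_HEADER = bytes.fromhex("ACED0005")


def has_java_stream_header(data):
    """Return True if `data` starts with the Java serialization header."""
    return data[:4] == JAVA_STREAM_HEADER


# Header bytes reported in the exception message of the issue.
reported = bytes.fromhex("12010A80")
print(has_java_stream_header(reported))

# A pickled Python object (protocol 2) starts with the PROTO opcode 0x80
# followed by the protocol number, which is not a Java stream header either.
pickled = pickle.dumps([1, 1, 1], protocol=2)
print(pickled[:2].hex(), has_java_stream_header(pickled))
```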




[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException

2016-08-29 Thread Miao Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15445029#comment-15445029
 ] 

Miao Wang commented on SPARK-17110:
---

Can you post a sample configuration? That would make it simpler to reproduce 
this issue.




[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException

2016-08-27 Thread Tomer Kaftan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15440817#comment-15440817
 ] 

Tomer Kaftan commented on SPARK-17110:
--

Hi Miao, 

That setup wouldn't cause this bug to appear. (However, I can try the most 
recent master tomorrow anyway, just in case someone else has fixed it.)

I should have explicitly specified 2 slaves, not 2 nodes, as I suppose that is 
too ambiguous (I've updated the description). It is also critical that each 
slave is set to use *only 1 worker core* (as I did specify above) for this 
example.

This is because this specific example & setup is designed to cause 
(non-deterministically, but with high probability) a situation where one of the 
pyspark workers reads data non-locally, which is what I have observed to cause 
this error consistently.

To provide a mental model of how this example & code snippet forces this 
situation:
1. The workers initially cache the data, forcing it to be stored in memory 
locally. Worker A contains the large number (1000), Worker B contains only 
small numbers (1).
2. The two workers each process their local numbers one at a time.
3. Once Worker A hits the large wait, Worker B continues on to process all of 
its local data.
4. Because Worker A is taking so long to finish its task (1000 seconds, but it 
can be set much smaller), the spark.locality.wait setting leads Worker B to 
begin processing data that is stored on Worker A.
5. Worker B attempts to read data that is not stored on its own node, which 
triggers the StreamCorruptedException.

The error does not occur in the occasional run (one every few) where Worker A 
processes the large number (1000) last, since no tasks for data stored on 
Worker A then remain to be launched on Worker B.
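
The mental model above can be sketched as a toy simulation. This is a deliberately simplified model written for illustration, not Spark's actual delay-scheduling logic: each worker has one core, runs its local tasks in order, and only after sitting idle for locality_wait seconds takes a task whose cached data lives on the other worker. The function names and the split of partitions between Worker A and Worker B are assumptions.

```python
import heapq


def simulate(local_tasks, locality_wait=3.0):
    """Toy model of two single-core workers with a locality wait.

    `local_tasks` maps a worker name to the durations (seconds) of the tasks
    whose cached partitions live on that worker. Returns a list of
    (start_time, worker, owner, duration) tuples, where `owner` is the
    worker that holds the cached data for the task.
    """
    pending = {w: list(ts) for w, ts in local_tasks.items()}
    # Each event is (time the worker is free, worker, already waited?).
    events = [(0.0, w, False) for w in sorted(pending)]
    heapq.heapify(events)
    schedule = []
    while any(pending.values()):
        now, worker, waited = heapq.heappop(events)
        if pending[worker]:
            owner = worker  # local data available: run it right away
        elif waited:
            # Locality wait expired: take a task whose data is elsewhere.
            owner = next(w for w in sorted(pending) if pending[w])
        else:
            # Nothing local left: sit idle for the locality wait first.
            heapq.heappush(events, (now + locality_wait, worker, True))
            continue
        duration = pending[owner].pop(0)
        schedule.append((now, worker, owner, duration))
        heapq.heappush(events, (now + duration, worker, False))
    return schedule


def non_local(schedule):
    """Return the scheduled tasks that ran on a worker not holding their data."""
    return [entry for entry in schedule if entry[1] != entry[2]]


# Worker A hits the slow task early: Worker B ends up reading A's data.
failing = simulate({"A": [1, 1000, 1, 1, 1], "B": [1, 1, 1, 1]})
print(non_local(failing))

# Worker A processes the slow task last: nothing is left for B to take.
passing = simulate({"A": [1, 1, 1, 1, 1000], "B": [1, 1, 1, 1]})
print(non_local(passing))
```

In this model the first run schedules three of Worker A's tasks on Worker B, which corresponds to the non-local reads that trigger the exception, while the second run schedules none.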


[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException

2016-08-26 Thread Miao Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15440786#comment-15440786
 ] 

Miao Wang commented on SPARK-17110:
---

I set up a two-node cluster (one master, one worker) with 48 cores and 1G of 
memory. Running the above code in pyspark works fine, with no exception. It 
seems that this bug has been fixed in the latest master branch. Can you 
upgrade and try again?




[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException

2016-08-24 Thread Gen TANG (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436120#comment-15436120
 ] 

Gen TANG commented on SPARK-17110:
--

[~radost...@gmail.com], it seems Spark with Scala doesn't have this bug in 
version 2.0.0.




[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException

2016-08-24 Thread Tomer Kaftan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435418#comment-15435418
 ] 

Tomer Kaftan commented on SPARK-17110:
--

[~radost...@gmail.com] Unfortunately, we haven't been able to find any 
workaround for our project other than staying with Spark 1.6 for the time being.







[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException

2016-08-22 Thread Jonathan Alvarado (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15431139#comment-15431139
 ] 

Jonathan Alvarado commented on SPARK-17110:
---

Is there a workaround for this issue?  I'm affected by it.







[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException

2016-08-17 Thread Miao Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15425925#comment-15425925
 ] 

Miao Wang commented on SPARK-17110:
---

The network layer of Spark must have changed in how it deals with long-running 
tasks; as you described, it should not be a cluster network issue. I can try to 
debug it and find out what's going on here. 
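
As a small aid for that debugging, the sketch below decodes the `invalid stream header: 12010A80` value from the stack trace. It relies on the fact that a Java serialization stream starts with the magic number 0xACED followed by version 0x0005 (the constants in java.io.ObjectStreamConstants), and it assumes the exception message prints the first two 16-bit values read as hex. The helper name `check_java_stream_header` is hypothetical and only for illustration.

```python
import struct

# Constants defined in java.io.ObjectStreamConstants.
STREAM_MAGIC = 0xACED
STREAM_VERSION = 0x0005


def check_java_stream_header(first_four_bytes):
    """Return (ok, shown) for the first four bytes of a stream.

    ok    -- True if the bytes are a valid Java serialization header.
    shown -- the bytes formatted as two big-endian 16-bit hex values,
             which is assumed to match how the exception message prints them.
    """
    magic, version = struct.unpack(">HH", first_four_bytes)
    ok = magic == STREAM_MAGIC and version == STREAM_VERSION
    return ok, "%04X%04X" % (magic, version)


# Value copied from the stack trace in this issue.
print(check_java_stream_header(bytes.fromhex("12010A80")))
# A well-formed header, for comparison.
print(check_java_stream_header(bytes.fromhex("ACED0005")))
```

If this reading is right, the bytes handed to the Java deserializer on the remote-read path did not begin as a Java-serialized stream at all, which is a different symptom from a connection that was reset partway through a transfer.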







[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException

2016-08-17 Thread Tomer Kaftan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15425752#comment-15425752
 ] 

Tomer Kaftan commented on SPARK-17110:
--

This is with the default configuration settings, and it worked fine with Spark 
1.6.0. Still, it's not unthinkable to me that the default configurations have 
changed since 1.6. 
It should happen with any wait time greater than the spark.locality.wait 
setting, since that forces the task to be launched on a different node than the 
one the data was cached on.

The initial workload in which we came across this had each task taking ~20 
seconds, and the error would consistently happen near the end of the stage, as 
soon as tasks began being executed non-locally. That workload also ran fine in 
Spark 1.6.
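
To make the timing argument concrete, here is a toy model in plain Python of why one task ends up running non-locally in the repro. It is only a sketch and not Spark's actual scheduler: the function `simulate`, the `cached_on` placement (partitions alternating between two single-core executors), and the simplified timer are all assumptions made for illustration. The 3-second value is the documented default of spark.locality.wait.

```python
import heapq


def simulate(durations, cached_on, num_executors=2, locality_wait=3.0):
    """Return the partitions that this toy delay scheduler runs non-locally.

    durations[p]  -- how long the task for partition p runs (seconds)
    cached_on[p]  -- id of the executor assumed to hold partition p in its cache
    Each executor is assumed to have exactly one core.
    """
    pending = set(range(len(durations)))
    # Each event: (time the executor looks for work, executor id, may_go_remote).
    events = [(0.0, ex, False) for ex in range(num_executors)]
    heapq.heapify(events)
    non_local = []
    while pending and events:
        now, ex, may_go_remote = heapq.heappop(events)
        local = sorted(p for p in pending if cached_on[p] == ex)
        if local:
            task = local[0]
        elif may_go_remote:
            # The locality timer expired: take a task cached elsewhere.
            task = min(pending)
            non_local.append(task)
        else:
            # No local work left: wait out the locality timer, then look again.
            heapq.heappush(events, (now + locality_wait, ex, True))
            continue
        pending.remove(task)
        heapq.heappush(events, (now + durations[task], ex, False))
    return non_local


durations = [1, 1, 1, 1, 1, 1000, 1, 1, 1]
cached_on = [0, 1, 0, 1, 0, 1, 0, 1, 0]  # assumed placement after x.count()
print(simulate(durations, cached_on, locality_wait=3.0))
print(simulate(durations, cached_on, locality_wait=2000.0))
```

In this model the idle executor picks up a remotely cached partition once the 1000-second task has blocked the other executor for longer than the locality wait, whereas a wait longer than the slow task keeps every task local. Raising spark.locality.wait that far is not being proposed as a workaround here; it would only serialize the stage behind the slow task.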







[jira] [Commented] (SPARK-17110) Pyspark with locality ANY throw java.io.StreamCorruptedException

2016-08-17 Thread Miao Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15425595#comment-15425595
 ] 

Miao Wang commented on SPARK-17110:
---

It seems to be a network I/O timeout: one entry sleeps too long and the 
connection is reset by the remote side. Is it a configuration problem?



