[ https://issues.apache.org/jira/browse/SPARK-16449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15368143#comment-15368143 ]

Sean Owen commented on SPARK-16449:
-----------------------------------

Interesting. It looks like something is serializing a Scala Iterator, which is 
not serializable, so the task fails.

The relevant subset of the stack trace is below. Hm, maybe something in 
LocalTableScan can be rejiggered to avoid this.

{code}
Caused by: java.io.NotSerializableException: scala.collection.Iterator$$anon$11
Serialization stack:
        - object not serializable (class: scala.collection.Iterator$$anon$11, value: empty iterator)
        - field (class: scala.collection.Iterator$$anonfun$toStream$1, name: $outer, type: interface scala.collection.Iterator)
        - object (class scala.collection.Iterator$$anonfun$toStream$1, <function0>)
        - field (class: scala.collection.immutable.Stream$Cons, name: tl, type: interface scala.Function0)
        - object (class scala.collection.immutable.Stream$Cons, Stream(WrappedArray(3526154, 3526154, 1580402, 3526154, 3526154), WrappedArray(5.50388599500189E11, 4.178168090221903, 234846.780654818, 5.134865351881966, 354.7084951479714), WrappedArray(2.596112361975223E11, 0.34382335723646484, 118170.68592261613, 3.3833930336063456, 4.011812510792076), WrappedArray(100002091588, 2.75, 0.85, -1, 292), WrappedArray(999995696635, 6.125, 1193544.39, 34, 480)))
        - field (class: scala.collection.immutable.Stream$$anonfun$zip$1, name: $outer, type: class scala.collection.immutable.Stream)
        - object (class scala.collection.immutable.Stream$$anonfun$zip$1, <function0>)
        - field (class: scala.collection.immutable.Stream$Cons, name: tl, type: interface scala.Function0)
        - object (class scala.collection.immutable.Stream$Cons, Stream((WrappedArray(3526154, 3526154, 1580402, 3526154, 3526154),(count,<function1>)), (WrappedArray(5.50388599500189E11, 4.178168090221903, 234846.780654818, 5.134865351881966, 354.7084951479714),(mean,<function1>)), (WrappedArray(2.596112361975223E11, 0.34382335723646484, 118170.68592261613, 3.3833930336063456, 4.011812510792076),(stddev,<function1>)), (WrappedArray(100002091588, 2.75, 0.85, -1, 292),(min,<function1>)), (WrappedArray(999995696635, 6.125, 1193544.39, 34, 480),(max,<function1>))))
        - field (class: scala.collection.immutable.Stream$$anonfun$map$1, name: $outer, type: class scala.collection.immutable.Stream)
        - object (class scala.collection.immutable.Stream$$anonfun$map$1, <function0>)
        - field (class: scala.collection.immutable.Stream$Cons, name: tl, type: interface scala.Function0)
        - object (class scala.collection.immutable.Stream$Cons, Stream([count,3526154,3526154,1580402,3526154,3526154], [mean,5.50388599500189E11,4.178168090221903,234846.780654818,5.134865351881966,354.7084951479714], [stddev,2.596112361975223E11,0.34382335723646484,118170.68592261613,3.3833930336063456,4.011812510792076], [min,100002091588,2.75,0.85,-1,292], [max,999995696635,6.125,1193544.39,34,480]))
        - field (class: scala.collection.immutable.Stream$$anonfun$map$1, name: $outer, type: class scala.collection.immutable.Stream)
        - object (class scala.collection.immutable.Stream$$anonfun$map$1, <function0>)
        - field (class: scala.collection.immutable.Stream$Cons, name: tl, type: interface scala.Function0)
        - object (class scala.collection.immutable.Stream$Cons, Stream([count,3526154,3526154,1580402,3526154,3526154], [mean,5.50388599500189E11,4.178168090221903,234846.780654818,5.134865351881966,354.7084951479714], [stddev,2.596112361975223E11,0.34382335723646484,118170.68592261613,3.3833930336063456,4.011812510792076], [min,100002091588,2.75,0.85,-1,292], [max,999995696635,6.125,1193544.39,34,480]))
        - field (class: org.apache.spark.sql.execution.LocalTableScan, name: rows, type: interface scala.collection.Seq)
        - object (class org.apache.spark.sql.execution.LocalTableScan, LocalTableScan [summary#228,C0#229,C3#230,C4#231,C5#232,C6#233], [[count,3526154,3526154,1580402,3526154,3526154],[mean,5.50388599500189E11,4.178168090221903,234846.780654818,5.134865351881966,354.7084951479714],[stddev,2.596112361975223E11,0.34382335723646484,118170.68592261613,3.3833930336063456,4.011812510792076],[min,100002091588,2.75,0.85,-1,292],[max,999995696635,6.125,1193544.39,34,480]])
        - field (class: org.apache.spark.sql.execution.ConvertToUnsafe, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
        - object (class org.apache.spark.sql.execution.ConvertToUnsafe, ConvertToUnsafe
+- LocalTableScan [summary#228,C0#229,C3#230,C4#231,C5#232,C6#233], [[count,3526154,3526154,1580402,3526154,3526154],[mean,5.50388599500189E11,4.178168090221903,234846.780654818,5.134865351881966,354.7084951479714],[stddev,2.596112361975223E11,0.34382335723646484,118170.68592261613,3.3833930336063456,4.011812510792076],[min,100002091588,2.75,0.85,-1,292],[max,999995696635,6.125,1193544.39,34,480]])
        - field (class: org.apache.spark.sql.execution.ConvertToUnsafe$$anonfun$1, name: $outer, type: class org.apache.spark.sql.execution.ConvertToUnsafe)
        - object (class org.apache.spark.sql.execution.ConvertToUnsafe$$anonfun$1, <function1>)
{code}
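
The first frames show the root cause: LocalTableScan's {{rows}} field is a 
Stream built from an Iterator (via toStream/zip/map), and an unforced Stream 
keeps its tail as a Function0 closing over that Iterator, which is not 
serializable. A minimal, self-contained sketch of the same failure outside 
Spark (assuming Scala 2.11/2.12 and plain Java serialization; all names here 
are illustrative, not Spark code):

{code}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object StreamSerializationDemo {
  // Try to Java-serialize an object and report the outcome.
  private def trySerialize(label: String, obj: AnyRef): Unit = {
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      oos.writeObject(obj)
      println(s"$label: serialized OK")
    } catch {
      case e: java.io.NotSerializableException =>
        println(s"$label: $e") // same NotSerializableException as in the trace
    }
  }

  def main(args: Array[String]): Unit = {
    // The Stream's unforced tail is a closure over the (non-serializable) Iterator.
    val lazyRows = Iterator(1, 2, 3).toStream
    trySerialize("unforced Stream", lazyRows)

    // Forcing into a strict collection drops the closure, so it serializes fine.
    trySerialize("strict Vector", lazyRows.toVector)
  }
}
{code}

If that reading is right, one plausible rejiggering is exactly that: have 
LocalTableScan force its rows into a strict collection (an Array or Vector, 
say) rather than holding the Stream, so nothing lazy is captured when the 
plan is shipped to executors.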

> unionAll raises "Task not serializable"
> ---------------------------------------
>
>                 Key: SPARK-16449
>                 URL: https://issues.apache.org/jira/browse/SPARK-16449
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.6.1
>         Environment: AWS EMR, Jupyter notebook
>            Reporter: Jeff Levy
>            Priority: Minor
>
> Goal: Take the output of `describe` on a large DataFrame, then use a loop to 
> calculate `skewness` and `kurtosis` from pyspark.sql.functions for each 
> column, build those results into a two-row DataFrame, and use `unionAll` to 
> merge the two DataFrames.
>
> Issue: Despite the two DataFrames having the same column names, in the same 
> order and with the same dtypes, `unionAll` fails with "Task not 
> serializable". However, if I build two test rows from dummy data, `unionAll` 
> works fine. Likewise, if I collect my results and then turn them straight 
> back into DataFrames, `unionAll` succeeds (see the sketch after this 
> description).
>
> Step-by-step code and output with comments can be seen here: 
> https://github.com/UrbanInstitute/pyspark-tutorials/blob/master/unionAll%20error.ipynb
>
> The issue appears to lie in the way the loop in code block 6 builds the rows 
> before parallelizing them, but those rows look no different from the test 
> rows that do work. I reproduced this on multiple datasets, so downloading 
> the notebook and pointing it at any data of your own should replicate it.
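
For reference, here is a rough Scala rendering of the collect-and-rebuild 
workaround described above (the reporter's notebook itself is PySpark). It is 
a sketch only: it assumes a Spark 1.6-style `sqlContext`, an input DataFrame 
`df`, and a two-row stats DataFrame `statsDF` with a matching schema.

{code}
// Collecting forces the lazily built summary rows; rebuilding the DataFrame
// from the collected Array leaves no lazy Stream or Iterator for the closure
// serializer to trip over. `sqlContext`, `df`, and `statsDF` are assumed.
val summary = df.describe()

val rebuilt = sqlContext.createDataFrame(
  sqlContext.sparkContext.parallelize(summary.collect()),
  summary.schema)

rebuilt.unionAll(statsDF)  // succeeds where summary.unionAll(statsDF) failed
{code}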


