[ https://issues.apache.org/jira/browse/SPARK-19039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Joseph K. Bradley updated SPARK-19039:
--------------------------------------
Description:
When I do the following in the spark-shell using paste mode, I see weird behavior:
* Define a UDF
* Apply the UDF to get a Column
* Use that Column in a DataFrame
To reproduce, paste this into the spark-shell:
{code}
import org.apache.spark.sql.functions._
val df = spark.createDataFrame(Seq(
  ("hi", 1),
  ("there", 2),
  ("the", 3),
  ("end", 4)
)).toDF("a", "b")
val myNumbers = Set(1, 2, 3)
val tmpUDF = udf { (n: Int) => myNumbers.contains(n) }
val rowHasMyNumber = tmpUDF($"b")
df.where(rowHasMyNumber).show()
{code}
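As a possible workaround (a sketch based on the usual REPL closure-capture idiom, not a verified fix), copying the captured value into a local before constructing the UDF keeps the closure from referencing the REPL line wrapper, which is the object that also holds the non-serializable Column. The names tmpUDF2 and nums below are my own:
{code}
// Workaround sketch (assumption, not verified across Spark versions): give the
// closure a local copy of the Set so it does not capture the REPL wrapper
// object, whose rowHasMyNumber field (a Column) cannot be serialized.
val tmpUDF2 = {
  val nums = Set(1, 2, 3)  // local copy; a plain Set serializes on its own
  udf { (n: Int) => nums.contains(n) }
}
// Applying the UDF inline also avoids storing a Column val in the paste block.
df.where(tmpUDF2($"b")).show()
{code}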
Stack trace for Spark 2.0 (similar for other versions):
{code}
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2057)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:817)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:816)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:816)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2551)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2581)
at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw.<init>(<console>:45)
at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw.<init>(<console>:57)
at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw.<init>(<console>:59)
at linef732283eefe649f4877db916c5ad096f25.$read$$iw.<init>(<console>:61)
at linef732283eefe649f4877db916c5ad096f25.$eval$.$print$lzycompute(<console>:7)
at linef732283eefe649f4877db916c5ad096f25.$eval$.$print(<console>:6)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: UDF(b))
- field (class: linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw, name: rowHasMyNumber, type: class org.apache.spark.sql.Column)
- object (class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw, linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw@6688375a)
- field (class: linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw$$anonfun$1, name: $outer, type: class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw)
- object (class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw$$anonfun$1, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[1, int, false]))
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 2)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2057)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:817)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:816)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:816)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2551)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2581)
at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw.<init>(<console>:45)
at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw.<init>(<console>:57)
at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw.<init>(<console>:59)
at linef732283eefe649f4877db916c5ad096f25.$read$$iw.<init>(<console>:61)
at linef732283eefe649f4877db916c5ad096f25.$eval$.$print$lzycompute(<console>:7)
at linef732283eefe649f4877db916c5ad096f25.$eval$.$print(<console>:6)
{code}
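Reading the serialization stack, the UDF closure's $outer field points at the REPL line wrapper object, and it is the wrapper's rowHasMyNumber field (of type Column) that fails to serialize. If that diagnosis is right, marking the val @transient in the shell might also sidestep the error; this is a hypothetical, untested sketch:
{code}
// Hypothetical mitigation (untested assumption): @transient should keep the
// Column field out of the serialized object graph when the wrapper is pulled
// in through the closure's $outer reference.
@transient val rowHasMyNumber = tmpUDF($"b")
df.where(rowHasMyNumber).show()
{code}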
was:
When I do the following in the spark-shell using paste mode, I see weird behavior:
* Define a UDF
* Apply the UDF to get a Column
* Use that Column in a DataFrame
To reproduce, paste this into the spark-shell:
{code}
import org.apache.spark.sql.functions._
val df = spark.createDataFrame(Seq(
  ("hi", 1),
  ("there", 2),
  ("the", 3),
  ("end", 4)
)).toDF("a", "b")
val myNumbers = Set(1, 2, 3)
val tmpUDF = udf { (n: Int) => myNumbers.contains(n) }
val rowHasMyNumber = tmpUDF($"b")
df.where(rowHasMyNumber).show()
{code}
> UDF ClosureCleaner bug when UDF, col applied in paste mode in REPL
> ------------------------------------------------------------------
>
> Key: SPARK-19039
> URL: https://issues.apache.org/jira/browse/SPARK-19039
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.3, 2.0.2, 2.1.0
> Reporter: Joseph K. Bradley
>