Dave Knoester created SPARK-20525:
-------------------------------------
Summary: ClassCast exception when interpreting UDFs from a String in spark-shell
Key: SPARK-20525
URL: https://issues.apache.org/jira/browse/SPARK-20525
Project: Spark
Issue Type: Bug
Components: Spark Core, Spark Shell
Affects Versions: 2.1.0
Environment: OS X 10.11.6, spark-2.1.0-bin-hadoop2.7, Scala version 2.11.8 (bundled w/ Spark), Java 1.8.0_121
Reporter: Dave Knoester
Priority: Blocker
I'm trying to interpret a string containing Scala code from inside a Spark
session. Everything works except for UDF-like constructs (UDFs, map, flatMap,
etc.). This blocks the production launch of a large number of Spark jobs.
I've been able to boil the problem down to a number of spark-shell examples,
shown below. Because it's reproducible in the spark-shell, these related
issues **don't apply**:
https://issues.apache.org/jira/browse/SPARK-9219
https://issues.apache.org/jira/browse/SPARK-18075
https://issues.apache.org/jira/browse/SPARK-19938
http://apache-spark-developers-list.1001551.n3.nabble.com/This-Exception-has-been-really-hard-to-trace-td19362.html
https://community.mapr.com/thread/21488-spark-error-scalacollectionseq-in-instance-of-orgapachesparkrddmappartitionsrdd
https://github.com/scala/bug/issues/9237
Any help is appreciated!
========
Repro:
Run each of the snippets below in spark-shell.
Preamble:
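import scala.tools.nsc.GenericRunnerSettings
import scala.tools.nsc.interpreter.IMain
// Settings for the nested interpreter; settings errors are reported via println.
val settings = new GenericRunnerSettings( println _ )
// Compile interpreted code against the classpath of the running spark-shell JVM.
settings.usejavacp.value = true
val interpreter = new IMain(settings, new java.io.PrintWriter(System.out))
// Expose the shell's SparkSession to interpreted code under the name `spark`.
interpreter.bind("spark", spark)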
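When running many jobs this way, it may help to check what `IMain.interpret` returns. As far as we know, it reports a `scala.tools.nsc.interpreter.Results.Result` (`Success`, `Error` or `Incomplete`) instead of throwing, so a failing snippet otherwise shows up only as printed output. The sketch below is a hypothetical helper, not part of the original repro; the names `InterpretChecks`, `requireSuccess` and `interpretOrFail` are illustrative assumptions.

```scala
import scala.tools.nsc.interpreter.{IMain, Results}

// Hypothetical helpers (not part of the original report).
object InterpretChecks {
  // Raise on anything other than Results.Success so that a failing snippet
  // cannot be silently ignored by a batch job.
  def requireSuccess(result: Results.Result, code: String): Unit = result match {
    case Results.Success    => ()
    case Results.Incomplete => throw new IllegalArgumentException(s"Incomplete input: $code")
    case Results.Error      => throw new IllegalStateException(s"Interpretation failed: $code")
  }

  // Run one snippet through the interpreter and check its result.
  def interpretOrFail(interpreter: IMain, code: String): Unit =
    requireSuccess(interpreter.interpret(code), code)
}
```

After the preamble, `InterpretChecks.interpretOrFail(interpreter, "val x = 5")` returns normally, while a snippet the interpreter reports as `Error` raises an `IllegalStateException`.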
These work:
// works:
interpreter.interpret("val x = 5")
// works:
interpreter.interpret("import spark.implicits._\nval df = spark.sparkContext.parallelize(Seq(\"foo\",\"bar\")).toDF.show")
// works:
val upper: String => String = _.toUpperCase
spark.udf.register("myUpper", upper)
interpreter.interpret("import org.apache.spark.sql.functions._\nimport spark.implicits._\nval upper: String => String = _.toUpperCase\nval upperUDF = udf(upper)\nspark.sparkContext.parallelize(Seq(\"foo\",\"bar\")).toDF.withColumn(\"UPPER\", callUDF(\"myUpper\", ($\"value\"))).show")
These do not work:
// doesn't work, fails with seq/RDD serialization error:
interpreter.interpret("import org.apache.spark.sql.functions._\nimport spark.implicits._\nval upper: String => String = _.toUpperCase\nval upperUDF = udf(upper)\nspark.sparkContext.parallelize(Seq(\"foo\",\"bar\")).toDF.withColumn(\"UPPER\", upperUDF($\"value\")).show")
// doesn't work, fails with seq/RDD serialization error:
interpreter.interpret("import org.apache.spark.sql.functions._\nimport spark.implicits._\nval upper: String => String = _.toUpperCase\nspark.udf.register(\"myUpper\", upper)\nspark.sparkContext.parallelize(Seq(\"foo\",\"bar\")).toDF.withColumn(\"UPPER\", callUDF(\"myUpper\", ($\"value\"))).show")
The failing examples throw this exception:
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2237)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
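For context on the message: the deserializer tried to store a `List$SerializationProxy` instance directly into RDD's `Seq`-typed `dependencies_` field. In Scala 2.11, `List` is Java-serialized through that proxy (`writeReplace`) and rebuilt into a real `List` on read (`readResolve`), so on the normal path the field never receives the proxy. The sketch below shows that normal path outside Spark and does not reproduce the bug. `Holder` and `SerializationRoundTrip` are hypothetical names, and resolving classes through an explicit `ClassLoader` in `resolveClass` is only loosely modeled on how, as we understand it, Spark's `JavaDeserializationStream` works.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical stand-in for a serializable object with a Seq-typed field,
// loosely mirroring RDD's `dependencies_` field.
final case class Holder(deps: Seq[Int])

object SerializationRoundTrip {
  // Java-serialize `value`, then read it back, resolving every class
  // through `loader` via an overridden resolveClass.
  def roundTrip[A](value: A, loader: ClassLoader = getClass.getClassLoader): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, loader)
    }
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val restored = roundTrip(Holder(List(1, 2, 3)))
    // readResolve has already replaced the proxy with a real List here.
    println(restored.deps)                  // List(1, 2, 3)
    println(restored.deps.getClass.getName) // scala.collection.immutable.$colon$colon
  }
}
```

The related issues linked above report the same exception text; in the failing repro the proxy evidently was not resolved before the field assignment, whereas this standalone round trip completes normally.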
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)