Alexandru Rosianu created SPARK-12675:
-----------------------------------------

             Summary: Spark 1.6.0 executor dies because of ClassCastException and causes timeout
                 Key: SPARK-12675
                 URL: https://issues.apache.org/jira/browse/SPARK-12675
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.6.0, 2.0.0
         Environment: 64-bit Ubuntu Linux 15.10, 16 GB RAM, 8 cores @ 3 GHz
            Reporter: Alexandru Rosianu
            Priority: Critical


I'm trying to fit a Spark ML pipeline, but the executor dies. Here's the script that fails (slightly simplified):
{code:title=Script.scala}
    // Prepare data sets
    logInfo("Getting datasets")
    val emoTrainingData = sqlc.read.parquet("/tw/sentiment/emo/parsed/data.parquet")
    val trainingData = emoTrainingData

    // Configure the pipeline
    val pipeline = new Pipeline().setStages(Array(
      new FeatureReducer().setInputCol("raw_text").setOutputCol("reduced_text"),
      new StringSanitizer().setInputCol("reduced_text").setOutputCol("text"),
      new Tokenizer().setInputCol("text").setOutputCol("raw_words"),
      new StopWordsRemover().setInputCol("raw_words").setOutputCol("words"),
      new HashingTF().setInputCol("words").setOutputCol("features"),
      new NaiveBayes().setSmoothing(0.5).setFeaturesCol("features"),
      new ColumnDropper().setDropColumns("raw_text", "reduced_text", "text",
        "raw_words", "words", "features")
    ))

    // Fit the pipeline
    logInfo(s"Training model on ${trainingData.count()} rows")
    val model = pipeline.fit(trainingData)
{code}

It executes up to the last line. It prints "Training model on xx rows" and starts fitting, then the executor dies, the driver stops receiving heartbeats from it and times out, and the script exits. It never gets past that line.

This is the exception that kills the executor:

{code}
    java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
      at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
      at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
      at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:497)
      at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
      at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
      at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
      at org.apache.spark.util.Utils$.deserialize(Utils.scala:92)
      at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:436)
      at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:426)
      at scala.Option.foreach(Option.scala:257)
      at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:426)
      at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:424)
      at scala.collection.Iterator$class.foreach(Iterator.scala:742)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:424)
      at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:468)
      at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468)
      at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468)
      at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
      at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:468)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
      at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
      at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
      at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
      at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
      at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
      at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
      ... 32 more
{code}
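
For context, here is a minimal sketch (my reading of the failure mode, not Spark's actual code) of why the resolving classloader matters: a plain {{ObjectInputStream}} resolves classes through the nearest user-defined classloader on the call stack, so if the heartbeat bytes were produced by classes from a different loader (e.g. when the app runs inside sbt's layered classloaders), {{HashMap$SerializationProxy.readResolve}} is never applied and you get exactly this cast error. Passing the loader explicitly avoids the mismatch:

{code:title=LoaderSketch.scala (illustration only)}
import java.io._

// ObjectInputStream's default resolveClass uses the latest user-defined
// classloader on the stack, not the thread context classloader. Overriding
// it with an explicit loader keeps serialization and deserialization in the
// same class universe.
class LoaderAwareObjectInputStream(in: InputStream, loader: ClassLoader)
    extends ObjectInputStream(in) {
  override def resolveClass(desc: ObjectStreamClass): Class[_] =
    Class.forName(desc.getName, false, loader)
}

// Hypothetical round-trip helper, for illustration only: serialize with
// plain Java serialization, deserialize through the given classloader.
def roundTrip[T](value: T, loader: ClassLoader): T = {
  val buf = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(buf)
  oos.writeObject(value)
  oos.close()
  val ois = new LoaderAwareObjectInputStream(
    new ByteArrayInputStream(buf.toByteArray), loader)
  try ois.readObject().asInstanceOf[T] finally ois.close()
}
{code}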

Since the exception kills the heartbeat reports, the driver eventually times out the executor:

{code}
ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 142918 ms
{code}
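
As a diagnostic only (raising these limits merely delays the kill; it does not fix the underlying deserialization failure), the heartbeat and network timeouts can be increased so the executor survives long enough to gather more evidence:

{code:title=Timeout diagnostic (sketch)}
import org.apache.spark.SparkConf

// Defaults in Spark 1.6: spark.executor.heartbeatInterval = 10s,
// spark.network.timeout = 120s. The values below are illustrative only.
val conf = new SparkConf()
  .setAppName("tweeather")
  .set("spark.executor.heartbeatInterval", "60s")
  .set("spark.network.timeout", "600s")
{code}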

I uploaded the INFO-level log file [here|https://infinit.io/_/DMii66J]. The 
DEBUG log is ~500MB.

The build file and dependencies seem to be all right:

{code:title=build.sbt}
    name := "tweeather"

    version := "1.0.0"

    scalaVersion := "2.11.7"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.6.0",
      "org.apache.spark" %% "spark-mllib" % "1.6.0",
      "org.apache.spark" %% "spark-streaming" % "1.6.0",
      "org.apache.hadoop" % "hadoop-client" % "2.7.1",
      "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly(),
      "org.twitter4j" % "twitter4j-stream" % "4.0.4",
      "org.scalaj" %% "scalaj-http" % "2.0.0",
      "com.jsuereth" %% "scala-arm" % "1.4",
      "edu.ucar" % "grib" % "4.6.3"
    )

    dependencyOverrides ++= Set(
      "com.fasterxml.jackson.core" % "jackson-databind" % "2.4.4",
      "org.scala-lang" % "scala-compiler" % scalaVersion.value,
      "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4",
      "org.scala-lang.modules" %% "scala-xml" % "1.0.4",
      "jline" % "jline" % "2.12.1"
    )

    resolvers ++= Seq(
      "Unidata Releases" at "http://artifacts.unidata.ucar.edu/content/repositories/unidata-releases/"
    )
{code}
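
One thing worth ruling out, assuming the job is launched with `sbt run` (an assumption on my part): sbt's layered classloaders are a common trigger for exactly this kind of SerializationProxy ClassCastException. Forking a separate JVM gives the application a flat classpath:

{code:title=build.sbt (possible addition, sketch)}
    // Run the app in a forked JVM instead of inside sbt's own classloader.
    fork in run := true

    // Heap for the forked JVM; the value is a guess, tune as needed.
    javaOptions in run ++= Seq("-Xmx4G")
{code}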


