[ 
https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-20896.
-------------------------------
          Resolution: Not A Problem
       Fix Version/s:     (was: 1.6.4)
    Target Version/s:   (was: 1.6.2)

> Spark executor gets java.lang.ClassCastException when triggering two jobs at the same time
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20896
>                 URL: https://issues.apache.org/jira/browse/SPARK-20896
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.6.1
>            Reporter: poseidon
>
> 1. Zeppelin 0.6.2 in *SCOPE* mode
> 2. Spark 1.6.2
> 3. HDP 2.4 for HDFS and YARN
> Trigger Scala code like this:
> {noformat}
> var tmpDataFrame = sql(" select b1,b2,b3 from xxx.xxxxx")
> val vectorDf = assembler.transform(tmpDataFrame)
> val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
> val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")
> val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
> val rows = columns.toSeq.transpose
> val vectors = rows.map(row => new DenseVector(row.toArray))
> val vRdd = sc.parallelize(vectors)
> import sqlContext.implicits._
> val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()
> val rows = dfV.rdd.zipWithIndex.map(_.swap)
>   .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
>   .values.map{ case (row: Row, x: String) => Row.fromSeq(row.toSeq :+ x) }
> {noformat}
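> The snippet above references an assembler and several MLlib types without showing their definitions. A minimal sketch of the assumed setup (the VectorAssembler configuration and imports are assumptions, not part of the original report; only the column names b1, b2, b3 come from the report):
> {noformat}
> // Assumed setup for the snippet above.
> import org.apache.spark.ml.feature.VectorAssembler
> import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
> import org.apache.spark.mllib.stat.Statistics
> import org.apache.spark.sql.Row
>
> // VectorAssembler merges the three numeric columns into a single "features" vector column.
> val assembler = new VectorAssembler()
>   .setInputCols(Array("b1", "b2", "b3"))
>   .setOutputCol("features")
> {noformat}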
> ---
> and this code:
> {noformat}
> var df = sql("select b1,b2 from xxxx.xxxxx")
> var i = 0
> var threshold = Array(2.0,3.0)
> var inputCols = Array("b1","b2")
> var tmpDataFrame = df
> for (col <- inputCols){
>   val binarizer: Binarizer = new Binarizer().setInputCol(col)
>     .setOutputCol(inputCols(i)+"_binary")
>     .setThreshold(threshold(i))
>   tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
>   i = i+1
> }
> var saveDFBin = tmpDataFrame
> val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
> val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
>   .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
>   .values.map{ case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ row2.toSeq) }
> import org.apache.spark.sql.types.StructType
> val rowSchema = StructType(saveDFBin.schema.fields ++ dfAppendBin.schema.fields)
> saveDFBin = sqlContext.createDataFrame(rows, rowSchema)
> //save result to table
> import org.apache.spark.sql.SaveMode
> saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable("xxxx.xxxxxxxx")
> sql("alter table xxxx.xxxxxxxx set lifecycle 1")
> {noformat}
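> Both snippets share the same pattern: appending the columns of one DataFrame to another by pairing rows on their zipWithIndex position. A self-contained sketch of that pattern (the helper name and signature are illustrative, not from the report):
> {noformat}
> import org.apache.spark.sql.{DataFrame, Row}
> import org.apache.spark.sql.types.StructType
>
> // Pair the rows of two DataFrames by position and concatenate them column-wise.
> // zipWithIndex assigns each row its partition-ordered index; swapping makes the
> // index the join key, so row i of `left` is glued to row i of `right`.
> def appendColumns(left: DataFrame, right: DataFrame): DataFrame = {
>   val joinedRows = left.rdd.zipWithIndex.map(_.swap)
>     .join(right.rdd.zipWithIndex.map(_.swap))
>     .values
>     .map { case (l: Row, r: Row) => Row.fromSeq(l.toSeq ++ r.toSeq) }
>   val schema = StructType(left.schema.fields ++ right.schema.fields)
>   left.sqlContext.createDataFrame(joinedRows, schema)
> }
> {noformat}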
> Run the two snippets above on Zeppelin from two different notebooks at the same time.
> The following exception then appears in the executor log:
> {quote}
> l1.dtdream.com): java.lang.ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
>         at $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
>         at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
>         at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {quote}
> OR 
> {quote}
> java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.linalg.DenseVector
>         at $line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:57)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {quote}
> Some logs from the executor:
> {quote}
> 17/05/26 16:39:44 INFO executor.Executor: Running task 3.1 in stage 36.0 (TID 598)
> 17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 30
> 17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30_piece0 stored as bytes in memory (estimated size 3.0 KB, free 909.7 KB)
> 17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Reading broadcast variable 30 took 80 ms
> 17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30 stored as values in memory (estimated size 5.4 KB, free 915.0 KB)
> 17/05/26 16:39:44 INFO parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1: Input split: ParquetInputSplit{part: hdfs://dtyundun/user/hive/warehouse/poseidon.db/corelatdemo2/part-r-00003-985c6ac4-cf31-4d7e-be4d-90df136d6b64.gz.parquet start: 0 end: 922 length: 922 hosts: []}
> 17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 23
> 17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_23_piece0 stored as bytes in memory (estimated size 25.0 KB, free 940.0 KB)
> 17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Reading broadcast variable 23 took 6 ms
> 17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_23 stored as values in memory (estimated size 352.7 KB, free 1292.6 KB)
> 17/05/26 16:39:44 INFO compress.CodecPool: Got brand-new decompressor [.gz]
> 17/05/26 16:39:44 ERROR executor.Executor: Exception in task 3.1 in stage 36.0 (TID 598)
> java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.linalg.DenseVector
>       at $line169687739436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:57)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>       at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>       at org.apache.spark.scheduler.Task.run(Task.scala:89)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> {quote}
> {color:red}
> These two exceptions never show up together, and they never appear when the two snippets are run separately.
> {color}
> {color:red}
> If you delete the zipWithIndex part from either snippet, the exception no longer occurs.
> {color}
> It feels as if, when the driver builds the DAG for zipWithIndex on two RDDs at the same time, something should be synchronized or locked somewhere.
> As far as I know, you cannot reproduce this situation with plain spark-shell, spark-submit, or the Spark Thrift Server.
> So: is it wrong to use the shell this way, with multiple users sharing the same context and running similar jobs? In other words, Zeppelin's SCOPE mode is not going to be a stable mode if we cannot deal with exceptions like these.
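> For reference, a minimal sketch (not from the report) of what "two jobs at the same time with the same context" looks like outside Zeppelin: two threads submitting zipWithIndex-based jobs against one shared SparkContext. Concurrent job submission from multiple threads is itself a supported use of SparkContext; the stack traces above point at REPL-generated classes ($line...$anonfun$1), which suggests the failure is tied to the interpreter-generated classes that SCOPE mode shares between notebooks rather than to zipWithIndex itself. All names below are illustrative.
> {noformat}
> import org.apache.spark.{SparkConf, SparkContext}
>
> object ConcurrentZipWithIndex {
>   def main(args: Array[String]): Unit = {
>     val sc = new SparkContext(new SparkConf().setAppName("concurrent-zipWithIndex"))
>
>     // Two threads submit independent zipWithIndex jobs against the same SparkContext.
>     val t1 = new Thread(new Runnable {
>       def run(): Unit = sc.parallelize(1 to 1000).zipWithIndex.map(_.swap).count()
>     })
>     val t2 = new Thread(new Runnable {
>       def run(): Unit = sc.parallelize(1 to 1000).map(_.toString).zipWithIndex.map(_.swap).count()
>     })
>     t1.start(); t2.start()
>     t1.join(); t2.join()
>     sc.stop()
>   }
> }
> {noformat}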


