[ https://issues.apache.org/jira/browse/SPARK-5775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14339010#comment-14339010 ]

Cheng Lian commented on SPARK-5775:
-----------------------------------

Hey [~avignon], sorry for the delay. I've left comments on the PR page. Thanks 
a lot for working on this!

> GenericRow cannot be cast to SpecificMutableRow when nested data and 
> partitioned table
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-5775
>                 URL: https://issues.apache.org/jira/browse/SPARK-5775
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.2.1
>            Reporter: Ayoub Benali
>            Assignee: Cheng Lian
>            Priority: Blocker
>              Labels: hivecontext, nested, parquet, partition
>
> Using the "LOAD" sql command in Hive context to load parquet files into a 
> partitioned table causes exceptions during query time. 
> The bug requires the table to have a column of *type Array of struct* and to 
> be *partitioned*. 
> The example bellow shows how to reproduce the bug and you can see that if the 
> table is not partitioned the query works fine. 
> {noformat}
> scala> val data1 = """{"data_array":[{"field1":1,"field2":2}]}"""
> scala> val data2 = """{"data_array":[{"field1":3,"field2":4}]}"""
> scala> val jsonRDD = sc.makeRDD(data1 :: data2 :: Nil)
> scala> val schemaRDD = hiveContext.jsonRDD(jsonRDD)
> scala> schemaRDD.printSchema
> root
>  |-- data_array: array (nullable = true)
>  |    |-- element: struct (containsNull = false)
>  |    |    |-- field1: integer (nullable = true)
>  |    |    |-- field2: integer (nullable = true)
> scala> hiveContext.sql("create external table if not exists 
> partitioned_table(data_array ARRAY <STRUCT<field1: INT, field2: INT>>) 
> Partitioned by (date STRING) STORED AS PARQUET Location 
> 'hdfs://****/partitioned_table'")
> scala> hiveContext.sql("create external table if not exists 
> none_partitioned_table(data_array ARRAY <STRUCT<field1: INT, field2: INT>>) 
> STORED AS PARQUET Location 'hdfs://****/none_partitioned_table'")
> scala> schemaRDD.saveAsParquetFile("hdfs://****/tmp_data_1")
> scala> schemaRDD.saveAsParquetFile("hdfs://****/tmp_data_2")
> scala> hiveContext.sql("LOAD DATA INPATH 
> 'hdfs://qa-hdc001.ffm.nugg.ad:8020/erlogd/tmp_data_1' INTO TABLE 
> partitioned_table PARTITION(date='2015-02-12')")
> scala> hiveContext.sql("LOAD DATA INPATH 
> 'hdfs://qa-hdc001.ffm.nugg.ad:8020/erlogd/tmp_data_2' INTO TABLE 
> none_partitioned_table")
> scala> hiveContext.sql("select data.field1 from none_partitioned_table 
> LATERAL VIEW explode(data_array) nestedStuff AS data").collect
> res23: Array[org.apache.spark.sql.Row] = Array([1], [3])
> scala> hiveContext.sql("select data.field1 from partitioned_table LATERAL 
> VIEW explode(data_array) nestedStuff AS data").collect
> 15/02/12 16:21:03 INFO ParseDriver: Parsing command: select data.field1 from 
> partitioned_table LATERAL VIEW explode(data_array) nestedStuff AS data
> 15/02/12 16:21:03 INFO ParseDriver: Parse Completed
> 15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(260661) called with 
> curMem=0, maxMem=280248975
> 15/02/12 16:21:03 INFO MemoryStore: Block broadcast_18 stored as values in 
> memory (estimated size 254.6 KB, free 267.0 MB)
> 15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(28615) called with 
> curMem=260661, maxMem=280248975
> 15/02/12 16:21:03 INFO MemoryStore: Block broadcast_18_piece0 stored as bytes 
> in memory (estimated size 27.9 KB, free 267.0 MB)
> 15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory 
> on *****:51990 (size: 27.9 KB, free: 267.2 MB)
> 15/02/12 16:21:03 INFO BlockManagerMaster: Updated info of block 
> broadcast_18_piece0
> 15/02/12 16:21:03 INFO SparkContext: Created broadcast 18 from NewHadoopRDD 
> at ParquetTableOperations.scala:119
> 15/02/12 16:21:03 INFO FileInputFormat: Total input paths to process : 3
> 15/02/12 16:21:03 INFO ParquetInputFormat: Total input paths to process : 3
> 15/02/12 16:21:03 INFO FilteringParquetRowInputFormat: Using Task Side 
> Metadata Split Strategy
> 15/02/12 16:21:03 INFO SparkContext: Starting job: collect at 
> SparkPlan.scala:84
> 15/02/12 16:21:03 INFO DAGScheduler: Got job 12 (collect at 
> SparkPlan.scala:84) with 3 output partitions (allowLocal=false)
> 15/02/12 16:21:03 INFO DAGScheduler: Final stage: Stage 13(collect at 
> SparkPlan.scala:84)
> 15/02/12 16:21:03 INFO DAGScheduler: Parents of final stage: List()
> 15/02/12 16:21:03 INFO DAGScheduler: Missing parents: List()
> 15/02/12 16:21:03 INFO DAGScheduler: Submitting Stage 13 (MappedRDD[111] at 
> map at SparkPlan.scala:84), which has no missing parents
> 15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(7632) called with 
> curMem=289276, maxMem=280248975
> 15/02/12 16:21:03 INFO MemoryStore: Block broadcast_19 stored as values in 
> memory (estimated size 7.5 KB, free 267.0 MB)
> 15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(4230) called with 
> curMem=296908, maxMem=280248975
> 15/02/12 16:21:03 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes 
> in memory (estimated size 4.1 KB, free 267.0 MB)
> 15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory 
> on *****:51990 (size: 4.1 KB, free: 267.2 MB)
> 15/02/12 16:21:03 INFO BlockManagerMaster: Updated info of block 
> broadcast_19_piece0
> 15/02/12 16:21:03 INFO SparkContext: Created broadcast 19 from broadcast at 
> DAGScheduler.scala:838
> 15/02/12 16:21:03 INFO DAGScheduler: Submitting 3 missing tasks from Stage 13 
> (MappedRDD[111] at map at SparkPlan.scala:84)
> 15/02/12 16:21:03 INFO TaskSchedulerImpl: Adding task set 13.0 with 3 tasks
> 15/02/12 16:21:03 INFO TaskSetManager: Starting task 0.0 in stage 13.0 (TID 
> 48, *****, NODE_LOCAL, 1640 bytes)
> 15/02/12 16:21:03 INFO TaskSetManager: Starting task 1.0 in stage 13.0 (TID 
> 49, *****, NODE_LOCAL, 1641 bytes)
> 15/02/12 16:21:03 INFO TaskSetManager: Starting task 2.0 in stage 13.0 (TID 
> 50, *****, NODE_LOCAL, 1640 bytes)
> 15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory 
> on *****:39729 (size: 4.1 KB, free: 133.6 MB)
> 15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory 
> on *****:48213 (size: 4.1 KB, free: 133.6 MB)
> 15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory 
> on *****:45394 (size: 4.1 KB, free: 133.6 MB)
> 15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory 
> on *****:39729 (size: 27.9 KB, free: 133.6 MB)
> 15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory 
> on *****:48213 (size: 27.9 KB, free: 133.6 MB)
> 15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory 
> on *****:45394 (size: 27.9 KB, free: 133.6 MB)
> 15/02/12 16:21:04 WARN TaskSetManager: Lost task 0.0 in stage 13.0 (TID 48, 
> *****): java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to 
> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
>   at 
> org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:147)
>   at 
> org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:144)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>   at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>   at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>   at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:744)
> 15/02/12 16:21:04 INFO TaskSetManager: Starting task 0.1 in stage 13.0 (TID 
> 51, *****, NODE_LOCAL, 1640 bytes)
> 15/02/12 16:21:04 INFO TaskSetManager: Lost task 1.0 in stage 13.0 (TID 49) 
> on executor *****: java.lang.ClassCastException 
> (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to 
> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 1]
> 15/02/12 16:21:04 INFO TaskSetManager: Starting task 1.1 in stage 13.0 (TID 
> 52, *****, NODE_LOCAL, 1641 bytes)
> 15/02/12 16:21:04 INFO TaskSetManager: Lost task 0.1 in stage 13.0 (TID 51) 
> on executor *****: java.lang.ClassCastException 
> (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to 
> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 2]
> 15/02/12 16:21:04 INFO TaskSetManager: Starting task 0.2 in stage 13.0 (TID 
> 53, *****, NODE_LOCAL, 1640 bytes)
> 15/02/12 16:21:04 INFO TaskSetManager: Finished task 2.0 in stage 13.0 (TID 
> 50) in 405 ms on ***** (1/3)
> 15/02/12 16:21:04 INFO TaskSetManager: Lost task 1.1 in stage 13.0 (TID 52) 
> on executor *****: java.lang.ClassCastException 
> (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to 
> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 3]
> 15/02/12 16:21:04 INFO TaskSetManager: Starting task 1.2 in stage 13.0 (TID 
> 54, *****, NODE_LOCAL, 1641 bytes)
> 15/02/12 16:21:04 INFO TaskSetManager: Lost task 0.2 in stage 13.0 (TID 53) 
> on executor *****: java.lang.ClassCastException 
> (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to 
> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 4]
> 15/02/12 16:21:04 INFO TaskSetManager: Starting task 0.3 in stage 13.0 (TID 
> 55, *****, NODE_LOCAL, 1640 bytes)
> 15/02/12 16:21:04 INFO TaskSetManager: Lost task 1.2 in stage 13.0 (TID 54) 
> on executor *****: java.lang.ClassCastException 
> (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to 
> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 5]
> 15/02/12 16:21:04 INFO TaskSetManager: Starting task 1.3 in stage 13.0 (TID 
> 56, *****, NODE_LOCAL, 1641 bytes)
> 15/02/12 16:21:04 INFO TaskSetManager: Lost task 0.3 in stage 13.0 (TID 55) 
> on executor *****: java.lang.ClassCastException 
> (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to 
> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 6]
> 15/02/12 16:21:04 ERROR TaskSetManager: Task 0 in stage 13.0 failed 4 times; 
> aborting job
> 15/02/12 16:21:04 INFO TaskSchedulerImpl: Cancelling stage 13
> 15/02/12 16:21:04 INFO TaskSchedulerImpl: Stage 13 was cancelled
> 15/02/12 16:21:04 INFO DAGScheduler: Job 12 failed: collect at 
> SparkPlan.scala:84, took 0.556942 s
> 15/02/12 16:21:04 WARN TaskSetManager: Lost task 1.3 in stage 13.0 (TID 56, 
> *****): TaskKilled (killed intentionally)
> 15/02/12 16:21:04 INFO TaskSchedulerImpl: Removed TaskSet 13.0, whose tasks 
> have all completed, from pool 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
> stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage 13.0 
> (TID 55, *****): java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to 
> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
>   at 
> org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:147)
>   at 
> org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:144)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>   at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>   at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>   at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:744)
> Driver stacktrace:
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>   at scala.Option.foreach(Option.scala:236)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
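> A possible workaround, sketched below but not verified, is to disable Spark SQL's 
> native conversion of metastore Parquet tables so the read goes through Hive's 
> Parquet SerDe instead of ParquetTableScan (the operator throwing the 
> ClassCastException above). The spark.sql.hive.convertMetastoreParquet setting 
> exists in this Spark version; whether it actually avoids the failure for this 
> table has not been tested:
> {noformat}
> // Untested sketch: fall back to Hive's Parquet SerDe instead of the native
> // ParquetTableScan, which performs the SpecificMutableRow cast that fails here.
> hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")
> hiveContext.sql(
>   "select data.field1 from partitioned_table " +
>   "LATERAL VIEW explode(data_array) nestedStuff AS data").collect()
> {noformat}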



