[ https://issues.apache.org/jira/browse/SPARK-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14728807#comment-14728807 ]
Apache Spark commented on SPARK-10301: -------------------------------------- User 'liancheng' has created a pull request for this issue: https://github.com/apache/spark/pull/8583 > For struct type, if parquet's global schema has less fields than a file's > schema, data reading will fail > -------------------------------------------------------------------------------------------------------- > > Key: SPARK-10301 > URL: https://issues.apache.org/jira/browse/SPARK-10301 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 1.5.0 > Reporter: Yin Huai > Assignee: Cheng Lian > Priority: Critical > Fix For: 1.6.0 > > > We hit this issue when reading a complex Parquet dateset without turning on > schema merging. The data set consists of Parquet files with different but > compatible schemas. In this way, the schema of the dataset is defined by > either a summary file or a random physical Parquet file if no summary files > are available. Apparently, this schema may not containing all fields > appeared in all physicla files. > Parquet was designed with schema evolution and column pruning in mind, so it > should be legal for a user to use a tailored schema to read the dataset to > save disk IO. For example, say we have a Parquet dataset consisting of two > physical Parquet files with the following two schemas: > {noformat} > message m0 { > optional group f0 { > optional int64 f00; > optional int64 f01; > } > } > message m1 { > optional group f0 { > optional int64 f01; > optional int64 f01; > optional int64 f02; > } > optional double f1; > } > {noformat} > Users should be allowed to read the dataset with the following schema: > {noformat} > message m1 { > optional group f0 { > optional int64 f01; > optional int64 f02; > } > } > {noformat} > so that {{f0.f00}} and {{f1}} are never touched. The above case can be > expressed by the following {{spark-shell}} snippet: > {noformat} > import sqlContext._ > import sqlContext.implicits._ > import org.apache.spark.sql.types.{LongType, StructType} > val path = "/tmp/spark/parquet" > range(3).selectExpr("NAMED_STRUCT('f00', id, 'f01', id) AS f0").coalesce(1) > .write.mode("overwrite").parquet(path) > range(3).selectExpr("NAMED_STRUCT('f00', id, 'f01', id, 'f02', id) AS f0", > "CAST(id AS DOUBLE) AS f1").coalesce(1) > .write.mode("append").parquet(path) > val tailoredSchema = > new StructType() > .add( > "f0", > new StructType() > .add("f01", LongType, nullable = true) > .add("f02", LongType, nullable = true), > nullable = true) > read.schema(tailoredSchema).parquet(path).show() > {noformat} > Expected output should be: > {noformat} > +--------+ > | f0| > +--------+ > |[0,null]| > |[1,null]| > |[2,null]| > | [0,0]| > | [1,1]| > | [2,2]| > +--------+ > {noformat} > However, current 1.5-SNAPSHOT version throws the following exception: > {noformat} > org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in > block -1 in file > hdfs://localhost:9000/tmp/spark/parquet/part-r-00000-56c4604e-c546-4f97-a316-05da8ab1a0bf.gz.parquet > at > org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228) > at > org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201) > at > org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:168) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at > scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) > at scala.collection.AbstractIterator.to(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.ArrayIndexOutOfBoundsException: 2 > at > org.apache.spark.sql.execution.datasources.parquet.CatalystRowConverter.getConverter(CatalystRowConverter.scala:206) > at > org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:269) > at > org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134) > at > org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99) > at > org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154) > at > org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99) > at > org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137) > at > org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208) > ... 25 more > 15/08/30 16:42:59 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; > aborting job > org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in > stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 > (TID 2, localhost): org.apache.parquet.io.ParquetDecodingException: Can not > read value at 0 in block -1 in file > hdfs://localhost:9000/tmp/spark/parquet/part-r-00000-56c4604e-c546-4f97-a316-05da8ab1a0bf.gz.parquet > at > org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228) > at > org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201) > at > org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:168) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at > scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) > at scala.collection.AbstractIterator.to(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.ArrayIndexOutOfBoundsException: 2 > at > org.apache.spark.sql.execution.datasources.parquet.CatalystRowConverter.getConverter(CatalystRowConverter.scala:206) > at > org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:269) > at > org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134) > at > org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99) > at > org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154) > at > org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99) > at > org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137) > at > org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208) > ... 25 more > Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) > at scala.Option.foreach(Option.scala:236) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1818) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844) > at > org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215) > at > org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207) > at > org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1403) > at > org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1403) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) > at > org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1921) > at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1402) > at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1332) > at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1395) > at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178) > at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402) > at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363) > at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371) > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41) > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53) > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55) > at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57) > at $iwC$$iwC$$iwC$$iwC.<init>(<console>:59) > at $iwC$$iwC$$iwC.<init>(<console>:61) > at $iwC$$iwC.<init>(<console>:63) > at $iwC.<init>(<console>:65) > at <init>(<console>:67) > at .<init>(<console>:71) > at .<clinit>(<console>) > at .<init>(<console>:7) > at .<clinit>(<console>) > at $print(<console>) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) > at > org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340) > at > org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) > at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) > at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) > at > org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$pasteCommand(SparkILoop.scala:825) > at > org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345) > at > org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345) > at > scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65) > at > scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65) > at > scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76) > at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:809) > at > org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) > at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) > at > org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) > at > org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) > at > org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) > at > org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) > at > scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) > at > org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) > at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) > at org.apache.spark.repl.Main$.main(Main.scala:31) > at org.apache.spark.repl.Main.main(Main.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) > at > org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value > at 0 in block -1 in file > hdfs://localhost:9000/tmp/spark/parquet/part-r-00000-56c4604e-c546-4f97-a316-05da8ab1a0bf.gz.parquet > at > org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228) > at > org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201) > at > org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:168) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at > scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) > at scala.collection.AbstractIterator.to(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.ArrayIndexOutOfBoundsException: 2 > at > org.apache.spark.sql.execution.datasources.parquet.CatalystRowConverter.getConverter(CatalystRowConverter.scala:206) > at > org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:269) > at > org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134) > at > org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99) > at > org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154) > at > org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99) > at > org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137) > at > org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208) > ... 25 more > {noformat} > This issue can be generalized a step further. Taking interoperability into > consideration, we may have a Parquet dataset consisting of physical Parquet > files sharing compatible logical schema, but created by different Parquet > libraries. Because of the historical nested type compatibility issue, > physical Parquet schemas generated by those libraries may be different. It > would be nice to be able to operate on such datasets. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org