[
https://issues.apache.org/jira/browse/SPARK-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yin Huai updated SPARK-10301:
-----------------------------
Fix Version/s: 1.5.1
> For struct type, if parquet's global schema has less fields than a file's
> schema, data reading will fail
> --------------------------------------------------------------------------------------------------------
>
> Key: SPARK-10301
> URL: https://issues.apache.org/jira/browse/SPARK-10301
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.5.0
> Reporter: Yin Huai
> Assignee: Cheng Lian
> Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> We hit this issue when reading a complex Parquet dataset without turning
> on schema merging. The dataset consists of Parquet files with different
> but compatible schemas, so the schema of the dataset is defined by either
> a summary file or, if no summary files are available, an arbitrary
> physical Parquet file. As a result, this schema may not contain all
> fields that appear in all physical files.
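> As a side note, explicitly turning schema merging back on is a possible
> workaround for such datasets (a hedged sketch; it assumes the Spark 1.5
> {{mergeSchema}} Parquet data source option and the
> {{spark.sql.parquet.mergeSchema}} SQL conf behave as documented, and is
> not verified against this repro):
> {noformat}
> // Re-enable schema merging so the dataset schema is reconciled from all
> // part-files instead of a single summary/part-file.
> val merged = sqlContext.read
>   .option("mergeSchema", "true")   // per-read option
>   .parquet("/tmp/spark/parquet")
>
> // Equivalent global setting:
> // sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
>
> merged.printSchema()
> {noformat}
> This only sidesteps the problem at the cost of merging all part-file
> schemas, though; reading with a tailored schema should work without it.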
> Parquet was designed with schema evolution and column pruning in mind, so
> it should be legal for a user to read the dataset with a tailored schema
> in order to save disk IO. For example, say we have a Parquet dataset
> consisting of two physical Parquet files with the following two schemas:
> {noformat}
> message m0 {
>   optional group f0 {
>     optional int64 f00;
>     optional int64 f01;
>   }
> }
>
> message m1 {
>   optional group f0 {
>     optional int64 f00;
>     optional int64 f01;
>     optional int64 f02;
>   }
>   optional double f1;
> }
> {noformat}
> Users should be allowed to read the dataset with the following schema:
> {noformat}
> message m1 {
>   optional group f0 {
>     optional int64 f01;
>     optional int64 f02;
>   }
> }
> {noformat}
> so that {{f0.f00}} and {{f1}} are never touched. The above case can be
> expressed by the following {{spark-shell}} snippet:
> {noformat}
> import sqlContext._
> import sqlContext.implicits._
> import org.apache.spark.sql.types.{LongType, StructType}
>
> val path = "/tmp/spark/parquet"
>
> range(3).selectExpr("NAMED_STRUCT('f00', id, 'f01', id) AS f0").coalesce(1)
>   .write.mode("overwrite").parquet(path)
>
> range(3).selectExpr("NAMED_STRUCT('f00', id, 'f01', id, 'f02', id) AS f0",
>                     "CAST(id AS DOUBLE) AS f1").coalesce(1)
>   .write.mode("append").parquet(path)
>
> val tailoredSchema =
>   new StructType()
>     .add(
>       "f0",
>       new StructType()
>         .add("f01", LongType, nullable = true)
>         .add("f02", LongType, nullable = true),
>       nullable = true)
>
> read.schema(tailoredSchema).parquet(path).show()
> {noformat}
> Expected output should be:
> {noformat}
> +--------+
> |      f0|
> +--------+
> |[0,null]|
> |[1,null]|
> |[2,null]|
> |   [0,0]|
> |   [1,1]|
> |   [2,2]|
> +--------+
> {noformat}
> However, the current 1.5-SNAPSHOT version throws the following exception:
> {noformat}
> org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://localhost:9000/tmp/spark/parquet/part-r-00000-56c4604e-c546-4f97-a316-05da8ab1a0bf.gz.parquet
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
>   at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
>   at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:168)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
>   at org.apache.spark.sql.execution.datasources.parquet.CatalystRowConverter.getConverter(CatalystRowConverter.scala:206)
>   at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:269)
>   at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
>   at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
>   at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
>   at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
>   ... 25 more
> 15/08/30 16:42:59 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://localhost:9000/tmp/spark/parquet/part-r-00000-56c4604e-c546-4f97-a316-05da8ab1a0bf.gz.parquet
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
>   at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
>   at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:168)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
>   at org.apache.spark.sql.execution.datasources.parquet.CatalystRowConverter.getConverter(CatalystRowConverter.scala:206)
>   at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:269)
>   at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
>   at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
>   at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
>   at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
>   ... 25 more
> Driver stacktrace:
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>   at scala.Option.foreach(Option.scala:236)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1818)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
>   at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
>   at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1403)
>   at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1403)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
>   at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1921)
>   at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1402)
>   at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1332)
>   at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1395)
>   at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
>   at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
>   at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
>   at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
>   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
>   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
>   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
>   at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
>   at $iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
>   at $iwC$$iwC$$iwC.<init>(<console>:61)
>   at $iwC$$iwC.<init>(<console>:63)
>   at $iwC.<init>(<console>:65)
>   at <init>(<console>:67)
>   at .<init>(<console>:71)
>   at .<clinit>(<console>)
>   at .<init>(<console>:7)
>   at .<clinit>(<console>)
>   at $print(<console>)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
>   at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
>   at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
>   at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
>   at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>   at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$pasteCommand(SparkILoop.scala:825)
>   at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345)
>   at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345)
>   at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
>   at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
>   at scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)
>   at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:809)
>   at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
>   at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
>   at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
>   at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
>   at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
>   at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
>   at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>   at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
>   at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
>   at org.apache.spark.repl.Main$.main(Main.scala:31)
>   at org.apache.spark.repl.Main.main(Main.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
>   at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://localhost:9000/tmp/spark/parquet/part-r-00000-56c4604e-c546-4f97-a316-05da8ab1a0bf.gz.parquet
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
>   at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
>   at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:168)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1844)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
>   at org.apache.spark.sql.execution.datasources.parquet.CatalystRowConverter.getConverter(CatalystRowConverter.scala:206)
>   at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:269)
>   at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
>   at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
>   at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
>   at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
>   ... 25 more
> {noformat}
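> The {{ArrayIndexOutOfBoundsException: 2}} in
> {{CatalystRowConverter.getConverter}} suggests that the struct converter
> allocates one child converter per field of the *requested* Catalyst struct
> ({{f01}}, {{f02}}), while parquet-mr looks converters up by field index of
> the file's physical struct ({{f00}}, {{f01}}, {{f02}}). A minimal
> illustrative sketch of that mismatch (not Spark's actual code):
> {noformat}
> // Requested (tailored) struct has 2 fields; the physical file struct has 3.
> val requestedFields = Seq("f01", "f02")
> val fileFields      = Seq("f00", "f01", "f02")
>
> // Converters are sized by the requested struct...
> val converters = new Array[AnyRef](requestedFields.length)   // length 2
>
> // ...but parquet-mr asks for the converter at the physical field index,
> // e.g. index 2 for "f02", which overflows the 2-element array:
> val physicalIndex = fileFields.indexOf("f02")                 // == 2
> converters(physicalIndex)   // java.lang.ArrayIndexOutOfBoundsException: 2
> {noformat}
> If that reading is right, the reader needs to either clip each file's
> physical schema to the requested Catalyst schema before building
> converters, or map physical field indices back to requested field
> positions.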
> This issue can be generalized a step further. Taking interoperability into
> consideration, we may have a Parquet dataset consisting of physical Parquet
> files that share a compatible logical schema but were created by different
> Parquet libraries. Because of the historical nested type compatibility
> issue, the physical Parquet schemas generated by those libraries may
> differ. It would be nice to be able to operate on such datasets as well.
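> To make this concrete, here is a purely illustrative example (assuming the
> backward-compatibility rules described in the parquet-format spec): the
> same logical {{array<int>}} column may be written in the legacy two-level
> form by older writers and in the standard three-level form by newer ones:
> {noformat}
> message legacy_writer {
>   optional group f (LIST) {
>     repeated int32 element;
>   }
> }
>
> message standard_writer {
>   optional group f (LIST) {
>     repeated group list {
>       optional int32 element;
>     }
>   }
> }
> {noformat}
> Reading a dataset that mixes files like these under a single global schema
> runs into the same kind of physical/requested schema mismatch described
> above.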
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]