[ 
https://issues.apache.org/jira/browse/SPARK-11103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14964337#comment-14964337
 ] 

Hyukjin Kwon edited comment on SPARK-11103 at 10/20/15 1:06 AM:
----------------------------------------------------------------

[~lian cheng] This clearly looks like an issue, and I have made three versions of a patch. 
However, I want to be sure which approach would be proper before opening a PR.

1. Set {{spark.sql.parquet.filterPushdown}} to {{false}} when {{mergeSchema}} is used 
(a rough user-side sketch of this follows below).
2. If {{spark.sql.parquet.filterPushdown}} is {{true}}, retrieve the schema of every 
part-file (as well as the merged one), check whether each of them can accept the given 
filter, and push the filter down only when all of them can. I think this is a bit 
over-engineered.
3. If {{spark.sql.parquet.filterPushdown}} is {{true}}, retrieve the schema of every 
part-file (as well as the merged one) and push the filter down only to the splits whose 
schema can accept it, which (I think this is hacky) ends up with a different 
configuration for each task in a job.
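
Just to make option 1 concrete from the user's side, here is a minimal sketch (the helper name and the path are only illustrative, not the actual patch): with push-down disabled, the predicate is evaluated by Spark after the scan, so it is never validated against a part-file schema that lacks the column.

{noformat}
import org.apache.spark.sql.{DataFrame, SQLContext}

// Rough user-side equivalent of option 1 (illustrative only): turn Parquet
// filter push-down off whenever a merged schema is requested.
def readMergedParquet(sqlContext: SQLContext, path: String): DataFrame = {
  sqlContext.setConf("spark.sql.parquet.filterPushdown", "false")
  sqlContext.read.option("mergeSchema", "true").parquet(path)
}

// With push-down off, filtering on a column that is missing from some
// part-files no longer reaches Parquet's SchemaCompatibilityValidator.
// readMergedParquet(sqlContext, "hdfs://<SERVER>:<PORT>/path/to/table")
//   .filter("col2 = 2").show()
{noformat}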

Would you please give me some feedback?



> Filter applied on merged Parquet schema with new column fails with 
> (java.lang.IllegalArgumentException: Column [column_name] was not found in 
> schema!)
> ----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-11103
>                 URL: https://issues.apache.org/jira/browse/SPARK-11103
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.1
>            Reporter: Dominic Ricard
>
> When evolving a schema in Parquet files, Spark properly exposes all columns 
> found across the different Parquet files, but when trying to query the data, 
> it is not possible to apply a filter on a column that is not present in all files.
> To reproduce:
> *SQL:*
> {noformat}
> create table `table1` STORED AS PARQUET LOCATION 
> 'hdfs://<SERVER>:<PORT>/path/to/table/id=1/' as select 1 as `col1`;
> create table `table2` STORED AS PARQUET LOCATION 
> 'hdfs://<SERVER>:<PORT>/path/to/table/id=2/' as select 1 as `col1`, 2 as 
> `col2`;
> create table `table3` USING org.apache.spark.sql.parquet OPTIONS (path 
> "hdfs://<SERVER>:<PORT>/path/to/table");
> select col1 from `table3` where col2 = 2;
> {noformat}
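> For reference, a roughly equivalent reproduction through the DataFrame API might 
> look like the sketch below (this is only an assumed equivalent of the SQL above, 
> with the same placeholder paths):
> {noformat}
> // Assuming the Spark shell's sqlContext (a HiveContext) is in scope.
> import sqlContext.implicits._
>
> // Write two part-files with diverging schemas under the same table path.
> Seq(Tuple1(1)).toDF("col1")
>   .write.parquet("hdfs://<SERVER>:<PORT>/path/to/table/id=1")
> Seq((1, 2)).toDF("col1", "col2")
>   .write.parquet("hdfs://<SERVER>:<PORT>/path/to/table/id=2")
>
> // Schema merging exposes col2 for the whole table, but filtering on it
> // pushes a predicate down to the id=1 files, which do not contain col2.
> sqlContext.read.option("mergeSchema", "true")
>   .parquet("hdfs://<SERVER>:<PORT>/path/to/table")
>   .filter("col2 = 2")
>   .select("col1")
>   .show()
> {noformat}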
> The last {{select}} in the SQL above will output the following stack trace:
> {noformat}
> An error occurred when executing the SQL command:
> select col1 from `table3` where col2 = 2
> [Simba][HiveJDBCDriver](500051) ERROR processing query/statement. Error Code: 
> 0, SQL state: TStatus(statusCode:ERROR_STATUS, 
> infoMessages:[*org.apache.hive.service.cli.HiveSQLException:org.apache.spark.SparkException:
>  Job aborted due to stage failure: Task 0 in stage 7212.0 failed 4 times, 
> most recent failure: Lost task 0.3 in stage 7212.0 (TID 138449, 
> 208.92.52.88): java.lang.IllegalArgumentException: Column [col2] was not 
> found in schema!
>       at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:160)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:94)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
>       at 
> org.apache.parquet.filter2.predicate.Operators$Eq.accept(Operators.java:180)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
>       at 
> org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
>       at 
> org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:160)
>       at 
> org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
>       at 
> org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.<init>(SqlNewHadoopRDD.scala:155)
>       at 
> org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:120)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>       at org.apache.spark.scheduler.Task.run(Task.scala:88)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace::26:25, 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:runInternal:SparkExecuteStatementOperation.scala:259,
>  
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:run:SparkExecuteStatementOperation.scala:144,
>  
> org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementInternal:HiveSessionImpl.java:388,
>  
> org.apache.hive.service.cli.session.HiveSessionImpl:executeStatement:HiveSessionImpl.java:369,
>  sun.reflect.GeneratedMethodAccessor134:invoke::-1, 
> sun.reflect.DelegatingMethodAccessorImpl:invoke:DelegatingMethodAccessorImpl.java:43,
>  java.lang.reflect.Method:invoke:Method.java:497, 
> org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessionProxy.java:78,
>  
> org.apache.hive.service.cli.session.HiveSessionProxy:access$000:HiveSessionProxy.java:36,
>  
> org.apache.hive.service.cli.session.HiveSessionProxy$1:run:HiveSessionProxy.java:63,
>  java.security.AccessController:doPrivileged:AccessController.java:-2, 
> javax.security.auth.Subject:doAs:Subject.java:422, 
> org.apache.hadoop.security.UserGroupInformation:doAs:UserGroupInformation.java:1628,
>  
> org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessionProxy.java:59,
>  com.sun.proxy.$Proxy25:executeStatement::-1, 
> org.apache.hive.service.cli.CLIService:executeStatement:CLIService.java:261, 
> org.apache.hive.service.cli.thrift.ThriftCLIService:ExecuteStatement:ThriftCLIService.java:486,
>  
> org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1313,
>  
> org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1298,
>  org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39, 
> org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39, 
> org.apache.hive.service.auth.TSetIpAddressProcessor:process:TSetIpAddressProcessor.java:56,
>  
> org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPoolServer.java:285,
>  
> java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.java:1142,
>  
> java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.java:617,
>  java.lang.Thread:run:Thread.java:745], errorCode:0, 
> errorMessage:org.apache.spark.SparkException: Job aborted due to stage 
> failure: Task 0 in stage 7212.0 failed 4 times, most recent failure: Lost 
> task 0.3 in stage 7212.0 (TID 138449, 208.92.52.88): 
> java.lang.IllegalArgumentException: Column [col2] was not found in schema!
>       at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:160)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:94)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
>       at 
> org.apache.parquet.filter2.predicate.Operators$Eq.accept(Operators.java:180)
>       at 
> org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
>       at 
> org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
>       at 
> org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
>       at 
> org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:160)
>       at 
> org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
>       at 
> org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.<init>(SqlNewHadoopRDD.scala:155)
>       at 
> org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:120)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>       at org.apache.spark.scheduler.Task.run(Task.scala:88)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:), Query: select col1 from `table3` where col2 = 2. [SQL 
> State=HY000, DB Errorcode=500051]
> Execution time: 0.44s
> 1 statement failed.
> {noformat}


