[ 
https://issues.apache.org/jira/browse/SPARK-31116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-31116:
----------------------------------
    Description: 
After upgrading Spark to 3.0.0-SNAPSHOT, selecting Parquet columns in a 
case-insensitive manner throws an exception, even when spark.sql.caseSensitive 
is set to false. The failure occurs when reading Parquet with a schema that 
differs from the file's schema only in letter case (that is, the column names 
in Parquet and in the Catalyst schema are equal when compared 
case-insensitively).

 

To reproduce the error, run the following code, which throws 
java.lang.IllegalArgumentException:

 
{code:java}
import org.apache.spark.sql.types._
val path = "/some/temp/path"

spark
  .range(1L)
  .selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn")
  .write.parquet(path)

val caseInsensitiveSchema = new StructType()
  .add(
    "StructColumn",
    new StructType()
      .add("LowerCase", LongType)
      .add("camelcase", LongType))

spark.read.schema(caseInsensitiveSchema).parquet(path).show(){code}
This produces the following error:


{code:java}
23:57:09.077 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 215.0 (TID 366)
java.lang.IllegalArgumentException: lowercase does not exist. Available: LowerCase, camelcase
  at org.apache.spark.sql.types.StructType.$anonfun$fieldIndex$1(StructType.scala:306)
  at scala.collection.immutable.Map$Map2.getOrElse(Map.scala:147)
  at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:305)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.$anonfun$fieldConverters$1(ParquetRowConverter.scala:182)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.<init>(ParquetRowConverter.scala:181)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:351)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.$anonfun$fieldConverters$1(ParquetRowConverter.scala:185)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.<init>(ParquetRowConverter.scala:181)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRecordMaterializer.<init>(ParquetRecordMaterializer.scala:43)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport.prepareForRead(ParquetReadSupport.scala:130)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204)
  at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182)
  at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:341)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1804)
  at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1229)
  at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1229)
  at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2144)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:127)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:460)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:463)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)


{code}
 

I think that, as of 3.0.0, 
`org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter` no 
longer assumes that parquetRequestedSchema and catalystRequestedSchema have 
the same number of fields ([https://github.com/apache/spark/pull/22880]) and, 
as the trace shows, looks fields up by name. So we should take case 
sensitivity into account in ParquetRowConverter or some related classes.
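
To illustrate the kind of lookup suggested above, here is a minimal, 
Spark-free sketch in plain Scala. `Field`, `FieldLookup`, `exactIndex`, and 
`resolveIndex` are hypothetical names made up for this example; this is not 
Spark's code and not necessarily how the issue will be fixed. It only 
contrasts an exact-name lookup, which fails the same way as the trace, with 
one that honors a case-sensitivity flag.

```scala
// Hypothetical stand-in for a Catalyst struct field; NOT Spark's StructField.
final case class Field(name: String, dataType: String)

object FieldLookup {
  // Exact-name lookup. With the schema from the reproduction, asking for
  // "lowercase" fails because only "LowerCase" is present.
  def exactIndex(fields: Seq[Field], name: String): Int = {
    val idx = fields.indexWhere(_.name == name)
    if (idx < 0) {
      throw new IllegalArgumentException(
        s"$name does not exist. Available: ${fields.map(_.name).mkString(", ")}")
    }
    idx
  }

  // Lookup that honors a caseSensitive flag (an assumed parameter standing in
  // for spark.sql.caseSensitive). Ambiguous matches are reported, not guessed.
  def resolveIndex(fields: Seq[Field], name: String, caseSensitive: Boolean): Int = {
    if (caseSensitive) {
      exactIndex(fields, name)
    } else {
      val matches = fields.zipWithIndex.collect {
        case (f, i) if f.name.equalsIgnoreCase(name) => i
      }
      matches match {
        case Seq(i) => i
        case Seq() =>
          throw new IllegalArgumentException(
            s"$name does not exist. Available: ${fields.map(_.name).mkString(", ")}")
        case _ =>
          throw new IllegalArgumentException(
            s"$name matches more than one field when case is ignored")
      }
    }
  }
}

object FieldLookupDemo {
  def main(args: Array[String]): Unit = {
    // Field names as declared in caseInsensitiveSchema in the reproduction.
    val catalystFields = Seq(Field("LowerCase", "long"), Field("camelcase", "long"))
    // The Parquet file stores 'lowercase' and 'camelCase'.
    println(FieldLookup.resolveIndex(catalystFields, "lowercase", caseSensitive = false))
    println(FieldLookup.resolveIndex(catalystFields, "camelCase", caseSensitive = false))
  }
}
```

With the two fields from the reproduction, the case-insensitive calls resolve 
`lowercase` to index 0 and `camelCase` to index 1, while 
`exactIndex(catalystFields, "lowercase")` throws IllegalArgumentException. 
Rejecting ambiguous matches instead of picking the first one is a design 
choice of this sketch only.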

  was:
After upgrading spark versrion to 3.0.0-SNAPSHOT. Selecting parquet columns got 
exception in case insensitive manner. Even we set spark.sql.caseSensitive to 
false. Reading parquet with case ignored schema (which means columns in parquet 
and catalyst types are same with respect to case insensitive manner)

 

To reproduce error executing follow code cause 
java.lang.IllegalArgumentException

 
{code:java}
val path = "/some/temp/path"

spark
  .range(1L)
  .selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn")
  .write.parquet(path)

val caseInsensitiveSchema = new StructType()
  .add(
    "StructColumn",
    new StructType()
      .add("LowerCase", LongType)
      .add("camelcase", LongType)

spark.read.schema(caseInsensitiveSchema).parquet(path).show(){code}
Then we got following error.


{code:java}
23:57:09.077 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 215.0 (TID 366)
java.lang.IllegalArgumentException: lowercase does not exist. Available: LowerCase, camelcase
  at org.apache.spark.sql.types.StructType.$anonfun$fieldIndex$1(StructType.scala:306)
  at scala.collection.immutable.Map$Map2.getOrElse(Map.scala:147)
  at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:305)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.$anonfun$fieldConverters$1(ParquetRowConverter.scala:182)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.<init>(ParquetRowConverter.scala:181)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:351)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.$anonfun$fieldConverters$1(ParquetRowConverter.scala:185)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.<init>(ParquetRowConverter.scala:181)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRecordMaterializer.<init>(ParquetRecordMaterializer.scala:43)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport.prepareForRead(ParquetReadSupport.scala:130)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204)
  at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182)
  at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:341)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1804)
  at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1229)
  at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1229)
  at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2144)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:127)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:460)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:463)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)


{code}
 

I think from 3.0.0, 
`org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter` does 
not have equal number of fields between parquetRequestedSchema and 
catalystRequestedSchema ([https://github.com/apache/spark/pull/22880]). So we 
consider case sensitivity in ParquetRowConverter or some related classes.


> ParquetRowConverter does not follow case sensitivity
> ----------------------------------------------------
>
>                 Key: SPARK-31116
>                 URL: https://issues.apache.org/jira/browse/SPARK-31116
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Tae-kyeom, Kim
>            Priority: Critical


