[ https://issues.apache.org/jira/browse/SPARK-35700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366123#comment-17366123 ]

Saurabh Chawla commented on SPARK-35700:
----------------------------------------

[~hyukjin.kwon] / [~Qin Yao] / [~cloud_fan] - I was able to reproduce this issue 
on the master branch using the steps given in SPARK-35762.

Below are some of the scenarios I found while debugging this issue.

1) This issue is reproducible for all ORC data created using Hive; when the 
data is inserted using Spark, we do not get this exception.

2) We get this exception when the file is created by Hive and readSchema is 
called:
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala#L63]

reader.getSchema returns the varchar data type.

But when reading an ORC file created by Spark, we get the string data type.

3) I tried adding a check that converts varchar to string:
{code:java}
readSchema(file, conf, ignoreCorruptFiles) match {
  case Some(schema) =>
    val orcSchema = CatalystSqlParser.parseDataType(
      schema.toString).asInstanceOf[StructType]
    // ORC files written by Hive can carry char/varchar types in their schema
    val varCharExist = orcSchema.fields.exists(
      x => CharVarcharUtils.hasCharVarchar(x.dataType))
    if (varCharExist) {
      // Replace char/varchar with string so later code (e.g. OrcFilters) sees StringType
      Some(CharVarcharUtils.replaceCharVarcharWithStringInSchema(orcSchema))
    } else {
      Some(orcSchema)
    }
  case None =>
    // readSchema found no usable schema for this file
    None
}
{code}
After adding this fix, varchar is converted to string and the query works 
fine.
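For illustration, here is a minimal Spark-free sketch of the same conversion, assuming the file schema is available as an ORC-style type string such as struct<col1:varchar(32),col2:int>. The helpers has_char_varchar and replace_char_varchar_with_string are hypothetical stand-ins for CharVarcharUtils.hasCharVarchar and CharVarcharUtils.replaceCharVarcharWithStringInSchema; unlike Spark, they rewrite the type string with a regular expression instead of walking a StructType.

```python
import re

# Hypothetical helper pattern: matches char(n) / varchar(n) type tokens such as
# "varchar(32)". Spark does this on a StructType; this sketch works on strings.
_CHAR_VARCHAR = re.compile(r"\b(?:var)?char\(\s*\d+\s*\)", re.IGNORECASE)


def has_char_varchar(type_str: str) -> bool:
    """Return True if the type string contains any char(n)/varchar(n) type."""
    return _CHAR_VARCHAR.search(type_str) is not None


def replace_char_varchar_with_string(type_str: str) -> str:
    """Rewrite every char(n)/varchar(n) token to string; other types are untouched."""
    return _CHAR_VARCHAR.sub("string", type_str)


if __name__ == "__main__":
    hive_schema = "struct<col1:varchar(32),col2:int>"  # shape of a Hive-written schema
    spark_schema = "struct<col1:string,col2:int>"      # shape of a Spark-written schema
    print(has_char_varchar(hive_schema))                # True
    print(has_char_varchar(spark_schema))               # False
    print(replace_char_varchar_with_string(hive_schema))  # struct<col1:string,col2:int>
```

In this sketch the replacement is a no-op when no char/varchar type is present, so the existence check only skips a needless rewrite; whether the same holds for the Spark helper is worth confirming against its source.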

 

4) A similar data type conversion is needed in 
`def readSchema(file: Path, conf: Configuration, ignoreCorruptFiles: Boolean)`, 
which is called by inferSchema and readOrcSchemasInParallel.
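Doing the conversion inside the single per-file schema reader means every caller sees the same type for Hive-written and Spark-written files. A hedged sketch of that idea, assuming schemas are ORC-style type strings; read_schema, read_schemas_in_parallel and RAW_SCHEMAS are hypothetical stand-ins, not Spark functions, and the file names are made up.

```python
import re
from concurrent.futures import ThreadPoolExecutor

# Hypothetical pattern for char(n)/varchar(n) type tokens.
_CHAR_VARCHAR = re.compile(r"\b(?:var)?char\(\s*\d+\s*\)", re.IGNORECASE)

# Hypothetical file -> raw ORC schema map standing in for real ORC footers.
RAW_SCHEMAS = {
    "hive_part-0.orc": "struct<col1:varchar(32),col2:int>",
    "spark_part-0.orc": "struct<col1:string,col2:int>",
}


def read_schema(path: str) -> str:
    """Hypothetical per-file reader; normalizes char/varchar to string."""
    return _CHAR_VARCHAR.sub("string", RAW_SCHEMAS[path])


def read_schemas_in_parallel(paths):
    """Read all file schemas concurrently, every one through read_schema."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(read_schema, paths))


if __name__ == "__main__":
    schemas = read_schemas_in_parallel(sorted(RAW_SCHEMAS))
    # Both files now report the same schema, so merging them is straightforward.
    print(set(schemas))  # {'struct<col1:string,col2:int>'}
```

Without the normalization in read_schema, the two files would report varchar(32) and string for the same column.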

 

If this approach is fine, I can go ahead and create a PR for it; otherwise, 
if we want to consider another approach, we can discuss it here.

 

 

> spark.sql.orc.filterPushdown not working with Spark 3.1.1 for tables with varchar data type
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-35700
>                 URL: https://issues.apache.org/jira/browse/SPARK-35700
>             Project: Spark
>          Issue Type: Bug
>          Components: Kubernetes, PySpark, Spark Core
>    Affects Versions: 3.1.1
>         Environment: Spark 3.1.1 on K8S
>            Reporter: Arghya Saha
>            Priority: Major
>
> We are not able to upgrade from Spark 2.4.x to Spark 3.1.1 because a join on 
> a varchar column fails, which is unexpected since it works on Spark 3.0.0. 
> We are running Spark 3.1.1 (MR 3.2) on K8s.
> Below is my use case:
> The tables are external Hive tables and the files are stored as ORC. They 
> have varchar columns, and when we try to join on a varchar column we get the 
> exception below.
> As I understand it, Spark 3.1.1 introduced the varchar data type, but it 
> seems not to be well tested with ORC and is not backward compatible. I have 
> even tried the config below, without luck:
> *spark.sql.legacy.charVarcharAsString: "true"*
> We do not get the error when *spark.sql.orc.filterPushdown=false*.
> Below is the code. Here col1 is of type varchar(32) in Hive:
> {code:python}
> # Join on col1, which is varchar(32) in Hive
> df = spark.sql("select a.col1, a.col2 from table1 a inner join table2 b "
>                "on (a.col1 = b.col1 and a.col2 > b.col2)")
> df.write.format("orc").option("compression", "zlib").mode("Append").save("<s3_path>")
> {code}
> Below is the error:
>  
> {code:java}
> Job aborted due to stage failure: Task 43 in stage 5.0 failed 4 times, most recent failure: Lost task 43.3 in stage 5.0 (TID 524) (10.219.36.64 executor 5): java.lang.UnsupportedOperationException: DataType: varchar(32)
>       at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.getPredicateLeafType(OrcFilters.scala:150)
>       at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.getType$1(OrcFilters.scala:222)
>       at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.buildLeafSearchArgument(OrcFilters.scala:266)
>       at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.convertibleFiltersHelper$1(OrcFilters.scala:132)
>       at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.$anonfun$convertibleFilters$4(OrcFilters.scala:135)
>       at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
>       at scala.collection.immutable.List.foreach(List.scala:392)
>       at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
>       at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
>       at scala.collection.immutable.List.flatMap(List.scala:355)
>       at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.convertibleFilters(OrcFilters.scala:134)
>       at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.createFilter(OrcFilters.scala:73)
>       at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$4(OrcFileFormat.scala:189)
>       at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$4$adapted(OrcFileFormat.scala:188)
>       at scala.Option.foreach(Option.scala:407)
>       at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$1(OrcFileFormat.scala:188)
>       at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116)
>       at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
>       at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
>       at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:503)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.columnartorow_nextBatch_0$(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
>       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
>       at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>       at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:177)
>       at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
>       at org.apache.spark.scheduler.Task.run(Task.scala:131)
>       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
>       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>       at java.base/java.lang.Thread.run(Unknown Source)
> Driver stacktrace:3
> {code}
>  
> I can see there is no mapping of varchar in OrcFilters.scala:150
> [https://github.com/apache/spark/blob/v3.1.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala#L142]
>  
>  
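The quoted trace fails in OrcFilters.getPredicateLeafType, which maps each Catalyst type to an ORC predicate leaf type and throws for anything it does not recognize. The Python sketch below mirrors that dispatch in simplified form; the mapping table is an assumption modelled on the Spark 3.1.1 source linked in the issue, not a copy of it, and get_predicate_leaf_type and normalize are hypothetical names. It shows why varchar(32) is rejected, and why normalizing it to string first, as proposed in the comment, lets the lookup succeed.

```python
import re

# Assumed, simplified mapping from Catalyst type names to ORC leaf types.
_LEAF_TYPES = {
    "boolean": "BOOLEAN",
    "tinyint": "LONG", "smallint": "LONG", "int": "LONG", "bigint": "LONG",
    "float": "FLOAT", "double": "FLOAT",
    "string": "STRING",
    "date": "DATE",
    "timestamp": "TIMESTAMP",
}


def get_predicate_leaf_type(catalog_string: str) -> str:
    """Hypothetical mirror of the type dispatch: map a type name or raise."""
    if catalog_string.startswith("decimal("):
        return "DECIMAL"
    try:
        return _LEAF_TYPES[catalog_string]
    except KeyError:
        # Same failure mode as the trace: unmapped types such as varchar are rejected.
        raise NotImplementedError(f"DataType: {catalog_string}") from None


def normalize(catalog_string: str) -> str:
    """Hypothetical step: treat char(n)/varchar(n) as string before the lookup."""
    return re.sub(r"^(?:var)?char\(\d+\)$", "string", catalog_string)


if __name__ == "__main__":
    try:
        get_predicate_leaf_type("varchar(32)")
    except NotImplementedError as e:
        print(e)  # DataType: varchar(32)
    print(get_predicate_leaf_type(normalize("varchar(32)")))  # STRING
```

NotImplementedError plays the role of Java's UnsupportedOperationException here.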



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
