[
https://issues.apache.org/jira/browse/HUDI-6262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ethan Guo reassigned HUDI-6262:
-------------------------------
Assignee: Ethan Guo
> Hudi-Spark Support Complex Data Types for Parquet Vectorized Reader
> --------------------------------------------------------------------
>
> Key: HUDI-6262
> URL: https://issues.apache.org/jira/browse/HUDI-6262
> Project: Apache Hudi
> Issue Type: Task
> Reporter: Rahil Chertara
> Assignee: Ethan Guo
> Priority: Major
>
> Recently, when trying to upgrade to Spark 3.4.0
> [https://github.com/apache/hudi/actions/runs/4985991589/jobs/8926379795?pr=8682]
> and running the test
> {code:java}
> org.apache.hudi.TestAvroSchemaResolutionSupport#testArrayOfMapsChangeValueType{code}
> we saw the following exception:
>
> {code:java}
> java.lang.ClassCastException:
> org.apache.spark.sql.catalyst.expressions.UnsafeRow cannot be cast to
> org.apache.spark.sql.vectorized.ColumnarBatch
> 2023-05-16T01:46:35.0110639Z at
> org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:600)
> 2023-05-16T01:46:35.0110882Z at
> org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:589)
> 2023-05-16T01:46:35.0111237Z at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown
> Source)
> 2023-05-16T01:46:35.0111621Z at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source)
> 2023-05-16T01:46:35.0111933Z at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 2023-05-16T01:46:35.0112206Z at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
> 2023-05-16T01:46:35.0112432Z at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
> 2023-05-16T01:46:35.0112618Z at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
> 2023-05-16T01:46:35.0112814Z at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
> 2023-05-16T01:46:35.0113021Z at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 2023-05-16T01:46:35.0113220Z at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
> 2023-05-16T01:46:35.0113374Z at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
> 2023-05-16T01:46:35.0113580Z at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92){code}
>
> Currently, in our {{Spark32PlusHoodieParquetFileFormat}} we do this:
> {code:java}
> val enableVectorizedReader: Boolean =
> sqlConf.parquetVectorizedReaderEnabled &&
> resultSchema.forall(_.dataType.isInstanceOf[AtomicType]){code}
> The issue, I think, is that our {{enableVectorizedReader}} check only tests
> for atomic types, while {{supportBatch}} is taken directly from Spark's
> {{ParquetFileFormat#supportBatch}}, which is no longer an atomic-type-only
> check: Spark has supported multiple complex types in the vectorized reader
> [https://github.com/apache/spark/pull/33695/files#diff-1ce36caa3af5c079a8a0190c624ac4c9e95dcb91d42fc433b820023f73ed68ed]
> since Spark 3.3.0.
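>
> As a stopgap, a minimal sketch (hypothetical, not the actual Hudi code) that keeps {{supportBatch}} consistent with the atomic-type-only {{enableVectorizedReader}} condition could look like:
> {code:java}
> // Hypothetical override in Spark32PlusHoodieParquetFileFormat: mirror the
> // atomic-type-only condition, so supportBatch and enableVectorizedReader
> // can never disagree about whether a ColumnarBatch will be produced.
> override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
>   val conf = sparkSession.sessionState.conf
>   conf.parquetVectorizedReaderEnabled &&
>     schema.forall(_.dataType.isInstanceOf[AtomicType])
> }{code}
> This trades vectorized performance on nested columns for correctness until complex-type support is actually ported.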
>
> We also can't directly reuse the same logic as Spark's {{ParquetFileFormat}},
> i.e. calling {{ParquetUtils.isBatchReadSupportedForSchema(conf, schema)}},
> since I think Hudi internally only supports a limited set of types in
> {code:java}
> SparkInternalSchemaConverter#convertColumnVectorType{code}
> I think we would need to port the logic of that Spark PR into Hudi for real
> long-term vectorized reader support.
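>
> For the long-term fix, a hedged sketch of the schema check Hudi would need, restricted to types its converter can actually handle (the helper name here is illustrative, not an existing API):
> {code:java}
> // Hypothetical helper: walk the schema recursively and allow batch reads
> // only for types that Hudi's vectorized read path knows how to fill into
> // a column vector. Nested cases should return true only once the
> // corresponding logic from the Spark PR has been ported.
> def isBatchReadSupported(dataType: DataType): Boolean = dataType match {
>   case _: AtomicType => true
>   case ArrayType(elementType, _) => isBatchReadSupported(elementType)
>   case MapType(keyType, valueType, _) =>
>     isBatchReadSupported(keyType) && isBatchReadSupported(valueType)
>   case StructType(fields) => fields.forall(f => isBatchReadSupported(f.dataType))
>   case _ => false
> }{code}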
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)