This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d0916cb3359 [HUDI-7190] Fix nested columns vectorized read for spark33+ legacy formats (#10265)
d0916cb3359 is described below

commit d0916cb3359759212191cdb2e13aa02490b1746c
Author: StreamingFlames <[email protected]>
AuthorDate: Wed Dec 20 13:25:30 2023 +0800

    [HUDI-7190] Fix nested columns vectorized read for spark33+ legacy formats (#10265)
    
    * [HUDI-7190] Fix legacy parquet format nested columns vectorized read for spark3.3+
    * Fix nested type implicit schema evolution
    * fix legacy format support batch read
    * Add exception messages when vectorized read nested type with type change
---
 .../parquet/LegacyHoodieParquetFileFormat.scala    |   8 +-
 .../hudi/TestAvroSchemaResolutionSupport.scala     | 145 +++++++++++++++++----
 .../apache/spark/sql/hudi/TestInsertTable.scala    |  37 ++++++
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  |   9 +-
 .../Spark33LegacyHoodieParquetFileFormat.scala     |  12 +-
 .../Spark34LegacyHoodieParquetFileFormat.scala     |  19 +--
 .../Spark35LegacyHoodieParquetFileFormat.scala     |  19 +--
 7 files changed, 193 insertions(+), 56 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala
index 046640c11c1..d579c9052a4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala
@@ -38,12 +38,8 @@ class LegacyHoodieParquetFileFormat extends 
ParquetFileFormat with SparkAdapterS
   override def toString: String = "Hoodie-Parquet"
 
   override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
-    if (HoodieSparkUtils.gteqSpark3_4) {
-      val conf = sparkSession.sessionState.conf
-      conf.parquetVectorizedReaderEnabled && 
schema.forall(_.dataType.isInstanceOf[AtomicType])
-    } else {
-      super.supportBatch(sparkSession, schema)
-    }
+    sparkAdapter
+      
.createLegacyHoodieParquetFileFormat(true).get.supportBatch(sparkSession, 
schema)
   }
 
   override def buildReaderWithPartitionValues(sparkSession: SparkSession,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
index 6e22db914d3..61964755519 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.SchemaCompatibilityException
 import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.spark.SparkException
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
@@ -386,15 +387,17 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     // upsert
     upsertData(df2, tempRecordPath, isCow)
 
-    // read out the table
-    val readDf = spark.read.format("hudi")
-      // NOTE: long to int type change is not supported for the custom file 
format and the filegroup reader
-      //       HUDI-7045 and PR#10007 in progress to fix the issue
-      .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
-      .load(tempRecordPath)
-    readDf.printSchema()
-    readDf.show(false)
-    readDf.foreach(_ => {})
+    withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> 
"false") {
+      // read out the table
+      val readDf = spark.read.format("hudi")
+        // NOTE: long to int type change is not supported for the custom file 
format and the filegroup reader
+        //       HUDI-7045 and PR#10007 in progress to fix the issue
+        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
+        .load(tempRecordPath)
+      readDf.printSchema()
+      readDf.show(false)
+      readDf.foreach(_ => {})
+    }
   }
 
   @ParameterizedTest
@@ -482,15 +485,17 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     // upsert
     upsertData(df2, tempRecordPath, isCow)
 
-    // read out the table
-    val readDf = spark.read.format("hudi")
-      // NOTE: type promotion is not supported for the custom file format and 
the filegroup reader
-      //       HUDI-7045 and PR#10007 in progress to fix the issue
-      .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
-      .load(tempRecordPath)
-    readDf.printSchema()
-    readDf.show(false)
-    readDf.foreach(_ => {})
+    withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> 
"false") {
+      // read out the table
+      val readDf = spark.read.format("hudi")
+        // NOTE: type promotion is not supported for the custom file format 
and the filegroup reader
+        //       HUDI-7045 and PR#10007 in progress to fix the issue
+        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
+        .load(tempRecordPath)
+      readDf.printSchema()
+      readDf.show(false)
+      readDf.foreach(_ => {})
+    }
   }
 
   @ParameterizedTest
@@ -548,15 +553,17 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     // upsert
     upsertData(df2, tempRecordPath, isCow)
 
-    // read out the table
-    val readDf = spark.read.format("hudi")
-      // NOTE: type promotion is not supported for the custom file format and 
the filegroup reader
-      //       HUDI-7045 and PR#10007 in progress to fix the issue
-      .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
-      .load(tempRecordPath)
-    readDf.printSchema()
-    readDf.show(false)
-    readDf.foreach(_ => {})
+    withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> 
"false") {
+      // read out the table
+      val readDf = spark.read.format("hudi")
+        // NOTE: type promotion is not supported for the custom file format 
and the filegroup reader
+        //       HUDI-7045 and PR#10007 in progress to fix the issue
+        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
+        .load(tempRecordPath)
+      readDf.printSchema()
+      readDf.show(false)
+      readDf.foreach(_ => {})
+    }
   }
 
   @ParameterizedTest
@@ -828,4 +835,88 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     readDf.show(false)
     readDf.foreach(_ => {})
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testNestedTypeVectorizedReadWithTypeChange(isCow: Boolean): Unit = {
+    // test to change the value type of a MAP in a column of ARRAY< MAP<k,v> > 
type
+    val tempRecordPath = basePath + "/record_tbl/"
+    val arrayMapData = Seq(
+      Row(1, 100, List(Map("2022-12-01" -> 120), Map("2022-12-02" -> 130)), 
"aaa")
+    )
+    val arrayMapSchema = new StructType()
+      .add("id", IntegerType)
+      .add("userid", IntegerType)
+      .add("salesMap", ArrayType(
+        new MapType(StringType, IntegerType, true)))
+      .add("name", StringType)
+    val df1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayMapData), 
arrayMapSchema)
+    df1.printSchema()
+    df1.show(false)
+
+    // recreate table
+    initialiseTable(df1, tempRecordPath, isCow)
+
+    // read out the table, will not throw any exception
+    readTable(tempRecordPath)
+
+    // change value type from integer to long
+    val newArrayMapData = Seq(
+      Row(2, 200, List(Map("2022-12-01" -> 220L), Map("2022-12-02" -> 230L)), 
"bbb")
+    )
+    val newArrayMapSchema = new StructType()
+      .add("id", IntegerType)
+      .add("userid", IntegerType)
+      .add("salesMap", ArrayType(
+        new MapType(StringType, LongType, true)))
+      .add("name", StringType)
+    val df2 = 
spark.createDataFrame(spark.sparkContext.parallelize(newArrayMapData), 
newArrayMapSchema)
+    df2.printSchema()
+    df2.show(false)
+    // upsert
+    upsertData(df2, tempRecordPath, isCow)
+
+    // after implicit type change, read the table with vectorized read enabled
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      assertThrows(classOf[SparkException]){
+        withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> 
"true") {
+          readTable(tempRecordPath)
+        }
+      }
+    }
+
+    withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> 
"false") {
+      readTable(tempRecordPath)
+    }
+  }
+
+
+  private def readTable(path: String): Unit = {
+    // read out the table
+    val readDf = spark.read.format("hudi")
+      // NOTE: type promotion is not supported for the custom file format and 
the filegroup reader
+      //       HUDI-7045 and PR#10007 in progress to fix the issue
+      .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
+      .load(path)
+    readDf.printSchema()
+    readDf.show(false)
+    readDf.foreach(_ => {})
+  }
+
+  protected def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
+    val conf = spark.sessionState.conf
+    val currentValues = pairs.unzip._1.map { k =>
+      if (conf.contains(k)) {
+        Some(conf.getConfString(k))
+      } else None
+    }
+    pairs.foreach { case (k, v) => conf.setConfString(k, v) }
+    try f finally {
+      pairs.unzip._1.zip(currentValues).foreach {
+        case (key, Some(value)) => conf.setConfString(key, value)
+        case (key, None) => conf.unsetConf(key)
+      }
+    }
+  }
+
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index b1b7353c2bc..044b6451cdf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -2173,6 +2173,43 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     })
   }
 
+  test("Test vectorized read nested columns for 
LegacyHoodieParquetFileFormat") {
+    withSQLConf(
+      "hoodie.datasource.read.use.new.parquet.file.format" -> "false",
+      "hoodie.file.group.reader.enabled" -> "false",
+      "spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true",
+      "spark.sql.parquet.enableVectorizedReader" -> "true") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  attributes map<string, string>,
+             |  price double,
+             |  ts long,
+             |  dt string
+             |) using hudi
+             | tblproperties (primaryKey = 'id')
+             | partitioned by (dt)
+             | location '${tmp.getCanonicalPath}'
+                    """.stripMargin)
+        spark.sql(
+          s"""
+             | insert into $tableName values
+             | (1, 'a1', map('color', 'red', 'size', 'M'), 10, 1000, 
'2021-01-05'),
+             | (2, 'a2', map('color', 'blue', 'size', 'L'), 20, 2000, 
'2021-01-06'),
+             | (3, 'a3', map('color', 'green', 'size', 'S'), 30, 3000, 
'2021-01-07')
+                    """.stripMargin)
+        // Check the inserted records with map type attributes
+        checkAnswer(s"select id, name, price, ts, dt from $tableName where 
attributes.color = 'red'")(
+          Seq(1, "a1", 10.0, 1000, "2021-01-05")
+        )
+      }
+    }
+  }
+
   def ingestAndValidateDataNoPrecombine(tableType: String, tableName: String, 
tmp: File,
                             expectedOperationtype: WriteOperationType,
                             setOptions: List[String] = List.empty) : Unit = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 6ca1a72edcd..6a64c69021c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -544,12 +544,12 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
 
   test("Test alter column with complex schema") {
     withRecordType()(withTempDir { tmp =>
-      Seq("mor").foreach { tableType =>
+      withSQLConf(s"$SPARK_SQL_INSERT_INTO_OPERATION" -> "upsert",
+        "hoodie.schema.on.read.enable" -> "true",
+        "spark.sql.parquet.enableNestedColumnVectorizedReader" -> "false") {
         val tableName = generateTableName
         val tablePath = s"${new Path(tmp.getCanonicalPath, 
tableName).toUri.toString}"
         if (HoodieSparkUtils.gteqSpark3_1) {
-          spark.sql("set hoodie.schema.on.read.enable=true")
-          spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
           spark.sql(
             s"""
                |create table $tableName (
@@ -561,7 +561,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
                |) using hudi
                | location '$tablePath'
                | options (
-               |  type = '$tableType',
+               |  type = 'mor',
                |  primaryKey = 'id',
                |  preCombineField = 'ts'
                | )
@@ -628,7 +628,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           )
         }
       }
-      spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
     })
   }
 
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
index 3b53b753b69..3176668dab6 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
@@ -50,6 +50,8 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, 
StructType}
 import org.apache.spark.util.SerializableConfiguration
 
+import scala.collection.convert.ImplicitConversions.`collection 
AsScalaIterable`
+
 import java.net.URI
 
 /**
@@ -121,8 +123,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
     val sqlConf = sparkSession.sessionState.conf
     val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
     val enableVectorizedReader: Boolean =
-      sqlConf.parquetVectorizedReaderEnabled &&
-        resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+      ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema)
     val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
     val timestampConversion: Boolean = 
sqlConf.isParquetINT96TimestampConversion
     val capacity = sqlConf.parquetVectorizedReaderBatchSize
@@ -243,6 +244,13 @@ class Spark33LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
         implicitTypeChangeInfo
       }
 
+      if (enableVectorizedReader && shouldUseInternalSchema &&
+        !typeChangeInfos.values().forall(_.getLeft.isInstanceOf[AtomicType])) {
+        throw new IllegalArgumentException(
+          "Nested types with type changes(implicit or explicit) cannot be read 
in vectorized mode. " +
+            "To workaround this issue, set 
spark.sql.parquet.enableVectorizedReader=false.")
+      }
+
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
 
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
index cd76ce6f3b2..a1cfbb96212 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
@@ -47,6 +47,9 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, 
StructType}
 import org.apache.spark.util.SerializableConfiguration
+
+import scala.collection.convert.ImplicitConversions.`collection 
AsScalaIterable`
+
 /**
  * This class is an extension of [[ParquetFileFormat]] overriding 
Spark-specific behavior
  * that's not possible to customize in any other way
@@ -59,11 +62,6 @@ import org.apache.spark.util.SerializableConfiguration
  */
 class Spark34LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
 
-  override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
-    val conf = sparkSession.sessionState.conf
-    conf.parquetVectorizedReaderEnabled && 
schema.forall(_.dataType.isInstanceOf[AtomicType])
-  }
-
   def supportsColumnar(sparkSession: SparkSession, schema: StructType): 
Boolean = {
     val conf = sparkSession.sessionState.conf
     // Only output columnar if there is WSCG to read it.
@@ -133,9 +131,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
     val resultSchema = StructType(partitionSchema.fields ++ 
requiredSchema.fields)
     val sqlConf = sparkSession.sessionState.conf
     val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
-    val enableVectorizedReader: Boolean =
-      sqlConf.parquetVectorizedReaderEnabled &&
-        resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val enableVectorizedReader: Boolean = supportBatch(sparkSession, 
resultSchema)
     val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
     val timestampConversion: Boolean = 
sqlConf.isParquetINT96TimestampConversion
     val capacity = sqlConf.parquetVectorizedReaderBatchSize
@@ -259,6 +255,13 @@ class Spark34LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
         implicitTypeChangeInfo
       }
 
+      if (enableVectorizedReader && shouldUseInternalSchema &&
+        !typeChangeInfos.values().forall(_.getLeft.isInstanceOf[AtomicType])) {
+        throw new IllegalArgumentException(
+          "Nested types with type changes(implicit or explicit) cannot be read 
in vectorized mode. " +
+            "To workaround this issue, set 
spark.sql.parquet.enableVectorizedReader=false.")
+      }
+
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
 
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
index dd70aa08b85..b6177b942fc 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
@@ -48,6 +48,9 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, 
StructType}
 import org.apache.spark.util.SerializableConfiguration
+
+import scala.collection.convert.ImplicitConversions.`collection 
AsScalaIterable`
+
 /**
  * This class is an extension of [[ParquetFileFormat]] overriding 
Spark-specific behavior
  * that's not possible to customize in any other way
@@ -60,11 +63,6 @@ import org.apache.spark.util.SerializableConfiguration
  */
 class Spark35LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
 
-  override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
-    val conf = sparkSession.sessionState.conf
-    conf.parquetVectorizedReaderEnabled && 
schema.forall(_.dataType.isInstanceOf[AtomicType])
-  }
-
   def supportsColumnar(sparkSession: SparkSession, schema: StructType): 
Boolean = {
     val conf = sparkSession.sessionState.conf
     // Only output columnar if there is WSCG to read it.
@@ -134,9 +132,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
     val resultSchema = StructType(partitionSchema.fields ++ 
requiredSchema.fields)
     val sqlConf = sparkSession.sessionState.conf
     val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
-    val enableVectorizedReader: Boolean =
-      sqlConf.parquetVectorizedReaderEnabled &&
-        resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val enableVectorizedReader: Boolean = supportBatch(sparkSession, 
resultSchema)
     val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
     val timestampConversion: Boolean = 
sqlConf.isParquetINT96TimestampConversion
     val capacity = sqlConf.parquetVectorizedReaderBatchSize
@@ -260,6 +256,13 @@ class Spark35LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
         implicitTypeChangeInfo
       }
 
+      if (enableVectorizedReader && shouldUseInternalSchema &&
+        !typeChangeInfos.values().forall(_.getLeft.isInstanceOf[AtomicType])) {
+        throw new IllegalArgumentException(
+          "Nested types with type changes(implicit or explicit) cannot be read 
in vectorized mode. " +
+            "To workaround this issue, set 
spark.sql.parquet.enableVectorizedReader=false.")
+      }
+
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
 

Reply via email to