This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d0916cb3359 [HUDI-7190] Fix nested columns vectorized read for
spark33+ legacy formats (#10265)
d0916cb3359 is described below
commit d0916cb3359759212191cdb2e13aa02490b1746c
Author: StreamingFlames <[email protected]>
AuthorDate: Wed Dec 20 13:25:30 2023 +0800
[HUDI-7190] Fix nested columns vectorized read for spark33+ legacy formats
(#10265)
* [HUDI-7190] Fix legacy parquet format nested columns vectorized read for
spark3.3+
* Fix nested type implicit schema evolution
* Fix batch read support for the legacy format
* Add exception messages when vectorized reads encounter nested types with type changes
---
.../parquet/LegacyHoodieParquetFileFormat.scala | 8 +-
.../hudi/TestAvroSchemaResolutionSupport.scala | 145 +++++++++++++++++----
.../apache/spark/sql/hudi/TestInsertTable.scala | 37 ++++++
.../org/apache/spark/sql/hudi/TestSpark3DDL.scala | 9 +-
.../Spark33LegacyHoodieParquetFileFormat.scala | 12 +-
.../Spark34LegacyHoodieParquetFileFormat.scala | 19 +--
.../Spark35LegacyHoodieParquetFileFormat.scala | 19 +--
7 files changed, 193 insertions(+), 56 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala
index 046640c11c1..d579c9052a4 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala
@@ -38,12 +38,8 @@ class LegacyHoodieParquetFileFormat extends
ParquetFileFormat with SparkAdapterS
override def toString: String = "Hoodie-Parquet"
override def supportBatch(sparkSession: SparkSession, schema: StructType):
Boolean = {
- if (HoodieSparkUtils.gteqSpark3_4) {
- val conf = sparkSession.sessionState.conf
- conf.parquetVectorizedReaderEnabled &&
schema.forall(_.dataType.isInstanceOf[AtomicType])
- } else {
- super.supportBatch(sparkSession, schema)
- }
+ sparkAdapter
+
.createLegacyHoodieParquetFileFormat(true).get.supportBatch(sparkSession,
schema)
}
override def buildReaderWithPartitionValues(sparkSession: SparkSession,
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
index 6e22db914d3..61964755519 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.SchemaCompatibilityException
import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.spark.SparkException
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.junit.jupiter.api.{AfterEach, BeforeEach}
@@ -386,15 +387,17 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
// upsert
upsertData(df2, tempRecordPath, isCow)
- // read out the table
- val readDf = spark.read.format("hudi")
- // NOTE: long to int type change is not supported for the custom file
format and the filegroup reader
- // HUDI-7045 and PR#10007 in progress to fix the issue
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
- .load(tempRecordPath)
- readDf.printSchema()
- readDf.show(false)
- readDf.foreach(_ => {})
+ withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" ->
"false") {
+ // read out the table
+ val readDf = spark.read.format("hudi")
+ // NOTE: long to int type change is not supported for the custom file
format and the filegroup reader
+ // HUDI-7045 and PR#10007 in progress to fix the issue
+ .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
+ .load(tempRecordPath)
+ readDf.printSchema()
+ readDf.show(false)
+ readDf.foreach(_ => {})
+ }
}
@ParameterizedTest
@@ -482,15 +485,17 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
// upsert
upsertData(df2, tempRecordPath, isCow)
- // read out the table
- val readDf = spark.read.format("hudi")
- // NOTE: type promotion is not supported for the custom file format and
the filegroup reader
- // HUDI-7045 and PR#10007 in progress to fix the issue
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
- .load(tempRecordPath)
- readDf.printSchema()
- readDf.show(false)
- readDf.foreach(_ => {})
+ withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" ->
"false") {
+ // read out the table
+ val readDf = spark.read.format("hudi")
+ // NOTE: type promotion is not supported for the custom file format
and the filegroup reader
+ // HUDI-7045 and PR#10007 in progress to fix the issue
+ .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
+ .load(tempRecordPath)
+ readDf.printSchema()
+ readDf.show(false)
+ readDf.foreach(_ => {})
+ }
}
@ParameterizedTest
@@ -548,15 +553,17 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
// upsert
upsertData(df2, tempRecordPath, isCow)
- // read out the table
- val readDf = spark.read.format("hudi")
- // NOTE: type promotion is not supported for the custom file format and
the filegroup reader
- // HUDI-7045 and PR#10007 in progress to fix the issue
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
- .load(tempRecordPath)
- readDf.printSchema()
- readDf.show(false)
- readDf.foreach(_ => {})
+ withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" ->
"false") {
+ // read out the table
+ val readDf = spark.read.format("hudi")
+ // NOTE: type promotion is not supported for the custom file format
and the filegroup reader
+ // HUDI-7045 and PR#10007 in progress to fix the issue
+ .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
+ .load(tempRecordPath)
+ readDf.printSchema()
+ readDf.show(false)
+ readDf.foreach(_ => {})
+ }
}
@ParameterizedTest
@@ -828,4 +835,88 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
readDf.show(false)
readDf.foreach(_ => {})
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testNestedTypeVectorizedReadWithTypeChange(isCow: Boolean): Unit = {
+ // test to change the value type of a MAP in a column of ARRAY< MAP<k,v> >
type
+ val tempRecordPath = basePath + "/record_tbl/"
+ val arrayMapData = Seq(
+ Row(1, 100, List(Map("2022-12-01" -> 120), Map("2022-12-02" -> 130)),
"aaa")
+ )
+ val arrayMapSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("salesMap", ArrayType(
+ new MapType(StringType, IntegerType, true)))
+ .add("name", StringType)
+ val df1 =
spark.createDataFrame(spark.sparkContext.parallelize(arrayMapData),
arrayMapSchema)
+ df1.printSchema()
+ df1.show(false)
+
+ // recreate table
+ initialiseTable(df1, tempRecordPath, isCow)
+
+ // read out the table, will not throw any exception
+ readTable(tempRecordPath)
+
+ // change value type from integer to long
+ val newArrayMapData = Seq(
+ Row(2, 200, List(Map("2022-12-01" -> 220L), Map("2022-12-02" -> 230L)),
"bbb")
+ )
+ val newArrayMapSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("salesMap", ArrayType(
+ new MapType(StringType, LongType, true)))
+ .add("name", StringType)
+ val df2 =
spark.createDataFrame(spark.sparkContext.parallelize(newArrayMapData),
newArrayMapSchema)
+ df2.printSchema()
+ df2.show(false)
+ // upsert
+ upsertData(df2, tempRecordPath, isCow)
+
+ // after implicit type change, read the table with vectorized read enabled
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ assertThrows(classOf[SparkException]){
+ withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" ->
"true") {
+ readTable(tempRecordPath)
+ }
+ }
+ }
+
+ withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" ->
"false") {
+ readTable(tempRecordPath)
+ }
+ }
+
+
+ private def readTable(path: String): Unit = {
+ // read out the table
+ val readDf = spark.read.format("hudi")
+ // NOTE: type promotion is not supported for the custom file format and
the filegroup reader
+ // HUDI-7045 and PR#10007 in progress to fix the issue
+ .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
+ .load(path)
+ readDf.printSchema()
+ readDf.show(false)
+ readDf.foreach(_ => {})
+ }
+
+ protected def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
+ val conf = spark.sessionState.conf
+ val currentValues = pairs.unzip._1.map { k =>
+ if (conf.contains(k)) {
+ Some(conf.getConfString(k))
+ } else None
+ }
+ pairs.foreach { case (k, v) => conf.setConfString(k, v) }
+ try f finally {
+ pairs.unzip._1.zip(currentValues).foreach {
+ case (key, Some(value)) => conf.setConfString(key, value)
+ case (key, None) => conf.unsetConf(key)
+ }
+ }
+ }
+
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index b1b7353c2bc..044b6451cdf 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -2173,6 +2173,43 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
})
}
+ test("Test vectorized read nested columns for
LegacyHoodieParquetFileFormat") {
+ withSQLConf(
+ "hoodie.datasource.read.use.new.parquet.file.format" -> "false",
+ "hoodie.file.group.reader.enabled" -> "false",
+ "spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true",
+ "spark.sql.parquet.enableVectorizedReader" -> "true") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | attributes map<string, string>,
+ | price double,
+ | ts long,
+ | dt string
+ |) using hudi
+ | tblproperties (primaryKey = 'id')
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}'
+ """.stripMargin)
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1', map('color', 'red', 'size', 'M'), 10, 1000,
'2021-01-05'),
+ | (2, 'a2', map('color', 'blue', 'size', 'L'), 20, 2000,
'2021-01-06'),
+ | (3, 'a3', map('color', 'green', 'size', 'S'), 30, 3000,
'2021-01-07')
+ """.stripMargin)
+ // Check the inserted records with map type attributes
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
attributes.color = 'red'")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05")
+ )
+ }
+ }
+ }
+
def ingestAndValidateDataNoPrecombine(tableType: String, tableName: String,
tmp: File,
expectedOperationtype: WriteOperationType,
setOptions: List[String] = List.empty) : Unit = {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 6ca1a72edcd..6a64c69021c 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -544,12 +544,12 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
test("Test alter column with complex schema") {
withRecordType()(withTempDir { tmp =>
- Seq("mor").foreach { tableType =>
+ withSQLConf(s"$SPARK_SQL_INSERT_INTO_OPERATION" -> "upsert",
+ "hoodie.schema.on.read.enable" -> "true",
+ "spark.sql.parquet.enableNestedColumnVectorizedReader" -> "false") {
val tableName = generateTableName
val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
if (HoodieSparkUtils.gteqSpark3_1) {
- spark.sql("set hoodie.schema.on.read.enable=true")
- spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
spark.sql(
s"""
|create table $tableName (
@@ -561,7 +561,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
|) using hudi
| location '$tablePath'
| options (
- | type = '$tableType',
+ | type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
@@ -628,7 +628,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
)
}
}
- spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
})
}
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
index 3b53b753b69..3176668dab6 100644
---
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
@@ -50,6 +50,8 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField,
StructType}
import org.apache.spark.util.SerializableConfiguration
+import scala.collection.convert.ImplicitConversions.`collection
AsScalaIterable`
+
import java.net.URI
/**
@@ -121,8 +123,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val
shouldAppendPartitionValu
val sqlConf = sparkSession.sessionState.conf
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader: Boolean =
- sqlConf.parquetVectorizedReaderEnabled &&
- resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+ ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema)
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean =
sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
@@ -243,6 +244,13 @@ class Spark33LegacyHoodieParquetFileFormat(private val
shouldAppendPartitionValu
implicitTypeChangeInfo
}
+ if (enableVectorizedReader && shouldUseInternalSchema &&
+ !typeChangeInfos.values().forall(_.getLeft.isInstanceOf[AtomicType])) {
+ throw new IllegalArgumentException(
+ "Nested types with type changes(implicit or explicit) cannot be read
in vectorized mode. " +
+ "To workaround this issue, set
spark.sql.parquet.enableVectorizedReader=false.")
+ }
+
val hadoopAttemptContext =
new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
index cd76ce6f3b2..a1cfbb96212 100644
---
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
@@ -47,6 +47,9 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField,
StructType}
import org.apache.spark.util.SerializableConfiguration
+
+import scala.collection.convert.ImplicitConversions.`collection
AsScalaIterable`
+
/**
* This class is an extension of [[ParquetFileFormat]] overriding
Spark-specific behavior
* that's not possible to customize in any other way
@@ -59,11 +62,6 @@ import org.apache.spark.util.SerializableConfiguration
*/
class Spark34LegacyHoodieParquetFileFormat(private val
shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
- override def supportBatch(sparkSession: SparkSession, schema: StructType):
Boolean = {
- val conf = sparkSession.sessionState.conf
- conf.parquetVectorizedReaderEnabled &&
schema.forall(_.dataType.isInstanceOf[AtomicType])
- }
-
def supportsColumnar(sparkSession: SparkSession, schema: StructType):
Boolean = {
val conf = sparkSession.sessionState.conf
// Only output columnar if there is WSCG to read it.
@@ -133,9 +131,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val
shouldAppendPartitionValu
val resultSchema = StructType(partitionSchema.fields ++
requiredSchema.fields)
val sqlConf = sparkSession.sessionState.conf
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
- val enableVectorizedReader: Boolean =
- sqlConf.parquetVectorizedReaderEnabled &&
- resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+ val enableVectorizedReader: Boolean = supportBatch(sparkSession,
resultSchema)
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean =
sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
@@ -259,6 +255,13 @@ class Spark34LegacyHoodieParquetFileFormat(private val
shouldAppendPartitionValu
implicitTypeChangeInfo
}
+ if (enableVectorizedReader && shouldUseInternalSchema &&
+ !typeChangeInfos.values().forall(_.getLeft.isInstanceOf[AtomicType])) {
+ throw new IllegalArgumentException(
+ "Nested types with type changes(implicit or explicit) cannot be read
in vectorized mode. " +
+ "To workaround this issue, set
spark.sql.parquet.enableVectorizedReader=false.")
+ }
+
val hadoopAttemptContext =
new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
diff --git
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
index dd70aa08b85..b6177b942fc 100644
---
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
@@ -48,6 +48,9 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField,
StructType}
import org.apache.spark.util.SerializableConfiguration
+
+import scala.collection.convert.ImplicitConversions.`collection
AsScalaIterable`
+
/**
* This class is an extension of [[ParquetFileFormat]] overriding
Spark-specific behavior
* that's not possible to customize in any other way
@@ -60,11 +63,6 @@ import org.apache.spark.util.SerializableConfiguration
*/
class Spark35LegacyHoodieParquetFileFormat(private val
shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
- override def supportBatch(sparkSession: SparkSession, schema: StructType):
Boolean = {
- val conf = sparkSession.sessionState.conf
- conf.parquetVectorizedReaderEnabled &&
schema.forall(_.dataType.isInstanceOf[AtomicType])
- }
-
def supportsColumnar(sparkSession: SparkSession, schema: StructType):
Boolean = {
val conf = sparkSession.sessionState.conf
// Only output columnar if there is WSCG to read it.
@@ -134,9 +132,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val
shouldAppendPartitionValu
val resultSchema = StructType(partitionSchema.fields ++
requiredSchema.fields)
val sqlConf = sparkSession.sessionState.conf
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
- val enableVectorizedReader: Boolean =
- sqlConf.parquetVectorizedReaderEnabled &&
- resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+ val enableVectorizedReader: Boolean = supportBatch(sparkSession,
resultSchema)
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean =
sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
@@ -260,6 +256,13 @@ class Spark35LegacyHoodieParquetFileFormat(private val
shouldAppendPartitionValu
implicitTypeChangeInfo
}
+ if (enableVectorizedReader && shouldUseInternalSchema &&
+ !typeChangeInfos.values().forall(_.getLeft.isInstanceOf[AtomicType])) {
+ throw new IllegalArgumentException(
+ "Nested types with type changes(implicit or explicit) cannot be read
in vectorized mode. " +
+ "To workaround this issue, set
spark.sql.parquet.enableVectorizedReader=false.")
+ }
+
val hadoopAttemptContext =
new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)