This is an automated email from the ASF dual-hosted git repository.
mengtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 64b814ea23 [HUDI-5400] Fix read issues when Hudi-FULL schema evolution
is not enabled (#7480)
64b814ea23 is described below
commit 64b814ea237bd1576af3673d04c7bb965218fdef
Author: voonhous <[email protected]>
AuthorDate: Sat Dec 24 15:41:59 2022 +0800
[HUDI-5400] Fix read issues when Hudi-FULL schema evolution is not enabled
(#7480)
---
.../parquet/HoodieParquetFileFormatHelper.scala | 72 ++
.../hudi/TestAvroSchemaResolutionSupport.scala | 794 +++++++++++++++++++++
...Spark24HoodieVectorizedParquetRecordReader.java | 185 +++++
.../parquet/Spark24HoodieParquetFileFormat.scala | 62 +-
.../parquet/Spark31HoodieParquetFileFormat.scala | 12 +-
.../Spark32PlusHoodieParquetFileFormat.scala | 10 +-
6 files changed, 1116 insertions(+), 19 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
new file mode 100644
index 0000000000..ce1a719cb9
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.metadata.FileMetaData
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField,
StructType}
+
+object HoodieParquetFileFormatHelper {
+
+ def buildImplicitSchemaChangeInfo(hadoopConf: Configuration,
+ parquetFileMetaData: FileMetaData,
+ requiredSchema: StructType):
(java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType,
DataType]], StructType) = {
+ val implicitTypeChangeInfo: java.util.Map[Integer,
org.apache.hudi.common.util.collection.Pair[DataType, DataType]] = new
java.util.HashMap()
+ val convert = new ParquetToSparkSchemaConverter(hadoopConf)
+ val fileStruct = convert.convert(parquetFileMetaData.getSchema)
+ val fileStructMap = fileStruct.fields.map(f => (f.name, f.dataType)).toMap
+ val sparkRequestStructFields = requiredSchema.map(f => {
+ val requiredType = f.dataType
+ if (fileStructMap.contains(f.name) && !isDataTypeEqual(requiredType,
fileStructMap(f.name))) {
+ implicitTypeChangeInfo.put(new
Integer(requiredSchema.fieldIndex(f.name)),
org.apache.hudi.common.util.collection.Pair.of(requiredType,
fileStructMap(f.name)))
+ StructField(f.name, fileStructMap(f.name), f.nullable)
+ } else {
+ f
+ }
+ })
+ (implicitTypeChangeInfo, StructType(sparkRequestStructFields))
+ }
+
+ def isDataTypeEqual(requiredType: DataType, fileType: DataType): Boolean =
(requiredType, fileType) match {
+ case (requiredType, fileType) if requiredType == fileType => true
+
+ case (ArrayType(rt, _), ArrayType(ft, _)) =>
+      // Do not care about nullability as schema evolution requires fields to
be nullable
+ isDataTypeEqual(rt, ft)
+
+ case (MapType(requiredKey, requiredValue, _), MapType(fileKey, fileValue,
_)) =>
+      // Likewise, do not care about nullability as schema evolution requires
fields to be nullable
+ isDataTypeEqual(requiredKey, fileKey) && isDataTypeEqual(requiredValue,
fileValue)
+
+ case (StructType(requiredFields), StructType(fileFields)) =>
+ // Find fields that are in requiredFields and fileFields as they might
not be the same during add column + change column operations
+ val commonFieldNames = requiredFields.map(_.name) intersect
fileFields.map(_.name)
+
+ // Need to match by name instead of StructField as name will stay the
same whilst type may change
+ val fileFilteredFields = fileFields.filter(f =>
commonFieldNames.contains(f.name)).sortWith(_.name < _.name)
+ val requiredFilteredFields = requiredFields.filter(f =>
commonFieldNames.contains(f.name)).sortWith(_.name < _.name)
+
+ // Sorting ensures that the same field names are being compared for type
differences
+ requiredFilteredFields.zip(fileFilteredFields).forall {
+ case (requiredField, fileFilteredField) =>
+ isDataTypeEqual(requiredField.dataType, fileFilteredField.dataType)
+ }
+
+ case _ => false
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
new file mode 100644
index 0000000000..ad476fb38f
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -0,0 +1,794 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hudi
+
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import scala.language.postfixOps
+
+/**
+ * Test cases to validate Hudi's support for writing and reading when evolving
schema implicitly via Avro's Schema Resolution
+ * Note: Test will explicitly write into different partitions to ensure that a
Hudi table will have multiple filegroups with different schemas.
+ */
+class TestAvroSchemaResolutionSupport extends HoodieClientTestBase {
+
+ var spark: SparkSession = _
+ val commonOpts: Map[String, String] = Map(
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_avro_schema_resolution_support",
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "name",
+ DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key ->
"org.apache.hudi.keygen.SimpleKeyGenerator",
+ HoodieMetadataConfig.ENABLE.key -> "false"
+ )
+
+ /**
+ * Setup method running before each test.
+ */
+ @BeforeEach override def setUp(): Unit = {
+ setTableName("hoodie_avro_schema_resolution_support")
+ initPath()
+ initSparkContexts()
+ spark = sqlContext.sparkSession
+ }
+
+ @AfterEach override def tearDown(): Unit = {
+ cleanupSparkContexts()
+ }
+
+ def castColToX(x: Int, colToCast: String, df: DataFrame): DataFrame = x
match {
+ case 0 => df.withColumn(colToCast, df.col(colToCast).cast("long"))
+ case 1 => df.withColumn(colToCast, df.col(colToCast).cast("float"))
+ case 2 => df.withColumn(colToCast, df.col(colToCast).cast("double"))
+ case 3 => df.withColumn(colToCast, df.col(colToCast).cast("binary"))
+ case 4 => df.withColumn(colToCast, df.col(colToCast).cast("string"))
+ }
+
+ def initialiseTable(df: DataFrame, saveDir: String, isCow: Boolean = true):
Unit = {
+ val opts = if (isCow) {
+ commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key ->
"COPY_ON_WRITE")
+ } else {
+ commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key ->
"MERGE_ON_READ")
+ }
+
+ df.write.format("hudi")
+ .options(opts)
+ .mode("overwrite")
+ .save(saveDir)
+ }
+
+ def upsertData(df: DataFrame, saveDir: String, isCow: Boolean = true): Unit
= {
+ val opts = if (isCow) {
+ commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key ->
"COPY_ON_WRITE")
+ } else {
+ commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key ->
"MERGE_ON_READ")
+ }
+
+ df.write.format("hudi")
+ .options(opts)
+ .mode("append")
+ .save(saveDir)
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testDataTypePromotions(isCow: Boolean): Unit = {
+ // test to read tables with columns that are promoted via avro schema
resolution
+ val tempRecordPath = basePath + "/record_tbl/"
+ val _spark = spark
+ import _spark.implicits._
+
+ val colToCast = "userId"
+ val df1 = Seq((1, 100, "aaa")).toDF("id", "userid", "name")
+ val df2 = Seq((2, 200L, "bbb")).toDF("id", "userid", "name")
+
+ def prepDataFrame(df: DataFrame, colInitType: String): DataFrame = {
+ // convert int to string first before conversion to binary
+ // after which, initialise df with initType
+ if (colInitType == "binary") {
+ val castDf = df.withColumn(colToCast, df.col(colToCast).cast("string"))
+ castDf.withColumn(colToCast, castDf.col(colToCast).cast(colInitType))
+ } else {
+ df.withColumn(colToCast, df.col(colToCast).cast(colInitType))
+ }
+ }
+
+ def doTest(colInitType: String, start: Int, end: Int): Unit = {
+ for (a <- Range(start, end)) {
+ try {
+ Console.println(s"Performing test: $a with initialColType of:
$colInitType")
+
+ // convert int to string first before conversion to binary
+ val initDF = prepDataFrame(df1, colInitType)
+ initDF.printSchema()
+ initDF.show(false)
+
+ // recreate table
+ initialiseTable(initDF, tempRecordPath, isCow)
+
+ // perform avro supported casting
+ var upsertDf = prepDataFrame(df2, colInitType)
+ upsertDf = castColToX(a, colToCast, upsertDf)
+ upsertDf.printSchema()
+ upsertDf.show(false)
+
+ // upsert
+ upsertData(upsertDf, tempRecordPath, isCow)
+
+ // read out the table
+ val readDf = spark.read.format("hudi").load(tempRecordPath)
+ readDf.printSchema()
+ readDf.show(false)
+ readDf.foreach(_ => {})
+
+ assert(true)
+ } catch {
+ case e: Exception => {
+ // e.printStackTrace()
+ // Console.println(s"Test $a failed with error: ${e.getMessage}")
+ assert(false, e)
+ }
+ }
+ }
+ }
+
+    // INT -> [Long, Float, Double]
+ doTest("int", 0, 3)
+    // Long -> [Float, Double]
+ doTest("long", 1, 3)
+    // Float -> [Double]
+ doTest("float", 2, 3)
+ // String -> [Bytes]
+ doTest("string", 3, 4)
+ // Bytes -> [String]
+ doTest("binary", 4, 5)
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testAddNewColumn(isCow: Boolean): Unit = {
+ // test to add a column
+ val tempRecordPath = basePath + "/record_tbl/"
+ val _spark = spark
+ import _spark.implicits._
+
+ val df1 = Seq((1, 100, "aaa")).toDF("id", "userid", "name")
+ val df2 = Seq((2, 200, "newCol", "bbb")).toDF("id", "userid", "newcol",
"name")
+
+ // convert int to string first before conversion to binary
+ val initDF = df1
+ initDF.printSchema()
+ initDF.show(false)
+
+ // recreate table
+ initialiseTable(initDF, tempRecordPath, isCow)
+
+ // perform avro supported operation of adding a new column at the end of
the table
+ val upsertDf = df2
+ upsertDf.printSchema()
+ upsertDf.show(false)
+
+ // upsert
+ upsertData(upsertDf, tempRecordPath, isCow)
+
+ // read out the table
+ val readDf = spark.read.format("hudi").load(tempRecordPath)
+ readDf.printSchema()
+ readDf.show(false)
+ readDf.foreach(_ => {})
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testDeleteColumn(isCow: Boolean): Unit = {
+ // test to delete a column
+ val tempRecordPath = basePath + "/record_tbl/"
+ val _spark = spark
+ import _spark.implicits._
+
+ val df1 = Seq((1, 100, "aaa")).toDF("id", "userid", "name")
+ val df2 = Seq((2, "bbb")).toDF("id", "name")
+
+ // convert int to string first before conversion to binary
+ val initDF = df1
+ initDF.printSchema()
+ initDF.show(false)
+
+ // recreate table
+ initialiseTable(initDF, tempRecordPath, isCow)
+
+ // perform avro supported operation of deleting a column
+ val upsertDf = df2
+ upsertDf.printSchema()
+ upsertDf.show(false)
+
+ // upsert
+ upsertData(upsertDf, tempRecordPath, isCow)
+
+ // read out the table
+ val readDf = spark.read.format("hudi").load(tempRecordPath)
+ readDf.printSchema()
+ readDf.show(false)
+ readDf.foreach(_ => {})
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testColumnPositionChange(isCow: Boolean): Unit = {
+ // test to change column positions
+ val tempRecordPath = basePath + "/record_tbl/"
+ val _spark = spark
+ import _spark.implicits._
+
+ val df1 = Seq((1, 100, "col1", "aaa")).toDF("id", "userid", "newcol",
"name")
+ val df2 = Seq((2, "col2", 200, "bbb")).toDF("id", "newcol", "userid",
"name")
+
+ // convert int to string first before conversion to binary
+ val initDF = df1
+ initDF.printSchema()
+ initDF.show(false)
+
+ // recreate table
+ initialiseTable(initDF, tempRecordPath, isCow)
+
+    // perform avro supported operation of changing column positions
+ val upsertDf = df2
+ upsertDf.printSchema()
+ upsertDf.show(false)
+
+ // upsert
+ upsertData(upsertDf, tempRecordPath, isCow)
+
+ // read out the table
+ val readDf = spark.read.format("hudi").load(tempRecordPath)
+ readDf.printSchema()
+ readDf.show(false)
+ readDf.foreach(_ => {})
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testArrayOfStructsAddNewColumn(isCow: Boolean): Unit = {
+ // test to add a field to a STRUCT in a column of ARRAY< STRUCT<..> > type
+
+ // there is a bug on Spark3 that will prevent Array[Map/Struct] schema
evolved tables from being read
+ // bug fix:
https://github.com/apache/spark/commit/32a393395ee43b573ae75afba591b587ca51879b
+ // bug fix is only available Spark >= v3.1.3
+ if (HoodieSparkUtils.isSpark2 || (HoodieSparkUtils.isSpark3 &&
HoodieSparkUtils.gteqSpark3_1_3)) {
+ val tempRecordPath = basePath + "/record_tbl/"
+ val arrayStructData = Seq(
+ Row(1, 100, List(Row("Java", "XX", 120), Row("Scala", "XA", 300)),
"aaa")
+ )
+ val arrayStructSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("language", ArrayType(new StructType()
+ .add("name", StringType)
+ .add("author", StringType)
+ .add("pages", IntegerType)))
+ .add("name", StringType)
+ val df1 =
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),
arrayStructSchema)
+ df1.printSchema()
+ df1.show(false)
+
+ // recreate table
+ initialiseTable(df1, tempRecordPath, isCow)
+
+ // add a column to array of struct
+ val newArrayStructData = Seq(
+ Row(2, 200, List(Row("JavaV2", "XXX", 130, 20), Row("ScalaV2", "XXA",
310, 40)), "bbb")
+ )
+ val newArrayStructSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("language", ArrayType(new StructType()
+ .add("name", StringType)
+ .add("author", StringType)
+ .add("pages", IntegerType)
+ .add("progress", IntegerType)
+ ))
+ .add("name", StringType)
+ val df2 =
spark.createDataFrame(spark.sparkContext.parallelize(newArrayStructData),
newArrayStructSchema)
+ df2.printSchema()
+ df2.show(false)
+ // upsert
+ upsertData(df2, tempRecordPath, isCow)
+
+ // read out the table
+ val readDf = spark.read.format("hudi").load(tempRecordPath)
+ readDf.printSchema()
+ readDf.show(false)
+ readDf.foreach(_ => {})
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testArrayOfStructsChangeColumnType(isCow: Boolean): Unit = {
+ // test to change the type of a field from a STRUCT in a column of ARRAY<
STRUCT<..> > type
+ val tempRecordPath = basePath + "/record_tbl/"
+ val arrayStructData = Seq(
+ Row(1, 100, List(Row("Java", "XX", 120), Row("Scala", "XA", 300)), "aaa")
+ )
+ val arrayStructSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("language", ArrayType(new StructType()
+ .add("name", StringType)
+ .add("author", StringType)
+ .add("pages", IntegerType)))
+ .add("name", StringType)
+ val df1 =
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),
arrayStructSchema)
+ df1.printSchema()
+ df1.show(false)
+
+ // recreate table
+ initialiseTable(df1, tempRecordPath, isCow)
+
+    // change the type of a field in the array of struct
+ val newArrayStructData = Seq(
+ Row(2, 200, List(Row("XXX", "JavaV2", 130L), Row("XXA", "ScalaV2",
310L)), "bbb")
+ )
+ val newArrayStructSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("language", ArrayType(new StructType()
+ .add("author", StringType)
+ .add("name", StringType)
+ .add("pages", LongType)))
+ .add("name", StringType)
+ val df2 =
spark.createDataFrame(spark.sparkContext.parallelize(newArrayStructData),
newArrayStructSchema)
+ df2.printSchema()
+ df2.show(false)
+ // upsert
+ upsertData(df2, tempRecordPath, isCow)
+
+ // read out the table
+ val readDf = spark.read.format("hudi").load(tempRecordPath)
+ readDf.printSchema()
+ readDf.show(false)
+ readDf.foreach(_ => {})
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testArrayOfStructsChangeColumnPosition(isCow: Boolean): Unit = {
+ // test to change the position of a field from a STRUCT in a column of
ARRAY< STRUCT<..> > type
+ val tempRecordPath = basePath + "/record_tbl/"
+ val arrayStructData = Seq(
+ Row(1, 100, List(Row("Java", "XX", 120), Row("Scala", "XA", 300)), "aaa")
+ )
+ val arrayStructSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("language", ArrayType(new StructType()
+ .add("name", StringType)
+ .add("author", StringType)
+ .add("pages", IntegerType)))
+ .add("name", StringType)
+ val df1 =
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),
arrayStructSchema)
+ df1.printSchema()
+ df1.show(false)
+
+ // recreate table
+ initialiseTable(df1, tempRecordPath, isCow)
+
+    // change the position of fields in the array of struct
+ val newArrayStructData = Seq(
+ Row(2, 200, List(Row(130, "JavaV2", "XXX"), Row(310, "ScalaV2", "XXA")),
"bbb")
+ )
+ val newArrayStructSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("language", ArrayType(new StructType()
+ .add("pages", IntegerType)
+ .add("name", StringType)
+ .add("author", StringType)))
+ .add("name", StringType)
+ val df2 =
spark.createDataFrame(spark.sparkContext.parallelize(newArrayStructData),
newArrayStructSchema)
+ df2.printSchema()
+ df2.show(false)
+ // upsert
+ upsertData(df2, tempRecordPath, isCow)
+
+ // read out the table
+ val readDf = spark.read.format("hudi").load(tempRecordPath)
+ readDf.printSchema()
+ readDf.show(false)
+ readDf.foreach(_ => {})
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testArrayOfMapsChangeValueType(isCow: Boolean): Unit = {
+ // test to change the value type of a MAP in a column of ARRAY< MAP<k,v> >
type
+ val tempRecordPath = basePath + "/record_tbl/"
+ val arrayMapData = Seq(
+ Row(1, 100, List(Map("2022-12-01" -> 120), Map("2022-12-02" -> 130)),
"aaa")
+ )
+ val arrayMapSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("salesMap", ArrayType(
+ new MapType(StringType, IntegerType, true)))
+ .add("name", StringType)
+ val df1 =
spark.createDataFrame(spark.sparkContext.parallelize(arrayMapData),
arrayMapSchema)
+ df1.printSchema()
+ df1.show(false)
+
+ // recreate table
+ initialiseTable(df1, tempRecordPath, isCow)
+
+ // change value type from integer to long
+ val newArrayMapData = Seq(
+ Row(2, 200, List(Map("2022-12-01" -> 220L), Map("2022-12-02" -> 230L)),
"bbb")
+ )
+ val newArrayMapSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("salesMap", ArrayType(
+ new MapType(StringType, LongType, true)))
+ .add("name", StringType)
+ val df2 =
spark.createDataFrame(spark.sparkContext.parallelize(newArrayMapData),
newArrayMapSchema)
+ df2.printSchema()
+ df2.show(false)
+ // upsert
+ upsertData(df2, tempRecordPath, isCow)
+
+ // read out the table
+ val readDf = spark.read.format("hudi").load(tempRecordPath)
+ readDf.printSchema()
+ readDf.show(false)
+ readDf.foreach(_ => {})
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testArrayOfMapsStructChangeFieldType(isCow: Boolean): Unit = {
+ // test to change a field type of a STRUCT in a column of ARRAY< MAP<
k,STRUCT<..> > > type
+ val tempRecordPath = basePath + "/record_tbl/"
+ val arrayMapData = Seq(
+ Row(1, 100,
+ List(
+ Map("2022-12-01" -> Row("a1", "b1", 20)),
+ Map("2022-12-02" -> Row("a2", "b2", 30))
+ ),
+ "aaa")
+ )
+ val innerStructSchema = new StructType()
+ .add("col1", StringType)
+ .add("col2", StringType)
+ .add("col3", IntegerType)
+ val arrayMapSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("structcol", ArrayType(
+ new MapType(StringType, innerStructSchema, true)))
+ .add("name", StringType)
+ val df1 =
spark.createDataFrame(spark.sparkContext.parallelize(arrayMapData),
arrayMapSchema)
+ df1.printSchema()
+ df1.show(false)
+
+ // recreate table
+ initialiseTable(df1, tempRecordPath, isCow)
+
+ // change inner struct's type from integer to long
+ val newArrayMapData = Seq(
+ Row(2, 200,
+ List(
+ Map("2022-12-03" -> Row("a3", "b3", 40L)),
+ Map("2022-12-04" -> Row("a4", "b4", 50L))
+ ),
+ "bbb")
+ )
+ val newInnerStructSchema = new StructType()
+ .add("col1", StringType)
+ .add("col2", StringType)
+ .add("col3", LongType)
+ val newArrayMapSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("structcol", ArrayType(
+ new MapType(StringType, newInnerStructSchema, true)))
+ .add("name", StringType)
+ val df2 =
spark.createDataFrame(spark.sparkContext.parallelize(newArrayMapData),
newArrayMapSchema)
+ df2.printSchema()
+ df2.show(false)
+ // upsert
+ upsertData(df2, tempRecordPath, isCow)
+
+ // read out the table
+ val readDf = spark.read.format("hudi").load(tempRecordPath)
+ readDf.printSchema()
+ readDf.show(false)
+ readDf.foreach(_ => {})
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testArrayOfMapsStructAddField(isCow: Boolean): Unit = {
+ // test to add a field to a STRUCT in a column of ARRAY< MAP< k,STRUCT<..>
> > type
+
+ // there is a bug on Spark3 that will prevent Array[Map/Struct] schema
evolved tables from being read
+ // bug fix:
https://github.com/apache/spark/commit/32a393395ee43b573ae75afba591b587ca51879b
+ // bug fix is only available Spark >= v3.1.3
+ if (HoodieSparkUtils.isSpark2 || (HoodieSparkUtils.isSpark3 &&
HoodieSparkUtils.gteqSpark3_1_3)) {
+ val tempRecordPath = basePath + "/record_tbl/"
+ val arrayMapData = Seq(
+ Row(1, 100,
+ List(
+ Map("2022-12-01" -> Row("a1", "b1", 20)),
+ Map("2022-12-02" -> Row("a2", "b2", 30))
+ ),
+ "aaa")
+ )
+ val innerStructSchema = new StructType()
+ .add("col1", StringType)
+ .add("col2", StringType)
+ .add("col3", IntegerType)
+ val arrayMapSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("structcol", ArrayType(
+ new MapType(StringType, innerStructSchema, true)))
+ .add("name", StringType)
+ val df1 =
spark.createDataFrame(spark.sparkContext.parallelize(arrayMapData),
arrayMapSchema)
+ df1.printSchema()
+ df1.show(false)
+
+ // recreate table
+ initialiseTable(df1, tempRecordPath, isCow)
+
+ // add a new column
+ val newArrayMapData = Seq(
+ Row(2, 200,
+ List(
+ Map("2022-12-01" -> Row("a3", "b3", 20, 40)),
+ Map("2022-12-02" -> Row("a4", "b4", 30, 40))
+ ),
+ "bbb")
+ )
+ val newInnerStructSchema = new StructType()
+ .add("col1", StringType)
+ .add("col2", StringType)
+ .add("col3", IntegerType)
+ .add("col4", IntegerType)
+ val newArrayMapSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("structcol", ArrayType(
+ new MapType(StringType, newInnerStructSchema, true)))
+ .add("name", StringType)
+ val df2 =
spark.createDataFrame(spark.sparkContext.parallelize(newArrayMapData),
newArrayMapSchema)
+ df2.printSchema()
+ df2.show(false)
+ // upsert
+ upsertData(df2, tempRecordPath, isCow)
+
+ // read out the table
+ val readDf = spark.read.format("hudi").load(tempRecordPath)
+ readDf.printSchema()
+ readDf.show(false)
+ readDf.foreach(_ => {})
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testArrayOfMapsStructChangeFieldPosition(isCow: Boolean): Unit = {
+ // test to change the position of fields of a STRUCT in a column of ARRAY<
MAP< k,STRUCT<..> > > type
+ val tempRecordPath = basePath + "/record_tbl/"
+ val arrayMapData = Seq(
+ Row(1, 100,
+ List(
+ Map("2022-12-01" -> Row("a1", "b1", 20)),
+ Map("2022-12-02" -> Row("a2", "b2", 30))
+ ),
+ "aaa")
+ )
+ val innerStructSchema = new StructType()
+ .add("col1", StringType)
+ .add("col2", StringType)
+ .add("col3", IntegerType)
+ val arrayMapSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("structcol", ArrayType(
+ new MapType(StringType, innerStructSchema, true)))
+ .add("name", StringType)
+ val df1 =
spark.createDataFrame(spark.sparkContext.parallelize(arrayMapData),
arrayMapSchema)
+ df1.printSchema()
+ df1.show(false)
+
+ // recreate table
+ initialiseTable(df1, tempRecordPath, isCow)
+
+ // change column position
+ val newArrayMapData = Seq(
+ Row(2, 200,
+ List(
+ Map("2022-12-01" -> Row("a3", 40, "b3")),
+ Map("2022-12-02" -> Row("a4", 50, "b4"))
+ ),
+ "bbb")
+ )
+ val newInnerStructSchema = new StructType()
+ .add("col1", StringType)
+ .add("col3", IntegerType)
+ .add("col2", StringType)
+ val newArrayMapSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("structcol", ArrayType(
+ new MapType(StringType, newInnerStructSchema, true)))
+ .add("name", StringType)
+ val df2 =
spark.createDataFrame(spark.sparkContext.parallelize(newArrayMapData),
newArrayMapSchema)
+ df2.printSchema()
+ df2.show(false)
+ // upsert
+ upsertData(df2, tempRecordPath, isCow)
+
+ // read out the table
+ val readDf = spark.read.format("hudi").load(tempRecordPath)
+ readDf.printSchema()
+ readDf.show(false)
+ readDf.foreach(_ => {})
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testArrayOfMapsStructDeleteField(isCow: Boolean): Unit = {
+ // test to delete a field of a STRUCT in a column of ARRAY< MAP<
k,STRUCT<..> > > type
+
+ val tempRecordPath = basePath + "/record_tbl/"
+ val arrayMapData = Seq(
+ Row(1, 100,
+ List(
+ Map("2022-12-01" -> Row("a1", "b1", 20)),
+ Map("2022-12-02" -> Row("a2", "b2", 30))
+ ),
+ "aaa")
+ )
+ val innerStructSchema = new StructType()
+ .add("col1", StringType)
+ .add("col2", StringType)
+ .add("col3", IntegerType)
+ val arrayMapSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("structcol", ArrayType(
+ new MapType(StringType, innerStructSchema, true)))
+ .add("name", StringType)
+ val df1 =
spark.createDataFrame(spark.sparkContext.parallelize(arrayMapData),
arrayMapSchema)
+ df1.printSchema()
+ df1.show(false)
+
+ // recreate table
+ initialiseTable(df1, tempRecordPath, isCow)
+
+    // delete a field from the inner struct
+ val newArrayMapData = Seq(
+ Row(2, 200,
+ List(
+ Map("2022-12-01" -> Row("a3", 40)),
+ Map("2022-12-02" -> Row("a4", 50))
+ ),
+ "bbb")
+ )
+ val newInnerStructSchema = new StructType()
+ .add("col1", StringType)
+ .add("col3", IntegerType)
+ val newArrayMapSchema = new StructType()
+ .add("id", IntegerType)
+ .add("userid", IntegerType)
+ .add("structcol", ArrayType(
+ new MapType(StringType, newInnerStructSchema, true)))
+ .add("name", StringType)
+ val df2 =
spark.createDataFrame(spark.sparkContext.parallelize(newArrayMapData),
newArrayMapSchema)
+ df2.printSchema()
+ df2.show(false)
+ // upsert
+ upsertData(df2, tempRecordPath, isCow)
+
+ // read out the table
+ val readDf = spark.read.format("hudi").load(tempRecordPath)
+ readDf.printSchema()
+ readDf.show(false)
+ readDf.foreach(_ => {})
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testComplexOperationsOnTable(isCow: Boolean): Unit = {
+ // test a series of changes on a Hudi table
+
+ var defaultPartitionIdx = 0
+
+ def newPartition: String = {
+ defaultPartitionIdx = defaultPartitionIdx + 1
+ "aaa" + defaultPartitionIdx
+ }
+
+ val tempRecordPath = basePath + "/record_tbl/"
+ val _spark = spark
+ import _spark.implicits._
+
+ // 1. Initialise table
+ val df1 = Seq((1, 100, newPartition)).toDF("id", "userid", "name")
+ df1.printSchema()
+ df1.show(false)
+ initialiseTable(df1, tempRecordPath, isCow)
+
+ // 2. Promote INT type to LONG into a different partition
+ val df2 = Seq((2, 200L, newPartition)).toDF("id", "userid", "name")
+ df2.printSchema()
+ df2.show(false)
+ upsertData(df2, tempRecordPath, isCow)
+
+ // 3. Promote LONG to FLOAT
+ var df3 = Seq((3, 300, newPartition)).toDF("id", "userid", "name")
+ df3 = df3.withColumn("userid", df3.col("userid").cast("float"))
+ df3.printSchema()
+ df3.show(false)
+ upsertData(df3, tempRecordPath)
+
+ // 4. Promote FLOAT to DOUBLE
+ var df4 = Seq((4, 400, newPartition)).toDF("id", "userid", "name")
+ df4 = df4.withColumn("userid", df4.col("userid").cast("float"))
+ df4.printSchema()
+ df4.show(false)
+ upsertData(df4, tempRecordPath)
+
+    // 5. Add two new columns
+ var df5 = Seq((5, 500, "newcol1", "newcol2", newPartition)).toDF("id",
"userid", "newcol1", "newcol2", "name")
+ df5 = df5.withColumn("userid", df5.col("userid").cast("float"))
+ df5.printSchema()
+ df5.show(false)
+ upsertData(df5, tempRecordPath)
+
+ // 6. Delete a column
+ var df6 = Seq((6, 600, "newcol1", newPartition)).toDF("id", "userid",
"newcol1", "name")
+ df6 = df6.withColumn("userid", df6.col("userid").cast("float"))
+ df6.printSchema()
+ df6.show(false)
+ upsertData(df6, tempRecordPath)
+
+ // 7. Rearrange column position
+ var df7 = Seq((7, "newcol1", 700, newPartition)).toDF("id", "newcol1",
"userid", "name")
+ df7 = df7.withColumn("userid", df7.col("userid").cast("float"))
+ df7.printSchema()
+ df7.show(false)
+ upsertData(df7, tempRecordPath)
+
+ // read out the table
+ val readDf = spark.read.format("hudi").load(tempRecordPath)
+ readDf.printSchema()
+ readDf.show(false)
+ readDf.foreach(_ => {})
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieVectorizedParquetRecordReader.java
b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieVectorizedParquetRecordReader.java
new file mode 100644
index 0000000000..462a993580
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieVectorizedParquetRecordReader.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+public class Spark24HoodieVectorizedParquetRecordReader extends
VectorizedParquetRecordReader {
+
+ // save the col type change info.
+ private Map<Integer, Pair<DataType, DataType>> typeChangeInfos;
+
+ private ColumnarBatch columnarBatch;
+
+ private Map<Integer, WritableColumnVector> idToColumnVectors;
+
+ private WritableColumnVector[] columnVectors;
+
+ // The capacity of vectorized batch.
+ private int capacity;
+
+ // If true, this class returns batches instead of rows.
+ private boolean returnColumnarBatch;
+
+ // The memory mode of the columnarBatch.
+ private final MemoryMode memoryMode;
+
+ /**
+ * Batch of rows that we assemble and the current index we've returned.
Every time this
+ * batch is used up (batchIdx == numBatched), we populated the batch.
+ */
+ private int batchIdx = 0;
+ private int numBatched = 0;
+
+ public Spark24HoodieVectorizedParquetRecordReader(
+ TimeZone convertTz,
+ boolean useOffHeap,
+ int capacity,
+ Map<Integer, Pair<DataType, DataType>> typeChangeInfos) {
+ super(convertTz, useOffHeap, capacity);
+ memoryMode = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
+ this.typeChangeInfos = typeChangeInfos;
+ this.capacity = capacity;
+ }
+
+ @Override
+ public void initBatch(StructType partitionColumns, InternalRow
partitionValues) {
+ super.initBatch(partitionColumns, partitionValues);
+ if (columnVectors == null) {
+ columnVectors = new WritableColumnVector[sparkSchema.length() +
partitionColumns.length()];
+ }
+ if (idToColumnVectors == null) {
+ idToColumnVectors = new HashMap<>();
+ typeChangeInfos.entrySet()
+ .stream()
+ .forEach(f -> {
+ WritableColumnVector vector =
+ memoryMode == MemoryMode.OFF_HEAP ? new
OffHeapColumnVector(capacity, f.getValue().getLeft()) : new
OnHeapColumnVector(capacity, f.getValue().getLeft());
+ idToColumnVectors.put(f.getKey(), vector);
+ });
+ }
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext
taskAttemptContext) throws IOException, InterruptedException,
UnsupportedOperationException {
+ super.initialize(inputSplit, taskAttemptContext);
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ for (Map.Entry<Integer, WritableColumnVector> e :
idToColumnVectors.entrySet()) {
+ e.getValue().close();
+ }
+ idToColumnVectors = null;
+ columnarBatch = null;
+ columnVectors = null;
+ }
+
+ @Override
+ public ColumnarBatch resultBatch() {
+ ColumnarBatch currentColumnBatch = super.resultBatch();
+ boolean changed = false;
+ for (Map.Entry<Integer, Pair<DataType, DataType>> entry :
typeChangeInfos.entrySet()) {
+ boolean rewrite = SparkInternalSchemaConverter
+ .convertColumnVectorType((WritableColumnVector)
currentColumnBatch.column(entry.getKey()),
+ idToColumnVectors.get(entry.getKey()),
currentColumnBatch.numRows());
+ if (rewrite) {
+ changed = true;
+ columnVectors[entry.getKey()] = idToColumnVectors.get(entry.getKey());
+ }
+ }
+ if (changed) {
+ if (columnarBatch == null) {
+ // fill other vector
+ for (int i = 0; i < columnVectors.length; i++) {
+ if (columnVectors[i] == null) {
+ columnVectors[i] = (WritableColumnVector)
currentColumnBatch.column(i);
+ }
+ }
+ columnarBatch = new ColumnarBatch(columnVectors);
+ }
+ columnarBatch.setNumRows(currentColumnBatch.numRows());
+ return columnarBatch;
+ } else {
+ return currentColumnBatch;
+ }
+ }
+
+ @Override
+ public boolean nextBatch() throws IOException {
+ boolean result = super.nextBatch();
+ if (idToColumnVectors != null) {
+ idToColumnVectors.entrySet().stream().forEach(e -> e.getValue().reset());
+ }
+ numBatched = resultBatch().numRows();
+ batchIdx = 0;
+ return result;
+ }
+
+ @Override
+ public void enableReturningBatches() {
+ returnColumnarBatch = true;
+ super.enableReturningBatches();
+ }
+
+ @Override
+ public Object getCurrentValue() {
+ if (typeChangeInfos == null || typeChangeInfos.isEmpty()) {
+ return super.getCurrentValue();
+ }
+
+ if (returnColumnarBatch) {
+ return columnarBatch == null ? super.getCurrentValue() : columnarBatch;
+ }
+
+ return columnarBatch == null ? super.getCurrentValue() :
columnarBatch.getRow(batchIdx - 1);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException {
+ resultBatch();
+
+ if (returnColumnarBatch) {
+ return nextBatch();
+ }
+
+ if (batchIdx >= numBatched) {
+ if (!nextBatch()) {
+ return false;
+ }
+ }
+ ++batchIdx;
+ return true;
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
index 6fb5c50c03..1a8585b38a 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
@@ -31,12 +31,12 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow, UnsafeRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.{PartitionedFile,
RecordReaderIterator}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{AtomicType, StructType}
+import org.apache.spark.sql.types.{AtomicType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration
import java.net.URI
@@ -159,8 +159,17 @@ class Spark24HoodieParquetFileFormat(private val
shouldAppendPartitionValues: Bo
}
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP,
0), 0)
+
+ // Clone new conf
+ val hadoopAttemptConf = new
Configuration(broadcastedHadoopConf.value.value)
+ val (implicitTypeChangeInfos, sparkRequestSchema) =
HoodieParquetFileFormatHelper.buildImplicitSchemaChangeInfo(hadoopAttemptConf,
footerFileMetaData, requiredSchema)
+
+ if (!implicitTypeChangeInfos.isEmpty) {
+ hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
sparkRequestSchema.json)
+ }
+
val hadoopAttemptContext =
- new TaskAttemptContextImpl(broadcastedHadoopConf.value.value,
attemptId)
+ new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
// Try to push down filters when filter push-down is enabled.
// Notice: This push-down is RowGroups level, not individual records.
@@ -169,17 +178,29 @@ class Spark24HoodieParquetFileFormat(private val
shouldAppendPartitionValues: Bo
}
val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
- val vectorizedReader = new VectorizedParquetRecordReader(
- convertTz.orNull, enableOffHeapColumnVector &&
taskContext.isDefined, capacity)
+ val vectorizedReader = if (!implicitTypeChangeInfos.isEmpty) {
+ new Spark24HoodieVectorizedParquetRecordReader(
+ convertTz.orNull,
+ enableOffHeapColumnVector && taskContext.isDefined,
+ capacity,
+ implicitTypeChangeInfos
+ )
+ } else {
+ new VectorizedParquetRecordReader(
+ convertTz.orNull,
+ enableOffHeapColumnVector && taskContext.isDefined,
+ capacity)
+ }
+
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion lister before
`initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ =>
iter.close()))
vectorizedReader.initialize(split, hadoopAttemptContext)
- logDebug(s"Appending $partitionSchema ${file.partitionValues}")
// NOTE: We're making appending of the partitioned values to the rows
read from the
// data file configurable
if (shouldAppendPartitionValues) {
+ logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
} else {
vectorizedReader.initBatch(StructType(Nil), InternalRow.empty)
@@ -194,11 +215,12 @@ class Spark24HoodieParquetFileFormat(private val
shouldAppendPartitionValues: Bo
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns UnsafeRow
+ val readSupport = new ParquetReadSupport(convertTz)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
- new ParquetRecordReader[UnsafeRow](new
ParquetReadSupport(convertTz), parquetFilter)
+ new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter)
} else {
- new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
+ new ParquetRecordReader[UnsafeRow](readSupport)
}
val iter = new RecordReaderIterator(reader)
// SPARK-23457 Register a task completion lister before
`initialization`.
@@ -206,8 +228,21 @@ class Spark24HoodieParquetFileFormat(private val
shouldAppendPartitionValues: Bo
reader.initialize(split, hadoopAttemptContext)
val fullSchema = requiredSchema.toAttributes ++
partitionSchema.toAttributes
- val joinedRow = new JoinedRow()
- val appendPartitionColumns =
GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+ val unsafeProjection = if (implicitTypeChangeInfos.isEmpty) {
+ GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+ } else {
+ val newFullSchema = new
StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) =>
+ if (implicitTypeChangeInfos.containsKey(i)) {
+ StructField(f.name, implicitTypeChangeInfos.get(i).getRight,
f.nullable, f.metadata)
+ } else f
+ }).toAttributes ++ partitionSchema.toAttributes
+ val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
+ if (implicitTypeChangeInfos.containsKey(i)) {
+ Cast(attr, implicitTypeChangeInfos.get(i).getLeft)
+ } else attr
+ }
+ GenerateUnsafeProjection.generate(castSchema, newFullSchema)
+ }
// This is a horrible erasure hack... if we type the iterator above,
then it actually check
// the type in next() and we get a class cast exception. If we make
that function return
@@ -217,13 +252,14 @@ class Spark24HoodieParquetFileFormat(private val
shouldAppendPartitionValues: Bo
// data file configurable
if (!shouldAppendPartitionValues || partitionSchema.length == 0) {
// There is no partition columns
- iter.asInstanceOf[Iterator[InternalRow]]
+ iter.asInstanceOf[Iterator[InternalRow]].map(unsafeProjection)
} else {
+ val joinedRow = new JoinedRow()
iter.asInstanceOf[Iterator[InternalRow]]
- .map(d => appendPartitionColumns(joinedRow(d,
file.partitionValues)))
+ .map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
}
-
}
}
}
+
}
diff --git
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
index 1a1a44d858..9edd1321b1 100644
---
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
@@ -151,7 +151,7 @@ class Spark31HoodieParquetFileFormat(private val
shouldAppendPartitionValues: Bo
// Internal schema has to be pruned at this point
val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
- val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) &&
querySchemaOption.isPresent
+ var shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) &&
querySchemaOption.isPresent
val tablePath =
sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
val fileSchema = if (shouldUseInternalSchema) {
@@ -223,7 +223,8 @@ class Spark31HoodieParquetFileFormat(private val
shouldAppendPartitionValues: Bo
// Clone new conf
val hadoopAttemptConf = new
Configuration(broadcastedHadoopConf.value.value)
- var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] =
if (shouldUseInternalSchema) {
+
+ val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] =
if (shouldUseInternalSchema) {
val mergedInternalSchema = new InternalSchemaMerger(fileSchema,
querySchemaOption.get(), true, true).mergeSchema()
val mergedSchema =
SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
@@ -231,7 +232,12 @@ class Spark31HoodieParquetFileFormat(private val
shouldAppendPartitionValues: Bo
SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(),
mergedInternalSchema)
} else {
- new java.util.HashMap()
+ val (implicitTypeChangeInfo, sparkRequestSchema) =
HoodieParquetFileFormatHelper.buildImplicitSchemaChangeInfo(hadoopAttemptConf,
footerFileMetaData, requiredSchema)
+ if (!implicitTypeChangeInfo.isEmpty) {
+ shouldUseInternalSchema = true
+ hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
sparkRequestSchema.json)
+ }
+ implicitTypeChangeInfo
}
val hadoopAttemptContext =
diff --git
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
index 52d450029e..ae686d33a3 100644
---
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
@@ -146,7 +146,7 @@ class Spark32PlusHoodieParquetFileFormat(private val
shouldAppendPartitionValues
// Internal schema has to be pruned at this point
val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
- val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) &&
querySchemaOption.isPresent
+ var shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) &&
querySchemaOption.isPresent
val tablePath =
sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
val fileSchema = if (shouldUseInternalSchema) {
@@ -228,7 +228,12 @@ class Spark32PlusHoodieParquetFileFormat(private val
shouldAppendPartitionValues
SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(),
mergedInternalSchema)
} else {
- new java.util.HashMap()
+ val (implicitTypeChangeInfo, sparkRequestSchema) =
HoodieParquetFileFormatHelper.buildImplicitSchemaChangeInfo(hadoopAttemptConf,
footerFileMetaData, requiredSchema)
+ if (!implicitTypeChangeInfo.isEmpty) {
+ shouldUseInternalSchema = true
+ hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
sparkRequestSchema.json)
+ }
+ implicitTypeChangeInfo
}
val hadoopAttemptContext =
@@ -394,7 +399,6 @@ class Spark32PlusHoodieParquetFileFormat(private val
shouldAppendPartitionValues
}
}
}
-
}
object Spark32PlusHoodieParquetFileFormat {