This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ea547e5681a [HUDI-6219] Ensure consistency between Spark catalog schema and Hudi schema (#8725)
ea547e5681a is described below
commit ea547e5681a007e546b8ca8cb1399da0a4cd5012
Author: Wechar Yu <[email protected]>
AuthorDate: Wed May 24 21:53:11 2023 +0800
[HUDI-6219] Ensure consistency between Spark catalog schema and Hudi schema (#8725)
---
.../apache/spark/sql/avro/SchemaConverters.scala | 2 +-
.../sql/catalyst/catalog/HoodieCatalogTable.scala | 34 ++++++++++++----------
.../apache/spark/sql/hudi/TestCreateTable.scala | 23 ++++++++++++++-
.../hudi/TestNestedSchemaPruningOptimization.scala | 4 +--
4 files changed, 44 insertions(+), 19 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
index c178d1b8491..b4e09f6d1f6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -168,7 +168,7 @@ private[sql] object SchemaConverters {
case FloatType => builder.floatType()
case DoubleType => builder.doubleType()
- case StringType => builder.stringType()
+ case StringType | CharType(_) | VarcharType(_) => builder.stringType()
case NullType => builder.nullType()
case d: DecimalType =>
val avroType = LogicalTypes.decimal(d.precision, d.scale)
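[Editorial note on the SchemaConverters change above: Spark's CHAR(n) and VARCHAR(n) carry a length in the catalog but are string-backed at storage time, so both should map to the same Avro string type as StringType. A minimal standalone sketch of that mapping, an illustration only and not Hudi's actual converter, which covers many more types:

import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.spark.sql.types._

// Sketch only: CharType and VarcharType pattern-match alongside StringType
// because all three convert to a plain Avro string.
object CharVarcharAvroSketch {
  def toAvro(dt: DataType): Schema = dt match {
    case StringType | CharType(_) | VarcharType(_) => SchemaBuilder.builder().stringType()
    case other => throw new IllegalArgumentException(s"not covered by this sketch: $other")
  }

  def main(args: Array[String]): Unit = {
    assert(toAvro(CharType(5)).getType == Schema.Type.STRING)
    assert(toAvro(VarcharType(10)) == toAvro(StringType))
  }
}
]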
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index b8d93fa51e1..a329a943969 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -130,9 +130,20 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
lazy val baseFileFormat: String = metaClient.getTableConfig.getBaseFileFormat.name()
/**
- * Table schema
+ * First, try to load the table schema from the meta directory on the filesystem.
+ * If that fails, fall back to retrieving it from the Spark catalog.
*/
- lazy val tableSchema: StructType = table.schema
+ lazy val tableSchema: StructType = {
+ val schemaFromMetaOpt = loadTableSchemaByMetaClient()
+ if (schemaFromMetaOpt.nonEmpty) {
+ schemaFromMetaOpt.get
+ } else if (table.schema.nonEmpty) {
+ addMetaFields(table.schema)
+ } else {
+ throw new AnalysisException(
+ s"$catalogTableName does not contains schema fields.")
+ }
+ }
/**
* The schema without hoodie meta fields
@@ -237,16 +248,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
val options = extraTableConfig(hoodieTableExists, currentTableConfig) ++
  mapSqlOptionsToTableConfigs(sqlOptions) ++ currentTableConfig
- val schemaFromMetaOpt = loadTableSchemaByMetaClient()
- val schema = if (schemaFromMetaOpt.nonEmpty) {
- schemaFromMetaOpt.get
- } else if (table.schema.nonEmpty) {
- addMetaFields(table.schema)
- } else {
- throw new AnalysisException(
- s"Missing schema fields when applying CREATE TABLE clause for
${catalogTableName}")
- }
- (schema, options)
+ (tableSchema, options)
case (_, false) =>
checkArgument(table.schema.nonEmpty,
@@ -314,7 +316,9 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
getTableSqlSchema(metaClient, includeMetadataFields = true).map(originSchema => {
// Load table schema from meta on filesystem, and fill in 'comment'
// information from Spark catalog.
- val fields = originSchema.fields.map { f =>
+ // Newly added Hudi columns are appended after the partition columns, so the
+ // fields must be reordered to keep the partition columns at the end.
+ val (partFields, dataFields) = originSchema.fields.map { f =>
val nullableField: StructField = f.copy(nullable = true)
val catalogField = findColumnByName(table.schema, nullableField.name, resolver)
if (catalogField.isDefined) {
@@ -322,8 +326,8 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
} else {
nullableField
}
- }
- StructType(fields)
+ }.partition(f => partitionFields.contains(f.name))
+ StructType(dataFields ++ partFields)
})
}
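[Editorial note on the reordering in the last hunk: a column added to a Hudi table is appended to the end of the meta schema, which places it after the partition columns, while Spark expects partition columns to come last. A minimal sketch of the partition/data split, using hypothetical field names:

import org.apache.spark.sql.types._

// Sketch only: split fields on partition membership, then put the
// partition fields back at the end of the schema.
object PartitionReorderSketch {
  def main(args: Array[String]): Unit = {
    val partitionFields = Set("dt")
    val fields = Seq(
      StructField("id", IntegerType),
      StructField("dt", StringType),    // partition column
      StructField("newCol", LongType))  // column added after the partition column
    val (partFields, dataFields) = fields.partition(f => partitionFields.contains(f.name))
    assert(StructType(dataFields ++ partFields).fieldNames.sameElements(Array("id", "newCol", "dt")))
  }
}
]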
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 371ac5d97a1..ff474e7872c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -27,7 +27,7 @@ import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable}
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
@@ -1082,4 +1082,25 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
}
assertTrue(exception.getMessage.contains(s"""$tableName is not a Hudi table"""))
}
+
+ test("Test hoodie table schema consistency for non-Avro data types") {
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id tinyint,
+ | name varchar(10),
+ | price double,
+ | ts long
+ | ) using hudi
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ val hoodieCatalogTable = new HoodieCatalogTable(spark, table)
+ val hoodieSchema = HoodieSqlCommonUtils.getTableSqlSchema(hoodieCatalogTable.metaClient, true)
+ assertResult(hoodieSchema.get)(table.schema)
+ }
}
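[Editorial note on the consistency assertion in this test: the schema stored in the Hudi meta directory always includes the five Hudi meta columns, so a catalog-only schema needs them prepended before the two sides can compare equal (the addMetaFields branch in HoodieCatalogTable above). A rough sketch of that idea with a hypothetical helper, not Hudi's implementation:

import org.apache.spark.sql.types._

// Hypothetical helper sketching the addMetaFields idea: prepend Hudi's
// five meta columns so a user schema lines up with the stored meta schema.
object MetaFieldsSketch {
  private val metaCols = Seq(
    "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
    "_hoodie_partition_path", "_hoodie_file_name")

  def addMetaFields(schema: StructType): StructType =
    StructType(metaCols.map(StructField(_, StringType)) ++ schema.fields)

  def main(args: Array[String]): Unit = {
    val userSchema = StructType(Seq(StructField("id", IntegerType)))
    assert(addMetaFields(userSchema).fieldNames.length == 6)
  }
}
]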
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala
index 4f758a9e4f7..f8fe24b2174 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala
@@ -53,8 +53,8 @@ class TestNestedSchemaPruningOptimization extends HoodieSparkSqlTestBase with Sp
val selectDF = spark.sql(s"SELECT id, item.name FROM $tableName")
val expectedSchema = StructType(Seq(
- StructField("id", IntegerType, nullable = false),
- StructField("item" , StructType(Seq(StructField("name",
StringType, nullable = false))), nullable = false)
+ StructField("id", IntegerType, nullable = true),
+ StructField("item" , StructType(Seq(StructField("name",
StringType, nullable = false))), nullable = true)
))
val expectedReadSchemaClause = "ReadSchema: struct<id:int,item:struct<name:string>>"
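[Editorial note on the flipped nullability in the expected schema: StructType equality includes each field's nullability, and now that the table schema is resolved from the Hudi meta schema (where every field is copied with nullable = true, as in the HoodieCatalogTable hunk above), the pruned top-level fields read back as nullable. A minimal illustration:

import org.apache.spark.sql.types._

// Sketch only: two StructTypes differing solely in nullability are not equal.
object NullabilityEqualitySketch {
  def main(args: Array[String]): Unit = {
    val required = StructType(Seq(StructField("id", IntegerType, nullable = false)))
    val nullable = StructType(Seq(StructField("id", IntegerType, nullable = true)))
    assert(required != nullable)
  }
}
]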