This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 9511ad50b1889aec38b794534ab047bc83e7059a
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Oct 30 02:19:39 2025 -0700

    fix: Avoid changing table configs when creating a table with an existing base path on Spark (#14175)
---
 .../sql/catalyst/catalog/HoodieCatalogTable.scala | 41 ++++------
 .../spark/sql/hudi/ddl/TestCreateTable.scala      | 89 ++++++++++++++++++++--
 2 files changed, 96 insertions(+), 34 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 89577b224ab3..e2931d9c981d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -19,15 +19,12 @@ package org.apache.spark.sql.catalyst.catalog
 
 import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper}
 import org.apache.hudi.DataSourceWriteOptions.OPERATION
-import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.HoodieWriterUtils._
-import org.apache.hudi.avro.AvroSchemaUtils
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, TypedProperties}
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING
 import org.apache.hudi.common.table.timeline.TimelineUtils
-import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
@@ -196,30 +193,22 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
    */
   def initHoodieTable(): Unit = {
     logInfo(s"Init hoodie.properties for ${table.identifier.unquotedString}")
+    // This runs validation against the table to make sure there is no conflict if the table already exists
     val (finalSchema, tableConfigs) = parseSchemaAndConfigs()
 
-    // The TableSchemaResolver#getTableAvroSchemaInternal has a premise that
-    // the table create schema does not include the metadata fields.
-    // When flag includeMetadataFields is false, no metadata fields should be included in the resolved schema.
-    val dataSchema = removeMetaFields(finalSchema)
-
-    table = table.copy(schema = finalSchema)
-
-    // Save all the table config to the hoodie.properties.
-    val properties = TypedProperties.fromMap(tableConfigs.asJava)
-
-    val catalogDatabaseName = formatName(spark,
-      table.identifier.database.getOrElse(spark.sessionState.catalog.getCurrentDatabase))
-    if (hoodieTableExists) {
-      checkArgument(StringUtils.isNullOrEmpty(databaseName) || databaseName == catalogDatabaseName,
-        "The database names from this hoodie path and this catalog table is not same.")
-      val recordName = AvroSchemaUtils.getAvroRecordQualifiedName(table.identifier.table)
-      // just persist hoodie.table.create.schema
-      HoodieTableMetaClient.newTableBuilder()
-        .fromProperties(properties)
-        .setDatabaseName(catalogDatabaseName)
-        .setTableCreateSchema(SchemaConverters.toAvroType(dataSchema, recordName = recordName).toString())
-        .initTable(storageConf, tableLocation)
-    } else {
+    if (!hoodieTableExists) {
+      // The TableSchemaResolver#getTableAvroSchemaInternal has a premise that
+      // the table create schema does not include the metadata fields.
+      // When flag includeMetadataFields is false, no metadata fields should be included in the resolved schema.
+      val dataSchema = removeMetaFields(finalSchema)
+
+      table = table.copy(schema = finalSchema)
+
+      // Save all the table config to the hoodie.properties.
+      val properties = TypedProperties.fromMap(tableConfigs.asJava)
+
+      val catalogDatabaseName = formatName(spark,
+        table.identifier.database.getOrElse(spark.sessionState.catalog.getCurrentDatabase))
       val (recordName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(table.identifier.table)
       val schema = SchemaConverters.toAvroType(dataSchema, nullable = false, recordName, namespace)
       val partitionColumns = if (SparkConfigUtils.containsConfigProperty(tableConfigs,
         KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
index 7d3a7f2a175a..93e54cbf7beb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
@@ -19,23 +19,26 @@ package org.apache.spark.sql.hudi.ddl
 
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.util.PartitionPathEncodeUtils.escapePathName
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
 import org.apache.hudi.keygen.constant.KeyGeneratorType
+import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, StoragePath}
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
 import org.apache.hudi.testutils.Assertions
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 
 import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable}
+import org.apache.spark.sql.functions.{col, concat, expr, lit}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.{disableComplexKeygenValidation, getLastCommitMetadata}
 import org.apache.spark.sql.types._
-import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull, assertTrue}
 
 import scala.collection.JavaConverters._
 
@@ -230,6 +233,81 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Create External Hoodie Table with data") {
+    withTempDir { tmp =>
+      val options = Map(DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name(),
+        HoodieTableConfig.ORDERING_FIELDS.key -> "ordering",
+        DataSourceWriteOptions.RECORDKEY_FIELD.key -> "keyid",
+        DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
+        DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
+        HoodieWriteConfig.TBL_NAME.key -> "table1")
+      val df = spark.range(0, 3).toDF("keyid")
+        .withColumn("ordering", expr("keyid"))
+        .withColumn("label", concat(lit("a"), col("keyid")))
+        .withColumn("age", lit(1))
+        .withColumn("p", lit(2))
+
+      val basePath = tmp.getCanonicalPath
+      df.write.format("hudi")
+        .options(options)
+        .option(DataSourceWriteOptions.OPERATION.key, "insert")
+        .option("hoodie.insert.shuffle.parallelism", "4")
+        .mode(SaveMode.Overwrite).save(tmp.getCanonicalPath)
+
+      val storage = HoodieStorageUtils.getStorage(
+        basePath, new HadoopStorageConfiguration(spark.sparkContext.hadoopConfiguration))
+      assertTrue(storage.exists(new StoragePath(basePath)))
+      val tableConfigPath = new StoragePath(basePath, ".hoodie/hoodie.properties")
+      assertTrue(storage.exists(tableConfigPath))
+      val tableConfigModificationTime = storage.getPathInfo(tableConfigPath).getModificationTime
+      validateExternalTableCreation(storage, basePath, tableConfigModificationTime, Option.empty)
+      validateExternalTableCreation(storage, basePath, tableConfigModificationTime, Option("new_database"))
+      validateExternalTableCreation(storage, basePath, tableConfigModificationTime, Option("another_database"))
+    }
+  }
+
+  private def validateExternalTableCreation(storage: HoodieStorage,
+                                            basePath: String,
+                                            tableConfigModificationTime: Long,
+                                            databaseName: Option[String]): Unit = {
+    val tableName = "table"
+    if (databaseName.isDefined) {
+      spark.sql(
+        s"""
+           |create database if not exists ${databaseName.get}
+         """.stripMargin)
+    }
+    val tableIdentifier = if (databaseName.isDefined) databaseName.get + "." + tableName else tableName
+    spark.sql(
+      s"""
+         |create table $tableIdentifier
+         |using hudi
+         |location '$basePath'
+       """.stripMargin)
+    // Table config should not be changed for an external table
+    val tableConfigPath = new StoragePath(basePath, ".hoodie/hoodie.properties")
+    assertEquals(tableConfigModificationTime, storage.getPathInfo(tableConfigPath).getModificationTime)
+    val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName, databaseName))
+    assertResult(table.tableType)(CatalogTableType.EXTERNAL)
+    assertResult(table.properties("type"))("mor")
+    assertResult(table.properties("primaryKey"))("keyid")
+    assertResult(
+      HoodieRecord.HOODIE_META_COLUMNS.asScala.map(StructField(_, StringType))
+        ++ Seq(
+          StructField("keyid", LongType),
+          StructField("ordering", LongType),
+          StructField("label", StringType),
+          StructField("age", IntegerType),
+          StructField("p", IntegerType)
+        )
+    )(table.schema.fields)
+    checkAnswer(s"select keyid, label from $tableIdentifier")(
+      Seq(0L, "a0"),
+      Seq(1L, "a1"),
+      Seq(2L, "a2")
+    )
+  }
+
   test("Test Table Column Validate") {
     withTempDir { tmp =>
       val tableName = generateTableName
@@ -907,10 +985,9 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
       // Check the missing properties for spark sql
       val metaClient = createMetaClient(spark, tablePath)
       val properties = metaClient.getTableConfig.getProps.asScala.toMap
-      assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
       assertResult("dt")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
       assertResult("ts")(properties(HoodieTableConfig.ORDERING_FIELDS.key))
-      assertResult("hudi_database")(metaClient.getTableConfig.getDatabaseName)
+      assertNull(metaClient.getTableConfig.getDatabaseName)
       assertResult(s"original_$tableName")(metaClient.getTableConfig.getTableName)
 
       // Test insert into
@@ -978,7 +1055,6 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
       // Check the missing properties for spark sql
       val metaClient = createMetaClient(spark, tablePath)
       val properties = metaClient.getTableConfig.getProps.asScala.toMap
-      assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
       assertResult("day,hh")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
       assertResult("ts")(properties(HoodieTableConfig.ORDERING_FIELDS.key))
 
@@ -1064,7 +1140,6 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
       // Check the missing properties for spark sql
       val metaClient = createMetaClient(spark, tablePath)
       val properties = metaClient.getTableConfig.getProps.asScala.toMap
-      assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
       assertResult("day,hh")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
       assertResult("ts")(properties(HoodieTableConfig.ORDERING_FIELDS.key))
 
@@ -1156,7 +1231,6 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
       // Check the missing properties for spark sql
       val metaClient = createMetaClient(spark, tablePath)
       val properties = metaClient.getTableConfig.getProps.asScala.toMap
-      assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
       assertResult("id,name")(properties(HoodieTableConfig.RECORDKEY_FIELDS.key))
       assertResult("day,hh")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
       assertResult("ts")(properties(HoodieTableConfig.ORDERING_FIELDS.key))
 
@@ -1227,7 +1301,6 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
       // Check the missing properties for spark sql
       val metaClient = createMetaClient(spark, tmp.getCanonicalPath)
       val properties = metaClient.getTableConfig.getProps.asScala.toMap
-      assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
       assertResult("ts")(properties(HoodieTableConfig.ORDERING_FIELDS.key))
 
       // Test insert into
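For context, here is a minimal sketch of the scenario this change addresses, modeled on the new test above. It is not part of the commit: it assumes a SparkSession `spark` with the Hudi Spark extensions enabled, and the base path, table name, and option values are illustrative.

  import org.apache.spark.sql.SaveMode
  import org.apache.spark.sql.functions.{col, concat, expr, lit}

  val basePath = "/tmp/hudi_existing_table"  // hypothetical location

  // Step 1: write a MOR table directly through the datasource, which
  // initializes .hoodie/hoodie.properties under basePath.
  spark.range(0, 3).toDF("keyid")
    .withColumn("ordering", expr("keyid"))
    .withColumn("label", concat(lit("a"), col("keyid")))
    .write.format("hudi")
    .option("hoodie.table.name", "table1")
    .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
    .option("hoodie.datasource.write.recordkey.field", "keyid")
    .option("hoodie.datasource.write.precombine.field", "ordering")
    .option("hoodie.datasource.write.partitionpath.field", "")
    .option("hoodie.datasource.write.keygenerator.class",
      "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
    .mode(SaveMode.Overwrite)
    .save(basePath)

  // Step 2: create an external SQL table over the existing base path.
  // With this fix, initHoodieTable() skips re-initialization when
  // hoodieTableExists is true, so hoodie.properties stays untouched.
  spark.sql(s"create table table1 using hudi location '$basePath'")

The new test repeats step 2 against different databases and asserts that the modification time of hoodie.properties does not change across the CREATE TABLE calls.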
