This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 9511ad50b1889aec38b794534ab047bc83e7059a
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Oct 30 02:19:39 2025 -0700

    fix: Avoid changing table configs when creating a table with an existing 
base path on Spark (#14175)
---
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  | 41 ++++------
 .../spark/sql/hudi/ddl/TestCreateTable.scala       | 89 ++++++++++++++++++++--
 2 files changed, 96 insertions(+), 34 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 89577b224ab3..e2931d9c981d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -19,15 +19,12 @@ package org.apache.spark.sql.catalyst.catalog
 
 import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper}
 import org.apache.hudi.DataSourceWriteOptions.OPERATION
-import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.HoodieWriterUtils._
-import org.apache.hudi.avro.AvroSchemaUtils
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, 
TypedProperties}
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING
 import org.apache.hudi.common.table.timeline.TimelineUtils
-import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
@@ -196,30 +193,22 @@ class HoodieCatalogTable(val spark: SparkSession, var 
table: CatalogTable) exten
    */
   def initHoodieTable(): Unit = {
     logInfo(s"Init hoodie.properties for ${table.identifier.unquotedString}")
+    // This runs validation against the table to make sure there is no 
conflict if the table already exists
     val (finalSchema, tableConfigs) = parseSchemaAndConfigs()
-    // The TableSchemaResolver#getTableAvroSchemaInternal has a premise that
-    // the table create schema does not include the metadata fields.
-    // When flag includeMetadataFields is false, no metadata fields should be 
included in the resolved schema.
-    val dataSchema = removeMetaFields(finalSchema)
-
-    table = table.copy(schema = finalSchema)
-
-    // Save all the table config to the hoodie.properties.
-    val properties = TypedProperties.fromMap(tableConfigs.asJava)
-
-    val catalogDatabaseName = formatName(spark,
-      
table.identifier.database.getOrElse(spark.sessionState.catalog.getCurrentDatabase))
-    if (hoodieTableExists) {
-      checkArgument(StringUtils.isNullOrEmpty(databaseName) || databaseName == 
catalogDatabaseName,
-        "The database names from this hoodie path and this catalog table is 
not same.")
-      val recordName = 
AvroSchemaUtils.getAvroRecordQualifiedName(table.identifier.table)
-      // just persist hoodie.table.create.schema
-      HoodieTableMetaClient.newTableBuilder()
-        .fromProperties(properties)
-        .setDatabaseName(catalogDatabaseName)
-        .setTableCreateSchema(SchemaConverters.toAvroType(dataSchema, 
recordName = recordName).toString())
-        .initTable(storageConf, tableLocation)
-    } else {
+    if (!hoodieTableExists) {
+      // The TableSchemaResolver#getTableAvroSchemaInternal has a premise that
+      // the table create schema does not include the metadata fields.
+      // When flag includeMetadataFields is false, no metadata fields should 
be included in the resolved schema.
+      val dataSchema = removeMetaFields(finalSchema)
+
+      table = table.copy(schema = finalSchema)
+
+      // Save all the table config to the hoodie.properties.
+      val properties = TypedProperties.fromMap(tableConfigs.asJava)
+
+      val catalogDatabaseName = formatName(spark,
+        
table.identifier.database.getOrElse(spark.sessionState.catalog.getCurrentDatabase))
+
       val (recordName, namespace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(table.identifier.table)
       val schema = SchemaConverters.toAvroType(dataSchema, nullable = false, 
recordName, namespace)
       val partitionColumns = if 
(SparkConfigUtils.containsConfigProperty(tableConfigs, 
KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
index 7d3a7f2a175a..93e54cbf7beb 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
@@ -19,23 +19,26 @@ package org.apache.spark.sql.hudi.ddl
 
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, 
WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.util.PartitionPathEncodeUtils.escapePathName
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
 import org.apache.hudi.keygen.constant.KeyGeneratorType
+import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, StoragePath}
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
 import org.apache.hudi.testutils.Assertions
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 
 import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, 
HoodieCatalogTable}
+import org.apache.spark.sql.functions.{col, concat, expr, lit}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import 
org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.{disableComplexKeygenValidation,
 getLastCommitMetadata}
 import org.apache.spark.sql.types._
-import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNull, assertTrue}
 
 import scala.collection.JavaConverters._
 
@@ -230,6 +233,81 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Create External Hoodie Table with data") {
+    withTempDir { tmp =>
+      val options = Map(DataSourceWriteOptions.TABLE_TYPE.key -> 
HoodieTableType.MERGE_ON_READ.name(),
+        HoodieTableConfig.ORDERING_FIELDS.key -> "ordering",
+        DataSourceWriteOptions.RECORDKEY_FIELD.key -> "keyid",
+        DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
+        DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
"org.apache.hudi.keygen.NonpartitionedKeyGenerator",
+        HoodieWriteConfig.TBL_NAME.key -> "table1")
+      val df = spark.range(0, 3).toDF("keyid")
+        .withColumn("ordering", expr("keyid"))
+        .withColumn("label", concat(lit("a"), col("keyid")))
+        .withColumn("age", lit(1))
+        .withColumn("p", lit(2))
+
+      val basePath = tmp.getCanonicalPath
+      df.write.format("hudi")
+        .options(options)
+        .option(DataSourceWriteOptions.OPERATION.key, "insert")
+        .option("hoodie.insert.shuffle.parallelism", "4")
+        .mode(SaveMode.Overwrite).save(tmp.getCanonicalPath)
+
+      val storage = HoodieStorageUtils.getStorage(
+        basePath, new 
HadoopStorageConfiguration(spark.sparkContext.hadoopConfiguration))
+      assertTrue(storage.exists(new StoragePath(basePath)))
+      val tableConfigPath = new StoragePath(basePath, 
".hoodie/hoodie.properties")
+      assertTrue(storage.exists(tableConfigPath))
+      val tableConfigModificationTime = 
storage.getPathInfo(tableConfigPath).getModificationTime
+      validateExternalTableCreation(storage, basePath, 
tableConfigModificationTime, Option.empty)
+      validateExternalTableCreation(storage, basePath, 
tableConfigModificationTime, Option("new_database"))
+      validateExternalTableCreation(storage, basePath, 
tableConfigModificationTime, Option("another_database"))
+    }
+  }
+
+  private def validateExternalTableCreation(storage: HoodieStorage,
+                                            basePath: String,
+                                            tableConfigModificationTime: Long,
+                                            databaseName: Option[String]): 
Unit = {
+    val tableName = "table"
+    if (databaseName.isDefined) {
+      spark.sql(
+        s"""
+           |create database if not exists ${databaseName.get}
+         """.stripMargin)
+    }
+    val tableIdentifier = if (databaseName.isDefined) databaseName.get + "." + 
tableName else tableName
+    spark.sql(
+      s"""
+         |create table $tableIdentifier
+         |using hudi
+         |location '$basePath'
+         """.stripMargin)
+    // Table config should not be changed for an external table
+    val tableConfigPath = new StoragePath(basePath, 
".hoodie/hoodie.properties")
+    assertEquals(tableConfigModificationTime, 
storage.getPathInfo(tableConfigPath).getModificationTime)
+    val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName, 
databaseName))
+    assertResult(table.tableType)(CatalogTableType.EXTERNAL)
+    assertResult(table.properties("type"))("mor")
+    assertResult(table.properties("primaryKey"))("keyid")
+    assertResult(
+      HoodieRecord.HOODIE_META_COLUMNS.asScala.map(StructField(_, StringType))
+        ++ Seq(
+        StructField("keyid", LongType),
+        StructField("ordering", LongType),
+        StructField("label", StringType),
+        StructField("age", IntegerType),
+        StructField("p", IntegerType)
+      )
+    )(table.schema.fields)
+    checkAnswer(s"select keyid, label from $tableIdentifier")(
+      Seq(0L, "a0"),
+      Seq(1L, "a1"),
+      Seq(2L, "a2")
+    )
+  }
+
   test("Test Table Column Validate") {
     withTempDir { tmp =>
       val tableName = generateTableName
@@ -907,10 +985,9 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
         // Check the missing properties for spark sql
         val metaClient = createMetaClient(spark, tablePath)
         val properties = metaClient.getTableConfig.getProps.asScala.toMap
-        
assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
         assertResult("dt")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
         assertResult("ts")(properties(HoodieTableConfig.ORDERING_FIELDS.key))
-        
assertResult("hudi_database")(metaClient.getTableConfig.getDatabaseName)
+        assertNull(metaClient.getTableConfig.getDatabaseName)
         
assertResult(s"original_$tableName")(metaClient.getTableConfig.getTableName)
 
         // Test insert into
@@ -978,7 +1055,6 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
         // Check the missing properties for spark sql
         val metaClient = createMetaClient(spark, tablePath)
         val properties = metaClient.getTableConfig.getProps.asScala.toMap
-        
assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
         
assertResult("day,hh")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
         assertResult("ts")(properties(HoodieTableConfig.ORDERING_FIELDS.key))
 
@@ -1064,7 +1140,6 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
         // Check the missing properties for spark sql
         val metaClient = createMetaClient(spark, tablePath)
         val properties = metaClient.getTableConfig.getProps.asScala.toMap
-        
assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
         
assertResult("day,hh")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
         assertResult("ts")(properties(HoodieTableConfig.ORDERING_FIELDS.key))
 
@@ -1156,7 +1231,6 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
       // Check the missing properties for spark sql
       val metaClient = createMetaClient(spark, tablePath)
       val properties = metaClient.getTableConfig.getProps.asScala.toMap
-      
assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
       
assertResult("id,name")(properties(HoodieTableConfig.RECORDKEY_FIELDS.key))
       
assertResult("day,hh")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
       assertResult("ts")(properties(HoodieTableConfig.ORDERING_FIELDS.key))
@@ -1227,7 +1301,6 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
       // Check the missing properties for spark sql
       val metaClient = createMetaClient(spark, tmp.getCanonicalPath)
       val properties = metaClient.getTableConfig.getProps.asScala.toMap
-      
assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
       assertResult("ts")(properties(HoodieTableConfig.ORDERING_FIELDS.key))
 
       // Test insert into

Reply via email to