This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bc5566d5735 [HUDI-8817] Ensure we can create diff table versions(6 and 8) using 1.0 binary  (#12615)
bc5566d5735 is described below

commit bc5566d573526720b1ed6091ae1f753009bf4bf3
Author: Davis-Zhang-Onehouse <[email protected]>
AuthorDate: Mon Jan 27 11:57:18 2025 -0800

    [HUDI-8817] Ensure we can create diff table versions(6 and 8) using 1.0 binary  (#12615)
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  6 ++-
 .../org/apache/hudi/util/SparkConfigUtils.scala    |  4 +-
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  |  4 +-
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 44 +++++++++++++++--
 .../functional/TestWriteTableVersionConfig.scala   | 56 ++++++++++++++++++++++
 5 files changed, 105 insertions(+), 9 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index c84d95abff9..c6a9559b2c8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -49,7 +49,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T
 import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys
 import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
-import org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, WRITE_TABLE_VERSION}
+import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieException, HoodieRecordCreationException, HoodieWriteConflictException}
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
@@ -66,6 +66,7 @@ import org.apache.hudi.storage.HoodieStorage
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.sync.common.util.SyncUtilHelpers
 import org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
+import org.apache.hudi.util.SparkConfigUtils.getStringWithAltKeys
 import org.apache.hudi.util.SparkKeyGenUtils
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.sql.HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty
@@ -302,6 +303,7 @@ class HoodieSparkSqlWriterInternal {
           else KeyGeneratorType.getKeyGeneratorClassName(hoodieConfig)
         HoodieTableMetaClient.newTableBuilder()
           .setTableType(tableType)
+          .setTableVersion(Integer.valueOf(getStringWithAltKeys(parameters, HoodieWriteConfig.WRITE_TABLE_VERSION)))
          .setDatabaseName(databaseName)
          .setTableName(tblName)
          .setBaseFileFormat(baseFileFormat)
@@ -748,7 +750,7 @@ class HoodieSparkSqlWriterInternal {
          .setTableType(HoodieTableType.valueOf(tableType))
          .setTableName(tableName)
          .setRecordKeyFields(recordKeyFields)
-          .setTableVersion(hoodieConfig.getIntOrDefault(WRITE_TABLE_VERSION))
+          .setTableVersion(Integer.valueOf(getStringWithAltKeys(parameters, HoodieWriteConfig.WRITE_TABLE_VERSION)))
          .setArchiveLogFolder(archiveLogFolder)
          .setPayloadClassName(payloadClass)
          .setRecordMergeMode(RecordMergeMode.getValue(hoodieConfig.getString(HoodieWriteConfig.RECORD_MERGE_MODE)))
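
The gist of the two hunks above: the table version is now resolved from the raw write options (honoring alternate keys) rather than from the typed hoodieConfig. A minimal usage sketch, assuming a DataFrame df and a hypothetical basePath, of how a caller pins table version 6 with the 1.0 binary (the option keys mirror the new tests below):

    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.spark.sql.SaveMode

    // "hoodie.write.table.version" -> "6" makes the writer initialize the
    // table at version 6; when unset, the config default (the current
    // table version) applies instead.
    df.write.format("hudi")
      .option(HoodieWriteConfig.TBL_NAME.key, "foo_table")
      .option("hoodie.write.table.version", "6")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.partitionpath.field", "ts")
      .mode(SaveMode.Overwrite)
      .save(basePath)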
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala
index 88f223284dd..107a3968c82 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala
@@ -28,11 +28,11 @@ object SparkConfigUtils {
    * the alternate config keys for the specified key as well.
    *
    * @param props          Configs in scala map
-   * @param configProperty {@link ConfigProperty} config of String type to fetch.
+   * @param configProperty {@link ConfigProperty} config of type T to fetch.
    * @return String value if the config exists; default String value if the config does not exist
    *         and there is default value defined in the {@link ConfigProperty} config; {@code null} otherwise.
    */
-  def getStringWithAltKeys(props: Map[String, String], configProperty: ConfigProperty[String]): String = {
+  def getStringWithAltKeys[T](props: Map[String, String], configProperty: ConfigProperty[T]): String = {
     ConfigUtils.getStringWithAltKeys(JFunction.toJavaFunction[String, Object](key => props.getOrElse(key, null)), configProperty)
   }
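
Context for the widened signature: HoodieWriteConfig.WRITE_TABLE_VERSION is declared as a ConfigProperty[Integer], so the previous ConfigProperty[String]-only signature could not accept it; making the method generic keeps the string-keyed lookup while admitting non-String configs. A minimal sketch, with a hypothetical options map, of the two call shapes that now both compile:

    // T = String and T = Integer resolve through the same string-keyed lookup;
    // callers convert the returned String as needed, as the writer change above does.
    val opts = Map("hoodie.write.table.version" -> "6")
    val name = SparkConfigUtils.getStringWithAltKeys(opts, HoodieWriteConfig.TBL_NAME)
    val version = Integer.valueOf(
      SparkConfigUtils.getStringWithAltKeys(opts, HoodieWriteConfig.WRITE_TABLE_VERSION))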
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 1a23356381f..dc819560e36 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -28,12 +28,13 @@ import org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING
 import org.apache.hudi.common.table.timeline.TimelineUtils
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.keygen.constant.{KeyGeneratorOptions, KeyGeneratorType}
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.storage.HoodieStorageUtils
 import org.apache.hudi.util.SparkConfigUtils
-
+import org.apache.hudi.util.SparkConfigUtils.getStringWithAltKeys
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.avro.SchemaConverters
@@ -225,6 +226,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
 
       HoodieTableMetaClient.newTableBuilder()
         .fromProperties(properties)
+        .setTableVersion(Integer.valueOf(getStringWithAltKeys(tableConfigs, HoodieWriteConfig.WRITE_TABLE_VERSION)))
         .setDatabaseName(catalogDatabaseName)
         .setTableName(table.identifier.table)
         .setTableCreateSchema(schema.toString())
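
With the catalog path wired up as well, a Spark SQL CREATE TABLE can pin the table version through tblproperties. A sketch of the DDL, with a hypothetical table name and location (the new TestWriteTableVersionConfig added below exercises exactly this shape):

    // hoodie.write.table.version in tblproperties selects the on-disk
    // table version (6 or 8) at creation time.
    spark.sql(
      s"""
         | create table my_tbl (id int, ts long, dt string) using hudi
         | tblproperties (
         |  type = 'mor',
         |  primaryKey = 'id',
         |  preCombineField = 'ts',
         |  hoodie.write.table.version = 6
         | )
         | partitioned by (dt)
         | location '$basePath'
       """.stripMargin)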
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 9cfc01ae6b2..b020598aa24 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -237,6 +237,30 @@ class TestHoodieSparkSqlWriter extends HoodieSparkWriterTestBase {
     assert(HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier2, dataFrame2)._1)
   }
 
+  /**
+   * Test case ensuring the configured write table version (6 or 8) is adopted at table creation.
+   */
+  @Test
+  def testChangeWriteTableVersion(): Unit = {
+    Seq(6, 8).foreach { tableVersion =>
+      val tempPath = s"$tempBasePath/${tableVersion}"
+      val tableModifier1 = Map(
+        "path" -> tempPath,
+        HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+        "hoodie.write.table.version" -> s"$tableVersion",
+        "hoodie.datasource.write.recordkey.field" -> "uuid",
+        "hoodie.datasource.write.partitionpath.field" -> "ts"
+      )
+      val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier1, dataFrame)
+
+      // Make sure table version is adopted.
+      val metaClient = HoodieTableMetaClient.builder().setBasePath(tempPath)
+        .setConf(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf())).build()
+      assertEquals(metaClient.getTableConfig.getTableVersion.versionCode(), tableVersion)
+    }
+  }
+
   /**
    * Test case for each bulk insert sort mode
    *
@@ -518,13 +542,14 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
 
   /**
    * Test cases for HoodieSparkSqlWriter functionality with datasource bootstrap
-   * for different type of tables.
+   * for different types of tables and table versions.
    *
    * @param tableType Type of table
+   * @param tableVersion Version of table
    */
   @ParameterizedTest
-  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
-  def testWithDatasourceBootstrapForTableType(tableType: String): Unit = {
+  @MethodSource(Array("bootstrapTestParams"))
+  def testWithDatasourceBootstrapForTableType(tableType: String, tableVersion: Int): Unit = {
    val srcPath = java.nio.file.Files.createTempDirectory("hoodie_bootstrap_source_path")
    try {
      val sourceDF = TestBootstrap.generateTestRawTripDataset(Instant.now.toEpochMilli, 0, 100, Collections.emptyList(), sc,
@@ -542,6 +567,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
         DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
        DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getCanonicalName,
        DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getCanonicalName,
+        "hoodie.write.table.version" -> tableVersion.toString,
        HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false")
      val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
      initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true, initBasePath = false)
@@ -564,6 +590,9 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
       assertFalse(ignoreResult)
       verify(client, times(2)).close()
 
+      // Assert the table version is adopted.
+      val metaClient = createMetaClient(spark, tempBasePath)
+      assertEquals(metaClient.getTableConfig.getTableVersion.versionCode(), tableVersion)
       // fetch all records from parquet files generated from write to hudi
       val actualDf = sqlContext.read.parquet(tempBasePath)
       assert(actualDf.count == 100)
@@ -1137,7 +1166,6 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
     assert(configConflictException.getMessage.contains(s"KeyGenerator:\t${classOf[NonpartitionedKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}"))
   }
 
-
   @Test
   def testNoKeyGenToSimpleKeyGen(): Unit = {
     val _spark = spark
@@ -1325,4 +1353,12 @@ object TestHoodieSparkSqlWriter {
       Arguments.arguments("*5/03/1*", Seq("2016/03/15")),
       Arguments.arguments("2016/03/*", Seq("2015/03/16", "2015/03/17")))
   }
+
+  def bootstrapTestParams(): java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      Arguments.arguments("MERGE_ON_READ", Integer.valueOf(8)),
+      Arguments.arguments("MERGE_ON_READ", Integer.valueOf(6)),
+      Arguments.arguments("COPY_ON_WRITE", Integer.valueOf(8))
+    )
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestWriteTableVersionConfig.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestWriteTableVersionConfig.scala
new file mode 100644
index 00000000000..515e62ea72b
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestWriteTableVersionConfig.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals}
+import org.apache.hudi.common.table.HoodieTableConfig.VERSION
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+
+class TestWriteTableVersionConfig extends HoodieSparkSqlTestBase {
+
+  test("Test create table with various write version") {
+    Seq(6, 8).foreach { tableVersion =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val basePath = tmp.getCanonicalPath
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |  id int,
+             |  ts long,
+             |  dt string
+             | ) using hudi
+             | tblproperties (
+             |  type = 'mor',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.write.table.version = $tableVersion
+             | )
+             | partitioned by(dt)
+             | location '$basePath'
+         """.stripMargin)
+        val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath)
+          .setConf(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf())).build()
+        assertEquals(metaClient.getTableConfig.getTableVersion.versionCode(), tableVersion)
+      }
+    }
+  }
+}
