This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bc5566d5735 [HUDI-8817] Ensure we can create diff table versions(6 and
8) using 1.0 binary (#12615)
bc5566d5735 is described below
commit bc5566d573526720b1ed6091ae1f753009bf4bf3
Author: Davis-Zhang-Onehouse
<[email protected]>
AuthorDate: Mon Jan 27 11:57:18 2025 -0800
[HUDI-8817] Ensure we can create diff table versions(6 and 8) using 1.0
binary (#12615)
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 6 ++-
.../org/apache/hudi/util/SparkConfigUtils.scala | 4 +-
.../sql/catalyst/catalog/HoodieCatalogTable.scala | 4 +-
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 44 +++++++++++++++--
.../functional/TestWriteTableVersionConfig.scala | 56 ++++++++++++++++++++++
5 files changed, 105 insertions(+), 9 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index c84d95abff9..c6a9559b2c8 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -49,7 +49,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig,
HoodieTableMetaClient, T
import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys
import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option =>
HOption}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH,
INDEX_CLASS_NAME}
-import
org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY,
WRITE_TABLE_VERSION}
+import
org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig,
HoodieWriteConfig}
import org.apache.hudi.exception.{HoodieException,
HoodieRecordCreationException, HoodieWriteConflictException}
import org.apache.hudi.hadoop.fs.HadoopFSUtils
@@ -66,6 +66,7 @@ import org.apache.hudi.storage.HoodieStorage
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
import
org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
+import org.apache.hudi.util.SparkConfigUtils.getStringWithAltKeys
import org.apache.hudi.util.SparkKeyGenUtils
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import
org.apache.spark.sql.HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty
@@ -302,6 +303,7 @@ class HoodieSparkSqlWriterInternal {
else KeyGeneratorType.getKeyGeneratorClassName(hoodieConfig)
HoodieTableMetaClient.newTableBuilder()
.setTableType(tableType)
+ .setTableVersion(Integer.valueOf(getStringWithAltKeys(parameters,
HoodieWriteConfig.WRITE_TABLE_VERSION)))
.setDatabaseName(databaseName)
.setTableName(tblName)
.setBaseFileFormat(baseFileFormat)
@@ -748,7 +750,7 @@ class HoodieSparkSqlWriterInternal {
.setTableType(HoodieTableType.valueOf(tableType))
.setTableName(tableName)
.setRecordKeyFields(recordKeyFields)
- .setTableVersion(hoodieConfig.getIntOrDefault(WRITE_TABLE_VERSION))
+ .setTableVersion(Integer.valueOf(getStringWithAltKeys(parameters,
HoodieWriteConfig.WRITE_TABLE_VERSION)))
.setArchiveLogFolder(archiveLogFolder)
.setPayloadClassName(payloadClass)
.setRecordMergeMode(RecordMergeMode.getValue(hoodieConfig.getString(HoodieWriteConfig.RECORD_MERGE_MODE)))
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala
index 88f223284dd..107a3968c82 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala
@@ -28,11 +28,11 @@ object SparkConfigUtils {
* the alternate config keys for the specified key as well.
*
* @param props Configs in scala map
- * @param configProperty {@link ConfigProperty} config of String type to
fetch.
+ * @param configProperty {@link ConfigProperty} config of type T to fetch.
* @return String value if the config exists; default String value if the
config does not exist
* and there is default value defined in the {@link ConfigProperty}
config; {@code null} otherwise.
*/
- def getStringWithAltKeys(props: Map[String, String], configProperty:
ConfigProperty[String]): String = {
+ def getStringWithAltKeys[T](props: Map[String, String], configProperty:
ConfigProperty[T]): String = {
ConfigUtils.getStringWithAltKeys(JFunction.toJavaFunction[String,
Object](key => props.getOrElse(key, null)), configProperty)
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 1a23356381f..dc819560e36 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -28,12 +28,13 @@ import
org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING
import org.apache.hudi.common.table.timeline.TimelineUtils
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkArgument
+import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.keygen.constant.{KeyGeneratorOptions, KeyGeneratorType}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.storage.HoodieStorageUtils
import org.apache.hudi.util.SparkConfigUtils
-
+import org.apache.hudi.util.SparkConfigUtils.getStringWithAltKeys
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.avro.SchemaConverters
@@ -225,6 +226,7 @@ class HoodieCatalogTable(val spark: SparkSession, var
table: CatalogTable) exten
HoodieTableMetaClient.newTableBuilder()
.fromProperties(properties)
+ .setTableVersion(Integer.valueOf(getStringWithAltKeys(tableConfigs,
HoodieWriteConfig.WRITE_TABLE_VERSION)))
.setDatabaseName(catalogDatabaseName)
.setTableName(table.identifier.table)
.setTableCreateSchema(schema.toString())
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 9cfc01ae6b2..b020598aa24 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -237,6 +237,30 @@ class TestHoodieSparkSqlWriter extends
HoodieSparkWriterTestBase {
assert(HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite,
tableModifier2, dataFrame2)._1)
}
+ /**
+ * Test case to ensure the table is created with the configured write table version (6 or 8)
+ */
+ @Test
+ def testChangeWriteTableVersion(): Unit = {
+ Seq(6, 8).foreach { tableVersion =>
+ val tempPath = s"$tempBasePath/${tableVersion}"
+ val tableModifier1 = Map(
+ "path" -> tempPath,
+ HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+ "hoodie.write.table.version" -> s"$tableVersion",
+ "hoodie.datasource.write.recordkey.field" -> "uuid",
+ "hoodie.datasource.write.partitionpath.field" -> "ts"
+ )
+ val dataFrame =
spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new
Date().getTime)))
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite,
tableModifier1, dataFrame)
+
+ // Make sure table version is adopted (assertEquals takes expected first, then actual).
+ val metaClient = HoodieTableMetaClient.builder().setBasePath(tempPath)
+
.setConf(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf())).build()
+ assertEquals(tableVersion,
metaClient.getTableConfig.getTableVersion.versionCode())
+ }
+ }
+
/**
* Test case for each bulk insert sort mode
*
@@ -518,13 +542,14 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
/**
* Test cases for HoodieSparkSqlWriter functionality with datasource
bootstrap
- * for different type of tables.
+ * for different type of tables and table versions.
*
* @param tableType Type of table
+ * @param tableVersion Version of table
*/
@ParameterizedTest
- @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
- def testWithDatasourceBootstrapForTableType(tableType: String): Unit = {
+ @MethodSource(Array("bootstrapTestParams"))
+ def testWithDatasourceBootstrapForTableType(tableType: String, tableVersion:
Int): Unit = {
val srcPath =
java.nio.file.Files.createTempDirectory("hoodie_bootstrap_source_path")
try {
val sourceDF =
TestBootstrap.generateTestRawTripDataset(Instant.now.toEpochMilli, 0, 100,
Collections.emptyList(), sc,
@@ -542,6 +567,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key ->
classOf[NonpartitionedKeyGenerator].getCanonicalName,
DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key() ->
classOf[DefaultHoodieRecordPayload].getCanonicalName,
+ "hoodie.write.table.version" -> tableVersion.toString,
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false")
val fooTableParams =
HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
initializeMetaClientForBootstrap(fooTableParams, tableType,
addBootstrapPath = true, initBasePath = false)
@@ -564,6 +590,9 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
assertFalse(ignoreResult)
verify(client, times(2)).close()
+ // Assert the table version is adopted.
+ val metaClient = createMetaClient(spark, tempBasePath)
+ assertEquals(metaClient.getTableConfig.getTableVersion.versionCode(),
tableVersion)
// fetch all records from parquet files generated from write to hudi
val actualDf = sqlContext.read.parquet(tempBasePath)
assert(actualDf.count == 100)
@@ -1137,7 +1166,6 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
assert(configConflictException.getMessage.contains(s"KeyGenerator:\t${classOf[NonpartitionedKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}"))
}
-
@Test
def testNoKeyGenToSimpleKeyGen(): Unit = {
val _spark = spark
@@ -1325,4 +1353,12 @@ object TestHoodieSparkSqlWriter {
Arguments.arguments("*5/03/1*", Seq("2016/03/15")),
Arguments.arguments("2016/03/*", Seq("2015/03/16", "2015/03/17")))
}
+
+ def bootstrapTestParams(): java.util.stream.Stream[Arguments] = { // (tableType, tableVersion) pairs for testWithDatasourceBootstrapForTableType
+ java.util.stream.Stream.of(
+ Arguments.arguments("MERGE_ON_READ", Integer.valueOf(8)),
+ Arguments.arguments("MERGE_ON_READ", Integer.valueOf(6)),
+ Arguments.arguments("COPY_ON_WRITE", Integer.valueOf(8))
+ )
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestWriteTableVersionConfig.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestWriteTableVersionConfig.scala
new file mode 100644
index 00000000000..515e62ea72b
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestWriteTableVersionConfig.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals}
+import org.apache.hudi.common.table.HoodieTableConfig.VERSION
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+
+class TestWriteTableVersionConfig extends HoodieSparkSqlTestBase {
+ // Creates a MOR table via SQL for each write table version and checks the version stored in the table config.
+ test("Test create table with various write version") {
+ Seq(6, 8).foreach { tableVersion =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val basePath = tmp.getCanonicalPath
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id int,
+ | ts long,
+ | dt string
+ | ) using hudi
+ | tblproperties (
+ | type = 'mor',
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | hoodie.write.table.version = $tableVersion
+ | )
+ | partitioned by(dt)
+ | location '$basePath'
+ """.stripMargin)
+ val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath)
+
.setConf(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf())).build()
+ assertEquals(tableVersion,
metaClient.getTableConfig.getTableVersion.versionCode())
+ }
+ }
+ }
+}