This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e5f121cdd61 [HUDI-9215] Set partitionColumnsWithKeyGenerator based on table version (#13025)
e5f121cdd61 is described below

commit e5f121cdd6148ad365efa9f0d1703f0d821b9fd8
Author: Vinish Reddy <[email protected]>
AuthorDate: Tue Mar 25 23:28:52 2025 +0530

    [HUDI-9215] Set partitionColumnsWithKeyGenerator based on table version (#13025)
---
 .../scala/org/apache/hudi/util/SparkKeyGenUtils.scala    |  6 ++++--
 .../scala/org/apache/hudi/HoodieSparkSqlWriter.scala     | 10 ++++++----
 .../java/org/apache/hudi/cli/BootstrapExecutorUtils.java |  4 +++-
 .../hudi/utilities/streamer/BootstrapExecutor.java       |  3 ++-
 .../org/apache/hudi/utilities/streamer/StreamSync.java   |  3 ++-
 .../utilities/deltastreamer/TestHoodieDeltaStreamer.java | 16 ++++++++++++++++
 6 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
index 9df55526e2f..a495bbb4707 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
@@ -18,6 +18,7 @@
 package org.apache.hudi.util
 
 import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.table.HoodieTableVersion
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
@@ -37,11 +38,12 @@ object SparkKeyGenUtils {
 
   /**
    * @param properties config properties
+   * @param writerTableVersion table version used by writer
    * @return partition columns
    */
-  def getPartitionColumnsForKeyGenerator(props: TypedProperties): String = {
+  def getPartitionColumnsForKeyGenerator(props: TypedProperties, writerTableVersion: HoodieTableVersion): String = {
     val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
-    getPartitionColumns(keyGenerator, props, true)
+    getPartitionColumns(keyGenerator, props, writerTableVersion.versionCode() > HoodieTableVersion.SIX.versionCode())
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 58cfebf596d..424dd9b4c20 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -277,6 +277,7 @@ class HoodieSparkSqlWriterInternal {
     }
 
     val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(hoodieConfig.getProps))
+    val tableVersion = Integer.valueOf(getStringWithAltKeys(parameters, HoodieWriteConfig.WRITE_TABLE_VERSION))
     if (mode == SaveMode.Ignore && tableExists) {
       log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
       (false, common.util.Option.empty(), common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
@@ -284,7 +285,7 @@ class HoodieSparkSqlWriterInternal {
       // Handle various save modes
       handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
       val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters), false)
-      val partitionColumnsForKeyGenerator = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters), true)
+      val partitionColumnsForKeyGenerator = SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(toProperties(parameters), HoodieTableVersion.fromVersionCode(tableVersion))
       val timelineTimeZone = HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))
       val tableMetaClient = if (tableExists) {
         HoodieInstantTimeGenerator.setCommitTimeZone(timelineTimeZone)
@@ -305,7 +306,7 @@ class HoodieSparkSqlWriterInternal {
           else KeyGeneratorType.getKeyGeneratorClassName(hoodieConfig)
         HoodieTableMetaClient.newTableBuilder()
           .setTableType(tableType)
-          .setTableVersion(Integer.valueOf(getStringWithAltKeys(parameters, HoodieWriteConfig.WRITE_TABLE_VERSION)))
+          .setTableVersion(tableVersion)
           .setDatabaseName(databaseName)
           .setTableName(tblName)
           .setBaseFileFormat(baseFileFormat)
@@ -729,7 +730,8 @@ class HoodieSparkSqlWriterInternal {
 
       if (!tableExists) {
         val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_HISTORY_PATH)
-        val partitionColumnsWithType = SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(toProperties(parameters))
+        val tableVersion = Integer.valueOf(getStringWithAltKeys(parameters, HoodieWriteConfig.WRITE_TABLE_VERSION))
+        val partitionColumnsWithType = SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(toProperties(parameters), HoodieTableVersion.fromVersionCode(tableVersion))
         val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
         val payloadClass = hoodieConfig.getString(DataSourceWriteOptions.PAYLOAD_CLASS_NAME)
         val recordMergerStrategy = hoodieConfig.getString(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID)
@@ -752,7 +754,7 @@ class HoodieSparkSqlWriterInternal {
           .setTableType(HoodieTableType.valueOf(tableType))
           .setTableName(tableName)
           .setRecordKeyFields(recordKeyFields)
-          .setTableVersion(Integer.valueOf(getStringWithAltKeys(parameters, HoodieWriteConfig.WRITE_TABLE_VERSION)))
+          .setTableVersion(tableVersion)
           .setArchiveLogFolder(archiveLogFolder)
           .setPayloadClassName(payloadClass)
           .setRecordMergeMode(RecordMergeMode.getValue(hoodieConfig.getString(HoodieWriteConfig.RECORD_MERGE_MODE)))
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
index 40cb963ec72..fa30988eb45 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieTimelineTimeZone;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -281,7 +282,8 @@ public class BootstrapExecutorUtils implements Serializable {
       keyGenClass = KeyGeneratorType.getKeyGeneratorClassName(new HoodieConfig(props));
     }
     props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), keyGenClass);
-    String partitionColumnsForKeyGenerator = SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props);
+    HoodieTableVersion tableVersion = HoodieTableVersion.fromVersionCode(props.getInteger(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), HoodieTableVersion.current().versionCode()));
+    String partitionColumnsForKeyGenerator = SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props, tableVersion);
 
     if (StringUtils.isNullOrEmpty(partitionColumnsForKeyGenerator)) {
       partitionColumnsForKeyGenerator = null;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
index 453c17ec4f4..547d49122ef 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieTimelineTimeZone;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -233,7 +234,7 @@ public class BootstrapExecutor implements Serializable {
         .setPartitionMetafileUseBaseFormat(props.getBoolean(
             PARTITION_METAFILE_USE_BASE_FORMAT.key(),
             PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()));
-    String partitionColumnsForKeyGenerator = SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props);
+    String partitionColumnsForKeyGenerator = SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props, HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(props, WRITE_TABLE_VERSION)));
     if (!StringUtils.isNullOrEmpty(partitionColumnsForKeyGenerator)) {
       builder.setPartitionFields(partitionColumnsForKeyGenerator).setKeyGeneratorClassProp(
           props.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName()));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index bf3377a20ea..a67149c7190 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -45,6 +45,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -419,7 +420,7 @@ public class StreamSync implements Serializable, Closeable {
 
   private HoodieTableMetaClient initializeEmptyTable() throws IOException {
     return initializeEmptyTable(HoodieTableMetaClient.newTableBuilder(),
-        SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props),
+        SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props, HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(props, WRITE_TABLE_VERSION))),
         HadoopFSUtils.getStorageConfWithCopy(hoodieSparkContext.hadoopConfiguration()));
   }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 675c03894c0..86c7502348e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -79,6 +79,7 @@ import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncClient;
 import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
 import org.apache.hudi.keygen.ComplexKeyGenerator;
+import org.apache.hudi.keygen.CustomKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.metrics.Metrics;
@@ -474,6 +475,21 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertEquals(configFlag, Boolean.parseBoolean(metaClient.getTableConfig().getUrlEncodePartitioning()));
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "EIGHT"})
+  public void testPartitionKeyFieldsBasedOnVersion(HoodieTableVersion version) throws IOException {
+    String tablePath = basePath + "/partition_key_fields_meta_client" + version.versionCode();
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT);
+    cfg.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" + version.versionCode());
+    cfg.configs.add(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key() + "=" + CustomKeyGenerator.class.getName());
+    cfg.configs.add("hoodie.datasource.write.partitionpath.field=partition_path:simple");
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
+    deltaStreamer.getIngestionService().ingestOnce();
+    HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(context, tablePath);
+    String expectedPartitionFields = version.equals(HoodieTableVersion.SIX) ? "partition_path" : "partition_path:simple";
+    assertEquals(expectedPartitionFields, metaClient.getTableConfig().getString(HoodieTableConfig.PARTITION_FIELDS));
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
   public void testBulkInsertsAndUpsertsWithBootstrap(HoodieRecordType recordType) throws Exception {

Reply via email to