This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new af8080991d5 [HUDI-7996] Store partition type with partition fields in 
table configs (#11638)
af8080991d5 is described below

commit af8080991d5550ec6427d50d72e6b99c5094b432
Author: Lokesh Jain <[email protected]>
AuthorDate: Mon Aug 5 21:19:22 2024 +0530

    [HUDI-7996] Store partition type with partition fields in table configs 
(#11638)
    
    Store partition type as well in `hoodie.table.partition.fields`.
    Currently this config stores the comma-separated partition fields
    without the partition type. Partition type here corresponds to the
    partition type supported in Custom key generators like `simple`
    and `timestamp`.
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../hudi/config/GlueCatalogSyncClientConfig.java   |  4 +-
 .../upgrade/EightToSevenDowngradeHandler.java      | 22 +++++-
 .../table/upgrade/SevenToEightUpgradeHandler.java  | 22 ++++--
 .../org/apache/hudi/util/SparkKeyGenUtils.scala    | 30 ++++++--
 .../apache/hudi/common/config/HoodieConfig.java    |  8 +-
 .../hudi/common/table/HoodieTableConfig.java       | 17 ++---
 .../hudi/common/table/HoodieTableVersion.java      |  4 +
 .../hudi/common/util/HoodieTableConfigUtils.java   | 88 ++++++++++++++++++++++
 .../org/apache/hudi/keygen/BaseKeyGenerator.java   |  2 +
 .../hudi/common/testutils/HoodieTestUtils.java     |  4 +
 .../hudi/gcp/bigquery/BigQuerySyncConfig.java      |  4 +-
 .../hudi/common/table/TestHoodieTableConfig.java   | 17 +++++
 .../hudi/connect/utils/KafkaConnectUtils.java      | 10 +--
 .../writers/KafkaConnectTransactionServices.java   |  2 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  9 ++-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  3 +-
 .../apache/hudi/cli/BootstrapExecutorUtils.java    | 20 ++---
 .../hudi/functional/TestSevenToEightUpgrade.scala  | 71 +++++++++++++++++
 .../apache/hudi/sync/common/HoodieSyncConfig.java  |  6 +-
 .../hudi/utilities/streamer/BootstrapExecutor.java |  6 +-
 .../apache/hudi/utilities/streamer/StreamSync.java |  2 +-
 21 files changed, 286 insertions(+), 65 deletions(-)

diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
 
b/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
index 24954b87fbc..fca0c6f1461 100644
--- 
a/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieTableConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
@@ -83,7 +83,7 @@ public class GlueCatalogSyncClientConfig extends HoodieConfig 
{
   public static final ConfigProperty<String> META_SYNC_PARTITION_INDEX_FIELDS 
= ConfigProperty
       .key(GLUE_CLIENT_PROPERTY_PREFIX + "partition_index_fields")
       .noDefaultValue()
-      .withInferFunction(cfg -> 
Option.ofNullable(cfg.getString(HoodieTableConfig.PARTITION_FIELDS))
+      .withInferFunction(cfg -> 
HoodieTableConfigUtils.getPartitionFieldProp(cfg)
           .or(() -> 
Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME))))
       .sinceVersion("0.15.0")
       .withDocumentation(String.join(" ", "Specify the partitions fields to 
index on aws glue. Separate the fields by semicolon.",
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
index 60d387ef439..f9666acc0ed 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
@@ -20,17 +20,21 @@ package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -48,6 +52,7 @@ public class EightToSevenDowngradeHandler implements 
DowngradeHandler {
   @Override
   public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, 
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade 
upgradeDowngradeHelper) {
     final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
     UpgradeDowngradeUtils.runCompaction(table, context, config, 
upgradeDowngradeHelper);
     UpgradeDowngradeUtils.syncCompactionRequestedFileToAuxiliaryFolder(table);
 
@@ -73,6 +78,19 @@ public class EightToSevenDowngradeHandler implements 
DowngradeHandler {
         return false;
       }, instants.size());
     }
-    return Collections.emptyMap();
+
+    downgradePartitionFields(config, context, upgradeDowngradeHelper, 
tablePropsToAdd);
+    return tablePropsToAdd;
+  }
+
+  private static void downgradePartitionFields(HoodieWriteConfig config, 
HoodieEngineContext context, SupportsUpgradeDowngrade upgradeDowngradeHelper,
+                                               Map<ConfigProperty, String> 
tablePropsToAdd) {
+    HoodieTableConfig tableConfig = upgradeDowngradeHelper.getTable(config, 
context).getMetaClient().getTableConfig();
+    String keyGenerator = tableConfig.getKeyGeneratorClassName();
+    String partitionPathField = 
config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
+    if (keyGenerator != null && partitionPathField != null
+        && (keyGenerator.equals(KeyGeneratorType.CUSTOM.getClassName()) || 
keyGenerator.equals(KeyGeneratorType.CUSTOM_AVRO.getClassName()))) {
+      tablePropsToAdd.put(HoodieTableConfig.PARTITION_FIELDS, 
tableConfig.getPartitionFieldProp());
+    }
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
index e614c92928b..df2b7408f38 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
@@ -27,9 +27,10 @@ import 
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
 
-import java.util.Collections;
-import java.util.Hashtable;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -43,6 +44,7 @@ public class SevenToEightUpgradeHandler implements 
UpgradeHandler {
                                              String instantTime, 
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     final HoodieTableConfig tableConfig = 
upgradeDowngradeHelper.getTable(config, 
context).getMetaClient().getTableConfig();
 
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
     if (tableConfig.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
       // Record merge mode is required to dictate the merging behavior in 
version 8,
       // playing the same role as the payload class config in version 7.
@@ -63,12 +65,20 @@ public class SevenToEightUpgradeHandler implements 
UpgradeHandler {
       ValidationUtils.checkState(null != propToAdd, String.format("Couldn't 
infer (%s) from (%s) class name",
           HoodieTableConfig.RECORD_MERGE_MODE.key(), payloadClassName));
 
-      Map<ConfigProperty, String> tablePropsToAdd = new Hashtable<>();
       tablePropsToAdd.put(HoodieTableConfig.RECORD_MERGE_MODE, propToAdd);
+    }
+
+    upgradePartitionFields(config, tableConfig, tablePropsToAdd);
+
+    return tablePropsToAdd;
+  }
 
-      return tablePropsToAdd;
-    } else {
-      return Collections.emptyMap();
+  private static void upgradePartitionFields(HoodieWriteConfig config, 
HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) {
+    String keyGenerator = tableConfig.getKeyGeneratorClassName();
+    String partitionPathField = 
config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
+    if (keyGenerator != null && partitionPathField != null
+        && (keyGenerator.equals(KeyGeneratorType.CUSTOM.getClassName()) || 
keyGenerator.equals(KeyGeneratorType.CUSTOM_AVRO.getClassName()))) {
+      tablePropsToAdd.put(HoodieTableConfig.PARTITION_FIELDS, 
partitionPathField);
     }
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
index bd094464096..9df55526e2f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-import org.apache.hudi.keygen.{AutoRecordKeyGeneratorWrapper, 
CustomAvroKeyGenerator, CustomKeyGenerator, GlobalAvroDeleteKeyGenerator, 
GlobalDeleteKeyGenerator, KeyGenerator, NonpartitionedAvroKeyGenerator, 
NonpartitionedKeyGenerator}
+import org.apache.hudi.keygen.{AutoRecordKeyGeneratorWrapper, 
BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, 
GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator, KeyGenerator, 
NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator}
 
 object SparkKeyGenUtils {
 
@@ -32,7 +32,16 @@ object SparkKeyGenUtils {
    */
   def getPartitionColumns(props: TypedProperties): String = {
     val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
-    getPartitionColumns(keyGenerator, props)
+    getPartitionColumns(keyGenerator, props, false)
+  }
+
+  /**
+   * @param properties config properties
+   * @return partition columns
+   */
+  def getPartitionColumnsForKeyGenerator(props: TypedProperties): String = {
+    val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
+    getPartitionColumns(keyGenerator, props, true)
   }
 
   /**
@@ -46,14 +55,14 @@ object SparkKeyGenUtils {
     } else {
       
HoodieSparkKeyGeneratorFactory.createKeyGenerator(KeyGenClassNameOption.get, 
props)
     }
-    getPartitionColumns(keyGenerator, props)
+    getPartitionColumns(keyGenerator, props, false)
   }
 
   /**
    * @param keyGen key generator class name
    * @return partition columns
    */
-  def getPartitionColumns(keyGenClass: KeyGenerator, typedProperties: 
TypedProperties): String = {
+  def getPartitionColumns(keyGenClass: KeyGenerator, typedProperties: 
TypedProperties, includeKeyGenPartitionType: Boolean): String = {
     // For {@link AutoRecordGenWrapperKeyGenerator} or {@link 
AutoRecordGenWrapperAvroKeyGenerator},
     // get the base key generator for the partition paths
     var baseKeyGen = keyGenClass match {
@@ -65,10 +74,15 @@ object SparkKeyGenUtils {
     // For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path 
filed format
     // is: "field_name: field_type", we extract the field_name from the 
partition path field.
     if (baseKeyGen.isInstanceOf[CustomKeyGenerator] || 
baseKeyGen.isInstanceOf[CustomAvroKeyGenerator]) {
-      
typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
-        .split(",").map(pathField => {
-        pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
-          .headOption.getOrElse(s"Illegal partition path field format: 
'$pathField' for ${baseKeyGen}")}).mkString(",")
+      val partitionFields = 
typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
+      if (includeKeyGenPartitionType) {
+        partitionFields
+      } else {
+        partitionFields.split(",").map(pathField => {
+          pathField.split(BaseKeyGenerator.CUSTOM_KEY_GENERATOR_SPLIT_REGEX)
+            .headOption.getOrElse(s"Illegal partition path field format: 
'$pathField' for ${baseKeyGen}")
+        }).mkString(",")
+      }
     } else if (baseKeyGen.isInstanceOf[NonpartitionedKeyGenerator]
       || baseKeyGen.isInstanceOf[NonpartitionedAvroKeyGenerator]
       || baseKeyGen.isInstanceOf[GlobalDeleteKeyGenerator]
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
index a77ca846692..86fc42e35bd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
@@ -113,10 +113,14 @@ public class HoodieConfig implements Serializable {
   }
 
   public <T> boolean contains(ConfigProperty<T> configProperty) {
-    if (props.containsKey(configProperty.key())) {
+    return contains(configProperty, this);
+  }
+
+  public static <T> boolean contains(ConfigProperty<T> configProperty, 
HoodieConfig config) {
+    if (config.getProps().containsKey(configProperty.key())) {
       return true;
     }
-    return 
configProperty.getAlternatives().stream().anyMatch(props::containsKey);
+    return configProperty.getAlternatives().stream().anyMatch(k -> 
config.getProps().containsKey(k));
   }
 
   private <T> Option<Object> getRawValue(ConfigProperty<T> configProperty) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 6053278d831..f82a3471686 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -41,6 +41,7 @@ import org.apache.hudi.common.util.BinaryUtil;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.HoodieTableConfigUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -114,7 +115,7 @@ public class HoodieTableConfig extends HoodieConfig {
 
   public static final ConfigProperty<HoodieTableVersion> VERSION = 
ConfigProperty
       .key("hoodie.table.version")
-      .defaultValue(HoodieTableVersion.ZERO)
+      .defaultValue(HoodieTableVersion.current())
       .withDocumentation("Version of table, used for running upgrade/downgrade 
steps between releases with potentially "
           + "breaking/backwards compatible changes.");
 
@@ -135,7 +136,7 @@ public class HoodieTableConfig extends HoodieConfig {
       .key("hoodie.table.partition.fields")
       .noDefaultValue()
       .withDocumentation("Fields used to partition the table. Concatenated 
values of these fields are used as "
-          + "the partition path, by invoking toString()");
+          + "the partition path, by invoking toString(). These fields also 
include the partition type which is used by custom key generators");
 
   public static final ConfigProperty<String> RECORDKEY_FIELDS = ConfigProperty
       .key("hoodie.table.recordkey.fields")
@@ -534,9 +535,7 @@ public class HoodieTableConfig extends HoodieConfig {
    * @return the hoodie.table.version from hoodie.properties file.
    */
   public HoodieTableVersion getTableVersion() {
-    return contains(VERSION)
-        ? HoodieTableVersion.versionFromCode(getInt(VERSION))
-        : VERSION.defaultValue();
+    return HoodieTableConfigUtils.getTableVersion(this);
   }
 
   /**
@@ -585,11 +584,7 @@ public class HoodieTableConfig extends HoodieConfig {
   }
 
   public Option<String[]> getPartitionFields() {
-    if (contains(PARTITION_FIELDS)) {
-      return Option.of(Arrays.stream(getString(PARTITION_FIELDS).split(","))
-          .filter(p -> p.length() > 
0).collect(Collectors.toList()).toArray(new String[] {}));
-    }
-    return Option.empty();
+    return HoodieTableConfigUtils.getPartitionFields(this);
   }
 
   public boolean isTablePartitioned() {
@@ -612,7 +607,7 @@ public class HoodieTableConfig extends HoodieConfig {
   public String getPartitionFieldProp() {
     // NOTE: We're adding a stub returning empty string to stay compatible w/ 
pre-existing
     //       behavior until this method is fully deprecated
-    return Option.ofNullable(getString(PARTITION_FIELDS)).orElse("");
+    return HoodieTableConfigUtils.getPartitionFieldProp(this).orElse("");
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java
index f3ed871e125..2fa321080f2 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java
@@ -65,4 +65,8 @@ public enum HoodieTableVersion {
         .filter(v -> v.versionCode == versionCode).findAny()
         .orElseThrow(() -> new HoodieException("Unknown versionCode:" + 
versionCode));
   }
+
+  public boolean greaterThan(HoodieTableVersion other) {
+    return this.versionCode > other.versionCode;
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTableConfigUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTableConfigUtils.java
new file mode 100644
index 00000000000..853c4ddd6b1
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTableConfigUtils.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+public class HoodieTableConfigUtils {
+
+  /**
+   * This function returns the partition fields joined by 
BaseKeyGenerator.FIELD_SEPARATOR. It will also
+   * include the key generator partition type with the field. The key 
generator partition type is used for
+   * Custom Key Generator.
+   */
+  public static Option<String> 
getPartitionFieldPropForKeyGenerator(HoodieConfig config) {
+    return 
Option.ofNullable(config.getString(HoodieTableConfig.PARTITION_FIELDS));
+  }
+
+  /**
+   * This function returns the partition fields joined by 
BaseKeyGenerator.FIELD_SEPARATOR. It will
+   * strip the partition key generator related info from the fields.
+   */
+  public static Option<String> getPartitionFieldProp(HoodieConfig config) {
+    if (getTableVersion(config).greaterThan(HoodieTableVersion.SEVEN)) {
+      // With table version eight, the table config 
org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS
+      // stores the corresponding partition type as well. This partition type 
is useful for CustomKeyGenerator
+      // and CustomAvroKeyGenerator.
+      return getPartitionFields(config).map(fields -> 
String.join(BaseKeyGenerator.FIELD_SEPARATOR, fields));
+    } else {
+      return 
Option.ofNullable(config.getString(HoodieTableConfig.PARTITION_FIELDS));
+    }
+  }
+
+  /**
+   * This function returns the partition fields only. This method strips the 
key generator related
+   * partition key types from the configured fields.
+   */
+  public static Option<String[]> getPartitionFields(HoodieConfig config) {
+    if (HoodieConfig.contains(HoodieTableConfig.PARTITION_FIELDS, config)) {
+      return 
Option.of(Arrays.stream(config.getString(HoodieTableConfig.PARTITION_FIELDS).split(","))
+          .filter(p -> !p.isEmpty())
+          .map(p -> getPartitionFieldWithoutKeyGenPartitionType(p, config))
+          .collect(Collectors.toList()).toArray(new String[] {}));
+    }
+    return Option.empty();
+  }
+
+  /**
+   * This function returns the partition fields only. The input partition 
field would contain partition
+   * type corresponding to the custom key generator if table version is eight 
and if custom key
+   * generator is configured. This function would strip the partition type and 
return the partition field.
+   */
+  public static String getPartitionFieldWithoutKeyGenPartitionType(String 
partitionField, HoodieConfig config) {
+    return getTableVersion(config).greaterThan(HoodieTableVersion.SEVEN)
+        ? 
partitionField.split(BaseKeyGenerator.CUSTOM_KEY_GENERATOR_SPLIT_REGEX)[0]
+        : partitionField;
+  }
+
+
+  /**
+   * This function returns the hoodie.table.version from hoodie.properties 
file.
+   */
+  public static HoodieTableVersion getTableVersion(HoodieConfig config) {
+    return HoodieConfig.contains(HoodieTableConfig.VERSION, config)
+        ? 
HoodieTableVersion.versionFromCode(config.getInt(HoodieTableConfig.VERSION))
+        : HoodieTableConfig.VERSION.defaultValue();
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java 
b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
index c5a13dd49c1..76ed82883b7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
@@ -33,6 +33,8 @@ import java.util.List;
 public abstract class BaseKeyGenerator extends KeyGenerator {
 
   public static final String EMPTY_PARTITION = "";
+  public static final String CUSTOM_KEY_GENERATOR_SPLIT_REGEX = ":";
+  public static final String FIELD_SEPARATOR = ",";
   protected List<String> recordKeyFields;
   protected List<String> partitionPathFields;
   protected final boolean encodePartitionPath;
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index acf87c14307..dfdedf89658 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -188,6 +188,10 @@ public class HoodieTestUtils {
             .setTableType(tableType)
             .setPayloadClass(HoodieAvroPayload.class);
 
+    if (properties.getProperty(HoodieTableConfig.KEY_GENERATOR_TYPE.key()) != 
null) {
+      
builder.setKeyGeneratorType(properties.getProperty(HoodieTableConfig.KEY_GENERATOR_TYPE.key()));
+    }
+
     String keyGen = 
properties.getProperty("hoodie.datasource.write.keygenerator.class");
     if (!Objects.equals(keyGen, 
"org.apache.hudi.keygen.NonpartitionedKeyGenerator")
         && 
!properties.containsKey("hoodie.datasource.write.partitionpath.field")) {
diff --git 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
index c2532dca7e9..ee67be355bd 100644
--- 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
+++ 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieTableConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -103,7 +103,7 @@ public class BigQuerySyncConfig extends HoodieSyncConfig 
implements Serializable
   public static final ConfigProperty<String> BIGQUERY_SYNC_PARTITION_FIELDS = 
ConfigProperty
       .key("hoodie.gcp.bigquery.sync.partition_fields")
       .noDefaultValue()
-      .withInferFunction(cfg -> 
Option.ofNullable(cfg.getString(HoodieTableConfig.PARTITION_FIELDS))
+      .withInferFunction(cfg -> 
HoodieTableConfigUtils.getPartitionFieldProp(cfg)
           .or(() -> 
Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME))))
       .markAdvanced()
       .withDocumentation("Comma-delimited partition fields. Default to 
non-partitioned.");
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 9e5ad70fdca..e881001e894 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -30,6 +30,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
@@ -43,6 +44,7 @@ import java.util.concurrent.Future;
 
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
 import static org.apache.hudi.common.util.ConfigUtils.recoverIfNeeded;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -202,4 +204,19 @@ public class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
     readerFuture.get();
     executor.shutdown();
   }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableVersion.class, names = {"SEVEN", "EIGHT"})
+  public void testPartitionFields(HoodieTableVersion version) {
+    Properties updatedProps = new Properties();
+    updatedProps.setProperty(HoodieTableConfig.PARTITION_FIELDS.key(), 
version.greaterThan(HoodieTableVersion.SEVEN) ? "p1:simple,p2:timestamp" : 
"p1,p2");
+    updatedProps.setProperty(HoodieTableConfig.VERSION.key(), 
String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+    HoodieTableConfig.update(storage, metaPath, updatedProps);
+
+    // Test makes sure that the partition fields returned by table config do 
not have partition type
+    // to ensure backward compatibility for the API
+    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null, 
null);
+    assertArrayEquals(new String[] {"p1", "p2"}, 
config.getPartitionFields().get());
+    assertEquals("p1,p2", config.getPartitionFieldProp());
+  }
 }
diff --git 
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
 
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
index f8eb9d08837..e62c7defecf 100644
--- 
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
+++ 
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
@@ -34,7 +34,6 @@ import org.apache.hudi.connect.writers.KafkaConnectConfigs;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.keygen.BaseKeyGenerator;
-import org.apache.hudi.keygen.CustomAvroKeyGenerator;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -185,14 +184,7 @@ public class KafkaConnectUtils {
    * @param typedProperties properties from the config.
    * @return partition columns Returns the partition columns separated by 
comma.
    */
-  public static String getPartitionColumns(KeyGenerator keyGenerator, 
TypedProperties typedProperties) {
-    if (keyGenerator instanceof CustomAvroKeyGenerator) {
-      return ((BaseKeyGenerator) 
keyGenerator).getPartitionPathFields().stream().map(
-          pathField -> 
Arrays.stream(pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX))
-              .findFirst().orElseGet(() -> "Illegal partition path field 
format: '$pathField' for ${c.getClass.getSimpleName}"))
-          .collect(Collectors.joining(","));
-    }
-
+  public static String getPartitionColumnsForKeyGenerator(KeyGenerator 
keyGenerator, TypedProperties typedProperties) {
     if (keyGenerator instanceof BaseKeyGenerator) {
       return String.join(",", ((BaseKeyGenerator) 
keyGenerator).getPartitionPathFields());
     }
diff --git 
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
 
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
index 67123bbe3df..4ab2c5eae53 100644
--- 
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
+++ 
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
@@ -88,7 +88,7 @@ public class KafkaConnectTransactionServices implements 
ConnectTransactionServic
       KeyGenerator keyGenerator = 
HoodieAvroKeyGeneratorFactory.createAvroKeyGeneratorByType(
           new TypedProperties(connectConfigs.getProps()));
       String recordKeyFields = 
KafkaConnectUtils.getRecordKeyColumns(keyGenerator);
-      String partitionColumns = 
KafkaConnectUtils.getPartitionColumns(keyGenerator,
+      String partitionColumns = 
KafkaConnectUtils.getPartitionColumnsForKeyGenerator(keyGenerator,
           new TypedProperties(connectConfigs.getProps()));
 
       LOG.info(String.format("Setting record key %s and partition fields %s 
for table %s",
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 97a1cc29cc3..351f68709ff 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -279,7 +279,8 @@ class HoodieSparkSqlWriterInternal {
     } else {
       // Handle various save modes
       handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, 
tblName, operation, fs)
-      val partitionColumns = 
SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
+      val partitionColumns = 
SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters), 
false)
+      val partitionColumnsForKeyGenerator = 
SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters), 
true)
       val timelineTimeZone = 
HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))
       val tableMetaClient = if (tableExists) {
         HoodieInstantTimeGenerator.setCommitTimeZone(timelineTimeZone)
@@ -310,7 +311,7 @@ class HoodieSparkSqlWriterInternal {
           // we can't fetch preCombine field from hoodieConfig object, since 
it falls back to "ts" as default value,
           // but we are interested in what user has set, hence fetching from 
optParams.
           .setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), 
null))
-          .setPartitionFields(partitionColumns)
+          .setPartitionFields(partitionColumnsForKeyGenerator)
           .setPopulateMetaFields(populateMetaFields)
           .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
           
.setSecondaryKeyFields(hoodieConfig.getString(SECONDARYKEY_COLUMN_NAME))
@@ -724,7 +725,7 @@ class HoodieSparkSqlWriterInternal {
 
       if (!tableExists) {
         val archiveLogFolder = 
hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
-        val partitionColumns = 
HoodieWriterUtils.getPartitionColumns(parameters)
+        val partitionColumnsWithType = 
SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(toProperties(parameters))
         val recordKeyFields = 
hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
         val payloadClass =
           if 
(StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.PAYLOAD_CLASS_NAME)))
@@ -755,7 +756,7 @@ class HoodieSparkSqlWriterInternal {
           .setBootstrapIndexClass(bootstrapIndexClass)
           .setBaseFileFormat(baseFileFormat)
           .setBootstrapBasePath(bootstrapBasePath)
-          .setPartitionFields(partitionColumns)
+          .setPartitionFields(partitionColumnsWithType)
           
.setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED))
           
.setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
           .setPopulateMetaFields(populateMetaFields)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index e613d974143..f88eb5ad690 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -24,6 +24,7 @@ import 
org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, 
HoodieCommonConfig, HoodieConfig, TypedProperties}
 import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.util.HoodieTableConfigUtils
 import 
org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
@@ -221,7 +222,7 @@ object HoodieWriterUtils {
         } else {
           SparkKeyGenUtils.getPartitionColumns(validatedKeyGenClassName, 
TypedProperties.fromMap(params.asJava))
         }
-        val tableConfigPartitionFields = 
tableConfig.getString(HoodieTableConfig.PARTITION_FIELDS)
+        val tableConfigPartitionFields = 
HoodieTableConfigUtils.getPartitionFieldProp(tableConfig).orElse(null)
         if (null != datasourcePartitionFields && null != 
tableConfigPartitionFields
           && currentPartitionFields != tableConfigPartitionFields) {
           
diffConfigs.append(s"PartitionPath:\t$currentPartitionFields\t$tableConfigPartitionFields\n")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
index 775a7290cd0..a886aedc61c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
@@ -149,7 +149,7 @@ public class BootstrapExecutorUtils implements Serializable 
{
      * Schema provider that supplies the command for reading the input and 
writing out the target table.
      */
     SchemaProvider schemaProvider = 
createSchemaProvider(cfg.schemaProviderClass, props, jssc);
-    String keyGenClass = genKeyGenClassAndPartitionColumns().getLeft();
+    String keyGenClass = 
genKeyGenClassAndPartitionColumnsForKeyGenerator().getLeft();
     HoodieWriteConfig.Builder builder =
         HoodieWriteConfig.newBuilder().withPath(cfg.basePath)
             
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build())
@@ -230,9 +230,9 @@ public class BootstrapExecutorUtils implements Serializable 
{
             + ". Cannot bootstrap data on top of an existing table");
       }
     }
-    Pair<String, String> keyGenClassAndParCols = 
genKeyGenClassAndPartitionColumns();
+    Pair<String, String> keyGenClassAndParColsForKeyGenerator = 
genKeyGenClassAndPartitionColumnsForKeyGenerator();
     Map<String, Object> timestampKeyGeneratorConfigs =
-        
extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenClassAndParCols.getLeft(),
 props);
+        
extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenClassAndParColsForKeyGenerator.getLeft(),
 props);
 
     HoodieTableMetaClient.PropertyBuilder builder = 
HoodieTableMetaClient.withPropertyBuilder()
         .fromProperties(props)
@@ -267,13 +267,13 @@ public class BootstrapExecutorUtils implements 
Serializable {
             PARTITION_METAFILE_USE_BASE_FORMAT.key(),
             PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
         .set(timestampKeyGeneratorConfigs)
-        .setKeyGeneratorClassProp(keyGenClassAndParCols.getLeft())
-        .setPartitionFields(keyGenClassAndParCols.getRight());
+        
.setKeyGeneratorClassProp(keyGenClassAndParColsForKeyGenerator.getLeft())
+        .setPartitionFields(keyGenClassAndParColsForKeyGenerator.getRight());
 
     
builder.initTable(HadoopFSUtils.getStorageConfWithCopy(jssc.hadoopConfiguration()),
 cfg.basePath);
   }
 
-  private Pair<String, String> genKeyGenClassAndPartitionColumns() {
+  private Pair<String, String> 
genKeyGenClassAndPartitionColumnsForKeyGenerator() {
     String keyGenClass;
     if 
(StringUtils.nonEmpty(props.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(),
 null))) {
       keyGenClass = 
props.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key());
@@ -283,15 +283,15 @@ public class BootstrapExecutorUtils implements 
Serializable {
       keyGenClass = KeyGeneratorType.getKeyGeneratorClassName(new 
HoodieConfig(props));
     }
     props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), keyGenClass);
-    String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
+    String partitionColumnsForKeyGenerator = 
SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props);
 
-    if (StringUtils.isNullOrEmpty(partitionColumns)) {
-      partitionColumns = null;
+    if (StringUtils.isNullOrEmpty(partitionColumnsForKeyGenerator)) {
+      partitionColumnsForKeyGenerator = null;
       if (keyGenClass.equals(SimpleKeyGenerator.class.getName())) {
         keyGenClass = NonpartitionedKeyGenerator.class.getName();
       }
     }
-    return Pair.of(keyGenClass, partitionColumns);
+    return Pair.of(keyGenClass, partitionColumnsForKeyGenerator);
   }
 
   private Map<String, Object> 
extractConfigsRelatedToTimestampBasedKeyGenerator(String keyGenerator, 
TypedProperties params) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
new file mode 100644
index 00000000000..91efac9f0be
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
+import org.apache.hudi.common.util.HoodieTableConfigUtils
+import org.apache.hudi.keygen.constant.KeyGeneratorType
+import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, 
UpgradeDowngrade}
+import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+class TestSevenToEightUpgrade extends RecordLevelIndexTestBase {
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testPartitionFieldsWithUpgrade(tableType: HoodieTableType): Unit = {
+    val partitionFields = "partition:simple"
+    val hudiOpts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
KeyGeneratorType.CUSTOM.getClassName,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> partitionFields)
+
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      validate = false)
+    metaClient = getLatestMetaClient(true)
+
+    // assert table version is eight and the partition fields in table config 
has partition type
+    assertEquals(HoodieTableVersion.EIGHT, 
metaClient.getTableConfig.getTableVersion)
+    assertEquals(partitionFields, 
HoodieTableConfigUtils.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+
+    // downgrade table props to version seven
+    // assert table version is seven and the partition fields in table config 
does not have partition type
+    new UpgradeDowngrade(metaClient, getWriteConfig(hudiOpts), context, 
SparkUpgradeDowngradeHelper.getInstance)
+      .run(HoodieTableVersion.SEVEN, null)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    assertEquals(HoodieTableVersion.SEVEN, 
metaClient.getTableConfig.getTableVersion)
+    assertEquals("partition", 
HoodieTableConfigUtils.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+
+    // auto upgrade the table
+    // assert table version is eight and the partition fields in table config 
has partition type
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      validate = false)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    assertEquals(HoodieTableVersion.EIGHT, 
metaClient.getTableConfig.getTableVersion)
+    assertEquals(partitionFields, 
HoodieTableConfigUtils.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+  }
+}
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
index b6ee02355ff..17caaa9c01e 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
@@ -24,10 +24,10 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.HadoopConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.HoodieTableConfigUtils;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -109,7 +109,7 @@ public class HoodieSyncConfig extends HoodieConfig {
   public static final ConfigProperty<String> META_SYNC_PARTITION_FIELDS = 
ConfigProperty
       .key("hoodie.datasource.hive_sync.partition_fields")
       .defaultValue("")
-      .withInferFunction(cfg -> 
Option.ofNullable(cfg.getString(HoodieTableConfig.PARTITION_FIELDS))
+      .withInferFunction(cfg -> 
HoodieTableConfigUtils.getPartitionFieldProp(cfg)
           .or(() -> 
Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME))))
       .markAdvanced()
       .withDocumentation("Field in the table to use for determining hive 
partition columns.");
@@ -122,7 +122,7 @@ public class HoodieSyncConfig extends HoodieConfig {
         if (StringUtils.nonEmpty(cfg.getString(META_SYNC_PARTITION_FIELDS))) {
           partitionFieldsOpt = 
Option.ofNullable(cfg.getString(META_SYNC_PARTITION_FIELDS));
         } else {
-          partitionFieldsOpt = 
Option.ofNullable(cfg.getString(HoodieTableConfig.PARTITION_FIELDS))
+          partitionFieldsOpt = 
HoodieTableConfigUtils.getPartitionFieldProp(cfg)
               .or(() -> 
Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)));
         }
         if (!partitionFieldsOpt.isPresent()) {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
index 7d84d41dd4f..dd1386c593e 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
@@ -233,9 +233,9 @@ public class BootstrapExecutor implements Serializable {
         .setPartitionMetafileUseBaseFormat(props.getBoolean(
             PARTITION_METAFILE_USE_BASE_FORMAT.key(),
             PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()));
-    String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
-    if (!StringUtils.isNullOrEmpty(partitionColumns)) {
-      builder.setPartitionFields(partitionColumns).setKeyGeneratorClassProp(
+    String partitionColumnsForKeyGenerator = 
SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props);
+    if (!StringUtils.isNullOrEmpty(partitionColumnsForKeyGenerator)) {
+      
builder.setPartitionFields(partitionColumnsForKeyGenerator).setKeyGeneratorClassProp(
           props.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), 
SimpleKeyGenerator.class.getName()));
     } else {
       builder.setKeyGeneratorClassProp(props.getString(
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 1581eb873b8..b83099dbd3f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -398,7 +398,7 @@ public class StreamSync implements Serializable, Closeable {
   private void initializeEmptyTable() throws IOException {
     this.commitsTimelineOpt = Option.empty();
     this.allCommitsTimelineOpt = Option.empty();
-    String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
+    String partitionColumns = 
SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props);
     HoodieTableMetaClient.withPropertyBuilder()
         .setTableType(cfg.tableType)
         .setTableName(cfg.targetTableName)

Reply via email to