This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new af8080991d5 [HUDI-7996] Store partition type with partition fields in
table configs (#11638)
af8080991d5 is described below
commit af8080991d5550ec6427d50d72e6b99c5094b432
Author: Lokesh Jain <[email protected]>
AuthorDate: Mon Aug 5 21:19:22 2024 +0530
[HUDI-7996] Store partition type with partition fields in table configs
(#11638)
Store partition type as well in `hoodie.table.partition.fields`.
Currently this config stores the comma-separated partition fields
without the partition type. Partition type here corresponds to the
partition type supported in Custom key generators like `simple`
and `timestamp`.
Co-authored-by: Sagar Sumit <[email protected]>
---
.../hudi/config/GlueCatalogSyncClientConfig.java | 4 +-
.../upgrade/EightToSevenDowngradeHandler.java | 22 +++++-
.../table/upgrade/SevenToEightUpgradeHandler.java | 22 ++++--
.../org/apache/hudi/util/SparkKeyGenUtils.scala | 30 ++++++--
.../apache/hudi/common/config/HoodieConfig.java | 8 +-
.../hudi/common/table/HoodieTableConfig.java | 17 ++---
.../hudi/common/table/HoodieTableVersion.java | 4 +
.../hudi/common/util/HoodieTableConfigUtils.java | 88 ++++++++++++++++++++++
.../org/apache/hudi/keygen/BaseKeyGenerator.java | 2 +
.../hudi/common/testutils/HoodieTestUtils.java | 4 +
.../hudi/gcp/bigquery/BigQuerySyncConfig.java | 4 +-
.../hudi/common/table/TestHoodieTableConfig.java | 17 +++++
.../hudi/connect/utils/KafkaConnectUtils.java | 10 +--
.../writers/KafkaConnectTransactionServices.java | 2 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 9 ++-
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 3 +-
.../apache/hudi/cli/BootstrapExecutorUtils.java | 20 ++---
.../hudi/functional/TestSevenToEightUpgrade.scala | 71 +++++++++++++++++
.../apache/hudi/sync/common/HoodieSyncConfig.java | 6 +-
.../hudi/utilities/streamer/BootstrapExecutor.java | 6 +-
.../apache/hudi/utilities/streamer/StreamSync.java | 2 +-
21 files changed, 286 insertions(+), 65 deletions(-)
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
b/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
index 24954b87fbc..fca0c6f1461 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieTableConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -83,7 +83,7 @@ public class GlueCatalogSyncClientConfig extends HoodieConfig
{
public static final ConfigProperty<String> META_SYNC_PARTITION_INDEX_FIELDS
= ConfigProperty
.key(GLUE_CLIENT_PROPERTY_PREFIX + "partition_index_fields")
.noDefaultValue()
- .withInferFunction(cfg ->
Option.ofNullable(cfg.getString(HoodieTableConfig.PARTITION_FIELDS))
+ .withInferFunction(cfg ->
HoodieTableConfigUtils.getPartitionFieldProp(cfg)
.or(() ->
Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME))))
.sinceVersion("0.15.0")
.withDocumentation(String.join(" ", "Specify the partitions fields to
index on aws glue. Separate the fields by semicolon.",
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
index 60d387ef439..f9666acc0ed 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
@@ -20,17 +20,21 @@ package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -48,6 +52,7 @@ public class EightToSevenDowngradeHandler implements
DowngradeHandler {
@Override
public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config,
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+ Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
UpgradeDowngradeUtils.runCompaction(table, context, config,
upgradeDowngradeHelper);
UpgradeDowngradeUtils.syncCompactionRequestedFileToAuxiliaryFolder(table);
@@ -73,6 +78,19 @@ public class EightToSevenDowngradeHandler implements
DowngradeHandler {
return false;
}, instants.size());
}
- return Collections.emptyMap();
+
+ downgradePartitionFields(config, context, upgradeDowngradeHelper,
tablePropsToAdd);
+ return tablePropsToAdd;
+ }
+
+ private static void downgradePartitionFields(HoodieWriteConfig config,
HoodieEngineContext context, SupportsUpgradeDowngrade upgradeDowngradeHelper,
+ Map<ConfigProperty, String>
tablePropsToAdd) {
+ HoodieTableConfig tableConfig = upgradeDowngradeHelper.getTable(config,
context).getMetaClient().getTableConfig();
+ String keyGenerator = tableConfig.getKeyGeneratorClassName();
+ String partitionPathField =
config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
+ if (keyGenerator != null && partitionPathField != null
+ && (keyGenerator.equals(KeyGeneratorType.CUSTOM.getClassName()) ||
keyGenerator.equals(KeyGeneratorType.CUSTOM_AVRO.getClassName()))) {
+ tablePropsToAdd.put(HoodieTableConfig.PARTITION_FIELDS,
tableConfig.getPartitionFieldProp());
+ }
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
index e614c92928b..df2b7408f38 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
@@ -27,9 +27,10 @@ import
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
-import java.util.Collections;
-import java.util.Hashtable;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -43,6 +44,7 @@ public class SevenToEightUpgradeHandler implements
UpgradeHandler {
String instantTime,
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
final HoodieTableConfig tableConfig =
upgradeDowngradeHelper.getTable(config,
context).getMetaClient().getTableConfig();
+ Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
if (tableConfig.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
// Record merge mode is required to dictate the merging behavior in
version 8,
// playing the same role as the payload class config in version 7.
@@ -63,12 +65,20 @@ public class SevenToEightUpgradeHandler implements
UpgradeHandler {
ValidationUtils.checkState(null != propToAdd, String.format("Couldn't
infer (%s) from (%s) class name",
HoodieTableConfig.RECORD_MERGE_MODE.key(), payloadClassName));
- Map<ConfigProperty, String> tablePropsToAdd = new Hashtable<>();
tablePropsToAdd.put(HoodieTableConfig.RECORD_MERGE_MODE, propToAdd);
+ }
+
+ upgradePartitionFields(config, tableConfig, tablePropsToAdd);
+
+ return tablePropsToAdd;
+ }
- return tablePropsToAdd;
- } else {
- return Collections.emptyMap();
+ private static void upgradePartitionFields(HoodieWriteConfig config,
HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) {
+ String keyGenerator = tableConfig.getKeyGeneratorClassName();
+ String partitionPathField =
config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
+ if (keyGenerator != null && partitionPathField != null
+ && (keyGenerator.equals(KeyGeneratorType.CUSTOM.getClassName()) ||
keyGenerator.equals(KeyGeneratorType.CUSTOM_AVRO.getClassName()))) {
+ tablePropsToAdd.put(HoodieTableConfig.PARTITION_FIELDS,
partitionPathField);
}
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
index bd094464096..9df55526e2f 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-import org.apache.hudi.keygen.{AutoRecordKeyGeneratorWrapper,
CustomAvroKeyGenerator, CustomKeyGenerator, GlobalAvroDeleteKeyGenerator,
GlobalDeleteKeyGenerator, KeyGenerator, NonpartitionedAvroKeyGenerator,
NonpartitionedKeyGenerator}
+import org.apache.hudi.keygen.{AutoRecordKeyGeneratorWrapper,
BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator,
GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator, KeyGenerator,
NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator}
object SparkKeyGenUtils {
@@ -32,7 +32,16 @@ object SparkKeyGenUtils {
*/
def getPartitionColumns(props: TypedProperties): String = {
val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
- getPartitionColumns(keyGenerator, props)
+ getPartitionColumns(keyGenerator, props, false)
+ }
+
+ /**
+ * @param properties config properties
+ * @return partition columns
+ */
+ def getPartitionColumnsForKeyGenerator(props: TypedProperties): String = {
+ val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
+ getPartitionColumns(keyGenerator, props, true)
}
/**
@@ -46,14 +55,14 @@ object SparkKeyGenUtils {
} else {
HoodieSparkKeyGeneratorFactory.createKeyGenerator(KeyGenClassNameOption.get,
props)
}
- getPartitionColumns(keyGenerator, props)
+ getPartitionColumns(keyGenerator, props, false)
}
/**
* @param keyGen key generator class name
* @return partition columns
*/
- def getPartitionColumns(keyGenClass: KeyGenerator, typedProperties:
TypedProperties): String = {
+ def getPartitionColumns(keyGenClass: KeyGenerator, typedProperties:
TypedProperties, includeKeyGenPartitionType: Boolean): String = {
// For {@link AutoRecordGenWrapperKeyGenerator} or {@link
AutoRecordGenWrapperAvroKeyGenerator},
// get the base key generator for the partition paths
var baseKeyGen = keyGenClass match {
@@ -65,10 +74,15 @@ object SparkKeyGenUtils {
// For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path
filed format
// is: "field_name: field_type", we extract the field_name from the
partition path field.
if (baseKeyGen.isInstanceOf[CustomKeyGenerator] ||
baseKeyGen.isInstanceOf[CustomAvroKeyGenerator]) {
-
typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
- .split(",").map(pathField => {
- pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
- .headOption.getOrElse(s"Illegal partition path field format:
'$pathField' for ${baseKeyGen}")}).mkString(",")
+ val partitionFields =
typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
+ if (includeKeyGenPartitionType) {
+ partitionFields
+ } else {
+ partitionFields.split(",").map(pathField => {
+ pathField.split(BaseKeyGenerator.CUSTOM_KEY_GENERATOR_SPLIT_REGEX)
+ .headOption.getOrElse(s"Illegal partition path field format:
'$pathField' for ${baseKeyGen}")
+ }).mkString(",")
+ }
} else if (baseKeyGen.isInstanceOf[NonpartitionedKeyGenerator]
|| baseKeyGen.isInstanceOf[NonpartitionedAvroKeyGenerator]
|| baseKeyGen.isInstanceOf[GlobalDeleteKeyGenerator]
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
index a77ca846692..86fc42e35bd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
@@ -113,10 +113,14 @@ public class HoodieConfig implements Serializable {
}
public <T> boolean contains(ConfigProperty<T> configProperty) {
- if (props.containsKey(configProperty.key())) {
+ return contains(configProperty, this);
+ }
+
+ public static <T> boolean contains(ConfigProperty<T> configProperty,
HoodieConfig config) {
+ if (config.getProps().containsKey(configProperty.key())) {
return true;
}
- return
configProperty.getAlternatives().stream().anyMatch(props::containsKey);
+ return configProperty.getAlternatives().stream().anyMatch(k ->
config.getProps().containsKey(k));
}
private <T> Option<Object> getRawValue(ConfigProperty<T> configProperty) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 6053278d831..f82a3471686 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -41,6 +41,7 @@ import org.apache.hudi.common.util.BinaryUtil;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.HoodieTableConfigUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -114,7 +115,7 @@ public class HoodieTableConfig extends HoodieConfig {
public static final ConfigProperty<HoodieTableVersion> VERSION =
ConfigProperty
.key("hoodie.table.version")
- .defaultValue(HoodieTableVersion.ZERO)
+ .defaultValue(HoodieTableVersion.current())
.withDocumentation("Version of table, used for running upgrade/downgrade
steps between releases with potentially "
+ "breaking/backwards compatible changes.");
@@ -135,7 +136,7 @@ public class HoodieTableConfig extends HoodieConfig {
.key("hoodie.table.partition.fields")
.noDefaultValue()
.withDocumentation("Fields used to partition the table. Concatenated
values of these fields are used as "
- + "the partition path, by invoking toString()");
+ + "the partition path, by invoking toString(). These fields also
include the partition type which is used by custom key generators");
public static final ConfigProperty<String> RECORDKEY_FIELDS = ConfigProperty
.key("hoodie.table.recordkey.fields")
@@ -534,9 +535,7 @@ public class HoodieTableConfig extends HoodieConfig {
* @return the hoodie.table.version from hoodie.properties file.
*/
public HoodieTableVersion getTableVersion() {
- return contains(VERSION)
- ? HoodieTableVersion.versionFromCode(getInt(VERSION))
- : VERSION.defaultValue();
+ return HoodieTableConfigUtils.getTableVersion(this);
}
/**
@@ -585,11 +584,7 @@ public class HoodieTableConfig extends HoodieConfig {
}
public Option<String[]> getPartitionFields() {
- if (contains(PARTITION_FIELDS)) {
- return Option.of(Arrays.stream(getString(PARTITION_FIELDS).split(","))
- .filter(p -> p.length() >
0).collect(Collectors.toList()).toArray(new String[] {}));
- }
- return Option.empty();
+ return HoodieTableConfigUtils.getPartitionFields(this);
}
public boolean isTablePartitioned() {
@@ -612,7 +607,7 @@ public class HoodieTableConfig extends HoodieConfig {
public String getPartitionFieldProp() {
// NOTE: We're adding a stub returning empty string to stay compatible w/
pre-existing
// behavior until this method is fully deprecated
- return Option.ofNullable(getString(PARTITION_FIELDS)).orElse("");
+ return HoodieTableConfigUtils.getPartitionFieldProp(this).orElse("");
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java
index f3ed871e125..2fa321080f2 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java
@@ -65,4 +65,8 @@ public enum HoodieTableVersion {
.filter(v -> v.versionCode == versionCode).findAny()
.orElseThrow(() -> new HoodieException("Unknown versionCode:" +
versionCode));
}
+
+ public boolean greaterThan(HoodieTableVersion other) {
+ return this.versionCode > other.versionCode;
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTableConfigUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTableConfigUtils.java
new file mode 100644
index 00000000000..853c4ddd6b1
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTableConfigUtils.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+public class HoodieTableConfigUtils {
+
+ /**
+ * This function returns the partition fields joined by
BaseKeyGenerator.FIELD_SEPARATOR. It will also
+ * include the key generator partition type with the field. The key
generator partition type is used for
+ * Custom Key Generator.
+ */
+ public static Option<String>
getPartitionFieldPropForKeyGenerator(HoodieConfig config) {
+ return
Option.ofNullable(config.getString(HoodieTableConfig.PARTITION_FIELDS));
+ }
+
+ /**
+ * This function returns the partition fields joined by
BaseKeyGenerator.FIELD_SEPARATOR. It will
+ * strip the partition key generator related info from the fields.
+ */
+ public static Option<String> getPartitionFieldProp(HoodieConfig config) {
+ if (getTableVersion(config).greaterThan(HoodieTableVersion.SEVEN)) {
+ // With table version eight, the table config
org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS
+ // stores the corresponding partition type as well. This partition type
is useful for CustomKeyGenerator
+ // and CustomAvroKeyGenerator.
+ return getPartitionFields(config).map(fields ->
String.join(BaseKeyGenerator.FIELD_SEPARATOR, fields));
+ } else {
+ return
Option.ofNullable(config.getString(HoodieTableConfig.PARTITION_FIELDS));
+ }
+ }
+
+ /**
+ * This function returns the partition fields only. This method strips the
key generator related
+ * partition key types from the configured fields.
+ */
+ public static Option<String[]> getPartitionFields(HoodieConfig config) {
+ if (HoodieConfig.contains(HoodieTableConfig.PARTITION_FIELDS, config)) {
+ return
Option.of(Arrays.stream(config.getString(HoodieTableConfig.PARTITION_FIELDS).split(","))
+ .filter(p -> !p.isEmpty())
+ .map(p -> getPartitionFieldWithoutKeyGenPartitionType(p, config))
+ .collect(Collectors.toList()).toArray(new String[] {}));
+ }
+ return Option.empty();
+ }
+
+ /**
+ * This function returns the partition fields only. The input partition
field would contain partition
+ * type corresponding to the custom key generator if table version is eight
and if custom key
+ * generator is configured. This function would strip the partition type and
return the partition field.
+ */
+ public static String getPartitionFieldWithoutKeyGenPartitionType(String
partitionField, HoodieConfig config) {
+ return getTableVersion(config).greaterThan(HoodieTableVersion.SEVEN)
+ ?
partitionField.split(BaseKeyGenerator.CUSTOM_KEY_GENERATOR_SPLIT_REGEX)[0]
+ : partitionField;
+ }
+
+
+ /**
+ * This function returns the hoodie.table.version from hoodie.properties
file.
+ */
+ public static HoodieTableVersion getTableVersion(HoodieConfig config) {
+ return HoodieConfig.contains(HoodieTableConfig.VERSION, config)
+ ?
HoodieTableVersion.versionFromCode(config.getInt(HoodieTableConfig.VERSION))
+ : HoodieTableConfig.VERSION.defaultValue();
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
index c5a13dd49c1..76ed82883b7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
@@ -33,6 +33,8 @@ import java.util.List;
public abstract class BaseKeyGenerator extends KeyGenerator {
public static final String EMPTY_PARTITION = "";
+ public static final String CUSTOM_KEY_GENERATOR_SPLIT_REGEX = ":";
+ public static final String FIELD_SEPARATOR = ",";
protected List<String> recordKeyFields;
protected List<String> partitionPathFields;
protected final boolean encodePartitionPath;
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index acf87c14307..dfdedf89658 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -188,6 +188,10 @@ public class HoodieTestUtils {
.setTableType(tableType)
.setPayloadClass(HoodieAvroPayload.class);
+ if (properties.getProperty(HoodieTableConfig.KEY_GENERATOR_TYPE.key()) !=
null) {
+
builder.setKeyGeneratorType(properties.getProperty(HoodieTableConfig.KEY_GENERATOR_TYPE.key()));
+ }
+
String keyGen =
properties.getProperty("hoodie.datasource.write.keygenerator.class");
if (!Objects.equals(keyGen,
"org.apache.hudi.keygen.NonpartitionedKeyGenerator")
&&
!properties.containsKey("hoodie.datasource.write.partitionpath.field")) {
diff --git
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
index c2532dca7e9..ee67be355bd 100644
---
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
+++
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieTableConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -103,7 +103,7 @@ public class BigQuerySyncConfig extends HoodieSyncConfig
implements Serializable
public static final ConfigProperty<String> BIGQUERY_SYNC_PARTITION_FIELDS =
ConfigProperty
.key("hoodie.gcp.bigquery.sync.partition_fields")
.noDefaultValue()
- .withInferFunction(cfg ->
Option.ofNullable(cfg.getString(HoodieTableConfig.PARTITION_FIELDS))
+ .withInferFunction(cfg ->
HoodieTableConfigUtils.getPartitionFieldProp(cfg)
.or(() ->
Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME))))
.markAdvanced()
.withDocumentation("Comma-delimited partition fields. Default to
non-partitioned.");
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 9e5ad70fdca..e881001e894 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -30,6 +30,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
@@ -43,6 +44,7 @@ import java.util.concurrent.Future;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
import static org.apache.hudi.common.util.ConfigUtils.recoverIfNeeded;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -202,4 +204,19 @@ public class TestHoodieTableConfig extends
HoodieCommonTestHarness {
readerFuture.get();
executor.shutdown();
}
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"SEVEN", "EIGHT"})
+ public void testPartitionFields(HoodieTableVersion version) {
+ Properties updatedProps = new Properties();
+ updatedProps.setProperty(HoodieTableConfig.PARTITION_FIELDS.key(),
version.greaterThan(HoodieTableVersion.SEVEN) ? "p1:simple,p2:timestamp" :
"p1,p2");
+ updatedProps.setProperty(HoodieTableConfig.VERSION.key(),
String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+ HoodieTableConfig.update(storage, metaPath, updatedProps);
+
+ // Test makes sure that the partition fields returned by table config do
not have partition type
+ // to ensure backward compatibility for the API
+ HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null,
null);
+ assertArrayEquals(new String[] {"p1", "p2"},
config.getPartitionFields().get());
+ assertEquals("p1,p2", config.getPartitionFieldProp());
+ }
}
diff --git
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
index f8eb9d08837..e62c7defecf 100644
---
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
+++
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
@@ -34,7 +34,6 @@ import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.keygen.BaseKeyGenerator;
-import org.apache.hudi.keygen.CustomAvroKeyGenerator;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.storage.StorageConfiguration;
@@ -185,14 +184,7 @@ public class KafkaConnectUtils {
* @param typedProperties properties from the config.
* @return partition columns Returns the partition columns separated by
comma.
*/
- public static String getPartitionColumns(KeyGenerator keyGenerator,
TypedProperties typedProperties) {
- if (keyGenerator instanceof CustomAvroKeyGenerator) {
- return ((BaseKeyGenerator)
keyGenerator).getPartitionPathFields().stream().map(
- pathField ->
Arrays.stream(pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX))
- .findFirst().orElseGet(() -> "Illegal partition path field
format: '$pathField' for ${c.getClass.getSimpleName}"))
- .collect(Collectors.joining(","));
- }
-
+ public static String getPartitionColumnsForKeyGenerator(KeyGenerator
keyGenerator, TypedProperties typedProperties) {
if (keyGenerator instanceof BaseKeyGenerator) {
return String.join(",", ((BaseKeyGenerator)
keyGenerator).getPartitionPathFields());
}
diff --git
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
index 67123bbe3df..4ab2c5eae53 100644
---
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
+++
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
@@ -88,7 +88,7 @@ public class KafkaConnectTransactionServices implements
ConnectTransactionServic
KeyGenerator keyGenerator =
HoodieAvroKeyGeneratorFactory.createAvroKeyGeneratorByType(
new TypedProperties(connectConfigs.getProps()));
String recordKeyFields =
KafkaConnectUtils.getRecordKeyColumns(keyGenerator);
- String partitionColumns =
KafkaConnectUtils.getPartitionColumns(keyGenerator,
+ String partitionColumns =
KafkaConnectUtils.getPartitionColumnsForKeyGenerator(keyGenerator,
new TypedProperties(connectConfigs.getProps()));
LOG.info(String.format("Setting record key %s and partition fields %s
for table %s",
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 97a1cc29cc3..351f68709ff 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -279,7 +279,8 @@ class HoodieSparkSqlWriterInternal {
} else {
// Handle various save modes
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig,
tblName, operation, fs)
- val partitionColumns =
SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
+ val partitionColumns =
SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters),
false)
+ val partitionColumnsForKeyGenerator =
SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters),
true)
val timelineTimeZone =
HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))
val tableMetaClient = if (tableExists) {
HoodieInstantTimeGenerator.setCommitTimeZone(timelineTimeZone)
@@ -310,7 +311,7 @@ class HoodieSparkSqlWriterInternal {
// we can't fetch preCombine field from hoodieConfig object, since
it falls back to "ts" as default value,
// but we are interested in what user has set, hence fetching from
optParams.
.setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(),
null))
- .setPartitionFields(partitionColumns)
+ .setPartitionFields(partitionColumnsForKeyGenerator)
.setPopulateMetaFields(populateMetaFields)
.setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
.setSecondaryKeyFields(hoodieConfig.getString(SECONDARYKEY_COLUMN_NAME))
@@ -724,7 +725,7 @@ class HoodieSparkSqlWriterInternal {
if (!tableExists) {
val archiveLogFolder =
hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
- val partitionColumns =
HoodieWriterUtils.getPartitionColumns(parameters)
+ val partitionColumnsWithType =
SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(toProperties(parameters))
val recordKeyFields =
hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
val payloadClass =
if
(StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.PAYLOAD_CLASS_NAME)))
@@ -755,7 +756,7 @@ class HoodieSparkSqlWriterInternal {
.setBootstrapIndexClass(bootstrapIndexClass)
.setBaseFileFormat(baseFileFormat)
.setBootstrapBasePath(bootstrapBasePath)
- .setPartitionFields(partitionColumns)
+ .setPartitionFields(partitionColumnsWithType)
.setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED))
.setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
.setPopulateMetaFields(populateMetaFields)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index e613d974143..f88eb5ad690 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -24,6 +24,7 @@ import
org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
import org.apache.hudi.common.config.{DFSPropertiesConfiguration,
HoodieCommonConfig, HoodieConfig, TypedProperties}
import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.util.HoodieTableConfigUtils
import
org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.HiveSyncConfigHolder
@@ -221,7 +222,7 @@ object HoodieWriterUtils {
} else {
SparkKeyGenUtils.getPartitionColumns(validatedKeyGenClassName,
TypedProperties.fromMap(params.asJava))
}
- val tableConfigPartitionFields =
tableConfig.getString(HoodieTableConfig.PARTITION_FIELDS)
+ val tableConfigPartitionFields =
HoodieTableConfigUtils.getPartitionFieldProp(tableConfig).orElse(null)
if (null != datasourcePartitionFields && null !=
tableConfigPartitionFields
&& currentPartitionFields != tableConfigPartitionFields) {
diffConfigs.append(s"PartitionPath:\t$currentPartitionFields\t$tableConfigPartitionFields\n")
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
index 775a7290cd0..a886aedc61c 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
+++
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
@@ -149,7 +149,7 @@ public class BootstrapExecutorUtils implements Serializable
{
* Schema provider that supplies the command for reading the input and
writing out the target table.
*/
SchemaProvider schemaProvider =
createSchemaProvider(cfg.schemaProviderClass, props, jssc);
- String keyGenClass = genKeyGenClassAndPartitionColumns().getLeft();
+ String keyGenClass =
genKeyGenClassAndPartitionColumnsForKeyGenerator().getLeft();
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder().withPath(cfg.basePath)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build())
@@ -230,9 +230,9 @@ public class BootstrapExecutorUtils implements Serializable
{
+ ". Cannot bootstrap data on top of an existing table");
}
}
- Pair<String, String> keyGenClassAndParCols =
genKeyGenClassAndPartitionColumns();
+ Pair<String, String> keyGenClassAndParColsForKeyGenerator =
genKeyGenClassAndPartitionColumnsForKeyGenerator();
Map<String, Object> timestampKeyGeneratorConfigs =
-
extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenClassAndParCols.getLeft(),
props);
+
extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenClassAndParColsForKeyGenerator.getLeft(),
props);
HoodieTableMetaClient.PropertyBuilder builder =
HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(props)
@@ -267,13 +267,13 @@ public class BootstrapExecutorUtils implements
Serializable {
PARTITION_METAFILE_USE_BASE_FORMAT.key(),
PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
.set(timestampKeyGeneratorConfigs)
- .setKeyGeneratorClassProp(keyGenClassAndParCols.getLeft())
- .setPartitionFields(keyGenClassAndParCols.getRight());
+
.setKeyGeneratorClassProp(keyGenClassAndParColsForKeyGenerator.getLeft())
+ .setPartitionFields(keyGenClassAndParColsForKeyGenerator.getRight());
builder.initTable(HadoopFSUtils.getStorageConfWithCopy(jssc.hadoopConfiguration()),
cfg.basePath);
}
- private Pair<String, String> genKeyGenClassAndPartitionColumns() {
+ private Pair<String, String>
genKeyGenClassAndPartitionColumnsForKeyGenerator() {
String keyGenClass;
if
(StringUtils.nonEmpty(props.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(),
null))) {
keyGenClass =
props.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key());
@@ -283,15 +283,15 @@ public class BootstrapExecutorUtils implements
Serializable {
keyGenClass = KeyGeneratorType.getKeyGeneratorClassName(new
HoodieConfig(props));
}
props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), keyGenClass);
- String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
+ String partitionColumnsForKeyGenerator =
SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props);
- if (StringUtils.isNullOrEmpty(partitionColumns)) {
- partitionColumns = null;
+ if (StringUtils.isNullOrEmpty(partitionColumnsForKeyGenerator)) {
+ partitionColumnsForKeyGenerator = null;
if (keyGenClass.equals(SimpleKeyGenerator.class.getName())) {
keyGenClass = NonpartitionedKeyGenerator.class.getName();
}
}
- return Pair.of(keyGenClass, partitionColumns);
+ return Pair.of(keyGenClass, partitionColumnsForKeyGenerator);
}
private Map<String, Object>
extractConfigsRelatedToTimestampBasedKeyGenerator(String keyGenerator,
TypedProperties params) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
new file mode 100644
index 00000000000..91efac9f0be
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
+import org.apache.hudi.common.util.HoodieTableConfigUtils
+import org.apache.hudi.keygen.constant.KeyGeneratorType
+import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper,
UpgradeDowngrade}
+import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+class TestSevenToEightUpgrade extends RecordLevelIndexTestBase {
+
+ @ParameterizedTest
+ @EnumSource(classOf[HoodieTableType])
+ def testPartitionFieldsWithUpgrade(tableType: HoodieTableType): Unit = {
+ val partitionFields = "partition:simple"
+ val hudiOpts = commonOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+ DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key ->
KeyGeneratorType.CUSTOM.getClassName,
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> partitionFields)
+
+ doWriteAndValidateDataAndRecordIndex(hudiOpts,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ validate = false)
+ metaClient = getLatestMetaClient(true)
+
+ // assert table version is eight and the partition fields in table config
have the partition type
+ assertEquals(HoodieTableVersion.EIGHT,
metaClient.getTableConfig.getTableVersion)
+ assertEquals(partitionFields,
HoodieTableConfigUtils.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+
+ // downgrade table props to version seven
+ // assert table version is seven and the partition fields in table config
do not have the partition type
+ new UpgradeDowngrade(metaClient, getWriteConfig(hudiOpts), context,
SparkUpgradeDowngradeHelper.getInstance)
+ .run(HoodieTableVersion.SEVEN, null)
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ assertEquals(HoodieTableVersion.SEVEN,
metaClient.getTableConfig.getTableVersion)
+ assertEquals("partition",
HoodieTableConfigUtils.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+
+ // auto upgrade the table
+ // assert table version is eight and the partition fields in table config
have the partition type
+ doWriteAndValidateDataAndRecordIndex(hudiOpts,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ validate = false)
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ assertEquals(HoodieTableVersion.EIGHT,
metaClient.getTableConfig.getTableVersion)
+ assertEquals(partitionFields,
HoodieTableConfigUtils.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+ }
+}
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
index b6ee02355ff..17caaa9c01e 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
@@ -24,10 +24,10 @@ import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.HadoopConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.HoodieTableConfigUtils;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -109,7 +109,7 @@ public class HoodieSyncConfig extends HoodieConfig {
public static final ConfigProperty<String> META_SYNC_PARTITION_FIELDS =
ConfigProperty
.key("hoodie.datasource.hive_sync.partition_fields")
.defaultValue("")
- .withInferFunction(cfg ->
Option.ofNullable(cfg.getString(HoodieTableConfig.PARTITION_FIELDS))
+ .withInferFunction(cfg ->
HoodieTableConfigUtils.getPartitionFieldProp(cfg)
.or(() ->
Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME))))
.markAdvanced()
.withDocumentation("Field in the table to use for determining hive
partition columns.");
@@ -122,7 +122,7 @@ public class HoodieSyncConfig extends HoodieConfig {
if (StringUtils.nonEmpty(cfg.getString(META_SYNC_PARTITION_FIELDS))) {
partitionFieldsOpt =
Option.ofNullable(cfg.getString(META_SYNC_PARTITION_FIELDS));
} else {
- partitionFieldsOpt =
Option.ofNullable(cfg.getString(HoodieTableConfig.PARTITION_FIELDS))
+ partitionFieldsOpt =
HoodieTableConfigUtils.getPartitionFieldProp(cfg)
.or(() ->
Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)));
}
if (!partitionFieldsOpt.isPresent()) {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
index 7d84d41dd4f..dd1386c593e 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
@@ -233,9 +233,9 @@ public class BootstrapExecutor implements Serializable {
.setPartitionMetafileUseBaseFormat(props.getBoolean(
PARTITION_METAFILE_USE_BASE_FORMAT.key(),
PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()));
- String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
- if (!StringUtils.isNullOrEmpty(partitionColumns)) {
- builder.setPartitionFields(partitionColumns).setKeyGeneratorClassProp(
+ String partitionColumnsForKeyGenerator =
SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props);
+ if (!StringUtils.isNullOrEmpty(partitionColumnsForKeyGenerator)) {
+
builder.setPartitionFields(partitionColumnsForKeyGenerator).setKeyGeneratorClassProp(
props.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(),
SimpleKeyGenerator.class.getName()));
} else {
builder.setKeyGeneratorClassProp(props.getString(
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 1581eb873b8..b83099dbd3f 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -398,7 +398,7 @@ public class StreamSync implements Serializable, Closeable {
private void initializeEmptyTable() throws IOException {
this.commitsTimelineOpt = Option.empty();
this.allCommitsTimelineOpt = Option.empty();
- String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
+ String partitionColumns =
SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props);
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)