This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 6351e5f  [HUDI-2538] persist some configs to hoodie.properties when the first write (#3823)
6351e5f is described below

commit 6351e5f4d042b14cd0f6715c36f23d75fcc8e091
Author: Yann Byron <biyan900...@gmail.com>
AuthorDate: Wed Nov 3 10:04:23 2021 +0800

    [HUDI-2538] persist some configs to hoodie.properties when the first write (#3823)
---
 .../table/upgrade/TwoToThreeUpgradeHandler.java    |   9 +-
 .../factory/HoodieSparkKeyGeneratorFactory.java    |  98 +++++++++------
 .../client/functional/TestHoodieMetadataBase.java  |   8 +-
 .../apache/hudi/keygen/TestCustomKeyGenerator.java |  13 +-
 .../hudi/testutils/HoodieClientTestHarness.java    |  13 +-
 .../apache/hudi/common/config/HoodieConfig.java    |   6 +-
 .../hudi/common/table/HoodieTableConfig.java       |  16 +++
 .../hudi/common/table/HoodieTableMetaClient.java   |  24 ++++
 .../common/testutils/HoodieCommonTestHarness.java  |   5 +
 .../main/scala/org/apache/hudi/DefaultSource.scala |  12 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 118 +++++++++++++-----
 .../org/apache/hudi/HoodieStreamingSink.scala      |   9 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |   5 +-
 .../apache/spark/sql/hudi/HoodieOptionConfig.scala |   9 +-
 .../hudi/command/CreateHoodieTableCommand.scala    |  83 ++++++++-----
 .../hudi/command/DeleteHoodieTableCommand.scala    |  13 +-
 .../command/InsertIntoHoodieTableCommand.scala     |  52 +++++---
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  60 ++++-----
 .../spark/sql/hudi/command/SqlKeyGenerator.scala   |  18 ++-
 .../hudi/command/UpdateHoodieTableCommand.scala    |  14 ++-
 .../apache/hudi/HoodieSparkSqlWriterSuite.scala    | 137 ++++++++++++++++-----
 .../org/apache/hudi/TestHoodieFileIndex.scala      |   9 ++
 .../functional/TestDataSourceForBootstrap.scala    |   8 +-
 .../apache/hudi/functional/TestMORDataSource.scala |   3 +
 .../hudi/functional/TestTimeTravelQuery.scala      |   1 +
 25 files changed, 540 insertions(+), 203 deletions(-)
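Before the per-file diffs, a note on scope: with this change, the first write of a table persists the hive-style partitioning flag, the URL-encoding flag, and the key generator class into .hoodie/hoodie.properties, and later writes and SQL commands read them back from the table config instead of re-deriving them from write options. A minimal sketch in Scala of such a first write — the option keys are Hudi's standard datasource keys, while the DataFrame df, table name, and base path are hypothetical:

    // The three datasource options below are the ones the table now remembers
    // after the first write (persisted into hoodie.properties).
    df.write.format("hudi").
      option("hoodie.table.name", "trips").
      option("hoodie.datasource.write.hive_style_partitioning", "true").
      option("hoodie.datasource.write.partitionpath.urlencode", "false").
      option("hoodie.datasource.write.keygenerator.class",
        "org.apache.hudi.keygen.SimpleKeyGenerator").
      mode("append").
      save("/tmp/trips")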
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
index 6a825e1..e1dbfbb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
@@ -21,10 +21,11 @@ package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -39,6 +40,10 @@ public class TwoToThreeUpgradeHandler implements UpgradeHandler {
       // table has been updated and is not backward compatible.
       HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
     }
-    return Collections.emptyMap();
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+    tablePropsToAdd.put(HoodieTableConfig.URL_ENCODE_PARTITIONING, config.getStringOrDefault(HoodieTableConfig.URL_ENCODE_PARTITIONING));
+    tablePropsToAdd.put(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE, config.getStringOrDefault(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
+    tablePropsToAdd.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME, config.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME));
+    return tablePropsToAdd;
   }
 }
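The upgrade handler change above backfills the same properties for tables created before this release. A Scala sketch of the mapping it now returns, keyed by the hoodie.properties names via ConfigProperty.key; config is the handler's HoodieWriteConfig and the helper name is illustrative:

    import org.apache.hudi.common.table.HoodieTableConfig
    import org.apache.hudi.config.HoodieWriteConfig

    // Mirrors the Java map built in TwoToThreeUpgradeHandler: the two
    // partitioning flags fall back to their defaults; the key generator
    // class is carried over as-is (it may be null if it was never set).
    def tablePropsToAdd(config: HoodieWriteConfig): Map[String, String] = Map(
      HoodieTableConfig.URL_ENCODE_PARTITIONING.key ->
        config.getStringOrDefault(HoodieTableConfig.URL_ENCODE_PARTITIONING),
      HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key ->
        config.getStringOrDefault(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE),
      HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key ->
        config.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
    )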
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
index d4e99f7..165b27d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
@@ -19,14 +19,13 @@
 package org.apache.hudi.keygen.factory;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieKeyGeneratorException;
-import org.apache.hudi.keygen.BuiltinKeyGenerator;
 import org.apache.hudi.keygen.ComplexKeyGenerator;
 import org.apache.hudi.keygen.CustomKeyGenerator;
 import org.apache.hudi.keygen.GlobalDeleteKeyGenerator;
-import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
@@ -37,8 +36,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Locale;
-import java.util.Objects;
+import java.util.Map;
 
 /**
  * Factory help to create {@link org.apache.hudi.keygen.KeyGenerator}.
@@ -50,45 +50,73 @@ public class HoodieSparkKeyGeneratorFactory {
 
   private static final Logger LOG = LoggerFactory.getLogger(HoodieSparkKeyGeneratorFactory.class);
 
-  public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
-    // keyGenerator class name has higher priority
-    KeyGenerator keyGenerator = KeyGenUtils.createKeyGeneratorByClassName(props);
+  private static final Map<String, String> COMMON_TO_SPARK_KEYGENERATOR = new HashMap<>();
+  static {
+    COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.ComplexAvroKeyGenerator",
+        "org.apache.hudi.keygen.ComplexKeyGenerator");
+    COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.CustomAvroKeyGenerator",
+        "org.apache.hudi.keygen.CustomKeyGenerator");
+    COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.GlobalAvroDeleteKeyGenerator",
+        "org.apache.hudi.keygen.GlobalDeleteKeyGenerator");
+    COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator",
+        "org.apache.hudi.keygen.NonpartitionedKeyGenerator");
+    COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.SimpleAvroKeyGenerator",
+        "org.apache.hudi.keygen.SimpleKeyGenerator");
+    COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator",
+        "org.apache.hudi.keygen.TimestampBasedKeyGenerator");
+  }
 
-    return Objects.isNull(keyGenerator) ? createKeyGeneratorByType(props) : keyGenerator;
+  public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
+    String keyGeneratorClass = getKeyGeneratorClassName(props);
+    try {
+      return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props);
+    } catch (Throwable e) {
+      throw new IOException("Could not load key generator class " + keyGeneratorClass, e);
+    }
   }
 
-  private static BuiltinKeyGenerator createKeyGeneratorByType(TypedProperties props) throws IOException {
-    // Use KeyGeneratorType.SIMPLE as default keyGeneratorType
-    String keyGeneratorType =
-        props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), null);
+  public static String getKeyGeneratorClassName(TypedProperties props) {
+    String keyGeneratorClass = props.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), null);
 
-    if (StringUtils.isNullOrEmpty(keyGeneratorType)) {
+    if (StringUtils.isNullOrEmpty(keyGeneratorClass)) {
+      String keyGeneratorType = props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), KeyGeneratorType.SIMPLE.name());
       LOG.info("The value of {} is empty, use SIMPLE", HoodieWriteConfig.KEYGENERATOR_TYPE.key());
-      keyGeneratorType = KeyGeneratorType.SIMPLE.name();
-    }
-
-    KeyGeneratorType keyGeneratorTypeEnum;
-    try {
-      keyGeneratorTypeEnum = KeyGeneratorType.valueOf(keyGeneratorType.toUpperCase(Locale.ROOT));
-    } catch (IllegalArgumentException e) {
-      throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGeneratorType);
-    }
-    switch (keyGeneratorTypeEnum) {
-      case SIMPLE:
-        return new SimpleKeyGenerator(props);
-      case COMPLEX:
-        return new ComplexKeyGenerator(props);
-      case TIMESTAMP:
-        return new TimestampBasedKeyGenerator(props);
-      case CUSTOM:
-        return new CustomKeyGenerator(props);
-      case NON_PARTITION:
-        return new NonpartitionedKeyGenerator(props);
-      case GLOBAL_DELETE:
-        return new GlobalDeleteKeyGenerator(props);
-      default:
+      KeyGeneratorType keyGeneratorTypeEnum;
+      try {
+        keyGeneratorTypeEnum = KeyGeneratorType.valueOf(keyGeneratorType.toUpperCase(Locale.ROOT));
+      } catch (IllegalArgumentException e) {
         throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGeneratorType);
+      }
+      switch (keyGeneratorTypeEnum) {
+        case SIMPLE:
+          keyGeneratorClass = SimpleKeyGenerator.class.getName();
+          break;
+        case COMPLEX:
+          keyGeneratorClass = ComplexKeyGenerator.class.getName();
+          break;
+        case TIMESTAMP:
+          keyGeneratorClass = TimestampBasedKeyGenerator.class.getName();
+          break;
+        case CUSTOM:
+          keyGeneratorClass = CustomKeyGenerator.class.getName();
+          break;
+        case NON_PARTITION:
+          keyGeneratorClass = NonpartitionedKeyGenerator.class.getName();
+          break;
+        case GLOBAL_DELETE:
+          keyGeneratorClass = GlobalDeleteKeyGenerator.class.getName();
+          break;
+        default:
+          throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGeneratorType);
+      }
     }
+    return keyGeneratorClass;
   }
 
+  /**
+   * Convert hoodie-common KeyGenerator to SparkKeyGeneratorInterface implementation.
+   */
+  public static String convertToSparkKeyGenerator(String keyGeneratorClassName) {
+    return COMMON_TO_SPARK_KEYGENERATOR.getOrDefault(keyGeneratorClassName, keyGeneratorClassName);
+  }
 }
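The factory rewrite makes the resolution order reusable by other call sites: an explicit hoodie.datasource.write.keygenerator.class wins; otherwise hoodie.datasource.write.keygenerator.type is mapped to a class, defaulting to SIMPLE. A usage sketch in Scala against the two new public methods, assuming TypedProperties' no-arg constructor; the property value is illustrative:

    import org.apache.hudi.common.config.TypedProperties
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory

    val props = new TypedProperties()
    props.setProperty(HoodieWriteConfig.KEYGENERATOR_TYPE.key, "COMPLEX")

    // No explicit class configured, so the type enum decides:
    HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(props)
    // => "org.apache.hudi.keygen.ComplexKeyGenerator"

    // Avro (hudi-common) generators are translated to their Spark counterparts:
    HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
      "org.apache.hudi.keygen.SimpleAvroKeyGenerator")
    // => "org.apache.hudi.keygen.SimpleKeyGenerator"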
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index 7a49daf..cf261cc 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
@@ -33,6 +34,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
@@ -50,6 +52,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Properties;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
@@ -268,6 +271,8 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
   protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit,
                                                             boolean useFileListingMetadata, boolean enableMetrics,
                                                             boolean enableFullScan) {
+    Properties properties = new Properties();
+    properties.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName());
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
         .withAutoCommit(autoCommit)
@@ -287,7 +292,8 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
         .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
             .withExecutorMetrics(true).build())
         .withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder()
-            .usePrefix("unit-test").build());
+            .usePrefix("unit-test").build())
+        .withProperties(properties);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
index 4bfc71f..4b590d9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
@@ -33,6 +33,8 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 
 public class
TestCustomKeyGenerator extends KeyGeneratorTestUtilities { @@ -122,6 +124,13 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities { return properties; } + private String stackTraceToString(Throwable e) { + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + e.printStackTrace(pw); + return sw.toString(); + } + @Test public void testSimpleKeyGeneratorWithKeyGeneratorClass() throws IOException { testSimpleKeyGenerator(getPropertiesForSimpleKeyGen(true)); @@ -259,7 +268,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities { .getMessage() .contains("Property hoodie.datasource.write.recordkey.field not found")); } else { - Assertions.assertTrue(e.getMessage().contains("Property hoodie.datasource.write.recordkey.field not found")); + Assertions.assertTrue(stackTraceToString(e).contains("Property hoodie.datasource.write.recordkey.field not found")); } } @@ -282,7 +291,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities { .getMessage() .contains("Property hoodie.datasource.write.recordkey.field not found")); } else { - Assertions.assertTrue(e.getMessage().contains("Property hoodie.datasource.write.recordkey.field not found")); + Assertions.assertTrue(stackTraceToString(e).contains("Property hoodie.datasource.write.recordkey.field not found")); } } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 8c0a3bd..9ed98b1 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -249,7 +249,15 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im initMetaClient(getTableType()); } + protected void initMetaClient(Properties properties) throws IOException { + initMetaClient(getTableType(), properties); + } + protected void initMetaClient(HoodieTableType tableType) throws IOException { + initMetaClient(tableType, new Properties()); + } + + protected void initMetaClient(HoodieTableType tableType, Properties properties) throws IOException { if (basePath == null) { throw new IllegalStateException("The base path has not been initialized."); } @@ -258,7 +266,10 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im throw new IllegalStateException("The Spark context has not been initialized."); } - metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType); + if (tableName != null && !tableName.isEmpty()) { + properties.put(HoodieTableConfig.NAME.key(), tableName); + } + metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType, properties); } protected Properties getPropertiesForKeyGen() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java index 1f646aa..ed2b90e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java @@ -74,6 +74,10 @@ public class HoodieConfig implements Serializable { } } + public Boolean contains(String key) { + return props.containsKey(key); + } + public <T> boolean contains(ConfigProperty<T> configProperty) { if (props.containsKey(configProperty.key())) { return true; @@ -135,7 +139,7 
@@ public class HoodieConfig implements Serializable { public <T> boolean getBooleanOrDefault(ConfigProperty<T> configProperty) { Option<Object> rawValue = getRawValue(configProperty); return rawValue.map(v -> Boolean.parseBoolean(v.toString())) - .orElse((Boolean) configProperty.defaultValue()); + .orElse(Boolean.parseBoolean(configProperty.defaultValue().toString())); } public <T> Long getLong(ConfigProperty<T> configProperty) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 129bcce..dc57fd1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.avro.Schema; import org.apache.hadoop.fs.FSDataInputStream; @@ -161,6 +162,9 @@ public class HoodieTableConfig extends HoodieConfig { .noDefaultValue() .withDocumentation("Key Generator class property for the hoodie table"); + public static final ConfigProperty<String> URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING; + public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE; + public static final String NO_OP_BOOTSTRAP_INDEX_CLASS = NoOpBootstrapIndex.class.getName(); public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) { @@ -363,6 +367,18 @@ public class HoodieTableConfig extends HoodieConfig { return getString(RECORDKEY_FIELDS); } + public String getKeyGeneratorClassName() { + return getString(KEY_GENERATOR_CLASS_NAME); + } + + public String getHiveStylePartitioningEnable() { + return getString(HIVE_STYLE_PARTITIONING_ENABLE); + } + + public String getUrlEncodePartitoning() { + return getString(URL_ENCODE_PARTITIONING); + } + public Map<String, String> propsMap() { return props.entrySet().stream() .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue()))); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 340a99e..450a3cc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -637,6 +637,8 @@ public class HoodieTableMetaClient implements Serializable { private Boolean bootstrapIndexEnable; private Boolean populateMetaFields; private String keyGeneratorClassProp; + private Boolean hiveStylePartitioningEnable; + private Boolean urlEncodePartitioning; private PropertyBuilder() { @@ -725,6 +727,16 @@ public class HoodieTableMetaClient implements Serializable { return this; } + public PropertyBuilder setHiveStylePartitioningEnable(Boolean hiveStylePartitioningEnable) { + this.hiveStylePartitioningEnable = hiveStylePartitioningEnable; + return this; + } + + public PropertyBuilder setUrlEncodePartitioning(Boolean urlEncodePartitioning) { + this.urlEncodePartitioning = urlEncodePartitioning; + return this; + } + public PropertyBuilder fromMetaClient(HoodieTableMetaClient 
metaClient) { return setTableType(metaClient.getTableType()) .setTableName(metaClient.getTableConfig().getTableName()) @@ -786,6 +798,12 @@ public class HoodieTableMetaClient implements Serializable { if (hoodieConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)) { setKeyGeneratorClassProp(hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)); } + if (hoodieConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)) { + setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)); + } + if (hoodieConfig.contains(HoodieTableConfig.URL_ENCODE_PARTITIONING)) { + setUrlEncodePartitioning(hoodieConfig.getBoolean(HoodieTableConfig.URL_ENCODE_PARTITIONING)); + } return this; } @@ -849,6 +867,12 @@ public class HoodieTableMetaClient implements Serializable { if (null != keyGeneratorClassProp) { tableConfig.setValue(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME, keyGeneratorClassProp); } + if (null != hiveStylePartitioningEnable) { + tableConfig.setValue(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE, Boolean.toString(hiveStylePartitioningEnable)); + } + if (null != urlEncodePartitioning) { + tableConfig.setValue(HoodieTableConfig.URL_ENCODE_PARTITIONING, Boolean.toString(urlEncodePartitioning)); + } return tableConfig.getProps(); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java index 9738816..311c131 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java @@ -34,12 +34,17 @@ import java.io.IOException; */ public class HoodieCommonTestHarness { + protected String tableName = null; protected String basePath = null; protected transient HoodieTestDataGenerator dataGen = null; protected transient HoodieTableMetaClient metaClient; @TempDir public java.nio.file.Path tempDir; + protected void setTableName(String tableName) { + this.tableName = tableName; + } + /** * Initializes basePath. 
*/ diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala index 00133ab..a9d85af 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -154,14 +154,12 @@ class DefaultSource extends RelationProvider mode: SaveMode, optParams: Map[String, String], df: DataFrame): BaseRelation = { - val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams) - val translatedOptions = DataSourceWriteOptions.translateSqlOptions(parameters) val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*) - if (translatedOptions(OPERATION.key).equals(BOOTSTRAP_OPERATION_OPT_VAL)) { - HoodieSparkSqlWriter.bootstrap(sqlContext, mode, translatedOptions, dfWithoutMetaCols) + if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) { + HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols) } else { - HoodieSparkSqlWriter.write(sqlContext, mode, translatedOptions, dfWithoutMetaCols) + HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols) } new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema) } @@ -170,11 +168,9 @@ class DefaultSource extends RelationProvider optParams: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = { - val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams) - val translatedOptions = DataSourceWriteOptions.translateSqlOptions(parameters) new HoodieStreamingSink( sqlContext, - translatedOptions, + optParams, partitionColumns, outputMode) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index e5c1a7a..1d0e8af 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -17,13 +17,13 @@ package org.apache.hudi - import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.DataSourceOptionsHelper.{allAlternatives, translateConfigurations} import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient} import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties} @@ -31,12 +31,13 @@ import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType} import org.apache.hudi.common.table.timeline.HoodieActiveTimeline import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} -import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils} +import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils, StringUtils} import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME} import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieException import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory, 
NonSortPartitionerWithRows} import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} +import org.apache.hudi.index.SparkHoodieIndexFactory import org.apache.hudi.internal.DataSourceInternalWriterHelper import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.sync.common.AbstractSyncTool @@ -51,9 +52,9 @@ import org.apache.spark.{SPARK_VERSION, SparkContext} import java.util import java.util.Properties -import org.apache.hudi.index.SparkHoodieIndexFactory - import scala.collection.JavaConversions._ +import scala.collection.mutable +import scala.collection.mutable.StringBuilder import scala.collection.mutable.ListBuffer object HoodieSparkSqlWriter { @@ -65,7 +66,7 @@ object HoodieSparkSqlWriter { def write(sqlContext: SQLContext, mode: SaveMode, - parameters: Map[String, String], + optParams: Map[String, String], df: DataFrame, hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty, hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty, @@ -75,16 +76,23 @@ object HoodieSparkSqlWriter { : (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String], SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = { + assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set") + val path = optParams("path") + val basePath = new Path(path) val sparkContext = sqlContext.sparkContext - val path = parameters.get("path") - val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters) - val tblNameOp = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.") + val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) + tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) + var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt) + validateTableConfig(sqlContext.sparkSession, optParams, tableConfig) + + val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig) + val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, + s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim + assert(!StringUtils.isNullOrEmpty(hoodieConfig.getString(HoodieWriteConfig.TBL_NAME)), + s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.") + asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined asyncClusteringTriggerFnDefined = asyncClusteringTriggerFn.isDefined - if (path.isEmpty) { - throw new HoodieException(s"'path' must be set.") - } - val tblName = tblNameOp.trim sparkContext.getConf.getOption("spark.serializer") match { case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") => case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer") @@ -105,12 +113,8 @@ object HoodieSparkSqlWriter { } val jsc = new JavaSparkContext(sparkContext) - val basePath = new Path(path.get) val instantTime = HoodieActiveTimeline.createNewInstantTime() - val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) - tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) - var tableConfig = getHoodieTableConfig(sparkContext, path.get, hoodieTableConfigOpt) - val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(toProperties(parameters)) + val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(hoodieConfig.getProps)) if (mode == SaveMode.Ignore && 
tableExists) { log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.") @@ -124,7 +128,7 @@ object HoodieSparkSqlWriter { val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT) val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER) val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD) - val populateMetaFields = parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()).toBoolean + val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS) val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder() .setTableType(tableType) @@ -138,7 +142,9 @@ object HoodieSparkSqlWriter { .setPopulateMetaFields(populateMetaFields) .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD)) .setKeyGeneratorClassProp(hoodieConfig.getString(KEYGENERATOR_CLASS_NAME)) - .initTable(sparkContext.hadoopConfiguration, path.get) + .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING)) + .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING)) + .initTable(sparkContext.hadoopConfiguration, path) tableConfig = tableMetaClient.getTableConfig } @@ -169,7 +175,7 @@ object HoodieSparkSqlWriter { // Create a HoodieWriteClient & issue the delete. val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, - null, path.get, tblName, + null, path, tblName, mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key))) .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] @@ -200,7 +206,7 @@ object HoodieSparkSqlWriter { } // Create a HoodieWriteClient & issue the delete. val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, - null, path.get, tblName, + null, path, tblName, mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key))) .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] // Issue delete partitions @@ -244,7 +250,7 @@ object HoodieSparkSqlWriter { val writeSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, schema) else schema // Create a HoodieWriteClient & issue the write. 
- val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writeSchema.toString, path.get, + val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writeSchema.toString, path, tblName, mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key) )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] @@ -326,14 +332,21 @@ object HoodieSparkSqlWriter { def bootstrap(sqlContext: SQLContext, mode: SaveMode, - parameters: Map[String, String], + optParams: Map[String, String], df: DataFrame, hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty, hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = { + assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set") + val path = optParams("path") + val basePath = new Path(path) val sparkContext = sqlContext.sparkContext - val path = parameters.getOrElse("path", throw new HoodieException("'path' must be set.")) - val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters) + val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) + tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) + var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt) + validateTableConfig(sqlContext.sparkSession, optParams, tableConfig) + + val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig) val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.") val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE) val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH, @@ -349,10 +362,6 @@ object HoodieSparkSqlWriter { schema = HoodieAvroUtils.getNullSchema.toString } - val basePath = new Path(path) - val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) - tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) - val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt) // Handle various save modes if (mode == SaveMode.Ignore && tableExists) { @@ -381,6 +390,8 @@ object HoodieSparkSqlWriter { .setPartitionFields(partitionColumns) .setPopulateMetaFields(populateMetaFields) .setKeyGeneratorClassProp(keyGenProp) + .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING)) + .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING)) .initTable(sparkContext.hadoopConfiguration, path) } @@ -401,7 +412,7 @@ object HoodieSparkSqlWriter { df: DataFrame, tblName: String, basePath: Path, - path: Option[String], + path: String, instantTime: String, partitionColumns: String): (Boolean, common.util.Option[String]) = { val sparkContext = sqlContext.sparkContext @@ -424,7 +435,7 @@ object HoodieSparkSqlWriter { throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet") } val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA_STRING.key, schema.toString) - val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path.get, tblName, mapAsJavaMap(params)) + val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params)) val bulkInsertPartitionerRows : BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) { val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig) if 
(userDefinedBulkInsertPartitionerOpt.isPresent) { @@ -699,4 +710,49 @@ object HoodieSparkSqlWriter { null } } + + private def validateTableConfig(spark: SparkSession, params: Map[String, String], + tableConfig: HoodieTableConfig): Unit = { + val resolver = spark.sessionState.conf.resolver + val diffConfigs = StringBuilder.newBuilder + params.foreach { case (key, value) => + val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key) + if (null != existingValue && !resolver(existingValue, value)) { + diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n") + } + } + if (diffConfigs.nonEmpty) { + diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n") + throw new HoodieException(diffConfigs.toString.trim) + } + } + + private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String], + tableConfig: HoodieTableConfig): (Map[String, String], HoodieConfig) = { + val mergedParams = mutable.Map.empty ++ + DataSourceWriteOptions.translateSqlOptions(HoodieWriterUtils.parametersWithWriteDefaults(optParams)) + if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) + && mergedParams.contains(KEYGENERATOR_CLASS_NAME.key)) { + mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = mergedParams(KEYGENERATOR_CLASS_NAME.key) + } + if (null != tableConfig) { + tableConfig.getProps.foreach { case (key, value) => + mergedParams(key) = value + } + } + val params = mergedParams.toMap + (params, HoodieWriterUtils.convertMapToHoodieConfig(params)) + } + + private def getStringFromTableConfigWithAlternatives(tableConfig: HoodieTableConfig, key: String): String = { + if (null == tableConfig) { + null + } else { + if (allAlternatives.contains(key)) { + tableConfig.getString(allAlternatives(key)) + } else { + tableConfig.getString(key) + } + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala index b1f8eb5..6e736d2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala @@ -48,9 +48,12 @@ class HoodieStreamingSink(sqlContext: SQLContext, private val log = LogManager.getLogger(classOf[HoodieStreamingSink]) - private val retryCnt = options(DataSourceWriteOptions.STREAMING_RETRY_CNT.key).toInt - private val retryIntervalMs = options(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key).toLong - private val ignoreFailedBatch = options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key).toBoolean + private val retryCnt = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key, + DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt + private val retryIntervalMs = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key, + DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong + private val ignoreFailedBatch = options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key, + DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean private var isAsyncCompactorServiceShutdownAbnormally = false private var isAsyncClusteringServiceShutdownAbnormally = false diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index 
bdb2afb..0e3ede1 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -92,10 +92,9 @@ object HoodieWriterUtils { * @return */ def getPartitionColumns(parameters: Map[String, String]): String = { - val props = new TypedProperties() + val props = new Properties() props.putAll(parameters.asJava) - val keyGen = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props) - HoodieSparkUtils.getPartitionColumns(keyGen, props) + HoodieSparkUtils.getPartitionColumns(props) } def convertMapToHoodieConfig(parameters: Map[String, String]): HoodieConfig = { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala index 25d3026..963035c 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala @@ -120,8 +120,13 @@ object HoodieOptionConfig { */ def mappingSqlOptionToTableConfig(options: Map[String, String]): Map[String, String] = { defaultTableConfig ++ - options.filterKeys(k => keyTableConfigMapping.contains(k)) - .map(kv => keyTableConfigMapping(kv._1) -> valueMapping.getOrElse(kv._2, kv._2)) + options.map { case (k, v) => + if (keyTableConfigMapping.contains(k)) { + keyTableConfigMapping(k) -> valueMapping.getOrElse(v, v) + } else { + k -> v + } + } } /** diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala index ec1f746..8ac6312 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala @@ -41,8 +41,12 @@ import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOL import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.{SPARK_VERSION, SparkConf} - import java.util.{Locale, Properties} + +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.keygen.ComplexKeyGenerator +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory + import scala.collection.JavaConverters._ import scala.collection.mutable @@ -90,35 +94,13 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean .setBasePath(path) .setConf(conf) .build() - val tableSchema = getTableSqlSchema(metaClient) - - // Get options from the external table and append with the options in ddl. - val originTableConfig = HoodieOptionConfig.mappingTableConfigToSqlOption( - metaClient.getTableConfig.getProps.asScala.toMap) - - val allPartitionPaths = getAllPartitionPaths(sparkSession, table) - var upgrateConfig = Map.empty[String, String] - // If this is a non-hive-styled partition table, disable the hive style config. 
- // (By default this config is enable for spark sql) - upgrateConfig = if (!isHiveStyledPartitioning(allPartitionPaths, table)) { - upgrateConfig + (DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false") - } else { - upgrateConfig - } - upgrateConfig = if (!isUrlEncodeEnabled(allPartitionPaths, table)) { - upgrateConfig + (DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key -> "false") - } else { - upgrateConfig - } + val tableSchema = getTableSqlSchema(metaClient) - // Use the origin keygen to generate record key to keep the rowkey consistent with the old table for spark sql. - // See SqlKeyGenerator#getRecordKey for detail. - upgrateConfig = if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) { - upgrateConfig + (SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) - } else { - upgrateConfig - } - val options = originTableConfig ++ upgrateConfig ++ table.storage.properties + // Get options from the external table and append with the options in ddl. + val originTableConfig = HoodieOptionConfig.mappingTableConfigToSqlOption( + metaClient.getTableConfig.getProps.asScala.toMap) + val extraConfig = extraTableConfig(sparkSession, isTableExists, originTableConfig) + val options = originTableConfig ++ table.storage.properties ++ extraConfig val userSpecifiedSchema = table.schema if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) { @@ -137,7 +119,8 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean s". The associated location('$path') already exists.") } // Add the meta fields to the schema if this is a managed table or an empty external table. - (addMetaFields(table.schema), table.storage.properties) + val options = table.storage.properties ++ extraTableConfig(sparkSession, false) + (addMetaFields(table.schema), options) } val tableType = HoodieOptionConfig.getTableType(table.storage.properties) @@ -314,6 +297,43 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean s"'${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR}'") } } + + def extraTableConfig(sparkSession: SparkSession, isTableExists: Boolean, + originTableConfig: Map[String, String] = Map.empty): Map[String, String] = { + val extraConfig = mutable.Map.empty[String, String] + if (isTableExists) { + val allPartitionPaths = getAllPartitionPaths(sparkSession, table) + if (originTableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)) { + extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = + originTableConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) + } else { + extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = + String.valueOf(isHiveStyledPartitioning(allPartitionPaths, table)) + } + if (originTableConfig.contains(HoodieTableConfig.URL_ENCODE_PARTITIONING.key)) { + extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = + originTableConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) + } else { + extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = + String.valueOf(isUrlEncodeEnabled(allPartitionPaths, table)) + } + } else { + extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = "true" + extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue() + } + + val primaryColumns = HoodieOptionConfig.getPrimaryColumns(originTableConfig ++ table.storage.properties) + if (primaryColumns.isEmpty) { + 
extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = classOf[UuidKeyGenerator].getCanonicalName + } else if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) { + extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = + HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator( + originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) + } else { + extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = classOf[ComplexKeyGenerator].getCanonicalName + } + extraConfig.toMap + } } object CreateHoodieTableCommand extends Logging { @@ -342,6 +362,9 @@ object CreateHoodieTableCommand extends Logging { checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PRECOMBINE_FIELD.key) checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PARTITION_FIELDS.key) checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.RECORDKEY_FIELDS.key) + checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) + checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.URL_ENCODE_PARTITIONING.key) + checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) // Save all the table config to the hoodie.properties. val parameters = originTableConfig ++ tableOptions val properties = new Properties() diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala index 4d6d0a2..987ce0e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.DataSourceWriteOptions.{OPERATION, _} +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME import org.apache.hudi.hive.ddl.HiveSyncMode @@ -58,7 +59,12 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab val targetTable = sparkSession.sessionState.catalog .getTableMetadata(tableId) val path = getTableLocation(targetTable, sparkSession) - + val conf = sparkSession.sessionState.newHadoopConf() + val metaClient = HoodieTableMetaClient.builder() + .setBasePath(path) + .setConf(conf) + .build() + val tableConfig = metaClient.getTableConfig val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties) assert(primaryColumns.nonEmpty, @@ -66,13 +72,14 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab withSparkConf(sparkSession, targetTable.storage.properties) { Map( "path" -> path, - KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, TBL_NAME.key -> tableId.table, + HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, + URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning, + KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName, OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL, PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","), HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(), HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true", - 
HIVE_STYLE_PARTITIONING.key -> "true", HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200", SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL ) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index e1c61ed..2b88373 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -21,12 +21,14 @@ import org.apache.avro.Schema import org.apache.avro.generic.{GenericRecord, IndexedRecord} import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord} +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.util.{Option => HOption} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME import org.apache.hudi.exception.HoodieDuplicateKeyException import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.ddl.HiveSyncMode +import org.apache.hudi.keygen.ComplexKeyGenerator import org.apache.hudi.sql.InsertMode import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils} import org.apache.spark.internal.Logging @@ -90,7 +92,6 @@ object InsertIntoHoodieTableCommand extends Logging { // for insert into or insert overwrite partition we use append mode. SaveMode.Append } - val parameters = HoodieWriterUtils.parametersWithWriteDefaults(config) val conf = sparkSession.sessionState.conf val alignedQuery = alignOutputFields(query, table, insertPartitions, conf) // If we create dataframe using the Dataset.ofRows(sparkSession, alignedQuery), @@ -100,7 +101,7 @@ object InsertIntoHoodieTableCommand extends Logging { val inputDF = sparkSession.createDataFrame( Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema) val success = - HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, parameters, inputDF)._1 + HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, inputDF)._1 if (success) { if (refreshTable) { sparkSession.catalog.refreshTable(table.identifier.unquotedString) @@ -197,19 +198,42 @@ object InsertIntoHoodieTableCommand extends Logging { val parameters = withSparkConf(sparkSession, options)() val tableType = parameters.getOrElse(TABLE_TYPE.key, TABLE_TYPE.defaultValue) - - val partitionFields = table.partitionColumnNames.mkString(",") - val path = getTableLocation(table, sparkSession) - - val tableSchema = table.schema - val primaryColumns = HoodieOptionConfig.getPrimaryColumns(options) + val partitionFields = table.partitionColumnNames.mkString(",") - val keyGenClass = if (primaryColumns.nonEmpty) { - classOf[SqlKeyGenerator].getCanonicalName + val path = getTableLocation(table, sparkSession) + val conf = sparkSession.sessionState.newHadoopConf() + val isTableExists = tableExistsInPath(path, conf) + val tableConfig = if (isTableExists) { + HoodieTableMetaClient.builder() + .setBasePath(path) + .setConf(conf) + .build() + .getTableConfig } else { - classOf[UuidKeyGenerator].getName + null } + val hiveStylePartitioningEnable = if (null == tableConfig || null == tableConfig.getHiveStylePartitioningEnable) { + "true" + } else { + 
tableConfig.getHiveStylePartitioningEnable + } + val urlEncodePartitioning = if (null == tableConfig || null == tableConfig.getUrlEncodePartitoning) { + "false" + } else { + tableConfig.getUrlEncodePartitoning + } + val keyGeneratorClassName = if (null == tableConfig || null == tableConfig.getKeyGeneratorClassName) { + if (primaryColumns.nonEmpty) { + classOf[ComplexKeyGenerator].getCanonicalName + } else { + classOf[UuidKeyGenerator].getCanonicalName + } + } else { + tableConfig.getKeyGeneratorClassName + } + + val tableSchema = table.schema val dropDuplicate = sparkSession.conf .getOption(INSERT_DROP_DUPS.key) @@ -267,7 +291,9 @@ object InsertIntoHoodieTableCommand extends Logging { TBL_NAME.key -> table.identifier.table, PRECOMBINE_FIELD.key -> tableSchema.fields.last.name, OPERATION.key -> operation, - KEYGENERATOR_CLASS_NAME.key -> keyGenClass, + HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable, + URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning, + KEYGENERATOR_CLASS_NAME.key -> keyGeneratorClassName, RECORDKEY_FIELD.key -> primaryColumns.mkString(","), PARTITIONPATH_FIELD.key -> partitionFields, PAYLOAD_CLASS_NAME.key -> payloadClassName, @@ -279,10 +305,8 @@ object InsertIntoHoodieTableCommand extends Logging { HIVE_DATABASE.key -> table.identifier.database.getOrElse("default"), HIVE_TABLE.key -> table.identifier.table, HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true", - HIVE_STYLE_PARTITIONING.key -> "true", HIVE_PARTITION_FIELDS.key -> partitionFields, HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName, - URL_ENCODE_PARTITIONING.key -> "true", HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200", HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200", SqlKeyGenerator.PARTITION_SCHEMA -> table.partitionSchema.toDDL diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index b22c607..dd1be20 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command import org.apache.avro.Schema import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME import org.apache.hudi.hive.MultiPartKeysValueExtractor @@ -34,7 +35,6 @@ import org.apache.spark.sql.hudi.command.payload.ExpressionPayload import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._ import org.apache.spark.sql.hudi.{HoodieOptionConfig, SerDeUtils} import org.apache.spark.sql.types.{BooleanType, StructType} - import java.util.Base64 /** @@ -419,7 +419,12 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab val targetTableDb = targetTableIdentify.database.getOrElse("default") val targetTableName = targetTableIdentify.identifier val path = getTableLocation(targetTable, sparkSession) - + val conf = sparkSession.sessionState.newHadoopConf() + val metaClient = HoodieTableMetaClient.builder() + .setBasePath(path) + .setConf(conf) + .build() + val tableConfig = metaClient.getTableConfig val options = targetTable.storage.properties val definedPk = 
HoodieOptionConfig.getPrimaryColumns(options) // TODO Currently the mergeEqualConditionKeys must be the same the primary key. @@ -429,31 +434,30 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab } // Enable the hive sync by default if spark have enable the hive metastore. val enableHive = isEnableHive(sparkSession) - HoodieWriterUtils.parametersWithWriteDefaults( - withSparkConf(sparkSession, options) { - Map( - "path" -> path, - RECORDKEY_FIELD.key -> targetKey2SourceExpression.keySet.mkString(","), - KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, - PRECOMBINE_FIELD.key -> targetKey2SourceExpression.keySet.head, // set a default preCombine field - TBL_NAME.key -> targetTableName, - PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","), - PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName, - META_SYNC_ENABLED.key -> enableHive.toString, - HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(), - HIVE_USE_JDBC.key -> "false", - HIVE_DATABASE.key -> targetTableDb, - HIVE_TABLE.key -> targetTableName, - HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true", - HIVE_STYLE_PARTITIONING.key -> "true", - HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","), - HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName, - URL_ENCODE_PARTITIONING.key -> "true", // enable the url decode for sql. - HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200", // set the default parallelism to 200 for sql - HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200", - HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200", - SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL - ) - }) + withSparkConf(sparkSession, options) { + Map( + "path" -> path, + RECORDKEY_FIELD.key -> targetKey2SourceExpression.keySet.mkString(","), + PRECOMBINE_FIELD.key -> targetKey2SourceExpression.keySet.head, // set a default preCombine field + TBL_NAME.key -> targetTableName, + PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","), + PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName, + HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, + URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning, + KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName, + META_SYNC_ENABLED.key -> enableHive.toString, + HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(), + HIVE_USE_JDBC.key -> "false", + HIVE_DATABASE.key -> targetTableDb, + HIVE_TABLE.key -> targetTableName, + HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true", + HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","), + HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName, + HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200", // set the default parallelism to 200 for sql + HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200", + HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200", + SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL + ) + } } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala index b59984a..e069df9 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala +++ 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala index b59984a..e069df9 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala @@ -18,11 +18,13 @@ package org.apache.spark.sql.hudi.command import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS} + import org.apache.avro.generic.GenericRecord import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.util.PartitionPathEncodeUtils import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.keygen.{BaseKeyGenerator, ComplexKeyGenerator, KeyGenUtils, SparkKeyGeneratorInterface} +import org.apache.hudi.keygen._ +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.spark.sql.Row import org.apache.spark.sql.types.{StructType, TimestampType} import org.joda.time.format.{DateTimeFormat, DateTimeFormatter} @@ -48,7 +50,8 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props) val keyGenProps = new TypedProperties() keyGenProps.putAll(props) keyGenProps.remove(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME) - keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, beforeKeyGenClassName) + val convertedKeyGenClassName = SqlKeyGenerator.getRealKeyGenClassName(props) + keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, convertedKeyGenClassName) Some(KeyGenUtils.createKeyGeneratorByClassName(keyGenProps)) } else { None @@ -64,7 +67,7 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props) } override def getRecordKey(row: Row): String = { - if (originKeyGen.isDefined && originKeyGen.get.isInstanceOf[SparkKeyGeneratorInterface]) { + if (originKeyGen.isDefined) { originKeyGen.get.asInstanceOf[SparkKeyGeneratorInterface].getRecordKey(row) } else { super.getRecordKey(row) @@ -121,4 +124,13 @@ object SqlKeyGenerator { val ORIGIN_KEYGEN_CLASS_NAME = "hoodie.sql.origin.keygen.class" private val timestampTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss") private val sqlTimestampFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.S") + + def getRealKeyGenClassName(props: TypedProperties): String = { + val beforeKeyGenClassName = props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null) + if (beforeKeyGenClassName != null) { + HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(beforeKeyGenClassName) + } else { + classOf[ComplexKeyGenerator].getCanonicalName + } + } }
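A note on the new SqlKeyGenerator.getRealKeyGenClassName above: it looks up the key generator class originally requested (stashed under hoodie.sql.origin.keygen.class) and converts it via HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator, defaulting to ComplexKeyGenerator when nothing was recorded. A usage sketch of the default path only (the conversion result depends on the factory's mapping):

  import org.apache.hudi.common.config.TypedProperties
  import org.apache.hudi.keygen.ComplexKeyGenerator
  import org.apache.spark.sql.hudi.command.SqlKeyGenerator

  val props = new TypedProperties()
  // no hoodie.sql.origin.keygen.class recorded, so the helper falls back
  val effective = SqlKeyGenerator.getRealKeyGenClassName(props)
  assert(effective == classOf[ComplexKeyGenerator].getCanonicalName)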
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala index 20a8274..b1c8a04 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME import org.apache.hudi.hive.MultiPartKeysValueExtractor @@ -85,7 +86,12 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo val targetTable = sparkSession.sessionState.catalog .getTableMetadata(tableId) val path = getTableLocation(targetTable, sparkSession) - + val conf = sparkSession.sessionState.newHadoopConf() + val metaClient = HoodieTableMetaClient.builder() + .setBasePath(path) + .setConf(conf) + .build() + val tableConfig = metaClient.getTableConfig val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties) assert(primaryColumns.nonEmpty, @@ -95,9 +101,11 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo Map( "path" -> path, RECORDKEY_FIELD.key -> primaryColumns.mkString(","), - KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, PRECOMBINE_FIELD.key -> primaryColumns.head, // set the default preCombine field. TBL_NAME.key -> tableId.table, + HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, + URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning, + KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName, OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","), META_SYNC_ENABLED.key -> enableHive.toString, @@ -107,9 +115,7 @@ HIVE_TABLE.key -> tableId.table, HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","), HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName, - URL_ENCODE_PARTITIONING.key -> "true", HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true", - HIVE_STYLE_PARTITIONING.key -> "true", HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200", SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL ) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala index ff95e87..96fb18d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala @@ -30,7 +30,7 @@ import org.apache.hudi.exception.HoodieException import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode import org.apache.hudi.functional.TestBootstrap import org.apache.hudi.hive.HiveSyncConfig -import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator} +import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator} import org.apache.hudi.testutils.DataSourceTestUtils import org.apache.spark.SparkContext import org.apache.spark.api.java.JavaSparkContext @@ -48,8 +50,10 @@ import java.time.Instant import java.util import java.util.{Collections, Date, UUID} + import scala.collection.JavaConversions._ import scala.collection.JavaConverters +import scala.util.control.NonFatal /** * Test suite for SparkSqlWriter class.
@@ -161,7 +163,6 @@ class HoodieSparkSqlWriterSuite { .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true") .updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields)) .updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), sortMode.name()) - val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) // generate the inserts val schema = DataSourceTestUtils.getStructTypeExampleSchema @@ -175,7 +176,7 @@ class HoodieSparkSqlWriterSuite { val recordsSeq = convertRowListToSeq(records) val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType) // write to Hudi - HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df) + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df) // collect all partition paths to issue read of parquet files val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, @@ -242,21 +243,19 @@ class HoodieSparkSqlWriterSuite { //create a new table val fooTableModifier = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName, "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4") - val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime))) - HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame) + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, dataFrame) //on same path try append with different("hoodie_bar_tbl") table name which should throw an exception val barTableModifier = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> "hoodie_bar_tbl", "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4") - val barTableParams = HoodieWriterUtils.parametersWithWriteDefaults(barTableModifier) val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime))) - val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableParams, dataFrame2)) + val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableModifier, dataFrame2)) assert(tableAlreadyExistException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist")) //on same path try append with delete operation and different("hoodie_bar_tbl") table name which should throw an exception - val deleteTableParams = barTableParams ++ Map(OPERATION.key -> "delete") - val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableParams, dataFrame2)) + val deleteTableModifier = barTableModifier ++ Map(OPERATION.key -> "delete") + val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableModifier, dataFrame2)) assert(deleteCmdException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist")) } @@ -295,7 +294,6 @@ class HoodieSparkSqlWriterSuite { .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true") .updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name()) - val fooTableParams = 
HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) // generate the inserts val schema = DataSourceTestUtils.getStructTypeExampleSchema @@ -304,7 +302,7 @@ class HoodieSparkSqlWriterSuite { val df = spark.createDataFrame(sc.parallelize(inserts), structType) try { // write to Hudi - HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df) + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df) Assertions.fail("Should have thrown exception") } catch { case e: HoodieException => assertTrue(e.getMessage.contains("hoodie.populate.meta.fields already disabled for the table. Can't be re-enabled back")) @@ -323,7 +321,6 @@ class HoodieSparkSqlWriterSuite { .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true") .updated(INSERT_DROP_DUPS.key, "true") - val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) // generate the inserts val schema = DataSourceTestUtils.getStructTypeExampleSchema @@ -332,7 +329,7 @@ class HoodieSparkSqlWriterSuite { val recordsSeq = convertRowListToSeq(records) val df = spark.createDataFrame(spark.sparkContext.parallelize(recordsSeq), structType) // write to Hudi - HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df) + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df) fail("Drop duplicates with bulk insert in row writing should have thrown exception") } catch { case e: HoodieException => assertTrue(e.getMessage.contains("Dropping duplicates with bulk_insert in row writer path is not supported yet")) @@ -348,7 +345,6 @@ class HoodieSparkSqlWriterSuite { //create a new table val fooTableModifier = commonTableModifier.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .updated(DataSourceWriteOptions.INSERT_DROP_DUPS.key, "false") - val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) // generate the inserts val schema = DataSourceTestUtils.getStructTypeExampleSchema @@ -357,7 +353,7 @@ class HoodieSparkSqlWriterSuite { val recordsSeq = convertRowListToSeq(records) val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType) // write to Hudi - HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams - DataSourceWriteOptions.PRECOMBINE_FIELD.key, df) + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier - DataSourceWriteOptions.PRECOMBINE_FIELD.key, df) // collect all partition paths to issue read of parquet files val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, @@ -384,7 +380,6 @@ class HoodieSparkSqlWriterSuite { val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4") .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true") - val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH) val fullPartitionPaths = new Array[String](3) @@ -400,7 +395,7 @@ class HoodieSparkSqlWriterSuite { val recordsSeq = convertRowListToSeq(records) val df = spark.createDataFrame(sc.parallelize(recordsSeq), 
structType) // write to Hudi - HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df) + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df) // Fetch records from entire dataset val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2)) // remove metadata columns so that expected and actual DFs can be compared as is @@ -450,7 +445,7 @@ class HoodieSparkSqlWriterSuite { new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName, mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]) - HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df, Option.empty, Option(client)) + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df, Option.empty, Option(client)) // Verify that asynchronous compaction is not scheduled verify(client, times(0)).scheduleCompaction(any()) // Verify that HoodieWriteClient is closed correctly @@ -504,14 +499,14 @@ class HoodieSparkSqlWriterSuite { val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) initializeMetaClientForBootstrap(fooTableParams, tableType, true) - val client = spy(DataSourceUtils.createHoodieClient( + val client = spy(DataSourceUtils.createHoodieClient( new JavaSparkContext(sc), null, tempBasePath, hoodieFooTableName, mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]) - HoodieSparkSqlWriter.bootstrap(sqlContext, SaveMode.Append, fooTableParams, spark.emptyDataFrame, Option.empty, + HoodieSparkSqlWriter.bootstrap(sqlContext, SaveMode.Append, fooTableModifier, spark.emptyDataFrame, Option.empty, Option(client)) // Verify that HoodieWriteClient is closed correctly @@ -556,7 +551,6 @@ class HoodieSparkSqlWriterSuite { //create a new table val fooTableModifier = getCommonParams(tempPath, hoodieFooTableName, tableType) .updated(DataSourceWriteOptions.RECONCILE_SCHEMA.key, "true") - val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) // generate the inserts val schema = DataSourceTestUtils.getStructTypeExampleSchema @@ -564,7 +558,7 @@ class HoodieSparkSqlWriterSuite { var records = DataSourceTestUtils.generateRandomRows(10) var recordsSeq = convertRowListToSeq(records) val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType) - HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1) + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableModifier, df1) val snapshotDF1 = spark.read.format("org.apache.hudi") .load(tempBasePath + "/*/*/*/*") @@ -577,7 +571,7 @@ class HoodieSparkSqlWriterSuite { // issue updates so that log files are created for MOR table val updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5)) val updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType) - HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf) + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, updatesDf) val snapshotDF2 = spark.read.format("org.apache.hudi") .load(tempBasePath + "/*/*/*/*") @@ -595,7 +589,7 @@ class HoodieSparkSqlWriterSuite { recordsSeq = convertRowListToSeq(records) val df3 = spark.createDataFrame(sc.parallelize(recordsSeq), evolStructType) // write to Hudi with new column - HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df3) + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, 
fooTableModifier, df3) val snapshotDF3 = spark.read.format("org.apache.hudi") .load(tempBasePath + "/*/*/*/*") @@ -610,7 +604,7 @@ class HoodieSparkSqlWriterSuite { records = DataSourceTestUtils.generateRandomRows(10) recordsSeq = convertRowListToSeq(records) val df4 = spark.createDataFrame(sc.parallelize(recordsSeq), structType) - HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df4) + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df4) val snapshotDF4 = spark.read.format("org.apache.hudi") .load(tempBasePath + "/*/*/*/*") @@ -743,14 +737,13 @@ class HoodieSparkSqlWriterSuite { @ValueSource(booleans = Array(true, false)) def testDeletePartitionsV2(usePartitionsToDeleteConfig: Boolean): Unit = { val fooTableModifier = getCommonParams(tempPath, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name()) - val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) val schema = DataSourceTestUtils.getStructTypeExampleSchema val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) val records = DataSourceTestUtils.generateRandomRows(10) val recordsSeq = convertRowListToSeq(records) val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType) // write to Hudi - HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1) + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableModifier, df1) val snapshotDF1 = spark.read.format("org.apache.hudi") .load(tempBasePath + "/*/*/*/*") assertEquals(10, snapshotDF1.count()) @@ -761,7 +754,7 @@ class HoodieSparkSqlWriterSuite { val updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5)) val updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType) // write updates to Hudi - HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf) + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, updatesDf) val snapshotDF2 = spark.read.format("org.apache.hudi") .load(tempBasePath + "/*/*/*/*") assertEquals(10, snapshotDF2.count()) @@ -770,7 +763,7 @@ class HoodieSparkSqlWriterSuite { // ensure 2nd batch of updates matches. 
assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0) if (usePartitionsToDeleteConfig) { - fooTableParams.updated(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) + fooTableModifier.updated(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) } // delete partitions that contain the primary key val recordsToDelete = df1.filter(entry => { @@ -778,7 +771,7 @@ partitionPath.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) || partitionPath.equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH) }) - val updatedParams = fooTableParams.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name()) + val updatedParams = fooTableModifier.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name()) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, updatedParams, recordsToDelete) val snapshotDF3 = spark.read.format("org.apache.hudi") .load(tempBasePath + "/*/*/*/*") @@ -819,4 +812,88 @@ assert(spark.read.format("hudi").load(tempBasePath).where("age >= 2000").count() == 10) } } + + /** + * Test that the hiveStylePartitioning/urlEncodePartitioning/KeyGenerator configs persisted in HoodieTableConfig do not need to be specified again after the first write. + */ + @Test + def testToWriteWithoutParametersIncludedInHoodieTableConfig(): Unit = { + val _spark = spark + import _spark.implicits._ + val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt") + val options = Map( + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id", + DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt" + ) + + // case 1: test a table created by sql + val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1") + spark.sql( s""" | create table $tableName1 ( | id int, | name string, | price double, | ts long, | dt string | ) using hudi | partitioned by (dt) | options ( | primaryKey = 'id' | ) | location '$tablePath1' """.stripMargin) + val tableConfig1 = HoodieTableMetaClient.builder() + .setConf(spark.sparkContext.hadoopConfiguration) + .setBasePath(tablePath1).build().getTableConfig + assert(tableConfig1.getHiveStylePartitioningEnable == "true") + assert(tableConfig1.getUrlEncodePartitoning == "false") + assert(tableConfig1.getKeyGeneratorClassName == classOf[ComplexKeyGenerator].getName) + df.write.format("hudi") + .options(options) + .option(HoodieWriteConfig.TBL_NAME.key, tableName1) + .mode(SaveMode.Append).save(tablePath1) + assert(spark.read.format("hudi").load(tablePath1 + "/*").count() == 1) + + // case 2: test a table created by a dataframe + val (tableName2, tablePath2) = ("hoodie_test_params_2", s"$tempBasePath" + "_2") + // only the first write needs to specify these params + df.write.format("hudi") + .options(options) + .option(HoodieWriteConfig.TBL_NAME.key, tableName2) + .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key, "true") + .option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName) + .mode(SaveMode.Overwrite).save(tablePath2) + val tableConfig2 = HoodieTableMetaClient.builder() + .setConf(spark.sparkContext.hadoopConfiguration) + .setBasePath(tablePath2).build().getTableConfig + assert(tableConfig2.getHiveStylePartitioningEnable == "false") + assert(tableConfig2.getUrlEncodePartitoning ==
"true") + assert(tableConfig2.getKeyGeneratorClassName == classOf[SimpleKeyGenerator].getName) + + val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt") + // raise exception when use params which is not same with HoodieTableConfig + try { + df2.write.format("hudi") + .options(options) + .option(HoodieWriteConfig.TBL_NAME.key, tableName2) + .option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName) + .mode(SaveMode.Append).save(tablePath2) + } catch { + case NonFatal(e) => + assert(e.getMessage.contains("Config conflict")) + assert(e.getMessage.contains( + s"${HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key}\t${classOf[ComplexKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}")) + } + + // do not need to specify hiveStylePartitioning/urlEncodePartitioning/KeyGenerator params + df2.write.format("hudi") + .options(options) + .option(HoodieWriteConfig.TBL_NAME.key, tableName2) + .mode(SaveMode.Append).save(tablePath2) + val data = spark.read.format("hudi").load(tablePath2 + "/*") + assert(data.count() == 2) + assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "dt=2021-10-16") + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala index 94e9620..7c58cc0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala @@ -17,6 +17,8 @@ package org.apache.hudi +import java.util.Properties + import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.table.HoodieTableMetaClient @@ -58,6 +60,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase { ) @BeforeEach override def setUp() { + setTableName("hoodie_test") initPath() initSparkContexts() spark = sqlContext.sparkSession @@ -71,6 +74,9 @@ class TestHoodieFileIndex extends HoodieClientTestBase { @ParameterizedTest @ValueSource(booleans = Array(true, false)) def testPartitionSchema(partitionEncode: Boolean): Unit = { + val props = new Properties() + props.setProperty(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key, String.valueOf(partitionEncode)) + initMetaClient(props) val records1 = dataGen.generateInsertsContainsAllPartitions("000", 100) val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2)) inputDF1.write.format("hudi") @@ -128,6 +134,9 @@ class TestHoodieFileIndex extends HoodieClientTestBase { @ParameterizedTest @ValueSource(booleans = Array(true, false)) def testPartitionPruneWithPartitionEncode(partitionEncode: Boolean): Unit = { + val props = new Properties() + props.setProperty(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key, String.valueOf(partitionEncode)) + initMetaClient(props) val partitions = Array("2021/03/08", "2021/03/09", "2021/03/10", "2021/03/11", "2021/03/12") val newDataGen = new HoodieTestDataGenerator(partitions) val records1 = newDataGen.generateInsertsContainsAllPartitions("000", 100) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala index 8fc6e7f..d6ae80d 100644 --- 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala index 94e9620..7c58cc0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala @@ -17,6 +17,8 @@ package org.apache.hudi +import java.util.Properties + import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.table.HoodieTableMetaClient @@ -58,6 +60,7 @@ ) @BeforeEach override def setUp() { + setTableName("hoodie_test") initPath() initSparkContexts() spark = sqlContext.sparkSession @@ -71,6 +74,9 @@ @ParameterizedTest @ValueSource(booleans = Array(true, false)) def testPartitionSchema(partitionEncode: Boolean): Unit = { + val props = new Properties() + props.setProperty(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key, String.valueOf(partitionEncode)) + initMetaClient(props) val records1 = dataGen.generateInsertsContainsAllPartitions("000", 100) val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2)) inputDF1.write.format("hudi") @@ -128,6 +134,9 @@ @ParameterizedTest @ValueSource(booleans = Array(true, false)) def testPartitionPruneWithPartitionEncode(partitionEncode: Boolean): Unit = { + val props = new Properties() + props.setProperty(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key, String.valueOf(partitionEncode)) + initMetaClient(props) val partitions = Array("2021/03/08", "2021/03/09", "2021/03/10", "2021/03/11", "2021/03/12") val newDataGen = new HoodieTestDataGenerator(partitions) val records1 = newDataGen.generateInsertsContainsAllPartitions("000", 100) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala index 8fc6e7f..d6ae80d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala @@ -154,7 +154,9 @@ class TestDataSourceForBootstrap { // Perform bootstrap val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit( - DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr")) + DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, + Some("datestr"), + Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true")) // Read bootstrapped table and verify count val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*") @@ -473,11 +475,13 @@ class TestDataSourceForBootstrap { } def runMetadataBootstrapAndVerifyCommit(tableType: String, - partitionColumns: Option[String] = None): String = { + partitionColumns: Option[String] = None, + extraOpts: Map[String, String] = Map.empty): String = { val bootstrapDF = spark.emptyDataFrame bootstrapDF.write .format("hudi") .options(commonOpts) + .options(extraOpts) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType) .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, partitionColumns.getOrElse("")) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index ee914ae..eba2a3d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -17,6 +17,8 @@ package org.apache.hudi.functional +import java.util.Properties + import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.config.HoodieMetadataConfig @@ -61,6 +63,7 @@ class TestMORDataSource extends HoodieClientTestBase { val updatedVerificationVal: String = "driver_update" @BeforeEach override def setUp() { + setTableName("hoodie_test") initPath() initSparkContexts() spark = sqlContext.sparkSession diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala index bb102a4..9482ae3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala @@ -46,6 +46,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase { ) @BeforeEach override def setUp() { + setTableName("hoodie_test") initPath() initSparkContexts() spark = sqlContext.sparkSession
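The test-harness changes above all follow one pattern: a suite names its table and initializes the meta client with the partition-encoding properties under test, so the persisted HoodieTableConfig matches what the writer later validates against. A minimal setup sketch, assuming the harness methods touched in this commit (setTableName, initMetaClient):

  import java.util.Properties
  import org.apache.hudi.DataSourceWriteOptions

  // Name the table first, then seed hoodie.properties with the encoding under test.
  setTableName("hoodie_test")
  val props = new Properties()
  props.setProperty(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key, "true")
  initMetaClient(props)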