This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 544f9e5a2f6 [HUDI-6780] Introduce enums instead of classnames in table properties (#9590) 544f9e5a2f6 is described below commit 544f9e5a2f63a343ec8c56b2b1e8bcd2b42aaa33 Author: Sagar Sumit <sagarsumi...@gmail.com> AuthorDate: Thu Sep 14 20:29:44 2023 +0530 [HUDI-6780] Introduce enums instead of classnames in table properties (#9590) There are three configs in the `hoodie.properties` that hold classnames; `KEY_GENERATOR_CLASS_NAME`, `BOOTSTRAP_INDEX_CLASS_NAME`, `PAYLOAD_CLASS_NAME`. This PR adds enums for them and sets the enum in table config while inferring the classname from the enum in the getters. --- .../apache/hudi/config/HoodieBootstrapConfig.java | 5 +- .../org/apache/hudi/config/HoodieWriteConfig.java | 11 +- .../common/bootstrap/index/BootstrapIndex.java | 6 +- .../hudi/common/model/BootstrapIndexType.java | 76 ++++++++++++++ .../hudi/common/model/RecordPayloadType.java | 112 +++++++++++++++++++++ .../hudi/common/table/HoodieTableConfig.java | 63 +++++++++--- .../hudi/common/table/HoodieTableMetaClient.java | 54 +++++++--- .../org/apache/hudi/common/util/ConfigUtils.java | 10 +- .../hudi/keygen/constant/KeyGeneratorType.java | 83 +++++++++++++-- .../org/apache/hudi/table/HoodieTableFactory.java | 4 + .../scala/org/apache/hudi/DataSourceOptions.scala | 26 ++--- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 26 +++-- .../scala/org/apache/hudi/HoodieWriterUtils.scala | 8 +- .../sql/catalyst/catalog/HoodieCatalogTable.scala | 6 +- .../apache/spark/sql/hudi/HoodieOptionConfig.scala | 11 +- .../apache/hudi/cli/BootstrapExecutorUtils.java | 8 +- .../apache/spark/sql/hudi/TestCreateTable.scala | 7 +- .../spark/sql/hudi/TestHoodieOptionConfig.scala | 7 +- .../sql/hudi/procedure/TestRepairsProcedure.scala | 2 +- .../deltastreamer/TestHoodieDeltaStreamer.java | 37 ++----- .../TestHoodieMultiTableDeltaStreamer.java | 2 +- .../short_trip_uber_config.properties | 2 +- 22 files changed, 451 insertions(+), 115 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java index d88f0bb2e6f..297ad381907 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java @@ -26,7 +26,7 @@ import org.apache.hudi.common.config.ConfigClassProperty; import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; -import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.model.BootstrapIndexType; import java.io.File; import java.io.FileReader; @@ -250,8 +250,7 @@ public class HoodieBootstrapConfig extends HoodieConfig { public HoodieBootstrapConfig build() { // TODO: use infer function instead - bootstrapConfig.setDefaultValue(INDEX_CLASS_NAME, HoodieTableConfig.getDefaultBootstrapIndexClass( - bootstrapConfig.getProps())); + bootstrapConfig.setDefaultValue(INDEX_CLASS_NAME, BootstrapIndexType.getDefaultBootstrapIndexClassName(bootstrapConfig)); bootstrapConfig.setDefaults(HoodieBootstrapConfig.class.getName()); return bootstrapConfig; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 5f83a67486a..82c0eff1610 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -42,6 +42,7 @@ import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.model.RecordPayloadType; import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.log.block.HoodieLogBlock; @@ -151,6 +152,12 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. " + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective"); + public static final ConfigProperty<String> WRITE_PAYLOAD_TYPE = ConfigProperty + .key("hoodie.datasource.write.payload.type") + .defaultValue(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()) + .markAdvanced() + .withDocumentation(RecordPayloadType.class); + public static final ConfigProperty<String> RECORD_MERGER_IMPLS = ConfigProperty .key("hoodie.datasource.write.record.merger.impls") .defaultValue(HoodieAvroRecordMerger.class.getName()) @@ -1242,10 +1249,6 @@ public class HoodieWriteConfig extends HoodieConfig { return getString(PRECOMBINE_FIELD_NAME); } - public String getWritePayloadClass() { - return getString(WRITE_PAYLOAD_CLASS_NAME); - } - public String getKeyGeneratorClass() { return getString(KEYGENERATOR_CLASS_NAME); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java index abd3ac51a20..80569a9f1f6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.bootstrap.index; import org.apache.hudi.common.model.BootstrapFileMapping; +import org.apache.hudi.common.model.BootstrapIndexType; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -160,7 +161,8 @@ public abstract class BootstrapIndex implements Serializable { } public static BootstrapIndex getBootstrapIndex(HoodieTableMetaClient metaClient) { - return ((BootstrapIndex)(ReflectionUtils.loadClass( - metaClient.getTableConfig().getBootstrapIndexClass(), new Class[]{HoodieTableMetaClient.class}, metaClient))); + return ((BootstrapIndex) (ReflectionUtils.loadClass( + BootstrapIndexType.getBootstrapIndexClassName(metaClient.getTableConfig()), + new Class[] {HoodieTableMetaClient.class}, metaClient))); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BootstrapIndexType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BootstrapIndexType.java new file mode 100644 index 00000000000..c2233f39cea --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BootstrapIndexType.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex; +import org.apache.hudi.common.bootstrap.index.NoOpBootstrapIndex; +import org.apache.hudi.common.config.EnumDescription; +import org.apache.hudi.common.config.EnumFieldDescription; +import org.apache.hudi.common.config.HoodieConfig; + +import static org.apache.hudi.common.table.HoodieTableConfig.BOOTSTRAP_INDEX_CLASS_NAME; +import static org.apache.hudi.common.table.HoodieTableConfig.BOOTSTRAP_INDEX_ENABLE; +import static org.apache.hudi.common.table.HoodieTableConfig.BOOTSTRAP_INDEX_TYPE; + +@EnumDescription("Bootstrap index type to use for mapping between skeleton and actual data files.") +public enum BootstrapIndexType { + @EnumFieldDescription("Maintains mapping in HFile format.") + HFILE(HFileBootstrapIndex.class.getName()), + @EnumFieldDescription("No-op, an empty implementation.") + NO_OP(NoOpBootstrapIndex.class.getName()); + + private final String className; + + BootstrapIndexType(String className) { + this.className = className; + } + + public String getClassName() { + return className; + } + + public static BootstrapIndexType fromClassName(String className) { + for (BootstrapIndexType type : BootstrapIndexType.values()) { + if (type.getClassName().equals(className)) { + return type; + } + } + throw new IllegalArgumentException("No BootstrapIndexType found for class name: " + className); + } + + public static String getBootstrapIndexClassName(HoodieConfig config) { + if (!config.getBooleanOrDefault(BOOTSTRAP_INDEX_ENABLE)) { + return BootstrapIndexType.NO_OP.getClassName(); + } + if (config.contains(BOOTSTRAP_INDEX_CLASS_NAME)) { + return config.getString(BOOTSTRAP_INDEX_CLASS_NAME); + } else if (config.contains(BOOTSTRAP_INDEX_TYPE)) { + return BootstrapIndexType.valueOf(config.getString(BOOTSTRAP_INDEX_TYPE)).getClassName(); + } + return getDefaultBootstrapIndexClassName(config); + } + + public static String getDefaultBootstrapIndexClassName(HoodieConfig config) { + if (!config.getBooleanOrDefault(BOOTSTRAP_INDEX_ENABLE)) { + return BootstrapIndexType.NO_OP.getClassName(); + } + return BootstrapIndexType.valueOf(BOOTSTRAP_INDEX_TYPE.defaultValue()).getClassName(); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/RecordPayloadType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/RecordPayloadType.java new file mode 100644 index 00000000000..d1eae004dc5 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/RecordPayloadType.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.common.config.EnumDescription; +import org.apache.hudi.common.config.EnumFieldDescription; +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload; +import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload; +import org.apache.hudi.metadata.HoodieMetadataPayload; + +import static org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME; +import static org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_TYPE; + +/** + * Payload to use for record. + */ +@EnumDescription("Payload to use for merging records") +public enum RecordPayloadType { + @EnumFieldDescription("Provides support for seamlessly applying changes captured via Amazon Database Migration Service onto S3.") + AWS_DMS_AVRO(AWSDmsAvroPayload.class.getName()), + + @EnumFieldDescription("A payload to wrap a existing Hoodie Avro Record. Useful to create a HoodieRecord over existing GenericRecords.") + HOODIE_AVRO(HoodieAvroPayload.class.getName()), + + @EnumFieldDescription("Honors ordering field in both preCombine and combineAndGetUpdateValue.") + HOODIE_AVRO_DEFAULT(DefaultHoodieRecordPayload.class.getName()), + + @EnumFieldDescription("The only difference with HOODIE_AVRO_DEFAULT is that this does not track the event time metadata for efficiency") + EVENT_TIME_AVRO(EventTimeAvroPayload.class.getName()), + + @EnumFieldDescription("Subclass of OVERWRITE_LATEST_AVRO used for delta streamer.") + OVERWRITE_NON_DEF_LATEST_AVRO(OverwriteNonDefaultsWithLatestAvroPayload.class.getName()), + + @EnumFieldDescription("Default payload used for delta streamer.") + OVERWRITE_LATEST_AVRO(OverwriteWithLatestAvroPayload.class.getName()), + + @EnumFieldDescription("Used for partial update to Hudi Table.") + PARTIAL_UPDATE_AVRO(PartialUpdateAvroPayload.class.getName()), + + @EnumFieldDescription("Provides support for seamlessly applying changes captured via Debezium for MysqlDB.") + MYSQL_DEBEZIUM_AVRO(MySqlDebeziumAvroPayload.class.getName()), + + @EnumFieldDescription("Provides support for seamlessly applying changes captured via Debezium for PostgresDB.") + POSTGRES_DEBEZIUM_AVRO(PostgresDebeziumAvroPayload.class.getName()), + + @EnumFieldDescription("A record payload Hudi's internal metadata table.") + HOODIE_METADATA(HoodieMetadataPayload.class.getName()), + + @EnumFieldDescription("A record payload to validate the duplicate key for INSERT statement in spark-sql.") + VALIDATE_DUPLICATE_AVRO("org.apache.spark.sql.hudi.command.ValidateDuplicateKeyPayload"), + + @EnumFieldDescription("A record payload for MERGE INTO statement in spark-sql.") + EXPRESSION_AVRO("org.apache.spark.sql.hudi.command.payload.ExpressionPayload"), + + @EnumFieldDescription("Use the payload class set in `hoodie.datasource.write.payload.class`") + CUSTOM(""); + + private String className; + + RecordPayloadType(String className) { + 
this.className = className; + } + + public String getClassName() { + return className; + } + + public static RecordPayloadType fromClassName(String className) { + for (RecordPayloadType type : RecordPayloadType.values()) { + if (type.getClassName().equals(className)) { + return type; + } + } + // No RecordPayloadType found for class name, return CUSTOM + CUSTOM.className = className; + return CUSTOM; + } + + public static String getPayloadClassName(HoodieConfig config) { + String payloadClassName; + if (config.contains(PAYLOAD_CLASS_NAME)) { + payloadClassName = config.getString(PAYLOAD_CLASS_NAME); + } else if (config.contains(PAYLOAD_TYPE)) { + payloadClassName = RecordPayloadType.valueOf(config.getString(PAYLOAD_TYPE)).getClassName(); + } else if (config.contains("hoodie.datasource.write.payload.class")) { + payloadClassName = config.getString("hoodie.datasource.write.payload.class"); + } else { + payloadClassName = RecordPayloadType.valueOf(PAYLOAD_TYPE.defaultValue()).getClassName(); + } + // There could be tables written with payload class from com.uber.hoodie. + // Need to transparently change to org.apache.hudi. + return payloadClassName.replace("com.uber.hoodie", "org.apache.hudi"); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index cae35b18da3..2e1ad0b3bb6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -19,17 +19,18 @@ package org.apache.hudi.common.table; import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex; -import org.apache.hudi.common.bootstrap.index.NoOpBootstrapIndex; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.OrderedProperties; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.BootstrapIndexType; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTimelineTimeZone; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.model.RecordPayloadType; import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode; import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; @@ -40,6 +41,7 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.avro.Schema; @@ -164,9 +166,16 @@ public class HoodieTableConfig extends HoodieConfig { public static final ConfigProperty<String> PAYLOAD_CLASS_NAME = ConfigProperty .key("hoodie.compaction.payload.class") .defaultValue(OverwriteWithLatestAvroPayload.class.getName()) + .deprecatedAfter("1.0.0") .withDocumentation("Payload class to use for performing compactions, i.e merge delta logs with current base file and then " + " produce a new base file."); + public static final ConfigProperty<String> PAYLOAD_TYPE = 
ConfigProperty + .key("hoodie.compaction.payload.type") + .defaultValue(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()) + .sinceVersion("1.0.0") + .withDocumentation(RecordPayloadType.class); + public static final ConfigProperty<String> RECORD_MERGER_STRATEGY = ConfigProperty .key("hoodie.compaction.record.merger.strategy") .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID) @@ -186,8 +195,15 @@ public class HoodieTableConfig extends HoodieConfig { public static final ConfigProperty<String> BOOTSTRAP_INDEX_CLASS_NAME = ConfigProperty .key("hoodie.bootstrap.index.class") .defaultValue(HFileBootstrapIndex.class.getName()) + .deprecatedAfter("1.0.0") .withDocumentation("Implementation to use, for mapping base files to bootstrap base file, that contain actual data."); + public static final ConfigProperty<String> BOOTSTRAP_INDEX_TYPE = ConfigProperty + .key("hoodie.bootstrap.index.type") + .defaultValue(BootstrapIndexType.HFILE.name()) + .sinceVersion("1.0.0") + .withDocumentation("Bootstrap index type determines which implementation to use, for mapping base files to bootstrap base file, that contain actual data."); + public static final ConfigProperty<String> BOOTSTRAP_BASE_PATH = ConfigProperty .key("hoodie.bootstrap.base.path") .noDefaultValue() @@ -202,8 +218,15 @@ public class HoodieTableConfig extends HoodieConfig { public static final ConfigProperty<String> KEY_GENERATOR_CLASS_NAME = ConfigProperty .key("hoodie.table.keygenerator.class") .noDefaultValue() + .deprecatedAfter("1.0.0") .withDocumentation("Key Generator class property for the hoodie table"); + public static final ConfigProperty<String> KEY_GENERATOR_TYPE = ConfigProperty + .key("hoodie.table.keygenerator.type") + .noDefaultValue() + .sinceVersion("1.0.0") + .withDocumentation("Key Generator type to determine key generator class"); + public static final ConfigProperty<HoodieTimelineTimeZone> TIMELINE_TIMEZONE = ConfigProperty .key("hoodie.table.timeline.timezone") .defaultValue(HoodieTimelineTimeZone.LOCAL) @@ -235,8 +258,6 @@ public class HoodieTableConfig extends HoodieConfig { DATE_TIME_PARSER ); - public static final String NO_OP_BOOTSTRAP_INDEX_CLASS = NoOpBootstrapIndex.class.getName(); - public static final ConfigProperty<String> TABLE_CHECKSUM = ConfigProperty .key("hoodie.table.checksum") .noDefaultValue() @@ -282,6 +303,11 @@ public class HoodieTableConfig extends HoodieConfig { setValue(PAYLOAD_CLASS_NAME, payloadClassName); needStore = true; } + if (contains(PAYLOAD_TYPE) && payloadClassName != null + && !payloadClassName.equals(RecordPayloadType.valueOf(getString(PAYLOAD_TYPE)).getClassName())) { + setValue(PAYLOAD_TYPE, RecordPayloadType.fromClassName(payloadClassName).name()); + needStore = true; + } if (contains(RECORD_MERGER_STRATEGY) && recordMergerStrategyId != null && !getString(RECORD_MERGER_STRATEGY).equals(recordMergerStrategyId)) { setValue(RECORD_MERGER_STRATEGY, recordMergerStrategyId); @@ -476,7 +502,7 @@ public class HoodieTableConfig extends HoodieConfig { } hoodieConfig.setDefaultValue(TYPE); if (hoodieConfig.getString(TYPE).equals(HoodieTableType.MERGE_ON_READ.name())) { - hoodieConfig.setDefaultValue(PAYLOAD_CLASS_NAME); + hoodieConfig.setDefaultValue(PAYLOAD_TYPE); hoodieConfig.setDefaultValue(RECORD_MERGER_STRATEGY); } hoodieConfig.setDefaultValue(ARCHIVELOG_FOLDER); @@ -486,7 +512,7 @@ public class HoodieTableConfig extends HoodieConfig { } if (hoodieConfig.contains(BOOTSTRAP_BASE_PATH)) { // Use the default bootstrap index class. 
- hoodieConfig.setDefaultValue(BOOTSTRAP_INDEX_CLASS_NAME, getDefaultBootstrapIndexClass(properties)); + hoodieConfig.setDefaultValue(BOOTSTRAP_INDEX_CLASS_NAME, BootstrapIndexType.getDefaultBootstrapIndexClassName(hoodieConfig)); } if (hoodieConfig.contains(TIMELINE_TIMEZONE)) { HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE))); @@ -540,10 +566,7 @@ public class HoodieTableConfig extends HoodieConfig { * Read the payload class for HoodieRecords from the table properties. */ public String getPayloadClass() { - // There could be tables written with payload class from com.uber.hoodie. Need to transparently - // change to org.apache.hudi - return getStringOrDefault(PAYLOAD_CLASS_NAME).replace("com.uber.hoodie", - "org.apache.hudi"); + return RecordPayloadType.getPayloadClassName(this); } /** @@ -602,18 +625,26 @@ public class HoodieTableConfig extends HoodieConfig { * Read the payload class for HoodieRecords from the table properties. */ public String getBootstrapIndexClass() { - // There could be tables written with payload class from com.uber.hoodie. Need to transparently - // change to org.apache.hudi - return getStringOrDefault(BOOTSTRAP_INDEX_CLASS_NAME, getDefaultBootstrapIndexClass(props)); + if (!props.getBoolean(BOOTSTRAP_INDEX_ENABLE.key(), BOOTSTRAP_INDEX_ENABLE.defaultValue())) { + return BootstrapIndexType.NO_OP.getClassName(); + } + String bootstrapIndexClassName; + if (contains(BOOTSTRAP_INDEX_TYPE)) { + bootstrapIndexClassName = BootstrapIndexType.valueOf(getString(BOOTSTRAP_INDEX_TYPE)).getClassName(); + } else if (contains(BOOTSTRAP_INDEX_CLASS_NAME)) { + bootstrapIndexClassName = getString(BOOTSTRAP_INDEX_CLASS_NAME); + } else { + bootstrapIndexClassName = BootstrapIndexType.valueOf(BOOTSTRAP_INDEX_TYPE.defaultValue()).getClassName(); + } + return bootstrapIndexClassName; } public static String getDefaultBootstrapIndexClass(Properties props) { HoodieConfig hoodieConfig = new HoodieConfig(props); - String defaultClass = BOOTSTRAP_INDEX_CLASS_NAME.defaultValue(); if (!hoodieConfig.getBooleanOrDefault(BOOTSTRAP_INDEX_ENABLE)) { - defaultClass = NO_OP_BOOTSTRAP_INDEX_CLASS; + return BootstrapIndexType.NO_OP.getClassName(); } - return defaultClass; + return BootstrapIndexType.valueOf(BOOTSTRAP_INDEX_TYPE.defaultValue()).getClassName(); } public Option<String> getBootstrapBasePath() { @@ -697,7 +728,7 @@ public class HoodieTableConfig extends HoodieConfig { } public String getKeyGeneratorClassName() { - return getString(KEY_GENERATOR_CLASS_NAME); + return KeyGeneratorType.getKeyGeneratorClassName(this); } public HoodieTimelineTimeZone getTimelineTimezone() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 1b29ba8f46f..22e29035657 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -29,9 +29,11 @@ import org.apache.hudi.common.fs.FileSystemRetryConfig; import org.apache.hudi.common.fs.HoodieRetryWrapperFileSystem; import org.apache.hudi.common.fs.HoodieWrapperFileSystem; import org.apache.hudi.common.fs.NoOpConsistencyGuard; +import org.apache.hudi.common.model.BootstrapIndexType; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; import 
org.apache.hudi.common.model.HoodieTimelineTimeZone; +import org.apache.hudi.common.model.RecordPayloadType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -47,6 +49,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.hadoop.CachingPath; import org.apache.hudi.hadoop.SerializablePath; +import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -447,7 +450,10 @@ public class HoodieTableMetaClient implements Serializable { // Meta fields can be disabled only when either {@code SimpleKeyGenerator}, {@code ComplexKeyGenerator}, {@code NonpartitionedKeyGenerator} is used if (!getTableConfig().populateMetaFields()) { - String keyGenClass = properties.getProperty(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator"); + String keyGenClass = KeyGeneratorType.getKeyGeneratorClassName(new HoodieConfig(properties)); + if (StringUtils.isNullOrEmpty(keyGenClass)) { + keyGenClass = "org.apache.hudi.keygen.SimpleKeyGenerator"; + } if (!keyGenClass.equals("org.apache.hudi.keygen.SimpleKeyGenerator") && !keyGenClass.equals("org.apache.hudi.keygen.NonpartitionedKeyGenerator") && !keyGenClass.equals("org.apache.hudi.keygen.ComplexKeyGenerator")) { @@ -793,6 +799,7 @@ public class HoodieTableMetaClient implements Serializable { private String recordKeyFields; private String archiveLogFolder; private String payloadClassName; + private String payloadType; private String recordMergerStrategy; private Integer timelineLayoutVersion; private String baseFileFormat; @@ -805,6 +812,7 @@ public class HoodieTableMetaClient implements Serializable { private Boolean bootstrapIndexEnable; private Boolean populateMetaFields; private String keyGeneratorClassProp; + private String keyGeneratorType; private Boolean hiveStylePartitioningEnable; private Boolean urlEncodePartitioning; private HoodieTimelineTimeZone commitTimeZone; @@ -863,6 +871,11 @@ public class HoodieTableMetaClient implements Serializable { return this; } + public PropertyBuilder setPayloadType(String payloadType) { + this.payloadType = payloadType; + return this; + } + public PropertyBuilder setRecordMergerStrategy(String recordMergerStrategy) { this.recordMergerStrategy = recordMergerStrategy; return this; @@ -927,6 +940,11 @@ public class HoodieTableMetaClient implements Serializable { return this; } + public PropertyBuilder setKeyGeneratorType(String keyGeneratorType) { + this.keyGeneratorType = keyGeneratorType; + return this; + } + public PropertyBuilder setHiveStylePartitioningEnable(Boolean hiveStylePartitioningEnable) { this.hiveStylePartitioningEnable = hiveStylePartitioningEnable; return this; @@ -1013,8 +1031,9 @@ public class HoodieTableMetaClient implements Serializable { hoodieConfig.getString(HoodieTableConfig.ARCHIVELOG_FOLDER)); } if (hoodieConfig.contains(HoodieTableConfig.PAYLOAD_CLASS_NAME)) { - setPayloadClassName( - hoodieConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME)); + setPayloadClassName(hoodieConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME)); + } else if (hoodieConfig.contains(HoodieTableConfig.PAYLOAD_TYPE)) { + setPayloadClassName(RecordPayloadType.valueOf(hoodieConfig.getString(HoodieTableConfig.PAYLOAD_TYPE)).getClassName()); } if 
(hoodieConfig.contains(HoodieTableConfig.RECORD_MERGER_STRATEGY)) { setRecordMergerStrategy( @@ -1031,6 +1050,9 @@ public class HoodieTableMetaClient implements Serializable { setBootstrapIndexClass( hoodieConfig.getString(HoodieTableConfig.BOOTSTRAP_INDEX_CLASS_NAME)); } + if (hoodieConfig.contains(HoodieTableConfig.BOOTSTRAP_INDEX_TYPE)) { + setPayloadClassName(BootstrapIndexType.valueOf(hoodieConfig.getString(HoodieTableConfig.BOOTSTRAP_INDEX_TYPE)).getClassName()); + } if (hoodieConfig.contains(HoodieTableConfig.BOOTSTRAP_BASE_PATH)) { setBootstrapBasePath(hoodieConfig.getString(HoodieTableConfig.BOOTSTRAP_BASE_PATH)); } @@ -1063,6 +1085,8 @@ public class HoodieTableMetaClient implements Serializable { } if (hoodieConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)) { setKeyGeneratorClassProp(hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)); + } else if (hoodieConfig.contains(HoodieTableConfig.KEY_GENERATOR_TYPE)) { + setKeyGeneratorClassProp(KeyGeneratorType.valueOf(hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_TYPE)).getClassName()); } if (hoodieConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)) { setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)); @@ -1101,13 +1125,17 @@ public class HoodieTableMetaClient implements Serializable { } tableConfig.setValue(HoodieTableConfig.NAME, tableName); tableConfig.setValue(HoodieTableConfig.TYPE, tableType.name()); - tableConfig.setValue(HoodieTableConfig.VERSION, - String.valueOf(HoodieTableVersion.current().versionCode())); - if (tableType == HoodieTableType.MERGE_ON_READ && payloadClassName != null) { - tableConfig.setValue(HoodieTableConfig.PAYLOAD_CLASS_NAME, payloadClassName); - } - if (tableType == HoodieTableType.MERGE_ON_READ && recordMergerStrategy != null) { - tableConfig.setValue(HoodieTableConfig.RECORD_MERGER_STRATEGY, recordMergerStrategy); + tableConfig.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.current().versionCode())); + + if (tableType == HoodieTableType.MERGE_ON_READ) { + if (null != payloadClassName) { + tableConfig.setValue(HoodieTableConfig.PAYLOAD_TYPE, RecordPayloadType.fromClassName(payloadClassName).name()); + } else if (null != payloadType) { + tableConfig.setValue(HoodieTableConfig.PAYLOAD_TYPE, payloadType); + } + if (recordMergerStrategy != null) { + tableConfig.setValue(HoodieTableConfig.RECORD_MERGER_STRATEGY, recordMergerStrategy); + } } if (null != tableCreateSchema) { @@ -1130,7 +1158,7 @@ public class HoodieTableMetaClient implements Serializable { } if (null != bootstrapIndexClass) { - tableConfig.setValue(HoodieTableConfig.BOOTSTRAP_INDEX_CLASS_NAME, bootstrapIndexClass); + tableConfig.setValue(HoodieTableConfig.BOOTSTRAP_INDEX_TYPE, BootstrapIndexType.fromClassName(bootstrapIndexClass).name()); } if (null != bootstrapIndexEnable) { @@ -1161,7 +1189,9 @@ public class HoodieTableMetaClient implements Serializable { tableConfig.setValue(HoodieTableConfig.POPULATE_META_FIELDS, Boolean.toString(populateMetaFields)); } if (null != keyGeneratorClassProp) { - tableConfig.setValue(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME, keyGeneratorClassProp); + tableConfig.setValue(HoodieTableConfig.KEY_GENERATOR_TYPE, KeyGeneratorType.fromClassName(keyGeneratorClassProp).name()); + } else if (null != keyGeneratorType) { + tableConfig.setValue(HoodieTableConfig.KEY_GENERATOR_TYPE, keyGeneratorType); } if (null != hiveStylePartitioningEnable) { 
tableConfig.setValue(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE, Boolean.toString(hiveStylePartitioningEnable)); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java index 2dad6f97946..0b95bbabcb3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java @@ -19,8 +19,10 @@ package org.apache.hudi.common.util; import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodiePayloadProps; +import org.apache.hudi.common.model.RecordPayloadType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.exception.HoodieNotSupportedException; @@ -77,13 +79,7 @@ public class ConfigUtils { * Get payload class. */ public static String getPayloadClass(Properties properties) { - String payloadClass = null; - if (properties.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key())) { - payloadClass = properties.getProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()); - } else if (properties.containsKey("hoodie.datasource.write.payload.class")) { - payloadClass = properties.getProperty("hoodie.datasource.write.payload.class"); - } - return payloadClass; + return RecordPayloadType.getPayloadClassName(new HoodieConfig(properties)); } public static List<String> split2List(String param) { diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java index 5434b4901c0..8d79acd7db1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java +++ b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java @@ -20,11 +20,17 @@ package org.apache.hudi.keygen.constant; import org.apache.hudi.common.config.EnumDescription; import org.apache.hudi.common.config.EnumFieldDescription; +import org.apache.hudi.common.config.HoodieConfig; + +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import static org.apache.hudi.common.table.HoodieTableConfig.KEY_GENERATOR_CLASS_NAME; +import static org.apache.hudi.common.table.HoodieTableConfig.KEY_GENERATOR_TYPE; + /** * Types of {@link org.apache.hudi.keygen.KeyGenerator}. */ @@ -33,26 +39,81 @@ import java.util.List; public enum KeyGeneratorType { @EnumFieldDescription("Simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs.") - SIMPLE, + SIMPLE("org.apache.hudi.keygen.SimpleKeyGenerator"), + @EnumFieldDescription("Simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs.") + SIMPLE_AVRO("org.apache.hudi.keygen.SimpleAvroKeyGenerator"), @EnumFieldDescription("Complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs.") - COMPLEX, + COMPLEX("org.apache.hudi.keygen.ComplexKeyGenerator"), + @EnumFieldDescription("Complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs.") + COMPLEX_AVRO("org.apache.hudi.keygen.ComplexAvroKeyGenerator"), @EnumFieldDescription("Timestamp-based key generator, that relies on timestamps for partitioning field. 
Still picks record key by name.") - TIMESTAMP, + TIMESTAMP("org.apache.hudi.keygen.TimestampBasedKeyGenerator"), + @EnumFieldDescription("Timestamp-based key generator, that relies on timestamps for partitioning field. Still picks record key by name.") + TIMESTAMP_AVRO("org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator"), @EnumFieldDescription("This is a generic implementation type of KeyGenerator where users can configure record key as a single field or " + " a combination of fields. Similarly partition path can be configured to have multiple fields or only one field. " + " This KeyGenerator expects value for prop \"hoodie.datasource.write.partitionpath.field\" in a specific format. " + " For example: " + " properties.put(\"hoodie.datasource.write.partitionpath.field\", \"field1:PartitionKeyType1,field2:PartitionKeyType2\").") - CUSTOM, + CUSTOM("org.apache.hudi.keygen.CustomKeyGenerator"), + @EnumFieldDescription("This is a generic implementation type of KeyGenerator where users can configure record key as a single field or " + + " a combination of fields. Similarly partition path can be configured to have multiple fields or only one field. " + + " This KeyGenerator expects value for prop \"hoodie.datasource.write.partitionpath.field\" in a specific format. " + + " For example: " + + " properties.put(\"hoodie.datasource.write.partitionpath.field\", \"field1:PartitionKeyType1,field2:PartitionKeyType2\").") + CUSTOM_AVRO("org.apache.hudi.keygen.CustomAvroKeyGenerator"), @EnumFieldDescription("Simple Key generator for non-partitioned tables.") - NON_PARTITION, + NON_PARTITION("org.apache.hudi.keygen.NonpartitionedKeyGenerator"), + @EnumFieldDescription("Simple Key generator for non-partitioned tables.") + NON_PARTITION_AVRO("org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator"), @EnumFieldDescription("Key generator for deletes using global indices.") - GLOBAL_DELETE; + GLOBAL_DELETE("org.apache.hudi.keygen.GlobalDeleteKeyGenerator"), + @EnumFieldDescription("Key generator for deletes using global indices.") + GLOBAL_DELETE_AVRO("org.apache.hudi.keygen.GlobalAvroDeleteKeyGenerator"), + + @EnumFieldDescription("Automatic record key generation.") + AUTO_RECORD("org.apache.hudi.keygen.AutoRecordGenWrapperKeyGenerator"), + @EnumFieldDescription("Automatic record key generation.") + AUTO_RECORD_AVRO("org.apache.hudi.keygen.AutoRecordGenWrapperAvroKeyGenerator"), + + @EnumFieldDescription("Custom key generator for the Hudi table metadata.") + HOODIE_TABLE_METADATA("org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator"), + + @EnumFieldDescription("Custom spark-sql specific KeyGenerator overriding behavior handling TimestampType partition values.") + SPARK_SQL("org.apache.spark.sql.hudi.command.SqlKeyGenerator"), + + @EnumFieldDescription("A KeyGenerator which use the uuid as the record key.") + SPARK_SQL_UUID("org.apache.spark.sql.hudi.command.UuidKeyGenerator"), + + @EnumFieldDescription("Meant to be used internally for the spark sql MERGE INTO command.") + SPARK_SQL_MERGE_INTO("org.apache.spark.sql.hudi.command.MergeIntoKeyGenerator"), + + @EnumFieldDescription("A test KeyGenerator for deltastreamer tests.") + STREAMER_TEST("org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestGenerator"); + + private final String className; + + KeyGeneratorType(String className) { + this.className = className; + } + + public String getClassName() { + return className; + } + + public static KeyGeneratorType fromClassName(String className) { + for (KeyGeneratorType type : 
KeyGeneratorType.values()) { + if (type.getClassName().equals(className)) { + return type; + } + } + throw new IllegalArgumentException("No KeyGeneratorType found for class name: " + className); + } public static List<String> getNames() { List<String> names = new ArrayList<>(KeyGeneratorType.values().length); @@ -60,4 +121,14 @@ public enum KeyGeneratorType { .forEach(x -> names.add(x.name())); return names; } + + @Nullable + public static String getKeyGeneratorClassName(HoodieConfig config) { + if (config.contains(KEY_GENERATOR_CLASS_NAME)) { + return config.getString(KEY_GENERATOR_CLASS_NAME); + } else if (config.contains(KEY_GENERATOR_TYPE)) { + return KeyGeneratorType.valueOf(config.getString(KEY_GENERATOR_TYPE)).getClassName(); + } + return null; + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index d528c325b29..83816044acb 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -134,6 +134,10 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab && !conf.contains(FlinkOptions.PAYLOAD_CLASS_NAME)) { conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, tableConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME)); } + if (tableConfig.contains(HoodieTableConfig.PAYLOAD_TYPE) + && !conf.contains(FlinkOptions.PAYLOAD_CLASS_NAME)) { + conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, tableConfig.getPayloadClass()); + } }); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index ddc9d55e50c..ff7f33d8beb 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -47,8 +47,8 @@ import scala.language.implicitConversions */ /** - * Options supported for reading hoodie tables. - */ + * Options supported for reading hoodie tables. + */ object DataSourceReadOptions { val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot" @@ -272,8 +272,8 @@ object DataSourceReadOptions { } /** - * Options supported for writing hoodie tables. - */ + * Options supported for writing hoodie tables. + */ object DataSourceWriteOptions { val BULK_INSERT_OPERATION_OPT_VAL = WriteOperationType.BULK_INSERT.value @@ -325,11 +325,11 @@ object DataSourceWriteOptions { val SPARK_SQL_WRITES_PREPPED_KEY = "_hoodie.spark.sql.writes.prepped"; /** - * May be derive partition path from incoming df if not explicitly set. - * - * @param optParams Parameters to be translated - * @return Parameters after translation - */ + * May be derive partition path from incoming df if not explicitly set. 
+ * + * @param optParams Parameters to be translated + * @return Parameters after translation + */ def mayBeDerivePartitionPath(optParams: Map[String, String]): Map[String, String] = { var translatedOptParams = optParams // translate the api partitionBy of spark DataFrameWriter to PARTITIONPATH_FIELD @@ -384,6 +384,8 @@ object DataSourceWriteOptions { */ val PAYLOAD_CLASS_NAME = HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME + val PAYLOAD_TYPE = HoodieWriteConfig.WRITE_PAYLOAD_TYPE + /** * HoodieMerger will replace the payload to process the merge of data * and provide the same capabilities as the payload @@ -475,7 +477,7 @@ object DataSourceWriteOptions { .defaultValue("false") .markAdvanced() .withDocumentation("If set to true, records from the incoming dataframe will not overwrite existing records with the same key during the write operation. " + - "This config is deprecated as of 0.14.0. Please use hoodie.datasource.insert.dup.policy instead."); + "This config is deprecated as of 0.14.0. Please use hoodie.datasource.insert.dup.policy instead."); val PARTITIONS_TO_DELETE: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.write.partitions.to.delete") @@ -557,7 +559,7 @@ object DataSourceWriteOptions { .withValidValues(NONE_INSERT_DUP_POLICY, DROP_INSERT_DUP_POLICY, FAIL_INSERT_DUP_POLICY) .withDocumentation("When operation type is set to \"insert\", users can optionally enforce a dedup policy. This policy will be employed " + " when records being ingested already exists in storage. Default policy is none and no action will be taken. Another option is to choose " + - " \"drop\", on which matching records from incoming will be dropped and the rest will be ingested. Third option is \"fail\" which will " + + " \"drop\", on which matching records from incoming will be dropped and the rest will be ingested. Third option is \"fail\" which will " + "fail the write operation when same records are re-ingested. 
In other words, a given record as deduced by the key generation policy " + "can be ingested only once to the target table of interest.") @@ -904,7 +906,7 @@ object DataSourceOptionsHelper { private val log = LoggerFactory.getLogger(DataSourceOptionsHelper.getClass) // put all the configs with alternatives here - val allConfigsWithAlternatives = List( + private val allConfigsWithAlternatives = List( DataSourceReadOptions.QUERY_TYPE, DataSourceWriteOptions.TABLE_TYPE, HoodieTableConfig.BASE_FILE_FORMAT, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 7828cc7ee5a..9231472a799 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -54,6 +54,7 @@ import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileNullability import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper} +import org.apache.hudi.keygen.constant.KeyGeneratorType import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName import org.apache.hudi.keygen.{BaseKeyGenerator, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} @@ -226,13 +227,21 @@ object HoodieSparkSqlWriter { val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER) val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS) val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT); + val payloadClass = + if (StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.PAYLOAD_CLASS_NAME))) + hoodieConfig.getString(DataSourceWriteOptions.PAYLOAD_CLASS_NAME) + else RecordPayloadType.getPayloadClassName(hoodieConfig) + val keyGenProp = + if (StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME))) + hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME) + else KeyGeneratorType.getKeyGeneratorClassName(hoodieConfig) HoodieTableMetaClient.withPropertyBuilder() .setTableType(tableType) .setDatabaseName(databaseName) .setTableName(tblName) .setBaseFileFormat(baseFileFormat) .setArchiveLogFolder(archiveLogFolder) - .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME)) + .setPayloadClassName(payloadClass) // we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value, // but we are interested in what user has set, hence fetching from optParams. 
.setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null)) @@ -241,7 +250,7 @@ object HoodieSparkSqlWriter { .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD)) .setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED)) .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)) - .setKeyGeneratorClassProp(hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key)) + .setKeyGeneratorClassProp(keyGenProp) .set(timestampKeyGeneratorConfigs) .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING)) .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING)) @@ -806,10 +815,14 @@ object HoodieSparkSqlWriter { val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER) val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters) val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD) + val payloadClass = + if (StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.PAYLOAD_CLASS_NAME))) + hoodieConfig.getString(DataSourceWriteOptions.PAYLOAD_CLASS_NAME) + else RecordPayloadType.getPayloadClassName(hoodieConfig) val keyGenProp = if (StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME))) hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME) - else hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME) + else KeyGeneratorType.getKeyGeneratorClassName(hoodieConfig) val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenProp, parameters) val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse( HoodieTableConfig.POPULATE_META_FIELDS.key(), @@ -826,7 +839,7 @@ object HoodieSparkSqlWriter { .setTableName(tableName) .setRecordKeyFields(recordKeyFields) .setArchiveLogFolder(archiveLogFolder) - .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME)) + .setPayloadClassName(payloadClass) .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null)) .setBootstrapIndexClass(bootstrapIndexClass) .setBaseFileFormat(baseFileFormat) @@ -1162,9 +1175,8 @@ object HoodieSparkSqlWriter { } } val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptsWithMappedTableConfig.toMap) - if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) - && mergedParams.contains(KEYGENERATOR_CLASS_NAME.key)) { - mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = mergedParams(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key) + if (mergedParams.contains(KEYGENERATOR_CLASS_NAME.key) && !mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_TYPE.key)) { + mergedParams(HoodieTableConfig.KEY_GENERATOR_TYPE.key) = KeyGeneratorType.fromClassName(mergedParams(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key)).name } // use preCombineField to fill in PAYLOAD_ORDERING_FIELD_PROP_KEY if (mergedParams.contains(PRECOMBINE_FIELD.key())) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index 5230c34984f..b2c44cc3330 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -27,6 +27,7 
@@ import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY import org.apache.hudi.exception.HoodieException import org.apache.hudi.hive.HiveSyncConfigHolder +import org.apache.hudi.keygen.constant.KeyGeneratorType import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator} import org.apache.hudi.sync.common.HoodieSyncConfig import org.apache.hudi.util.SparkKeyGenUtils @@ -192,7 +193,7 @@ object HoodieWriterUtils { } val datasourceKeyGen = getOriginKeyGenerator(params) - val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME) + val tableConfigKeyGen = KeyGeneratorType.getKeyGeneratorClassName(tableConfig) if (null != datasourceKeyGen && null != tableConfigKeyGen && datasourceKeyGen != tableConfigKeyGen) { diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n") @@ -228,7 +229,7 @@ object HoodieWriterUtils { val diffConfigs = StringBuilder.newBuilder if (null != tableConfig) { - val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME) + val tableConfigKeyGen = KeyGeneratorType.getKeyGeneratorClassName(tableConfig) if (null != tableConfigKeyGen && null != datasourceKeyGen) { val nonPartitionedTableConfig = tableConfigKeyGen.equals(classOf[NonpartitionedKeyGenerator].getCanonicalName) val simpleKeyDataSourceConfig = datasourceKeyGen.equals(classOf[SimpleKeyGenerator].getCanonicalName) @@ -256,13 +257,14 @@ object HoodieWriterUtils { } } - val sparkDatasourceConfigsToTableConfigsMap = Map( + private val sparkDatasourceConfigsToTableConfigsMap = Map( TABLE_NAME -> HoodieTableConfig.NAME, TABLE_TYPE -> HoodieTableConfig.TYPE, PRECOMBINE_FIELD -> HoodieTableConfig.PRECOMBINE_FIELD, PARTITIONPATH_FIELD -> HoodieTableConfig.PARTITION_FIELDS, RECORDKEY_FIELD -> HoodieTableConfig.RECORDKEY_FIELDS, PAYLOAD_CLASS_NAME -> HoodieTableConfig.PAYLOAD_CLASS_NAME, + PAYLOAD_TYPE -> HoodieTableConfig.PAYLOAD_TYPE, RECORD_MERGER_STRATEGY -> HoodieTableConfig.RECORD_MERGER_STRATEGY ) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala index a77a5dcbe2f..ee041e94b87 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala @@ -26,7 +26,7 @@ import org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.util.StringUtils import org.apache.hudi.common.util.ValidationUtils.checkArgument -import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.keygen.constant.KeyGeneratorType import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper} import org.apache.spark.internal.Logging @@ -304,6 +304,10 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator( originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) + } else if 
(originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_TYPE.key)) { + extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = + HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator( + KeyGeneratorType.valueOf(originTableConfig(HoodieTableConfig.KEY_GENERATOR_TYPE.key)).getClassName) } else { val primaryKeys = table.properties.getOrElse(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName, table.storage.properties.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)).toString val partitions = table.partitionColumnNames.mkString(",") diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala index abe98bb46cf..66c81ae331e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala @@ -69,6 +69,13 @@ object HoodieOptionConfig { .defaultValue(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue()) .build() + val SQL_PAYLOAD_TYPE: HoodieSQLOption[String] = buildConf() + .withSqlKey("payloadType") + .withHoodieKey(DataSourceWriteOptions.PAYLOAD_TYPE.key) + .withTableConfigKey(HoodieTableConfig.PAYLOAD_TYPE.key) + .defaultValue(DataSourceWriteOptions.PAYLOAD_TYPE.defaultValue()) + .build() + val SQL_RECORD_MERGER_STRATEGY: HoodieSQLOption[String] = buildConf() .withSqlKey("recordMergerStrategy") .withHoodieKey(DataSourceWriteOptions.RECORD_MERGER_STRATEGY.key) @@ -193,7 +200,7 @@ object HoodieOptionConfig { // extract primaryKey, preCombineField, type options def extractSqlOptions(options: Map[String, String]): Map[String, String] = { val sqlOptions = mapTableConfigsToSqlOptions(options) - val targetOptions = sqlOptionKeyToWriteConfigKey.keySet -- Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_RECORD_MERGER_STRATEGY.sqlKeyName) + val targetOptions = sqlOptionKeyToWriteConfigKey.keySet -- Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_RECORD_MERGER_STRATEGY.sqlKeyName) -- Set(SQL_PAYLOAD_TYPE.sqlKeyName) sqlOptions.filterKeys(targetOptions.contains) } @@ -233,7 +240,7 @@ object HoodieOptionConfig { def makeOptionsCaseInsensitive(sqlOptions: Map[String, String]): Map[String, String] = { // Make Keys Case Insensitive val standardOptions = Seq(SQL_KEY_TABLE_PRIMARY_KEY, SQL_KEY_PRECOMBINE_FIELD, - SQL_KEY_TABLE_TYPE, SQL_PAYLOAD_CLASS, SQL_RECORD_MERGER_STRATEGY).map(key => key.sqlKeyName) + SQL_KEY_TABLE_TYPE, SQL_PAYLOAD_CLASS, SQL_RECORD_MERGER_STRATEGY, SQL_PAYLOAD_TYPE).map(key => key.sqlKeyName) sqlOptions.map(option => { standardOptions.find(x => x.toLowerCase().contains(option._1.toLowerCase())) match { diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java index 90ab2f9cbab..49059d20389 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java @@ -21,6 +21,7 @@ package org.apache.hudi.cli; import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.TypedProperties; import 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
index 90ab2f9cbab..49059d20389 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.cli;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieTimelineTimeZone;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -41,6 +42,7 @@
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
 import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.util.SparkKeyGenUtils;
@@ -251,8 +253,8 @@ public class BootstrapExecutorUtils implements Serializable {
             URL_ENCODE_PARTITIONING.key(), Boolean.parseBoolean(URL_ENCODE_PARTITIONING.defaultValue())))
         .setCommitTimezone(HoodieTimelineTimeZone.valueOf(props.getString(
-            TIMELINE_TIMEZONE.key(),
-            String.valueOf(TIMELINE_TIMEZONE.defaultValue()))))
+            TIMELINE_TIMEZONE.key(),
+            String.valueOf(TIMELINE_TIMEZONE.defaultValue()))))
         .setPartitionMetafileUseBaseFormat(props.getBoolean(
             PARTITION_METAFILE_USE_BASE_FORMAT.key(),
             PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
@@ -270,7 +272,7 @@ public class BootstrapExecutorUtils implements Serializable {
     } else if (StringUtils.nonEmpty(props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), null))) {
       keyGenClass = HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(props);
     } else {
-      keyGenClass = props.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName());
+      keyGenClass = KeyGeneratorType.getKeyGeneratorClassName(new HoodieConfig(props));
     }
     props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), keyGenClass);
     String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
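The else-if chain in the bootstrap path above tries a writer-side key-generator type and only then falls back to the table config, which after this change may hold either the class name or the enum. A condensed Scala sketch of that "first defined wins" precedence, with illustrative parameters rather than the actual Hudi property objects:

    // Each input is optional; the first defined one wins.
    def chooseKeyGenClass(writerClassName: Option[String],
                          classFromWriterType: Option[String],
                          classFromTableConfig: Option[String]): Option[String] =
      writerClassName.orElse(classFromWriterType).orElse(classFromTableConfig)

    // Example: nothing set on the writer, so the table config supplies the class.
    // chooseKeyGenClass(None, None, Some("org.apache.hudi.keygen.SimpleKeyGenerator"))
    //   == Some("org.apache.hudi.keygen.SimpleKeyGenerator")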
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index bc3540ebf50..642592a6c9f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -24,7 +24,7 @@
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.PartitionPathEncodeUtils.escapePathName
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
-import org.apache.hudi.keygen.SimpleKeyGenerator
+import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable}
@@ -143,7 +143,7 @@
     assertResult("dt")(tableConfig(HoodieTableConfig.PARTITION_FIELDS.key))
     assertResult("id")(tableConfig(HoodieTableConfig.RECORDKEY_FIELDS.key))
     assertResult("ts")(tableConfig(HoodieTableConfig.PRECOMBINE_FIELD.key))
-    assertResult(classOf[SimpleKeyGenerator].getCanonicalName)(tableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
+    assertResult(KeyGeneratorType.SIMPLE.name())(tableConfig(HoodieTableConfig.KEY_GENERATOR_TYPE.key))
     assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key()))
     assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key()))
     assertFalse(tableConfig.contains(OPERATION.key()))
@@ -1204,8 +1204,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
       .setBasePath(tablePath)
       .setConf(spark.sessionState.newHadoopConf())
       .build()
-    val realKeyGenerator =
-      metaClient.getTableConfig.getProps.asScala.toMap.get(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key).get
+    val realKeyGenerator = metaClient.getTableConfig.getKeyGeneratorClassName
     assertResult(targetGenerator)(realKeyGenerator)
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
index 43fcb79ecf9..985300c44c2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
@@ -17,6 +17,7 @@ package org.apache.spark.sql.hudi
+import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
@@ -31,17 +32,19 @@ class TestHoodieOptionConfig extends SparkClientFunctionalTestHarness {
   def testWithDefaultSqlOptions(): Unit = {
     val ops1 = Map("primaryKey" -> "id")
     val with1 = HoodieOptionConfig.withDefaultSqlOptions(ops1)
-    assertTrue(with1.size == 4)
+    assertTrue(with1.size == 5)
     assertTrue(with1("primaryKey") == "id")
     assertTrue(with1("type") == "cow")
     assertTrue(with1("payloadClass") == classOf[OverwriteWithLatestAvroPayload].getName)
     assertTrue(with1("recordMergerStrategy") == HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
+    assertTrue(with1("payloadType") == DataSourceWriteOptions.PAYLOAD_TYPE.defaultValue)
 
     val ops2 = Map("primaryKey" -> "id",
       "preCombineField" -> "timestamp",
       "type" -> "mor",
       "payloadClass" -> classOf[DefaultHoodieRecordPayload].getName,
-      "recordMergerStrategy" -> HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
+      "recordMergerStrategy" -> HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID,
+      "payloadType" -> DataSourceWriteOptions.PAYLOAD_TYPE.defaultValue
     )
     val with2 = HoodieOptionConfig.withDefaultSqlOptions(ops2)
     assertTrue(ops2 == with2)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
index eaf977e82d1..15a9ed675c3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
@@ -118,7 +118,7 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase {
        |[hoodie.datasource.write.partitionpath.urlencode,false,null]
        |[hoodie.table.checksum,,]
        |[hoodie.table.create.schema,,]
-       |[hoodie.table.keygenerator.class,org.apache.hudi.keygen.NonpartitionedKeyGenerator,null]
+       |[hoodie.table.keygenerator.type,NON_PARTITION,null]
        |[hoodie.table.name,,]
        |[hoodie.table.precombine.field,ts,null]
        |[hoodie.table.recordkey.fields,id,null]
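The TestCreateTable assertions above read the resolved class back through the table config rather than raw properties. A usage-style Scala sketch of the same read path (the base path and Spark session below are placeholders, and a real Hudi table must already exist at that path):

    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    val metaClient = HoodieTableMetaClient.builder()   // builder() assumed from common Hudi usage
      .setBasePath("/tmp/hudi/example_table")          // hypothetical path
      .setConf(spark.sessionState.newHadoopConf())
      .build()
    // Returns the concrete key generator class, whether the table stores the class name or the enum.
    val keyGenClass = metaClient.getTableConfig.getKeyGeneratorClassName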
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 9c708144931..96d888f5f07 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -26,7 +26,6 @@ import org.apache.hudi.HoodieSparkUtils$;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
-import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.LockConfiguration;
@@ -104,7 +103,6 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.AnalysisException;
@@ -160,6 +158,7 @@ import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
 import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
 import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
 import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.createMetaClient;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommitsAfterRollback;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
@@ -378,8 +377,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
   }
 
   @Test
-  public void testPropsWithInvalidKeyGenerator() throws Exception {
-    Exception e = assertThrows(SparkException.class, () -> {
+  public void testPropsWithInvalidKeyGenerator() {
+    Exception e = assertThrows(IllegalArgumentException.class, () -> {
       String tableBasePath = basePath + "/test_table_invalid_key_gen";
       HoodieDeltaStreamer deltaStreamer =
           new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
@@ -388,7 +387,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     }, "Should error out when setting the key generator class property to an invalid value");
     // expected
     LOG.debug("Expected error during getting the key generator", e);
-    assertTrue(e.getMessage().contains("Could not load key generator class invalid"));
+    assertTrue(e.getMessage().contains("No KeyGeneratorType found for class name"));
   }
 
   private static Stream<Arguments> provideInferKeyGenArgs() {
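On the changed expectation in testPropsWithInvalidKeyGenerator: with the enum in play, an unrecognized key generator class is now rejected when it is mapped to an enum value, which is why the test expects an IllegalArgumentException whose message contains "No KeyGeneratorType found for class name". An illustrative Scala sketch of that failure mode (the mapping below is a stand-in, not the real KeyGeneratorType):

    val classNameToType = Map(
      "org.apache.hudi.keygen.SimpleKeyGenerator" -> "SIMPLE",
      "org.apache.hudi.keygen.NonpartitionedKeyGenerator" -> "NON_PARTITION")

    def fromClassName(className: String): String =
      classNameToType.getOrElse(className,
        throw new IllegalArgumentException(s"No KeyGeneratorType found for class name $className"))

    // fromClassName("invalid") throws IllegalArgumentException, matching the assertion above.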
@@ -1395,7 +1394,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
   }
 
   @Test
-  public void testNullSchemaProvider() throws Exception {
+  public void testNullSchemaProvider() {
     String tableBasePath = basePath + "/test_table";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
@@ -1423,14 +1422,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
     //now assert that hoodie.properties file now has updated payload class name
-    Properties props = new Properties();
-    String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties";
-    FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration());
-    try (FSDataInputStream inputStream = fs.open(new Path(metaPath))) {
-      props.load(inputStream);
-    }
-
-    assertEquals(new HoodieConfig(props).getString(HoodieTableConfig.PAYLOAD_CLASS_NAME), DummyAvroPayload.class.getName());
+    HoodieTableMetaClient metaClient = createMetaClient(jsc, dataSetBasePath, false);
+    assertEquals(metaClient.getTableConfig().getPayloadClass(), DummyAvroPayload.class.getName());
   }
 
   @Test
@@ -1443,13 +1436,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertRecordCount(1000, dataSetBasePath, sqlContext);
     //now assert that hoodie.properties file now has updated payload class name
-    Properties props = new Properties();
-    String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties";
-    FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration());
-    try (FSDataInputStream inputStream = fs.open(new Path(metaPath))) {
-      props.load(inputStream);
-    }
-    assertEquals(new HoodieConfig(props).getString(HoodieTableConfig.PAYLOAD_CLASS_NAME), PartialUpdateAvroPayload.class.getName());
+    HoodieTableMetaClient metaClient = createMetaClient(jsc, dataSetBasePath, false);
+    assertEquals(metaClient.getTableConfig().getPayloadClass(), PartialUpdateAvroPayload.class.getName());
   }
 
   @Test
@@ -2588,13 +2576,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     }
   }
 
-  public static class TestTableLevelGenerator extends SimpleKeyGenerator {
-
-    public TestTableLevelGenerator(TypedProperties props) {
-      super(props);
-    }
-  }
-
   public static class DummyAvroPayload extends OverwriteWithLatestAvroPayload {
 
     public DummyAvroPayload(GenericRecord gr, Comparable orderingVal) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
index a8ee0c694fd..26ea61e31fe 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
@@ -263,7 +263,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
       switch (tableExecutionContext.getTableName()) {
         case "dummy_table_short_trip":
           String tableLevelKeyGeneratorClass = tableExecutionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key());
-          assertEquals(TestHoodieDeltaStreamer.TestTableLevelGenerator.class.getName(), tableLevelKeyGeneratorClass);
+          assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), tableLevelKeyGeneratorClass);
           List<String> transformerClass = tableExecutionContext.getConfig().transformerClassNames;
           assertEquals(1, transformerClass.size());
           assertEquals("org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestIdentityTransformer", transformerClass.get(0));
diff --git a/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties b/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties
index 25b392d580a..d415e19eb20 100644
--- a/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties
+++ b/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties
@@ -22,7 +22,7 @@ hoodie.deltastreamer.source.kafka.topic=topic2
 hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
 hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
 hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table
-hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestTableLevelGenerator
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestGenerator
 hoodie.deltastreamer.schemaprovider.registry.baseUrl=http://localhost:8081/subjects/
 hoodie.deltastreamer.schemaprovider.registry.urlSuffix=-value/versions/latest
 hoodie.deltastreamer.transformer.class=org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestIdentityTransformer