This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 283b25a8a6ba3e3fcc7f35815b43afdf7a86b22b Author: XuQianJin-Stars <[email protected]> AuthorDate: Mon Jan 2 12:37:39 2023 +0800 [MINOR] add some property to table config --- .../org/apache/hudi/config/HoodieIndexConfig.java | 16 ++++--- .../org/apache/hudi/config/HoodieWriteConfig.java | 2 +- .../hudi/common/table/HoodieTableConfig.java | 46 ++++++++++++++++++ .../hudi/common/table/HoodieTableMetaClient.java | 55 ++++++++++++++++++++++ .../hudi/common/table/TestHoodieTableConfig.java | 19 ++++++++ .../java/org/apache/hudi/util/StreamerUtil.java | 11 +++++ .../org/apache/hudi/HoodieSparkSqlWriter.scala | 13 ++++- 7 files changed, 153 insertions(+), 9 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java index b5edaf4abcd..23375dd4952 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.exception.HoodieNotSupportedException; @@ -63,7 +64,7 @@ import static org.apache.hudi.index.HoodieIndex.IndexType.SIMPLE; public class HoodieIndexConfig extends HoodieConfig { public static final ConfigProperty<String> INDEX_TYPE = ConfigProperty - .key("hoodie.index.type") + .key(HoodieTableConfig.INDEX_TYPE.key()) // Builder#getDefaultIndexType has already set it according to engine type .noDefaultValue() .withValidValues(HBASE.name(), INMEMORY.name(), BLOOM.name(), GLOBAL_BLOOM.name(), @@ -259,13 +260,13 @@ public class HoodieIndexConfig extends HoodieConfig { * In dynamic bucket index cases (e.g., using CONSISTENT_HASHING), this config of number of bucket serves as a initial bucket size */ public static final ConfigProperty<Integer> BUCKET_INDEX_NUM_BUCKETS = ConfigProperty - .key("hoodie.bucket.index.num.buckets") + .key(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS.key()) .defaultValue(256) .withDocumentation("Only applies if index type is BUCKET. Determine the number of buckets in the hudi table, " + "and each partition is divided to N buckets."); public static final ConfigProperty<String> BUCKET_INDEX_HASH_FIELD = ConfigProperty - .key("hoodie.bucket.index.hash.field") + .key(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD.key()) .noDefaultValue() .withDocumentation("Index key. It is used to index the record and find its file group. " + "If not set, use record key field as default"); @@ -682,12 +683,15 @@ public class HoodieIndexConfig extends HoodieConfig { hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD, hoodieIndexConfig.getStringOrDefault(KeyGeneratorOptions.RECORDKEY_FIELD_NAME)); } else { + String recordkeyField = hoodieIndexConfig.getStringOrDefault(KeyGeneratorOptions.RECORDKEY_FIELD_NAME); + String indexHashField = hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD); boolean valid = Arrays - .stream(hoodieIndexConfig.getStringOrDefault(KeyGeneratorOptions.RECORDKEY_FIELD_NAME).split(",")) + .stream(recordkeyField.split(",")) .collect(Collectors.toSet()) - .containsAll(Arrays.asList(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD).split(","))); + .containsAll(Arrays.asList(indexHashField.split(","))); if (!valid) { - throw new HoodieIndexException("Bucket index key (if configured) must be subset of record key."); + throw new HoodieIndexException("Bucket index key (if configured) must be subset of record key." + + " Bucket index key: " + indexHashField + " record key: " + recordkeyField); } } // check the bucket num diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 103123f9801..8b00655a0bd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -463,7 +463,7 @@ public class HoodieWriteConfig extends HoodieConfig { + "It's useful in cases where extra metadata needs to be published regardless e.g tracking source offsets when ingesting data"); public static final ConfigProperty<Boolean> ALLOW_OPERATION_METADATA_FIELD = ConfigProperty - .key("hoodie.allow.operation.metadata.field") + .key(HoodieTableConfig.ALLOW_OPERATION_METADATA_FIELD.key()) .defaultValue(false) .sinceVersion("0.9.0") .withDocumentation("Whether to include '_hoodie_operation' in the metadata fields. " diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 85f970e7ec4..ee0829944bc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -204,6 +204,36 @@ public class HoodieTableConfig extends HoodieConfig { .defaultValue(false) .withDocumentation("When set to true, will not write the partition columns into hudi. By default, false."); + + public static final ConfigProperty<Boolean> ALLOW_OPERATION_METADATA_FIELD = ConfigProperty + .key("hoodie.allow.operation.metadata.field") + .defaultValue(false) + .sinceVersion("0.9.0") + .withDocumentation("Whether to include '_hoodie_operation' in the metadata fields. " + + "Once enabled, all the changes of a record are persisted to the delta log directly without merge"); + + public static final ConfigProperty<String> INDEX_TYPE = ConfigProperty + .key("hoodie.index.type") + // Builder#getDefaultIndexType has already set it according to engine type + .noDefaultValue() + .withDocumentation("Type of index to use. Default is SIMPLE on Spark engine, " + + "and INMEMORY on Flink and Java engines. " + + "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. " + + "Bloom filters removes the dependency on a external system " + + "and is stored in the footer of the Parquet Data Files"); + + public static final ConfigProperty<Integer> BUCKET_INDEX_NUM_BUCKETS = ConfigProperty + .key("hoodie.bucket.index.num.buckets") + .defaultValue(4) + .withDocumentation("Only applies if index type is BUCKET. Determine the number of buckets in the hudi table, " + + "and each partition is divided to N buckets."); + + public static final ConfigProperty<String> BUCKET_INDEX_HASH_FIELD = ConfigProperty + .key("hoodie.bucket.index.hash.field") + .noDefaultValue() + .withDocumentation("Index key. It is used to index the record and find its file group. " + + "If not set, use record key field as default"); + public static final ConfigProperty<String> URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING; public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE; @@ -641,6 +671,22 @@ public class HoodieTableConfig extends HoodieConfig { return Option.empty(); } + public Boolean getAllowOperationMetadataField() { + return getBoolean(ALLOW_OPERATION_METADATA_FIELD); + } + + public String getIndexType() { + return getString(INDEX_TYPE); + } + + public String getIndexKeys() { + return getString(BUCKET_INDEX_HASH_FIELD); + } + + public Integer getIndexNumBuckets() { + return getInt(BUCKET_INDEX_NUM_BUCKETS); + } + public Map<String, String> propsMap() { return props.entrySet().stream() .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue()))); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 16dd373486f..39f27f4160f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -743,6 +743,14 @@ public class HoodieTableMetaClient implements Serializable { private String metadataPartitions; private String inflightMetadataPartitions; + private Boolean allowOperationMetadataField; + + private String indexType; + + private String indexKeys; + + private Integer bucketIndexNumBuckets; + /** * Persist the configs that is written at the first time, and should not be changed. * Like KeyGenerator's configs. @@ -876,6 +884,26 @@ public class HoodieTableMetaClient implements Serializable { return this; } + public PropertyBuilder setAllowOperationMetadataField(boolean allowOperationMetadataField) { + this.allowOperationMetadataField = allowOperationMetadataField; + return this; + } + + public PropertyBuilder setIndexType(String indexType) { + this.indexType = indexType; + return this; + } + + public PropertyBuilder setIndexKeys(String indexKeys) { + this.indexKeys = indexKeys; + return this; + } + + public PropertyBuilder setIndexNumBuckets(Integer bucketIndexNumBuckets) { + this.bucketIndexNumBuckets = bucketIndexNumBuckets; + return this; + } + private void set(String key, Object value) { if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) { this.others.put(key, value); @@ -982,6 +1010,20 @@ public class HoodieTableMetaClient implements Serializable { if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT)) { setInflightMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT)); } + if (hoodieConfig.contains(HoodieTableConfig.ALLOW_OPERATION_METADATA_FIELD)) { + setAllowOperationMetadataField( + hoodieConfig.getBoolean(HoodieTableConfig.ALLOW_OPERATION_METADATA_FIELD)); + } + // At present, the main consideration is bucket index + if (hoodieConfig.contains(HoodieTableConfig.INDEX_TYPE)) { + setIndexType(hoodieConfig.getString(HoodieTableConfig.INDEX_TYPE)); + } + if (hoodieConfig.contains(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD)) { + setIndexKeys(hoodieConfig.getString(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD)); + } + if (hoodieConfig.contains(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS)) { + setIndexNumBuckets(hoodieConfig.getInt(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS)); + } return this; } @@ -1072,6 +1114,19 @@ public class HoodieTableMetaClient implements Serializable { if (null != inflightMetadataPartitions) { tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT, inflightMetadataPartitions); } + if (null != allowOperationMetadataField) { + tableConfig.setValue( + HoodieTableConfig.ALLOW_OPERATION_METADATA_FIELD, Boolean.toString(allowOperationMetadataField)); + } + if (null != indexType) { + tableConfig.setValue(HoodieTableConfig.INDEX_TYPE, indexType); + } + if (null != indexKeys) { + tableConfig.setValue(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD, indexKeys); + } + if (null != bucketIndexNumBuckets) { + tableConfig.setValue(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS, Integer.toString(bucketIndexNumBuckets)); + } return tableConfig.getProps(); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java index 0defefe2ea4..bf339d94bb2 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java @@ -134,4 +134,23 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness { config = new HoodieTableConfig(fs, metaPath.toString(), null); assertEquals(6, config.getProps().size()); } + + @Test + public void testIndexProps() throws IOException { + Properties updatedProps = new Properties(); + updatedProps.setProperty(HoodieTableConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "true"); + updatedProps.setProperty(HoodieTableConfig.INDEX_TYPE.key(), "BUCKET"); + updatedProps.setProperty(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD.key(), "uuid"); + updatedProps.setProperty(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS.key(), "8"); + HoodieTableConfig.update(fs, metaPath, updatedProps); + + assertTrue(fs.exists(cfgPath)); + assertFalse(fs.exists(backupCfgPath)); + HoodieTableConfig config = new HoodieTableConfig(fs, metaPath.toString(), null); + assertEquals(10, config.getProps().size()); + assertEquals(true, config.getAllowOperationMetadataField()); + assertEquals("BUCKET", config.getIndexType()); + assertEquals("uuid", config.getIndexKeys()); + assertEquals(8, config.getIndexNumBuckets()); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 895002770fe..bb41aafbfcf 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -54,6 +54,7 @@ import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.SimpleAvroKeyGenerator; import org.apache.hudi.schema.FilebasedSchemaProvider; import org.apache.hudi.sink.transform.ChainedTransformer; @@ -302,6 +303,12 @@ public class StreamerUtil { org.apache.hadoop.conf.Configuration hadoopConf) throws IOException { final String basePath = conf.getString(FlinkOptions.PATH); if (!tableExists(basePath, hadoopConf)) { + if (conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name())) { + if (conf.getString(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) { + conf.setString(FlinkOptions.INDEX_KEY_FIELD, conf.getString(FlinkOptions.RECORD_KEY_FIELD)); + } + } + HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder() .setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA)) .setTableType(conf.getString(FlinkOptions.TABLE_TYPE)) @@ -316,6 +323,10 @@ public class StreamerUtil { .setHiveStylePartitioningEnable(conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)) .setUrlEncodePartitioning(conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING)) .setTimelineLayoutVersion(1) + .setAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) + .setIndexType(conf.getString(FlinkOptions.INDEX_TYPE)) + .setIndexKeys(conf.getString(FlinkOptions.INDEX_KEY_FIELD)) + .setIndexNumBuckets(conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS)) .initTable(hadoopConf, basePath); LOG.info("Table initialized under base path {}", basePath); return metaClient; diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 61cb7ef9614..b4e46b7f277 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -37,7 +37,7 @@ import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieException import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows} import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool} -import org.apache.hudi.index.SparkHoodieIndexFactory +import org.apache.hudi.index.{HoodieIndex, SparkHoodieIndexFactory} import org.apache.hudi.internal.DataSourceInternalWriterHelper import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter @@ -142,7 +142,13 @@ object HoodieSparkSqlWriter { val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER) val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD) val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS) - val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT); + val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT) + + if (hoodieConfig.getString(HoodieTableConfig.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name)) { + if (hoodieConfig.getString(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD).isEmpty) { + hoodieConfig.setValue(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD, recordKeyFields) + } + } val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder() .setTableType(tableType) @@ -165,6 +171,9 @@ object HoodieSparkSqlWriter { .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile) .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS)) .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))) + .setIndexType(hoodieConfig.getString(HoodieTableConfig.INDEX_TYPE)) + .setIndexKeys(hoodieConfig.getString(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD)) + .setIndexNumBuckets(hoodieConfig.getInt(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS)) .initTable(sparkContext.hadoopConfiguration, path) tableConfig = tableMetaClient.getTableConfig }
