This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 7da379b6597fff52aca63adb90d902513ee2b2fd Author: XuQianJin-Stars <[email protected]> AuthorDate: Thu Jan 12 13:15:07 2023 +0800 [HUDI-5495] add some property to table config --- .../org/apache/hudi/config/HoodieIndexConfig.java | 4 ++-- .../hudi/common/table/HoodieTableConfig.java | 28 ++++++++++++++++++++-- .../hudi/common/table/HoodieTableMetaClient.java | 25 ++++++++++++++----- .../hudi/common/table/TestHoodieTableConfig.java | 6 +++-- .../apache/hudi/configuration/FlinkOptions.java | 10 ++++++++ .../java/org/apache/hudi/util/StreamerUtil.java | 3 ++- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 3 ++- 7 files changed, 65 insertions(+), 14 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java index 23375dd4952..de3eae8fac9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java @@ -71,7 +71,7 @@ public class HoodieIndexConfig extends HoodieConfig { SIMPLE.name(), GLOBAL_SIMPLE.name(), BUCKET.name()) .withDocumentation("Type of index to use. Default is SIMPLE on Spark engine, " + "and INMEMORY on Flink and Java engines. " - + "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. " + + "Possible options are [BLOOM | GLOBAL_BLOOM | SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. " + "Bloom filters removes the dependency on a external system " + "and is stored in the footer of the Parquet Data Files"); @@ -245,7 +245,7 @@ public class HoodieIndexConfig extends HoodieConfig { * 1. Bucket num will auto-adjust by running clustering (still in progress) */ public static final ConfigProperty<String> BUCKET_INDEX_ENGINE_TYPE = ConfigProperty - .key("hoodie.index.bucket.engine") + .key(HoodieTableConfig.BUCKET_INDEX_ENGINE_TYPE.key()) .defaultValue("SIMPLE") .sinceVersion("0.11.0") .withDocumentation("Type of bucket index engine to use. Default is SIMPLE bucket index, with fixed number of bucket." diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index ee0829944bc..878f3196750 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -218,10 +218,30 @@ public class HoodieTableConfig extends HoodieConfig { .noDefaultValue() .withDocumentation("Type of index to use. Default is SIMPLE on Spark engine, " + "and INMEMORY on Flink and Java engines. " - + "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. " + + "Possible options are [BLOOM | GLOBAL_BLOOM | SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. " + "Bloom filters removes the dependency on a external system " + "and is stored in the footer of the Parquet Data Files"); + /** + * Bucket Index Engine Type: implementation of bucket index + * + * SIMPLE: + * 0. Check `HoodieSimpleBucketLayout` for its supported operations. + * 1. Bucket num is fixed and requires rewriting the partition if we want to change it. + * + * CONSISTENT_HASHING: + * 0. Check `HoodieConsistentBucketLayout` for its supported operations. + * 1. Bucket num will auto-adjust by running clustering (still in progress) + */ + public static final ConfigProperty<String> BUCKET_INDEX_ENGINE_TYPE = ConfigProperty + .key("hoodie.index.bucket.engine") + .defaultValue("SIMPLE") + .sinceVersion("0.11.0") + .withDocumentation("Type of bucket index engine to use. Default is SIMPLE bucket index, with fixed number of bucket." + + "Possible options are [SIMPLE | CONSISTENT_HASHING]." + + "Consistent hashing supports dynamic resizing of the number of bucket, solving potential data skew and file size " + + "issues of the SIMPLE hashing engine. Consistent hashing only works with MOR tables, only use simple hashing on COW tables."); + public static final ConfigProperty<Integer> BUCKET_INDEX_NUM_BUCKETS = ConfigProperty .key("hoodie.bucket.index.num.buckets") .defaultValue(4) @@ -679,7 +699,11 @@ public class HoodieTableConfig extends HoodieConfig { return getString(INDEX_TYPE); } - public String getIndexKeys() { + public String getIndexEngineType() { + return getString(BUCKET_INDEX_ENGINE_TYPE); + } + + public String getIndexHashField() { return getString(BUCKET_INDEX_HASH_FIELD); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 990142f496c..e0d6b346bd1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -776,7 +776,9 @@ public class HoodieTableMetaClient implements Serializable { private String indexType; - private String indexKeys; + private String indexBucketEngine; + + private String indexHashField; private Integer bucketIndexNumBuckets; @@ -923,8 +925,13 @@ public class HoodieTableMetaClient implements Serializable { return this; } - public PropertyBuilder setIndexKeys(String indexKeys) { - this.indexKeys = indexKeys; + public PropertyBuilder setIndexBucketEngine(String indexBucketEngine) { + this.indexBucketEngine = indexBucketEngine; + return this; + } + + public PropertyBuilder setIndexHashField(String indexHashField) { + this.indexHashField = indexHashField; return this; } @@ -1047,8 +1054,11 @@ public class HoodieTableMetaClient implements Serializable { if (hoodieConfig.contains(HoodieTableConfig.INDEX_TYPE)) { setIndexType(hoodieConfig.getString(HoodieTableConfig.INDEX_TYPE)); } + if (hoodieConfig.contains(HoodieTableConfig.BUCKET_INDEX_ENGINE_TYPE)) { + setIndexBucketEngine(hoodieConfig.getString(HoodieTableConfig.BUCKET_INDEX_ENGINE_TYPE)); + } if (hoodieConfig.contains(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD)) { - setIndexKeys(hoodieConfig.getString(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD)); + setIndexHashField(hoodieConfig.getString(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD)); } if (hoodieConfig.contains(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS)) { setIndexNumBuckets(hoodieConfig.getInt(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS)); @@ -1150,8 +1160,11 @@ public class HoodieTableMetaClient implements Serializable { if (null != indexType) { tableConfig.setValue(HoodieTableConfig.INDEX_TYPE, indexType); } - if (null != indexKeys) { - tableConfig.setValue(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD, indexKeys); + if (null != indexBucketEngine) { + tableConfig.setValue(HoodieTableConfig.BUCKET_INDEX_ENGINE_TYPE, indexBucketEngine); + } + if (null != indexHashField) { + tableConfig.setValue(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD, indexHashField); } if (null != bucketIndexNumBuckets) { tableConfig.setValue(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS, Integer.toString(bucketIndexNumBuckets)); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java index bf339d94bb2..86d5521cc26 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java @@ -140,6 +140,7 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness { Properties updatedProps = new Properties(); updatedProps.setProperty(HoodieTableConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "true"); updatedProps.setProperty(HoodieTableConfig.INDEX_TYPE.key(), "BUCKET"); + updatedProps.setProperty(HoodieTableConfig.BUCKET_INDEX_ENGINE_TYPE.key(), "CONSISTENT_HASHING"); updatedProps.setProperty(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD.key(), "uuid"); updatedProps.setProperty(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS.key(), "8"); HoodieTableConfig.update(fs, metaPath, updatedProps); @@ -147,10 +148,11 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness { assertTrue(fs.exists(cfgPath)); assertFalse(fs.exists(backupCfgPath)); HoodieTableConfig config = new HoodieTableConfig(fs, metaPath.toString(), null); - assertEquals(10, config.getProps().size()); + assertEquals(11, config.getProps().size()); assertEquals(true, config.getAllowOperationMetadataField()); assertEquals("BUCKET", config.getIndexType()); - assertEquals("uuid", config.getIndexKeys()); + assertEquals("CONSISTENT_HASHING", config.getIndexEngineType()); + assertEquals("uuid", config.getIndexHashField()); assertEquals(8, config.getIndexNumBuckets()); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 420b2c8d9cf..68f22c6f7e7 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -369,6 +369,16 @@ public class FlinkOptions extends HoodieConfig { + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using " + "the dot notation eg: `a.b.c`"); + + public static final ConfigOption<String> BUCKET_INDEX_ENGINE_TYPE = ConfigOptions + .key(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key()) + .stringType() + .defaultValue("SIMPLE") + .withDescription("Type of bucket index engine to use. Default is SIMPLE bucket index, with fixed number of bucket." + + "Possible options are [SIMPLE | CONSISTENT_HASHING]." + + "Consistent hashing supports dynamic resizing of the number of bucket, solving potential data skew and file size " + + "issues of the SIMPLE hashing engine. Consistent hashing only works with MOR tables, only use simple hashing on COW tables."); + public static final ConfigOption<String> INDEX_KEY_FIELD = ConfigOptions .key(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()) .stringType() diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index bb41aafbfcf..02cb7d02d2d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -325,7 +325,8 @@ public class StreamerUtil { .setTimelineLayoutVersion(1) .setAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) .setIndexType(conf.getString(FlinkOptions.INDEX_TYPE)) - .setIndexKeys(conf.getString(FlinkOptions.INDEX_KEY_FIELD)) + .setIndexBucketEngine(conf.getString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE)) + .setIndexHashField(conf.getString(FlinkOptions.INDEX_KEY_FIELD)) .setIndexNumBuckets(conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS)) .initTable(hadoopConf, basePath); LOG.info("Table initialized under base path {}", basePath); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index b4e46b7f277..7123eda3f44 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -172,7 +172,8 @@ object HoodieSparkSqlWriter { .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS)) .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))) .setIndexType(hoodieConfig.getString(HoodieTableConfig.INDEX_TYPE)) - .setIndexKeys(hoodieConfig.getString(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD)) + .setIndexBucketEngine(hoodieConfig.getString(HoodieTableConfig.BUCKET_INDEX_ENGINE_TYPE)) + .setIndexHashField(hoodieConfig.getString(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD)) .setIndexNumBuckets(hoodieConfig.getInt(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS)) .initTable(sparkContext.hadoopConfiguration, path) tableConfig = tableMetaClient.getTableConfig
