This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 7da379b6597fff52aca63adb90d902513ee2b2fd
Author: XuQianJin-Stars <[email protected]>
AuthorDate: Thu Jan 12 13:15:07 2023 +0800

    [HUDI-5495] add some property to table config
---
 .../org/apache/hudi/config/HoodieIndexConfig.java  |  4 ++--
 .../hudi/common/table/HoodieTableConfig.java       | 28 ++++++++++++++++++++--
 .../hudi/common/table/HoodieTableMetaClient.java   | 25 ++++++++++++++-----
 .../hudi/common/table/TestHoodieTableConfig.java   |  6 +++--
 .../apache/hudi/configuration/FlinkOptions.java    | 10 ++++++++
 .../java/org/apache/hudi/util/StreamerUtil.java    |  3 ++-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  3 ++-
 7 files changed, 65 insertions(+), 14 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 23375dd4952..de3eae8fac9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -71,7 +71,7 @@ public class HoodieIndexConfig extends HoodieConfig {
           SIMPLE.name(), GLOBAL_SIMPLE.name(), BUCKET.name())
       .withDocumentation("Type of index to use. Default is SIMPLE on Spark 
engine, "
           + "and INMEMORY on Flink and Java engines. "
-          + "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | 
GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. "
+          + "Possible options are [BLOOM | GLOBAL_BLOOM | SIMPLE | 
GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. "
           + "Bloom filters removes the dependency on a external system "
           + "and is stored in the footer of the Parquet Data Files");
 
@@ -245,7 +245,7 @@ public class HoodieIndexConfig extends HoodieConfig {
    *  1. Bucket num will auto-adjust by running clustering (still in progress)
    */
   public static final ConfigProperty<String> BUCKET_INDEX_ENGINE_TYPE = 
ConfigProperty
-      .key("hoodie.index.bucket.engine")
+      .key(HoodieTableConfig.BUCKET_INDEX_ENGINE_TYPE.key())
       .defaultValue("SIMPLE")
       .sinceVersion("0.11.0")
       .withDocumentation("Type of bucket index engine to use. Default is 
SIMPLE bucket index, with fixed number of bucket."
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index ee0829944bc..878f3196750 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -218,10 +218,30 @@ public class HoodieTableConfig extends HoodieConfig {
       .noDefaultValue()
       .withDocumentation("Type of index to use. Default is SIMPLE on Spark 
engine, "
           + "and INMEMORY on Flink and Java engines. "
-          + "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | 
GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. "
+          + "Possible options are [BLOOM | GLOBAL_BLOOM | SIMPLE | 
GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. "
           + "Bloom filters removes the dependency on a external system "
           + "and is stored in the footer of the Parquet Data Files");
 
+  /**
+   * Bucket Index Engine Type: implementation of bucket index
+   *
+   * SIMPLE:
+   *  0. Check `HoodieSimpleBucketLayout` for its supported operations.
+   *  1. Bucket num is fixed and requires rewriting the partition if we want 
to change it.
+   *
+   * CONSISTENT_HASHING:
+   *  0. Check `HoodieConsistentBucketLayout` for its supported operations.
+   *  1. Bucket num will auto-adjust by running clustering (still in progress)
+   */
+  public static final ConfigProperty<String> BUCKET_INDEX_ENGINE_TYPE = 
ConfigProperty
+      .key("hoodie.index.bucket.engine")
+      .defaultValue("SIMPLE")
+      .sinceVersion("0.11.0")
+      .withDocumentation("Type of bucket index engine to use. Default is 
SIMPLE bucket index, with a fixed number of buckets. "
+          + "Possible options are [SIMPLE | CONSISTENT_HASHING]. "
+          + "Consistent hashing supports dynamic resizing of the number of 
buckets, solving potential data skew and file size "
+          + "issues of the SIMPLE hashing engine. Consistent hashing only 
works with MOR tables; only use simple hashing on COW tables.");
+
   public static final ConfigProperty<Integer> BUCKET_INDEX_NUM_BUCKETS = 
ConfigProperty
       .key("hoodie.bucket.index.num.buckets")
       .defaultValue(4)
@@ -679,7 +699,11 @@ public class HoodieTableConfig extends HoodieConfig {
     return getString(INDEX_TYPE);
   }
 
-  public String getIndexKeys() {
+  public String getIndexEngineType() {
+    return getString(BUCKET_INDEX_ENGINE_TYPE);
+  }
+
+  public String getIndexHashField() {
     return getString(BUCKET_INDEX_HASH_FIELD);
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 990142f496c..e0d6b346bd1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -776,7 +776,9 @@ public class HoodieTableMetaClient implements Serializable {
 
     private String indexType;
 
-    private String indexKeys;
+    private String indexBucketEngine;
+
+    private String indexHashField;
 
     private Integer bucketIndexNumBuckets;
 
@@ -923,8 +925,13 @@ public class HoodieTableMetaClient implements Serializable 
{
       return this;
     }
 
-    public PropertyBuilder setIndexKeys(String indexKeys) {
-      this.indexKeys = indexKeys;
+    public PropertyBuilder setIndexBucketEngine(String indexBucketEngine) {
+      this.indexBucketEngine = indexBucketEngine;
+      return this;
+    }
+
+    public PropertyBuilder setIndexHashField(String indexHashField) {
+      this.indexHashField = indexHashField;
       return this;
     }
 
@@ -1047,8 +1054,11 @@ public class HoodieTableMetaClient implements 
Serializable {
       if (hoodieConfig.contains(HoodieTableConfig.INDEX_TYPE)) {
         setIndexType(hoodieConfig.getString(HoodieTableConfig.INDEX_TYPE));
       }
+      if (hoodieConfig.contains(HoodieTableConfig.BUCKET_INDEX_ENGINE_TYPE)) {
+        
setIndexBucketEngine(hoodieConfig.getString(HoodieTableConfig.BUCKET_INDEX_ENGINE_TYPE));
+      }
       if (hoodieConfig.contains(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD)) {
-        
setIndexKeys(hoodieConfig.getString(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD));
+        
setIndexHashField(hoodieConfig.getString(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD));
       }
       if (hoodieConfig.contains(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS)) {
         
setIndexNumBuckets(hoodieConfig.getInt(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS));
@@ -1150,8 +1160,11 @@ public class HoodieTableMetaClient implements 
Serializable {
       if (null != indexType) {
         tableConfig.setValue(HoodieTableConfig.INDEX_TYPE, indexType);
       }
-      if (null != indexKeys) {
-        tableConfig.setValue(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD, 
indexKeys);
+      if (null != indexBucketEngine) {
+        tableConfig.setValue(HoodieTableConfig.BUCKET_INDEX_ENGINE_TYPE, 
indexBucketEngine);
+      }
+      if (null != indexHashField) {
+        tableConfig.setValue(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD, 
indexHashField);
       }
       if (null != bucketIndexNumBuckets) {
         tableConfig.setValue(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS, 
Integer.toString(bucketIndexNumBuckets));
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index bf339d94bb2..86d5521cc26 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -140,6 +140,7 @@ public class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
     Properties updatedProps = new Properties();
     
updatedProps.setProperty(HoodieTableConfig.ALLOW_OPERATION_METADATA_FIELD.key(),
 "true");
     updatedProps.setProperty(HoodieTableConfig.INDEX_TYPE.key(), "BUCKET");
+    updatedProps.setProperty(HoodieTableConfig.BUCKET_INDEX_ENGINE_TYPE.key(), 
"CONSISTENT_HASHING");
     updatedProps.setProperty(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD.key(), 
"uuid");
     updatedProps.setProperty(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS.key(), 
"8");
     HoodieTableConfig.update(fs, metaPath, updatedProps);
@@ -147,10 +148,11 @@ public class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
     assertTrue(fs.exists(cfgPath));
     assertFalse(fs.exists(backupCfgPath));
     HoodieTableConfig config = new HoodieTableConfig(fs, metaPath.toString(), 
null);
-    assertEquals(10, config.getProps().size());
+    assertEquals(11, config.getProps().size());
     assertEquals(true, config.getAllowOperationMetadataField());
     assertEquals("BUCKET", config.getIndexType());
-    assertEquals("uuid", config.getIndexKeys());
+    assertEquals("CONSISTENT_HASHING", config.getIndexEngineType());
+    assertEquals("uuid", config.getIndexHashField());
     assertEquals(8, config.getIndexNumBuckets());
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 420b2c8d9cf..68f22c6f7e7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -369,6 +369,16 @@ public class FlinkOptions extends HoodieConfig {
           + "Actual value will be obtained by invoking .toString() on the 
field value. Nested fields can be specified using "
           + "the dot notation eg: `a.b.c`");
 
+
+  public static final ConfigOption<String> BUCKET_INDEX_ENGINE_TYPE = 
ConfigOptions
+      .key(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key())
+      .stringType()
+      .defaultValue("SIMPLE")
+      .withDescription("Type of bucket index engine to use. Default is SIMPLE 
bucket index, with a fixed number of buckets. "
+          + "Possible options are [SIMPLE | CONSISTENT_HASHING]. "
+          + "Consistent hashing supports dynamic resizing of the number of 
buckets, solving potential data skew and file size "
+          + "issues of the SIMPLE hashing engine. Consistent hashing only 
works with MOR tables; only use simple hashing on COW tables.");
+
   public static final ConfigOption<String> INDEX_KEY_FIELD = ConfigOptions
       .key(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key())
       .stringType()
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index bb41aafbfcf..02cb7d02d2d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -325,7 +325,8 @@ public class StreamerUtil {
           .setTimelineLayoutVersion(1)
           
.setAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
           .setIndexType(conf.getString(FlinkOptions.INDEX_TYPE))
-          .setIndexKeys(conf.getString(FlinkOptions.INDEX_KEY_FIELD))
+          
.setIndexBucketEngine(conf.getString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE))
+          .setIndexHashField(conf.getString(FlinkOptions.INDEX_KEY_FIELD))
           
.setIndexNumBuckets(conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS))
           .initTable(hadoopConf, basePath);
       LOG.info("Table initialized under base path {}", basePath);
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index b4e46b7f277..7123eda3f44 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -172,7 +172,8 @@ object HoodieSparkSqlWriter {
           
.setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
           
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
           .setIndexType(hoodieConfig.getString(HoodieTableConfig.INDEX_TYPE))
-          
.setIndexKeys(hoodieConfig.getString(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD))
+          
.setIndexBucketEngine(hoodieConfig.getString(HoodieTableConfig.BUCKET_INDEX_ENGINE_TYPE))
+          
.setIndexHashField(hoodieConfig.getString(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD))
           
.setIndexNumBuckets(hoodieConfig.getInt(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS))
           .initTable(sparkContext.hadoopConfiguration, path)
         tableConfig = tableMetaClient.getTableConfig

Reply via email to