This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 283b25a8a6ba3e3fcc7f35815b43afdf7a86b22b
Author: XuQianJin-Stars <[email protected]>
AuthorDate: Mon Jan 2 12:37:39 2023 +0800

    [MINOR] Add index and operation-metadata properties to the table config
---
 .../org/apache/hudi/config/HoodieIndexConfig.java  | 16 ++++---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  2 +-
 .../hudi/common/table/HoodieTableConfig.java       | 46 ++++++++++++++++++
 .../hudi/common/table/HoodieTableMetaClient.java   | 55 ++++++++++++++++++++++
 .../hudi/common/table/TestHoodieTableConfig.java   | 19 ++++++++
 .../java/org/apache/hudi/util/StreamerUtil.java    | 11 +++++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 13 ++++-
 7 files changed, 153 insertions(+), 9 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index b5edaf4abcd..23375dd4952 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -63,7 +64,7 @@ import static org.apache.hudi.index.HoodieIndex.IndexType.SIMPLE;
 public class HoodieIndexConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> INDEX_TYPE = ConfigProperty
-      .key("hoodie.index.type")
+      .key(HoodieTableConfig.INDEX_TYPE.key())
       // Builder#getDefaultIndexType has already set it according to engine type
       .noDefaultValue()
       .withValidValues(HBASE.name(), INMEMORY.name(), BLOOM.name(), GLOBAL_BLOOM.name(),
@@ -259,13 +260,13 @@ public class HoodieIndexConfig extends HoodieConfig {
    * In dynamic bucket index cases (e.g., using CONSISTENT_HASHING), this config of number of bucket serves as a initial bucket size
    */
   public static final ConfigProperty<Integer> BUCKET_INDEX_NUM_BUCKETS = ConfigProperty
-      .key("hoodie.bucket.index.num.buckets")
+      .key(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS.key())
       .defaultValue(256)
       .withDocumentation("Only applies if index type is BUCKET. Determine the number of buckets in the hudi table, "
           + "and each partition is divided to N buckets.");
 
   public static final ConfigProperty<String> BUCKET_INDEX_HASH_FIELD = ConfigProperty
-      .key("hoodie.bucket.index.hash.field")
+      .key(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD.key())
       .noDefaultValue()
       .withDocumentation("Index key. It is used to index the record and find its file group. "
           + "If not set, use record key field as default");
@@ -682,12 +683,15 @@ public class HoodieIndexConfig extends HoodieConfig {
           hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD,
               hoodieIndexConfig.getStringOrDefault(KeyGeneratorOptions.RECORDKEY_FIELD_NAME));
         } else {
+          String recordKeyField = hoodieIndexConfig.getStringOrDefault(KeyGeneratorOptions.RECORDKEY_FIELD_NAME);
+          String indexHashField = hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD);
           boolean valid = Arrays
-              .stream(hoodieIndexConfig.getStringOrDefault(KeyGeneratorOptions.RECORDKEY_FIELD_NAME).split(","))
+              .stream(recordKeyField.split(","))
               .collect(Collectors.toSet())
-              .containsAll(Arrays.asList(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD).split(",")));
+              .containsAll(Arrays.asList(indexHashField.split(",")));
           if (!valid) {
-            throw new HoodieIndexException("Bucket index key (if configured) must be subset of record key.");
+            throw new HoodieIndexException("Bucket index key (if configured) must be subset of record key."
+                + " Bucket index key: " + indexHashField + ", record key: " + recordKeyField);
           }
         }
         // check the bucket num
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 103123f9801..8b00655a0bd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -463,7 +463,7 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "It's useful in cases where extra metadata needs to be published regardless e.g tracking source offsets when ingesting data");
 
   public static final ConfigProperty<Boolean> ALLOW_OPERATION_METADATA_FIELD = ConfigProperty
-      .key("hoodie.allow.operation.metadata.field")
+      .key(HoodieTableConfig.ALLOW_OPERATION_METADATA_FIELD.key())
       .defaultValue(false)
       .sinceVersion("0.9.0")
       .withDocumentation("Whether to include '_hoodie_operation' in the metadata fields. "
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 85f970e7ec4..ee0829944bc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -204,6 +204,36 @@ public class HoodieTableConfig extends HoodieConfig {
       .defaultValue(false)
       .withDocumentation("When set to true, will not write the partition columns into hudi. By default, false.");
 
+  // Write-side configs that are also persisted in the table config when the table is initialized.
+  public static final ConfigProperty<Boolean> ALLOW_OPERATION_METADATA_FIELD = ConfigProperty
+      .key("hoodie.allow.operation.metadata.field")
+      .defaultValue(false)
+      .sinceVersion("0.9.0")
+      .withDocumentation("Whether to include '_hoodie_operation' in the metadata fields. "
+          + "Once enabled, all the changes of a record are persisted to the delta log directly without merge");
+
+  public static final ConfigProperty<String> INDEX_TYPE = ConfigProperty
+      .key("hoodie.index.type")
+      // Builder#getDefaultIndexType has already set it according to engine type
+      .noDefaultValue()
+      .withDocumentation("Type of index to use. Default is SIMPLE on Spark engine, "
+          + "and INMEMORY on Flink and Java engines. "
+          + "Possible options are [BLOOM | GLOBAL_BLOOM | SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. "
+          + "Bloom filters remove the dependency on an external system "
+          + "and are stored in the footer of the Parquet Data Files");
+
+  public static final ConfigProperty<Integer> BUCKET_INDEX_NUM_BUCKETS = ConfigProperty
+      .key("hoodie.bucket.index.num.buckets")
+      .defaultValue(256)
+      .withDocumentation("Only applies if index type is BUCKET. Determine the number of buckets in the hudi table, "
+          + "and each partition is divided to N buckets.");
+
+  public static final ConfigProperty<String> BUCKET_INDEX_HASH_FIELD = ConfigProperty
+      .key("hoodie.bucket.index.hash.field")
+      .noDefaultValue()
+      .withDocumentation("Index key. It is used to index the record and find its file group. "
+          + "If not set, use record key field as default");
+
   public static final ConfigProperty<String> URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
   public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
 
@@ -641,6 +671,22 @@ public class HoodieTableConfig extends HoodieConfig {
     return Option.empty();
   }
 
+  public Boolean getAllowOperationMetadataField() {
+    return getBoolean(ALLOW_OPERATION_METADATA_FIELD);
+  }
+
+  public String getIndexType() {
+    return getString(INDEX_TYPE);
+  }
+
+  public String getIndexKeys() {
+    return getString(BUCKET_INDEX_HASH_FIELD);
+  }
+
+  public Integer getIndexNumBuckets() {
+    return getInt(BUCKET_INDEX_NUM_BUCKETS);
+  }
+
   public Map<String, String> propsMap() {
     return props.entrySet().stream()
         .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 16dd373486f..39f27f4160f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -743,6 +743,14 @@ public class HoodieTableMetaClient implements Serializable {
     private String metadataPartitions;
     private String inflightMetadataPartitions;
 
+    private Boolean allowOperationMetadataField;
+
+    private String indexType;
+
+    private String indexKeys;
+
+    private Integer bucketIndexNumBuckets;
+
     /**
      * Persist the configs that is written at the first time, and should not be changed.
      * Like KeyGenerator's configs.
@@ -876,6 +884,26 @@ public class HoodieTableMetaClient implements Serializable {
       return this;
     }
 
+    public PropertyBuilder setAllowOperationMetadataField(boolean allowOperationMetadataField) {
+      this.allowOperationMetadataField = allowOperationMetadataField;
+      return this;
+    }
+
+    public PropertyBuilder setIndexType(String indexType) {
+      this.indexType = indexType;
+      return this;
+    }
+
+    public PropertyBuilder setIndexKeys(String indexKeys) {
+      this.indexKeys = indexKeys;
+      return this;
+    }
+
+    public PropertyBuilder setIndexNumBuckets(Integer bucketIndexNumBuckets) {
+      this.bucketIndexNumBuckets = bucketIndexNumBuckets;
+      return this;
+    }
+
     private void set(String key, Object value) {
       if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) {
         this.others.put(key, value);
@@ -982,6 +1010,20 @@ public class HoodieTableMetaClient implements Serializable {
       if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT)) {
         setInflightMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT));
       }
+      if (hoodieConfig.contains(HoodieTableConfig.ALLOW_OPERATION_METADATA_FIELD)) {
+        setAllowOperationMetadataField(
+            hoodieConfig.getBoolean(HoodieTableConfig.ALLOW_OPERATION_METADATA_FIELD));
+      }
+      // Index settings (type, bucket hash field, bucket count); only copied when explicitly present.
+      if (hoodieConfig.contains(HoodieTableConfig.INDEX_TYPE)) {
+        setIndexType(hoodieConfig.getString(HoodieTableConfig.INDEX_TYPE));
+      }
+      if (hoodieConfig.contains(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD)) {
+        setIndexKeys(hoodieConfig.getString(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD));
+      }
+      if (hoodieConfig.contains(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS)) {
+        setIndexNumBuckets(hoodieConfig.getInt(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS));
+      }
       return this;
     }
 
@@ -1072,6 +1114,19 @@ public class HoodieTableMetaClient implements Serializable {
       if (null != inflightMetadataPartitions) {
         tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT, inflightMetadataPartitions);
       }
+      if (null != allowOperationMetadataField) {
+        tableConfig.setValue(
+            HoodieTableConfig.ALLOW_OPERATION_METADATA_FIELD, Boolean.toString(allowOperationMetadataField));
+      }
+      if (null != indexType) {
+        tableConfig.setValue(HoodieTableConfig.INDEX_TYPE, indexType);
+      }
+      if (null != indexKeys) {
+        tableConfig.setValue(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD, indexKeys);
+      }
+      if (null != bucketIndexNumBuckets) {
+        tableConfig.setValue(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS, Integer.toString(bucketIndexNumBuckets));
+      }
       return tableConfig.getProps();
     }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 0defefe2ea4..bf339d94bb2 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -134,4 +134,23 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
     config = new HoodieTableConfig(fs, metaPath.toString(), null);
     assertEquals(6, config.getProps().size());
   }
+
+  @Test
+  public void testIndexProps() throws IOException {
+    Properties updatedProps = new Properties();
+    updatedProps.setProperty(HoodieTableConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "true");
+    updatedProps.setProperty(HoodieTableConfig.INDEX_TYPE.key(), "BUCKET");
+    updatedProps.setProperty(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD.key(), "uuid");
+    updatedProps.setProperty(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS.key(), "8");
+    HoodieTableConfig.update(fs, metaPath, updatedProps);
+
+    assertTrue(fs.exists(cfgPath));
+    assertFalse(fs.exists(backupCfgPath));
+    HoodieTableConfig config = new HoodieTableConfig(fs, metaPath.toString(), null);
+    assertEquals(10, config.getProps().size());
+    assertTrue(config.getAllowOperationMetadataField());
+    assertEquals("BUCKET", config.getIndexType());
+    assertEquals("uuid", config.getIndexKeys());
+    assertEquals(8, config.getIndexNumBuckets());
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 895002770fe..bb41aafbfcf 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -54,6 +54,7 @@ import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.schema.FilebasedSchemaProvider;
 import org.apache.hudi.sink.transform.ChainedTransformer;
@@ -302,6 +303,12 @@ public class StreamerUtil {
       org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
     final String basePath = conf.getString(FlinkOptions.PATH);
     if (!tableExists(basePath, hadoopConf)) {
+      if (HoodieIndex.IndexType.BUCKET.name().equals(conf.getString(FlinkOptions.INDEX_TYPE))) {
+        if (conf.getString(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) {
+          conf.setString(FlinkOptions.INDEX_KEY_FIELD, conf.getString(FlinkOptions.RECORD_KEY_FIELD));
+        }
+      }
+
       HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder()
           .setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA))
           .setTableType(conf.getString(FlinkOptions.TABLE_TYPE))
@@ -316,6 +323,10 @@ public class StreamerUtil {
           .setHiveStylePartitioningEnable(conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING))
           .setUrlEncodePartitioning(conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING))
           .setTimelineLayoutVersion(1)
+          .setAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
+          .setIndexType(conf.getString(FlinkOptions.INDEX_TYPE))
+          .setIndexKeys(conf.getString(FlinkOptions.INDEX_KEY_FIELD))
+          .setIndexNumBuckets(conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS))
           .initTable(hadoopConf, basePath);
       LOG.info("Table initialized under base path {}", basePath);
       return metaClient;
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 61cb7ef9614..b4e46b7f277 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -37,7 +37,7 @@ import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
-import org.apache.hudi.index.SparkHoodieIndexFactory
+import org.apache.hudi.index.{HoodieIndex, SparkHoodieIndexFactory}
 import org.apache.hudi.internal.DataSourceInternalWriterHelper
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
@@ -142,7 +142,13 @@ object HoodieSparkSqlWriter {
         val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
         val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
         val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
-        val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT);
+        val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT)
+
+        if (HoodieIndex.IndexType.BUCKET.name.equals(hoodieConfig.getString(HoodieTableConfig.INDEX_TYPE))) {
+          if (hoodieConfig.getStringOrDefault(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD, "").isEmpty) {
+            hoodieConfig.setValue(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD, recordKeyFields)
+          }
+        }
 
         val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(tableType)
@@ -165,6 +171,9 @@ object HoodieSparkSqlWriter {
           .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
           .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
           .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
+          .setIndexType(hoodieConfig.getString(HoodieTableConfig.INDEX_TYPE))
+          .setIndexKeys(hoodieConfig.getString(HoodieTableConfig.BUCKET_INDEX_HASH_FIELD))
+          .setIndexNumBuckets(hoodieConfig.getInt(HoodieTableConfig.BUCKET_INDEX_NUM_BUCKETS))
           .initTable(sparkContext.hadoopConfiguration, path)
         tableConfig = tableMetaClient.getTableConfig
       }

Reply via email to