This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch HUDI-8990
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 475b4f108e01e137dcac0fde994ac63ac42e1f15
Author: YueZhang <[email protected]>
AuthorDate: Mon Mar 17 20:02:24 2025 +0800

    finish hashing_config initial && related UT
---
 .../org/apache/hudi/config/HoodieIndexConfig.java  | 15 ++++
 .../HoodieSimpleBucketIndexPartitionRule.java      | 29 +++++++
 .../hudi/index/bucket/SimpleBucketIndexUtils.java  | 81 ++++++++++++++++++
 .../model/PartitionBucketIndexHashingConfig.java   | 98 ++++++++++++++++++++++
 .../hudi/common/table/HoodieTableMetaClient.java   | 17 +++-
 .../apache/hudi/configuration/FlinkOptions.java    | 16 ++++
 .../apache/hudi/table/catalog/HoodieCatalog.java   |  3 +-
 .../hudi/table/catalog/HoodieCatalogUtil.java      | 11 +++
 .../hudi/table/catalog/HoodieHiveCatalog.java      |  7 +-
 .../hudi/table/catalog/TestHoodieCatalog.java      | 36 ++++++++
 10 files changed, 308 insertions(+), 5 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 48b30dbe6d0..626e894fd1a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.bucket.HoodieSimpleBucketIndexPartitionRule;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import org.slf4j.Logger;
@@ -286,6 +287,20 @@ public class HoodieIndexConfig extends HoodieConfig {
       .withDocumentation("Only applies if index type is BUCKET. Determine the 
number of buckets in the hudi table, "
           + "and each partition is divided to N buckets.");
 
+  public static final ConfigProperty<String> BUCKET_INDEX_PARTITION_RULE_TYPE 
= ConfigProperty
+      .key("hoodie.bucket.index.partition.rule.type")
+      .defaultValue(HoodieSimpleBucketIndexPartitionRule.REGEX.value)
+      .markAdvanced()
+      .withDocumentation("Rule parser for expressions when using partition 
level bucket index, default regex.");
+
+  public static final ConfigProperty<String> 
BUCKET_INDEX_PARTITION_EXPRESSIONS = ConfigProperty
+      .key("hoodie.bucket.index.partition.expressions")
+      .noDefaultValue()
+      .markAdvanced()
+      .withDocumentation("Users can use this parameter to specify expression 
and the corresponding bucket " +
+          "numbers (separated by commas). Multiple rules are separated by semicolons like " +
+          
"hoodie.bucket.index.partition.expressions=expression1,bucket-number1;expression2,bucket-number2");
+
   public static final ConfigProperty<String> BUCKET_INDEX_MAX_NUM_BUCKETS = 
ConfigProperty
       .key("hoodie.bucket.index.max.num.buckets")
       .noDefaultValue()
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndexPartitionRule.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndexPartitionRule.java
new file mode 100644
index 00000000000..8d86ed8080b
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndexPartitionRule.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
/**
 * Rule types used to parse partition-level bucket index expressions.
 *
 * <p>Currently only {@link #REGEX} is supported: partition expressions are
 * interpreted as patterns matched against partition paths.
 */
public enum HoodieSimpleBucketIndexPartitionRule {
  REGEX("regex");

  // Kept public for existing callers that read the field directly
  // (e.g. HoodieIndexConfig's default value); prefer getValue() in new code.
  public final String value;

  HoodieSimpleBucketIndexPartitionRule(String rule) {
    this.value = rule;
  }

  /**
   * @return the string form of this rule type, e.g. {@code "regex"}.
   */
  public String getValue() {
    return value;
  }
}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/SimpleBucketIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/SimpleBucketIndexUtils.java
new file mode 100644
index 00000000000..74c8022c9aa
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/SimpleBucketIndexUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.HoodieInstantWriter;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+public class SimpleBucketIndexUtils {
+  public static final String INITIAL_HASHING_CONFIG_INSTANT = 
HoodieTimeline.INIT_INSTANT_TS;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SimpleBucketIndexUtils.class);
+
+  public static boolean initHashingConfig(HoodieTableMetaClient metaClient,
+                                              String expressions,
+                                              String rule,
+                                              int defaultBucketNumber,
+                                              String instant){
+    if (StringUtils.isNullOrEmpty(expressions)) {
+      return false;
+    }
+    String hashingInstant = StringUtils.isNullOrEmpty(instant) ? 
INITIAL_HASHING_CONFIG_INSTANT : instant;
+    HoodieStorage storage = metaClient.getStorage();
+    PartitionBucketIndexHashingConfig hashingConfig =
+        new PartitionBucketIndexHashingConfig(expressions, 
defaultBucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, 
hashingInstant);
+    StoragePath hashingConfigPath = new 
StoragePath(metaClient.getHashingMetadataConfigPath(), 
hashingConfig.getFilename());
+
+    try {
+      Option<byte []> content = 
Option.of(hashingConfig.toJsonString().getBytes(StandardCharsets.UTF_8));
+      storage.createImmutableFileInPath(hashingConfigPath, 
content.map(HoodieInstantWriter::convertByteArrayToWriter));
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to initHashingConfig ", ioe);
+    }
+    return true;
+  }
+
+  public static Option<PartitionBucketIndexHashingConfig> 
loadHashingConfig(HoodieStorage storage,
+                                                                            
StoragePathInfo hashingConfig) {
+    if (hashingConfig == null) {
+      return Option.empty();
+    }
+    try (InputStream is = storage.open(hashingConfig.getPath())) {
+      byte[] content = FileIOUtils.readAsByteArray(is);
+      return Option.of(PartitionBucketIndexHashingConfig.fromBytes(content));
+    } catch (IOException e) {
+      LOG.error("Error when loading hashing config, for path: " + 
hashingConfig.getPath().getName(), e);
+      throw new HoodieIOException("Error while loading hashing config", e);
+    }
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
new file mode 100644
index 00000000000..225629b52ff
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.JsonUtils;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PartitionBucketIndexHashingConfig implements Serializable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(PartitionBucketIndexHashingConfig.class);
+  public static final String HASHING_CONFIG_FILE_SUFFIX = ".hashing_config";
+  public static final Integer CURRENT_VERSION = 1;
+  private final String expressions;
+  private final int defaultBucketNumber;
+  private final String rule;
+
+  private final int version;
+  private final String instant;
+
+  @JsonCreator
+  public PartitionBucketIndexHashingConfig(@JsonProperty("expressions") String 
expressions,
+                                           
@JsonProperty("defaultBucketNumber") int defaultBucketNumber,
+                                           @JsonProperty("rule") String rule,
+                                           @JsonProperty("version") int 
version,
+                                           @JsonProperty("instant") String 
instant) {
+    this.expressions = expressions;
+    this.defaultBucketNumber = defaultBucketNumber;
+    this.rule = rule;
+    this.version = version;
+    this.instant = instant;
+  }
+
+  public String getFilename() {
+    return instant + HASHING_CONFIG_FILE_SUFFIX;
+  }
+
+  public String toJsonString() throws IOException {
+    return 
JsonUtils.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this);
+  }
+
+  public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws 
Exception {
+    if (jsonStr == null || jsonStr.isEmpty()) {
+      // For empty commit file (no data or somethings bad happen).
+      return clazz.newInstance();
+    }
+    return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
+  }
+
+
+  public static PartitionBucketIndexHashingConfig fromBytes(byte[] bytes) 
throws IOException {
+    try {
+      return fromJsonString(new String(bytes, StandardCharsets.UTF_8), 
PartitionBucketIndexHashingConfig.class);
+    } catch (Exception e) {
+      throw new IOException("unable to load hashing config", e);
+    }
+  }
+
+  public int getVersion() {
+    return version;
+  }
+
+  public String getRule() {
+    return rule;
+  }
+
+  public int getDefaultBucketNumber() {
+    return defaultBucketNumber;
+  }
+
+  public String getExpressions() {
+    return expressions;
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 31b7df91d0b..d50e9f4b7cd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -111,6 +111,7 @@ public class HoodieTableMetaClient implements Serializable {
   public static final String METADATA_STR = "metadata";
   public static final String METAFOLDER_NAME = ".hoodie";
   public static final String TIMELINEFOLDER_NAME = "timeline";
+  public static final String BUCKET_INDEX_METAFOLDER_NAME = ".bucket_index";
   public static final String TEMPFOLDER_NAME = METAFOLDER_NAME + 
StoragePath.SEPARATOR + ".temp";
   public static final String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + 
StoragePath.SEPARATOR + ".aux";
   public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = 
AUXILIARYFOLDER_NAME + StoragePath.SEPARATOR + ".bootstrap";
@@ -118,7 +119,13 @@ public class HoodieTableMetaClient implements Serializable 
{
   public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + 
StoragePath.SEPARATOR + ".heartbeat";
   public static final String METADATA_TABLE_FOLDER_PATH = METAFOLDER_NAME + 
StoragePath.SEPARATOR + METADATA_STR;
   public static final String HASHING_METADATA_FOLDER_NAME =
-      ".bucket_index" + StoragePath.SEPARATOR + "consistent_hashing_metadata";
+      BUCKET_INDEX_METAFOLDER_NAME + StoragePath.SEPARATOR + 
"consistent_hashing_metadata";
+  public static final String PARTITION_BUCKET_INDEX_HASHING_FOLDER =
+      BUCKET_INDEX_METAFOLDER_NAME + StoragePath.SEPARATOR + 
"partition_bucket_index_meta";
+  public static final String BUCKET_INDEX_METAFOLDER_CONFIG_FOLDER =
+      PARTITION_BUCKET_INDEX_HASHING_FOLDER + StoragePath.SEPARATOR + 
"configs";
+  public static final String BUCKET_INDEX_METAFOLDER_CONFIG_ARCHIVE_FOLDER =
+      PARTITION_BUCKET_INDEX_HASHING_FOLDER + StoragePath.SEPARATOR + 
"archive";
   public static final String BOOTSTRAP_INDEX_BY_PARTITION_FOLDER_PATH = 
BOOTSTRAP_INDEX_ROOT_FOLDER_PATH
       + StoragePath.SEPARATOR + ".partitions";
   public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH =
@@ -361,6 +368,14 @@ public class HoodieTableMetaClient implements Serializable 
{
     return new StoragePath(metaPath, HASHING_METADATA_FOLDER_NAME).toString();
   }
 
+  /**
+   * Used for partition level bucket index to save hashing_config.
+   * @return
+   */
+  public String getHashingMetadataConfigPath() {
+    return new StoragePath(metaPath, 
BUCKET_INDEX_METAFOLDER_CONFIG_FOLDER).toString();
+  }
+
   /**
    * @return Temp Folder path
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index d65c80f89e9..e1c85c123a7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -474,6 +474,22 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(4) // default 4 buckets per partition
       .withDescription("Hudi bucket number per partition. Only affected if 
using Hudi bucket index.");
 
+  @AdvancedConfig
+  public static final ConfigOption<String> BUCKET_INDEX_PARTITION_RULE = 
ConfigOptions
+      .key(HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.key())
+      .stringType()
+      
.defaultValue(HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.defaultValue())
+      .withDescription("Rule parser for expressions when using partition level 
bucket index, default regex.");
+
+  @AdvancedConfig
+  public static final ConfigOption<String> BUCKET_INDEX_PARTITION_EXPRESSIONS 
= ConfigOptions
+      .key(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS.key())
+      .stringType()
+      .noDefaultValue()
+      .withDescription("Users can use this parameter to specify expression and 
the corresponding bucket "
+          + "numbers (separated by commas). Multiple rules are separated by semicolons like "
+          + 
"hoodie.bucket.index.partition.expressions=expression1,bucket-number1;expression2,bucket-number2");
+
   public static final ConfigOption<String> PARTITION_PATH_FIELD = ConfigOptions
       .key(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
       .stringType()
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index f9088b4096c..cb62dedc4c1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -352,12 +352,13 @@ public class HoodieCatalog extends AbstractCatalog {
     }
     conf.setString(FlinkOptions.TABLE_NAME, tablePath.getObjectName());
     try {
-      StreamerUtil.initTableIfNotExists(conf);
+      HoodieTableMetaClient metaClient = 
StreamerUtil.initTableIfNotExists(conf);
       // prepare the non-table-options properties
       if (!StringUtils.isNullOrEmpty(resolvedTable.getComment())) {
         options.put(TableOptionProperties.COMMENT, resolvedTable.getComment());
       }
       TableOptionProperties.createProperties(tablePathStr, hadoopConf, 
options);
+      HoodieCatalogUtil.initPartitionBucketIndexMeta(metaClient, catalogTable);
     } catch (IOException e) {
       throw new CatalogException(String.format("Initialize table path %s 
exception.", tablePathStr), e);
     }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
index 99a23ded5e8..db1d8364ad3 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
@@ -27,6 +27,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.exception.HoodieCatalogException;
+import org.apache.hudi.index.bucket.SimpleBucketIndexUtils;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.Type;
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
@@ -301,4 +302,14 @@ public class HoodieCatalogUtil {
     }
     return true;
   }
+
+  public static boolean initPartitionBucketIndexMeta(HoodieTableMetaClient 
metaClient, CatalogBaseTable catalogTable) {
+    Map<String, String> options = catalogTable.getOptions();
+    String expressions = 
options.getOrDefault(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS.key(), "");
+    String rule = 
options.getOrDefault(FlinkOptions.BUCKET_INDEX_PARTITION_RULE.key(), 
FlinkOptions.BUCKET_INDEX_PARTITION_RULE.defaultValue());
+    int bucketNumber = 
options.containsKey(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key()) ?
+        
Integer.valueOf(options.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key())) : 
FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.defaultValue();
+
+    return SimpleBucketIndexUtils.initHashingConfig(metaClient, expressions, 
rule, bucketNumber, null);
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 17d59b873f9..e66a6745003 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -474,7 +474,8 @@ public class HoodieHiveCatalog extends AbstractCatalog {
       //create hive table
       client.createTable(hiveTable);
       //init hoodie metaClient
-      initTableIfNotExists(tablePath, (CatalogTable) table);
+      HoodieTableMetaClient metaClient = initTableIfNotExists(tablePath, 
(CatalogTable) table);
+      HoodieCatalogUtil.initPartitionBucketIndexMeta(metaClient, table);
     } catch (AlreadyExistsException e) {
       if (!ignoreIfExists) {
         throw new TableAlreadyExistException(getName(), tablePath, e);
@@ -485,7 +486,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     }
   }
 
-  private void initTableIfNotExists(ObjectPath tablePath, CatalogTable 
catalogTable) {
+  private HoodieTableMetaClient initTableIfNotExists(ObjectPath tablePath, 
CatalogTable catalogTable) {
     Configuration flinkConf = Configuration.fromMap(catalogTable.getOptions());
     final String avroSchema = AvroSchemaConverter.convertToSchema(
         catalogTable.getSchema().toPersistedRowDataType().getLogicalType(),
@@ -527,7 +528,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     StreamerUtil.checkPreCombineKey(flinkConf, fields);
 
     try {
-      StreamerUtil.initTableIfNotExists(flinkConf, hiveConf);
+      return StreamerUtil.initTableIfNotExists(flinkConf, hiveConf);
     } catch (IOException e) {
       throw new HoodieCatalogException("Initialize table exception.", e);
     }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index f6737128698..1e518c5ef45 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.catalog;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -30,10 +31,14 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.index.bucket.SimpleBucketIndexUtils;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
@@ -334,6 +339,37 @@ public class TestHoodieCatalog {
     assertEquals(keyGeneratorClassName, 
NonpartitionedAvroKeyGenerator.class.getName());
   }
 
+  @Test
+  void testCreateTableWithPartitionSimpleBucketIndex() throws 
TableAlreadyExistException, DatabaseNotExistException, IOException {
+    String rule = "regex";
+    String expressions = "\\d{4}-(06-(01|17|18)|11-(01|10|11)),256";
+    String defaultBucketNumber = "20";
+    ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
+    
EXPECTED_CATALOG_TABLE.getOptions().put(FlinkOptions.BUCKET_INDEX_PARTITION_RULE.key(),
 rule);
+    
EXPECTED_CATALOG_TABLE.getOptions().put(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS.key(),
 expressions);
+    
EXPECTED_CATALOG_TABLE.getOptions().put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(),
 defaultBucketNumber);
+    // test create table
+    catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true);
+
+    // test table exist
+    assertTrue(catalog.tableExists(tablePath));
+
+    String tablePathStr = catalog.inferTablePath(catalogPathStr, tablePath);
+    Configuration flinkConf = TestConfigurations.getDefaultConf(tablePathStr);
+    HoodieTableMetaClient metaClient = HoodieTestUtils
+        .createMetaClient(
+            new 
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(flinkConf)), 
tablePathStr);
+    HoodieStorage storage = metaClient.getStorage();
+    StoragePath initialHashing_config =
+        new StoragePath(metaClient.getHashingMetadataConfigPath(), 
SimpleBucketIndexUtils.INITIAL_HASHING_CONFIG_INSTANT + 
PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX);
+    StoragePathInfo info = storage.getPathInfo(initialHashing_config);
+    Option<PartitionBucketIndexHashingConfig> hashing_config = 
SimpleBucketIndexUtils.loadHashingConfig(storage, info);
+    assertTrue(hashing_config.isPresent());
+    assertEquals(hashing_config.get().getDefaultBucketNumber(), 
Integer.parseInt(defaultBucketNumber));
+    assertEquals(hashing_config.get().getRule(), rule);
+    assertEquals(hashing_config.get().getExpressions(), expressions);
+  }
+
   @Test
   void testCreateTableWithoutPreCombineKey() {
     Map<String, String> options = getDefaultCatalogOption();

Reply via email to