This is an automated email from the ASF dual-hosted git repository. zhangyue19921010 pushed a commit to branch HUDI-8990 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 475b4f108e01e137dcac0fde994ac63ac42e1f15 Author: YueZhang <[email protected]> AuthorDate: Mon Mar 17 20:02:24 2025 +0800 finish hashing_config initial && related UT --- .../org/apache/hudi/config/HoodieIndexConfig.java | 15 ++++ .../HoodieSimpleBucketIndexPartitionRule.java | 29 +++++++ .../hudi/index/bucket/SimpleBucketIndexUtils.java | 81 ++++++++++++++++++ .../model/PartitionBucketIndexHashingConfig.java | 98 ++++++++++++++++++++++ .../hudi/common/table/HoodieTableMetaClient.java | 17 +++- .../apache/hudi/configuration/FlinkOptions.java | 16 ++++ .../apache/hudi/table/catalog/HoodieCatalog.java | 3 +- .../hudi/table/catalog/HoodieCatalogUtil.java | 11 +++ .../hudi/table/catalog/HoodieHiveCatalog.java | 7 +- .../hudi/table/catalog/TestHoodieCatalog.java | 36 ++++++++ 10 files changed, 308 insertions(+), 5 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java index 48b30dbe6d0..626e894fd1a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.index.bucket.HoodieSimpleBucketIndexPartitionRule; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.slf4j.Logger; @@ -286,6 +287,20 @@ public class HoodieIndexConfig extends HoodieConfig { .withDocumentation("Only applies if index type is BUCKET. Determine the number of buckets in the hudi table, " + "and each partition is divided to N buckets."); + public static final ConfigProperty<String> BUCKET_INDEX_PARTITION_RULE_TYPE = ConfigProperty + .key("hoodie.bucket.index.partition.rule.type") + .defaultValue(HoodieSimpleBucketIndexPartitionRule.REGEX.value) + .markAdvanced() + .withDocumentation("Rule parser for expressions when using partition level bucket index, default regex."); + + public static final ConfigProperty<String> BUCKET_INDEX_PARTITION_EXPRESSIONS = ConfigProperty + .key("hoodie.bucket.index.partition.expressions") + .noDefaultValue() + .markAdvanced() + .withDocumentation("Users can use this parameter to specify expression and the corresponding bucket " + + "numbers (separated by commas).Multiple rules are separated by semicolons like " + + "hoodie.bucket.index.partition.expressions=expression1,bucket-number1;expression2,bucket-number2"); + public static final ConfigProperty<String> BUCKET_INDEX_MAX_NUM_BUCKETS = ConfigProperty .key("hoodie.bucket.index.max.num.buckets") .noDefaultValue() diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndexPartitionRule.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndexPartitionRule.java new file mode 100644 index 00000000000..8d86ed8080b --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndexPartitionRule.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +public enum HoodieSimpleBucketIndexPartitionRule { + REGEX("regex"); + + public final String value; + + HoodieSimpleBucketIndexPartitionRule(String rule) { + this.value = rule; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/SimpleBucketIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/SimpleBucketIndexUtils.java new file mode 100644 index 00000000000..74c8022c9aa --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/SimpleBucketIndexUtils.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.storage.HoodieInstantWriter; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +public class SimpleBucketIndexUtils { + public static final String INITIAL_HASHING_CONFIG_INSTANT = HoodieTimeline.INIT_INSTANT_TS; + + private static final Logger LOG = LoggerFactory.getLogger(SimpleBucketIndexUtils.class); + + public static boolean initHashingConfig(HoodieTableMetaClient metaClient, + String expressions, + String rule, + int defaultBucketNumber, + String instant){ + if (StringUtils.isNullOrEmpty(expressions)) { + return false; + } + String hashingInstant = StringUtils.isNullOrEmpty(instant) ? INITIAL_HASHING_CONFIG_INSTANT : instant; + HoodieStorage storage = metaClient.getStorage(); + PartitionBucketIndexHashingConfig hashingConfig = + new PartitionBucketIndexHashingConfig(expressions, defaultBucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, hashingInstant); + StoragePath hashingConfigPath = new StoragePath(metaClient.getHashingMetadataConfigPath(), hashingConfig.getFilename()); + + try { + Option<byte []> content = Option.of(hashingConfig.toJsonString().getBytes(StandardCharsets.UTF_8)); + storage.createImmutableFileInPath(hashingConfigPath, content.map(HoodieInstantWriter::convertByteArrayToWriter)); + } catch (IOException ioe) { + throw new HoodieIOException("Failed to initHashingConfig ", ioe); + } + return true; + } + + public static Option<PartitionBucketIndexHashingConfig> loadHashingConfig(HoodieStorage storage, + StoragePathInfo hashingConfig) { + if (hashingConfig == null) { + return Option.empty(); + } + try (InputStream is = storage.open(hashingConfig.getPath())) { + byte[] content = FileIOUtils.readAsByteArray(is); + return Option.of(PartitionBucketIndexHashingConfig.fromBytes(content)); + } catch (IOException e) { + LOG.error("Error when loading hashing config, for path: " + hashingConfig.getPath().getName(), e); + throw new HoodieIOException("Error while loading hashing config", e); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java new file mode 100644 index 00000000000..225629b52ff --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.common.util.JsonUtils; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class PartitionBucketIndexHashingConfig implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexHashingConfig.class); + public static final String HASHING_CONFIG_FILE_SUFFIX = ".hashing_config"; + public static final Integer CURRENT_VERSION = 1; + private final String expressions; + private final int defaultBucketNumber; + private final String rule; + + private final int version; + private final String instant; + + @JsonCreator + public PartitionBucketIndexHashingConfig(@JsonProperty("expressions") String expressions, + @JsonProperty("defaultBucketNumber") int defaultBucketNumber, + @JsonProperty("rule") String rule, + @JsonProperty("version") int version, + @JsonProperty("instant") String instant) { + this.expressions = expressions; + this.defaultBucketNumber = defaultBucketNumber; + this.rule = rule; + this.version = version; + this.instant = instant; + } + + public String getFilename() { + return instant + HASHING_CONFIG_FILE_SUFFIX; + } + + public String toJsonString() throws IOException { + return JsonUtils.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this); + } + + public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Exception { + if (jsonStr == null || jsonStr.isEmpty()) { + // For empty commit file (no data or somethings bad happen). + return clazz.newInstance(); + } + return JsonUtils.getObjectMapper().readValue(jsonStr, clazz); + } + + + public static PartitionBucketIndexHashingConfig fromBytes(byte[] bytes) throws IOException { + try { + return fromJsonString(new String(bytes, StandardCharsets.UTF_8), PartitionBucketIndexHashingConfig.class); + } catch (Exception e) { + throw new IOException("unable to load hashing config", e); + } + } + + public int getVersion() { + return version; + } + + public String getRule() { + return rule; + } + + public int getDefaultBucketNumber() { + return defaultBucketNumber; + } + + public String getExpressions() { + return expressions; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 31b7df91d0b..d50e9f4b7cd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -111,6 +111,7 @@ public class HoodieTableMetaClient implements Serializable { public static final String METADATA_STR = "metadata"; public static final String METAFOLDER_NAME = ".hoodie"; public static final String TIMELINEFOLDER_NAME = "timeline"; + public static final String BUCKET_INDEX_METAFOLDER_NAME = ".bucket_index"; public static final String TEMPFOLDER_NAME = METAFOLDER_NAME + StoragePath.SEPARATOR + ".temp"; public static final String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + StoragePath.SEPARATOR + ".aux"; public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + StoragePath.SEPARATOR + ".bootstrap"; @@ -118,7 +119,13 @@ public class HoodieTableMetaClient implements Serializable { public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + StoragePath.SEPARATOR + ".heartbeat"; public static final String METADATA_TABLE_FOLDER_PATH = METAFOLDER_NAME + StoragePath.SEPARATOR + METADATA_STR; public static final String HASHING_METADATA_FOLDER_NAME = - ".bucket_index" + StoragePath.SEPARATOR + "consistent_hashing_metadata"; + BUCKET_INDEX_METAFOLDER_NAME + StoragePath.SEPARATOR + "consistent_hashing_metadata"; + public static final String PARTITION_BUCKET_INDEX_HASHING_FOLDER = + BUCKET_INDEX_METAFOLDER_NAME + StoragePath.SEPARATOR + "partition_bucket_index_meta"; + public static final String BUCKET_INDEX_METAFOLDER_CONFIG_FOLDER = + PARTITION_BUCKET_INDEX_HASHING_FOLDER + StoragePath.SEPARATOR + "configs"; + public static final String BUCKET_INDEX_METAFOLDER_CONFIG_ARCHIVE_FOLDER = + PARTITION_BUCKET_INDEX_HASHING_FOLDER + StoragePath.SEPARATOR + "archive"; public static final String BOOTSTRAP_INDEX_BY_PARTITION_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + StoragePath.SEPARATOR + ".partitions"; public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH = @@ -361,6 +368,14 @@ public class HoodieTableMetaClient implements Serializable { return new StoragePath(metaPath, HASHING_METADATA_FOLDER_NAME).toString(); } + /** + * Used for partition level bucket index to save hashing_config. + * @return + */ + public String getHashingMetadataConfigPath() { + return new StoragePath(metaPath, BUCKET_INDEX_METAFOLDER_CONFIG_FOLDER).toString(); + } + /** * @return Temp Folder path */ diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index d65c80f89e9..e1c85c123a7 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -474,6 +474,22 @@ public class FlinkOptions extends HoodieConfig { .defaultValue(4) // default 4 buckets per partition .withDescription("Hudi bucket number per partition. Only affected if using Hudi bucket index."); + @AdvancedConfig + public static final ConfigOption<String> BUCKET_INDEX_PARTITION_RULE = ConfigOptions + .key(HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.key()) + .stringType() + .defaultValue(HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.defaultValue()) + .withDescription("Rule parser for expressions when using partition level bucket index, default regex."); + + @AdvancedConfig + public static final ConfigOption<String> BUCKET_INDEX_PARTITION_EXPRESSIONS = ConfigOptions + .key(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS.key()) + .stringType() + .noDefaultValue() + .withDescription("Users can use this parameter to specify expression and the corresponding bucket " + + "numbers (separated by commas).Multiple rules are separated by semicolons like " + + "hoodie.bucket.index.partition.expressions=expression1,bucket-number1;expression2,bucket-number2"); + public static final ConfigOption<String> PARTITION_PATH_FIELD = ConfigOptions .key(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) .stringType() diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java index f9088b4096c..cb62dedc4c1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java @@ -352,12 +352,13 @@ public class HoodieCatalog extends AbstractCatalog { } conf.setString(FlinkOptions.TABLE_NAME, tablePath.getObjectName()); try { - StreamerUtil.initTableIfNotExists(conf); + HoodieTableMetaClient metaClient = StreamerUtil.initTableIfNotExists(conf); // prepare the non-table-options properties if (!StringUtils.isNullOrEmpty(resolvedTable.getComment())) { options.put(TableOptionProperties.COMMENT, resolvedTable.getComment()); } TableOptionProperties.createProperties(tablePathStr, hadoopConf, options); + HoodieCatalogUtil.initPartitionBucketIndexMeta(metaClient, catalogTable); } catch (IOException e) { throw new CatalogException(String.format("Initialize table path %s exception.", tablePathStr), e); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java index 99a23ded5e8..db1d8364ad3 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java @@ -27,6 +27,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.exception.HoodieCatalogException; +import org.apache.hudi.index.bucket.SimpleBucketIndexUtils; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.Type; import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; @@ -301,4 +302,14 @@ public class HoodieCatalogUtil { } return true; } + + public static boolean initPartitionBucketIndexMeta(HoodieTableMetaClient metaClient, CatalogBaseTable catalogTable) { + Map<String, String> options = catalogTable.getOptions(); + String expressions = options.getOrDefault(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS.key(), ""); + String rule = options.getOrDefault(FlinkOptions.BUCKET_INDEX_PARTITION_RULE.key(), FlinkOptions.BUCKET_INDEX_PARTITION_RULE.defaultValue()); + int bucketNumber = options.containsKey(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key()) ? + Integer.valueOf(options.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key())) : FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.defaultValue(); + + return SimpleBucketIndexUtils.initHashingConfig(metaClient, expressions, rule, bucketNumber, null); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java index 17d59b873f9..e66a6745003 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java @@ -474,7 +474,8 @@ public class HoodieHiveCatalog extends AbstractCatalog { //create hive table client.createTable(hiveTable); //init hoodie metaClient - initTableIfNotExists(tablePath, (CatalogTable) table); + HoodieTableMetaClient metaClient = initTableIfNotExists(tablePath, (CatalogTable) table); + HoodieCatalogUtil.initPartitionBucketIndexMeta(metaClient, table); } catch (AlreadyExistsException e) { if (!ignoreIfExists) { throw new TableAlreadyExistException(getName(), tablePath, e); @@ -485,7 +486,7 @@ public class HoodieHiveCatalog extends AbstractCatalog { } } - private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTable) { + private HoodieTableMetaClient initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTable) { Configuration flinkConf = Configuration.fromMap(catalogTable.getOptions()); final String avroSchema = AvroSchemaConverter.convertToSchema( catalogTable.getSchema().toPersistedRowDataType().getLogicalType(), @@ -527,7 +528,7 @@ public class HoodieHiveCatalog extends AbstractCatalog { StreamerUtil.checkPreCombineKey(flinkConf, fields); try { - StreamerUtil.initTableIfNotExists(flinkConf, hiveConf); + return StreamerUtil.initTableIfNotExists(flinkConf, hiveConf); } catch (IOException e) { throw new HoodieCatalogException("Initialize table exception.", e); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java index f6737128698..1e518c5ef45 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java @@ -21,6 +21,7 @@ package org.apache.hudi.table.catalog; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; +import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -30,10 +31,14 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieValidationException; +import org.apache.hudi.index.bucket.SimpleBucketIndexUtils; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; import org.apache.hudi.keygen.SimpleAvroKeyGenerator; import org.apache.hudi.sink.partitioner.profile.WriteProfiles; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; @@ -334,6 +339,37 @@ public class TestHoodieCatalog { assertEquals(keyGeneratorClassName, NonpartitionedAvroKeyGenerator.class.getName()); } + @Test + void testCreateTableWithPartitionSimpleBucketIndex() throws TableAlreadyExistException, DatabaseNotExistException, IOException { + String rule = "regex"; + String expressions = "\\d{4}-(06-(01|17|18)|11-(01|10|11)),256"; + String defaultBucketNumber = "20"; + ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1"); + EXPECTED_CATALOG_TABLE.getOptions().put(FlinkOptions.BUCKET_INDEX_PARTITION_RULE.key(), rule); + EXPECTED_CATALOG_TABLE.getOptions().put(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS.key(), expressions); + EXPECTED_CATALOG_TABLE.getOptions().put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), defaultBucketNumber); + // test create table + catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true); + + // test table exist + assertTrue(catalog.tableExists(tablePath)); + + String tablePathStr = catalog.inferTablePath(catalogPathStr, tablePath); + Configuration flinkConf = TestConfigurations.getDefaultConf(tablePathStr); + HoodieTableMetaClient metaClient = HoodieTestUtils + .createMetaClient( + new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(flinkConf)), tablePathStr); + HoodieStorage storage = metaClient.getStorage(); + StoragePath initialHashing_config = + new StoragePath(metaClient.getHashingMetadataConfigPath(), SimpleBucketIndexUtils.INITIAL_HASHING_CONFIG_INSTANT + PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX); + StoragePathInfo info = storage.getPathInfo(initialHashing_config); + Option<PartitionBucketIndexHashingConfig> hashing_config = SimpleBucketIndexUtils.loadHashingConfig(storage, info); + assertTrue(hashing_config.isPresent()); + assertEquals(hashing_config.get().getDefaultBucketNumber(), Integer.parseInt(defaultBucketNumber)); + assertEquals(hashing_config.get().getRule(), rule); + assertEquals(hashing_config.get().getExpressions(), expressions); + } + @Test void testCreateTableWithoutPreCombineKey() { Map<String, String> options = getDefaultCatalogOption();
