danny0405 commented on code in PR #13017: URL: https://github.com/apache/hudi/pull/13017#discussion_r2009518343
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import 
java.util.regex.Pattern; + +/** + * A singleton implementation of PartitionBucketIndexCalculator that ensures only one instance + * exists for each unique hashingInstantToLoad value. + */ +public class PartitionBucketIndexCalculator implements Serializable { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexCalculator.class); + // Map to store singleton instances for each instantToLoad + configuration hash combination + private static final ConcurrentMap<String, PartitionBucketIndexCalculator> INSTANCES = new ConcurrentHashMap<>(); + protected int cacheSize = 100_000; + protected PartitionBucketIndexHashingConfig hashingConfig; + private int defaultBucketNumber; + private String instantToLoad; + // Cache for partition to bucket number mapping + protected Map<String, Integer> partitionToBucketCache = new SerializableLRUMap<>(cacheSize); + private RuleEngine ruleEngine; + + /** + * Private constructor to prevent direct instantiation + * + * @param instantToLoad The instant to load + * @param hadoopConf The Hadoop configuration + */ + private PartitionBucketIndexCalculator(String instantToLoad, Configuration hadoopConf, String basePath) { + this.instantToLoad = instantToLoad; + StoragePath hashingConfigPath = PartitionBucketIndexUtils.getHashingConfigPath(basePath, instantToLoad); + try (HoodieHadoopStorage storage = new HoodieHadoopStorage(hashingConfigPath, HadoopFSUtils.getStorageConf(hadoopConf))) { + init(storage, hashingConfigPath); + } catch (IOException e) { + throw new HoodieIOException("Failed to initialize PartitionBucketIndexCalculator ", e); + } + } + + private PartitionBucketIndexCalculator(String instantToLoad, HoodieTableMetaClient client) { + this.instantToLoad = instantToLoad; + String metaPath = client.getHashingMetadataConfigPath(); + StoragePath hashingConfigPath = new StoragePath(metaPath, instantToLoad + PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX); 
+ + try (HoodieStorage storage = client.getStorage()) { + init(storage, hashingConfigPath); + } catch (IOException e) { + throw new HoodieIOException("Failed to initialize PartitionBucketIndexCalculator ", e); + } + } + + private PartitionBucketIndexCalculator(String instantToLoad, PartitionBucketIndexHashingConfig config) { + this.hashingConfig = config; + this.defaultBucketNumber = config.getDefaultBucketNumber(); + String expressions = config.getExpressions(); + String ruleType = config.getRule(); + this.ruleEngine = createRuleEngine(ruleType, expressions); + } + + private void init(HoodieStorage storage, StoragePath hashingConfigPath) { + Option<PartitionBucketIndexHashingConfig> config = PartitionBucketIndexUtils.loadHashingConfig(storage, hashingConfigPath); + ValidationUtils.checkArgument(config.isPresent()); Review Comment: Can we add some msg for the check. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A singleton implementation of PartitionBucketIndexCalculator that ensures only one instance + * exists for each unique hashingInstantToLoad value. 
+ */ +public class PartitionBucketIndexCalculator implements Serializable { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexCalculator.class); + // Map to store singleton instances for each instantToLoad + configuration hash combination + private static final ConcurrentMap<String, PartitionBucketIndexCalculator> INSTANCES = new ConcurrentHashMap<>(); + protected int cacheSize = 100_000; + protected PartitionBucketIndexHashingConfig hashingConfig; + private int defaultBucketNumber; + private String instantToLoad; + // Cache for partition to bucket number mapping + protected Map<String, Integer> partitionToBucketCache = new SerializableLRUMap<>(cacheSize); + private RuleEngine ruleEngine; + + /** + * Private constructor to prevent direct instantiation + * + * @param instantToLoad The instant to load + * @param hadoopConf The Hadoop configuration + */ + private PartitionBucketIndexCalculator(String instantToLoad, Configuration hadoopConf, String basePath) { + this.instantToLoad = instantToLoad; + StoragePath hashingConfigPath = PartitionBucketIndexUtils.getHashingConfigPath(basePath, instantToLoad); + try (HoodieHadoopStorage storage = new HoodieHadoopStorage(hashingConfigPath, HadoopFSUtils.getStorageConf(hadoopConf))) { + init(storage, hashingConfigPath); + } catch (IOException e) { + throw new HoodieIOException("Failed to initialize PartitionBucketIndexCalculator ", e); + } + } + + private PartitionBucketIndexCalculator(String instantToLoad, HoodieTableMetaClient client) { + this.instantToLoad = instantToLoad; + String metaPath = client.getHashingMetadataConfigPath(); + StoragePath hashingConfigPath = new StoragePath(metaPath, instantToLoad + PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX); + + try (HoodieStorage storage = client.getStorage()) { + init(storage, hashingConfigPath); + } catch (IOException e) { + throw new HoodieIOException("Failed to initialize 
PartitionBucketIndexCalculator ", e); + } + } + + private PartitionBucketIndexCalculator(String instantToLoad, PartitionBucketIndexHashingConfig config) { + this.hashingConfig = config; + this.defaultBucketNumber = config.getDefaultBucketNumber(); + String expressions = config.getExpressions(); + String ruleType = config.getRule(); + this.ruleEngine = createRuleEngine(ruleType, expressions); + } + + private void init(HoodieStorage storage, StoragePath hashingConfigPath) { + Option<PartitionBucketIndexHashingConfig> config = PartitionBucketIndexUtils.loadHashingConfig(storage, hashingConfigPath); + ValidationUtils.checkArgument(config.isPresent()); + this.hashingConfig = config.get(); + this.defaultBucketNumber = config.get().getDefaultBucketNumber(); + String expressions = config.get().getExpressions(); + String ruleType = config.get().getRule(); + this.ruleEngine = createRuleEngine(ruleType, expressions); + } + + /** + * Gets the singleton instance for the specified instantToLoad and configuration + * + * @param instantToLoad The instant to load + * @param hadoopConf The Hadoop configuration + * @return The singleton instance + */ + public static PartitionBucketIndexCalculator getInstance(String instantToLoad, Configuration hadoopConf, String basePath) { Review Comment: PartitionNumBucketsCalculator ? Not sure why we need a hadoop config to access the file, an empty config should work here? 
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java: ########## @@ -152,14 +157,21 @@ protected Function<HoodieRecord, Option<HoodieRecordLocation>> getIndexLocationF private class SimpleBucketIndexLocationFunction implements Function<HoodieRecord, Option<HoodieRecordLocation>> { private final Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping; + private final boolean isPartitionBucketIndexEnable; + private PartitionBucketIndexCalculator calc; public SimpleBucketIndexLocationFunction(HoodieTable table, String partitionPath) { this.bucketIdToFileIdMapping = loadBucketIdToFileIdMappingForPartition(table, partitionPath); + String hashingInstantToLoad = table.getConfig().getHashingConfigInstantToLoad(); Review Comment: The method `#getHashingConfigInstantToLoad` should belong to writer config instead of table config. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A singleton implementation of PartitionBucketIndexCalculator that ensures only one instance + * exists for each unique hashingInstantToLoad value. 
+ */ +public class PartitionBucketIndexCalculator implements Serializable { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexCalculator.class); + // Map to store singleton instances for each instantToLoad + configuration hash combination + private static final ConcurrentMap<String, PartitionBucketIndexCalculator> INSTANCES = new ConcurrentHashMap<>(); + protected int cacheSize = 100_000; + protected PartitionBucketIndexHashingConfig hashingConfig; + private int defaultBucketNumber; + private String instantToLoad; + // Cache for partition to bucket number mapping + protected Map<String, Integer> partitionToBucketCache = new SerializableLRUMap<>(cacheSize); + private RuleEngine ruleEngine; + + /** + * Private constructor to prevent direct instantiation + * + * @param instantToLoad The instant to load + * @param hadoopConf The Hadoop configuration + */ + private PartitionBucketIndexCalculator(String instantToLoad, Configuration hadoopConf, String basePath) { + this.instantToLoad = instantToLoad; + StoragePath hashingConfigPath = PartitionBucketIndexUtils.getHashingConfigPath(basePath, instantToLoad); + try (HoodieHadoopStorage storage = new HoodieHadoopStorage(hashingConfigPath, HadoopFSUtils.getStorageConf(hadoopConf))) { + init(storage, hashingConfigPath); + } catch (IOException e) { + throw new HoodieIOException("Failed to initialize PartitionBucketIndexCalculator ", e); + } + } + + private PartitionBucketIndexCalculator(String instantToLoad, HoodieTableMetaClient client) { + this.instantToLoad = instantToLoad; + String metaPath = client.getHashingMetadataConfigPath(); + StoragePath hashingConfigPath = new StoragePath(metaPath, instantToLoad + PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX); + + try (HoodieStorage storage = client.getStorage()) { + init(storage, hashingConfigPath); + } catch (IOException e) { + throw new HoodieIOException("Failed to initialize 
PartitionBucketIndexCalculator ", e); + } + } + + private PartitionBucketIndexCalculator(String instantToLoad, PartitionBucketIndexHashingConfig config) { + this.hashingConfig = config; + this.defaultBucketNumber = config.getDefaultBucketNumber(); + String expressions = config.getExpressions(); + String ruleType = config.getRule(); + this.ruleEngine = createRuleEngine(ruleType, expressions); + } + + private void init(HoodieStorage storage, StoragePath hashingConfigPath) { + Option<PartitionBucketIndexHashingConfig> config = PartitionBucketIndexUtils.loadHashingConfig(storage, hashingConfigPath); + ValidationUtils.checkArgument(config.isPresent()); + this.hashingConfig = config.get(); + this.defaultBucketNumber = config.get().getDefaultBucketNumber(); + String expressions = config.get().getExpressions(); + String ruleType = config.get().getRule(); + this.ruleEngine = createRuleEngine(ruleType, expressions); + } + + /** + * Gets the singleton instance for the specified instantToLoad and configuration + * + * @param instantToLoad The instant to load + * @param hadoopConf The Hadoop configuration + * @return The singleton instance + */ + public static PartitionBucketIndexCalculator getInstance(String instantToLoad, Configuration hadoopConf, String basePath) { + // Using instantToLoad as the key for the cache + return INSTANCES.computeIfAbsent(instantToLoad, + key -> { + LOG.info("Creating new PartitionBucketIndexCalculator instance for instantToLoad: {}", key); Review Comment: avoid to hard code the class name in the msg. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A singleton implementation of PartitionBucketIndexCalculator that ensures only one instance + * exists for each unique hashingInstantToLoad value. 
+ */ +public class PartitionBucketIndexCalculator implements Serializable { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexCalculator.class); + // Map to store singleton instances for each instantToLoad + configuration hash combination + private static final ConcurrentMap<String, PartitionBucketIndexCalculator> INSTANCES = new ConcurrentHashMap<>(); + protected int cacheSize = 100_000; + protected PartitionBucketIndexHashingConfig hashingConfig; + private int defaultBucketNumber; + private String instantToLoad; + // Cache for partition to bucket number mapping + protected Map<String, Integer> partitionToBucketCache = new SerializableLRUMap<>(cacheSize); + private RuleEngine ruleEngine; + + /** + * Private constructor to prevent direct instantiation Review Comment: Can we end each doc with `.`. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A singleton implementation of PartitionBucketIndexCalculator that ensures only one instance + * exists for each unique hashingInstantToLoad value. + */ +public class PartitionBucketIndexCalculator implements Serializable { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexCalculator.class); + // Map to store singleton instances for each instantToLoad + configuration hash combination + private static final ConcurrentMap<String, PartitionBucketIndexCalculator> INSTANCES = new ConcurrentHashMap<>(); Review Comment: do we need concurrent access here? can we use Caffeine cache instead with configured eviction strategy. 
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java: ########## @@ -95,15 +98,19 @@ public BucketStreamWriteFunction(Configuration config, RowType rowType) { @Override public void open(Configuration parameters) throws IOException { super.open(parameters); - this.bucketNum = config.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS); this.indexKeyFields = OptionsResolver.getIndexKeyField(config); this.isNonBlockingConcurrencyControl = OptionsResolver.isNonBlockingConcurrencyControl(config); this.taskID = getRuntimeContext().getIndexOfThisSubtask(); this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks(); this.bucketIndex = new HashMap<>(); this.incBucketIndex = new HashSet<>(); - this.partitionIndexFunc = BucketIndexUtil.getPartitionIndexFunc(bucketNum, parallelism); + this.partitionIndexFunc = BucketIndexUtil.getPartitionIndexFunc(parallelism); + this.isInsertOverwrite = OptionsResolver.isInsertOverwrite(config); + String hashingInstant = config.get(FlinkOptions.BUCKET_INDEX_PARTITION_LOAD_INSTANT); + this.isPartitionLevelBucketIndexEnable = !StringUtils.isNullOrEmpty(hashingInstant); + if (isPartitionLevelBucketIndexEnable) { + calc = PartitionBucketIndexCalculator.getInstance(hashingInstant, metaClient); Review Comment: The calc is everywhere, can we abstract a utility class named `NumBucketsFunction` to encapsulate access of the regular buckets and the new one? 
########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala: ########## @@ -333,6 +335,19 @@ class HoodieSparkSqlWriterInternal { .initTable(HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration), path) } + + // take care of partition level bucket index which is simple bucket index + if (parameters.contains(HoodieIndexConfig.INDEX_TYPE.key) + && parameters(HoodieIndexConfig.INDEX_TYPE.key) == HoodieIndex.IndexType.BUCKET.name + && (!parameters.contains(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key) Review Comment: this is way too complicated, can we abstract out some fine-grained method here. ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java: ########## @@ -90,4 +95,16 @@ public static void setupClientId(Configuration conf) { } } } + + public static void setupIndexConfigs(Configuration conf) { + HoodieIndex.BucketIndexEngineType engineType = OptionsResolver.getBucketEngineType(conf); + if (engineType.equals(HoodieIndex.BucketIndexEngineType.SIMPLE) Review Comment: define a utility method named `OptionsResolver#isPartitionLevelSimpleBucketIndex` ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A singleton implementation of PartitionBucketIndexCalculator that ensures only one instance + * exists for each unique hashingInstantToLoad value. + */ +public class PartitionBucketIndexCalculator implements Serializable { Review Comment: It looks more like a num buckets reader because the reader side access is embedded here. 
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java: ########## @@ -474,6 +474,28 @@ private FlinkOptions() { .defaultValue(4) // default 4 buckets per partition .withDescription("Hudi bucket number per partition. Only affected if using Hudi bucket index."); + @AdvancedConfig + public static final ConfigOption<String> BUCKET_INDEX_PARTITION_RULE = ConfigOptions + .key(HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.key()) + .stringType() + .defaultValue(HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.defaultValue()) + .withDescription("Rule parser for expressions when using partition level bucket index, default regex."); + + @AdvancedConfig + public static final ConfigOption<String> BUCKET_INDEX_PARTITION_EXPRESSIONS = ConfigOptions + .key(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS.key()) + .stringType() + .noDefaultValue() + .withDescription("Users can use this parameter to specify expression and the corresponding bucket " + + "numbers (separated by commas).Multiple rules are separated by semicolons like " + + "hoodie.bucket.index.partition.expressions=expression1,bucket-number1;expression2,bucket-number2"); + + public static final ConfigOption<String> BUCKET_INDEX_PARTITION_LOAD_INSTANT = ConfigOptions + .key(HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT.key()) Review Comment: get rid of this inner config stuff. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.storage.HoodieInstantWriter; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; +import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class PartitionBucketIndexUtils { + public static final String INITIAL_HASHING_CONFIG_INSTANT = HoodieTimeline.INIT_INSTANT_TS; + + private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexUtils.class); + + public static boolean 
isPartitionSimpleBucketIndex(Configuration conf, String basePath) { + return isPartitionSimpleBucketIndex(HadoopFSUtils.getStorageConf(conf), basePath); + } + + public static boolean isPartitionSimpleBucketIndex(StorageConfiguration conf, String basePath) { + StoragePath storagePath = getHashingConfigStorageFolder(basePath); + try (HoodieHadoopStorage storage = new HoodieHadoopStorage(storagePath, conf)) { + return storage.exists(storagePath); + } catch (IOException e) { + throw new HoodieIOException("Failed to list PARTITION_BUCKET_INDEX_HASHING_FOLDER folder ", e); + } + } + + public static StoragePath getHashingConfigStorageFolder(String basePath) { + StoragePath metaPath = new StoragePath(basePath, HoodieTableMetaClient.METAFOLDER_NAME); + return new StoragePath(metaPath, HoodieTableMetaClient.BUCKET_INDEX_METAFOLDER_CONFIG_FOLDER); + } + + public static StoragePath getHashingConfigPath(String basePath, String instantToLoad) { + StoragePath hashingBase = getHashingConfigStorageFolder(basePath); + return new StoragePath(hashingBase, instantToLoad + PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX); + } + + public static boolean initHashingConfig(HoodieTableMetaClient metaClient, + String expressions, + String rule, + int defaultBucketNumber, + String instant) { + if (StringUtils.isNullOrEmpty(expressions)) { + return false; + } + String hashingInstant = StringUtils.isNullOrEmpty(instant) ? 
INITIAL_HASHING_CONFIG_INSTANT : instant; + PartitionBucketIndexHashingConfig hashingConfig = + new PartitionBucketIndexHashingConfig(expressions, defaultBucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, hashingInstant); + return saveHashingConfig(hashingConfig, metaClient); + } + + public static boolean saveHashingConfig(PartitionBucketIndexHashingConfig hashingConfig, HoodieTableMetaClient metaClient) { + StoragePath hashingConfigPath = new StoragePath(metaClient.getHashingMetadataConfigPath(), hashingConfig.getFilename()); + HoodieStorage storage = metaClient.getStorage(); + try { + Option<byte []> content = Option.of(hashingConfig.toJsonString().getBytes(StandardCharsets.UTF_8)); + storage.createImmutableFileInPath(hashingConfigPath, content.map(HoodieInstantWriter::convertByteArrayToWriter)); + } catch (IOException ioe) { + throw new HoodieIOException("Failed to initHashingConfig ", ioe); + } + return true; + } + + public static Option<PartitionBucketIndexHashingConfig> loadHashingConfig(HoodieStorage storage, StoragePathInfo hashingConfig) { + return loadHashingConfig(storage, hashingConfig.getPath()); + } + + public static Option<PartitionBucketIndexHashingConfig> loadHashingConfig(HoodieStorage storage, StoragePath hashingConfig) { + if (hashingConfig == null) { + return Option.empty(); + } + try (InputStream is = storage.open(hashingConfig)) { + byte[] content = FileIOUtils.readAsByteArray(is); + return Option.of(PartitionBucketIndexHashingConfig.fromBytes(content)); + } catch (IOException e) { + LOG.error("Error when loading hashing config, for path: " + hashingConfig.getName(), e); + throw new HoodieIOException("Error while loading hashing config", e); + } + } + + public static String getHashingConfigInstantToLoad(HoodieTableMetaClient metaClient) { Review Comment: Can we move all the `PartitionBucketIndexHashingConfig` related utility methods into the class itself. 
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import 
java.util.regex.Pattern; + +/** + * A singleton implementation of PartitionBucketIndexCalculator that ensures only one instance + * exists for each unique hashingInstantToLoad value. + */ +public class PartitionBucketIndexCalculator implements Serializable { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexCalculator.class); + // Map to store singleton instances for each instantToLoad + configuration hash combination + private static final ConcurrentMap<String, PartitionBucketIndexCalculator> INSTANCES = new ConcurrentHashMap<>(); + protected int cacheSize = 100_000; + protected PartitionBucketIndexHashingConfig hashingConfig; + private int defaultBucketNumber; + private String instantToLoad; + // Cache for partition to bucket number mapping + protected Map<String, Integer> partitionToBucketCache = new SerializableLRUMap<>(cacheSize); + private RuleEngine ruleEngine; + + /** + * Private constructor to prevent direct instantiation + * + * @param instantToLoad The instant to load + * @param hadoopConf The Hadoop configuration + */ + private PartitionBucketIndexCalculator(String instantToLoad, Configuration hadoopConf, String basePath) { + this.instantToLoad = instantToLoad; + StoragePath hashingConfigPath = PartitionBucketIndexUtils.getHashingConfigPath(basePath, instantToLoad); + try (HoodieHadoopStorage storage = new HoodieHadoopStorage(hashingConfigPath, HadoopFSUtils.getStorageConf(hadoopConf))) { + init(storage, hashingConfigPath); + } catch (IOException e) { + throw new HoodieIOException("Failed to initialize PartitionBucketIndexCalculator ", e); + } + } + + private PartitionBucketIndexCalculator(String instantToLoad, HoodieTableMetaClient client) { + this.instantToLoad = instantToLoad; + String metaPath = client.getHashingMetadataConfigPath(); + StoragePath hashingConfigPath = new StoragePath(metaPath, instantToLoad + PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX); 
+ + try (HoodieStorage storage = client.getStorage()) { + init(storage, hashingConfigPath); + } catch (IOException e) { + throw new HoodieIOException("Failed to initialize PartitionBucketIndexCalculator ", e); + } + } + + private PartitionBucketIndexCalculator(String instantToLoad, PartitionBucketIndexHashingConfig config) { + this.hashingConfig = config; + this.defaultBucketNumber = config.getDefaultBucketNumber(); + String expressions = config.getExpressions(); + String ruleType = config.getRule(); + this.ruleEngine = createRuleEngine(ruleType, expressions); + } + + private void init(HoodieStorage storage, StoragePath hashingConfigPath) { + Option<PartitionBucketIndexHashingConfig> config = PartitionBucketIndexUtils.loadHashingConfig(storage, hashingConfigPath); + ValidationUtils.checkArgument(config.isPresent()); + this.hashingConfig = config.get(); + this.defaultBucketNumber = config.get().getDefaultBucketNumber(); + String expressions = config.get().getExpressions(); + String ruleType = config.get().getRule(); + this.ruleEngine = createRuleEngine(ruleType, expressions); + } + + /** + * Gets the singleton instance for the specified instantToLoad and configuration + * + * @param instantToLoad The instant to load + * @param hadoopConf The Hadoop configuration + * @return The singleton instance + */ + public static PartitionBucketIndexCalculator getInstance(String instantToLoad, Configuration hadoopConf, String basePath) { + // Using instantToLoad as the key for the cache + return INSTANCES.computeIfAbsent(instantToLoad, + key -> { + LOG.info("Creating new PartitionBucketIndexCalculator instance for instantToLoad: {}", key); + return new PartitionBucketIndexCalculator(key, hadoopConf, basePath); + }); + } + + public static PartitionBucketIndexCalculator getInstance(String instantToLoad, HoodieTableMetaClient client) { + // Using instantToLoad as the key for the cache + return INSTANCES.computeIfAbsent(instantToLoad, + key -> { + LOG.info("Creating new 
PartitionBucketIndexCalculator instance for instantToLoad: {}", key); + return new PartitionBucketIndexCalculator(key, client); + }); + } + + public static PartitionBucketIndexCalculator getInstance(String instantToLoad, PartitionBucketIndexHashingConfig config) { + // Using instantToLoad as the key for the cache + return INSTANCES.computeIfAbsent(instantToLoad, + key -> { + LOG.info("Creating new PartitionBucketIndexCalculator instance for instantToLoad: {}", key); + return new PartitionBucketIndexCalculator(key, config); + }); + } + + /** + * Computes the bucket number for a given partition path + * + * @param partitionPath The partition path + * @return The computed bucket number + */ + public int computeNumBuckets(String partitionPath) { + // Check cache first + Integer cachedBucketNumber = partitionToBucketCache.get(partitionPath); + if (cachedBucketNumber != null) { + return cachedBucketNumber; + } + + // Calculate bucket number using the rule engine + int bucketNumber = ruleEngine.calculateBucketNumber(partitionPath); + + // If no rule matched, use default bucket number + if (bucketNumber == -1) { + bucketNumber = defaultBucketNumber; + LOG.debug("No rule matched for partition: {}. 
Using default bucket number: {}", + partitionPath, defaultBucketNumber); + } + + // Update cache + partitionToBucketCache.put(partitionPath, bucketNumber); + + return bucketNumber; + } + + /** + * Gets the instant to load + * + * @return The instant to load + */ + public String getInstantToLoad() { + return instantToLoad; + } + + /** + * Gets the hashing configuration + * + * @return The hashing configuration + */ + public PartitionBucketIndexHashingConfig getHashingConfig() { + return hashingConfig; + } + + /** + * Clears the instance cache (useful for testing or memory management) + */ + public static void clearInstanceCache() { + INSTANCES.clear(); + } + + public int getCacheSize() { + return partitionToBucketCache.size(); + } + + public Map<String, Integer> getPartitionToBucket() { + return new HashMap<>(partitionToBucketCache); + } + + public void clearCache() { + partitionToBucketCache.clear(); + LOG.info("Cleared partition to bucket number cache"); + } + + // ------------------------------------------------------------------------------------------------------- + + /** + * Interface for rule engines that calculate bucket numbers + */ + private interface RuleEngine extends Serializable { + /** + * Calculate bucket number for a partition path + * @param partitionPath The partition path + * @return The calculated bucket number, or -1 if no rule matches + */ + int calculateBucketNumber(String partitionPath); + } + + /** + * Factory method to create the appropriate rule engine based on rule type + */ + private static PartitionBucketIndexCalculator.RuleEngine createRuleEngine(String ruleType, String expressions) { + switch (PartitionBucketIndexRule.valueOf(ruleType.toUpperCase())) { + case REGEX: + return new PartitionBucketIndexCalculator.RegexRuleEngine(expressions); + default: + LOG.error("Unsupported rule type: {}.", ruleType); + throw new UnsupportedOperationException("Unsupported rule type " + ruleType); + } + } + + /** + * Regex-based rule engine 
implementation + */ + private static class RegexRuleEngine implements PartitionBucketIndexCalculator.RuleEngine { + private static final long serialVersionUID = 1L; + private final List<RegexRule> rules = new ArrayList<>(); + + /** + * Represents a single regex rule with its pattern and bucket number + */ + private static class RegexRule implements Serializable { Review Comment: Can we avoid nested inner class? ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java: ########## @@ -352,12 +352,13 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable catalogTable, boo } conf.setString(FlinkOptions.TABLE_NAME, tablePath.getObjectName()); try { - StreamerUtil.initTableIfNotExists(conf); + HoodieTableMetaClient metaClient = StreamerUtil.initTableIfNotExists(conf); // prepare the non-table-options properties if (!StringUtils.isNullOrEmpty(resolvedTable.getComment())) { options.put(TableOptionProperties.COMMENT, resolvedTable.getComment()); } TableOptionProperties.createProperties(tablePathStr, hadoopConf, options); + HoodieCatalogUtil.initPartitionBucketIndexMeta(metaClient, catalogTable); Review Comment: Not sure how the consistent hashing meta is initialized, can we follow that? ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java: ########## @@ -90,4 +95,16 @@ public static void setupClientId(Configuration conf) { } } } + + public static void setupIndexConfigs(Configuration conf) { + HoodieIndex.BucketIndexEngineType engineType = OptionsResolver.getBucketEngineType(conf); + if (engineType.equals(HoodieIndex.BucketIndexEngineType.SIMPLE) Review Comment: and do we need to set up the option on start-up? 
########## hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala: ########## @@ -21,16 +21,27 @@ package org.apache.spark.sql import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.util.Functions import org.apache.hudi.common.util.hash.BucketIndexUtil -import org.apache.hudi.index.bucket.BucketIdentifier +import org.apache.hudi.index.bucket.{BucketIdentifier, PartitionBucketIndexCalculator} import org.apache.spark.Partitioner import org.apache.spark.sql.catalyst.InternalRow object BucketPartitionUtils { - def createDataFrame(df: DataFrame, indexKeyFields: String, bucketNum: Int, partitionNum: Int): DataFrame = { + def createDataFrame(df: DataFrame, indexKeyFields: String, bucketNum: Int, partitionNum: Int, Review Comment: create a new method instead. ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java: ########## @@ -33,22 +38,36 @@ */ public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<T> { - private final int bucketNum; + private final Configuration conf; private final String indexKeyFields; + private final String hashingInstantToLoad; + private StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf; - private Functions.Function2<String, Integer, Integer> partitionIndexFunc; + private Functions.Function3<Integer, String, Integer, Integer> partitionIndexFunc; - public BucketIndexPartitioner(int bucketNum, String indexKeyFields) { - this.bucketNum = bucketNum; + public BucketIndexPartitioner(Configuration conf, String indexKeyFields, + StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf) { + this.conf = conf; this.indexKeyFields = indexKeyFields; + this.hashingInstantToLoad = conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_LOAD_INSTANT); + this.storageConf = storageConf; } @Override public int partition(HoodieKey key, int numPartitions) { if (this.partitionIndexFunc == null) { - 
this.partitionIndexFunc = BucketIndexUtil.getPartitionIndexFunc(bucketNum, numPartitions); + this.partitionIndexFunc = BucketIndexUtil.getPartitionIndexFunc(numPartitions); + } + + int bucketNum; + if (!StringUtils.isNullOrEmpty(hashingInstantToLoad)) { + org.apache.hadoop.conf.Configuration hadoopConf = storageConf.unwrapAs(org.apache.hadoop.conf.Configuration.class); + PartitionBucketIndexCalculator calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, hadoopConf, conf.get(FlinkOptions.PATH)); + bucketNum = calc.computeNumBuckets(key.getPartitionPath()); + } else { + bucketNum = conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS); + } int curBucket = BucketIdentifier.getBucketId(key.getRecordKey(), indexKeyFields, bucketNum); - return this.partitionIndexFunc.apply(key.getPartitionPath(), curBucket); + return this.partitionIndexFunc.apply(bucketNum, key.getPartitionPath(), curBucket); Review Comment: Can we fire a new partitioner, or add a func `NumBucketsFunction`. ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java: ########## @@ -31,15 +35,29 @@ public class BucketIndexBulkInsertPartitionerWithRows implements BulkInsertParti private final String indexKeyFields; private final int bucketNum; + private boolean isPartitionBucketIndexEnable = false; + private PartitionBucketIndexCalculator calc; - public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, int bucketNum) { + public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, int bucketNum, HoodieTable table) { this.indexKeyFields = indexKeyFields; this.bucketNum = bucketNum; + String hashingInstantToLoad = table.getConfig().getHashingConfigInstantToLoad(); Review Comment: As mentioned above, we should bind the index config with the table config. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
