zhangyue19921010 commented on code in PR #13017: URL: https://github.com/apache/hudi/pull/13017#discussion_r2011355530
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A singleton implementation of PartitionBucketIndexCalculator that ensures only one instance + * exists for each unique hashingInstantToLoad value. + */ +public class PartitionBucketIndexCalculator implements Serializable { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexCalculator.class); + // Map to store singleton instances for each instantToLoad + configuration hash combination + private static final ConcurrentMap<String, PartitionBucketIndexCalculator> INSTANCES = new ConcurrentHashMap<>(); + protected int cacheSize = 100_000; + protected PartitionBucketIndexHashingConfig hashingConfig; + private int defaultBucketNumber; + private String instantToLoad; + // Cache for partition to bucket number mapping + protected Map<String, Integer> partitionToBucketCache = new SerializableLRUMap<>(cacheSize); + private RuleEngine ruleEngine; + + /** + * Private constructor to prevent direct instantiation + * + * @param instantToLoad The instant to load + * @param hadoopConf The Hadoop configuration + */ + private PartitionBucketIndexCalculator(String instantToLoad, Configuration hadoopConf, String basePath) { + this.instantToLoad = instantToLoad; + StoragePath hashingConfigPath = PartitionBucketIndexUtils.getHashingConfigPath(basePath, instantToLoad); + try (HoodieHadoopStorage storage = new HoodieHadoopStorage(hashingConfigPath, HadoopFSUtils.getStorageConf(hadoopConf))) { + init(storage, hashingConfigPath); + } catch (IOException e) { + throw new HoodieIOException("Failed to initialize PartitionBucketIndexCalculator ", e); + } + } + + private PartitionBucketIndexCalculator(String instantToLoad, HoodieTableMetaClient client) { + this.instantToLoad = instantToLoad; + String metaPath = client.getHashingMetadataConfigPath(); + StoragePath hashingConfigPath = new StoragePath(metaPath, instantToLoad + PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX); + + try (HoodieStorage storage = client.getStorage()) { + init(storage, hashingConfigPath); + } catch (IOException e) { + throw new HoodieIOException("Failed to initialize PartitionBucketIndexCalculator ", e); + } + } + + private PartitionBucketIndexCalculator(String instantToLoad, PartitionBucketIndexHashingConfig config) { + this.hashingConfig = config; + this.defaultBucketNumber = config.getDefaultBucketNumber(); + String expressions = config.getExpressions(); + String ruleType = config.getRule(); + this.ruleEngine = createRuleEngine(ruleType, expressions); + } + + private void init(HoodieStorage storage, StoragePath hashingConfigPath) { + Option<PartitionBucketIndexHashingConfig> config = PartitionBucketIndexUtils.loadHashingConfig(storage, hashingConfigPath); + ValidationUtils.checkArgument(config.isPresent()); + this.hashingConfig = config.get(); + this.defaultBucketNumber = config.get().getDefaultBucketNumber(); + String expressions = config.get().getExpressions(); + String ruleType = config.get().getRule(); + this.ruleEngine = createRuleEngine(ruleType, expressions); + } + + /** + * Gets the singleton instance for the specified instantToLoad and configuration + * + * @param instantToLoad The instant to load + * @param hadoopConf The Hadoop configuration + * @return The singleton instance + */ + public static PartitionBucketIndexCalculator getInstance(String instantToLoad, Configuration hadoopConf, String basePath) { + // Using instantToLoad as the key for the cache + return INSTANCES.computeIfAbsent(instantToLoad, + key -> { + LOG.info("Creating new PartitionBucketIndexCalculator instance for instantToLoad: {}", key); Review Comment: done. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A singleton implementation of PartitionBucketIndexCalculator that ensures only one instance + * exists for each unique hashingInstantToLoad value. + */ +public class PartitionBucketIndexCalculator implements Serializable { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexCalculator.class); + // Map to store singleton instances for each instantToLoad + configuration hash combination + private static final ConcurrentMap<String, PartitionBucketIndexCalculator> INSTANCES = new ConcurrentHashMap<>(); + protected int cacheSize = 100_000; + protected PartitionBucketIndexHashingConfig hashingConfig; + private int defaultBucketNumber; + private String instantToLoad; + // Cache for partition to bucket number mapping + protected Map<String, Integer> partitionToBucketCache = new SerializableLRUMap<>(cacheSize); + private RuleEngine ruleEngine; + + /** + * Private constructor to prevent direct instantiation + * + * @param instantToLoad The instant to load + * @param hadoopConf The Hadoop configuration + */ + private PartitionBucketIndexCalculator(String instantToLoad, Configuration hadoopConf, String basePath) { + this.instantToLoad = instantToLoad; + StoragePath hashingConfigPath = PartitionBucketIndexUtils.getHashingConfigPath(basePath, instantToLoad); + try (HoodieHadoopStorage storage = new HoodieHadoopStorage(hashingConfigPath, HadoopFSUtils.getStorageConf(hadoopConf))) { + init(storage, hashingConfigPath); + } catch (IOException e) { + throw new HoodieIOException("Failed to initialize PartitionBucketIndexCalculator ", e); + } + } + + private PartitionBucketIndexCalculator(String instantToLoad, HoodieTableMetaClient client) { + this.instantToLoad = instantToLoad; + String metaPath = client.getHashingMetadataConfigPath(); + StoragePath hashingConfigPath = new StoragePath(metaPath, instantToLoad + PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX); + + try (HoodieStorage storage = client.getStorage()) { + init(storage, hashingConfigPath); + } catch (IOException e) { + throw new HoodieIOException("Failed to initialize PartitionBucketIndexCalculator ", e); + } + } + + private PartitionBucketIndexCalculator(String instantToLoad, PartitionBucketIndexHashingConfig config) { + this.hashingConfig = config; + this.defaultBucketNumber = config.getDefaultBucketNumber(); + String expressions = config.getExpressions(); + String ruleType = config.getRule(); + this.ruleEngine = createRuleEngine(ruleType, expressions); + } + + private void init(HoodieStorage storage, StoragePath hashingConfigPath) { + Option<PartitionBucketIndexHashingConfig> config = PartitionBucketIndexUtils.loadHashingConfig(storage, hashingConfigPath); + ValidationUtils.checkArgument(config.isPresent()); + this.hashingConfig = config.get(); + this.defaultBucketNumber = config.get().getDefaultBucketNumber(); + String expressions = config.get().getExpressions(); + String ruleType = config.get().getRule(); + this.ruleEngine = createRuleEngine(ruleType, expressions); + } + + /** + * Gets the singleton instance for the specified instantToLoad and configuration + * + * @param instantToLoad The instant to load + * @param hadoopConf The Hadoop configuration + * @return The singleton instance + */ + public static PartitionBucketIndexCalculator getInstance(String instantToLoad, Configuration hadoopConf, String basePath) { + // Using instantToLoad as the key for the cache + return INSTANCES.computeIfAbsent(instantToLoad, + key -> { + LOG.info("Creating new PartitionBucketIndexCalculator instance for instantToLoad: {}", key); + return new PartitionBucketIndexCalculator(key, hadoopConf, basePath); + }); + } + + public static PartitionBucketIndexCalculator getInstance(String instantToLoad, HoodieTableMetaClient client) { + // Using instantToLoad as the key for the cache + return INSTANCES.computeIfAbsent(instantToLoad, + key -> { + LOG.info("Creating new PartitionBucketIndexCalculator instance for instantToLoad: {}", key); + return new PartitionBucketIndexCalculator(key, client); + }); + } + + public static PartitionBucketIndexCalculator getInstance(String instantToLoad, PartitionBucketIndexHashingConfig config) { + // Using instantToLoad as the key for the cache + return INSTANCES.computeIfAbsent(instantToLoad, + key -> { + LOG.info("Creating new PartitionBucketIndexCalculator instance for instantToLoad: {}", key); + return new PartitionBucketIndexCalculator(key, config); + }); + } + + /** + * Computes the bucket number for a given partition path + * + * @param partitionPath The partition path + * @return The computed bucket number + */ + public int computeNumBuckets(String partitionPath) { + // Check cache first + Integer cachedBucketNumber = partitionToBucketCache.get(partitionPath); + if (cachedBucketNumber != null) { + return cachedBucketNumber; + } + + // Calculate bucket number using the rule engine + int bucketNumber = ruleEngine.calculateBucketNumber(partitionPath); + + // If no rule matched, use default bucket number + if (bucketNumber == -1) { + bucketNumber = defaultBucketNumber; + LOG.debug("No rule matched for partition: {}. Using default bucket number: {}", + partitionPath, defaultBucketNumber); + } + + // Update cache + partitionToBucketCache.put(partitionPath, bucketNumber); + + return bucketNumber; + } + + /** + * Gets the instant to load + * + * @return The instant to load + */ + public String getInstantToLoad() { + return instantToLoad; + } + + /** + * Gets the hashing configuration + * + * @return The hashing configuration + */ + public PartitionBucketIndexHashingConfig getHashingConfig() { + return hashingConfig; + } + + /** + * Clears the instance cache (useful for testing or memory management) + */ + public static void clearInstanceCache() { + INSTANCES.clear(); + } + + public int getCacheSize() { + return partitionToBucketCache.size(); + } + + public Map<String, Integer> getPartitionToBucket() { + return new HashMap<>(partitionToBucketCache); + } + + public void clearCache() { + partitionToBucketCache.clear(); + LOG.info("Cleared partition to bucket number cache"); + } + + // ------------------------------------------------------------------------------------------------------- + + /** + * Interface for rule engines that calculate bucket numbers + */ + private interface RuleEngine extends Serializable { + /** + * Calculate bucket number for a partition path + * @param partitionPath The partition path + * @return The calculated bucket number, or -1 if no rule matches + */ + int calculateBucketNumber(String partitionPath); + } + + /** + * Factory method to create the appropriate rule engine based on rule type + */ + private static PartitionBucketIndexCalculator.RuleEngine createRuleEngine(String ruleType, String expressions) { + switch (PartitionBucketIndexRule.valueOf(ruleType.toUpperCase())) { + case REGEX: + return new PartitionBucketIndexCalculator.RegexRuleEngine(expressions); + default: + LOG.error("Unsupported rule type: {}.", ruleType); + throw new UnsupportedOperationException("Unsupported rule type " + ruleType); + } + } + + /** + * Regex-based rule engine implementation + */ + private static class RegexRuleEngine implements PartitionBucketIndexCalculator.RuleEngine { + private static final long serialVersionUID = 1L; + private final List<RegexRule> rules = new ArrayList<>(); + + /** + * Represents a single regex rule with its pattern and bucket number + */ + private static class RegexRule implements Serializable { Review Comment: done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
