zhangyue19921010 commented on code in PR #13017: URL: https://github.com/apache/hudi/pull/13017#discussion_r2014112295
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/partition/PartitionBucketIndexCalculator.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket.partition; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * A singleton implementation of PartitionBucketIndexCalculator that ensures only one instance + * exists for each unique hashingInstantToLoad value. + */ +public class PartitionBucketIndexCalculator implements Serializable { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexCalculator.class); + // Map to store singleton instances for each instantToLoad + configuration hash combination + private static final HashMap<String, PartitionBucketIndexCalculator> INSTANCES = new HashMap<>(); + private static final int CACHE_SIZE = 100_000; + private final int defaultBucketNumber; + // Cache for partition to bucket number mapping + private static final Cache<String, Integer> PARTITION_TO_BUCKET_CACHE = Caffeine.newBuilder().maximumSize(CACHE_SIZE).build(); + private final RuleEngine ruleEngine; + + /** + * Private constructor to prevent direct instantiation. + */ + private PartitionBucketIndexCalculator(String expressions, String ruleType, int defaultBucketNumber) { + this.defaultBucketNumber = defaultBucketNumber; + this.ruleEngine = createRuleEngine(ruleType, expressions); + } + + /** + * Gets the singleton instance for the specified expressions. + */ + public static PartitionBucketIndexCalculator getInstance(String expressions, String rule, int defaultBucketNumber) { + return INSTANCES.computeIfAbsent(expressions, + key -> { + LOG.info("Creating new {} instance for instantToLoad: {}", PartitionBucketIndexCalculator.class, key); + return new PartitionBucketIndexCalculator(expressions, rule, defaultBucketNumber); + }); + } + + /** + * Computes the bucket number for a given partition path. + * + * @param partitionPath The partition path. + * @return The computed bucket number. + */ + public int computeNumBuckets(String partitionPath) { Review Comment: IMO introducing additional overhead to enforce thread safety here is unnecessary. Since computeNumBuckets deterministically calculates the corresponding number of buckets based on fixed expressions and fixed partition paths, the result remains consistent regardless of thread safety. The only impact of thread-unsafe execution would be potential duplicate calculations (reducing cache hit rates), but the final cached value will eventually stabilize to the correct result ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/partition/PartitionBucketIndexUtils.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket.partition; + +import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; +import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class PartitionBucketIndexUtils { + + private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexUtils.class); + + public static boolean isPartitionSimpleBucketIndex(Configuration conf, String basePath) { + return isPartitionSimpleBucketIndex(HadoopFSUtils.getStorageConf(conf), basePath); + } + + public static boolean isPartitionSimpleBucketIndex(StorageConfiguration conf, String basePath) { + StoragePath storagePath = PartitionBucketIndexHashingConfig.getHashingConfigStorageFolder(basePath); + try (HoodieHadoopStorage storage = new HoodieHadoopStorage(storagePath, conf)) { + return storage.exists(storagePath); + } catch (IOException e) { + throw new HoodieIOException("Failed to list PARTITION_BUCKET_INDEX_HASHING_FOLDER folder ", e); + } + } + + public static Map<String, Integer> getAllBucketNumbers(PartitionBucketIndexCalculator calc, List<String> partitions) { + for (String partition : partitions) { + calc.computeNumBuckets(partition); + } + return calc.getPartitionToBucket(); + } + + /** + * Used for test. Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
