irodriguezclaveria opened a new issue, #6389: URL: https://github.com/apache/paimon/issues/6389
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/paimon/issues) and found nothing similar.

### Motivation

In dynamic bucket mode, when buckets fill up and the system runs out of available empty buckets, the assigner cannot detect that previously full buckets may have become available again after compaction or data deletion. This causes unnecessary creation of new buckets, or bucket exhaustion, leading to poor bucket distribution and potential performance degradation.

### Solution

Introduce an asynchronous bucket refresh mechanism that periodically rescans the bucket index to discover newly available buckets. The refresh is triggered when the number of empty buckets drops below a configurable threshold (`dynamic-bucket.empty-bucket-threshold`) and respects a minimum time interval between refreshes (`dynamic-bucket.refresh-interval`) to avoid excessive overhead. The refresh runs asynchronously in a separate thread without blocking bucket assignment operations, so the system can continuously discover and reuse buckets that have become available after compaction or data deletion.

### Anything else?

This would be one possible approach, in the `PartitionIndex#assign` method:
```java
// 1. Is it a key that has appeared before?
if (hash2Bucket.containsKey(hash)) {
    return hash2Bucket.get(hash);
}

// New code: trigger an asynchronous refresh when empty buckets run low.
if (shouldRefreshEmptyBuckets(maxBucketId, minEmptyBucketsBeforeAsyncCheck)
        && isReachedTheMinRefreshInterval(minRefreshInterval)) {
    refreshBucketsFromDisk();
}
```

New methods to check whether the refresh should be executed:

```java
private boolean shouldRefreshEmptyBuckets(int maxBucketId, int minEmptyBucketsBeforeAsyncCheck) {
    // Use <= rather than == so the check still fires if the bucket count
    // skips past the exact threshold between two assignments.
    return maxBucketId != -1
            && minEmptyBucketsBeforeAsyncCheck != -1
            && nonFullBucketInformation.size() <= maxBucketId - minEmptyBucketsBeforeAsyncCheck;
}

private boolean isReachedTheMinRefreshInterval(final Duration duration) {
    return Instant.now().isAfter(lastRefreshTime.plus(duration));
}
```

And the refresh method:

```java
private void refreshBucketsFromDisk() {
    // Only start a refresh if one is not already in progress.
    if (refreshFuture == null || refreshFuture.isDone()) {
        refreshFuture =
                CompletableFuture.runAsync(
                        () -> {
                            try {
                                List<IndexManifestEntry> files =
                                        indexFileHandler.scanEntries(HASH_INDEX, partition);
                                Map<Integer, Long> tempBucketInfo = new HashMap<>();
                                for (IndexManifestEntry file : files) {
                                    long currentNumberOfRows = file.indexFile().rowCount();
                                    if (currentNumberOfRows < targetBucketRowNumber) {
                                        tempBucketInfo.put(file.bucket(), currentNumberOfRows);
                                    }
                                }
                                // nonFullBucketInformation is a ConcurrentHashMap.
                                nonFullBucketInformation.putAll(tempBucketInfo);
                                lastRefreshTime = Instant.now();
                            } catch (Exception e) {
                                // Log the error instead of throwing
                                // (LOG being the class's SLF4J logger).
                                LOG.warn("Error refreshing buckets from disk", e);
                            }
                        });
    }
}
```

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
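For illustration, the two options proposed above would presumably be set as table options, alongside the existing `'bucket' = '-1'` that enables dynamic bucket mode. A minimal sketch (the two `dynamic-bucket.*` option names come from this proposal and do not exist in Paimon yet; the values are made up):

```sql
CREATE TABLE t (
    id INT,
    v STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'bucket' = '-1',  -- dynamic bucket mode
    -- proposed: refresh when the number of empty buckets drops below 5
    'dynamic-bucket.empty-bucket-threshold' = '5',
    -- proposed: wait at least 1 min between two refreshes
    'dynamic-bucket.refresh-interval' = '1 min'
);
```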

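To make the interplay of the two guard conditions and the single-in-flight refresh concrete, here is a self-contained, runnable sketch of the same gating logic outside Paimon. All names here (`RefreshGate`, `shouldRefresh`, `intervalElapsed`, `maybeRefresh`) are hypothetical and for illustration only; they are not part of the Paimon codebase.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class RefreshGate {

    // Mirrors the proposal's refreshFuture field: at most one refresh in flight.
    static CompletableFuture<Void> refreshFuture;

    /** True when the non-full bucket count has dropped to or below the threshold. */
    static boolean shouldRefresh(int nonFullBuckets, int maxBucketId, int emptyBucketThreshold) {
        return maxBucketId != -1
                && emptyBucketThreshold != -1
                && nonFullBuckets <= maxBucketId - emptyBucketThreshold;
    }

    /** True when at least {@code interval} has passed since {@code lastRefresh}. */
    static boolean intervalElapsed(Instant lastRefresh, Duration interval, Instant now) {
        return now.isAfter(lastRefresh.plus(interval));
    }

    /** Starts an asynchronous refresh only when no previous refresh is still running. */
    static void maybeRefresh(Runnable work) {
        if (refreshFuture == null || refreshFuture.isDone()) {
            refreshFuture = CompletableFuture.runAsync(work);
        }
    }

    public static void main(String[] args) {
        // maxBucketId = 10, threshold = 5: refresh once 5 or fewer non-full buckets remain.
        System.out.println(shouldRefresh(3, 10, 5));  // true
        System.out.println(shouldRefresh(8, 10, 5));  // false: still plenty of buckets
        System.out.println(shouldRefresh(3, -1, 5));  // false: -1 disables the check

        Instant last = Instant.parse("2024-01-01T00:00:00Z");
        System.out.println(intervalElapsed(last, Duration.ofMinutes(5),
                Instant.parse("2024-01-01T00:06:00Z"))); // true: 6 min > 5 min
        System.out.println(intervalElapsed(last, Duration.ofMinutes(5),
                Instant.parse("2024-01-01T00:04:00Z"))); // false: only 4 min elapsed

        // A new refresh only starts after the previous one has completed.
        AtomicInteger refreshes = new AtomicInteger();
        maybeRefresh(refreshes::incrementAndGet);
        refreshFuture.join();
        maybeRefresh(refreshes::incrementAndGet);
        refreshFuture.join();
        System.out.println(refreshes.get()); // 2
    }
}
```

Note that the `isDone()` check alone is not fully race-free if `assign` can be called from multiple threads; in the single-writer assigner path it suffices, which is presumably why the proposal uses it.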