irodriguezclaveria opened a new issue, #6389:
URL: https://github.com/apache/paimon/issues/6389

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/paimon/issues) 
and found nothing similar.
   
   
   ### Motivation
   
   In dynamic bucket mode, when buckets fill up and the system runs out of 
available empty buckets, the assigner cannot detect that previously full 
buckets may have become available again after compaction or data deletion. This 
causes unnecessary creation of new buckets or bucket exhaustion, leading to 
poor bucket distribution and potential performance degradation.
   
   ### Solution
   
   Introduce an asynchronous bucket refresh mechanism that periodically scans 
the bucket table to discover newly available buckets. The refresh is triggered 
when the number of empty buckets drops below a configurable threshold 
(`dynamic-bucket.empty-bucket-threshold`) and respects a minimum time interval 
between refreshes (`dynamic-bucket.refresh-interval`) to avoid excessive 
overhead. The refresh runs asynchronously in a separate thread and does not 
block bucket assignment, so the assigner can keep discovering and reusing 
buckets that have become available through compaction or data deletion.
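
The trigger condition described above could be sketched as a small gate object. This is an illustration only: `RefreshGate`, its method names, and the injected `Clock` are hypothetical and not part of Paimon. It assumes that "drops below" means a strict less-than comparison on the number of non-full buckets, and that a negative threshold disables the check.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper (not a Paimon class): decides whether a bucket refresh may start. */
final class RefreshGate {
    // Assumed to carry the value of dynamic-bucket.empty-bucket-threshold; negative disables.
    private final int emptyBucketThreshold;
    // Assumed to carry the value of dynamic-bucket.refresh-interval.
    private final Duration refreshInterval;
    // Injected so the interval check can be tested without sleeping.
    private final Clock clock;
    // volatile because it is read by the assigning thread and written by the refresh thread.
    private volatile Instant lastRefreshTime = Instant.EPOCH;

    RefreshGate(int emptyBucketThreshold, Duration refreshInterval, Clock clock) {
        this.emptyBucketThreshold = emptyBucketThreshold;
        this.refreshInterval = refreshInterval;
        this.clock = clock;
    }

    /** True when few non-full buckets remain and the minimum interval has elapsed. */
    boolean shouldRefresh(int nonFullBucketCount) {
        if (emptyBucketThreshold < 0) {
            return false; // feature disabled
        }
        boolean fewBucketsLeft = nonFullBucketCount < emptyBucketThreshold;
        boolean intervalElapsed =
                !clock.instant().isBefore(lastRefreshTime.plus(refreshInterval));
        return fewBucketsLeft && intervalElapsed;
    }

    /** Records the completion time of a refresh. */
    void markRefreshed() {
        lastRefreshTime = clock.instant();
    }
}
```

Keeping both checks behind a single method means the assign path only pays for an integer comparison and a clock read on each call.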
   
   ### Anything else?
   
   This would be one possible approach.
   
   In the `assign` method of `PartitionIndex`:
   ```
   
   // 1. Is it a key that has appeared before?
   if (hash2Bucket.containsKey(hash)) {
       return hash2Bucket.get(hash);
   }

   // New code: start an asynchronous refresh when few non-full buckets remain
   if (shouldRefreshEmptyBuckets(maxBucketId, minEmptyBucketsBeforeAsyncCheck)
           && isReachedTheMinRefreshInterval(minRefreshInterval)) {
       refreshBucketsFromDisk();
   }
   
   ```
   
   New methods to check whether the refresh should run:
   
   ```
   private boolean shouldRefreshEmptyBuckets(
           int maxBucketId, int minEmptyBucketsBeforeAsyncCheck) {
       return maxBucketId != -1
               && minEmptyBucketsBeforeAsyncCheck != -1
               && (nonFullBucketInformation.size()
                       == maxBucketId - minEmptyBucketsBeforeAsyncCheck);
   }

   private boolean isReachedTheMinRefreshInterval(final Duration duration) {
       return Instant.now().isAfter(lastRefreshTime.plus(duration));
   }
   ```
   
   And the refresh method:
   ```
   
   private void refreshBucketsFromDisk() {
       // Only start a refresh if one is not already in progress
       if (refreshFuture == null || refreshFuture.isDone()) {
           refreshFuture =
                   CompletableFuture.runAsync(
                           () -> {
                               try {
                                   List<IndexManifestEntry> files =
                                           indexFileHandler.scanEntries(HASH_INDEX, partition);
                                   Map<Integer, Long> tempBucketInfo = new HashMap<>();

                                   for (IndexManifestEntry file : files) {
                                       long currentNumberOfRows = file.indexFile().rowCount();
                                       if (currentNumberOfRows < targetBucketRowNumber) {
                                           tempBucketInfo.put(file.bucket(), currentNumberOfRows);
                                       }
                                   }
                                   // nonFullBucketInformation is a ConcurrentHashMap
                                   nonFullBucketInformation.putAll(tempBucketInfo);
                                   lastRefreshTime = Instant.now();
                               } catch (Exception e) {
                                   // Log the error instead of throwing
                                   System.err.println(
                                           "Error refreshing buckets from disk: " + e.getMessage());
                               }
                           });
       }
   }
   
   ```
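
The "only one refresh in flight" guard above checks and assigns `refreshFuture` in two separate steps. If `assign` can be called from more than one thread (an assumption, not something stated in this issue), an atomic flag is one way to close that window. The sketch below is self-contained and hypothetical: `BucketRefresher` is not a Paimon class, and the `Supplier` stands in for the `indexFileHandler.scanEntries(...)` call.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical sketch (not a Paimon class) of a single-flight asynchronous refresh. */
final class BucketRefresher {
    private final Map<Integer, Long> nonFullBuckets = new ConcurrentHashMap<>();
    private final AtomicBoolean refreshing = new AtomicBoolean(false);
    // Stand-in for the index scan: returns the current row count per bucket.
    private final Supplier<Map<Integer, Long>> rowCountsByBucket;
    private final long targetBucketRowNumber;

    BucketRefresher(Supplier<Map<Integer, Long>> rowCountsByBucket, long targetBucketRowNumber) {
        this.rowCountsByBucket = rowCountsByBucket;
        this.targetBucketRowNumber = targetBucketRowNumber;
    }

    /** Starts a refresh unless one is already running; never blocks the caller. */
    CompletableFuture<Void> refreshAsync() {
        // compareAndSet makes "check and claim" one atomic step.
        if (!refreshing.compareAndSet(false, true)) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.runAsync(
                () -> {
                    try {
                        rowCountsByBucket.get().forEach(
                                (bucket, rows) -> {
                                    if (rows < targetBucketRowNumber) {
                                        nonFullBuckets.put(bucket, rows);
                                    }
                                });
                    } finally {
                        // Release the flag even if the scan throws.
                        refreshing.set(false);
                    }
                });
    }

    Map<Integer, Long> nonFullBuckets() {
        return nonFullBuckets;
    }
}
```

A failed scan completes the returned future exceptionally, so the caller can decide whether to log or retry rather than having the error swallowed inside the task.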
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!

