This is an automated email from the ASF dual-hosted git repository.

gwphua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 74dee8410dd Concurrent loading segment cache (#18489)
74dee8410dd is described below

commit 74dee8410dd680cf02fbc9c440a183a26547acf1
Author: Virushade <[email protected]>
AuthorDate: Mon Jan 12 17:47:32 2026 +0800

    Concurrent loading segment cache (#18489)
    
    * Get Cached Segment concurrently
    
    * Checkstyle
    
    * Refactor addFilesToCachedSegments
    
    * Stopwatch refactor
    
    * Revert unrelated changes
    
    * infoDir = getEffectiveInfoDir
    
    * Refactor segmentLocalCacheManager
    
    * Try catch for addFilesToCachedSegments
    
    * Concurrent Load for Segment Metadata Files
    
    * Use List instead of general Collection
    
    * Cleanup after caching segments
    
    * Change loadToCachedSegmentsFromFile to match old style
    
    * Tidy up executor service
    
    * Latch countdown should be issued in finally block (see the sketch after this list)
    
    * Make it easier to review this change
    
    * Attend to code review
    
    * Make some variables final
    
    * Fix verbose variable name
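    
    The latch bullet above reflects a general idiom worth spelling out: if
    countDown() is not issued in a finally block, a single failed task leaves
    await() blocked forever. A minimal standalone sketch of the idiom follows;
    the demo class and its tasks are hypothetical, not part of this patch.
    
        import java.util.List;
        import java.util.concurrent.CountDownLatch;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
    
        // Hypothetical demo class, not Druid code.
        public class LatchFinallyDemo
        {
          public static void main(String[] args) throws InterruptedException
          {
            final List<Runnable> tasks = List.of(
                () -> { throw new RuntimeException("boom"); },
                () -> System.out.println("ok")
            );
            final ExecutorService exec = Executors.newFixedThreadPool(2);
            final CountDownLatch latch = new CountDownLatch(tasks.size());
            for (Runnable task : tasks) {
              exec.submit(() -> {
                try {
                  task.run();
                }
                finally {
                  latch.countDown(); // released even when task.run() throws
                }
              });
            }
            latch.await(); // completes despite the failing task
            exec.shutdown();
          }
        }
    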
---
 .../segment/loading/SegmentLocalCacheManager.java  | 65 +++++++++++++++++-----
 1 file changed, 50 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index 579bf7ff97d..0ecaa615e5d 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -34,6 +34,7 @@ import org.apache.druid.error.DruidException;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -52,7 +53,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -60,6 +60,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -230,22 +232,51 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
       }
     }
 
-    final List<DataSegment> cachedSegments = new ArrayList<>();
+    final ConcurrentLinkedQueue<DataSegment> cachedSegments = new ConcurrentLinkedQueue<>();
     final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
+    final CountDownLatch latch = new CountDownLatch(segmentsToLoad.length);
+
+    // If there is no dedicated bootstrap executor, perform the loading sequentially on the current thread.
+    final boolean isLoadingSegmentsSequentially = loadOnBootstrapExec == null;
+    final ExecutorService executorService = isLoadingSegmentsSequentially
+                                            ? MoreExecutors.newDirectExecutorService()
+                                            : loadOnBootstrapExec;
 
     AtomicInteger ignoredFilesCounter = new AtomicInteger(0);
 
-    for (int i = 0; i < segmentsToLoad.length; i++) {
-      final File file = segmentsToLoad[i];
-      log.info("Loading segment cache file [%d/%d][%s].", i + 1, 
segmentsToLoad.length, file);
-      try {
-        addFilesToCachedSegments(file, ignoredFilesCounter, cachedSegments);
-      }
-      catch (Exception e) {
-        log.makeAlert(e, "Failed to load segment from segment cache file.")
-           .addData("file", file)
-           .emit();
-      }
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    log.info("Loading [%d] segments from disk to cache.", 
segmentsToLoad.length);
+
+    for (File file : segmentsToLoad) {
+      executorService.submit(() -> {
+        try {
+          addFilesToCachedSegments(file, ignoredFilesCounter, cachedSegments);
+        }
+        catch (Exception e) {
+          log.makeAlert(e, "Failed to load segment from segment cache file.")
+             .addData("file", file)
+             .emit();
+        }
+        finally {
+          latch.countDown();
+        }
+      });
+    }
+
+    try {
+      latch.await();
+    }
+    catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      log.noStackTrace().error(e, "Interrupted when trying to retrieve cached segment metadata files");
+    }
+
+    stopwatch.stop();
+    log.info("Loaded [%d/%d] cached segments in [%d]ms.", 
cachedSegments.size(), segmentsToLoad.length, stopwatch.millisElapsed());
+
+    if (isLoadingSegmentsSequentially) {
+      // Shutdown the direct executor service we created previously in this method.
+      executorService.shutdown();
     }
 
     if (ignoredFilesCounter.get() > 0) {
@@ -254,10 +285,14 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
          .emit();
     }
 
-    return cachedSegments;
+    return List.copyOf(cachedSegments);
   }
 
-  private void addFilesToCachedSegments(File file, AtomicInteger ignored, List<DataSegment> cachedSegments) throws IOException
+  private void addFilesToCachedSegments(
+      File file,
+      AtomicInteger ignored,
+      ConcurrentLinkedQueue<DataSegment> cachedSegments
+  ) throws IOException
   {
     final DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
     if (!segment.getId().toString().equals(file.getName())) {
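
Taken together, the patch is a fan-out/join: submit one task per segment
metadata file, collect results in a thread-safe queue, and join on a latch.
Below is a condensed, self-contained sketch of the same pattern, assuming a
hypothetical parse() helper in place of the real deserialization; it also
substitutes a plain single-thread executor for the Guava
MoreExecutors.newDirectExecutorService() that the patch uses for the
sequential fallback.

    import java.io.File;
    import java.util.List;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Condensed sketch of the fan-out/join pattern above; not Druid code.
    public class ConcurrentLoadSketch
    {
      public static List<String> loadAll(File[] files, ExecutorService bootstrapExec)
          throws InterruptedException
      {
        final ConcurrentLinkedQueue<String> results = new ConcurrentLinkedQueue<>();
        final CountDownLatch latch = new CountDownLatch(files.length);

        // Fall back to a private single-thread executor when no bootstrap pool
        // is configured (the patch uses Guava's direct executor here instead).
        final boolean ownExecutor = bootstrapExec == null;
        final ExecutorService exec = ownExecutor ? Executors.newSingleThreadExecutor() : bootstrapExec;

        for (File file : files) {
          exec.submit(() -> {
            try {
              results.add(parse(file)); // thread-safe add; order is not preserved
            }
            catch (Exception e) {
              System.err.println("Failed to load " + file + ": " + e); // skip bad files
            }
            finally {
              latch.countDown(); // always count down so await() cannot hang
            }
          });
        }

        latch.await(); // join: wait for every submitted task to finish
        if (ownExecutor) {
          exec.shutdown(); // only shut down the executor this method created
        }
        return List.copyOf(results); // hand back an immutable snapshot
      }

      // Hypothetical stand-in for jsonMapper.readValue(file, DataSegment.class).
      private static String parse(File file)
      {
        return file.getName();
      }
    }

Counting down inside finally guarantees the latch is released even when a
file fails to parse, so one corrupt metadata file cannot stall the join.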


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
