This is an automated email from the ASF dual-hosted git repository.
gwphua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 74dee8410dd Concurrent loading segment cache (#18489)
74dee8410dd is described below
commit 74dee8410dd680cf02fbc9c440a183a26547acf1
Author: Virushade <[email protected]>
AuthorDate: Mon Jan 12 17:47:32 2026 +0800
Concurrent loading segment cache (#18489)
* Get Cached Segment concurrently
* Checkstyle
* Refactor addFilesToCachedSegments
* Stopwatch refactor
* Revert unrelated changes
* infoDir = getEffectiveInfoDir
* Refactor segmentLocalCacheManager
* Try catch for addFilesToCachedSegments
* Concurrent Load for Segment Metadata Files
* Use List instead of general Collection
* Cleanup after caching segments
* Change loadToCachedSegmentsFromFile to match old style
* Tidy up executor service
* Latch countdown should be issued in finally block
* Make it easier to review this change
* Attend to code review
* Make some variables final
* Fix verbose variable name
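The bullet points above describe the shape of the change: segment metadata files are now handed to an executor, each task counts down a shared latch in a finally block, and the caller blocks on the latch until every file has been processed. For context, here is a minimal, self-contained sketch of that pattern, under stated assumptions: ConcurrentLoadSketch, the files array, and the loaded queue are illustrative stand-ins, not code from the patch.

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrentLoadSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    // Stand-ins for the segment metadata files read in the patch.
    final String[] files = {"seg-1", "seg-2", "seg-3"};
    final ConcurrentLinkedQueue<String> loaded = new ConcurrentLinkedQueue<>();
    final CountDownLatch latch = new CountDownLatch(files.length);
    final ExecutorService exec = Executors.newFixedThreadPool(2);

    for (String file : files) {
      exec.submit(() -> {
        try {
          loaded.add(file); // stand-in for parsing the file and caching the segment
        }
        finally {
          latch.countDown(); // counted down in finally so a failed task cannot hang the latch
        }
      });
    }

    latch.await(); // wait until every task has finished or failed
    exec.shutdown();
    System.out.println("loaded " + loaded.size() + " of " + files.length);
  }
}

A thread-safe collection is needed because multiple worker threads append results concurrently; the patch below uses ConcurrentLinkedQueue for this and copies it into an immutable List before returning.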
---
.../segment/loading/SegmentLocalCacheManager.java | 65 +++++++++++++++++-----
1 file changed, 50 insertions(+), 15 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index 579bf7ff97d..0ecaa615e5d 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -34,6 +34,7 @@ import org.apache.druid.error.DruidException;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -52,7 +53,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -60,6 +60,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@@ -230,22 +232,51 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
}
}
- final List<DataSegment> cachedSegments = new ArrayList<>();
+ final ConcurrentLinkedQueue<DataSegment> cachedSegments = new ConcurrentLinkedQueue<>();
final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
+ final CountDownLatch latch = new CountDownLatch(segmentsToLoad.length);
+
+ // If there is no dedicated bootstrap executor, perform the loading sequentially on the current thread.
+ final boolean isLoadingSegmentsSequentially = loadOnBootstrapExec == null;
+ final ExecutorService executorService = isLoadingSegmentsSequentially
+ ? MoreExecutors.newDirectExecutorService()
+ : loadOnBootstrapExec;
AtomicInteger ignoredFilesCounter = new AtomicInteger(0);
- for (int i = 0; i < segmentsToLoad.length; i++) {
- final File file = segmentsToLoad[i];
- log.info("Loading segment cache file [%d/%d][%s].", i + 1, segmentsToLoad.length, file);
- try {
- addFilesToCachedSegments(file, ignoredFilesCounter, cachedSegments);
- }
- catch (Exception e) {
- log.makeAlert(e, "Failed to load segment from segment cache file.")
- .addData("file", file)
- .emit();
- }
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ log.info("Loading [%d] segments from disk to cache.", segmentsToLoad.length);
+
+ for (File file : segmentsToLoad) {
+ executorService.submit(() -> {
+ try {
+ addFilesToCachedSegments(file, ignoredFilesCounter, cachedSegments);
+ }
+ catch (Exception e) {
+ log.makeAlert(e, "Failed to load segment from segment cache file.")
+ .addData("file", file)
+ .emit();
+ }
+ finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ try {
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.noStackTrace().error(e, "Interrupted when trying to retrieve cached segment metadata files");
+ }
+
+ stopwatch.stop();
+ log.info("Loaded [%d/%d] cached segments in [%d]ms.", cachedSegments.size(), segmentsToLoad.length, stopwatch.millisElapsed());
+
+ if (isLoadingSegmentsSequentially) {
+ // Shutdown the direct executor service we created previously in this method.
+ executorService.shutdown();
}
if (ignoredFilesCounter.get() > 0) {
@@ -254,10 +285,14 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
.emit();
}
- return cachedSegments;
+ return List.copyOf(cachedSegments);
}
- private void addFilesToCachedSegments(File file, AtomicInteger ignored, List<DataSegment> cachedSegments) throws IOException
+ private void addFilesToCachedSegments(
+ File file,
+ AtomicInteger ignored,
+ ConcurrentLinkedQueue<DataSegment> cachedSegments
+ ) throws IOException
{
final DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
if (!segment.getId().toString().equals(file.getName())) {
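Design note on the sequential fallback shown above: Guava's MoreExecutors.newDirectExecutorService() runs each submitted task synchronously on the calling thread, so when loadOnBootstrapExec is null the same submit-and-await code path degenerates to the old one-file-at-a-time behavior, and the latch is already at zero by the time await() is called. A small sketch of that semantics; the class name and printed message are illustrative, not from the patch.

import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.ExecutorService;

public class DirectExecutorSketch
{
  public static void main(String[] args)
  {
    final ExecutorService direct = MoreExecutors.newDirectExecutorService();
    // The task runs on the calling thread, before submit() returns.
    direct.submit(() -> System.out.println("ran on " + Thread.currentThread().getName()));
    direct.shutdown(); // the patch shuts its direct executor down the same way
  }
}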
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]