kfaraz commented on code in PR #18489:
URL: https://github.com/apache/druid/pull/18489#discussion_r2629424653
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -225,22 +228,49 @@ public List<DataSegment> getCachedSegments() throws
IOException
}
}
- final List<DataSegment> cachedSegments = new ArrayList<>();
+ final ConcurrentLinkedQueue<DataSegment> cachedSegments = new
ConcurrentLinkedQueue<>();
final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
+ CountDownLatch latch = new CountDownLatch(segmentsToLoad.length);
+
+ boolean createdNewExecutorServiceToLoadSegmentCache = loadOnBootstrapExec
== null;
+ ExecutorService executorService =
createdNewExecutorServiceToLoadSegmentCache
+ ?
MoreExecutors.newDirectExecutorService()
+ : loadOnBootstrapExec;
AtomicInteger ignoredFilesCounter = new AtomicInteger(0);
- for (int i = 0; i < segmentsToLoad.length; i++) {
- final File file = segmentsToLoad[i];
- log.info("Loading segment cache file [%d/%d][%s].", i + 1,
segmentsToLoad.length, file);
- try {
- addFilesToCachedSegments(file, ignoredFilesCounter, cachedSegments);
- }
- catch (Exception e) {
- log.makeAlert(e, "Failed to load segment from segment cache file.")
- .addData("file", file)
- .emit();
- }
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ log.info("Retrieving [%d] cached segment metadata files to cache.",
segmentsToLoad.length);
+
+ for (File file : segmentsToLoad) {
+ executorService.submit(() -> {
+ try {
+ addFilesToCachedSegments(file, ignoredFilesCounter, cachedSegments);
+ }
+ catch (Exception e) {
+ log.makeAlert(e, "Failed to load segment from segment cache file.")
+ .addData("file", file)
+ .emit();
+ }
+ finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ try {
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error(e, "Interrupted when trying to retrieve cached segment
metadata files");
Review Comment:
Since the thread was interrupted, I think we can skip logging the stack trace.
```suggestion
log.noStackTrace().error(e, "Interrupted when trying to retrieve cached segment metadata files");
```
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -225,22 +228,49 @@ public List<DataSegment> getCachedSegments() throws
IOException
}
}
- final List<DataSegment> cachedSegments = new ArrayList<>();
+ final ConcurrentLinkedQueue<DataSegment> cachedSegments = new
ConcurrentLinkedQueue<>();
final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
+ CountDownLatch latch = new CountDownLatch(segmentsToLoad.length);
+
+ boolean createdNewExecutorServiceToLoadSegmentCache = loadOnBootstrapExec
== null;
+ ExecutorService executorService =
createdNewExecutorServiceToLoadSegmentCache
Review Comment:
Nit: Making these variables `final` would be nice.
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -225,22 +228,49 @@ public List<DataSegment> getCachedSegments() throws
IOException
}
}
- final List<DataSegment> cachedSegments = new ArrayList<>();
+ final ConcurrentLinkedQueue<DataSegment> cachedSegments = new
ConcurrentLinkedQueue<>();
final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
+ CountDownLatch latch = new CountDownLatch(segmentsToLoad.length);
+
+ boolean createdNewExecutorServiceToLoadSegmentCache = loadOnBootstrapExec
== null;
+ ExecutorService executorService =
createdNewExecutorServiceToLoadSegmentCache
+ ?
MoreExecutors.newDirectExecutorService()
+ : loadOnBootstrapExec;
AtomicInteger ignoredFilesCounter = new AtomicInteger(0);
- for (int i = 0; i < segmentsToLoad.length; i++) {
- final File file = segmentsToLoad[i];
- log.info("Loading segment cache file [%d/%d][%s].", i + 1,
segmentsToLoad.length, file);
- try {
- addFilesToCachedSegments(file, ignoredFilesCounter, cachedSegments);
- }
- catch (Exception e) {
- log.makeAlert(e, "Failed to load segment from segment cache file.")
- .addData("file", file)
- .emit();
- }
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ log.info("Retrieving [%d] cached segment metadata files to cache.",
segmentsToLoad.length);
+
+ for (File file : segmentsToLoad) {
+ executorService.submit(() -> {
+ try {
+ addFilesToCachedSegments(file, ignoredFilesCounter, cachedSegments);
+ }
+ catch (Exception e) {
+ log.makeAlert(e, "Failed to load segment from segment cache file.")
+ .addData("file", file)
+ .emit();
+ }
+ finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ try {
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error(e, "Interrupted when trying to retrieve cached segment
metadata files");
+ }
+
+ stopwatch.stop();
+ log.info("Retrieved [%d,%d] cached segments in [%d]ms.",
cachedSegments.size(), segmentsToLoad.length, stopwatch.millisElapsed());
Review Comment:
```suggestion
log.info("Loaded [%d/%d] cached segments in [%d]ms.",
cachedSegments.size(), segmentsToLoad.length, stopwatch.millisElapsed());
```
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -225,22 +228,49 @@ public List<DataSegment> getCachedSegments() throws
IOException
}
}
- final List<DataSegment> cachedSegments = new ArrayList<>();
+ final ConcurrentLinkedQueue<DataSegment> cachedSegments = new
ConcurrentLinkedQueue<>();
final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
+ CountDownLatch latch = new CountDownLatch(segmentsToLoad.length);
+
+ boolean createdNewExecutorServiceToLoadSegmentCache = loadOnBootstrapExec
== null;
+ ExecutorService executorService =
createdNewExecutorServiceToLoadSegmentCache
+ ?
MoreExecutors.newDirectExecutorService()
+ : loadOnBootstrapExec;
AtomicInteger ignoredFilesCounter = new AtomicInteger(0);
- for (int i = 0; i < segmentsToLoad.length; i++) {
- final File file = segmentsToLoad[i];
- log.info("Loading segment cache file [%d/%d][%s].", i + 1,
segmentsToLoad.length, file);
- try {
- addFilesToCachedSegments(file, ignoredFilesCounter, cachedSegments);
- }
- catch (Exception e) {
- log.makeAlert(e, "Failed to load segment from segment cache file.")
- .addData("file", file)
- .emit();
- }
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ log.info("Retrieving [%d] cached segment metadata files to cache.",
segmentsToLoad.length);
Review Comment:
```suggestion
log.info("Loading [%d] segments from disk to cache.",
segmentsToLoad.length);
```
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -225,22 +228,49 @@ public List<DataSegment> getCachedSegments() throws
IOException
}
}
- final List<DataSegment> cachedSegments = new ArrayList<>();
+ final ConcurrentLinkedQueue<DataSegment> cachedSegments = new
ConcurrentLinkedQueue<>();
final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
+ CountDownLatch latch = new CountDownLatch(segmentsToLoad.length);
+
+ boolean createdNewExecutorServiceToLoadSegmentCache = loadOnBootstrapExec
== null;
Review Comment:
```suggestion
// If there is no dedicated bootstrap executor, perform the loading sequentially on the current thread
boolean createdNewExecutorServiceToLoadSegmentCache =
loadOnBootstrapExec == null;
```
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -249,10 +279,14 @@ public List<DataSegment> getCachedSegments() throws
IOException
.emit();
}
- return cachedSegments;
+ return new ArrayList<>(cachedSegments);
Review Comment:
Return an immutable list:
```suggestion
return List.copyOf(cachedSegments);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]