clintropolis commented on code in PR #18176:
URL: https://github.com/apache/druid/pull/18176#discussion_r2227069917
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -595,96 +448,107 @@ public void cleanup(DataSegment segment)
}
@Override
- public void loadSegmentIntoPageCache(DataSegment segment)
+ public void shutdownBootstrap()
{
- if (loadOnDownloadExec == null) {
+ if (loadOnBootstrapExec == null) {
return;
}
-
- loadOnDownloadExec.submit(() -> loadSegmentIntoPageCacheInternal(segment));
+ loadOnBootstrapExec.shutdown();
}
@Override
- public void loadSegmentIntoPageCacheOnBootstrap(DataSegment segment)
+ public void shutdown()
{
- if (loadOnBootstrapExec == null) {
- return;
+ if (loadOnDownloadExec != null) {
+ loadOnDownloadExec.shutdown();
+ }
+ if (virtualStorageFabricLoadOnDemandExec != null) {
+ virtualStorageFabricLoadOnDemandExec.shutdown();
}
+ }
- loadOnBootstrapExec.submit(() ->
loadSegmentIntoPageCacheInternal(segment));
+ @VisibleForTesting
+ public ConcurrentHashMap<DataSegment, ReferenceCountingLock>
getSegmentLocks()
+ {
+ return segmentLocks;
}
- void loadSegmentIntoPageCacheInternal(DataSegment segment)
+ @VisibleForTesting
+ List<StorageLocation> getLocations()
{
- final ReferenceCountingLock lock = createOrGetLock(segment);
- synchronized (lock) {
- try {
- for (StorageLocation location : locations) {
- File localStorageDir = new File(location.getPath(),
DataSegmentPusher.getDefaultStorageDir(segment, false));
- if (localStorageDir.exists()) {
- File baseFile = location.getPath();
- if (localStorageDir.equals(baseFile)) {
- continue;
- }
+ return locations;
+ }
- log.info("Loading directory[%s] into page cache.",
localStorageDir);
-
- File[] children = localStorageDir.listFiles();
- if (children != null) {
- for (File child : children) {
- try (InputStream in = Files.newInputStream(child.toPath())) {
- IOUtils.copy(in, NullOutputStream.NULL_OUTPUT_STREAM);
- log.info("Loaded [%s] into page cache.",
child.getAbsolutePath());
- }
- catch (Exception e) {
- log.error(e, "Failed to load [%s] into page cache",
child.getAbsolutePath());
- }
- }
- }
- }
- }
- }
- finally {
- unlock(segment, lock);
+ /**
+ * Checks whether a segment is already cached. This method does not confirm
if the segment is actually mounted in
+ * the location, or even that the segment files in some location are valid,
just that some files exist in the
+ * specified location
+ */
+ @VisibleForTesting
+ boolean isSegmentCached(final DataSegment segment)
+ {
+ final SegmentCacheEntry cacheEntry = new SegmentCacheEntry(segment);
+ for (StorageLocation location : locations) {
+ if (cacheEntry.checkExists(location.getPath())) {
+ return true;
}
}
+ return false;
}
- @Override
- public void shutdownBootstrap()
+ /**
+ * Returns the effective segment info directory based on the configuration
settings.
+ * The directory is selected based on the following configurations injected
into this class:
+ * <ul>
+ * <li>{@link SegmentLoaderConfig#getInfoDir()} - If {@code infoDir} is
set, it is used as the info directory.</li>
+ * <li>{@link SegmentLoaderConfig#getLocations()} - If the info directory
is not set, the first location from this list is used.</li>
+ * <li>List of {@link StorageLocation}s injected - If both the info
directory and locations list are not set, the
+ * first storage location is used.</li>
+ * </ul>
+ *
+ * @throws DruidException if none of the configurations are set, and the
info directory cannot be determined.
+ */
+ private File getEffectiveInfoDir()
{
- if (loadOnBootstrapExec == null) {
- return;
+ final File infoDir;
+ if (config.getInfoDir() != null) {
+ infoDir = config.getInfoDir();
+ } else if (!config.getLocations().isEmpty()) {
+ infoDir = new File(config.getLocations().get(0).getPath(), "info_dir");
+ } else if (!locations.isEmpty()) {
+ infoDir = new File(locations.get(0).getPath(), "info_dir");
+ } else {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.NOT_FOUND)
+ .build("Could not determine infoDir. Make sure
'druid.segmentCache.infoDir' "
+ + "or 'druid.segmentCache.locations' is set
correctly.");
}
- loadOnBootstrapExec.shutdown();
+ return infoDir;
}
- private void cleanupCacheFiles(File baseFile, File cacheFile)
+ private Supplier<Future<Optional<Segment>>> makeOnDemandLoadSupplier(
+ final DataSegment dataSegment,
+ final SegmentCacheEntry entry,
+ final StorageLocation location
+ )
{
- if (cacheFile.equals(baseFile)) {
- return;
- }
-
- synchronized (directoryWriteRemoveLock) {
- log.info("Deleting directory[%s]", cacheFile);
- try {
- FileUtils.deleteDirectory(cacheFile);
- }
- catch (Exception e) {
- log.error(e, "Unable to remove directory[%s]", cacheFile);
- }
-
- File parent = cacheFile.getParentFile();
- if (parent != null) {
- File[] children = parent.listFiles();
- if (children == null || children.length == 0) {
- cleanupCacheFiles(baseFile, parent);
+ return () -> virtualStorageFabricLoadOnDemandExec.submit(
+ () -> {
+ final ReferenceCountingLock threadLock = lock(dataSegment);
+ synchronized (threadLock) {
+ try {
+ entry.mount(location.getPath());
+ return entry.referenceProvider.acquireReference();
+ }
+ finally {
+ unlock(dataSegment, threadLock);
+ }
+ }
}
- }
- }
+ );
}
- private ReferenceCountingLock createOrGetLock(DataSegment dataSegment)
+ private ReferenceCountingLock lock(final DataSegment dataSegment)
Review Comment:
We remove the locks when the count drops to 0 so that we don't have to
keep a lock per segment in memory when nothing is locking it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]