This is an automated email from the ASF dual-hosted git repository.
frankchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 09881574c3a Refactor segmentLocalCacheManager (#18494)
09881574c3a is described below
commit 09881574c3a653e83d4bb47d44ddd2bf86f70d86
Author: Virushade <[email protected]>
AuthorDate: Wed Oct 22 10:34:28 2025 +0800
Refactor segmentLocalCacheManager (#18494)
* Refactor segmentLocalCacheManager
* Try catch for addFilesToCachedSegments
---
.../segment/loading/SegmentLocalCacheManager.java | 124 +++++++++++----------
1 file changed, 68 insertions(+), 56 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index 162e20a28d1..104304dce6a 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -61,6 +61,7 @@ import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;
@@ -188,68 +189,17 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
"canHandleSegments() is false. getCachedSegments() must be invoked
only when canHandleSegments() returns true."
);
}
- final File infoDir = getEffectiveInfoDir();
- FileUtils.mkdirp(infoDir);
final List<DataSegment> cachedSegments = new ArrayList<>();
- final File[] segmentsToLoad = infoDir.listFiles();
+ final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
- int ignored = 0;
+ AtomicInteger ignoredFilesCounter = new AtomicInteger(0);
for (int i = 0; i < segmentsToLoad.length; i++) {
final File file = segmentsToLoad[i];
log.info("Loading segment cache file [%d/%d][%s].", i + 1,
segmentsToLoad.length, file);
try {
- final DataSegment segment = jsonMapper.readValue(file,
DataSegment.class);
- boolean removeInfo = false;
- if (!segment.getId().toString().equals(file.getName())) {
- log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(),
segment.getId());
- ignored++;
- } else {
- removeInfo = true;
- final SegmentCacheEntry cacheEntry = new SegmentCacheEntry(segment);
- for (StorageLocation location : locations) {
- // check for migrate from old nested local storage path format
- final File legacyPath = new File(location.getPath(),
DataSegmentPusher.getDefaultStorageDir(segment, false));
- if (legacyPath.exists()) {
- final File destination =
cacheEntry.toPotentialLocation(location.getPath());
- FileUtils.mkdirp(destination);
- final File[] oldFiles = legacyPath.listFiles();
- final File[] newFiles = destination.listFiles();
- // make sure old files exist and new files do not exist
- if (oldFiles != null && oldFiles.length > 0 && newFiles != null
&& newFiles.length == 0) {
- Files.move(legacyPath.toPath(), destination.toPath(),
StandardCopyOption.ATOMIC_MOVE);
- }
- cleanupLegacyCacheLocation(location.getPath(), legacyPath);
- }
-
- if (cacheEntry.checkExists(location.getPath())) {
- removeInfo = false;
- final boolean reserveResult;
- if (config.isVirtualStorage()) {
- reserveResult = location.reserveWeak(cacheEntry);
- } else {
- reserveResult = location.reserve(cacheEntry);
- }
- if (!reserveResult) {
- log.makeAlert(
- "storage[%s:%,d] has more segments than it is allowed.
Currently loading Segment[%s:%,d]. Please increase druid.segmentCache.locations
maxSize param",
- location.getPath(),
- location.availableSizeBytes(),
- segment.getId(),
- segment.getSize()
- ).emit();
- }
- cachedSegments.add(segment);
- }
- }
- }
-
- if (removeInfo) {
- final SegmentId segmentId = segment.getId();
- log.warn("Unable to find cache file for segment[%s]. Deleting lookup
entry.", segmentId);
- removeInfoFile(segment);
- }
+ addFilesToCachedSegments(file, ignoredFilesCounter, cachedSegments);
}
catch (Exception e) {
log.makeAlert(e, "Failed to load segment from segment cache file.")
@@ -258,15 +208,77 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
}
}
- if (ignored > 0) {
+ if (ignoredFilesCounter.get() > 0) {
log.makeAlert("Ignored misnamed segment cache files on startup.")
- .addData("numIgnored", ignored)
+ .addData("numIgnored", ignoredFilesCounter.get())
.emit();
}
return cachedSegments;
}
+ private void addFilesToCachedSegments(File file, AtomicInteger ignored,
List<DataSegment> cachedSegments) throws IOException
+ {
+ final DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
+ boolean removeInfo = false;
+ if (!segment.getId().toString().equals(file.getName())) {
+ log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(),
segment.getId());
+ ignored.incrementAndGet();
+ } else {
+ removeInfo = true;
+ final SegmentCacheEntry cacheEntry = new SegmentCacheEntry(segment);
+ for (StorageLocation location : locations) {
+ // check for migrate from old nested local storage path format
+ final File legacyPath = new File(location.getPath(),
DataSegmentPusher.getDefaultStorageDir(segment, false));
+ if (legacyPath.exists()) {
+ final File destination =
cacheEntry.toPotentialLocation(location.getPath());
+ FileUtils.mkdirp(destination);
+ final File[] oldFiles = legacyPath.listFiles();
+ final File[] newFiles = destination.listFiles();
+ // make sure old files exist and new files do not exist
+ if (oldFiles != null && oldFiles.length > 0 && newFiles != null &&
newFiles.length == 0) {
+ Files.move(legacyPath.toPath(), destination.toPath(),
StandardCopyOption.ATOMIC_MOVE);
+ }
+ cleanupLegacyCacheLocation(location.getPath(), legacyPath);
+ }
+
+ if (cacheEntry.checkExists(location.getPath())) {
+ removeInfo = false;
+ final boolean reserveResult;
+ if (config.isVirtualStorage()) {
+ reserveResult = location.reserveWeak(cacheEntry);
+ } else {
+ reserveResult = location.reserve(cacheEntry);
+ }
+ if (!reserveResult) {
+ log.makeAlert(
+ "storage[%s:%,d] has more segments than it is allowed.
Currently loading Segment[%s:%,d]. Please increase druid.segmentCache.locations
maxSize param",
+ location.getPath(),
+ location.availableSizeBytes(),
+ segment.getId(),
+ segment.getSize()
+ ).emit();
+ }
+ cachedSegments.add(segment);
+ }
+ }
+ }
+
+ if (removeInfo) {
+ final SegmentId segmentId = segment.getId();
+ log.warn("Unable to find cache file for segment[%s]. Deleting lookup
entry.", segmentId);
+ removeInfoFile(segment);
+ }
+ }
+
+ private File[] retrieveSegmentMetadataFiles() throws IOException
+ {
+ final File infoDir = getEffectiveInfoDir();
+ FileUtils.mkdirp(infoDir);
+ File[] files = infoDir.listFiles();
+ return files == null ? new File[0] : files;
+ }
+
@Override
public void storeInfoFile(final DataSegment segment) throws IOException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]