github-advanced-security[bot] commented on code in PR #18176:
URL: https://github.com/apache/druid/pull/18176#discussion_r2176360516
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -272,481 +449,422 @@
infoDir = new File(locations.get(0).getPath(), "info_dir");
} else {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
- .ofCategory(DruidException.Category.NOT_FOUND)
- .build("Could not determine infoDir. Make sure
'druid.segmentCache.infoDir' "
- + "or 'druid.segmentCache.locations' is set correctly.");
+ .ofCategory(DruidException.Category.NOT_FOUND)
+ .build("Could not determine infoDir. Make sure
'druid.segmentCache.infoDir' "
+ + "or 'druid.segmentCache.locations' is set
correctly.");
}
return infoDir;
}
- private static String getSegmentDir(DataSegment segment)
+ @Override
+ public void drop(DataSegment segment)
{
- return DataSegmentPusher.getDefaultStorageDir(segment, false);
+ final SegmentCacheEntry cacheEntry = new SegmentCacheEntry(segment);
+ final ReferenceCountingLock lock = lock(segment);
+ synchronized (lock) {
+ try {
+ // always unmount on cleanup to unmap the segment
+ cacheEntry.unmount();
+ if (!config.isDeleteOnRemove()) {
+ return;
+ }
+ for (StorageLocation location : locations) {
+ if (cacheEntry.checkExists(location.getPath())) {
+ cleanupCacheFiles(location.getPath(), new File(location.getPath(),
getSegmentDir(segment)));
+ location.release(cacheEntry);
+ }
+ }
+ }
+ finally {
+ unlock(segment, lock);
+ }
+ }
}
- /**
- * Checks whether a segment is already cached. It can return false even if
{@link #reserve(DataSegment)}
- * has been successful for a segment but is not downloaded yet.
- */
- boolean isSegmentCached(final DataSegment segment)
+ @Override
+ public void shutdownBootstrap()
{
- return findStoragePathIfCached(segment) != null;
+ if (loadOnBootstrapExec == null) {
+ return;
+ }
+ loadOnBootstrapExec.shutdown();
}
- /**
- * This method will try to find if the segment is already downloaded on any
location. If so, the segment path
- * is returned. Along with that, location state is also updated with the
segment location. Refer to
- * {@link StorageLocation#maybeReserve(String, DataSegment)} for more
details.
- * If the segment files are damaged in any location, they are removed from
the location.
- * @param segment - Segment to check
- * @return - Path corresponding to segment directory if found, null
otherwise.
- */
- @Nullable
- private File findStoragePathIfCached(final DataSegment segment)
+ @VisibleForTesting
+ public ConcurrentHashMap<DataSegment, ReferenceCountingLock>
getSegmentLocks()
{
- for (StorageLocation location : locations) {
- String storageDir = getSegmentDir(segment);
- File localStorageDir = location.segmentDirectoryAsFile(storageDir);
- if (localStorageDir.exists()) {
- if (checkSegmentFilesIntact(localStorageDir)) {
- log.warn(
- "[%s] may be damaged. Delete all the segment files and pull from
DeepStorage again.",
- localStorageDir.getAbsolutePath()
- );
- cleanupCacheFiles(location.getPath(), localStorageDir);
- location.removeSegmentDir(localStorageDir, segment);
- break;
- } else {
- // Before returning, we also reserve the space. Refer to the
StorageLocation#maybeReserve documentation for details.
- location.maybeReserve(storageDir, segment);
- return localStorageDir;
- }
- }
- }
- return null;
+ return segmentLocks;
}
- /**
- * check data intact.
- * @param dir segments cache dir
- * @return true means segment files may be damaged.
- */
- private boolean checkSegmentFilesIntact(File dir)
+ @VisibleForTesting
+ public List<StorageLocation> getLocations()
{
- return checkSegmentFilesIntactWithStartMarker(dir);
+ return locations;
}
/**
- * If there is 'downloadStartMarker' existed in localStorageDir, the
segments files might be damaged.
- * Because each time, Druid will delete the 'downloadStartMarker' file after
pulling and unzip the segments from DeepStorage.
- * downloadStartMarker existed here may mean something error during download
segments and the segment files may be damaged.
+ * Checks whether a segment is already cached.
*/
- private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir)
+ boolean isSegmentCached(final DataSegment segment)
{
- final File downloadStartMarker = new File(localStorageDir.getPath(),
DOWNLOAD_START_MARKER_FILE_NAME);
- return downloadStartMarker.exists();
+ final SegmentCacheEntry cacheEntry = new SegmentCacheEntry(segment);
+ for (StorageLocation location : locations) {
+ if (cacheEntry.checkExists(location.getPath())) {
+ return true;
+ }
+ }
+ return false;
}
- /**
- * Make sure segments files in loc is intact, otherwise function like
loadSegments will failed because of segment files is damaged.
- * @param segment
- * @return
- * @throws SegmentLoadingException
- */
- @Override
- public File getSegmentFiles(DataSegment segment) throws
SegmentLoadingException
+ public ReferenceCountingLock lock(DataSegment dataSegment)
{
- final ReferenceCountingLock lock = createOrGetLock(segment);
- synchronized (lock) {
- try {
- File segmentDir = findStoragePathIfCached(segment);
- if (segmentDir != null) {
- return segmentDir;
+ return segmentLocks.compute(
+ dataSegment,
+ (segment, lock) -> {
+ final ReferenceCountingLock nonNullLock;
+ if (lock == null) {
+ nonNullLock = new ReferenceCountingLock();
+ } else {
+ nonNullLock = lock;
+ }
+ nonNullLock.increment();
+ return nonNullLock;
}
+ );
+ }
- return loadSegmentWithRetry(segment);
- }
- finally {
- unlock(segment, lock);
- }
- }
+ private void unlock(DataSegment dataSegment, ReferenceCountingLock lock)
+ {
+ segmentLocks.compute(
+ dataSegment,
+ (segment, existingLock) -> {
+ if (existingLock == null) {
+ throw new ISE("Lock has already been removed");
+ } else if (existingLock != lock) {
+ throw new ISE("Different lock instance");
+ } else {
+ if (existingLock.numReferences == 1) {
+ return null;
+ } else {
+ existingLock.decrement();
+ return existingLock;
+ }
+ }
+ }
+ );
}
- /**
- * If we have already reserved a location before, probably via {@link
#reserve(DataSegment)}, then only that location
- * should be tried. Otherwise, we would fetch locations using {@link
StorageLocationSelectorStrategy} and try all
- * of them one by one till there is success.
- * Location may fail because of IO failure, most likely in two cases:<p>
- * 1. druid don't have the write access to this location, most likely the
administrator doesn't config it correctly<p>
- * 2. disk failure, druid can't read/write to this disk anymore
- * <p>
- * Locations are fetched using {@link StorageLocationSelectorStrategy}.
- */
- private File loadSegmentWithRetry(DataSegment segment) throws
SegmentLoadingException
+ private SegmentCacheEntry assignLocationAndMount(
+ SegmentCacheEntry cacheEntry,
+ SegmentLazyLoadFailCallback segmentLoadFailCallback
+ ) throws SegmentLoadingException
{
- String segmentDir = getSegmentDir(segment);
-
- // Try the already reserved location. If location has been reserved
outside, then we do not release the location
- // here and simply delete any downloaded files. That is, we revert
anything we do in this function and nothing else.
- for (StorageLocation loc : locations) {
- if (loc.isReserved(segmentDir)) {
- File storageDir = loc.segmentDirectoryAsFile(segmentDir);
- boolean success = loadInLocationWithStartMarkerQuietly(loc, segment,
storageDir, false);
- if (!success) {
- throw new SegmentLoadingException(
- "Failed to load segment[%s] in reserved location[%s]",
segment.getId(), loc.getPath().getAbsolutePath()
- );
+ try {
+ for (StorageLocation location : locations) {
+ if (cacheEntry.checkExists(location.getPath())) {
+ if (location.isReserved(cacheEntry.id) ||
location.reserve(cacheEntry)) {
+ final SegmentCacheEntry entry =
location.getCacheEntry(cacheEntry.id);
+ entry.lazyLoadCallback = segmentLoadFailCallback;
+ entry.mount(location.getPath());
+ return entry;
+ } else {
+ cleanupCacheFiles(location.getPath(),
cacheEntry.toPotentialLocation(location.getPath()));
+ }
}
- return storageDir;
}
}
-
- // No location was reserved so we try all the locations
- Iterator<StorageLocation> locationsIterator = strategy.getLocations();
+ catch (IOException | SegmentLoadingException e) {
+ log.warn(e, "Failed to load segment[%s] in existing location, trying new
location", cacheEntry.id);
+ }
+ final Iterator<StorageLocation> locationsIterator =
strategy.getLocations();
while (locationsIterator.hasNext()) {
-
- StorageLocation loc = locationsIterator.next();
-
- // storageDir is the file path corresponding to segment dir
- File storageDir = loc.reserve(segmentDir, segment);
- if (storageDir != null) {
- boolean success = loadInLocationWithStartMarkerQuietly(loc, segment,
storageDir, true);
- if (success) {
- return storageDir;
+ final StorageLocation location = locationsIterator.next();
+ if (location.reserve(cacheEntry)) {
+ try {
+ final SegmentCacheEntry entry =
location.getCacheEntry(cacheEntry.id);
+ entry.lazyLoadCallback = segmentLoadFailCallback;
+ entry.mount(location.getPath());
+ return entry;
+ }
+ catch (IOException | SegmentLoadingException e) {
+ log.warn(e, "Failed to load segment[%s] in location[%s], trying next
location", cacheEntry.id, location.getPath());
}
}
}
- throw new SegmentLoadingException("Failed to load segment[%s] in all
locations.", segment.getId());
+ throw new SegmentLoadingException("Failed to load segment[%s] in all
locations.", cacheEntry.id);
}
- /**
- * A helper method over {@link #loadInLocationWithStartMarker(DataSegment,
File)} that catches the {@link SegmentLoadingException}
- * and emits alerts.
- * @param loc - {@link StorageLocation} where segment is to be downloaded in.
- * @param segment - {@link DataSegment} to download
- * @param storageDir - {@link File} pointing to segment directory
- * @param releaseLocation - Whether to release the location in case of
failures
- * @return - True if segment was downloaded successfully, false otherwise.
- */
- private boolean loadInLocationWithStartMarkerQuietly(StorageLocation loc,
DataSegment segment, File storageDir, boolean releaseLocation)
+ private void cleanupCacheFiles(File baseFile, File cacheFile)
{
- try {
- loadInLocationWithStartMarker(segment, storageDir);
- return true;
+ if (cacheFile.equals(baseFile)) {
+ return;
}
- catch (SegmentLoadingException e) {
+
+ directoryWriteRemoveLock.writeLock().lock();
+ try {
+ log.info("Deleting directory[%s]", cacheFile);
try {
- log.makeAlert(
- e,
- "Failed to load segment in current location [%s], try next
location if any",
- loc.getPath().getAbsolutePath()
- ).addData("location", loc.getPath().getAbsolutePath()).emit();
+ FileUtils.deleteDirectory(cacheFile);
}
- finally {
- if (releaseLocation) {
- loc.removeSegmentDir(storageDir, segment);
- }
- cleanupCacheFiles(loc.getPath(), storageDir);
+ catch (Exception e) {
+ log.error(e, "Unable to remove directory[%s]", cacheFile);
}
- }
- return false;
- }
- private void loadInLocationWithStartMarker(DataSegment segment, File
storageDir) throws SegmentLoadingException
- {
- // We use a marker to prevent the case where a segment is downloaded, but
before the download completes,
- // the parent directories of the segment are removed
- final File downloadStartMarker = new File(storageDir,
DOWNLOAD_START_MARKER_FILE_NAME);
- synchronized (directoryWriteRemoveLock) {
- try {
- FileUtils.mkdirp(storageDir);
-
- if (!downloadStartMarker.createNewFile()) {
- throw new SegmentLoadingException("Was not able to create new
download marker for [%s]", storageDir);
+ File parent = cacheFile.getParentFile();
+ if (parent != null) {
+ File[] children = parent.listFiles();
+ if (children == null || children.length == 0) {
+ cleanupCacheFiles(baseFile, parent);
}
}
- catch (IOException e) {
- throw new SegmentLoadingException(e, "Unable to create marker file for
[%s]", storageDir);
- }
}
- loadInLocation(segment, storageDir);
-
- if (!downloadStartMarker.delete()) {
- throw new SegmentLoadingException("Unable to remove marker file for
[%s]", storageDir);
+ finally {
+ directoryWriteRemoveLock.writeLock().unlock();
}
}
- private void loadInLocation(DataSegment segment, File storageDir) throws
SegmentLoadingException
+ private static String getSegmentDir(DataSegment segment)
{
- // LoadSpec isn't materialized until here so that any system can interpret
Segment without having to have all the
- // LoadSpec dependencies.
- final LoadSpec loadSpec = jsonMapper.convertValue(segment.getLoadSpec(),
LoadSpec.class);
- final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir);
- if (result.getSize() != segment.getSize()) {
- log.warn(
- "Segment [%s] is different than expected size. Expected [%d] found
[%d]",
- segment.getId(),
- segment.getSize(),
- result.getSize()
- );
- }
+ return DataSegmentPusher.getDefaultStorageDir(segment, false);
}
- @Override
- public boolean reserve(final DataSegment segment)
+ /**
+ * check if segment data is possibly corrupted.
+ * @param dir segments cache dir
+ * @return true means segment files may be damaged.
+ */
+ private static boolean isPossiblyCorrupted(File dir)
{
- final ReferenceCountingLock lock = createOrGetLock(segment);
- synchronized (lock) {
- try {
- // Maybe the segment was already loaded. This check is required to
account for restart scenarios.
- if (null != findStoragePathIfCached(segment)) {
- return true;
- }
+ return hasStartMarker(dir);
+ }
- String storageDirStr = getSegmentDir(segment);
+ /**
+ * If {@link #DOWNLOAD_START_MARKER_FILE_NAME} exists in the path, the
segment files might be damaged because this
+ * file is typically deleted after the segment is pulled from deep storage.
+ */
+ private static boolean hasStartMarker(File localStorageDir)
+ {
+ final File downloadStartMarker = new File(localStorageDir.getPath(),
DOWNLOAD_START_MARKER_FILE_NAME);
+ return downloadStartMarker.exists();
+ }
- // check if we already reserved the segment
- for (StorageLocation location : locations) {
- if (location.isReserved(storageDirStr)) {
- return true;
- }
- }
+ private static class ReferenceCountingLock
+ {
+ private int numReferences;
- // Not found in any location, reserve now
- for (Iterator<StorageLocation> it = strategy.getLocations();
it.hasNext(); ) {
- StorageLocation location = it.next();
- if (null != location.reserve(storageDirStr, segment)) {
- return true;
- }
- }
- }
- finally {
- unlock(segment, lock);
- }
+ private void increment()
+ {
+ ++numReferences;
}
- return false;
+ private void decrement()
+ {
+ --numReferences;
+ }
}
- @Override
- public boolean release(final DataSegment segment)
+ private class SegmentCacheEntry implements CacheEntry
{
- final ReferenceCountingLock lock = createOrGetLock(segment);
- synchronized (lock) {
- try {
- String storageDir = getSegmentDir(segment);
+ private final SegmentCacheEntryIdentifier id;
+ private final DataSegment dataSegment;
+ private final String relativePathString;
+ private SegmentLazyLoadFailCallback lazyLoadCallback =
SegmentLazyLoadFailCallback.NOOP;
+ private File locationRoot;
+ private File storageDir;
+ private ReferenceCountedSegmentProvider referenceProvider;
- // Release the first location encountered
- for (StorageLocation location : locations) {
- if (location.isReserved(storageDir)) {
- File localStorageDir = location.segmentDirectoryAsFile(storageDir);
- if (localStorageDir.exists()) {
- throw new ISE(
- "Asking to release a location '%s' while the segment
directory '%s' is present on disk. Any state on disk must be deleted before
releasing",
- location.getPath().getAbsolutePath(),
- localStorageDir.getAbsolutePath()
- );
- }
- return location.release(storageDir, segment.getSize());
- }
- }
- }
- finally {
- unlock(segment, lock);
- }
+ private SegmentCacheEntry(DataSegment dataSegment)
+ {
+ this.dataSegment = dataSegment;
+ this.id = new SegmentCacheEntryIdentifier(dataSegment.getId());
+ this.relativePathString = getSegmentDir(dataSegment);
}
- return false;
- }
+ @Override
+ public SegmentCacheEntryIdentifier getId()
+ {
+ return id;
+ }
- @Override
- public void cleanup(DataSegment segment)
- {
- if (!config.isDeleteOnRemove()) {
- return;
+ @Override
+ public long getSize()
+ {
+ return dataSegment.getSize();
}
- final ReferenceCountingLock lock = createOrGetLock(segment);
- synchronized (lock) {
- try {
- File loc = findStoragePathIfCached(segment);
+ public boolean checkExists(File location)
+ {
+ return toPotentialLocation(location).exists();
+ }
- if (loc == null) {
- log.warn("Asked to cleanup something[%s] that didn't exist.
Skipping.", segment.getId());
+ public File toPotentialLocation(File location)
+ {
+ return new File(location, relativePathString);
+ }
+
+ @Override
+ public void mount(File location) throws IOException,
SegmentLoadingException
+ {
+ if (locationRoot != null) {
+ log.debug("already mounted [%s] in location[%s], but asked to load in
[%s], unmounting old location", id, locationRoot, location);
+ if (!locationRoot.equals(location)) {
+ unmount();
+ } else {
+ log.debug("already mounted [%s] in location[%s]", id, location);
return;
}
- // If storageDir.mkdirs() success, but
downloadStartMarker.createNewFile() failed,
- // in this case, findStorageLocationIfLoaded() will think segment is
located in the failed storageDir which is actually not.
- // So we should always clean all possible locations here
- for (StorageLocation location : locations) {
- File localStorageDir = new File(location.getPath(),
getSegmentDir(segment));
- if (localStorageDir.exists()) {
- // Druid creates folders of the form
dataSource/interval/version/partitionNum.
- // We need to clean up all these directories if they are all empty.
- cleanupCacheFiles(location.getPath(), localStorageDir);
- location.removeSegmentDir(localStorageDir, segment);
+ }
+ locationRoot = location;
+ storageDir = new File(location, relativePathString);
+ try {
+ boolean needsLoad = true;
+ if (storageDir.exists()) {
+ if (isPossiblyCorrupted(storageDir)) {
+ log.warn(
+ "[%s] may be damaged. Delete all the segment files and pull
from DeepStorage again.",
+ storageDir.getAbsolutePath()
+ );
+ cleanupCacheFiles(locationRoot, storageDir);
+ } else {
+ needsLoad = false;
}
}
+ if (needsLoad) {
+ loadInLocationWithStartMarker(dataSegment, storageDir);
+ }
+ final SegmentizerFactory factory = getSegmentFactory(storageDir);
+
+ final Segment segment = factory.factorize(dataSegment, storageDir,
false, lazyLoadCallback);
+ // wipe load callback after calling
+ lazyLoadCallback = SegmentLazyLoadFailCallback.NOOP;
+ referenceProvider = ReferenceCountedSegmentProvider.of(segment);
}
- finally {
- unlock(segment, lock);
+ catch (SegmentLoadingException e) {
+ try {
+ log.makeAlert(
+ e,
+ "Failed to load segment in current location [%s], try next
location if any",
+ location.getAbsolutePath()
+ ).addData("location", location.getAbsolutePath()).emit();
+
+ throw new SegmentLoadingException(
+ "Failed to load segment[%s] in reserved location[%s]",
dataSegment.getId(), location.getAbsolutePath()
+ );
+ }
+ finally {
+ unmount();
+ }
}
}
- }
- @Override
- public void loadSegmentIntoPageCache(DataSegment segment)
- {
- if (loadOnDownloadExec == null) {
- return;
+ @Override
+ public void unmount()
+ {
+ if (referenceProvider != null) {
+ referenceProvider.close();
+ referenceProvider = null;
+ }
+ if (!config.isDeleteOnRemove()) {
+ return;
+ }
+ if (storageDir != null) {
+ cleanupCacheFiles(locationRoot, storageDir);
+ storageDir = null;
+ locationRoot = null;
+ }
}
- loadOnDownloadExec.submit(() -> loadSegmentIntoPageCacheInternal(segment));
- }
-
- @Override
- public void loadSegmentIntoPageCacheOnBootstrap(DataSegment segment)
- {
- if (loadOnBootstrapExec == null) {
- return;
+ public void loadIntoPageCache()
+ {
+ final File[] children = storageDir.listFiles();
+ if (children != null) {
+ for (File child : children) {
+ try (InputStream in = Files.newInputStream(child.toPath())) {
+ IOUtils.copy(in, NullOutputStream.NULL_OUTPUT_STREAM);
+ log.info("Loaded [%s] into page cache.", child.getAbsolutePath());
+ }
+ catch (Exception e) {
+ log.error(e, "Failed to load [%s] into page cache",
child.getAbsolutePath());
+ }
+ }
+ }
}
- loadOnBootstrapExec.submit(() ->
loadSegmentIntoPageCacheInternal(segment));
- }
-
- void loadSegmentIntoPageCacheInternal(DataSegment segment)
- {
- final ReferenceCountingLock lock = createOrGetLock(segment);
- synchronized (lock) {
+ private void loadInLocationWithStartMarker(DataSegment segment, File
storageDir) throws SegmentLoadingException
+ {
+ // We use a marker to prevent the case where a segment is downloaded,
but before the download completes,
+ // the parent directories of the segment are removed
+ final File downloadStartMarker = new File(storageDir,
DOWNLOAD_START_MARKER_FILE_NAME);
+ directoryWriteRemoveLock.readLock().lock();
try {
- for (StorageLocation location : locations) {
- File localStorageDir = new File(location.getPath(),
DataSegmentPusher.getDefaultStorageDir(segment, false));
- if (localStorageDir.exists()) {
- File baseFile = location.getPath();
- if (localStorageDir.equals(baseFile)) {
- continue;
- }
+ FileUtils.mkdirp(storageDir);
- log.info("Loading directory[%s] into page cache.",
localStorageDir);
-
- File[] children = localStorageDir.listFiles();
- if (children != null) {
- for (File child : children) {
- try (InputStream in = Files.newInputStream(child.toPath())) {
- IOUtils.copy(in, NullOutputStream.NULL_OUTPUT_STREAM);
- log.info("Loaded [%s] into page cache.",
child.getAbsolutePath());
- }
- catch (Exception e) {
- log.error(e, "Failed to load [%s] into page cache",
child.getAbsolutePath());
- }
- }
- }
- }
+ if (!downloadStartMarker.createNewFile()) {
+ throw new SegmentLoadingException("Was not able to create new
download marker for [%s]", storageDir);
+ }
+ loadInLocation(segment, storageDir);
+
+ if (!downloadStartMarker.delete()) {
Review Comment:
## Uncontrolled data used in path expression
This path depends on a [user-provided value](1).
[Show more details](https://github.com/apache/druid/security/code-scanning/9297)
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -272,481 +449,422 @@
infoDir = new File(locations.get(0).getPath(), "info_dir");
} else {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
- .ofCategory(DruidException.Category.NOT_FOUND)
- .build("Could not determine infoDir. Make sure
'druid.segmentCache.infoDir' "
- + "or 'druid.segmentCache.locations' is set correctly.");
+ .ofCategory(DruidException.Category.NOT_FOUND)
+ .build("Could not determine infoDir. Make sure
'druid.segmentCache.infoDir' "
+ + "or 'druid.segmentCache.locations' is set
correctly.");
}
return infoDir;
}
- private static String getSegmentDir(DataSegment segment)
+ @Override
+ public void drop(DataSegment segment)
{
- return DataSegmentPusher.getDefaultStorageDir(segment, false);
+ final SegmentCacheEntry cacheEntry = new SegmentCacheEntry(segment);
+ final ReferenceCountingLock lock = lock(segment);
+ synchronized (lock) {
+ try {
+ // always unmount on cleanup to unmap the segment
+ cacheEntry.unmount();
+ if (!config.isDeleteOnRemove()) {
+ return;
+ }
+ for (StorageLocation location : locations) {
+ if (cacheEntry.checkExists(location.getPath())) {
+ cleanupCacheFiles(location.getPath(), new File(location.getPath(),
getSegmentDir(segment)));
+ location.release(cacheEntry);
+ }
+ }
+ }
+ finally {
+ unlock(segment, lock);
+ }
+ }
}
- /**
- * Checks whether a segment is already cached. It can return false even if
{@link #reserve(DataSegment)}
- * has been successful for a segment but is not downloaded yet.
- */
- boolean isSegmentCached(final DataSegment segment)
+ @Override
+ public void shutdownBootstrap()
{
- return findStoragePathIfCached(segment) != null;
+ if (loadOnBootstrapExec == null) {
+ return;
+ }
+ loadOnBootstrapExec.shutdown();
}
- /**
- * This method will try to find if the segment is already downloaded on any
location. If so, the segment path
- * is returned. Along with that, location state is also updated with the
segment location. Refer to
- * {@link StorageLocation#maybeReserve(String, DataSegment)} for more
details.
- * If the segment files are damaged in any location, they are removed from
the location.
- * @param segment - Segment to check
- * @return - Path corresponding to segment directory if found, null
otherwise.
- */
- @Nullable
- private File findStoragePathIfCached(final DataSegment segment)
+ @VisibleForTesting
+ public ConcurrentHashMap<DataSegment, ReferenceCountingLock>
getSegmentLocks()
{
- for (StorageLocation location : locations) {
- String storageDir = getSegmentDir(segment);
- File localStorageDir = location.segmentDirectoryAsFile(storageDir);
- if (localStorageDir.exists()) {
- if (checkSegmentFilesIntact(localStorageDir)) {
- log.warn(
- "[%s] may be damaged. Delete all the segment files and pull from
DeepStorage again.",
- localStorageDir.getAbsolutePath()
- );
- cleanupCacheFiles(location.getPath(), localStorageDir);
- location.removeSegmentDir(localStorageDir, segment);
- break;
- } else {
- // Before returning, we also reserve the space. Refer to the
StorageLocation#maybeReserve documentation for details.
- location.maybeReserve(storageDir, segment);
- return localStorageDir;
- }
- }
- }
- return null;
+ return segmentLocks;
}
- /**
- * check data intact.
- * @param dir segments cache dir
- * @return true means segment files may be damaged.
- */
- private boolean checkSegmentFilesIntact(File dir)
+ @VisibleForTesting
+ public List<StorageLocation> getLocations()
{
- return checkSegmentFilesIntactWithStartMarker(dir);
+ return locations;
}
/**
- * If there is 'downloadStartMarker' existed in localStorageDir, the
segments files might be damaged.
- * Because each time, Druid will delete the 'downloadStartMarker' file after
pulling and unzip the segments from DeepStorage.
- * downloadStartMarker existed here may mean something error during download
segments and the segment files may be damaged.
+ * Checks whether a segment is already cached.
*/
- private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir)
+ boolean isSegmentCached(final DataSegment segment)
{
- final File downloadStartMarker = new File(localStorageDir.getPath(),
DOWNLOAD_START_MARKER_FILE_NAME);
- return downloadStartMarker.exists();
+ final SegmentCacheEntry cacheEntry = new SegmentCacheEntry(segment);
+ for (StorageLocation location : locations) {
+ if (cacheEntry.checkExists(location.getPath())) {
+ return true;
+ }
+ }
+ return false;
}
- /**
- * Make sure segments files in loc is intact, otherwise function like
loadSegments will failed because of segment files is damaged.
- * @param segment
- * @return
- * @throws SegmentLoadingException
- */
- @Override
- public File getSegmentFiles(DataSegment segment) throws
SegmentLoadingException
+ public ReferenceCountingLock lock(DataSegment dataSegment)
{
- final ReferenceCountingLock lock = createOrGetLock(segment);
- synchronized (lock) {
- try {
- File segmentDir = findStoragePathIfCached(segment);
- if (segmentDir != null) {
- return segmentDir;
+ return segmentLocks.compute(
+ dataSegment,
+ (segment, lock) -> {
+ final ReferenceCountingLock nonNullLock;
+ if (lock == null) {
+ nonNullLock = new ReferenceCountingLock();
+ } else {
+ nonNullLock = lock;
+ }
+ nonNullLock.increment();
+ return nonNullLock;
}
+ );
+ }
- return loadSegmentWithRetry(segment);
- }
- finally {
- unlock(segment, lock);
- }
- }
+ private void unlock(DataSegment dataSegment, ReferenceCountingLock lock)
+ {
+ segmentLocks.compute(
+ dataSegment,
+ (segment, existingLock) -> {
+ if (existingLock == null) {
+ throw new ISE("Lock has already been removed");
+ } else if (existingLock != lock) {
+ throw new ISE("Different lock instance");
+ } else {
+ if (existingLock.numReferences == 1) {
+ return null;
+ } else {
+ existingLock.decrement();
+ return existingLock;
+ }
+ }
+ }
+ );
}
- /**
- * If we have already reserved a location before, probably via {@link
#reserve(DataSegment)}, then only that location
- * should be tried. Otherwise, we would fetch locations using {@link
StorageLocationSelectorStrategy} and try all
- * of them one by one till there is success.
- * Location may fail because of IO failure, most likely in two cases:<p>
- * 1. druid don't have the write access to this location, most likely the
administrator doesn't config it correctly<p>
- * 2. disk failure, druid can't read/write to this disk anymore
- * <p>
- * Locations are fetched using {@link StorageLocationSelectorStrategy}.
- */
- private File loadSegmentWithRetry(DataSegment segment) throws
SegmentLoadingException
+ private SegmentCacheEntry assignLocationAndMount(
+ SegmentCacheEntry cacheEntry,
+ SegmentLazyLoadFailCallback segmentLoadFailCallback
+ ) throws SegmentLoadingException
{
- String segmentDir = getSegmentDir(segment);
-
- // Try the already reserved location. If location has been reserved
outside, then we do not release the location
- // here and simply delete any downloaded files. That is, we revert
anything we do in this function and nothing else.
- for (StorageLocation loc : locations) {
- if (loc.isReserved(segmentDir)) {
- File storageDir = loc.segmentDirectoryAsFile(segmentDir);
- boolean success = loadInLocationWithStartMarkerQuietly(loc, segment,
storageDir, false);
- if (!success) {
- throw new SegmentLoadingException(
- "Failed to load segment[%s] in reserved location[%s]",
segment.getId(), loc.getPath().getAbsolutePath()
- );
+ try {
+ for (StorageLocation location : locations) {
+ if (cacheEntry.checkExists(location.getPath())) {
+ if (location.isReserved(cacheEntry.id) ||
location.reserve(cacheEntry)) {
+ final SegmentCacheEntry entry =
location.getCacheEntry(cacheEntry.id);
+ entry.lazyLoadCallback = segmentLoadFailCallback;
+ entry.mount(location.getPath());
+ return entry;
+ } else {
+ cleanupCacheFiles(location.getPath(),
cacheEntry.toPotentialLocation(location.getPath()));
+ }
}
- return storageDir;
}
}
-
- // No location was reserved so we try all the locations
- Iterator<StorageLocation> locationsIterator = strategy.getLocations();
+ catch (IOException | SegmentLoadingException e) {
+ log.warn(e, "Failed to load segment[%s] in existing location, trying new
location", cacheEntry.id);
+ }
+ final Iterator<StorageLocation> locationsIterator =
strategy.getLocations();
while (locationsIterator.hasNext()) {
-
- StorageLocation loc = locationsIterator.next();
-
- // storageDir is the file path corresponding to segment dir
- File storageDir = loc.reserve(segmentDir, segment);
- if (storageDir != null) {
- boolean success = loadInLocationWithStartMarkerQuietly(loc, segment,
storageDir, true);
- if (success) {
- return storageDir;
+ final StorageLocation location = locationsIterator.next();
+ if (location.reserve(cacheEntry)) {
+ try {
+ final SegmentCacheEntry entry =
location.getCacheEntry(cacheEntry.id);
+ entry.lazyLoadCallback = segmentLoadFailCallback;
+ entry.mount(location.getPath());
+ return entry;
+ }
+ catch (IOException | SegmentLoadingException e) {
+ log.warn(e, "Failed to load segment[%s] in location[%s], trying next
location", cacheEntry.id, location.getPath());
}
}
}
- throw new SegmentLoadingException("Failed to load segment[%s] in all
locations.", segment.getId());
+ throw new SegmentLoadingException("Failed to load segment[%s] in all
locations.", cacheEntry.id);
}
- /**
- * A helper method over {@link #loadInLocationWithStartMarker(DataSegment,
File)} that catches the {@link SegmentLoadingException}
- * and emits alerts.
- * @param loc - {@link StorageLocation} where segment is to be downloaded in.
- * @param segment - {@link DataSegment} to download
- * @param storageDir - {@link File} pointing to segment directory
- * @param releaseLocation - Whether to release the location in case of
failures
- * @return - True if segment was downloaded successfully, false otherwise.
- */
- private boolean loadInLocationWithStartMarkerQuietly(StorageLocation loc,
DataSegment segment, File storageDir, boolean releaseLocation)
+ private void cleanupCacheFiles(File baseFile, File cacheFile)
{
- try {
- loadInLocationWithStartMarker(segment, storageDir);
- return true;
+ if (cacheFile.equals(baseFile)) {
+ return;
}
- catch (SegmentLoadingException e) {
+
+ directoryWriteRemoveLock.writeLock().lock();
+ try {
+ log.info("Deleting directory[%s]", cacheFile);
try {
- log.makeAlert(
- e,
- "Failed to load segment in current location [%s], try next
location if any",
- loc.getPath().getAbsolutePath()
- ).addData("location", loc.getPath().getAbsolutePath()).emit();
+ FileUtils.deleteDirectory(cacheFile);
}
- finally {
- if (releaseLocation) {
- loc.removeSegmentDir(storageDir, segment);
- }
- cleanupCacheFiles(loc.getPath(), storageDir);
+ catch (Exception e) {
+ log.error(e, "Unable to remove directory[%s]", cacheFile);
}
- }
- return false;
- }
- private void loadInLocationWithStartMarker(DataSegment segment, File
storageDir) throws SegmentLoadingException
- {
- // We use a marker to prevent the case where a segment is downloaded, but
before the download completes,
- // the parent directories of the segment are removed
- final File downloadStartMarker = new File(storageDir,
DOWNLOAD_START_MARKER_FILE_NAME);
- synchronized (directoryWriteRemoveLock) {
- try {
- FileUtils.mkdirp(storageDir);
-
- if (!downloadStartMarker.createNewFile()) {
- throw new SegmentLoadingException("Was not able to create new
download marker for [%s]", storageDir);
+ File parent = cacheFile.getParentFile();
+ if (parent != null) {
+ File[] children = parent.listFiles();
+ if (children == null || children.length == 0) {
+ cleanupCacheFiles(baseFile, parent);
}
}
- catch (IOException e) {
- throw new SegmentLoadingException(e, "Unable to create marker file for
[%s]", storageDir);
- }
}
- loadInLocation(segment, storageDir);
-
- if (!downloadStartMarker.delete()) {
- throw new SegmentLoadingException("Unable to remove marker file for
[%s]", storageDir);
+ finally {
+ directoryWriteRemoveLock.writeLock().unlock();
}
}
- private void loadInLocation(DataSegment segment, File storageDir) throws
SegmentLoadingException
+ private static String getSegmentDir(DataSegment segment)
{
- // LoadSpec isn't materialized until here so that any system can interpret
Segment without having to have all the
- // LoadSpec dependencies.
- final LoadSpec loadSpec = jsonMapper.convertValue(segment.getLoadSpec(),
LoadSpec.class);
- final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir);
- if (result.getSize() != segment.getSize()) {
- log.warn(
- "Segment [%s] is different than expected size. Expected [%d] found
[%d]",
- segment.getId(),
- segment.getSize(),
- result.getSize()
- );
- }
+ return DataSegmentPusher.getDefaultStorageDir(segment, false);
}
- @Override
- public boolean reserve(final DataSegment segment)
+ /**
+ * check if segment data is possibly corrupted.
+ * @param dir segments cache dir
+ * @return true means segment files may be damaged.
+ */
+ private static boolean isPossiblyCorrupted(File dir)
{
- final ReferenceCountingLock lock = createOrGetLock(segment);
- synchronized (lock) {
- try {
- // Maybe the segment was already loaded. This check is required to
account for restart scenarios.
- if (null != findStoragePathIfCached(segment)) {
- return true;
- }
+ return hasStartMarker(dir);
+ }
- String storageDirStr = getSegmentDir(segment);
+ /**
+ * If {@link #DOWNLOAD_START_MARKER_FILE_NAME} exists in the path, the
segment files might be damaged because this
+ * file is typically deleted after the segment is pulled from deep storage.
+ */
+ private static boolean hasStartMarker(File localStorageDir)
+ {
+ final File downloadStartMarker = new File(localStorageDir.getPath(),
DOWNLOAD_START_MARKER_FILE_NAME);
+ return downloadStartMarker.exists();
Review Comment:
## Uncontrolled data used in path expression
This path depends on a [user-provided value](1).
[Show more
details](https://github.com/apache/druid/security/code-scanning/9296)
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -272,481 +449,422 @@
infoDir = new File(locations.get(0).getPath(), "info_dir");
} else {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
- .ofCategory(DruidException.Category.NOT_FOUND)
- .build("Could not determine infoDir. Make sure
'druid.segmentCache.infoDir' "
- + "or 'druid.segmentCache.locations' is set correctly.");
+ .ofCategory(DruidException.Category.NOT_FOUND)
+ .build("Could not determine infoDir. Make sure
'druid.segmentCache.infoDir' "
+ + "or 'druid.segmentCache.locations' is set
correctly.");
}
return infoDir;
}
- private static String getSegmentDir(DataSegment segment)
+ @Override
+ public void drop(DataSegment segment)
{
- return DataSegmentPusher.getDefaultStorageDir(segment, false);
+ final SegmentCacheEntry cacheEntry = new SegmentCacheEntry(segment);
+ final ReferenceCountingLock lock = lock(segment);
+ synchronized (lock) {
Review Comment:
## Uncontrolled data used in path expression
This path depends on a [user-provided value](1).
[Show more
details](https://github.com/apache/druid/security/code-scanning/5304)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]