github-advanced-security[bot] commented on code in PR #16475:
URL: https://github.com/apache/druid/pull/16475#discussion_r1614229992
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -112,44 +123,192 @@
log.info("Size of thread pool to load segments into page cache on
download [%d]",
config.getNumThreadsToLoadSegmentsIntoPageCacheOnDownload());
}
+
+ if (config.getNumThreadsToLoadSegmentsIntoPageCacheOnBootstrap() != 0) {
+ loadSegmentsIntoPageCacheOnBootstrapExec = Execs.multiThreaded(
+ config.getNumThreadsToLoadSegmentsIntoPageCacheOnBootstrap(),
+ "Load-Segments-Into-Page-Cache-On-Bootstrap-%s"
+ );
+ log.info("Size of thread pool to load segments into page cache on
bootstrap [%d]",
+ config.getNumThreadsToLoadSegmentsIntoPageCacheOnBootstrap());
+ }
}
@VisibleForTesting
SegmentLocalCacheManager(
SegmentLoaderConfig config,
@Nonnull StorageLocationSelectorStrategy strategy,
+ IndexIO indexIO,
@Json ObjectMapper mapper
)
{
- this(config.toStorageLocations(), config, strategy, mapper);
+ this(config.toStorageLocations(), config, strategy, indexIO, mapper);
}
/**
* creates instance with default storage location selector strategy
*
* This ctor is mainly for test cases, including test cases in other modules
*/
- @VisibleForTesting
public SegmentLocalCacheManager(
SegmentLoaderConfig config,
+ IndexIO indexIO,
@Json ObjectMapper mapper
)
{
this.config = config;
+ this.indexIO = indexIO;
this.jsonMapper = mapper;
this.locations = config.toStorageLocations();
this.strategy = new
LeastBytesUsedStorageLocationSelectorStrategy(locations);
log.info("Using storage location strategy: [%s]",
this.strategy.getClass().getSimpleName());
}
- static String getSegmentDir(DataSegment segment)
+ @Override
+ public boolean canHandleSegments()
{
- return DataSegmentPusher.getDefaultStorageDir(segment, false);
+ return !(locations == null || locations.isEmpty());
}
@Override
- public boolean isSegmentCached(final DataSegment segment)
+ public List<DataSegment> getCachedSegments() throws IOException
+ {
+ if (!canHandleSegments()) {
+ throw DruidException.defensive(
+ "canHandleSegments() is false. getCachedSegments() must be invoked
only when canHandleSegments() returns true."
+ );
+ }
+ final File baseDir = getInfoDir();
+ FileUtils.mkdirp(baseDir);
+
+ List<DataSegment> cachedSegments = new ArrayList<>();
+ File[] segmentsToLoad = baseDir.listFiles();
+
+ int ignored = 0;
+
+ for (int i = 0; i < segmentsToLoad.length; i++) {
+ File file = segmentsToLoad[i];
+ log.info("Loading segment cache file [%d/%d][%s].", i + 1,
segmentsToLoad.length, file);
+ try {
+ final DataSegment segment = jsonMapper.readValue(file,
DataSegment.class);
+ if (!segment.getId().toString().equals(file.getName())) {
+ log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(),
segment.getId());
+ ignored++;
+ } else if (isSegmentCached(segment)) {
+ cachedSegments.add(segment);
+ } else {
+ final SegmentId segmentId = segment.getId();
+ log.warn("Unable to find cache file for segment[%s]. Deleting lookup
entry.", segmentId);
+ removeInfoFile(segment);
+ }
+ }
+ catch (Exception e) {
+ log.makeAlert(e, "Failed to load segment from segmentInfo file")
+ .addData("file", file)
+ .emit();
+ }
+ }
+
+ if (ignored > 0) {
+ log.makeAlert("Ignored misnamed segment cache files on startup.")
+ .addData("numIgnored", ignored)
+ .emit();
+ }
+
+ return cachedSegments;
+ }
+
+ @Override
+ public void storeInfoFile(DataSegment segment) throws IOException
+ {
+ final File infoDir = getInfoDir();
+ FileUtils.mkdirp(infoDir);
+
+ final File segmentInfoCacheFile = new File(infoDir,
segment.getId().toString());
+ if (!segmentInfoCacheFile.exists()) {
+ jsonMapper.writeValue(segmentInfoCacheFile, segment);
Review Comment:
## Uncontrolled data used in path expression
This path depends on a [user-provided value](1).
[Show more details](https://github.com/apache/druid/security/code-scanning/7396)
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -454,55 +615,58 @@
}
@Override
- public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService
exec)
+ public void loadSegmentIntoPageCache(DataSegment segment)
{
- ExecutorService execToUse = exec != null ? exec :
loadSegmentsIntoPageCacheOnDownloadExec;
- if (execToUse == null) {
+ if (loadSegmentsIntoPageCacheOnDownloadExec == null) {
return;
}
- execToUse.submit(
- () -> {
- final ReferenceCountingLock lock = createOrGetLock(segment);
- synchronized (lock) {
- try {
- for (StorageLocation location : locations) {
- File localStorageDir = new File(location.getPath(),
DataSegmentPusher.getDefaultStorageDir(segment, false));
- if (localStorageDir.exists()) {
- File baseFile = location.getPath();
- if (localStorageDir.equals(baseFile)) {
- continue;
- }
-
- log.info("Loading directory[%s] into page cache",
localStorageDir);
-
- File[] children = localStorageDir.listFiles();
- if (children != null) {
- for (File child : children) {
- InputStream in = null;
- try {
- in = new FileInputStream(child);
- IOUtils.copy(in, new NullOutputStream());
-
- log.info("Loaded [%s] into page cache",
child.getAbsolutePath());
- }
- catch (Exception e) {
- log.error("Failed to load [%s] into page cache, [%s]",
child.getAbsolutePath(), e.getMessage());
- }
- finally {
- IOUtils.closeQuietly(in);
- }
- }
- }
+ loadSegmentsIntoPageCacheOnDownloadExec.submit(() ->
loadSegmentIntoPageCacheInternal(segment));
+ }
+
+ @Override
+ public void loadSegmentIntoPageCacheOnBootstrap(DataSegment segment)
+ {
+ if (loadSegmentsIntoPageCacheOnBootstrapExec == null) {
+ return;
+ }
+ loadSegmentsIntoPageCacheOnBootstrapExec.submit(() ->
loadSegmentIntoPageCacheInternal(segment));
+ }
+
+ private void loadSegmentIntoPageCacheInternal(DataSegment segment)
+ {
+ final ReferenceCountingLock lock = createOrGetLock(segment);
+ synchronized (lock) {
+ try {
+ for (StorageLocation location : locations) {
+ File localStorageDir = new File(location.getPath(),
DataSegmentPusher.getDefaultStorageDir(segment, false));
+ if (localStorageDir.exists()) {
Review Comment:
## Uncontrolled data used in path expression
This path depends on a [user-provided value](1).
[Show more details](https://github.com/apache/druid/security/code-scanning/7397)
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -454,55 +615,58 @@
}
@Override
- public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService
exec)
+ public void loadSegmentIntoPageCache(DataSegment segment)
{
- ExecutorService execToUse = exec != null ? exec :
loadSegmentsIntoPageCacheOnDownloadExec;
- if (execToUse == null) {
+ if (loadSegmentsIntoPageCacheOnDownloadExec == null) {
return;
}
- execToUse.submit(
- () -> {
- final ReferenceCountingLock lock = createOrGetLock(segment);
- synchronized (lock) {
- try {
- for (StorageLocation location : locations) {
- File localStorageDir = new File(location.getPath(),
DataSegmentPusher.getDefaultStorageDir(segment, false));
- if (localStorageDir.exists()) {
- File baseFile = location.getPath();
- if (localStorageDir.equals(baseFile)) {
- continue;
- }
-
- log.info("Loading directory[%s] into page cache",
localStorageDir);
-
- File[] children = localStorageDir.listFiles();
- if (children != null) {
- for (File child : children) {
- InputStream in = null;
- try {
- in = new FileInputStream(child);
- IOUtils.copy(in, new NullOutputStream());
-
- log.info("Loaded [%s] into page cache",
child.getAbsolutePath());
- }
- catch (Exception e) {
- log.error("Failed to load [%s] into page cache, [%s]",
child.getAbsolutePath(), e.getMessage());
- }
- finally {
- IOUtils.closeQuietly(in);
- }
- }
- }
+ loadSegmentsIntoPageCacheOnDownloadExec.submit(() ->
loadSegmentIntoPageCacheInternal(segment));
+ }
+
+ @Override
+ public void loadSegmentIntoPageCacheOnBootstrap(DataSegment segment)
+ {
+ if (loadSegmentsIntoPageCacheOnBootstrapExec == null) {
+ return;
+ }
+ loadSegmentsIntoPageCacheOnBootstrapExec.submit(() ->
loadSegmentIntoPageCacheInternal(segment));
+ }
+
+ private void loadSegmentIntoPageCacheInternal(DataSegment segment)
+ {
+ final ReferenceCountingLock lock = createOrGetLock(segment);
+ synchronized (lock) {
+ try {
+ for (StorageLocation location : locations) {
+ File localStorageDir = new File(location.getPath(),
DataSegmentPusher.getDefaultStorageDir(segment, false));
+ if (localStorageDir.exists()) {
+ File baseFile = location.getPath();
+ if (localStorageDir.equals(baseFile)) {
+ continue;
+ }
+
+ log.info("Loading directory[%s] into page cache", localStorageDir);
+
+ File[] children = localStorageDir.listFiles();
+ if (children != null) {
+ for (File child : children) {
+ try (InputStream in = new FileInputStream(child)) {
+ IOUtils.copy(in, new NullOutputStream());
Review Comment:
## Deprecated method or constructor invocation
Invoking [NullOutputStream.NullOutputStream](1) should be avoided because it
has been deprecated.
[Show more details](https://github.com/apache/druid/security/code-scanning/7395)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]