github-advanced-security[bot] commented on code in PR #18176:
URL: https://github.com/apache/druid/pull/18176#discussion_r2183829593
##########
server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java:
##########
@@ -78,86 +83,201 @@
new InjectableValues.Std().addValue(
LocalDataSegmentPuller.class,
new LocalDataSegmentPuller()
+ ).addValue(
+ IndexIO.class,
+ TestHelper.getTestIndexIO()
)
);
segmentVersion = DateTimes.nowUtc().toString();
}
- @Before
+ @BeforeEach
public void setUp() throws Exception
{
EmittingLogger.registerEmitter(new NoopServiceEmitter());
- localSegmentCacheFolder = tmpFolder.newFolder("segment_cache_folder");
+ localSegmentCacheFolder = new File(tempDir, "segment_cache_folder");
+ segmentsToLoad = new ArrayList<>();
+ segmentsToWeakLoad = new ArrayList<>();
final List<StorageLocationConfig> locations = new ArrayList<>();
- // Each segment has the size of 1000 bytes. This deep storage is capable
of storing up to 2 segments.
- final StorageLocationConfig locationConfig = new
StorageLocationConfig(localSegmentCacheFolder, 2000L, null);
+ // Each segment has the size of 1000 bytes. This deep storage is capable
of storing up to 8 segments.
+ final StorageLocationConfig locationConfig = new
StorageLocationConfig(localSegmentCacheFolder, 8000L, null);
locations.add(locationConfig);
final SegmentLoaderConfig loaderConfig = new
SegmentLoaderConfig().withLocations(locations);
+ final SegmentLoaderConfig vsfLoaderConfig = new SegmentLoaderConfig()
+ {
+ @Override
+ public List<StorageLocationConfig> getLocations()
+ {
+ return locations;
+ }
+
+ @Override
+ public boolean isVirtualStorageFabric()
+ {
+ return true;
+ }
+ };
final List<StorageLocation> storageLocations =
loaderConfig.toStorageLocations();
manager = new SegmentLocalCacheManager(
storageLocations,
- new SegmentLoaderConfig().withLocations(locations),
+ loaderConfig,
+ new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations),
+ TestIndex.INDEX_IO,
+ jsonMapper
+ );
+ virtualStorageFabricManager = new SegmentLocalCacheManager(
+ storageLocations,
+ vsfLoaderConfig,
new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations),
TestIndex.INDEX_IO,
jsonMapper
);
- executorService = Execs.multiThreaded(4,
"segment-loader-local-cache-manager-concurrency-test-%d");
+ executorService = Execs.multiThreaded(10,
"segment-loader-local-cache-manager-concurrency-test-%d");
}
- @After
+ @AfterEach
public void tearDown()
{
+ for (DataSegment segment : segmentsToLoad) {
+ manager.drop(segment);
+ }
+ for (DataSegment segment : segmentsToWeakLoad) {
+ virtualStorageFabricManager.drop(segment);
+ }
executorService.shutdownNow();
}
@Test
- public void testGetSegment() throws IOException, ExecutionException,
InterruptedException
+ public void testAcquireSegment() throws IOException, ExecutionException,
InterruptedException
+ {
+ final File localStorageFolder = new File(tempDir, "local_storage_folder");
+
+
+ final Interval interval = Intervals.of("2019-01-01/P1D");
+ makeSegmentsToLoad(8, localStorageFolder, interval, segmentsToLoad);
+
+ final List<Future<?>> futures = segmentsToLoad
+ .stream()
+ .map(segment -> executorService.submit(new Load(manager, segment)))
+ .collect(Collectors.toList());
+
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ Assertions.assertTrue(true);
+ }
+
+ @Test
+ public void testAcquireSegmentFailTooManySegments() throws IOException
+ {
+ final File localStorageFolder = new File("local_storage_folder");
+
+ final Interval interval = Intervals.of("2019-01-01/P1D");
+ makeSegmentsToLoad(9, localStorageFolder, interval, segmentsToLoad);
+
+ final List<Future<?>> futures = segmentsToLoad
+ .stream()
+ .map(segment -> executorService.submit(new Load(manager, segment)))
+ .collect(Collectors.toList());
+
+ Throwable t = Assertions.assertThrows(
+ ExecutionException.class,
+ () -> {
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ }
+ );
+ Assertions.assertInstanceOf(SegmentLoadingException.class, t.getCause());
+ Assertions.assertTrue(t.getCause().getMessage().contains("Failed to load
segment"));
+ Assertions.assertTrue(t.getCause().getMessage().contains("in all
locations."));
+ }
+
+
+ @Test
+ public void testAcquireSegmentOnDemand() throws IOException
{
- final File localStorageFolder =
tmpFolder.newFolder("local_storage_folder");
- final List<DataSegment> segmentsToLoad = new ArrayList<>(4);
+ final File localStorageFolder = new File(tempDir, "local_storage_folder");
final Interval interval = Intervals.of("2019-01-01/P1D");
- for (int partitionId = 0; partitionId < 4; partitionId++) {
+ makeSegmentsToLoad(100, localStorageFolder, interval, segmentsToWeakLoad);
+
+
+ List<DataSegment> currentBatch = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ // process batches of 10 requests at a time
+ if (i > 0 && i % 10 == 0) {
+ final List<Future<Integer>> futures = currentBatch
+ .stream()
+ .map(segment -> executorService.submit(new
WeakLoad(virtualStorageFabricManager, segment)))
+ .collect(Collectors.toList());
+ List<Throwable> exceptions = new ArrayList<>();
+ int success = 0;
+ int rows = 0;
Review Comment:
## Unread local variable
Variable 'int rows' is never read.
[Show more details](https://github.com/apache/druid/security/code-scanning/9356)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]