github-advanced-security[bot] commented on code in PR #18176: URL: https://github.com/apache/druid/pull/18176#discussion_r2286646094
########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageFabricTest.java: ########## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.query; + +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.LocalInputSource; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.msq.dart.guice.DartControllerMemoryManagementModule; +import org.apache.druid.msq.dart.guice.DartControllerModule; +import org.apache.druid.msq.dart.guice.DartWorkerMemoryManagementModule; +import org.apache.druid.msq.dart.guice.DartWorkerModule; +import org.apache.druid.msq.guice.IndexerMemoryManagementModule; +import org.apache.druid.msq.guice.MSQDurableStorageModule; +import org.apache.druid.msq.guice.MSQExternalDataSourceModule; +import org.apache.druid.msq.guice.MSQIndexingModule; +import org.apache.druid.msq.guice.MSQSqlModule; +import org.apache.druid.msq.guice.SqlTaskModule; +import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; +import 
org.apache.druid.query.DruidProcessingConfigTest; +import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedCoordinator; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedHistorical; +import org.apache.druid.testing.embedded.EmbeddedIndexer; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.EmbeddedRouter; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.druid.testing.embedded.minio.MinIOStorageResource; +import org.apache.druid.testing.embedded.msq.EmbeddedDurableShuffleStorageTest; +import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.com.google.common.io.ByteStreams; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Collections; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Virtual storage fabric mode tests for classic native JSON queries + */ +class QueryVirtualStorageFabricTest extends EmbeddedClusterTestBase +{ + private final EmbeddedBroker broker = new EmbeddedBroker(); + private final EmbeddedIndexer indexer = new EmbeddedIndexer(); + private final EmbeddedOverlord overlord = new EmbeddedOverlord(); + private final EmbeddedHistorical historical = new EmbeddedHistorical(); + private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator(); + private final EmbeddedRouter router = new EmbeddedRouter(); + private final MinIOStorageResource storageResource = new MinIOStorageResource(); + + private EmbeddedMSQApis msqApis; + + @Override + public EmbeddedDruidCluster createCluster() + { + historical.addProperty("druid.segmentCache.isVirtualStorageFabric", "true") + 
.addProperty("druid.segmentCache.minVirtualStorageFabricLoadThreads", String.valueOf(Runtime.getRuntime().availableProcessors())) + .addProperty("druid.segmentCache.maxVirtualStorageFabricLoadThreads", String.valueOf(Runtime.getRuntime().availableProcessors())) + .addBeforeStartHook( + (cluster, self) -> self.addProperty( + "druid.segmentCache.locations", + StringUtils.format( + "[{\"path\":\"%s\",\"maxSize\":\"%s\"}]", + cluster.getTestFolder().newFolder().getAbsolutePath(), + HumanReadableBytes.parse("1MiB") + ) + ) + ) + .addProperty("druid.server.maxSize", String.valueOf(HumanReadableBytes.parse("100MiB"))); + + coordinator.addProperty("druid.manager.segments.useIncrementalCache", "always"); + + overlord.addProperty("druid.manager.segments.useIncrementalCache", "always") + .addProperty("druid.manager.segments.pollDuration", "PT0.1s"); + + indexer.setServerMemory(400_000_000) + .addProperty("druid.worker.capacity", "4") + .addProperty("druid.processing.numThreads", "3") + .addProperty("druid.segment.handoff.pollDuration", "PT0.1s"); + + return EmbeddedDruidCluster + .withEmbeddedDerbyAndZookeeper() + .useLatchableEmitter() + .addExtensions( + DartControllerModule.class, + DartWorkerModule.class, + DartControllerMemoryManagementModule.class, + DartWorkerMemoryManagementModule.class, + IndexerMemoryManagementModule.class, + MSQDurableStorageModule.class, + MSQIndexingModule.class, + MSQSqlModule.class, + SqlTaskModule.class, + MSQExternalDataSourceModule.class + ) + .addResource(storageResource) + .addServer(coordinator) + .addServer(overlord) + .addServer(indexer) + .addServer(historical) + .addServer(broker) + .addServer(router); + } + + @BeforeAll + void loadData() throws IOException + { + msqApis = new EmbeddedMSQApis(cluster, overlord); + dataSource = createTestDatasourceName(); + loadWikiData(); + } + + @Override + protected void beforeEachTest() + { + // don't change the datasource name for each run because we set things up before all tests + } + + @Test + 
void testQueryTooMuchData() + { + Throwable t = Assertions.assertThrows( + RuntimeException.class, + () -> cluster.runSql("select * from \"%s\"", dataSource) + ); + Assertions.assertTrue(t.getMessage().contains("Unable to load segment")); + Assertions.assertTrue(t.getMessage().contains("] on demand, ensure enough disk space has been allocated to load all segments involved in the query")); + } + + @Test + void testQueryPartials() + { + // at the time this test was written, we can divide the segments up into these intervals and fit the required + // segments in the cache, this is kind of brittle, but not quite sure what better to do and still expect exact + // results.. + // "2015-09-12T00:00:00Z/2015-09-12T08:00:00Z" + // "2015-09-12T08:00:00Z/2015-09-12T14:00:00Z" + // "2015-09-12T14:00:00Z/2015-09-12T19:00:00Z" + // "2015-09-12T19:00:00Z/2015-09-13T00:00:00Z" + final String[] queries = new String[]{ + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 00:00:00' and __time < TIMESTAMP '2015-09-12 08:00:00'", + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 08:00:00' and __time < TIMESTAMP '2015-09-12 14:00:00'", + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 14:00:00' and __time < TIMESTAMP '2015-09-12 19:00:00'", + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 19:00:00' and __time < TIMESTAMP '2015-09-13 00:00:00'" + }; + final long[] expectedResults = new long[] { + 9770, + 10524, + 10267, + 8683 + }; + + Assertions.assertEquals(expectedResults[0], Long.parseLong(cluster.runSql(queries[0], dataSource))); + Assertions.assertEquals(expectedResults[1], Long.parseLong(cluster.runSql(queries[1], dataSource))); + Assertions.assertEquals(expectedResults[2], Long.parseLong(cluster.runSql(queries[2], dataSource))); + Assertions.assertEquals(expectedResults[3], Long.parseLong(cluster.runSql(queries[3], dataSource))); Review Comment: ## Missing catch of NumberFormatException Potential uncaught 
'java.lang.NumberFormatException'. [Show more details](https://github.com/apache/druid/security/code-scanning/10237) ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageFabricTest.java: ########## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testing.embedded.query; + +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.LocalInputSource; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.msq.dart.guice.DartControllerMemoryManagementModule; +import org.apache.druid.msq.dart.guice.DartControllerModule; +import org.apache.druid.msq.dart.guice.DartWorkerMemoryManagementModule; +import org.apache.druid.msq.dart.guice.DartWorkerModule; +import org.apache.druid.msq.guice.IndexerMemoryManagementModule; +import org.apache.druid.msq.guice.MSQDurableStorageModule; +import org.apache.druid.msq.guice.MSQExternalDataSourceModule; +import org.apache.druid.msq.guice.MSQIndexingModule; +import org.apache.druid.msq.guice.MSQSqlModule; +import org.apache.druid.msq.guice.SqlTaskModule; +import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; +import org.apache.druid.query.DruidProcessingConfigTest; +import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedCoordinator; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedHistorical; +import org.apache.druid.testing.embedded.EmbeddedIndexer; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.EmbeddedRouter; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.druid.testing.embedded.minio.MinIOStorageResource; +import org.apache.druid.testing.embedded.msq.EmbeddedDurableShuffleStorageTest; +import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import 
org.testcontainers.shaded.com.google.common.io.ByteStreams; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Collections; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Virtual storage fabric mode tests for classic native JSON queries + */ +class QueryVirtualStorageFabricTest extends EmbeddedClusterTestBase +{ + private final EmbeddedBroker broker = new EmbeddedBroker(); + private final EmbeddedIndexer indexer = new EmbeddedIndexer(); + private final EmbeddedOverlord overlord = new EmbeddedOverlord(); + private final EmbeddedHistorical historical = new EmbeddedHistorical(); + private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator(); + private final EmbeddedRouter router = new EmbeddedRouter(); + private final MinIOStorageResource storageResource = new MinIOStorageResource(); + + private EmbeddedMSQApis msqApis; + + @Override + public EmbeddedDruidCluster createCluster() + { + historical.addProperty("druid.segmentCache.isVirtualStorageFabric", "true") + .addProperty("druid.segmentCache.minVirtualStorageFabricLoadThreads", String.valueOf(Runtime.getRuntime().availableProcessors())) + .addProperty("druid.segmentCache.maxVirtualStorageFabricLoadThreads", String.valueOf(Runtime.getRuntime().availableProcessors())) + .addBeforeStartHook( + (cluster, self) -> self.addProperty( + "druid.segmentCache.locations", + StringUtils.format( + "[{\"path\":\"%s\",\"maxSize\":\"%s\"}]", + cluster.getTestFolder().newFolder().getAbsolutePath(), + HumanReadableBytes.parse("1MiB") + ) + ) + ) + .addProperty("druid.server.maxSize", String.valueOf(HumanReadableBytes.parse("100MiB"))); + + coordinator.addProperty("druid.manager.segments.useIncrementalCache", "always"); + + overlord.addProperty("druid.manager.segments.useIncrementalCache", "always") + .addProperty("druid.manager.segments.pollDuration", "PT0.1s"); + + indexer.setServerMemory(400_000_000) + .addProperty("druid.worker.capacity", "4") + 
.addProperty("druid.processing.numThreads", "3") + .addProperty("druid.segment.handoff.pollDuration", "PT0.1s"); + + return EmbeddedDruidCluster + .withEmbeddedDerbyAndZookeeper() + .useLatchableEmitter() + .addExtensions( + DartControllerModule.class, + DartWorkerModule.class, + DartControllerMemoryManagementModule.class, + DartWorkerMemoryManagementModule.class, + IndexerMemoryManagementModule.class, + MSQDurableStorageModule.class, + MSQIndexingModule.class, + MSQSqlModule.class, + SqlTaskModule.class, + MSQExternalDataSourceModule.class + ) + .addResource(storageResource) + .addServer(coordinator) + .addServer(overlord) + .addServer(indexer) + .addServer(historical) + .addServer(broker) + .addServer(router); + } + + @BeforeAll + void loadData() throws IOException + { + msqApis = new EmbeddedMSQApis(cluster, overlord); + dataSource = createTestDatasourceName(); + loadWikiData(); + } + + @Override + protected void beforeEachTest() + { + // don't change the datasource name for each run because we set things up before all tests + } + + @Test + void testQueryTooMuchData() + { + Throwable t = Assertions.assertThrows( + RuntimeException.class, + () -> cluster.runSql("select * from \"%s\"", dataSource) + ); + Assertions.assertTrue(t.getMessage().contains("Unable to load segment")); + Assertions.assertTrue(t.getMessage().contains("] on demand, ensure enough disk space has been allocated to load all segments involved in the query")); + } + + @Test + void testQueryPartials() + { + // at the time this test was written, we can divide the segments up into these intervals and fit the required + // segments in the cache, this is kind of brittle, but not quite sure what better to do and still expect exact + // results.. 
+ // "2015-09-12T00:00:00Z/2015-09-12T08:00:00Z" + // "2015-09-12T08:00:00Z/2015-09-12T14:00:00Z" + // "2015-09-12T14:00:00Z/2015-09-12T19:00:00Z" + // "2015-09-12T19:00:00Z/2015-09-13T00:00:00Z" + final String[] queries = new String[]{ + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 00:00:00' and __time < TIMESTAMP '2015-09-12 08:00:00'", + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 08:00:00' and __time < TIMESTAMP '2015-09-12 14:00:00'", + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 14:00:00' and __time < TIMESTAMP '2015-09-12 19:00:00'", + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 19:00:00' and __time < TIMESTAMP '2015-09-13 00:00:00'" + }; + final long[] expectedResults = new long[] { + 9770, + 10524, + 10267, + 8683 + }; + + Assertions.assertEquals(expectedResults[0], Long.parseLong(cluster.runSql(queries[0], dataSource))); + Assertions.assertEquals(expectedResults[1], Long.parseLong(cluster.runSql(queries[1], dataSource))); Review Comment: ## Missing catch of NumberFormatException Potential uncaught 'java.lang.NumberFormatException'. [Show more details](https://github.com/apache/druid/security/code-scanning/10235) ########## server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java: ########## @@ -272,481 +554,460 @@ infoDir = new File(locations.get(0).getPath(), "info_dir"); } else { throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.NOT_FOUND) - .build("Could not determine infoDir. Make sure 'druid.segmentCache.infoDir' " - + "or 'druid.segmentCache.locations' is set correctly."); + .ofCategory(DruidException.Category.NOT_FOUND) + .build("Could not determine infoDir. 
Make sure 'druid.segmentCache.infoDir' " + + "or 'druid.segmentCache.locations' is set correctly."); } return infoDir; } - private static String getSegmentDir(DataSegment segment) + private Supplier<ListenableFuture<Optional<Segment>>> makeOnDemandLoadSupplier( + final SegmentCacheEntry entry, + final StorageLocation location + ) { - return DataSegmentPusher.getDefaultStorageDir(segment, false); + return Suppliers.memoize( + () -> virtualStorageFabricLoadOnDemandExec.submit( + () -> { + entry.mount(location); + return entry.acquireReference(); + } + ) + ); } - /** - * Checks whether a segment is already cached. It can return false even if {@link #reserve(DataSegment)} - * has been successful for a segment but is not downloaded yet. - */ - boolean isSegmentCached(final DataSegment segment) + private ReferenceCountingLock lock(final DataSegment dataSegment) { - return findStoragePathIfCached(segment) != null; + return segmentLocks.compute( + dataSegment, + (segment, lock) -> { + final ReferenceCountingLock nonNullLock; + if (lock == null) { + nonNullLock = new ReferenceCountingLock(); + } else { + nonNullLock = lock; + } + nonNullLock.increment(); + return nonNullLock; + } + ); } - /** - * This method will try to find if the segment is already downloaded on any location. If so, the segment path - * is returned. Along with that, location state is also updated with the segment location. Refer to - * {@link StorageLocation#maybeReserve(String, DataSegment)} for more details. - * If the segment files are damaged in any location, they are removed from the location. - * @param segment - Segment to check - * @return - Path corresponding to segment directory if found, null otherwise. 
- */ - @Nullable - private File findStoragePathIfCached(final DataSegment segment) + private void unlock(final DataSegment dataSegment, final ReferenceCountingLock lock) { - for (StorageLocation location : locations) { - String storageDir = getSegmentDir(segment); - File localStorageDir = location.segmentDirectoryAsFile(storageDir); - if (localStorageDir.exists()) { - if (checkSegmentFilesIntact(localStorageDir)) { - log.warn( - "[%s] may be damaged. Delete all the segment files and pull from DeepStorage again.", - localStorageDir.getAbsolutePath() - ); - cleanupCacheFiles(location.getPath(), localStorageDir); - location.removeSegmentDir(localStorageDir, segment); - break; - } else { - // Before returning, we also reserve the space. Refer to the StorageLocation#maybeReserve documentation for details. - location.maybeReserve(storageDir, segment); - return localStorageDir; + segmentLocks.compute( + dataSegment, + (segment, existingLock) -> { + if (existingLock == null) { + throw new ISE("Lock has already been removed"); + } else if (existingLock != lock) { + throw new ISE("Different lock instance"); + } else { + if (existingLock.numReferences == 1) { + return null; + } else { + existingLock.decrement(); + return existingLock; + } + } } - } - } - return null; + ); } - /** - * check data intact. - * @param dir segments cache dir - * @return true means segment files may be damaged. 
- */ - private boolean checkSegmentFilesIntact(File dir) + private SegmentCacheEntry assignLocationAndMount( + final SegmentCacheEntry cacheEntry, + final SegmentLazyLoadFailCallback segmentLoadFailCallback + ) throws SegmentLoadingException { - return checkSegmentFilesIntactWithStartMarker(dir); + try { + for (StorageLocation location : locations) { + if (cacheEntry.checkExists(location.getPath())) { + if (location.isReserved(cacheEntry.id) || location.reserve(cacheEntry)) { + final SegmentCacheEntry entry = location.getCacheEntry(cacheEntry.id); + entry.lazyLoadCallback = segmentLoadFailCallback; + entry.mount(location); + return entry; + } else { + // entry is not reserved, clean it up + deleteCacheEntryDirectory(cacheEntry.toPotentialLocation(location.getPath())); + } + } + } + } + catch (SegmentLoadingException e) { + log.warn(e, "Failed to load segment[%s] in existing location, trying new location", cacheEntry.id); + } + final Iterator<StorageLocation> locationsIterator = strategy.getLocations(); + while (locationsIterator.hasNext()) { + final StorageLocation location = locationsIterator.next(); + if (location.reserve(cacheEntry)) { + try { + final SegmentCacheEntry entry = location.getCacheEntry(cacheEntry.id); + entry.lazyLoadCallback = segmentLoadFailCallback; + entry.mount(location); + return entry; + } + catch (SegmentLoadingException e) { + log.warn(e, "Failed to load segment[%s] in location[%s], trying next location", cacheEntry.id, location.getPath()); + } + } + } + throw new SegmentLoadingException("Failed to load segment[%s] in all locations.", cacheEntry.id); } /** - * If there is 'downloadStartMarker' existed in localStorageDir, the segments files might be damaged. - * Because each time, Druid will delete the 'downloadStartMarker' file after pulling and unzip the segments from DeepStorage. - * downloadStartMarker existed here may mean something error during download segments and the segment files may be damaged. 
+ * Deletes a directory and logs about it. This method should only be called under the lock of a {@link #segmentLocks} */ - private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir) + private static void deleteCacheEntryDirectory(final File path) { - final File downloadStartMarker = new File(localStorageDir.getPath(), DOWNLOAD_START_MARKER_FILE_NAME); - return downloadStartMarker.exists(); + log.info("Deleting directory[%s]", path); + try { + FileUtils.deleteDirectory(path); + } + catch (Exception e) { + log.error(e, "Unable to remove directory[%s]", path); + } } /** - * Make sure segments files in loc is intact, otherwise function like loadSegments will failed because of segment files is damaged. - * @param segment - * @return - * @throws SegmentLoadingException + * Calls {@link #deleteCacheEntryDirectory(File)} and then checks parent path if it is empty, and recursively + * continues until a non-empty directory or the base path is reached. This method is not thread-safe, and should only + * be used by a single caller. */ - @Override - public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException + private static void cleanupLegacyCacheLocation(final File baseFile, final File cacheFile) { - final ReferenceCountingLock lock = createOrGetLock(segment); - synchronized (lock) { - try { - File segmentDir = findStoragePathIfCached(segment); - if (segmentDir != null) { - return segmentDir; - } + if (cacheFile.equals(baseFile)) { + return; + } - return loadSegmentWithRetry(segment); - } - finally { - unlock(segment, lock); + deleteCacheEntryDirectory(cacheFile); + + File parent = cacheFile.getParentFile(); + if (parent != null) { + File[] children = parent.listFiles(); + if (children == null || children.length == 0) { + cleanupLegacyCacheLocation(baseFile, parent); } } } /** - * If we have already reserved a location before, probably via {@link #reserve(DataSegment)}, then only that location - * should be tried. 
Otherwise, we would fetch locations using {@link StorageLocationSelectorStrategy} and try all - * of them one by one till there is success. - * Location may fail because of IO failure, most likely in two cases:<p> - * 1. druid don't have the write access to this location, most likely the administrator doesn't config it correctly<p> - * 2. disk failure, druid can't read/write to this disk anymore - * <p> - * Locations are fetched using {@link StorageLocationSelectorStrategy}. + * check if segment data is possibly corrupted. + * @param dir segments cache dir + * @return true means segment files may be damaged. */ - private File loadSegmentWithRetry(DataSegment segment) throws SegmentLoadingException + private static boolean isPossiblyCorrupted(final File dir) { - String segmentDir = getSegmentDir(segment); - - // Try the already reserved location. If location has been reserved outside, then we do not release the location - // here and simply delete any downloaded files. That is, we revert anything we do in this function and nothing else. 
- for (StorageLocation loc : locations) { - if (loc.isReserved(segmentDir)) { - File storageDir = loc.segmentDirectoryAsFile(segmentDir); - boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, storageDir, false); - if (!success) { - throw new SegmentLoadingException( - "Failed to load segment[%s] in reserved location[%s]", segment.getId(), loc.getPath().getAbsolutePath() - ); - } - return storageDir; - } - } - - // No location was reserved so we try all the locations - Iterator<StorageLocation> locationsIterator = strategy.getLocations(); - while (locationsIterator.hasNext()) { - - StorageLocation loc = locationsIterator.next(); - - // storageDir is the file path corresponding to segment dir - File storageDir = loc.reserve(segmentDir, segment); - if (storageDir != null) { - boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, storageDir, true); - if (success) { - return storageDir; - } - } - } - throw new SegmentLoadingException("Failed to load segment[%s] in all locations.", segment.getId()); + return hasStartMarker(dir); } /** - * A helper method over {@link #loadInLocationWithStartMarker(DataSegment, File)} that catches the {@link SegmentLoadingException} - * and emits alerts. - * @param loc - {@link StorageLocation} where segment is to be downloaded in. - * @param segment - {@link DataSegment} to download - * @param storageDir - {@link File} pointing to segment directory - * @param releaseLocation - Whether to release the location in case of failures - * @return - True if segment was downloaded successfully, false otherwise. + * If {@link #DOWNLOAD_START_MARKER_FILE_NAME} exists in the path, the segment files might be damaged because this + * file is typically deleted after the segment is pulled from deep storage. 
*/ - private boolean loadInLocationWithStartMarkerQuietly(StorageLocation loc, DataSegment segment, File storageDir, boolean releaseLocation) + private static boolean hasStartMarker(final File localStorageDir) { - try { - loadInLocationWithStartMarker(segment, storageDir); - return true; - } - catch (SegmentLoadingException e) { - try { - log.makeAlert( - e, - "Failed to load segment in current location [%s], try next location if any", - loc.getPath().getAbsolutePath() - ).addData("location", loc.getPath().getAbsolutePath()).emit(); - } - finally { - if (releaseLocation) { - loc.removeSegmentDir(storageDir, segment); - } - cleanupCacheFiles(loc.getPath(), storageDir); - } - } - return false; + final File downloadStartMarker = new File(localStorageDir.getPath(), DOWNLOAD_START_MARKER_FILE_NAME); + return downloadStartMarker.exists(); } - private void loadInLocationWithStartMarker(DataSegment segment, File storageDir) throws SegmentLoadingException + private static final class ReferenceCountingLock { - // We use a marker to prevent the case where a segment is downloaded, but before the download completes, - // the parent directories of the segment are removed - final File downloadStartMarker = new File(storageDir, DOWNLOAD_START_MARKER_FILE_NAME); - synchronized (directoryWriteRemoveLock) { - try { - FileUtils.mkdirp(storageDir); - - if (!downloadStartMarker.createNewFile()) { - throw new SegmentLoadingException("Was not able to create new download marker for [%s]", storageDir); - } - } - catch (IOException e) { - throw new SegmentLoadingException(e, "Unable to create marker file for [%s]", storageDir); - } - } - loadInLocation(segment, storageDir); + private int numReferences; - if (!downloadStartMarker.delete()) { - throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir); + private void increment() + { + ++numReferences; } - } - private void loadInLocation(DataSegment segment, File storageDir) throws SegmentLoadingException - { - // 
LoadSpec isn't materialized until here so that any system can interpret Segment without having to have all the - // LoadSpec dependencies. - final LoadSpec loadSpec = jsonMapper.convertValue(segment.getLoadSpec(), LoadSpec.class); - final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir); - if (result.getSize() != segment.getSize()) { - log.warn( - "Segment [%s] is different than expected size. Expected [%d] found [%d]", - segment.getId(), - segment.getSize(), - result.getSize() - ); + private void decrement() + { + --numReferences; } } - @Override - public boolean reserve(final DataSegment segment) + private final class SegmentCacheEntry implements CacheEntry { - final ReferenceCountingLock lock = createOrGetLock(segment); - synchronized (lock) { - try { - // Maybe the segment was already loaded. This check is required to account for restart scenarios. - if (null != findStoragePathIfCached(segment)) { - return true; - } + private final SegmentCacheEntryIdentifier id; + private final DataSegment dataSegment; + private final String relativePathString; + // guarded by segment lock + private SegmentLazyLoadFailCallback lazyLoadCallback = SegmentLazyLoadFailCallback.NOOP; + // guarded by segment lock + private StorageLocation location; + // guarded by segment lock + private File storageDir; + @GuardedBy("this") + private ReferenceCountedSegmentProvider referenceProvider; - String storageDirStr = getSegmentDir(segment); + private SegmentCacheEntry(final DataSegment dataSegment) + { + this.dataSegment = dataSegment; + this.id = new SegmentCacheEntryIdentifier(dataSegment.getId()); + this.relativePathString = dataSegment.getId().toString(); + } - // check if we already reserved the segment - for (StorageLocation location : locations) { - if (location.isReserved(storageDirStr)) { - return true; - } - } + @Override + public SegmentCacheEntryIdentifier getId() + { + return id; + } - // Not found in any location, reserve now - for (Iterator<StorageLocation> it 
= strategy.getLocations(); it.hasNext(); ) { - StorageLocation location = it.next(); - if (null != location.reserve(storageDirStr, segment)) { - return true; - } - } - } - finally { - unlock(segment, lock); - } + @Override + public long getSize() + { + return dataSegment.getSize(); } - return false; - } + @Override + public synchronized boolean isMounted() + { + return referenceProvider != null; + } - @Override - public boolean release(final DataSegment segment) - { - final ReferenceCountingLock lock = createOrGetLock(segment); - synchronized (lock) { + @Override + public void mount(StorageLocation mountLocation) throws SegmentLoadingException + { + final Lock lock = mountLocation.getLock().readLock(); + lock.lock(); try { - String storageDir = getSegmentDir(segment); - // Release the first location encountered - for (StorageLocation location : locations) { - if (location.isReserved(storageDir)) { - File localStorageDir = location.segmentDirectoryAsFile(storageDir); - if (localStorageDir.exists()) { - throw new ISE( - "Asking to release a location '%s' while the segment directory '%s' is present on disk. 
Any state on disk must be deleted before releasing", - location.getPath().getAbsolutePath(), - localStorageDir.getAbsolutePath() + if (!mountLocation.isReserved(this.id) && !mountLocation.isWeakReserved(this.id)) { + log.debug( + "aborting mount in location[%s] since entry[%s] is no longer reserved", + mountLocation.getPath(), + this.id + ); + return; + } + synchronized (this) { + if (location != null) { + log.debug( + "already mounted [%s] in location[%s], but asked to load in [%s], unmounting old location", + id, + location.getPath(), + mountLocation.getPath() + ); + if (!location.equals(mountLocation)) { + throw new SegmentLoadingException( + "already mounted[%s] in location[%s] which is different from requested[%s]", + id, + location.getPath(), + mountLocation.getPath() + ); + } else { + log.debug("already mounted [%s] in location[%s]", id, mountLocation.getPath()); + return; + } + } + location = mountLocation; + storageDir = new File(location.getPath(), relativePathString); + boolean needsLoad = true; + if (storageDir.exists()) { Review Comment: ## Uncontrolled data used in path expression This path depends on a [user-provided value](1). [Show more details](https://github.com/apache/druid/security/code-scanning/10233) ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageFabricTest.java: ########## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.query; + +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.LocalInputSource; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.msq.dart.guice.DartControllerMemoryManagementModule; +import org.apache.druid.msq.dart.guice.DartControllerModule; +import org.apache.druid.msq.dart.guice.DartWorkerMemoryManagementModule; +import org.apache.druid.msq.dart.guice.DartWorkerModule; +import org.apache.druid.msq.guice.IndexerMemoryManagementModule; +import org.apache.druid.msq.guice.MSQDurableStorageModule; +import org.apache.druid.msq.guice.MSQExternalDataSourceModule; +import org.apache.druid.msq.guice.MSQIndexingModule; +import org.apache.druid.msq.guice.MSQSqlModule; +import org.apache.druid.msq.guice.SqlTaskModule; +import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; +import org.apache.druid.query.DruidProcessingConfigTest; +import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedCoordinator; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedHistorical; +import org.apache.druid.testing.embedded.EmbeddedIndexer; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.EmbeddedRouter; +import 
org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.druid.testing.embedded.minio.MinIOStorageResource; +import org.apache.druid.testing.embedded.msq.EmbeddedDurableShuffleStorageTest; +import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.com.google.common.io.ByteStreams; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Collections; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Virtual storage fabric mode tests for classic native JSON queries + */ +class QueryVirtualStorageFabricTest extends EmbeddedClusterTestBase +{ + private final EmbeddedBroker broker = new EmbeddedBroker(); + private final EmbeddedIndexer indexer = new EmbeddedIndexer(); + private final EmbeddedOverlord overlord = new EmbeddedOverlord(); + private final EmbeddedHistorical historical = new EmbeddedHistorical(); + private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator(); + private final EmbeddedRouter router = new EmbeddedRouter(); + private final MinIOStorageResource storageResource = new MinIOStorageResource(); + + private EmbeddedMSQApis msqApis; + + @Override + public EmbeddedDruidCluster createCluster() + { + historical.addProperty("druid.segmentCache.isVirtualStorageFabric", "true") + .addProperty("druid.segmentCache.minVirtualStorageFabricLoadThreads", String.valueOf(Runtime.getRuntime().availableProcessors())) + .addProperty("druid.segmentCache.maxVirtualStorageFabricLoadThreads", String.valueOf(Runtime.getRuntime().availableProcessors())) + .addBeforeStartHook( + (cluster, self) -> self.addProperty( + "druid.segmentCache.locations", + StringUtils.format( + "[{\"path\":\"%s\",\"maxSize\":\"%s\"}]", + cluster.getTestFolder().newFolder().getAbsolutePath(), + HumanReadableBytes.parse("1MiB") + ) + ) + ) + 
.addProperty("druid.server.maxSize", String.valueOf(HumanReadableBytes.parse("100MiB"))); + + coordinator.addProperty("druid.manager.segments.useIncrementalCache", "always"); + + overlord.addProperty("druid.manager.segments.useIncrementalCache", "always") + .addProperty("druid.manager.segments.pollDuration", "PT0.1s"); + + indexer.setServerMemory(400_000_000) + .addProperty("druid.worker.capacity", "4") + .addProperty("druid.processing.numThreads", "3") + .addProperty("druid.segment.handoff.pollDuration", "PT0.1s"); + + return EmbeddedDruidCluster + .withEmbeddedDerbyAndZookeeper() + .useLatchableEmitter() + .addExtensions( + DartControllerModule.class, + DartWorkerModule.class, + DartControllerMemoryManagementModule.class, + DartWorkerMemoryManagementModule.class, + IndexerMemoryManagementModule.class, + MSQDurableStorageModule.class, + MSQIndexingModule.class, + MSQSqlModule.class, + SqlTaskModule.class, + MSQExternalDataSourceModule.class + ) + .addResource(storageResource) + .addServer(coordinator) + .addServer(overlord) + .addServer(indexer) + .addServer(historical) + .addServer(broker) + .addServer(router); + } + + @BeforeAll + void loadData() throws IOException + { + msqApis = new EmbeddedMSQApis(cluster, overlord); + dataSource = createTestDatasourceName(); + loadWikiData(); + } + + @Override + protected void beforeEachTest() + { + // don't change the datasource name for each run because we set things up before all tests + } + + @Test + void testQueryTooMuchData() + { + Throwable t = Assertions.assertThrows( + RuntimeException.class, + () -> cluster.runSql("select * from \"%s\"", dataSource) + ); + Assertions.assertTrue(t.getMessage().contains("Unable to load segment")); + Assertions.assertTrue(t.getMessage().contains("] on demand, ensure enough disk space has been allocated to load all segments involved in the query")); + } + + @Test + void testQueryPartials() + { + // at the time this test was written, we can divide the segments up into these intervals 
and fit the required + // segments in the cache, this is kind of brittle, but not quite sure what better to do and still expect exact + // results.. + // "2015-09-12T00:00:00Z/2025-09-12T08:00:00Z" + // "2015-09-12T08:00:00Z/2025-09-12T14:00:00Z" + // "2015-09-12T14:00:00Z/2025-09-12T19:00:00Z" + // "2015-09-12T19:00:00Z/2025-09-13T00:00:00Z" + final String[] queries = new String[]{ + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 00:00:00' and __time < TIMESTAMP '2015-09-12 08:00:00'", + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 08:00:00' and __time < TIMESTAMP '2015-09-12 14:00:00'", + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 14:00:00' and __time < TIMESTAMP '2015-09-12 19:00:00'", + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 19:00:00' and __time < TIMESTAMP '2015-09-13 00:00:00'" + }; + final long[] expectedResults = new long[] { + 9770, + 10524, + 10267, + 8683 + }; + + Assertions.assertEquals(expectedResults[0], Long.parseLong(cluster.runSql(queries[0], dataSource))); Review Comment: ## Missing catch of NumberFormatException Potential uncaught 'java.lang.NumberFormatException'. [Show more details](https://github.com/apache/druid/security/code-scanning/10234) ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageFabricTest.java: ########## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.query; + +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.LocalInputSource; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.msq.dart.guice.DartControllerMemoryManagementModule; +import org.apache.druid.msq.dart.guice.DartControllerModule; +import org.apache.druid.msq.dart.guice.DartWorkerMemoryManagementModule; +import org.apache.druid.msq.dart.guice.DartWorkerModule; +import org.apache.druid.msq.guice.IndexerMemoryManagementModule; +import org.apache.druid.msq.guice.MSQDurableStorageModule; +import org.apache.druid.msq.guice.MSQExternalDataSourceModule; +import org.apache.druid.msq.guice.MSQIndexingModule; +import org.apache.druid.msq.guice.MSQSqlModule; +import org.apache.druid.msq.guice.SqlTaskModule; +import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; +import org.apache.druid.query.DruidProcessingConfigTest; +import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedCoordinator; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedHistorical; +import org.apache.druid.testing.embedded.EmbeddedIndexer; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.EmbeddedRouter; +import 
org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.druid.testing.embedded.minio.MinIOStorageResource; +import org.apache.druid.testing.embedded.msq.EmbeddedDurableShuffleStorageTest; +import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.com.google.common.io.ByteStreams; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Collections; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Virtual storage fabric mode tests for classic native JSON queries + */ +class QueryVirtualStorageFabricTest extends EmbeddedClusterTestBase +{ + private final EmbeddedBroker broker = new EmbeddedBroker(); + private final EmbeddedIndexer indexer = new EmbeddedIndexer(); + private final EmbeddedOverlord overlord = new EmbeddedOverlord(); + private final EmbeddedHistorical historical = new EmbeddedHistorical(); + private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator(); + private final EmbeddedRouter router = new EmbeddedRouter(); + private final MinIOStorageResource storageResource = new MinIOStorageResource(); + + private EmbeddedMSQApis msqApis; + + @Override + public EmbeddedDruidCluster createCluster() + { + historical.addProperty("druid.segmentCache.isVirtualStorageFabric", "true") + .addProperty("druid.segmentCache.minVirtualStorageFabricLoadThreads", String.valueOf(Runtime.getRuntime().availableProcessors())) + .addProperty("druid.segmentCache.maxVirtualStorageFabricLoadThreads", String.valueOf(Runtime.getRuntime().availableProcessors())) + .addBeforeStartHook( + (cluster, self) -> self.addProperty( + "druid.segmentCache.locations", + StringUtils.format( + "[{\"path\":\"%s\",\"maxSize\":\"%s\"}]", + cluster.getTestFolder().newFolder().getAbsolutePath(), + HumanReadableBytes.parse("1MiB") + ) + ) + ) + 
.addProperty("druid.server.maxSize", String.valueOf(HumanReadableBytes.parse("100MiB"))); + + coordinator.addProperty("druid.manager.segments.useIncrementalCache", "always"); + + overlord.addProperty("druid.manager.segments.useIncrementalCache", "always") + .addProperty("druid.manager.segments.pollDuration", "PT0.1s"); + + indexer.setServerMemory(400_000_000) + .addProperty("druid.worker.capacity", "4") + .addProperty("druid.processing.numThreads", "3") + .addProperty("druid.segment.handoff.pollDuration", "PT0.1s"); + + return EmbeddedDruidCluster + .withEmbeddedDerbyAndZookeeper() + .useLatchableEmitter() + .addExtensions( + DartControllerModule.class, + DartWorkerModule.class, + DartControllerMemoryManagementModule.class, + DartWorkerMemoryManagementModule.class, + IndexerMemoryManagementModule.class, + MSQDurableStorageModule.class, + MSQIndexingModule.class, + MSQSqlModule.class, + SqlTaskModule.class, + MSQExternalDataSourceModule.class + ) + .addResource(storageResource) + .addServer(coordinator) + .addServer(overlord) + .addServer(indexer) + .addServer(historical) + .addServer(broker) + .addServer(router); + } + + @BeforeAll + void loadData() throws IOException + { + msqApis = new EmbeddedMSQApis(cluster, overlord); + dataSource = createTestDatasourceName(); + loadWikiData(); + } + + @Override + protected void beforeEachTest() + { + // don't change the datasource name for each run because we set things up before all tests + } + + @Test + void testQueryTooMuchData() + { + Throwable t = Assertions.assertThrows( + RuntimeException.class, + () -> cluster.runSql("select * from \"%s\"", dataSource) + ); + Assertions.assertTrue(t.getMessage().contains("Unable to load segment")); + Assertions.assertTrue(t.getMessage().contains("] on demand, ensure enough disk space has been allocated to load all segments involved in the query")); + } + + @Test + void testQueryPartials() + { + // at the time this test was written, we can divide the segments up into these intervals 
and fit the required + // segments in the cache, this is kind of brittle, but not quite sure what better to do and still expect exact + // results.. + // "2015-09-12T00:00:00Z/2025-09-12T08:00:00Z" + // "2015-09-12T08:00:00Z/2025-09-12T14:00:00Z" + // "2015-09-12T14:00:00Z/2025-09-12T19:00:00Z" + // "2015-09-12T19:00:00Z/2025-09-13T00:00:00Z" + final String[] queries = new String[]{ + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 00:00:00' and __time < TIMESTAMP '2015-09-12 08:00:00'", + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 08:00:00' and __time < TIMESTAMP '2015-09-12 14:00:00'", + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 14:00:00' and __time < TIMESTAMP '2015-09-12 19:00:00'", + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 19:00:00' and __time < TIMESTAMP '2015-09-13 00:00:00'" + }; + final long[] expectedResults = new long[] { + 9770, + 10524, + 10267, + 8683 + }; + + Assertions.assertEquals(expectedResults[0], Long.parseLong(cluster.runSql(queries[0], dataSource))); + Assertions.assertEquals(expectedResults[1], Long.parseLong(cluster.runSql(queries[1], dataSource))); + Assertions.assertEquals(expectedResults[2], Long.parseLong(cluster.runSql(queries[2], dataSource))); Review Comment: ## Missing catch of NumberFormatException Potential uncaught 'java.lang.NumberFormatException'. [Show more details](https://github.com/apache/druid/security/code-scanning/10236) ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageFabricTest.java: ########## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.query; + +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.LocalInputSource; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.msq.dart.guice.DartControllerMemoryManagementModule; +import org.apache.druid.msq.dart.guice.DartControllerModule; +import org.apache.druid.msq.dart.guice.DartWorkerMemoryManagementModule; +import org.apache.druid.msq.dart.guice.DartWorkerModule; +import org.apache.druid.msq.guice.IndexerMemoryManagementModule; +import org.apache.druid.msq.guice.MSQDurableStorageModule; +import org.apache.druid.msq.guice.MSQExternalDataSourceModule; +import org.apache.druid.msq.guice.MSQIndexingModule; +import org.apache.druid.msq.guice.MSQSqlModule; +import org.apache.druid.msq.guice.SqlTaskModule; +import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; +import org.apache.druid.query.DruidProcessingConfigTest; +import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedCoordinator; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedHistorical; +import 
org.apache.druid.testing.embedded.EmbeddedIndexer; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.EmbeddedRouter; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.druid.testing.embedded.minio.MinIOStorageResource; +import org.apache.druid.testing.embedded.msq.EmbeddedDurableShuffleStorageTest; +import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.com.google.common.io.ByteStreams; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Collections; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Virtual storage fabric mode tests for classic native JSON queries + */ +class QueryVirtualStorageFabricTest extends EmbeddedClusterTestBase +{ + private final EmbeddedBroker broker = new EmbeddedBroker(); + private final EmbeddedIndexer indexer = new EmbeddedIndexer(); + private final EmbeddedOverlord overlord = new EmbeddedOverlord(); + private final EmbeddedHistorical historical = new EmbeddedHistorical(); + private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator(); + private final EmbeddedRouter router = new EmbeddedRouter(); + private final MinIOStorageResource storageResource = new MinIOStorageResource(); + + private EmbeddedMSQApis msqApis; + + @Override + public EmbeddedDruidCluster createCluster() + { + historical.addProperty("druid.segmentCache.isVirtualStorageFabric", "true") + .addProperty("druid.segmentCache.minVirtualStorageFabricLoadThreads", String.valueOf(Runtime.getRuntime().availableProcessors())) + .addProperty("druid.segmentCache.maxVirtualStorageFabricLoadThreads", String.valueOf(Runtime.getRuntime().availableProcessors())) + .addBeforeStartHook( + (cluster, self) -> self.addProperty( + "druid.segmentCache.locations", + 
StringUtils.format( + "[{\"path\":\"%s\",\"maxSize\":\"%s\"}]", + cluster.getTestFolder().newFolder().getAbsolutePath(), + HumanReadableBytes.parse("1MiB") + ) + ) + ) + .addProperty("druid.server.maxSize", String.valueOf(HumanReadableBytes.parse("100MiB"))); + + coordinator.addProperty("druid.manager.segments.useIncrementalCache", "always"); + + overlord.addProperty("druid.manager.segments.useIncrementalCache", "always") + .addProperty("druid.manager.segments.pollDuration", "PT0.1s"); + + indexer.setServerMemory(400_000_000) + .addProperty("druid.worker.capacity", "4") + .addProperty("druid.processing.numThreads", "3") + .addProperty("druid.segment.handoff.pollDuration", "PT0.1s"); + + return EmbeddedDruidCluster + .withEmbeddedDerbyAndZookeeper() + .useLatchableEmitter() + .addExtensions( + DartControllerModule.class, + DartWorkerModule.class, + DartControllerMemoryManagementModule.class, + DartWorkerMemoryManagementModule.class, + IndexerMemoryManagementModule.class, + MSQDurableStorageModule.class, + MSQIndexingModule.class, + MSQSqlModule.class, + SqlTaskModule.class, + MSQExternalDataSourceModule.class + ) + .addResource(storageResource) + .addServer(coordinator) + .addServer(overlord) + .addServer(indexer) + .addServer(historical) + .addServer(broker) + .addServer(router); + } + + @BeforeAll + void loadData() throws IOException + { + msqApis = new EmbeddedMSQApis(cluster, overlord); + dataSource = createTestDatasourceName(); + loadWikiData(); + } + + @Override + protected void beforeEachTest() + { + // don't change the datasource name for each run because we set things up before all tests + } + + @Test + void testQueryTooMuchData() + { + Throwable t = Assertions.assertThrows( + RuntimeException.class, + () -> cluster.runSql("select * from \"%s\"", dataSource) + ); + Assertions.assertTrue(t.getMessage().contains("Unable to load segment")); + Assertions.assertTrue(t.getMessage().contains("] on demand, ensure enough disk space has been allocated to load all 
segments involved in the query")); + } + + @Test + void testQueryPartials() + { + // at the time this test was written, we can divide the segments up into these intervals and fit the required + // segments in the cache, this is kind of brittle, but not quite sure what better to do and still expect exact + // results.. + // "2015-09-12T00:00:00Z/2025-09-12T08:00:00Z" + // "2015-09-12T08:00:00Z/2025-09-12T14:00:00Z" + // "2015-09-12T14:00:00Z/2025-09-12T19:00:00Z" + // "2015-09-12T19:00:00Z/2025-09-13T00:00:00Z" + final String[] queries = new String[]{ + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 00:00:00' and __time < TIMESTAMP '2015-09-12 08:00:00'", + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 08:00:00' and __time < TIMESTAMP '2015-09-12 14:00:00'", + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 14:00:00' and __time < TIMESTAMP '2015-09-12 19:00:00'", + "select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 19:00:00' and __time < TIMESTAMP '2015-09-13 00:00:00'" + }; + final long[] expectedResults = new long[] { + 9770, + 10524, + 10267, + 8683 + }; + + Assertions.assertEquals(expectedResults[0], Long.parseLong(cluster.runSql(queries[0], dataSource))); + Assertions.assertEquals(expectedResults[1], Long.parseLong(cluster.runSql(queries[1], dataSource))); + Assertions.assertEquals(expectedResults[2], Long.parseLong(cluster.runSql(queries[2], dataSource))); + Assertions.assertEquals(expectedResults[3], Long.parseLong(cluster.runSql(queries[3], dataSource))); + + for (int i = 0; i < 1000; i++) { + int nextQuery = ThreadLocalRandom.current().nextInt(queries.length); + Assertions.assertEquals(expectedResults[nextQuery], Long.parseLong(cluster.runSql(queries[nextQuery], dataSource))); Review Comment: ## Missing catch of NumberFormatException Potential uncaught 'java.lang.NumberFormatException'. 
[Show more details](https://github.com/apache/druid/security/code-scanning/10238) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
