This is an automated email from the ASF dual-hosted git repository.
clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a9ca2da52a0 feat: use java virtual threads for virtual storage load on
demand thread pool by default (#19396)
a9ca2da52a0 is described below
commit a9ca2da52a04067f99feee6db5caff8cb07fbe3d
Author: Clint Wylie <[email protected]>
AuthorDate: Wed May 6 15:19:04 2026 -0700
feat: use java virtual threads for virtual storage load on demand thread
pool by default (#19396)
changes:
* switch the default `SegmentLocalCacheManager.virtualStorageLoadOnDemandExec`
to one virtual thread per task, with a `Semaphore` sized by `virtualStorageLoadThreads` for backpressure
* add a `SegmentLoaderConfig.virtualStorageUseVirtualThreads`
(`druid.segmentCache.virtualStorageUseVirtualThreads`) config that defaults to
true; set it to false to opt out and fall back to a fixed pool of platform threads
* raise the default `SegmentLoaderConfig.virtualStorageLoadThreads` from `2 * cores` to
`Math.max(32, 4 * cores)`, sized as ~4x lookahead per processing thread
* convert `SegmentCacheEntry` from `synchronized` to `ReentrantLock` so
virtual threads park instead of pinning their carrier thread during mount (pinning on `synchronized` is only fixed by JEP 491 in Java 24+)
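* validate that `virtualStorageLoadThreads` is greater than 0 when virtual storage is enabled, failing with an operator-facing INVALID_INPUT error otherwise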
---
.../druid/segment/loading/SegmentLoaderConfig.java | 17 +-
.../segment/loading/SegmentLocalCacheManager.java | 177 +++++++++++++++------
.../loading/SegmentLocalCacheManagerTest.java | 44 +++++
3 files changed, 191 insertions(+), 47 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
index de3abfd30b8..3f259eb0e15 100644
---
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
+++
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
@@ -75,7 +75,16 @@ public class SegmentLoaderConfig
private boolean virtualStorage = false;
@JsonProperty("virtualStorageLoadThreads")
- private int virtualStorageLoadThreads = 2 *
runtimeInfo.getAvailableProcessors();
+ private int virtualStorageLoadThreads = Math.max(32, 4 *
runtimeInfo.getAvailableProcessors());
+
+ /**
+ * When true (the default), the on-demand load executor uses one virtual
thread per task with a {@link
+ * java.util.concurrent.Semaphore} sized by {@link
#virtualStorageLoadThreads} for backpressure. When false, falls back
+ * to a fixed platform-thread pool of that size. The escape hatch exists in
case virtual threads behave poorly with a
+ * particular deep storage SDK or workload.
+ */
+ @JsonProperty("virtualStorageUseVirtualThreads")
+ private boolean virtualStorageUseVirtualThreads = true;
/**
* When enabled, weakly-held cache entries are evicted immediately upon
release of all holds, rather than
@@ -162,6 +171,11 @@ public class SegmentLoaderConfig
return virtualStorageLoadThreads;
}
+ public boolean isVirtualStorageUseVirtualThreads()
+ {
+ return virtualStorageUseVirtualThreads;
+ }
+
public boolean isVirtualStorageEphemeral()
{
return virtualStorageIsEphemeral;
@@ -218,6 +232,7 @@ public class SegmentLoaderConfig
", statusQueueMaxSize=" + statusQueueMaxSize +
", virtualStorage=" + virtualStorage +
", virtualStorageLoadThreads=" + virtualStorageLoadThreads +
+ ", virtualStorageUseVirtualThreads=" +
virtualStorageUseVirtualThreads +
", virtualStorageIsEphemeral=" + virtualStorageIsEphemeral +
", combinedMaxSize=" + combinedMaxSize +
'}';
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index 9b5496fbadb..cd36f54ef26 100644
---
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -64,9 +64,11 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
/**
@@ -116,6 +118,12 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
private final IndexIO indexIO;
private final ListeningExecutorService virtualStorageLoadOnDemandExec;
+ /**
+ * Bounds the number of in-flight on-demand loads when running with virtual
threads. Null when virtual storage is
+ * disabled or when the legacy fixed-pool path is selected (the pool size is
the implicit limit there).
+ */
+ @Nullable
+ private final Semaphore virtualStorageLoadOnDemandPermits;
private ExecutorService loadOnBootstrapExec = null;
private ExecutorService loadOnDownloadExec = null;
@@ -137,29 +145,51 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
log.info("Using storage location strategy[%s].",
this.strategy.getClass().getSimpleName());
if (config.isVirtualStorage()) {
- log.info(
- "Using virtual storage mode - on demand load threads: [%d].",
- config.getVirtualStorageLoadThreads()
- );
if (config.getNumThreadsToLoadSegmentsIntoPageCacheOnDownload() > 0) {
throw DruidException.defensive("Invalid configuration: virtualStorage
is incompatible with numThreadsToLoadSegmentsIntoPageCacheOnDownload");
}
if (config.getNumThreadsToLoadSegmentsIntoPageCacheOnBootstrap() > 0) {
throw DruidException.defensive("Invalid configuration: virtualStorage
is incompatible with numThreadsToLoadSegmentsIntoPageCacheOnBootstrap");
}
+ if (config.getVirtualStorageLoadThreads() <= 0) {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build(
+ "virtualStorageLoadThreads must be greater
than 0, got [%d]",
+ config.getVirtualStorageLoadThreads()
+ );
+ }
if (config.isVirtualStorageEphemeral()) {
for (StorageLocation location : locations) {
location.setAreWeakEntriesEphemeral(true);
}
}
- virtualStorageLoadOnDemandExec =
- MoreExecutors.listeningDecorator(
- // probably replace this with virtual threads once minimum
version is java 21
- Executors.newFixedThreadPool(
- config.getVirtualStorageLoadThreads(),
-
Execs.makeThreadFactory("VirtualStorageOnDemandLoadingThread-%s")
- )
- );
+ if (config.isVirtualStorageUseVirtualThreads()) {
+ log.info(
+ "Using virtual storage mode with virtual threads - max concurrent
on demand loads: [%d].",
+ config.getVirtualStorageLoadThreads()
+ );
+ virtualStorageLoadOnDemandPermits = new
Semaphore(config.getVirtualStorageLoadThreads());
+ virtualStorageLoadOnDemandExec = MoreExecutors.listeningDecorator(
+ Executors.newThreadPerTaskExecutor(
+ Thread.ofVirtual()
+ .name("VirtualStorageOnDemandLoadingThread-", 0)
+ .factory()
+ )
+ );
+ } else {
+ log.info(
+ "Using virtual storage mode with fixed platform thread pool - on
demand load threads: [%d].",
+ config.getVirtualStorageLoadThreads()
+ );
+ virtualStorageLoadOnDemandPermits = null;
+ virtualStorageLoadOnDemandExec = MoreExecutors.listeningDecorator(
+ Executors.newFixedThreadPool(
+ config.getVirtualStorageLoadThreads(),
+
Execs.makeThreadFactory("VirtualStorageOnDemandLoadingThread-%s")
+ )
+ );
+ }
} else {
log.info(
"Number of threads to load segments into page cache - on bootstrap:
[%d], on download: [%d].",
@@ -181,6 +211,7 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
);
}
virtualStorageLoadOnDemandExec = null;
+ virtualStorageLoadOnDemandPermits = null;
}
}
@@ -786,15 +817,27 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
final long startTime = System.nanoTime();
return virtualStorageLoadOnDemandExec.submit(
() -> {
- final long execStartTime = System.nanoTime();
- final long waitTime = execStartTime - startTime;
- entry.mount(location);
- return new AcquireSegmentResult(
- entry.referenceProvider,
- entry.dataSegment.getSize(),
- waitTime,
- System.nanoTime() - execStartTime
- );
+ // Acquire a permit inside the task so that waiting parks the
(virtual) thread instead of blocking the
+ // submitter. In fixed-pool mode permits is null and the pool
size itself bounds concurrency.
+ if (virtualStorageLoadOnDemandPermits != null) {
+ virtualStorageLoadOnDemandPermits.acquireUninterruptibly();
+ }
+ try {
+ final long execStartTime = System.nanoTime();
+ final long waitTime = execStartTime - startTime;
+ entry.mount(location);
+ return new AcquireSegmentResult(
+ entry.referenceProvider,
+ entry.dataSegment.getSize(),
+ waitTime,
+ System.nanoTime() - execStartTime
+ );
+ }
+ finally {
+ if (virtualStorageLoadOnDemandPermits != null) {
+ virtualStorageLoadOnDemandPermits.release();
+ }
+ }
}
);
}
@@ -982,6 +1025,9 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
private File storageDir;
private ReferenceCountedSegmentProvider referenceProvider;
private final AtomicReference<Runnable> onUnmount = new
AtomicReference<>();
+ // switched from synchronized to use a ReentrantLock to avoid pinning
virtual threads to platform threads until
+ // https://openjdk.org/jeps/491, we could consider switching back after
java 24+ is the minimum version
+ private final ReentrantLock entryLock = new ReentrantLock();
private SegmentCacheEntry(final DataSegment dataSegment)
{
@@ -1003,9 +1049,15 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
}
@Override
- public synchronized boolean isMounted()
+ public boolean isMounted()
{
- return referenceProvider != null;
+ entryLock.lock();
+ try {
+ return referenceProvider != null;
+ }
+ finally {
+ entryLock.unlock();
+ }
}
@Override
@@ -1024,7 +1076,8 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
}
try {
- synchronized (this) {
+ entryLock.lock();
+ try {
if (location != null) {
log.debug(
"already mounted [%s] in location[%s], but asked to load in
[%s], unmounting old location",
@@ -1070,7 +1123,9 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
lazyLoadCallback = SegmentLazyLoadFailCallback.NOOP;
referenceProvider = ReferenceCountedSegmentProvider.of(segment);
}
-
+ finally {
+ entryLock.unlock();
+ }
// since we do not hold a lock on the location while mounting, make
sure that we actually are reserved and
// should have mounted, otherwise unmount so we don't leave any
orphaned files
@@ -1114,16 +1169,21 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
@Override
public void unmount()
{
- final Lock lock;
- synchronized (this) {
+ final Lock locationLock;
+ entryLock.lock();
+ try {
if (location == null) {
return;
}
- lock = location.getLock().readLock();
+ locationLock = location.getLock().readLock();
}
- lock.lock();
+ finally {
+ entryLock.unlock();
+ }
+ locationLock.lock();
try {
- synchronized (this) {
+ entryLock.lock();
+ try {
if (referenceProvider != null) {
ReferenceCountedSegmentProvider provider = referenceProvider;
referenceProvider = null;
@@ -1145,32 +1205,53 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
onUnmountRunnable.run();
}
}
+ finally {
+ entryLock.unlock();
+ }
}
finally {
- lock.unlock();
+ locationLock.unlock();
}
}
- public synchronized Optional<Segment> acquireReference()
+ public Optional<Segment> acquireReference()
{
- if (referenceProvider == null) {
- return Optional.empty();
+ entryLock.lock();
+ try {
+ if (referenceProvider == null) {
+ return Optional.empty();
+ }
+ return referenceProvider.acquireReference();
+ }
+ finally {
+ entryLock.unlock();
}
- return referenceProvider.acquireReference();
}
- public synchronized boolean setDeleteInfoFileOnUnmount()
+ public boolean setDeleteInfoFileOnUnmount()
{
- if (location == null) {
- return false;
+ entryLock.lock();
+ try {
+ if (location == null) {
+ return false;
+ }
+ onUnmount.set(() -> deleteSegmentInfoFile(dataSegment));
+ return true;
+ }
+ finally {
+ entryLock.unlock();
}
- onUnmount.set(() -> deleteSegmentInfoFile(dataSegment));
- return true;
}
- public synchronized void clearOnUnmount()
+ public void clearOnUnmount()
{
- onUnmount.set(null);
+ entryLock.lock();
+ try {
+ onUnmount.set(null);
+ }
+ finally {
+ entryLock.unlock();
+ }
}
public void loadIntoPageCache()
@@ -1178,7 +1259,8 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
if (!isMounted()) {
return;
}
- synchronized (this) {
+ entryLock.lock();
+ try {
final File[] children = storageDir.listFiles();
if (children != null) {
for (File child : children) {
@@ -1192,6 +1274,9 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
}
}
}
+ finally {
+ entryLock.unlock();
+ }
}
public boolean checkExists(final File location)
@@ -1204,7 +1289,7 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
return new File(location, relativePathString);
}
- @GuardedBy("this")
+ @GuardedBy("entryLock")
private void loadInLocationWithStartMarker(final DataSegment segment,
final File storageDir)
throws SegmentLoadingException
{
@@ -1228,7 +1313,7 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
}
}
- @GuardedBy("this")
+ @GuardedBy("entryLock")
private void loadInLocation(final DataSegment segment, final File
storageDir)
throws SegmentLoadingException
{
@@ -1246,7 +1331,7 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
}
}
- @GuardedBy("this")
+ @GuardedBy("entryLock")
private SegmentizerFactory getSegmentFactory(final File segmentFiles)
throws SegmentLoadingException
{
final File factoryJson = new File(segmentFiles, "factory.json");
diff --git
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
index 55310348d21..79c84af538b 100644
---
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
+++
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
@@ -946,6 +946,50 @@ public class SegmentLocalCacheManagerTest extends
InitializedNullHandlingTest
segmentActionAfterDrop.close();
}
+ @Test
+ public void testVirtualStorageRejectsNonPositiveLoadThreads()
+ {
+ final StorageLocationConfig locationConfig = new
StorageLocationConfig(localSegmentCacheDir, 10000L, null);
+ final SegmentLoaderConfig loaderConfig = new SegmentLoaderConfig()
+ {
+ @Override
+ public List<StorageLocationConfig> getLocations()
+ {
+ return ImmutableList.of(locationConfig);
+ }
+
+ @Override
+ public boolean isVirtualStorage()
+ {
+ return true;
+ }
+
+ @Override
+ public int getVirtualStorageLoadThreads()
+ {
+ return 0;
+ }
+ };
+ final List<StorageLocation> storageLocations =
loaderConfig.toStorageLocations();
+ MatcherAssert.assertThat(
+ Assert.assertThrows(
+ DruidException.class,
+ () -> new SegmentLocalCacheManager(
+ storageLocations,
+ loaderConfig,
+ new
LeastBytesUsedStorageLocationSelectorStrategy(storageLocations),
+ TestHelper.getTestIndexIO(jsonMapper, ColumnConfig.DEFAULT),
+ jsonMapper
+ )
+ ),
+ new DruidExceptionMatcher(
+ DruidException.Persona.OPERATOR,
+ DruidException.Category.INVALID_INPUT,
+ "general"
+ ).expectMessageIs("virtualStorageLoadThreads must be greater than 0,
got [0]")
+ );
+ }
+
@Test
public void testGetBootstrapSegmentVirtualStorage() throws Exception
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]