This is an automated email from the ASF dual-hosted git repository.

clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a9ca2da52a0 feat: use java virtual threads for virtual storage load on 
demand thread pool by default (#19396)
a9ca2da52a0 is described below

commit a9ca2da52a04067f99feee6db5caff8cb07fbe3d
Author: Clint Wylie <[email protected]>
AuthorDate: Wed May 6 15:19:04 2026 -0700

    feat: use java virtual threads for virtual storage load on demand thread 
pool by default (#19396)
    
    changes:
    * switch default `SegmentLocalCacheManager.virtualStorageLoadOnDemandExec` 
to virtual threads with a `Semaphore` for backpressure
    * added `SegmentLoaderConfig.virtualStorageUseVirtualThreads` 
(`druid.segmentCache.virtualStorageUseVirtualThreads`) config that defaults to 
true, but allows opt-out via setting to false
    * raise the `SegmentLoaderConfig.virtualStorageLoadThreads` default to 
Math.max(32, 4 * cores), sized as ~4x lookahead per processing thread
    * convert `SegmentCacheEntry` from `synchronized` to `ReentrantLock` so 
virtual threads park instead of pinning the carrier during mount
    * ensure virtualStorageLoadThreads is greater than 0
---
 .../druid/segment/loading/SegmentLoaderConfig.java |  17 +-
 .../segment/loading/SegmentLocalCacheManager.java  | 177 +++++++++++++++------
 .../loading/SegmentLocalCacheManagerTest.java      |  44 +++++
 3 files changed, 191 insertions(+), 47 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
index de3abfd30b8..3f259eb0e15 100644
--- 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
@@ -75,7 +75,16 @@ public class SegmentLoaderConfig
   private boolean virtualStorage = false;
 
   @JsonProperty("virtualStorageLoadThreads")
-  private int virtualStorageLoadThreads = 2 * 
runtimeInfo.getAvailableProcessors();
+  private int virtualStorageLoadThreads = Math.max(32, 4 * 
runtimeInfo.getAvailableProcessors());
+
+  /**
+   * When true (the default), the on-demand load executor uses one virtual 
thread per task with a {@link
+   * java.util.concurrent.Semaphore} sized by {@link 
#virtualStorageLoadThreads} for backpressure. When false, falls back
+   * to a fixed platform-thread pool of that size. The escape hatch exists in 
case virtual threads behave poorly with a
+   * particular deep storage SDK or workload.
+   */
+  @JsonProperty("virtualStorageUseVirtualThreads")
+  private boolean virtualStorageUseVirtualThreads = true;
 
   /**
    * When enabled, weakly-held cache entries are evicted immediately upon 
release of all holds, rather than
@@ -162,6 +171,11 @@ public class SegmentLoaderConfig
     return virtualStorageLoadThreads;
   }
 
+  public boolean isVirtualStorageUseVirtualThreads()
+  {
+    return virtualStorageUseVirtualThreads;
+  }
+
   public boolean isVirtualStorageEphemeral()
   {
     return virtualStorageIsEphemeral;
@@ -218,6 +232,7 @@ public class SegmentLoaderConfig
            ", statusQueueMaxSize=" + statusQueueMaxSize +
            ", virtualStorage=" + virtualStorage +
            ", virtualStorageLoadThreads=" + virtualStorageLoadThreads +
+           ", virtualStorageUseVirtualThreads=" + 
virtualStorageUseVirtualThreads +
            ", virtualStorageIsEphemeral=" + virtualStorageIsEphemeral +
            ", combinedMaxSize=" + combinedMaxSize +
            '}';
diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index 9b5496fbadb..cd36f54ef26 100644
--- 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -64,9 +64,11 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
 
 /**
@@ -116,6 +118,12 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
   private final IndexIO indexIO;
 
   private final ListeningExecutorService virtualStorageLoadOnDemandExec;
+  /**
+   * Bounds the number of in-flight on-demand loads when running with virtual 
threads. Null when virtual storage is
+   * disabled or when the legacy fixed-pool path is selected (the pool size is 
the implicit limit there).
+   */
+  @Nullable
+  private final Semaphore virtualStorageLoadOnDemandPermits;
   private ExecutorService loadOnBootstrapExec = null;
   private ExecutorService loadOnDownloadExec = null;
 
@@ -137,29 +145,51 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
     log.info("Using storage location strategy[%s].", 
this.strategy.getClass().getSimpleName());
 
     if (config.isVirtualStorage()) {
-      log.info(
-          "Using virtual storage mode - on demand load threads: [%d].",
-          config.getVirtualStorageLoadThreads()
-      );
       if (config.getNumThreadsToLoadSegmentsIntoPageCacheOnDownload() > 0) {
         throw DruidException.defensive("Invalid configuration: virtualStorage 
is incompatible with numThreadsToLoadSegmentsIntoPageCacheOnDownload");
       }
       if (config.getNumThreadsToLoadSegmentsIntoPageCacheOnBootstrap() > 0) {
         throw DruidException.defensive("Invalid configuration: virtualStorage 
is incompatible with numThreadsToLoadSegmentsIntoPageCacheOnBootstrap");
       }
+      if (config.getVirtualStorageLoadThreads() <= 0) {
+        throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                            .ofCategory(DruidException.Category.INVALID_INPUT)
+                            .build(
+                                "virtualStorageLoadThreads must be greater 
than 0, got [%d]",
+                                config.getVirtualStorageLoadThreads()
+                            );
+      }
       if (config.isVirtualStorageEphemeral()) {
         for (StorageLocation location : locations) {
           location.setAreWeakEntriesEphemeral(true);
         }
       }
-      virtualStorageLoadOnDemandExec =
-          MoreExecutors.listeningDecorator(
-              // probably replace this with virtual threads once minimum 
version is java 21
-              Executors.newFixedThreadPool(
-                  config.getVirtualStorageLoadThreads(),
-                  
Execs.makeThreadFactory("VirtualStorageOnDemandLoadingThread-%s")
-              )
-          );
+      if (config.isVirtualStorageUseVirtualThreads()) {
+        log.info(
+            "Using virtual storage mode with virtual threads - max concurrent 
on demand loads: [%d].",
+            config.getVirtualStorageLoadThreads()
+        );
+        virtualStorageLoadOnDemandPermits = new 
Semaphore(config.getVirtualStorageLoadThreads());
+        virtualStorageLoadOnDemandExec = MoreExecutors.listeningDecorator(
+            Executors.newThreadPerTaskExecutor(
+                Thread.ofVirtual()
+                      .name("VirtualStorageOnDemandLoadingThread-", 0)
+                      .factory()
+            )
+        );
+      } else {
+        log.info(
+            "Using virtual storage mode with fixed platform thread pool - on 
demand load threads: [%d].",
+            config.getVirtualStorageLoadThreads()
+        );
+        virtualStorageLoadOnDemandPermits = null;
+        virtualStorageLoadOnDemandExec = MoreExecutors.listeningDecorator(
+            Executors.newFixedThreadPool(
+                config.getVirtualStorageLoadThreads(),
+                
Execs.makeThreadFactory("VirtualStorageOnDemandLoadingThread-%s")
+            )
+        );
+      }
     } else {
       log.info(
           "Number of threads to load segments into page cache - on bootstrap: 
[%d], on download: [%d].",
@@ -181,6 +211,7 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
         );
       }
       virtualStorageLoadOnDemandExec = null;
+      virtualStorageLoadOnDemandPermits = null;
     }
   }
 
@@ -786,15 +817,27 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
           final long startTime = System.nanoTime();
           return virtualStorageLoadOnDemandExec.submit(
               () -> {
-                final long execStartTime = System.nanoTime();
-                final long waitTime = execStartTime - startTime;
-                entry.mount(location);
-                return new AcquireSegmentResult(
-                    entry.referenceProvider,
-                    entry.dataSegment.getSize(),
-                    waitTime,
-                    System.nanoTime() - execStartTime
-                );
+                // Acquire a permit inside the task so that waiting parks the 
(virtual) thread instead of blocking the
+                // submitter. In fixed-pool mode permits is null and the pool 
size itself bounds concurrency.
+                if (virtualStorageLoadOnDemandPermits != null) {
+                  virtualStorageLoadOnDemandPermits.acquireUninterruptibly();
+                }
+                try {
+                  final long execStartTime = System.nanoTime();
+                  final long waitTime = execStartTime - startTime;
+                  entry.mount(location);
+                  return new AcquireSegmentResult(
+                      entry.referenceProvider,
+                      entry.dataSegment.getSize(),
+                      waitTime,
+                      System.nanoTime() - execStartTime
+                  );
+                }
+                finally {
+                  if (virtualStorageLoadOnDemandPermits != null) {
+                    virtualStorageLoadOnDemandPermits.release();
+                  }
+                }
               }
           );
         }
@@ -982,6 +1025,9 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
     private File storageDir;
     private ReferenceCountedSegmentProvider referenceProvider;
     private final AtomicReference<Runnable> onUnmount = new 
AtomicReference<>();
+    // switched from synchronized to use a ReentrantLock to avoid pinning 
virtual threads to platform threads until
+    // https://openjdk.org/jeps/491, we could consider switching back after 
java 24+ is the minimum version
+    private final ReentrantLock entryLock = new ReentrantLock();
 
     private SegmentCacheEntry(final DataSegment dataSegment)
     {
@@ -1003,9 +1049,15 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
     }
 
     @Override
-    public synchronized boolean isMounted()
+    public boolean isMounted()
     {
-      return referenceProvider != null;
+      entryLock.lock();
+      try {
+        return referenceProvider != null;
+      }
+      finally {
+        entryLock.unlock();
+      }
     }
 
     @Override
@@ -1024,7 +1076,8 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
       }
 
       try {
-        synchronized (this) {
+        entryLock.lock();
+        try {
           if (location != null) {
             log.debug(
                 "already mounted [%s] in location[%s], but asked to load in 
[%s], unmounting old location",
@@ -1070,7 +1123,9 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
           lazyLoadCallback = SegmentLazyLoadFailCallback.NOOP;
           referenceProvider = ReferenceCountedSegmentProvider.of(segment);
         }
-
+        finally {
+          entryLock.unlock();
+        }
 
         // since we do not hold a lock on the location while mounting, make 
sure that we actually are reserved and
         // should have mounted, otherwise unmount so we don't leave any 
orphaned files
@@ -1114,16 +1169,21 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
     @Override
     public void unmount()
     {
-      final Lock lock;
-      synchronized (this) {
+      final Lock locationLock;
+      entryLock.lock();
+      try {
         if (location == null) {
           return;
         }
-        lock = location.getLock().readLock();
+        locationLock = location.getLock().readLock();
       }
-      lock.lock();
+      finally {
+        entryLock.unlock();
+      }
+      locationLock.lock();
       try {
-        synchronized (this) {
+        entryLock.lock();
+        try {
           if (referenceProvider != null) {
             ReferenceCountedSegmentProvider provider = referenceProvider;
             referenceProvider = null;
@@ -1145,32 +1205,53 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
             onUnmountRunnable.run();
           }
         }
+        finally {
+          entryLock.unlock();
+        }
       }
       finally {
-        lock.unlock();
+        locationLock.unlock();
       }
     }
 
-    public synchronized Optional<Segment> acquireReference()
+    public Optional<Segment> acquireReference()
     {
-      if (referenceProvider == null) {
-        return Optional.empty();
+      entryLock.lock();
+      try {
+        if (referenceProvider == null) {
+          return Optional.empty();
+        }
+        return referenceProvider.acquireReference();
+      }
+      finally {
+        entryLock.unlock();
       }
-      return referenceProvider.acquireReference();
     }
 
-    public synchronized boolean setDeleteInfoFileOnUnmount()
+    public boolean setDeleteInfoFileOnUnmount()
     {
-      if (location == null) {
-        return false;
+      entryLock.lock();
+      try {
+        if (location == null) {
+          return false;
+        }
+        onUnmount.set(() -> deleteSegmentInfoFile(dataSegment));
+        return true;
+      }
+      finally {
+        entryLock.unlock();
       }
-      onUnmount.set(() -> deleteSegmentInfoFile(dataSegment));
-      return true;
     }
 
-    public synchronized void clearOnUnmount()
+    public void clearOnUnmount()
     {
-      onUnmount.set(null);
+      entryLock.lock();
+      try {
+        onUnmount.set(null);
+      }
+      finally {
+        entryLock.unlock();
+      }
     }
 
     public void loadIntoPageCache()
@@ -1178,7 +1259,8 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
       if (!isMounted()) {
         return;
       }
-      synchronized (this) {
+      entryLock.lock();
+      try {
         final File[] children = storageDir.listFiles();
         if (children != null) {
           for (File child : children) {
@@ -1192,6 +1274,9 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
           }
         }
       }
+      finally {
+        entryLock.unlock();
+      }
     }
 
     public boolean checkExists(final File location)
@@ -1204,7 +1289,7 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
       return new File(location, relativePathString);
     }
 
-    @GuardedBy("this")
+    @GuardedBy("entryLock")
     private void loadInLocationWithStartMarker(final DataSegment segment, 
final File storageDir)
         throws SegmentLoadingException
     {
@@ -1228,7 +1313,7 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
       }
     }
 
-    @GuardedBy("this")
+    @GuardedBy("entryLock")
     private void loadInLocation(final DataSegment segment, final File 
storageDir)
         throws SegmentLoadingException
     {
@@ -1246,7 +1331,7 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
       }
     }
 
-    @GuardedBy("this")
+    @GuardedBy("entryLock")
     private SegmentizerFactory getSegmentFactory(final File segmentFiles) 
throws SegmentLoadingException
     {
       final File factoryJson = new File(segmentFiles, "factory.json");
diff --git 
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
 
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
index 55310348d21..79c84af538b 100644
--- 
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
@@ -946,6 +946,50 @@ public class SegmentLocalCacheManagerTest extends 
InitializedNullHandlingTest
     segmentActionAfterDrop.close();
   }
 
+  @Test
+  public void testVirtualStorageRejectsNonPositiveLoadThreads()
+  {
+    final StorageLocationConfig locationConfig = new 
StorageLocationConfig(localSegmentCacheDir, 10000L, null);
+    final SegmentLoaderConfig loaderConfig = new SegmentLoaderConfig()
+    {
+      @Override
+      public List<StorageLocationConfig> getLocations()
+      {
+        return ImmutableList.of(locationConfig);
+      }
+
+      @Override
+      public boolean isVirtualStorage()
+      {
+        return true;
+      }
+
+      @Override
+      public int getVirtualStorageLoadThreads()
+      {
+        return 0;
+      }
+    };
+    final List<StorageLocation> storageLocations = 
loaderConfig.toStorageLocations();
+    MatcherAssert.assertThat(
+        Assert.assertThrows(
+            DruidException.class,
+            () -> new SegmentLocalCacheManager(
+                storageLocations,
+                loaderConfig,
+                new 
LeastBytesUsedStorageLocationSelectorStrategy(storageLocations),
+                TestHelper.getTestIndexIO(jsonMapper, ColumnConfig.DEFAULT),
+                jsonMapper
+            )
+        ),
+        new DruidExceptionMatcher(
+            DruidException.Persona.OPERATOR,
+            DruidException.Category.INVALID_INPUT,
+            "general"
+        ).expectMessageIs("virtualStorageLoadThreads must be greater than 0, 
got [0]")
+    );
+  }
+
   @Test
   public void testGetBootstrapSegmentVirtualStorage() throws Exception
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to