gianm commented on code in PR #19535:
URL: https://github.com/apache/druid/pull/19535#discussion_r3338719602


##########
processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexSegment.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.utils.CloseableUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.Closeable;
+
+/**
+ * {@link Segment} wrapper around a {@link PartialQueryableIndex}. Mirrors {@link QueryableIndexSegment} but wires up
+ * the V10-specific {@link V10TimeBoundaryInspector} (which answers from
+ * {@link org.apache.druid.segment.projections.ProjectionMetadata} min/max fields without downloading the
+ * {@code __time} column) and the partial-aware {@link PartialQueryableIndexCursorFactory} (which downloads
+ * required files on the supplied download executor before handing back a cursor).
+ * <p>
+ * Lifecycle: this segment is intended to exist as a transient reference-hold scope over an externally-owned
+ * {@link PartialQueryableIndex}; it never closes the underlying index. {@link #close()} only fires the
+ * {@code onClose} hook (e.g. to release reference tracking in the cache layer).
+ */
+public class PartialQueryableIndexSegment implements ReferenceCountedSegmentProvider.LeafReference
+{
+  private final PartialQueryableIndex index;
+  private final PartialQueryableIndexCursorFactory cursorFactory;
+  private final TimeBoundaryInspector timeBoundaryInspector;
+  private final SegmentId segmentId;
+  private final Closeable onClose;
+
+  public PartialQueryableIndexSegment(
+      PartialQueryableIndex index,
+      SegmentId segmentId,
+      Closeable onClose,
+      PartialBundleAcquirer bundleAcquirer
+  )
+  {
+    this.index = index;
+    this.timeBoundaryInspector = V10TimeBoundaryInspector.forBaseProjection(
+        index.getBaseProjectionMetadata(),
+        index.getDataInterval()
+    );
+    this.cursorFactory = new PartialQueryableIndexCursorFactory(
+        index,
+        timeBoundaryInspector,
+        bundleAcquirer
+    );
+    this.segmentId = segmentId;
+    this.onClose = onClose;
+  }
+
+  @Override
+  public SegmentId getId()
+  {
+    return segmentId;
+  }
+
+  @Override
+  public Interval getDataInterval()
+  {
+    return index.getDataInterval();
+  }
+
+  @Override
+  public void close()
+  {
+    CloseableUtils.closeAndWrapExceptions(onClose);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Nullable
+  @Override
+  public <T> T as(@Nonnull Class<T> clazz)

Review Comment:
   Is it possible to include a `PhysicalSegmentInspector`? It would be useful to get the number of rows, which is used for incrementing certain counters.
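A minimal sketch of what this could look like, assuming the base projection metadata already held in memory records a row count (the same way the time boundary is answered without downloading `__time`). `BaseProjectionMeta`, `RowCountInspector`, and `PartialSegmentSketch` are hypothetical stand-ins, not Druid's actual `PhysicalSegmentInspector` API.

```java
import java.util.Objects;

// Hypothetical stand-in for metadata the segment already holds in memory.
// Assumption: the base projection metadata records a row count.
final class BaseProjectionMeta {
  final long minTime;
  final long maxTime;
  final int numRows;

  BaseProjectionMeta(long minTime, long maxTime, int numRows) {
    this.minTime = minTime;
    this.maxTime = maxTime;
    this.numRows = numRows;
  }
}

// Hypothetical, narrowed inspector exposing only what the counters need.
interface RowCountInspector {
  int getNumRows();
}

// Sketch of as() on a partial segment: answers from metadata, never touches column files.
final class PartialSegmentSketch {
  private final BaseProjectionMeta meta;

  PartialSegmentSketch(BaseProjectionMeta meta) {
    this.meta = Objects.requireNonNull(meta, "meta");
  }

  @SuppressWarnings("unchecked")
  <T> T as(Class<T> clazz) {
    if (RowCountInspector.class.equals(clazz)) {
      final RowCountInspector inspector = () -> meta.numRows;
      return (T) inspector;
    }
    // Other capabilities (time boundary, cursor factory, ...) would be handled as in the real as().
    return null;
  }
}
```

The point of the design is that counters needing a row count would not force a download on a partially loaded segment.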



##########
server/src/main/java/org/apache/druid/server/SegmentManager.java:
##########
@@ -225,6 +224,32 @@ public AcquireSegmentAction acquireSegment(DataSegment dataSegment)
     return cacheManager.acquireSegment(dataSegment);
   }
 
+  /**
+   * Partial-load variant of {@link #acquireCachedSegment(SegmentId)}, returns a {@link Segment} when the cache holds

Review Comment:
   Is the segment returned by `acquireCachedSegment` guaranteed to be fully loaded? (Its javadocs do not seem to make that clear.)



##########
server/src/main/java/org/apache/druid/server/SegmentManager.java:
##########
@@ -225,6 +224,32 @@ public AcquireSegmentAction acquireSegment(DataSegment dataSegment)
     return cacheManager.acquireSegment(dataSegment);
   }
 
+  /**
+   * Partial-load variant of {@link #acquireCachedSegment(SegmentId)}, returns a {@link Segment} when the cache holds
+   * an entry for the id; empty otherwise. The returned segment may not be fully loaded, so callers must use async
+   * methods such as {@link org.apache.druid.segment.CursorFactory#makeCursorHolderAsync} to download data on demand.
+   * If the returned segment is only partially loaded, synchronous methods such as {@code makeCursorHolder} will fail
+   * if anything is still missing. If the segment is fully loaded, or not capable of partial loading, this method
+   * still returns it when it is present in the cache, and any async methods function properly and return immediately.
+  public Optional<Segment> acquireCachedPartialSegment(SegmentId segmentId)
+  {
+    return cacheManager.acquireCachedPartialSegment(segmentId);
+  }
+
+  /**
+   * Partial-load variant of {@link #acquireSegment(DataSegment)}, returns an {@link AcquireSegmentAction} that

Review Comment:
   Is the segment returned by `acquireSegment` guaranteed to be fully loaded? (Its javadocs do not seem to make that clear.)
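The contract described in the `acquireCachedPartialSegment` javadoc (sync fails fast on a partial segment, async downloads what is missing) might be easier to pin down against a small model. A minimal sketch, assuming column-granularity downloads; `PartialSegmentModel` and its methods are hypothetical, and `CompletableFuture` stands in for Druid's `AsyncCursorHolder` and Guava futures.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical model of a segment returned by a partial acquire: some columns may not be local yet.
final class PartialSegmentModel {
  private final Set<String> allColumns;
  private final Set<String> localColumns = ConcurrentHashMap.newKeySet();
  private final Executor downloadExecutor;

  PartialSegmentModel(Set<String> allColumns, Executor downloadExecutor) {
    this.allColumns = Set.copyOf(allColumns);
    this.downloadExecutor = downloadExecutor;
  }

  boolean isFullyDownloaded() {
    return localColumns.containsAll(allColumns);
  }

  // Sync path: never downloads, so it fails fast unless everything is already local.
  List<String> makeCursorHolder(List<String> columns) {
    if (!isFullyDownloaded()) {
      throw new IllegalStateException("not fully downloaded; use makeCursorHolderAsync");
    }
    return List.copyOf(columns);
  }

  // Async path: fetch only the missing columns on the download executor, then complete.
  CompletableFuture<List<String>> makeCursorHolderAsync(List<String> columns) {
    CompletableFuture<?>[] downloads = columns.stream()
        .filter(c -> !localColumns.contains(c))
        .map(c -> CompletableFuture.runAsync(() -> localColumns.add(c), downloadExecutor)) // stands in for a range read
        .toArray(CompletableFuture[]::new);
    return CompletableFuture.allOf(downloads).thenApply(ignored -> List.copyOf(columns));
  }
}
```

Note that fetching some columns through the async path does not make the sync path usable; only a fully downloaded segment does, which is the guarantee the question above asks to have documented.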



##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -536,16 +897,30 @@ private AcquireSegmentAction acquireExistingSegment(SegmentCacheEntryIdentifier
             location.addWeakReservationHoldIfExists(identifier)
         );
         if (hold != null) {
-          if (hold.getEntry().isMounted()) {
+          if (!(hold.getEntry() instanceof CompleteSegmentCacheEntry complete)) {
+            // The eager (complete) acquire path found a non-complete entry under this id. This only arises if
+            // partial-load on-disk state survived a toggle of druid.segmentCache.virtualStoragePartialDownloadsEnabled
+            // to false (getCachedSegments reserves an on-disk partial layout regardless of the flag). The eager path
+            // cannot serve a partial layout; surface a clear operator error rather than a ClassCastException.
+            throw DruidException.forPersona(DruidException.Persona.OPERATOR)

Review Comment:
   Two concerns:
   
   1. Won't this also be an issue on downgrade?
   2. Is it possible to repair this rather than throwing an error? As is, it will be difficult to turn off partial downloads once they are enabled.
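One possible shape for the repair in point 2: when the eager path finds a leftover partial entry, it fetches only the missing files and upgrades the entry in place instead of throwing. This is a sketch under assumptions: `CacheEntry`, `CompleteEntry`, `PartialEntry`, and `EagerAcquirer` are hypothetical, and a real implementation would not do deep-storage I/O while holding a map lock as this one does for brevity. It addresses the flag toggle only; a downgraded process that predates partial layouts would still need its own handling.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache entry types mirroring the complete/partial split in the diff.
interface CacheEntry {}

final class CompleteEntry implements CacheEntry {
  final Set<String> files;

  CompleteEntry(Set<String> files) {
    this.files = Set.copyOf(files);
  }
}

final class PartialEntry implements CacheEntry {
  final Set<String> presentFiles;

  PartialEntry(Set<String> presentFiles) {
    this.presentFiles = Set.copyOf(presentFiles);
  }
}

// Sketch: the eager path upgrades a leftover partial entry instead of throwing.
final class EagerAcquirer {
  private final Map<String, CacheEntry> entries = new ConcurrentHashMap<>();
  private final Function<String, Set<String>> allFilesOf; // hypothetical: segment id -> full file list
  final Set<String> downloaded = ConcurrentHashMap.newKeySet();

  EagerAcquirer(Function<String, Set<String>> allFilesOf) {
    this.allFilesOf = allFilesOf;
  }

  void put(String id, CacheEntry entry) {
    entries.put(id, entry);
  }

  CompleteEntry acquireComplete(String id) {
    CacheEntry entry = entries.computeIfPresent(id, (k, existing) -> {
      if (existing instanceof CompleteEntry) {
        return existing;
      }
      // Leftover partial layout (e.g. partial downloads were switched off): fetch only what is missing.
      Set<String> missing = new HashSet<>(allFilesOf.apply(k));
      missing.removeAll(((PartialEntry) existing).presentFiles);
      downloaded.addAll(missing); // stands in for reading the missing files from deep storage
      return new CompleteEntry(allFilesOf.apply(k));
    });
    if (entry == null) {
      throw new IllegalStateException("no cache entry for " + id);
    }
    return (CompleteEntry) entry;
  }
}
```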



##########
multi-stage-query/src/main/java/org/apache/druid/msq/input/RegularLoadableSegment.java:
##########
@@ -140,7 +140,7 @@ public synchronized Optional<Segment> acquireIfCached()
       throw DruidException.defensive("Segment with descriptor[%s] is already acquired", descriptor);
     }
 
-    final Optional<Segment> cachedSegment = segmentManager.acquireCachedSegment(segmentId);
+    final Optional<Segment> cachedSegment = segmentManager.acquireCachedPartialSegment(segmentId);

Review Comment:
   Could we make it an option whether `RegularLoadableSegment` does a partial fetch or a full fetch? It may not be a safe assumption that all MSQ implementations of query processing will be able to handle partial segments and async loading. Even if all existing ones do, there are also future ones, ones from extensions, etc.
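A minimal sketch of such an option, assuming a per-segment mode chosen by whoever constructs the loadable segment. `FetchMode`, `CachedSegmentSource`, and `LoadableSegmentSketch` are hypothetical; the two source methods only mirror the names of the `SegmentManager` methods in the diff, with `String` standing in for `Segment`.

```java
import java.util.Optional;

// Hypothetical knob controlling which acquire path a loadable segment uses.
enum FetchMode {
  FULL,
  PARTIAL
}

// Hypothetical stand-in for the two SegmentManager methods named in the diff.
interface CachedSegmentSource {
  Optional<String> acquireCachedSegment(String segmentId);

  Optional<String> acquireCachedPartialSegment(String segmentId);
}

final class LoadableSegmentSketch {
  private final String segmentId;
  private final CachedSegmentSource source;
  private final FetchMode mode;

  // Defaults to FULL so processors that have not opted into async loading keep the previous behavior.
  LoadableSegmentSketch(String segmentId, CachedSegmentSource source) {
    this(segmentId, source, FetchMode.FULL);
  }

  LoadableSegmentSketch(String segmentId, CachedSegmentSource source, FetchMode mode) {
    this.segmentId = segmentId;
    this.source = source;
    this.mode = mode;
  }

  Optional<String> acquireIfCached() {
    return mode == FetchMode.PARTIAL
           ? source.acquireCachedPartialSegment(segmentId)
           : source.acquireCachedSegment(segmentId);
  }
}
```

Defaulting to the full fetch means extension processors get a partial segment only when they explicitly ask for one.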



##########
processing/src/main/java/org/apache/druid/segment/PartialBundleAcquirer.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+import java.io.Closeable;
+
+/**
+ * Context object that bundles together pieces of cache-layer machinery needed to do on-demand partial segment loading:
+ * <ol>
+ *   <li>A hook for acquiring a hold on the cache-layer bundle.</li>
+ *   <li>The executor on which partial downloads should run.</li>
+ * </ol>
+ */
+public interface PartialBundleAcquirer
+{
+  /**
+   * Acquire a hold on the cache-layer bundle with the given name. The returned {@link Closeable} releases the hold

Review Comment:
   How are callers supposed to know what the bundle names are?
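One way to make bundle names discoverable is to define the naming convention in a single place that both the cache layer and callers use. In this sketch `BundleNames` is hypothetical, the convention (a projection's bundle is named after the projection, the base table has its own bundle) is taken from what `makeCursorHolderAsync` currently does, and the constant's value is a placeholder rather than the real `Projections.BASE_TABLE_PROJECTION_NAME`.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical: one place that defines how bundles are named, so callers never hard-code the convention.
final class BundleNames {
  // Placeholder value; the real code would reference Projections.BASE_TABLE_PROJECTION_NAME.
  static final String BASE_TABLE = "__base_placeholder";

  private BundleNames() {}

  // A matched projection maps to a bundle named after it; no match (null) maps to the base table bundle.
  static String forProjection(String projectionNameOrNull) {
    return projectionNameOrNull == null ? BASE_TABLE : projectionNameOrNull;
  }

  // Lets a caller enumerate every valid name up front, e.g. for validation.
  static Set<String> allBundles(List<String> projectionNames) {
    Set<String> names = new LinkedHashSet<>();
    names.add(BASE_TABLE);
    names.addAll(projectionNames);
    return names;
  }
}
```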



##########
processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.Order;
+import org.apache.druid.query.OrderBy;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.projections.Projections;
+import org.apache.druid.segment.projections.QueryableProjection;
+import org.apache.druid.segment.vector.VectorCursor;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Partial-aware {@link CursorFactory} for {@link PartialQueryableIndex}.
+ * <p>
+ * <b>Sync vs async contract.</b> {@link #makeCursorHolder} requires the segment to be fully downloaded already. It is
+ * intended for callers that acquired the segment via the eager (download-everything-up-front) path, so by the time
+ * they ask for a cursor every internal file is on disk. If anything is missing it throws
+ * {@link DruidException#defensive} rather than trigger downloads on the sync path, since processing threads must
+ * not block on deep-storage I/O. {@link #makeCursorHolderAsync} is the only path that performs downloads on demand;
+ * callers acknowledge that by opting into the async variant when they acquire a partial segment.
+ * <p>
+ * <b>Async download granularity.</b> Pre-fetch is column-level. {@link #makeCursorHolderAsync} calls
+ * {@link QueryableIndex#getColumnHolder} on each required column; the memoized supplier on the underlying
+ * {@link PartialQueryableIndex} eagerly invokes
+ * {@link org.apache.druid.segment.file.PartialSegmentFileMapperV10#mapFile} inside that call, which is what triggers
+ * the deep-storage range read. The cursor holder constructed afterward sees the already-materialized holders via the
+ * same memoized suppliers, so no further downloads happen at cursor-read time.
+ * <p>
+ * If a projection matches, the required columns are looked up against the projection's row selector and its rewritten
+ * {@link CursorBuildSpec} (which carries physical columns in the projection's namespace). When
+ * {@link CursorBuildSpec#getPhysicalColumns()} is {@code null}, every column on the chosen row selector is pre-fetched,
+ * as required by the contract of {@link CursorBuildSpec}.
+ * <p>
+ * <b>Parallelism.</b> Each column's materialization is submitted as a separate task to the supplied download executor.
+ * The cursor holder is constructed once every column task has completed.
+ */
+public class PartialQueryableIndexCursorFactory implements CursorFactory
+{
+  private final PartialQueryableIndex index;
+  private final QueryableIndexCursorFactory delegate;
+  private final PartialBundleAcquirer bundleAcquirer;
+
+  public PartialQueryableIndexCursorFactory(
+      PartialQueryableIndex index,
+      TimeBoundaryInspector timeBoundaryInspector,
+      PartialBundleAcquirer bundleAcquirer
+  )
+  {
+    this.index = index;
+    this.delegate = new QueryableIndexCursorFactory(index, timeBoundaryInspector);
+    this.bundleAcquirer = bundleAcquirer;
+  }
+
+  @Override
+  public CursorHolder makeCursorHolder(CursorBuildSpec spec)
+  {
+    // refuse to download here so we never accidentally block a processing thread
+    if (!index.isFullyDownloaded()) {
+      throw DruidException.defensive(
+          "Sync makeCursorHolder requires the segment to be fully downloaded; use makeCursorHolderAsync for "
+          + "on-demand loading, or acquire the segment via the eager path so all files are loaded up front."
+      );
+    }
+    return delegate.makeCursorHolder(spec);
+  }
+
+  @Override
+  public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+  {
+    final QueryableProjection<QueryableIndex> matched = index.getProjection(spec);
+    final QueryableIndex rowSelector = matched != null ? matched.getRowSelector() : index;
+    final Set<String> requiredColumns = requiredColumns(rowSelector, matched, spec);
+    final String bundleName = matched != null ? matched.getName() : Projections.BASE_TABLE_PROJECTION_NAME;
+
+    // Mount the cache-layer bundle BEFORE submitting downloads. This sparse-allocates the bundle's container files
+    // and reserves the disk-usage accounting at bundle granularity. The returned Closeable releases the hold when
+    // the cursor closes, allowing the cache to later evict the bundle if needed.
+    // PartialBundleAcquirer.acquire() throws DruidException on failure.

Review Comment:
   This comment is too long
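A condensed form of that comment could be a single line ("Hold the bundle before downloading; released when the cursor closes."), with the detail moved to the `PartialBundleAcquirer` javadoc. The sketch below shows that comment in context, together with the flow the class javadoc describes: acquire the hold, run one task per required column on the download executor, and build the holder once all tasks finish, reusing memoized results. All type names are hypothetical, and `CompletableFuture` stands in for the Guava futures the real class imports.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical hold handle; close() narrows AutoCloseable to throw nothing checked.
interface BundleHold extends AutoCloseable {
  @Override
  void close();
}

// Thread-safe memoizing supplier: the first get() does the work, later calls reuse the result.
final class Memoized<T> implements Supplier<T> {
  private final Supplier<T> delegate;
  private boolean done;
  private T value;

  Memoized(Supplier<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public synchronized T get() {
    if (!done) {
      value = delegate.get();
      done = true;
    }
    return value;
  }
}

// Returned "cursor holder": closing it releases the bundle hold.
final class CursorSketch implements BundleHold {
  final List<String> holders;
  private final BundleHold hold;

  CursorSketch(List<String> holders, BundleHold hold) {
    this.holders = holders;
    this.hold = hold;
  }

  @Override
  public void close() {
    hold.close();
  }
}

final class AsyncCursorSketch {
  final List<String> rangeReads = Collections.synchronizedList(new ArrayList<>());
  private final Map<String, Supplier<String>> columns = new HashMap<>();

  AsyncCursorSketch(List<String> columnNames) {
    for (String name : columnNames) {
      // First get() stands in for the deep-storage range read; the result is memoized.
      columns.put(name, new Memoized<>(() -> {
        rangeReads.add(name);
        return "holder:" + name;
      }));
    }
  }

  CompletableFuture<CursorSketch> makeCursorHolderAsync(
      List<String> required, String bundle, Function<String, BundleHold> acquire, Executor exec) {
    // Hold the bundle before downloading; released when the cursor closes.
    BundleHold hold = acquire.apply(bundle);
    List<CompletableFuture<String>> tasks = new ArrayList<>();
    for (String c : required) {
      tasks.add(CompletableFuture.supplyAsync(columns.get(c), exec)); // one task per column
    }
    return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[0]))
        .thenApply(ignored -> {
          List<String> holders = new ArrayList<>();
          for (String c : required) {
            holders.add(columns.get(c).get()); // memoized: no second read
          }
          return new CursorSketch(holders, hold);
        })
        .whenComplete((cursor, error) -> {
          if (error != null) {
            hold.close(); // don't leak the hold if a download fails
          }
        });
  }
}
```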



##########
processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java:
##########
@@ -0,0 +1,279 @@
+  @Override
+  public CursorHolder makeCursorHolder(CursorBuildSpec spec)

Review Comment:
   Given this is going to blow up, the default impl for `makeCursorHolderAsync` (which calls `makeCursorHolder`) isn't very useful, and seems likely to lead to delegating `CursorFactory` implementations being written improperly. Consider deleting it, and perhaps replacing it with an abstract base class that can be used for leaf `CursorFactory` implementations that don't have special logic for async cursor creation.
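A minimal sketch of that suggestion, using a trimmed-down hypothetical interface (`CursorFactorySketch`, with `String` standing in for cursor holders) rather than Druid's real `CursorFactory`. The interface has no default for the async method, so a delegating factory has to forward it explicitly, while leaf factories whose data is always local can extend the base class and get a trivial async wrapper.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, trimmed-down CursorFactory: the async method has NO default.
interface CursorFactorySketch {
  String makeCursorHolder(String spec);

  CompletableFuture<String> makeCursorHolderAsync(String spec);
}

// Base class for leaf factories whose data is always local: async simply wraps the sync call.
abstract class LocalLeafCursorFactory implements CursorFactorySketch {
  @Override
  public CompletableFuture<String> makeCursorHolderAsync(String spec) {
    return CompletableFuture.completedFuture(makeCursorHolder(spec));
  }
}

// A delegating factory cannot silently inherit sync-only behavior; it must forward async explicitly.
final class FilteringCursorFactory implements CursorFactorySketch {
  private final CursorFactorySketch delegate;

  FilteringCursorFactory(CursorFactorySketch delegate) {
    this.delegate = delegate;
  }

  @Override
  public String makeCursorHolder(String spec) {
    return "filtered(" + delegate.makeCursorHolder(spec) + ")";
  }

  @Override
  public CompletableFuture<String> makeCursorHolderAsync(String spec) {
    return delegate.makeCursorHolderAsync(spec).thenApply(h -> "filtered(" + h + ")");
  }
}
```

With this shape, forgetting to forward the async call in a wrapper is a compile error rather than a runtime failure on a partial segment.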



##########
processing/src/main/java/org/apache/druid/segment/PartialBundleAcquirer.java:
##########
@@ -0,0 +1,48 @@
+public interface PartialBundleAcquirer
+{
+  /**
+   * Acquire a hold on the cache-layer bundle with the given name. The returned {@link Closeable} releases the hold
+   * when closed (the bundle itself stays in the cache for re-use by subsequent acquires).
+   * <p>
+   * Implementations should be safe to call concurrently for different bundle names.

Review Comment:
   "should be" or "must be"? Does the wording imply it's *not* safe to call concurrently with the same bundle name?
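If the intended contract is "must be safe for any names, including the same name", one way to meet it is per-key atomic bookkeeping. In this sketch `RefCountedBundles` and `ReleaseHandle` are hypothetical: concurrent acquires of one name mount the bundle once and share it, and releases are idempotent. Per the javadoc, a bundle with zero holds stays cached and merely becomes evictable.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical release handle; close() throws nothing checked.
interface ReleaseHandle extends AutoCloseable {
  @Override
  void close();
}

// Sketch: acquire() is safe for any mix of names, including concurrent calls with the same name.
final class RefCountedBundles {
  private final ConcurrentHashMap<String, Integer> holdCounts = new ConcurrentHashMap<>();
  private final Set<String> mounted = ConcurrentHashMap.newKeySet();
  final AtomicInteger mounts = new AtomicInteger();

  ReleaseHandle acquire(String bundle) {
    // compute() is atomic per key: same-name callers serialize, different names don't contend.
    holdCounts.compute(bundle, (name, count) -> {
      if (mounted.add(name)) {
        mounts.incrementAndGet(); // stands in for sparse-allocating the bundle's files, done once
      }
      return count == null ? 1 : count + 1;
    });
    AtomicBoolean released = new AtomicBoolean();
    return () -> {
      // Idempotent release; at zero holds the bundle stays mounted but becomes evictable.
      if (released.compareAndSet(false, true)) {
        holdCounts.computeIfPresent(bundle, (name, count) -> count == 1 ? null : count - 1);
      }
    };
  }

  int holds(String bundle) {
    return holdCounts.getOrDefault(bundle, 0);
  }
}
```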



##########
processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java:
##########
@@ -0,0 +1,279 @@
+  @Override
+  public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+  {
+    final QueryableProjection<QueryableIndex> matched = index.getProjection(spec);
+    final QueryableIndex rowSelector = matched != null ? matched.getRowSelector() : index;
+    final Set<String> requiredColumns = requiredColumns(rowSelector, matched, spec);
+    final String bundleName = matched != null ? matched.getName() : Projections.BASE_TABLE_PROJECTION_NAME;

Review Comment:
   How did we know that the bundle name should be the projection name? Is that specified somewhere or just how things are currently implemented?



##########
processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexSegment.java:
##########
@@ -0,0 +1,110 @@
+ * <p>
+ * Lifecycle: this segment is intended to exist as a transient reference-hold scope over an externally-owned

Review Comment:
   What typically owns the `PartialQueryableIndex`?
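A sketch of the ownership model the javadoc implies, assuming (not confirmed by the diff) that a cache-layer entry owns the index: it hands out transient segment wrappers, whose `close()` only runs the `onClose` hook, and it closes the index itself only on eviction once no wrappers remain open. `IndexModel`, `SegmentWrapper`, and `IndexOwner` are hypothetical, and eviction is single-threaded for brevity; a real owner would need to make the check-and-close atomic.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical index whose lifetime belongs to an owner, not to the segment wrappers.
final class IndexModel {
  final AtomicBoolean closed = new AtomicBoolean();

  void close() {
    closed.set(true);
  }
}

// Hypothetical transient wrapper: close() only runs the onClose hook, never index.close().
final class SegmentWrapper {
  private final IndexModel index;
  private final Runnable onClose;
  private final AtomicBoolean closed = new AtomicBoolean();

  SegmentWrapper(IndexModel index, Runnable onClose) {
    this.index = index;
    this.onClose = onClose;
  }

  IndexModel index() {
    return index;
  }

  void close() {
    if (closed.compareAndSet(false, true)) {
      onClose.run();
    }
  }
}

// Assumed owner (e.g. a cache entry): hands out wrappers and closes the index only on eviction.
final class IndexOwner {
  private final IndexModel index = new IndexModel();
  private final AtomicInteger openWrappers = new AtomicInteger();

  SegmentWrapper newSegment() {
    openWrappers.incrementAndGet();
    return new SegmentWrapper(index, openWrappers::decrementAndGet);
  }

  // Single-threaded simplification: a real owner must make this check-and-close atomic.
  boolean tryEvict() {
    if (openWrappers.get() > 0) {
      return false;
    }
    index.close();
    return true;
  }
}
```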



##########
processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.Order;
+import org.apache.druid.query.OrderBy;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.projections.Projections;
+import org.apache.druid.segment.projections.QueryableProjection;
+import org.apache.druid.segment.vector.VectorCursor;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Partial-aware {@link CursorFactory} for {@link PartialQueryableIndex}.
+ * <p>
+ * <b>Sync vs async contract.</b> {@link #makeCursorHolder} requires the segment to already be fully downloaded,
+ * intended for callers that acquired the segment via the eager (download-everything-up-front) path, so by the time
+ * they ask for a cursor every internal file is on disk. If anything is missing it throws
+ * {@link DruidException#defensive} so that we never trigger downloads on the sync path, since processing threads must
+ * not block on deep-storage I/O. {@link #makeCursorHolderAsync} is the only path that performs downloads on demand;
+ * callers acknowledge that by opting into the async variant when they acquire a partial segment.
+ * <p>
+ * <b>Async download granularity.</b> Pre-fetch is column-level. {@link #makeCursorHolderAsync} calls
+ * {@link QueryableIndex#getColumnHolder} on each required column; the memoized supplier on the underlying
+ * {@link PartialQueryableIndex} eagerly invokes
+ * {@link org.apache.druid.segment.file.PartialSegmentFileMapperV10#mapFile} inside that call, which is what triggers
+ * the deep-storage range read. The cursor holder constructed afterward sees the already-materialized holders via the
+ * same memoized suppliers, so no further downloads happen at cursor-read time.
+ * <p>
+ * If a projection matches, the required columns are looked up against the projection's row selector and its rewritten
+ * {@link CursorBuildSpec} (which carries physical columns in the projection's namespace). When
+ * {@link CursorBuildSpec#getPhysicalColumns()} is {@code null}, every column on the chosen row selector is pre-fetched
+ * as required by the contract of {@link CursorBuildSpec}.
+ * <p>
+ * <b>Parallelism.</b> Each column's materialization is submitted as a separate task to the supplied download executor.
+ * The cursor holder is constructed once every column task has completed.
+ */
+public class PartialQueryableIndexCursorFactory implements CursorFactory
+{
+  private final PartialQueryableIndex index;
+  private final QueryableIndexCursorFactory delegate;
+  private final PartialBundleAcquirer bundleAcquirer;
+
+  public PartialQueryableIndexCursorFactory(
+      PartialQueryableIndex index,
+      TimeBoundaryInspector timeBoundaryInspector,
+      PartialBundleAcquirer bundleAcquirer
+  )
+  {
+    this.index = index;
+    this.delegate = new QueryableIndexCursorFactory(index, timeBoundaryInspector);
+    this.bundleAcquirer = bundleAcquirer;
+  }
+
+  @Override
+  public CursorHolder makeCursorHolder(CursorBuildSpec spec)
+  {
+    // refuse to download here so we never accidentally block a processing thread
+    if (!index.isFullyDownloaded()) {
+      throw DruidException.defensive(
+          "Sync makeCursorHolder requires the segment to be fully downloaded; use makeCursorHolderAsync for "
+          + "on-demand loading, or acquire the segment via the eager path so all files are loaded up front."
+      );
+    }
+    return delegate.makeCursorHolder(spec);
+  }
+
+  @Override
+  public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+  {
+    final QueryableProjection<QueryableIndex> matched = index.getProjection(spec);
+    final QueryableIndex rowSelector = matched != null ? matched.getRowSelector() : index;
+    final Set<String> requiredColumns = requiredColumns(rowSelector, matched, spec);
+    final String bundleName = matched != null ? matched.getName() : Projections.BASE_TABLE_PROJECTION_NAME;
+
+    // Mount the cache-layer bundle BEFORE submitting downloads. This sparse-allocates the bundle's container files
+    // and reserves the disk-usage accounting at bundle granularity. The returned Closeable releases the hold when
+    // the cursor closes, allowing the cache to later evict the bundle if needed.
+    // PartialBundleAcquirer.acquire() throws DruidException on failure.
+    final Closeable bundleHold = bundleAcquirer.acquire(bundleName);
+
+    try {
+      // Submit one materialization task per column so a multi-threaded download executor can fan them out
+      final ListeningExecutorService downloadExec = bundleAcquirer.getDownloadExec();
+      final List<ListenableFuture<?>> columnDownloads = new ArrayList<>(requiredColumns.size());
+      for (String column : requiredColumns) {
+        columnDownloads.add(downloadExec.submit(() -> {
+          rowSelector.getColumnHolder(column);
+          return null;
+        }));
+      }
+
+      // No canceler: downloads are allowed to complete even if the awaiter cancels. Once they complete and the inner
+      // CursorHolder is handed to asyncHolder.set(), it's owned by the asyncHolder until the awaiter takes one of
+      // two paths: asyncHolder.close() (cancel, closes the held holder) or asyncHolder.release() (success, transfers
+      // ownership to the caller). Either way the holder is drained and nothing leaks.
+      final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+      Futures.addCallback(
+          Futures.allAsList(columnDownloads),

Review Comment:
   Seems like a good use case for `AsyncResource.collect(columnDownloads)` and `AsyncResource.transform` (from #19539)
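A minimal sketch of the "collect, then transform" shape this comment points at, written against the JDK's `CompletableFuture` as a stand-in. It does not model Guava or the `AsyncResource` API from #19539, whose signatures are not shown here. The `materialize`, `build`, and `onFailure` parameters are hypothetical stand-ins for `rowSelector.getColumnHolder(column)`, `makeCursorHolderForProjection`, and releasing `bundleHold`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class CollectThenTransform
{
  /**
   * Starts one task per column on the download executor ("collect"), then runs the build step once all of them
   * have finished ("transform"). If any column task or the build fails, onFailure runs and the future fails.
   *
   * @param materialize hypothetical stand-in for rowSelector.getColumnHolder(column)
   * @param build       hypothetical stand-in for delegate.makeCursorHolderForProjection(spec, matched)
   * @param onFailure   hypothetical stand-in for releasing the bundle hold
   */
  public static <T> CompletableFuture<T> fetchThenBuild(
      List<String> columns,
      Executor downloadExec,
      Consumer<String> materialize,
      Supplier<T> build,
      Runnable onFailure
  )
  {
    final List<CompletableFuture<Void>> downloads = new ArrayList<>(columns.size());
    for (String column : columns) {
      downloads.add(CompletableFuture.runAsync(() -> materialize.accept(column), downloadExec));
    }
    // Unlike Guava's allAsList, allOf waits for every input to finish before completing, even after a failure.
    return CompletableFuture
        .allOf(downloads.toArray(new CompletableFuture<?>[0]))
        // Non-async thenApply runs on whichever thread completes the last download (or on the caller if all are
        // already done), which is the CompletableFuture analogue of MoreExecutors.directExecutor().
        .thenApply(ignored -> build.get())
        // Release resources if either the downloads or the build step failed.
        .whenComplete((value, error) -> {
          if (error != null) {
            onFailure.run();
          }
        });
  }
}
```

Returning the `whenComplete` stage, rather than the `thenApply` stage, means a caller joining the future only observes a failure after `onFailure` has already run.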



##########
processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java:
##########
@@ -0,0 +1,279 @@
+      final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+      Futures.addCallback(
+          Futures.allAsList(columnDownloads),
+          new FutureCallback<>()
+          {
+            @Override
+            public void onSuccess(List<Object> ignored)
+            {
+              try {
+                final CursorHolder inner = delegate.makeCursorHolderForProjection(spec, matched);
+                final CursorHolder holder = wrapWithBundleRelease(inner, bundleHold);
+                if (!asyncHolder.set(holder)) {
+                  // wrapper was closed while we were producing the holder; close it ourselves so it doesn't leak.
+                  holder.close();
+                }
+              }
+              catch (Throwable t) {
+                CloseableUtils.closeAndSuppressExceptions(bundleHold, t::addSuppressed);
+                asyncHolder.setException(t);
+              }
+            }
+
+            @Override
+            public void onFailure(Throwable t)
+            {
+              CloseableUtils.closeAndSuppressExceptions(bundleHold, t::addSuppressed);
+              asyncHolder.setException(t);
+            }
+          },
+          // Run the cursor-build callback inline on whichever download thread finishes last. The build itself does no
+          // I/O (columns are already materialized), so it doesn't need to round-trip through the download executor.
+          MoreExecutors.directExecutor()
+      );
+      return asyncHolder;
+    }
+    catch (Throwable t) {
+      // Failure between acquire and the addCallback wiring (getDownloadExec, downloadExec.submit shut-down rejection,
+      // Futures.addCallback rejecting on a bad executor, etc.). Ownership of bundleHold hasn't transferred to the
+      // callback yet, so release it here. Already-submitted download tasks will complete with no callback wired,
+      // their captured row-selector refs drop naturally.
+      throw CloseableUtils.closeAndWrapInCatch(t, bundleHold);
+    }
+  }
+
+  /**
+   * Determine the set of physical column names required from the chosen row selector given a {@link CursorBuildSpec}
+   */
+  private static Set<String> requiredColumns(

Review Comment:
   nit: the `private static` is typically placed after all non-`static` methods



##########
server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java:
##########
@@ -515,6 +515,45 @@ public void release(CacheEntry entry)
     }
   }
 
+  /**
+   * Remove a {@link #weakCacheEntries} entry that currently has no outstanding holds, unlinking it from the SIEVE
+   * queue and terminating its phaser (which fires the underlying {@link CacheEntry#unmount}). No-op when the entry
+   * is absent, is a {@link #staticCacheEntries} entry, or still has outstanding holds.
+   * <p>
+   * This exists for callers that register a weak entry <em>without</em> a {@link ReservationHold} (the bootstrap
+   * reserve path uses {@link #reserveWeak}) and need to clean it up after a failed mount. The normal runtime path

Review Comment:
   I think this will be useful in #19539: lingering entries on failed mount were a problem raised in review (https://github.com/apache/druid/pull/19539#discussion_r3341105641)
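To illustrate the failed-mount cleanup this Javadoc describes, here is a minimal sketch that uses a `ConcurrentHashMap` as a hypothetical stand-in for `weakCacheEntries`. `Entry`, `reserveWithoutHold`, `acquireHold`, and `removeIfUnheld` are illustrative names, not the `StorageLocation` API; SIEVE queue unlinking and phaser termination are reduced to a flag.

```java
import java.util.concurrent.ConcurrentHashMap;

public class WeakEntryCleanup
{
  /** Hypothetical stand-in for a weak cache entry: a hold count plus an unmount flag. */
  static final class Entry
  {
    int holds;          // only read or mutated inside compute* calls, which serialize per key
    boolean unmounted;  // stands in for "phaser terminated, unmount fired"
  }

  private final ConcurrentHashMap<String, Entry> weakEntries = new ConcurrentHashMap<>();

  /** Registers an entry WITHOUT taking a hold, like a bootstrap-style reserve. */
  public void reserveWithoutHold(String id)
  {
    weakEntries.computeIfAbsent(id, k -> new Entry());
  }

  /** Takes a hold on an existing entry; returns false if the entry is absent. */
  public boolean acquireHold(String id)
  {
    return weakEntries.computeIfPresent(id, (k, e) -> {
      e.holds++;
      return e;
    }) != null;
  }

  /** Removes the entry only if it is present and has no outstanding holds; otherwise a no-op. */
  public boolean removeIfUnheld(String id)
  {
    final boolean[] removed = {false};
    weakEntries.computeIfPresent(id, (k, e) -> {
      if (e.holds > 0) {
        return e;          // still in use: keep it
      }
      e.unmounted = true;
      removed[0] = true;
      return null;         // returning null from computeIfPresent removes the mapping
    });
    return removed[0];
  }

  /** Reserves, then mounts; if the mount fails, cleans up so no unheld entry lingers. */
  public void reserveAndMount(String id, Runnable mount)
  {
    reserveWithoutHold(id);
    try {
      mount.run();
    }
    catch (RuntimeException e) {
      removeIfUnheld(id);
      throw e;
    }
  }

  public boolean contains(String id)
  {
    return weakEntries.containsKey(id);
  }
}
```

Checking the hold count and removing the mapping inside the same `computeIfPresent` call keeps a concurrent `acquireHold` from slipping in between the check and the removal.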



##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -526,6 +548,345 @@ public AcquireSegmentAction acquireSegment(final DataSegment dataSegment)
     }
   }
 
+  @Override
+  public Optional<Segment> acquireCachedPartialSegment(SegmentId segmentId)
+  {
+    if (!config.isVirtualStoragePartialDownloadsEnabled()) {
+      return acquireCachedSegment(segmentId);
+    }
+    return acquireCachedInternal(segmentId, false);
+  }
+
+  @Override
+  public AcquireSegmentAction acquirePartialSegment(DataSegment dataSegment)
+  {
+    final SegmentRangeReader rangeReader = tryOpenRangeReader(dataSegment);
+    if (rangeReader == null) {
+      // Storage backend doesn't support range reads for this segment (e.g. V9 format or zipped)
+      return acquireSegment(dataSegment);
+    }
+    return acquirePartialInternal(dataSegment, rangeReader, false);
+  }
+
+
+  /**
+   * Shared implementation for {@link #acquireCachedSegment} and {@link #acquireCachedPartialSegment}. Walks the
+   * storage locations checking for existing entries.
+   * <p>
+   * When {@code requireFullyDownloaded} is {@code true} the entry must also report
+   * {@link SegmentCacheEntry#isFullyDownloaded()} (the contract for {@link #acquireCachedSegment}), which never
+   * hands back an entry that isn't behaviorally fully-materialized. When {@code false} a mounted entry is
+   * sufficient ({@link #acquireCachedPartialSegment}'s lazy-mount contract), even if some bundles are still
+   * not yet on disk. In either case {@link SegmentCacheEntry#acquireReference(Closeable)} composes the weak-entry hold
+   * into the returned segment's close lifecycle.
+   */
+  private Optional<Segment> acquireCachedInternal(SegmentId segmentId, boolean requireFullyDownloaded)
+  {
+    final SegmentCacheEntryIdentifier id = new SegmentCacheEntryIdentifier(segmentId);
+    for (StorageLocation location : locations) {
+      final SegmentCacheEntry staticEntry = location.getStaticCacheEntry(id);
+      if (staticEntry != null) {
+        if (staticEntry.isMounted() && (!requireFullyDownloaded || staticEntry.isFullyDownloaded())) {
+          return staticEntry.acquireReference();
+        }
+        return Optional.empty();
+      }
+      if (!config.isVirtualStorage()) {
+        continue;
+      }
+      final StorageLocation.ReservationHold<SegmentCacheEntry> hold = location.addWeakReservationHoldIfExists(id);
+      if (hold != null) {
+        if (hold.getEntry().isMounted() && (!requireFullyDownloaded || hold.getEntry().isFullyDownloaded())) {
+          return hold.getEntry().acquireReference(hold);
+        }
+        hold.close();
+        return Optional.empty();
+      }
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Shared scaffolding for both partial acquire APIs. {@code fullDownload=false} powers
+   * {@link #acquirePartialSegment} (lazy mount, header bytes only; bundles mount on demand at query time);
+   * {@code fullDownload=true} powers the partial-eligible branch of {@link #acquireSegment} (mount +
+   * {@link PartialSegmentFileMapperV10#ensureAllDownloaded} so the returned segment is fully-materialized).
+   * <p>
+   * Fast path: {@link #findExistingPartialWithHold} locates an existing entry across locations under a hold. If the
+   * entry is already usable (mounted, and fully-downloaded when required), return an immediate-future action whose
+   * {@code loadCleanup} is the hold (the supplier mints fresh segments per call, each with its own metadata
+   * reference, so no separate cleanup is needed).
+   * <p>
+   * Slow path: under the per-segment lock, {@link #findOrReservePartial} reuses an existing not-yet-usable entry or
+   * reserves a fresh weak one, and the action submits mount (+ optional ensureAllDownloaded) to
+   * {@link #virtualStorageLoadOnDemandExec} so callers that yield on the future never block a processing thread on
+   * deep-storage I/O.
+   */
+  private AcquireSegmentAction acquirePartialInternal(
+      DataSegment dataSegment,
+      SegmentRangeReader rangeReader,
+      boolean fullDownload

Review Comment:
   On the `fullDownload` path I don't see anything that creates a cache entry and hold on the `StorageLocation` to account for the storage space. The logic seems to fetch things outside of a hold. Is that right, or did I miss it?
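One way to read this concern: the space reservation should exist before any bytes are fetched, and be given back if the fetch fails. A minimal sketch of that ordering, assuming a hypothetical `Budget` in place of `StorageLocation` accounting and a `Runnable` in place of `ensureAllDownloaded`; none of these names come from the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class ReserveBeforeFetch
{
  /** A hold on reserved bytes; close() is idempotent and does not throw. */
  public interface Hold extends AutoCloseable
  {
    @Override
    void close();
  }

  /** Hypothetical stand-in for a storage location's byte accounting. */
  public static final class Budget
  {
    private final long capacity;
    private final AtomicLong used = new AtomicLong();

    public Budget(long capacity)
    {
      this.capacity = capacity;
    }

    /** Reserves {@code bytes}, returning a hold that gives them back on close, or null if they do not fit. */
    public Hold tryReserve(long bytes)
    {
      while (true) {
        final long current = used.get();
        if (current + bytes > capacity) {
          return null;
        }
        if (used.compareAndSet(current, current + bytes)) {
          final AtomicBoolean released = new AtomicBoolean();
          return () -> {
            if (released.compareAndSet(false, true)) {
              used.addAndGet(-bytes);
            }
          };
        }
      }
    }

    public long used()
    {
      return used.get();
    }
  }

  /**
   * Reserve first, fetch second. The fetch only runs while its space is accounted for; if it fails the reservation
   * is returned, and on success ownership of the hold passes to the caller (to close when the segment is dropped).
   */
  public static Hold fetchUnderHold(Budget budget, long bytes, Runnable fetch)
  {
    final Hold hold = budget.tryReserve(bytes);
    if (hold == null) {
      throw new IllegalStateException("Cannot reserve " + bytes + " bytes");
    }
    try {
      fetch.run();
      return hold;
    }
    catch (RuntimeException e) {
      hold.close();
      throw e;
    }
  }
}
```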



##########
processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java:
##########
@@ -0,0 +1,279 @@
+      // No canceler: downloads are allowed to complete even if the awaiter cancels. Once they complete and the inner

Review Comment:
   Why not cancel them? Is it because canceling them is awkward for some reason, or do you feel it's better not to cancel them? I suppose there are pros and cons. The main pro I can think of (for canceling) is that no other query may need the files, so we'd be wasting time fetching them. The main con is that if some other query _does_ need the files, we're going to need to restart the fetch if we cancel it.
   
   Maybe the best behavior is to cancel it unless some other query is also waiting for the same thing.
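A minimal sketch of the "cancel unless someone else is waiting" policy, assuming fetches are keyed by a string (for example a column or bundle name) and that the download code responds to interruption. `SharedDownloads`, `join`, and `leave` are hypothetical names for illustration; this is not how the PR currently behaves.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class SharedDownloads
{
  /** One in-flight fetch plus the number of waiters that still want it. */
  private static final class InFlight
  {
    final Future<?> task;
    int waiters;

    InFlight(Future<?> task)
    {
      this.task = task;
    }
  }

  private final ExecutorService downloadExec;
  private final Map<String, InFlight> inFlight = new HashMap<>(); // guarded by "this"

  public SharedDownloads(ExecutorService downloadExec)
  {
    this.downloadExec = downloadExec;
  }

  /**
   * Registers interest in {@code key}, starting the fetch only if nobody else has. Every join must be paired with
   * exactly one leave, whether the waiter succeeded or gave up. Callers should not cancel the returned future
   * directly; leave() decides when cancellation is safe.
   */
  public synchronized Future<?> join(String key, Runnable fetch)
  {
    InFlight entry = inFlight.get(key);
    if (entry == null) {
      entry = new InFlight(downloadExec.submit(fetch));
      inFlight.put(key, entry);
    }
    entry.waiters++;
    return entry.task;
  }

  /**
   * Withdraws interest in {@code key}. The fetch is canceled only when the last waiter leaves; while any other
   * waiter remains it keeps running. Returns true if this call actually canceled an unfinished fetch.
   */
  public synchronized boolean leave(String key)
  {
    final InFlight entry = inFlight.get(key);
    if (entry == null || --entry.waiters > 0) {
      return false;
    }
    inFlight.remove(key);
    return entry.task.cancel(true);  // false if the fetch had already completed
  }
}
```

The trade-off from the comment shows up in `leave`: a shared fetch survives any single waiter giving up, at the cost that every waiter must reliably call `leave`, otherwise the entry is never removed and its fetch is never canceled.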



##########
processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.Order;
+import org.apache.druid.query.OrderBy;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.projections.Projections;
+import org.apache.druid.segment.projections.QueryableProjection;
+import org.apache.druid.segment.vector.VectorCursor;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Partial-aware {@link CursorFactory} for {@link PartialQueryableIndex}.
+ * <p>
+ * <b>Sync vs async contract.</b> {@link #makeCursorHolder} requires the 
segment to already be fully downloaded,
+ * intended for callers that acquired the segment via the eager 
(download-everything-up-front) path, so by the time
+ * they ask for a cursor every internal file is on disk. If anything is 
missing it throws
+ * {@link DruidException#defensive} so that we never trigger downloads on the 
sync path, since processing threads must
+ * not block on deep-storage I/O. {@link #makeCursorHolderAsync} is the only 
path that performs downloads on demand;
+ * callers acknowledge that by opting into the async variant when they acquire 
a partial segment.
+ * <p>
+ * <b>Async download granularity.</b> Pre-fetch is column-level. {@link 
#makeCursorHolderAsync} calls
+ * {@link QueryableIndex#getColumnHolder} on each required column; the 
memoized supplier on the underlying
+ * {@link PartialQueryableIndex} eagerly invokes
+ * {@link org.apache.druid.segment.file.PartialSegmentFileMapperV10#mapFile} 
inside that call, which is what triggers
+ * the deep-storage range read. The cursor holder constructed afterward sees 
the already-materialized holders via the
+ * same memoized suppliers, so no further downloads happen at cursor-read time.
+ * <p>
+ * If a projection matches, the required columns are looked up against the 
projection's row selector and its rewritten
+ * {@link CursorBuildSpec} (which carries physical columns in the projection's 
namespace). When
+ * {@link CursorBuildSpec#getPhysicalColumns()} is {@code null}, every column 
on the chosen row selector is pre-fetched
+ * as required by the contract of {@link CursorBuildSpec}.
+ * <p>
+ * <b>Parallelism.</b> Each column's materialization is submitted as a 
separate task to the supplied download executor.
+ * The cursor holder is constructed once every column task has completed.
+ */
+public class PartialQueryableIndexCursorFactory implements CursorFactory
+{
+  private final PartialQueryableIndex index;
+  private final QueryableIndexCursorFactory delegate;
+  private final PartialBundleAcquirer bundleAcquirer;
+
+  public PartialQueryableIndexCursorFactory(
+      PartialQueryableIndex index,
+      TimeBoundaryInspector timeBoundaryInspector,
+      PartialBundleAcquirer bundleAcquirer
+  )
+  {
+    this.index = index;
+    this.delegate = new QueryableIndexCursorFactory(index, timeBoundaryInspector);
+    this.bundleAcquirer = bundleAcquirer;
+  }
+
+  @Override
+  public CursorHolder makeCursorHolder(CursorBuildSpec spec)
+  {
+    // refuse to download here so we never accidentally block a processing thread
+    if (!index.isFullyDownloaded()) {
+      throw DruidException.defensive(
+          "Sync makeCursorHolder requires the segment to be fully downloaded; use makeCursorHolderAsync for "
+          + "on-demand loading, or acquire the segment via the eager path so all files are loaded up front."
+      );
+    }
+    return delegate.makeCursorHolder(spec);
+  }
+
+  @Override
+  public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+  {
+    final QueryableProjection<QueryableIndex> matched = index.getProjection(spec);
+    final QueryableIndex rowSelector = matched != null ? matched.getRowSelector() : index;
+    final Set<String> requiredColumns = requiredColumns(rowSelector, matched, spec);
+    final String bundleName = matched != null ? matched.getName() : Projections.BASE_TABLE_PROJECTION_NAME;
+
+    // Mount the cache-layer bundle BEFORE submitting downloads. This sparse-allocates the bundle's container files
+    // and reserves the disk-usage accounting at bundle granularity. The returned Closeable releases the hold when
+    // the cursor closes, allowing the cache to later evict the bundle if needed.
+    // PartialBundleAcquirer.acquire() throws DruidException on failure.
+    final Closeable bundleHold = bundleAcquirer.acquire(bundleName);
+
+    try {
+      // Submit one materialization task per column so a multi-threaded download executor can fan them out
+      final ListeningExecutorService downloadExec = bundleAcquirer.getDownloadExec();
+      final List<ListenableFuture<?>> columnDownloads = new ArrayList<>(requiredColumns.size());
+      for (String column : requiredColumns) {
+        columnDownloads.add(downloadExec.submit(() -> {
+          rowSelector.getColumnHolder(column);
+          return null;
+        }));
+      }
+
+      // No canceler: downloads are allowed to complete even if the awaiter cancels. Once they complete and the inner
+      // CursorHolder is handed to asyncHolder.set(), it's owned by the asyncHolder until the awaiter takes one of
+      // two paths: asyncHolder.close() (cancel, closes the held holder) or asyncHolder.release() (success, transfers
+      // ownership to the caller). Either way the holder is drained and nothing leaks.
+      final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+      Futures.addCallback(
+          Futures.allAsList(columnDownloads),
+          new FutureCallback<>()
+          {
+            @Override
+            public void onSuccess(List<Object> ignored)
+            {
+              try {
+                final CursorHolder inner = delegate.makeCursorHolderForProjection(spec, matched);
+                final CursorHolder holder = wrapWithBundleRelease(inner, bundleHold);
+                if (!asyncHolder.set(holder)) {
+                  // wrapper was closed while we were producing the holder; close it ourselves so it doesn't leak.
+                  holder.close();
+                }
+              }
+              catch (Throwable t) {
+                CloseableUtils.closeAndSuppressExceptions(bundleHold, t::addSuppressed);
+                asyncHolder.setException(t);
+              }
+            }
+
+            @Override
+            public void onFailure(Throwable t)
+            {
+              CloseableUtils.closeAndSuppressExceptions(bundleHold, t::addSuppressed);
+              asyncHolder.setException(t);
+            }
+          },
+          // Run the cursor-build callback inline on whichever download thread finishes last. The build itself does no
+          // I/O (columns are already materialized), so it doesn't need to round-trip through the download executor.
+          MoreExecutors.directExecutor()
+      );
+      return asyncHolder;
+    }
+    catch (Throwable t) {
+      // Failure between acquire and the addCallback wiring (getDownloadExec, downloadExec.submit rejecting after
+      // shutdown, Futures.addCallback rejecting on a bad executor, etc.). Ownership of bundleHold hasn't transferred
+      // to the callback yet, so release it here. Already-submitted download tasks will complete with no callback
+      // wired; their captured row-selector refs drop naturally.
+      throw CloseableUtils.closeAndWrapInCatch(t, bundleHold);
+    }
+  }
+
+  /**
+   * Determine the set of physical column names required from the chosen row selector, given a {@link CursorBuildSpec}.
+   */
+  private static Set<String> requiredColumns(
+      QueryableIndex rowSelector,
+      @Nullable QueryableProjection<QueryableIndex> matched,
+      CursorBuildSpec originalSpec
+  )
+  {
+    final CursorBuildSpec effective = matched != null ? matched.getCursorBuildSpec() : originalSpec;
+    if (effective.getPhysicalColumns() != null) {
+      final Set<String> required = new LinkedHashSet<>(effective.getPhysicalColumns());
+      // physicalColumns enumerates the SELECTED columns, but QueryableIndexCursorHolder also reads __time while
+      // building the cursor, independent of physicalColumns: unconditionally for a time-ordered index (its
+      // interval-checking offset reads timestamps), and via a synthesized __time range filter for a non-time-ordered
+      // index whose data extends past the query interval. That read happens after the cursor holder is handed back,
+      // so __time must be pre-fetched on the async path or it becomes a lazy deep-storage download on a processing
+      // thread. Predicting exactly when the holder reads __time would mean replicating its internals (fragile, and it
+      // reads __time in the common cases anyway), so always include it: __time is cheap, and it resolves to a
+      // no-download constant column for projections that don't carry a real time column.
+      required.add(ColumnHolder.TIME_COLUMN_NAME);
+      return required;
+    }
+    // Conservative fallback when physicalColumns isn't declared: fetch every column on the chosen row selector
+    // plus __time (which is special-cased and not enumerated by getColumnNames()).

Review Comment:
   Seems weird. Can you consider changing it in this PR, so `getColumnNames()` 
does include `__time`? Looking around at the usages of `getColumnNames()`, it 
doesn't seem like anything is super-dependent on them not including `__time`.
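
For reference in this thread, the column-selection rule quoted above can be reduced to a small, self-contained sketch. It is hypothetical and simplified. `RequiredColumnsSketch` is not a Druid class. Plain collections stand in for `CursorBuildSpec#getPhysicalColumns()` and the row selector's `getColumnNames()`. The fallback branch, whose body is cut off in the quoted hunk, is assumed to do what its comment says: every column plus `__time`.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

final class RequiredColumnsSketch
{
  // Same value as Druid's ColumnHolder.TIME_COLUMN_NAME.
  static final String TIME_COLUMN = "__time";

  private RequiredColumnsSketch()
  {
  }

  /**
   * physicalColumns: the spec's declared physical columns, or null if undeclared (hypothetical stand-in).
   * allColumns: the row selector's column names, assumed here to exclude __time, as the quoted comment says.
   */
  static Set<String> requiredColumns(Collection<String> physicalColumns, Collection<String> allColumns)
  {
    // Declared physical columns win; otherwise fall back conservatively to every column on the row selector.
    final Set<String> required = new LinkedHashSet<>(physicalColumns != null ? physicalColumns : allColumns);
    // __time is read while the cursor is built even when it is not selected, so it is always pre-fetched.
    // LinkedHashSet keeps declaration order and silently ignores a __time that is already present.
    required.add(TIME_COLUMN);
    return required;
  }
}
```

Suppose `getColumnNames()` included `__time`, as suggested above. In this sketch, the explicit `add` would then only matter for the declared-columns branch, and the set semantics already make a duplicate `__time` harmless.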

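The Javadoc claims that pre-fetching and later cursor reads share a single download. That claim rests on memoized column suppliers. Below is a minimal sketch of the idea, assuming a loader that never returns null. `MemoizingSupplier` is hypothetical and is not the supplier type `PartialQueryableIndex` actually uses. Its loader stands in for the step that calls `mapFile` and triggers the range read. Double-checked locking on a `volatile` field ensures that concurrent first calls run the loader only once.

```java
import java.util.function.Supplier;

/**
 * Hypothetical memoizing supplier: the first get() runs the loader (the "download"), and every later get(),
 * from any thread, returns the cached value without loading again.
 */
final class MemoizingSupplier<T> implements Supplier<T>
{
  private final Supplier<T> loader;
  private volatile T value; // null until loaded; assumes the loader never returns null

  MemoizingSupplier(Supplier<T> loader)
  {
    this.loader = loader;
  }

  @Override
  public T get()
  {
    T result = value;
    if (result == null) {
      synchronized (this) {
        // Re-check under the lock so only one of several racing first callers runs the loader.
        result = value;
        if (result == null) {
          result = loader.get();
          value = result;
        }
      }
    }
    return result;
  }
}
```

If the loader throws, the value stays unset and the next `get()` retries. The quoted code does not show whether the real suppliers behave the same way. Guava's `Suppliers.memoize` is the usual off-the-shelf alternative.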

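The per-column fan-out in `makeCursorHolderAsync` has three parts: one task per required column, a single callback once every task finishes, and the bundle hold closed on any failure. It can be sketched with only `java.util.concurrent`. This sketch swaps Guava's `Futures.allAsList`/`Futures.addCallback` for `CompletableFuture.allOf`/`whenComplete`. `FanOutSketch`, `materialize` and `build` are hypothetical names. There is one behavioral difference: `allAsList` fails as soon as any input fails, while `allOf` waits for every task before completing exceptionally.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

final class FanOutSketch
{
  private FanOutSketch()
  {
  }

  /**
   * Runs materialize(column) for every column on downloadExec, then calls build(hold) once all tasks have finished.
   * If any task fails, or build throws, the hold is closed and the returned future fails. On success the built
   * value is assumed to own the hold (as wrapWithBundleRelease does in the quoted code).
   */
  static <T> CompletableFuture<T> loadThenBuild(
      Set<String> columns,
      Consumer<String> materialize,  // hypothetical stand-in for rowSelector.getColumnHolder(column)
      Function<Closeable, T> build,  // hypothetical stand-in for building the cursor holder around the hold
      Closeable hold,
      Executor downloadExec
  )
  {
    CompletableFuture<?>[] downloads;
    try {
      final List<CompletableFuture<Void>> submitted = new ArrayList<>(columns.size());
      for (String column : columns) {
        submitted.add(CompletableFuture.runAsync(() -> materialize.accept(column), downloadExec));
      }
      downloads = submitted.toArray(new CompletableFuture<?>[0]);
    }
    catch (RuntimeException e) {
      // Wiring failed (e.g. the executor rejected a task), so ownership of the hold never left this method.
      closeSuppressing(hold, e);
      throw e;
    }

    final CompletableFuture<T> result = new CompletableFuture<>();
    // The non-async whenComplete runs on the thread that completes the combined future (or on the caller if it
    // is already done), which plays the role of Guava's MoreExecutors.directExecutor() here.
    CompletableFuture.allOf(downloads).whenComplete((ignored, error) -> {
      if (error != null) {
        closeSuppressing(hold, error);
        result.completeExceptionally(error);
        return;
      }
      try {
        result.complete(build.apply(hold));
      }
      catch (Throwable t) {
        closeSuppressing(hold, t);
        result.completeExceptionally(t);
      }
    });
    return result;
  }

  /** Closes c, attaching any close failure to the primary error instead of replacing it. */
  private static void closeSuppressing(Closeable c, Throwable primary)
  {
    try {
      c.close();
    }
    catch (Exception e) {
      primary.addSuppressed(e);
    }
  }
}
```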

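The ownership comment in `makeCursorHolderAsync` describes a three-way contract. `set()` hands the built holder to the async wrapper, or reports that the wrapper was already closed. `close()` is the cancel path that closes whatever is held. `release()` is the success path that transfers ownership. The hypothetical single-slot `HandoffCell` below shows that contract in isolation. It is not Druid's `AsyncCursorHolder`, which also supports `setException()` and awaiting, both omitted here.

```java
import java.io.Closeable;
import java.io.IOException;

/**
 * Hypothetical single-slot handoff cell. A false return from set() means the cell was already closed and the
 * caller still owns (and must close) the value, mirroring "if (!asyncHolder.set(holder)) holder.close();".
 */
final class HandoffCell<T extends Closeable> implements Closeable
{
  private T value;
  private boolean closed;

  /** Stores the value unless the cell is already closed or released. */
  synchronized boolean set(T newValue)
  {
    if (closed) {
      return false;
    }
    value = newValue;
    return true;
  }

  /** Success path: hands the stored value (possibly null) to the caller; the cell will no longer close it. */
  synchronized T release()
  {
    final T out = value;
    value = null;
    closed = true;
    return out;
  }

  /** Cancel path: closes whatever value is held and makes any later set() fail. */
  @Override
  public void close() throws IOException
  {
    final T toClose;
    synchronized (this) {
      closed = true;
      toClose = value;
      value = null;
    }
    // Close outside the lock so a slow close() cannot block concurrent set() or release() callers.
    if (toClose != null) {
      toClose.close();
    }
  }
}
```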