gianm commented on code in PR #19535:
URL: https://github.com/apache/druid/pull/19535#discussion_r3338719602
########## processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexSegment.java: ##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.utils.CloseableUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.Closeable;
+
+/**
+ * {@link Segment} wrapper around a {@link PartialQueryableIndex}. Mirrors {@link QueryableIndexSegment} but wires up
+ * the V10-specific {@link V10TimeBoundaryInspector} (which answers from
+ * {@link org.apache.druid.segment.projections.ProjectionMetadata} min/max fields without downloading the
+ * {@code __time} column) and the partial-aware {@link PartialQueryableIndexCursorFactory} (which downloads
+ * required files on the supplied download executor before handing back a cursor).
+ * <p>
+ * Lifecycle: this segment is intended to exist as a transient reference-hold scope over an externally-owned
+ * {@link PartialQueryableIndex}, it never closes the underlying index. The {@code onClose} hook is what
+ * {@link #close()} fires when the segment is closed (e.g. to 'release' reference tracking stuff in the cache layer).
+ */
+public class PartialQueryableIndexSegment implements ReferenceCountedSegmentProvider.LeafReference
+{
+  private final PartialQueryableIndex index;
+  private final PartialQueryableIndexCursorFactory cursorFactory;
+  private final TimeBoundaryInspector timeBoundaryInspector;
+  private final SegmentId segmentId;
+  private final Closeable onClose;
+
+  public PartialQueryableIndexSegment(
+      PartialQueryableIndex index,
+      SegmentId segmentId,
+      Closeable onClose,
+      PartialBundleAcquirer bundleAcquirer
+  )
+  {
+    this.index = index;
+    this.timeBoundaryInspector = V10TimeBoundaryInspector.forBaseProjection(
+        index.getBaseProjectionMetadata(),
+        index.getDataInterval()
+    );
+    this.cursorFactory = new PartialQueryableIndexCursorFactory(
+        index,
+        timeBoundaryInspector,
+        bundleAcquirer
+    );
+    this.segmentId = segmentId;
+    this.onClose = onClose;
+  }
+
+  @Override
+  public SegmentId getId()
+  {
+    return segmentId;
+  }
+
+  @Override
+  public Interval getDataInterval()
+  {
+    return index.getDataInterval();
+  }
+
+  @Override
+  public void close()
+  {
+    CloseableUtils.closeAndWrapExceptions(onClose);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Nullable
+  @Override
+  public <T> T as(@Nonnull Class<T> clazz)

Review Comment:
Is it possible to include a `PhysicalSegmentInspector`? It would be useful to get the number of rows, which is used for incrementing certain counters.

########## server/src/main/java/org/apache/druid/server/SegmentManager.java: ##########
@@ -225,6 +224,32 @@ public AcquireSegmentAction acquireSegment(DataSegment dataSegment)
     return cacheManager.acquireSegment(dataSegment);
   }

+  /**
+   * Partial-load variant of {@link #acquireCachedSegment(SegmentId)}, returns a {@link Segment} when the cache holds

Review Comment:
Is the segment returned by `acquireCachedSegment` guaranteed to be fully loaded? (Its javadocs do not seem to make that clear.)
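A minimal sketch of the `PhysicalSegmentInspector` suggestion for `PartialQueryableIndexSegment#as`, written in plain Java with hypothetical stand-in types (`RowCountInspector`, `PartialIndex`, `getNumRowsFromMetadata`) rather than Druid's real interfaces. It assumes the row count can be answered from metadata that is already local, so asking for it never triggers a column download; whether `PartialQueryableIndex` actually exposes such a count is not stated in this review. The sketch also keeps the lifecycle from the javadoc in the hunk: `close()` only fires the `onClose` hook and never closes the externally-owned index.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

public class AsDispatchSketch
{
  // Hypothetical stand-in for the row-count part of an inspector; not Druid's PhysicalSegmentInspector.
  interface RowCountInspector
  {
    int getNumRows();
  }

  // Hypothetical stand-in for PartialQueryableIndex. Assumption: the row count is readable from
  // metadata that is already on disk, so answering it never starts a deep-storage read.
  interface PartialIndex
  {
    int getNumRowsFromMetadata();
  }

  static final class PartialSegment implements Closeable
  {
    private final PartialIndex index;
    private final Closeable onClose;

    PartialSegment(PartialIndex index, Closeable onClose)
    {
      this.index = index;
      this.onClose = onClose;
    }

    // Same dispatch shape as Segment#as: return an implementation for known classes, null otherwise.
    <T> T as(Class<T> clazz)
    {
      if (RowCountInspector.class.equals(clazz)) {
        final RowCountInspector inspector = index::getNumRowsFromMetadata;
        return clazz.cast(inspector);
      }
      return null;
    }

    // Only fires the release hook; the index itself is owned elsewhere and is left open.
    @Override
    public void close()
    {
      try {
        onClose.close();
      }
      catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }

  public static void main(String[] args)
  {
    final PartialSegment segment = new PartialSegment(() -> 1000, () -> System.out.println("hold released"));
    final RowCountInspector inspector = segment.as(RowCountInspector.class);
    System.out.println("rows: " + inspector.getNumRows());
    segment.close();
  }
}
```

Answering from metadata follows the same idea the javadoc describes for `V10TimeBoundaryInspector`, which reads projection min/max fields instead of downloading `__time`.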
########## server/src/main/java/org/apache/druid/server/SegmentManager.java: ##########
@@ -225,6 +224,32 @@ public AcquireSegmentAction acquireSegment(DataSegment dataSegment)
     return cacheManager.acquireSegment(dataSegment);
   }

+  /**
+   * Partial-load variant of {@link #acquireCachedSegment(SegmentId)}, returns a {@link Segment} when the cache holds
+   * an entry for the id; empty otherwise. The returned segment may not be fully loaded, callers must use async methods
+   * like {@link org.apache.druid.segment.CursorFactory#makeCursorHolderAsync} to download data on-demand. If the
+   * returned segment is only partially loaded, the synchronous methods like {@code makeCursorHolder} will fail if
+   * anything is still missing. If the segment is fully loaded, or not capable of partial loading, this method will
+   * still return a segment if it is present in cache and any async methods will function properly and return
+   * immediately.
+   */
+  public Optional<Segment> acquireCachedPartialSegment(SegmentId segmentId)
+  {
+    return cacheManager.acquireCachedPartialSegment(segmentId);
+  }
+
+  /**
+   * Partial-load variant of {@link #acquireSegment(DataSegment)}, returns an {@link AcquireSegmentAction} that

Review Comment:
Is the segment returned by `acquireSegment` guaranteed to be fully loaded? (Its javadocs do not seem to make that clear.)

########## server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java: ##########
@@ -536,16 +897,30 @@ private AcquireSegmentAction acquireExistingSegment(SegmentCacheEntryIdentifier
             location.addWeakReservationHoldIfExists(identifier)
         );
         if (hold != null) {
-          if (hold.getEntry().isMounted()) {
+          if (!(hold.getEntry() instanceof CompleteSegmentCacheEntry complete)) {
+            // The eager (complete) acquire path found a non-complete entry under this id. This only arises if
+            // partial-load on-disk state survived a toggle of druid.segmentCache.virtualStoragePartialDownloadsEnabled
+            // to false (getCachedSegments reserves an on-disk partial layout regardless of the flag). The eager path
+            // cannot serve a partial layout; surface a clear operator error rather than a ClassCastException.
+            throw DruidException.forPersona(DruidException.Persona.OPERATOR)

Review Comment:
Two concerns:
1. Won't this also be an issue on downgrade?
2. Is it possible to repair this rather than throwing an error? As is, it will be difficult to turn off partial downloads once they are enabled.

########## multi-stage-query/src/main/java/org/apache/druid/msq/input/RegularLoadableSegment.java: ##########
@@ -140,7 +140,7 @@ public synchronized Optional<Segment> acquireIfCached()
       throw DruidException.defensive("Segment with descriptor[%s] is already acquired", descriptor);
     }

-    final Optional<Segment> cachedSegment = segmentManager.acquireCachedSegment(segmentId);
+    final Optional<Segment> cachedSegment = segmentManager.acquireCachedPartialSegment(segmentId);

Review Comment:
Make it an option whether `RegularLoadableSegment` does a partial fetch or full fetch? It may not be a safe assumption that all MSQ implementations of query processing are going to be able to handle partial segments / async stuff. Even if all existing ones do, there's also future ones, ones from extensions, etc.

########## processing/src/main/java/org/apache/druid/segment/PartialBundleAcquirer.java: ##########
@@ -0,0 +1,48 @@
...
+package org.apache.druid.segment;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+import java.io.Closeable;
+
+/**
+ * Context object that bundles together pieces of cache-layer machinery needed to do on-demand partial segment loading:
+ * <ol>
+ *   <li>A hook for acquiring a hold on the cache-layer bundle.</li>
+ *   <li>The executor on which partial downloads should run.</li>
+ * </ol>
+ */
+public interface PartialBundleAcquirer
+{
+  /**
+   * Acquire a hold on the cache-layer bundle with the given name. The returned {@link Closeable} releases the hold

Review Comment:
How are callers supposed to know what the bundle names are?

########## processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java: ##########
@@ -0,0 +1,279 @@
...
+package org.apache.druid.segment;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.Order;
+import org.apache.druid.query.OrderBy;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.projections.Projections;
+import org.apache.druid.segment.projections.QueryableProjection;
+import org.apache.druid.segment.vector.VectorCursor;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Partial-aware {@link CursorFactory} for {@link PartialQueryableIndex}.
+ * <p>
+ * <b>Sync vs async contract.</b> {@link #makeCursorHolder} requires the segment to already be fully downloaded,
+ * intended for callers that acquired the segment via the eager (download-everything-up-front) path, so by the time
+ * they ask for a cursor every internal file is on disk. If anything is missing it throws
+ * {@link DruidException#defensive} so that we never trigger downloads on the sync path, since processing threads must
+ * not block on deep-storage I/O. {@link #makeCursorHolderAsync} is the only path that performs downloads on demand;
+ * callers acknowledge that by opting into the async variant when they acquire a partial segment.
+ * <p>
+ * <b>Async download granularity.</b> Pre-fetch is column-level. {@link #makeCursorHolderAsync} calls
+ * {@link QueryableIndex#getColumnHolder} on each required column; the memoized supplier on the underlying
+ * {@link PartialQueryableIndex} eagerly invokes
+ * {@link org.apache.druid.segment.file.PartialSegmentFileMapperV10#mapFile} inside that call, which is what triggers
+ * the deep-storage range read. The cursor holder constructed afterward sees the already-materialized holders via the
+ * same memoized suppliers, so no further downloads happen at cursor-read time.
+ * <p>
+ * If a projection matches, the required columns are looked up against the projection's row selector and its rewritten
+ * {@link CursorBuildSpec} (which carries physical columns in the projection's namespace). When
+ * {@link CursorBuildSpec#getPhysicalColumns()} is {@code null}, every column on the chosen row selector is pre-fetched
+ * as required by the contract of {@link CursorBuildSpec}.
+ * <p>
+ * <b>Parallelism.</b> Each column's materialization is submitted as a separate task to the supplied download executor.
+ * The cursor holder is constructed once every column task has completed.
+ */
+public class PartialQueryableIndexCursorFactory implements CursorFactory
+{
+  private final PartialQueryableIndex index;
+  private final QueryableIndexCursorFactory delegate;
+  private final PartialBundleAcquirer bundleAcquirer;
+
+  public PartialQueryableIndexCursorFactory(
+      PartialQueryableIndex index,
+      TimeBoundaryInspector timeBoundaryInspector,
+      PartialBundleAcquirer bundleAcquirer
+  )
+  {
+    this.index = index;
+    this.delegate = new QueryableIndexCursorFactory(index, timeBoundaryInspector);
+    this.bundleAcquirer = bundleAcquirer;
+  }
+
+  @Override
+  public CursorHolder makeCursorHolder(CursorBuildSpec spec)
+  {
+    // refuse to download here so we never accidentally block a processing thread
+    if (!index.isFullyDownloaded()) {
+      throw DruidException.defensive(
+          "Sync makeCursorHolder requires the segment to be fully downloaded; use makeCursorHolderAsync for " + "on-demand loading, or acquire the segment via the eager path so all files are loaded up front."
+      );
+    }
+    return delegate.makeCursorHolder(spec);
+  }
+
+  @Override
+  public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+  {
+    final QueryableProjection<QueryableIndex> matched = index.getProjection(spec);
+    final QueryableIndex rowSelector = matched != null ? matched.getRowSelector() : index;
+    final Set<String> requiredColumns = requiredColumns(rowSelector, matched, spec);
+    final String bundleName = matched != null ? matched.getName() : Projections.BASE_TABLE_PROJECTION_NAME;
+
+    // Mount the cache-layer bundle BEFORE submitting downloads. This sparse-allocates the bundle's container files
+    // and reserves the disk-usage accounting at bundle granularity. The returned Closeable releases the hold when
+    // the cursor closes, allowing the cache to later evict the bundle if needed.
+    // PartialBundleAcquirer.acquire() throws DruidException on failure.
Review Comment:
This comment is too long.

########## processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java: ##########
@@ -0,0 +1,279 @@
...
+  @Override
+  public CursorHolder makeCursorHolder(CursorBuildSpec spec)

Review Comment:
Given this is going to blow up, the default impl for `makeCursorHolderAsync` (which calls `makeCursorHolder`) isn't very useful, and seems likely to lead to delegating `CursorFactory` being implemented improperly. Consider deleting it, and perhaps replacing it with an abstract base class that can be used for leaf `CursorFactory` implementations that don't have special logic for async cursor creation.

########## processing/src/main/java/org/apache/druid/segment/PartialBundleAcquirer.java: ##########
@@ -0,0 +1,48 @@
...
+  /**
+   * Acquire a hold on the cache-layer bundle with the given name. The returned {@link Closeable} releases the hold
+   * when closed (the bundle itself stays in the cache for re-use by subsequent acquires).
+   * <p>
+   * Implementations should be safe to call concurrently for different bundle names.

Review Comment:
"should be" or "must be"? Does the wording imply it's *not* safe to call concurrently with the same bundle name?

########## processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java: ##########
@@ -0,0 +1,279 @@
...
+  @Override
+  public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+  {
+    final QueryableProjection<QueryableIndex> matched = index.getProjection(spec);
+    final QueryableIndex rowSelector = matched != null ? matched.getRowSelector() : index;
+    final Set<String> requiredColumns = requiredColumns(rowSelector, matched, spec);
+    final String bundleName = matched != null ? matched.getName() : Projections.BASE_TABLE_PROJECTION_NAME;

Review Comment:
How did we know that the bundle name should be the projection name? Is that specified somewhere or just how things are currently implemented?

########## processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexSegment.java: ##########
@@ -0,0 +1,110 @@
...
+ * <p>
+ * Lifecycle: this segment is intended to exist as a transient reference-hold scope over an externally-owned

Review Comment:
What typically owns the `PartialQueryableIndex`?

########## processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java: ##########
@@ -0,0 +1,279 @@
...
+    // Mount the cache-layer bundle BEFORE submitting downloads. This sparse-allocates the bundle's container files
+    // and reserves the disk-usage accounting at bundle granularity. The returned Closeable releases the hold when
+    // the cursor closes, allowing the cache to later evict the bundle if needed.
+    // PartialBundleAcquirer.acquire() throws DruidException on failure.
+ final Closeable bundleHold = bundleAcquirer.acquire(bundleName); + + try { + // Submit one materialization task per column so a multi-threaded download executor can fan them out + final ListeningExecutorService downloadExec = bundleAcquirer.getDownloadExec(); + final List<ListenableFuture<?>> columnDownloads = new ArrayList<>(requiredColumns.size()); + for (String column : requiredColumns) { + columnDownloads.add(downloadExec.submit(() -> { + rowSelector.getColumnHolder(column); + return null; + })); + } + + // No canceler: downloads are allowed to complete even if the awaiter cancels. Once they complete and the inner + // CursorHolder is handed to asyncHolder.set(), it's owned by the asyncHolder until the awaiter takes one of + // two paths: asyncHolder.close() (cancel, closes the held holder) or asyncHolder.release() (success, transfers + // ownership to the caller). Either way the holder is drained and nothing leaks. + final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null); + Futures.addCallback( + Futures.allAsList(columnDownloads), Review Comment: Seems like a good use case for `AsyncResource.collect(columnDownloads)` and `AsyncResource.transform` (from #19539) ########## processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.Order; +import org.apache.druid.query.OrderBy; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.projections.Projections; +import org.apache.druid.segment.projections.QueryableProjection; +import org.apache.druid.segment.vector.VectorCursor; +import org.apache.druid.utils.CloseableUtils; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +/** + * Partial-aware {@link CursorFactory} for {@link PartialQueryableIndex}. + * <p> + * <b>Sync vs async contract.</b> {@link #makeCursorHolder} requires the segment to already be fully downloaded, + * intended for callers that acquired the segment via the eager (download-everything-up-front) path, so by the time + * they ask for a cursor every internal file is on disk. 
If anything is missing it throws + * {@link DruidException#defensive} so that we never trigger downloads on the sync path, since processing threads must + * not block on deep-storage I/O. {@link #makeCursorHolderAsync} is the only path that performs downloads on demand; + * callers acknowledge that by opting into the async variant when they acquire a partial segment. + * <p> + * <b>Async download granularity.</b> Pre-fetch is column-level. {@link #makeCursorHolderAsync} calls + * {@link QueryableIndex#getColumnHolder} on each required column; the memoized supplier on the underlying + * {@link PartialQueryableIndex} eagerly invokes + * {@link org.apache.druid.segment.file.PartialSegmentFileMapperV10#mapFile} inside that call, which is what triggers + * the deep-storage range read. The cursor holder constructed afterward sees the already-materialized holders via the + * same memoized suppliers, so no further downloads happen at cursor-read time. + * <p> + * If a projection matches, the required columns are looked up against the projection's row selector and its rewritten + * {@link CursorBuildSpec} (which carries physical columns in the projection's namespace). When + * {@link CursorBuildSpec#getPhysicalColumns()} is {@code null}, every column on the chosen row selector is pre-fetched + * as required by the contract of {@link CursorBuildSpec}. + * <p> + * <b>Parallelism.</b> Each column's materialization is submitted as a separate task to the supplied download executor. + * The cursor holder is constructed once every column task has completed. 
+ */ +public class PartialQueryableIndexCursorFactory implements CursorFactory +{ + private final PartialQueryableIndex index; + private final QueryableIndexCursorFactory delegate; + private final PartialBundleAcquirer bundleAcquirer; + + public PartialQueryableIndexCursorFactory( + PartialQueryableIndex index, + TimeBoundaryInspector timeBoundaryInspector, + PartialBundleAcquirer bundleAcquirer + ) + { + this.index = index; + this.delegate = new QueryableIndexCursorFactory(index, timeBoundaryInspector); + this.bundleAcquirer = bundleAcquirer; + } + + @Override + public CursorHolder makeCursorHolder(CursorBuildSpec spec) + { + // refuse to download here so we never accidentally block a processing thread + if (!index.isFullyDownloaded()) { + throw DruidException.defensive( + "Sync makeCursorHolder requires the segment to be fully downloaded; use makeCursorHolderAsync for " + + "on-demand loading, or acquire the segment via the eager path so all files are loaded up front." + ); + } + return delegate.makeCursorHolder(spec); + } + + @Override + public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec) + { + final QueryableProjection<QueryableIndex> matched = index.getProjection(spec); + final QueryableIndex rowSelector = matched != null ? matched.getRowSelector() : index; + final Set<String> requiredColumns = requiredColumns(rowSelector, matched, spec); + final String bundleName = matched != null ? matched.getName() : Projections.BASE_TABLE_PROJECTION_NAME; + + // Mount the cache-layer bundle BEFORE submitting downloads. This sparse-allocates the bundle's container files + // and reserves the disk-usage accounting at bundle granularity. The returned Closeable releases the hold when + // the cursor closes, allowing the cache to later evict the bundle if needed. + // PartialBundleAcquirer.acquire() throws DruidException on failure. 
+ final Closeable bundleHold = bundleAcquirer.acquire(bundleName); + + try { + // Submit one materialization task per column so a multi-threaded download executor can fan them out + final ListeningExecutorService downloadExec = bundleAcquirer.getDownloadExec(); + final List<ListenableFuture<?>> columnDownloads = new ArrayList<>(requiredColumns.size()); + for (String column : requiredColumns) { + columnDownloads.add(downloadExec.submit(() -> { + rowSelector.getColumnHolder(column); + return null; + })); + } + + // No canceler: downloads are allowed to complete even if the awaiter cancels. Once they complete and the inner + // CursorHolder is handed to asyncHolder.set(), it's owned by the asyncHolder until the awaiter takes one of + // two paths: asyncHolder.close() (cancel, closes the held holder) or asyncHolder.release() (success, transfers + // ownership to the caller). Either way the holder is drained and nothing leaks. + final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null); + Futures.addCallback( + Futures.allAsList(columnDownloads), + new FutureCallback<>() + { + @Override + public void onSuccess(List<Object> ignored) + { + try { + final CursorHolder inner = delegate.makeCursorHolderForProjection(spec, matched); + final CursorHolder holder = wrapWithBundleRelease(inner, bundleHold); + if (!asyncHolder.set(holder)) { + // wrapper was closed while we were producing the holder; close it ourselves so it doesn't leak. + holder.close(); + } + } + catch (Throwable t) { + CloseableUtils.closeAndSuppressExceptions(bundleHold, t::addSuppressed); + asyncHolder.setException(t); + } + } + + @Override + public void onFailure(Throwable t) + { + CloseableUtils.closeAndSuppressExceptions(bundleHold, t::addSuppressed); + asyncHolder.setException(t); + } + }, + // Run the cursor-build callback inline on whichever download thread finishes last. 
The build itself does no + // I/O (columns are already materialized), so it doesn't need to round-trip through the download executor. + MoreExecutors.directExecutor() + ); + return asyncHolder; + } + catch (Throwable t) { + // Failure between acquire and the addCallback wiring (getDownloadExec, downloadExec.submit shut-down rejection, + // Futures.addCallback rejecting on a bad executor, etc.). Ownership of bundleHold hasn't transferred to the + // callback yet, so release it here. Already-submitted download tasks will complete with no callback wired, + // their captured row-selector refs drop naturally. + throw CloseableUtils.closeAndWrapInCatch(t, bundleHold); + } + } + + /** + * Determine the set of physical column names to required from the chosen row selector given a {@link CursorBuildSpec} + */ + private static Set<String> requiredColumns( Review Comment: nit: the `private static` is typically placed after all non-`static` methods ########## server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java: ########## @@ -515,6 +515,45 @@ public void release(CacheEntry entry) } } + /** + * Remove a {@link #weakCacheEntries} entry that currently has no outstanding holds, unlinking it from the SIEVE + * queue and terminating its phaser (which fires the underlying {@link CacheEntry#unmount}). No-op when the entry + * is absent, is a {@link #staticCacheEntries} entry, or still has outstanding holds. + * <p> + * This exists for callers that register a weak entry <em>without</em> a {@link ReservationHold} (the bootstrap + * reserve path uses {@link #reserveWeak}) and need to clean it up after a failed mount. 
The normal runtime path Review Comment: I think this will be useful in #19539: lingering entries on failed mount was a problem raised in review (https://github.com/apache/druid/pull/19539#discussion_r3341105641) ########## server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java: ########## @@ -526,6 +548,345 @@ public AcquireSegmentAction acquireSegment(final DataSegment dataSegment) } } + @Override + public Optional<Segment> acquireCachedPartialSegment(SegmentId segmentId) + { + if (!config.isVirtualStoragePartialDownloadsEnabled()) { + return acquireCachedSegment(segmentId); + } + return acquireCachedInternal(segmentId, false); + } + + @Override + public AcquireSegmentAction acquirePartialSegment(DataSegment dataSegment) + { + final SegmentRangeReader rangeReader = tryOpenRangeReader(dataSegment); + if (rangeReader == null) { + // Storage backend doesn't support range reads for this segment (e.g. V9 format or zipped) + return acquireSegment(dataSegment); + } + return acquirePartialInternal(dataSegment, rangeReader, false); + } + + + /** + * Shared implementation for {@link #acquireCachedSegment} and {@link #acquireCachedPartialSegment}. Walks the + * storage locations checking for existing entries. + * <p> + * When {@code requireFullyDownloaded} is {@code true} the entry must also report + * {@link SegmentCacheEntry#isFullyDownloaded()} (the contract for {@link #acquireCachedSegment}), which never + * hands back an entry that isn't behaviorally fully-materialized. When {@code false} a mounted entry is + * sufficient ({@link #acquireCachedPartialSegment}'s lazy-mount contract), even if some bundles are still + * not yet on disk. In either case {@link SegmentCacheEntry#acquireReference(Closeable)} composes the weak-entry hold + * into the returned segment's close lifecycle. 
+ */ + private Optional<Segment> acquireCachedInternal(SegmentId segmentId, boolean requireFullyDownloaded) + { + final SegmentCacheEntryIdentifier id = new SegmentCacheEntryIdentifier(segmentId); + for (StorageLocation location : locations) { + final SegmentCacheEntry staticEntry = location.getStaticCacheEntry(id); + if (staticEntry != null) { + if (staticEntry.isMounted() && (!requireFullyDownloaded || staticEntry.isFullyDownloaded())) { + return staticEntry.acquireReference(); + } + return Optional.empty(); + } + if (!config.isVirtualStorage()) { + continue; + } + final StorageLocation.ReservationHold<SegmentCacheEntry> hold = location.addWeakReservationHoldIfExists(id); + if (hold != null) { + if (hold.getEntry().isMounted() && (!requireFullyDownloaded || hold.getEntry().isFullyDownloaded())) { + return hold.getEntry().acquireReference(hold); + } + hold.close(); + return Optional.empty(); + } + } + return Optional.empty(); + } + + /** + * Shared scaffolding for both partial acquire APIs. {@code fullDownload=false} powers + * {@link #acquirePartialSegment} (lazy mount, header bytes only; bundles mount on demand at query time); + * {@code fullDownload=true} powers the partial-eligible branch of {@link #acquireSegment} (mount + + * {@link PartialSegmentFileMapperV10#ensureAllDownloaded} so the returned segment is fully-materialized). + * <p> + * Fast path: {@link #findExistingPartialWithHold} locates an existing entry across locations under a hold. If the + * entry is already usable (mounted, and fully-downloaded when required), return an immediate-future action whose + * {@code loadCleanup} is the hold (the supplier mints fresh segments per call, each with its own metadata + * reference, so no separate cleanup is needed). 
+ * <p> + * Slow path: under the per-segment lock, {@link #findOrReservePartial} reuses an existing not-yet-usable entry or + * reserves a fresh weak one, and the action submits mount (+ optional ensureAllDownloaded) to + * {@link #virtualStorageLoadOnDemandExec} so callers that yield on the future never block a processing thread on + * deep-storage I/O. + */ + private AcquireSegmentAction acquirePartialInternal( + DataSegment dataSegment, + SegmentRangeReader rangeReader, + boolean fullDownload Review Comment: On the `fullDownload` path I don't see stuff that is creating a cache entry and hold on the `StorageLocation` to account for the storage space. The logic seems to be fetching things outside of a hold. Is that right or did I miss it? ########## processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.Order; +import org.apache.druid.query.OrderBy; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.projections.Projections; +import org.apache.druid.segment.projections.QueryableProjection; +import org.apache.druid.segment.vector.VectorCursor; +import org.apache.druid.utils.CloseableUtils; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +/** + * Partial-aware {@link CursorFactory} for {@link PartialQueryableIndex}. + * <p> + * <b>Sync vs async contract.</b> {@link #makeCursorHolder} requires the segment to already be fully downloaded, + * intended for callers that acquired the segment via the eager (download-everything-up-front) path, so by the time + * they ask for a cursor every internal file is on disk. If anything is missing it throws + * {@link DruidException#defensive} so that we never trigger downloads on the sync path, since processing threads must + * not block on deep-storage I/O. {@link #makeCursorHolderAsync} is the only path that performs downloads on demand; + * callers acknowledge that by opting into the async variant when they acquire a partial segment. + * <p> + * <b>Async download granularity.</b> Pre-fetch is column-level. 
{@link #makeCursorHolderAsync} calls + * {@link QueryableIndex#getColumnHolder} on each required column; the memoized supplier on the underlying + * {@link PartialQueryableIndex} eagerly invokes + * {@link org.apache.druid.segment.file.PartialSegmentFileMapperV10#mapFile} inside that call, which is what triggers + * the deep-storage range read. The cursor holder constructed afterward sees the already-materialized holders via the + * same memoized suppliers, so no further downloads happen at cursor-read time. + * <p> + * If a projection matches, the required columns are looked up against the projection's row selector and its rewritten + * {@link CursorBuildSpec} (which carries physical columns in the projection's namespace). When + * {@link CursorBuildSpec#getPhysicalColumns()} is {@code null}, every column on the chosen row selector is pre-fetched + * as required by the contract of {@link CursorBuildSpec}. + * <p> + * <b>Parallelism.</b> Each column's materialization is submitted as a separate task to the supplied download executor. + * The cursor holder is constructed once every column task has completed. 
+ */ +public class PartialQueryableIndexCursorFactory implements CursorFactory +{ + private final PartialQueryableIndex index; + private final QueryableIndexCursorFactory delegate; + private final PartialBundleAcquirer bundleAcquirer; + + public PartialQueryableIndexCursorFactory( + PartialQueryableIndex index, + TimeBoundaryInspector timeBoundaryInspector, + PartialBundleAcquirer bundleAcquirer + ) + { + this.index = index; + this.delegate = new QueryableIndexCursorFactory(index, timeBoundaryInspector); + this.bundleAcquirer = bundleAcquirer; + } + + @Override + public CursorHolder makeCursorHolder(CursorBuildSpec spec) + { + // refuse to download here so we never accidentally block a processing thread + if (!index.isFullyDownloaded()) { + throw DruidException.defensive( + "Sync makeCursorHolder requires the segment to be fully downloaded; use makeCursorHolderAsync for " + + "on-demand loading, or acquire the segment via the eager path so all files are loaded up front." + ); + } + return delegate.makeCursorHolder(spec); + } + + @Override + public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec) + { + final QueryableProjection<QueryableIndex> matched = index.getProjection(spec); + final QueryableIndex rowSelector = matched != null ? matched.getRowSelector() : index; + final Set<String> requiredColumns = requiredColumns(rowSelector, matched, spec); + final String bundleName = matched != null ? matched.getName() : Projections.BASE_TABLE_PROJECTION_NAME; + + // Mount the cache-layer bundle BEFORE submitting downloads. This sparse-allocates the bundle's container files + // and reserves the disk-usage accounting at bundle granularity. The returned Closeable releases the hold when + // the cursor closes, allowing the cache to later evict the bundle if needed. + // PartialBundleAcquirer.acquire() throws DruidException on failure. 
+ final Closeable bundleHold = bundleAcquirer.acquire(bundleName); + + try { + // Submit one materialization task per column so a multi-threaded download executor can fan them out + final ListeningExecutorService downloadExec = bundleAcquirer.getDownloadExec(); + final List<ListenableFuture<?>> columnDownloads = new ArrayList<>(requiredColumns.size()); + for (String column : requiredColumns) { + columnDownloads.add(downloadExec.submit(() -> { + rowSelector.getColumnHolder(column); + return null; + })); + } + + // No canceler: downloads are allowed to complete even if the awaiter cancels. Once they complete and the inner Review Comment: why not cancel them? Is it because canceling them is awkward for some reason, or you feel it's better to not cancel them? I suppose there are pros and cons. The main pro I can think of (for canceling) is that no other query may need the files, so we're wasting time fetching them. The main con is that some other query _does_ need the files, so we're going to need to re-start the fetch if we cancel it. Maybe the best behavior is to cancel it unless some other query is also waiting for the same thing. ########## processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.Order; +import org.apache.druid.query.OrderBy; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.projections.Projections; +import org.apache.druid.segment.projections.QueryableProjection; +import org.apache.druid.segment.vector.VectorCursor; +import org.apache.druid.utils.CloseableUtils; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +/** + * Partial-aware {@link CursorFactory} for {@link PartialQueryableIndex}. + * <p> + * <b>Sync vs async contract.</b> {@link #makeCursorHolder} requires the segment to already be fully downloaded, + * intended for callers that acquired the segment via the eager (download-everything-up-front) path, so by the time + * they ask for a cursor every internal file is on disk. If anything is missing it throws + * {@link DruidException#defensive} so that we never trigger downloads on the sync path, since processing threads must + * not block on deep-storage I/O. {@link #makeCursorHolderAsync} is the only path that performs downloads on demand; + * callers acknowledge that by opting into the async variant when they acquire a partial segment. 
+ * <p> + * <b>Async download granularity.</b> Pre-fetch is column-level. {@link #makeCursorHolderAsync} calls + * {@link QueryableIndex#getColumnHolder} on each required column; the memoized supplier on the underlying + * {@link PartialQueryableIndex} eagerly invokes + * {@link org.apache.druid.segment.file.PartialSegmentFileMapperV10#mapFile} inside that call, which is what triggers + * the deep-storage range read. The cursor holder constructed afterward sees the already-materialized holders via the + * same memoized suppliers, so no further downloads happen at cursor-read time. + * <p> + * If a projection matches, the required columns are looked up against the projection's row selector and its rewritten + * {@link CursorBuildSpec} (which carries physical columns in the projection's namespace). When + * {@link CursorBuildSpec#getPhysicalColumns()} is {@code null}, every column on the chosen row selector is pre-fetched + * as required by the contract of {@link CursorBuildSpec}. + * <p> + * <b>Parallelism.</b> Each column's materialization is submitted as a separate task to the supplied download executor. + * The cursor holder is constructed once every column task has completed. 
+ */ +public class PartialQueryableIndexCursorFactory implements CursorFactory +{ + private final PartialQueryableIndex index; + private final QueryableIndexCursorFactory delegate; + private final PartialBundleAcquirer bundleAcquirer; + + public PartialQueryableIndexCursorFactory( + PartialQueryableIndex index, + TimeBoundaryInspector timeBoundaryInspector, + PartialBundleAcquirer bundleAcquirer + ) + { + this.index = index; + this.delegate = new QueryableIndexCursorFactory(index, timeBoundaryInspector); + this.bundleAcquirer = bundleAcquirer; + } + + @Override + public CursorHolder makeCursorHolder(CursorBuildSpec spec) + { + // refuse to download here so we never accidentally block a processing thread + if (!index.isFullyDownloaded()) { + throw DruidException.defensive( + "Sync makeCursorHolder requires the segment to be fully downloaded; use makeCursorHolderAsync for " + + "on-demand loading, or acquire the segment via the eager path so all files are loaded up front." + ); + } + return delegate.makeCursorHolder(spec); + } + + @Override + public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec) + { + final QueryableProjection<QueryableIndex> matched = index.getProjection(spec); + final QueryableIndex rowSelector = matched != null ? matched.getRowSelector() : index; + final Set<String> requiredColumns = requiredColumns(rowSelector, matched, spec); + final String bundleName = matched != null ? matched.getName() : Projections.BASE_TABLE_PROJECTION_NAME; + + // Mount the cache-layer bundle BEFORE submitting downloads. This sparse-allocates the bundle's container files + // and reserves the disk-usage accounting at bundle granularity. The returned Closeable releases the hold when + // the cursor closes, allowing the cache to later evict the bundle if needed. + // PartialBundleAcquirer.acquire() throws DruidException on failure. 
+ final Closeable bundleHold = bundleAcquirer.acquire(bundleName); + + try { + // Submit one materialization task per column so a multi-threaded download executor can fan them out + final ListeningExecutorService downloadExec = bundleAcquirer.getDownloadExec(); + final List<ListenableFuture<?>> columnDownloads = new ArrayList<>(requiredColumns.size()); + for (String column : requiredColumns) { + columnDownloads.add(downloadExec.submit(() -> { + rowSelector.getColumnHolder(column); + return null; + })); + } + + // No canceler: downloads are allowed to complete even if the awaiter cancels. Once they complete and the inner + // CursorHolder is handed to asyncHolder.set(), it's owned by the asyncHolder until the awaiter takes one of + // two paths: asyncHolder.close() (cancel, closes the held holder) or asyncHolder.release() (success, transfers + // ownership to the caller). Either way the holder is drained and nothing leaks. + final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null); + Futures.addCallback( + Futures.allAsList(columnDownloads), + new FutureCallback<>() + { + @Override + public void onSuccess(List<Object> ignored) + { + try { + final CursorHolder inner = delegate.makeCursorHolderForProjection(spec, matched); + final CursorHolder holder = wrapWithBundleRelease(inner, bundleHold); + if (!asyncHolder.set(holder)) { + // wrapper was closed while we were producing the holder; close it ourselves so it doesn't leak. + holder.close(); + } + } + catch (Throwable t) { + CloseableUtils.closeAndSuppressExceptions(bundleHold, t::addSuppressed); + asyncHolder.setException(t); + } + } + + @Override + public void onFailure(Throwable t) + { + CloseableUtils.closeAndSuppressExceptions(bundleHold, t::addSuppressed); + asyncHolder.setException(t); + } + }, + // Run the cursor-build callback inline on whichever download thread finishes last. 
The build itself does no + // I/O (columns are already materialized), so it doesn't need to round-trip through the download executor. + MoreExecutors.directExecutor() + ); + return asyncHolder; + } + catch (Throwable t) { + // Failure between acquire and the addCallback wiring (getDownloadExec, downloadExec.submit shut-down rejection, + // Futures.addCallback rejecting on a bad executor, etc.). Ownership of bundleHold hasn't transferred to the + // callback yet, so release it here. Already-submitted download tasks will complete with no callback wired, + // their captured row-selector refs drop naturally. + throw CloseableUtils.closeAndWrapInCatch(t, bundleHold); + } + } + + /** + * Determine the set of physical column names to required from the chosen row selector given a {@link CursorBuildSpec} + */ + private static Set<String> requiredColumns( + QueryableIndex rowSelector, + @Nullable QueryableProjection<QueryableIndex> matched, + CursorBuildSpec originalSpec + ) + { + final CursorBuildSpec effective = matched != null ? matched.getCursorBuildSpec() : originalSpec; + if (effective.getPhysicalColumns() != null) { + final Set<String> required = new LinkedHashSet<>(effective.getPhysicalColumns()); + // physicalColumns enumerates the SELECTED columns, but QueryableIndexCursorHolder also reads __time while + // building the cursor, independent of physicalColumns: unconditionally for a time-ordered index (its + // interval-checking offset reads timestamps), and via a synthesized __time range filter for a non-time-ordered + // index whose data extends past the query interval. That read happens after the cursor holder is handed back, + // so __time must be pre-fetched on the async path or it becomes a lazy deep-storage download on a processing + // thread. 
Predicting exactly when the holder reads __time would mean replicating its internals (fragile, and it + // reads __time in the common cases anyway), so always include it: __time is cheap, and it resolves to a + // no-download constant column for projections that don't carry a real time column. + required.add(ColumnHolder.TIME_COLUMN_NAME); + return required; + } + // Conservative fallback when physicalColumns isn't declared, fetch every column on the chosen row selector + // plus __time (which is special-cased and not enumerated by getColumnNames()). Review Comment: Seems weird. Can you consider changing it in this PR, so `getColumnNames()` does include `__time`? Looking around at the usages of `getColumnNames()`, it doesn't seem like anything is super-dependent on them not including `__time`. -- This is an automated message from the Apache Git Service.
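To illustrate the `getColumnNames()` suggestion in the last comment: below is a minimal sketch that assumes the row selector's column list could simply include `__time`. `ColumnSource` and this two-argument `requiredColumns` are hypothetical simplifications, not Druid's `QueryableIndex` API. Because the required set is a `LinkedHashSet`, the unconditional `__time` add stays harmless whether or not `getColumnNames()` already returns it, so the special case in the fallback branch could collapse into the common path.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RequiredColumnsSketch
{
  // Same literal value as Druid's ColumnHolder.TIME_COLUMN_NAME.
  static final String TIME_COLUMN_NAME = "__time";

  // Hypothetical stand-in for the chosen row selector: only exposes its column names.
  interface ColumnSource
  {
    List<String> getColumnNames();
  }

  // Hypothetical simplification of requiredColumns(); a null physicalColumns means "not declared".
  static Set<String> requiredColumns(ColumnSource rowSelector, List<String> physicalColumns)
  {
    final Set<String> required = new LinkedHashSet<>(
        physicalColumns != null ? physicalColumns : rowSelector.getColumnNames()
    );
    // Unconditional __time pre-fetch; a no-op if the list above already contained it.
    required.add(TIME_COLUMN_NAME);
    return required;
  }

  public static void main(String[] args)
  {
    ColumnSource withoutTime = () -> Arrays.asList("dim", "metric");
    ColumnSource withTime = () -> Arrays.asList("__time", "dim", "metric");
    System.out.println(requiredColumns(withoutTime, null));              // [dim, metric, __time]
    System.out.println(requiredColumns(withTime, null));                 // [__time, dim, metric]
    System.out.println(requiredColumns(withTime, Arrays.asList("dim"))); // [dim, __time]
  }
}
```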

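On the earlier question about the `No canceler` choice: a minimal sketch of the suggested "cancel it unless some other query is also waiting for the same thing" behavior, using a waiter count around one shared download. `SharedFetch` and `newWaiter()` are hypothetical names built on plain `java.util.concurrent.CompletableFuture`; this is not Druid's `AsyncCursorHolder` or Guava's `ListenableFuture` API. One caveat: `CompletableFuture.cancel` ignores its `mayInterruptIfRunning` argument, so a real version would also need the download task itself to notice cancellation and stop reading from deep storage.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch (not Druid or Guava API): one shared download, many waiters.
// The shared download is cancelled only once every registered waiter has cancelled.
public class SharedFetch<T>
{
  private final CompletableFuture<T> download;
  private int activeWaiters = 0;

  public SharedFetch(CompletableFuture<T> download)
  {
    this.download = download;
  }

  // Registers a waiter and returns a per-waiter future that mirrors the shared download's outcome.
  public synchronized CompletableFuture<T> newWaiter()
  {
    activeWaiters++;
    final CompletableFuture<T> waiter = new CompletableFuture<>();
    download.whenComplete((value, error) -> {
      if (error != null) {
        waiter.completeExceptionally(error);
      } else {
        waiter.complete(value);
      }
    });
    // Cancelling one waiter's future only drops that waiter, not the shared download.
    waiter.whenComplete((value, error) -> {
      if (waiter.isCancelled()) {
        onWaiterCancelled();
      }
    });
    return waiter;
  }

  private synchronized void onWaiterCancelled()
  {
    activeWaiters--;
    if (activeWaiters == 0) {
      // Nobody is waiting anymore, so stop fetching bytes no query will read.
      // cancel() is a no-op if the download has already completed.
      download.cancel(false);
    }
  }
}
```

Only cancellations decrement the count, so a download that finishes or fails on its own is never cancelled afterward, and a second query that attaches while the first is still waiting keeps the fetch alive even if the first query gives up.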