FrankChen021 commented on code in PR #19535: URL: https://github.com/apache/druid/pull/19535#discussion_r3348717808
########## processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java: ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.Order; +import org.apache.druid.query.OrderBy; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.projections.Projections; +import org.apache.druid.segment.projections.QueryableProjection; +import org.apache.druid.segment.vector.VectorCursor; +import org.apache.druid.utils.CloseableUtils; + +import javax.annotation.Nullable; +import java.io.Closeable; +import 
java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Partial-aware {@link CursorFactory} for {@link PartialQueryableIndex}. + * <p> + * <b>Sync vs async contract.</b> {@link #makeCursorHolder} requires the segment to already be fully downloaded, + * intended for callers that acquired the segment via the eager (download-everything-up-front) path, so by the time + * they ask for a cursor every internal file is on disk. If anything is missing it throws + * {@link DruidException#defensive} so that we never trigger downloads on the sync path, since processing threads must + * not block on deep-storage I/O. {@link #makeCursorHolderAsync} is the only path that performs downloads on demand; + * callers acknowledge that by opting into the async variant when they acquire a partial segment. + * <p> + * <b>Async download granularity.</b> Pre-fetch is column-level. {@link #makeCursorHolderAsync} calls + * {@link QueryableIndex#getColumnHolder} on each required column; the memoized supplier on the underlying + * {@link PartialQueryableIndex} eagerly invokes + * {@link org.apache.druid.segment.file.PartialSegmentFileMapperV10#mapFile} inside that call, which is what triggers + * the deep-storage range read. The cursor holder constructed afterward sees the already-materialized holders via the + * same memoized suppliers, so no further downloads happen at cursor-read time. + * <p> + * If a projection matches, the required columns are looked up against the projection's row selector and its rewritten + * {@link CursorBuildSpec} (which carries physical columns in the projection's namespace). When + * {@link CursorBuildSpec#getPhysicalColumns()} is {@code null}, every column on the chosen row selector is pre-fetched + * as required by the contract of {@link CursorBuildSpec}. 
+ * <p> + * <b>Parallelism.</b> Each column's materialization is submitted as a separate task to the supplied download executor. + * The cursor holder is constructed once every column task has completed. + */ +public class PartialQueryableIndexCursorFactory implements CursorFactory +{ + private final PartialQueryableIndex index; + private final QueryableIndexCursorFactory delegate; + private final PartialBundleAcquirer bundleAcquirer; + + public PartialQueryableIndexCursorFactory( + PartialQueryableIndex index, + TimeBoundaryInspector timeBoundaryInspector, + PartialBundleAcquirer bundleAcquirer + ) + { + this.index = index; + this.delegate = new QueryableIndexCursorFactory(index, timeBoundaryInspector); + this.bundleAcquirer = bundleAcquirer; + } + + @Override + public CursorHolder makeCursorHolder(CursorBuildSpec spec) + { + // refuse to download here so we never accidentally block a processing thread + if (!index.isFullyDownloaded()) { + throw DruidException.defensive( + "Sync makeCursorHolder requires the segment to be fully downloaded; use makeCursorHolderAsync for " + + "on-demand loading, or acquire the segment via the eager path so all files are loaded up front." + ); + } + return delegate.makeCursorHolder(spec); + } + + @Override + public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec) + { + final QueryableProjection<QueryableIndex> matched = index.getProjection(spec); + final QueryableIndex rowSelector = matched != null ? matched.getRowSelector() : index; + final Set<String> requiredColumns = requiredColumns(rowSelector, matched, spec); + final String bundleName = matched != null ? matched.getName() : Projections.BASE_TABLE_PROJECTION_NAME; + + // Mount the cache-layer bundle BEFORE submitting downloads. This sparse-allocates the bundle's container files + // and reserves the disk-usage accounting at bundle granularity. 
The returned Closeable releases the hold when + // the cursor closes, allowing the cache to later evict the bundle if needed. + // PartialBundleAcquirer.acquire() throws DruidException on failure. + final Closeable bundleHold = bundleAcquirer.acquire(bundleName); + // Release the bundle hold at most once: the canceler, the success-path holder close, and the failure callback can + // all race to release it. + final AtomicBoolean holdReleased = new AtomicBoolean(false); + final Closeable releaseHoldOnce = () -> { + if (holdReleased.compareAndSet(false, true)) { + bundleHold.close(); + } + }; + + try { + // Submit one materialization task per column so a multi-threaded download executor can fan them out + final ListeningExecutorService downloadExec = bundleAcquirer.getDownloadExec(); + final List<ListenableFuture<?>> columnDownloads = new ArrayList<>(requiredColumns.size()); + for (String column : requiredColumns) { + columnDownloads.add(downloadExec.submit(() -> { + rowSelector.getColumnHolder(column); + return null; + })); + } + + // Canceler runs if the awaiter closes this holder before it's ready (e.g. query cancel/timeout). cancel(true) + // stops column downloads that haven't begun their deep-storage read yet: queued tasks are skipped, and tasks + // parked on the download executor's permit are interrupted out of the (interruptible) wait before doing any I/O. + // A download already in its read/write loop runs to completion. After canceling, release the bundle hold. Once + // the holder is produced and handed to set(), ownership transfers to the awaiter, which drains it via + // close() (cancel) or release() (success); the once-guard keeps the hold release safe across all of these paths. 
+ final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(() -> { + for (ListenableFuture<?> columnDownload : columnDownloads) { + columnDownload.cancel(true); + } + CloseableUtils.closeAndSuppressExceptions(releaseHoldOnce, ignored -> {}); Review Comment: [P1] Keep bundle holds until running downloads finish. When an async cursor is closed after a column download has already entered rowSelector.getColumnHolder/mapFile, cancel(true) does not wait for that callable to stop; the code even documents that in-flight read/write loops may run to completion. The canceler still releases the bundle hold immediately, which drops the bundle reference and storage hold while mapper work may still be using the container. That allows SIEVE/ephemeral eviction or a pending unmount to reach PartialSegmentFileMapperV10.evictContainer despite mapFile/ensureFilesAvailable still being in flight, violating that mapper's concurrency contract and risking deletion/unmap of a container being read or written. Defer releasing the bundle hold/reference until submitted downloads have actually returned, or add per-download protection so eviction cannot run while a started download continues. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
