This is an automated email from the ASF dual-hosted git repository.

clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ce2f36ce4d feat: add async CursorFactory API and migrate MSQ frame 
processors to use it (#19397)
8ce2f36ce4d is described below

commit 8ce2f36ce4dcba7420cc47801ec23148854dde9a
Author: Clint Wylie <[email protected]>
AuthorDate: Fri May 8 03:39:31 2026 -0700

    feat: add async CursorFactory API and migrate MSQ frame processors to use 
it (#19397)
    
    changes:
    * add `AsyncCursorHolder` to manage async loading lifecycle for a cursor 
holder until ownership of the `CursorHolder` it produces can be transferred to 
the consumer (see javadoc for details)
    * add `CursorFactory.makeCursorHolderAsync(CursorBuildSpec)` so that cursor 
factories backed by partial downloads can do I/O without blocking processing 
threads, with a default implementation returning 
`AsyncCursorHolder.completed(makeCursorHolder(spec))` so existing 
implementations remain async-correct without changes
    * add `GroupingEngine.makeCursorHolderAsync` returning `AsyncCursorHolder`, 
and extract a shared `processWithCursorHolder` helper from 
`GroupingEngine.process()` (exposed publicly as 
`GroupingEngine.processCursorHolder`), so that a caller which can yield and 
then resume can wait for the `CursorHolder` to be ready and later process it
    * migrate `ScanQueryFrameProcessor.runWithSegment` to call 
`makeCursorHolderAsync` and yield via `ReturnOrAwait` while the load is pending
    * migrate `GroupByPreShuffleFrameProcessor.runWithSegment` cursor path to 
call `GroupingEngine.makeCursorHolderAsync` and yield via `ReturnOrAwait` while 
loading
---
 .../groupby/GroupByPreShuffleFrameProcessor.java   |  81 ++++--
 .../msq/querykit/scan/ScanQueryFrameProcessor.java |  49 +++-
 .../querykit/scan/ScanQueryFrameProcessorTest.java | 183 ++++++++++++
 .../apache/druid/query/groupby/GroupingEngine.java |  75 ++++-
 .../apache/druid/segment/AsyncCursorHolder.java    | 314 +++++++++++++++++++++
 .../org/apache/druid/segment/CursorFactory.java    |  13 +
 .../druid/segment/AsyncCursorHolderTest.java       | 230 +++++++++++++++
 7 files changed, 906 insertions(+), 39 deletions(-)

diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
index e960e9cff79..54f1eda6299 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.querykit.groupby;
 
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.druid.collections.NonBlockingPool;
 import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.common.guava.FutureUtils;
@@ -58,8 +59,10 @@ import org.apache.druid.query.groupby.ResultRow;
 import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.query.spec.SpecificSegmentSpec;
+import org.apache.druid.segment.AsyncCursorHolder;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.CursorFactory;
+import org.apache.druid.segment.CursorHolder;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentMapFunction;
 import org.apache.druid.segment.TimeBoundaryInspector;
@@ -94,6 +97,18 @@ public class GroupByPreShuffleFrameProcessor extends 
BaseLeafFrameProcessor
   private SegmentsInputSlice handedOffSegments = null;
   private Yielder<Yielder<ResultRow>> currentResultsYielder;
   private ListenableFuture<DataServerQueryResult<ResultRow>> 
dataServerQueryResultFuture;
+  @Nullable
+  private CursorFactory currentCursorFactory;
+  @Nullable
+  private TimeBoundaryInspector currentTimeBoundaryInspector;
+  /**
+   * In-flight {@link GroupingEngine#makeCursorHolderAsync} handle for the 
current segment, when {@link #resultYielder}
+   * has not yet been derived. Registered on {@link #closer} so the produced 
{@link CursorHolder} is always disposed
+   * regardless of where the underlying load is in its lifecycle. Cleared 
after we transfer ownership of the holder to
+   * {@link GroupingEngine#processCursorHolder} (which moves it onto the 
resulting Sequence's baggage closer).
+   */
+  @Nullable
+  private AsyncCursorHolder asyncCursorHolder;
 
   public GroupByPreShuffleFrameProcessor(
       final GroupByQuery query,
@@ -185,28 +200,60 @@ public class GroupByPreShuffleFrameProcessor extends 
BaseLeafFrameProcessor
   protected ReturnOrAwait<Unit> runWithSegment(final SegmentReferenceHolder 
segmentHolder) throws IOException
   {
     if (resultYielder == null) {
-      final Segment segment = mapSegment(segmentHolder, closer);
-      final TimeBoundaryInspector tbi = 
segment.as(TimeBoundaryInspector.class);
-      final Sequence<ResultRow> rowSequence;
+      if (asyncCursorHolder == null && currentCursorFactory == null) {
+        // First invocation for this segment: map it, check the TimeBoundary 
fast path, otherwise kick off the async
+        // cursor-holder load and cache the cursor factory + time-boundary 
inspector for the follow-up invocation.
+        final Segment segment = mapSegment(segmentHolder, closer);
+        currentTimeBoundaryInspector = segment.as(TimeBoundaryInspector.class);
+
+        if (GroupByTimeBoundaryUtils.canUseTimeBoundaryInspector(
+            query,
+            currentTimeBoundaryInspector,
+            segmentHolder.getDescriptor()
+        )) {
+          // Resolve this query using the TimeBoundaryInspector, no need for a 
cursor.
+          resultYielder = Yielders.each(
+              Sequences.simple(
+                  
List.of(GroupByTimeBoundaryUtils.computeTimeBoundaryResult(query, 
currentTimeBoundaryInspector))
+              )
+          );
+        } else {
+          currentCursorFactory = 
Objects.requireNonNull(segment.as(CursorFactory.class));
+          // Resolve this query using a cursor.
+          final GroupByQuery segmentQuery = (GroupByQuery) query
+              .withQuerySegmentSpec(new 
SpecificSegmentSpec(segmentHolder.getDescriptor()))
+              .optimizeForSegment(new 
PerSegmentQueryOptimizationContext(segmentHolder.getDescriptor()));
+          asyncCursorHolder = closer.register(
+              groupingEngine.makeCursorHolderAsync(
+                  segmentQuery,
+                  currentCursorFactory,
+                  null
+              )
+          );
+        }
+      }
 
-      if (GroupByTimeBoundaryUtils.canUseTimeBoundaryInspector(query, tbi, 
segmentHolder.getDescriptor())) {
-        // Resolve this query using the TimeBoundaryInspector, no need for a 
cursor.
-        rowSequence = 
Sequences.simple(List.of(GroupByTimeBoundaryUtils.computeTimeBoundaryResult(query,
 tbi)));
-      } else {
-        // Resolve this query using a cursor.
-        final GroupByQuery segmentQuery = (GroupByQuery) query
-            .withQuerySegmentSpec(new 
SpecificSegmentSpec(segmentHolder.getDescriptor()))
-            .optimizeForSegment(new 
PerSegmentQueryOptimizationContext(segmentHolder.getDescriptor()));
-        rowSequence = groupingEngine.process(
-            segmentQuery,
-            Objects.requireNonNull(segment.as(CursorFactory.class)),
-            tbi,
+      if (asyncCursorHolder != null) {
+        if (!asyncCursorHolder.isReady()) {
+          final SettableFuture<?> awaitFuture = SettableFuture.create();
+          asyncCursorHolder.addReadyCallback(() -> awaitFuture.set(null));
+          return ReturnOrAwait.awaitAllFutures(List.of(awaitFuture));
+        }
+        // The holder is ready, ownership of the holder transitions onto the 
returned Sequence's baggage closer
+        final CursorHolder holder = asyncCursorHolder.release();
+        asyncCursorHolder = null;
+        // currentCursorFactory is non-null whenever asyncCursorHolder is 
non-null (both are set together in the
+        // first-invocation branch above). The requireNonNull pins the 
invariant for static analysis.
+        final Sequence<ResultRow> rowSequence = 
groupingEngine.processCursorHolder(
+            query.withQuerySegmentSpec(new 
SpecificSegmentSpec(segmentHolder.getDescriptor())),
+            Objects.requireNonNull(currentCursorFactory),
+            holder,
+            currentTimeBoundaryInspector,
             bufferPool,
             null
         );
+        resultYielder = Yielders.each(rowSequence);
       }
-
-      resultYielder = Yielders.each(rowSequence);
     }
 
     populateFrameWriterAndFlushIfNeeded();
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index 82e596b3e29..0935cf63c05 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -26,6 +26,7 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.common.guava.FutureUtils;
@@ -69,6 +70,7 @@ import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.query.scan.ScanQueryEngine;
 import org.apache.druid.query.scan.ScanResultValue;
 import org.apache.druid.query.spec.SpecificSegmentSpec;
+import org.apache.druid.segment.AsyncCursorHolder;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.CursorFactory;
@@ -118,6 +120,14 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
   private final Closer closer = Closer.create();
 
   private Cursor cursor;
+  /**
+   * In-flight {@link CursorFactory#makeCursorHolderAsync} handle for the 
current segment, when {@link #cursor} has not
+   * yet been derived. Registered on {@link #closer} as soon as it is created 
so the produced {@link CursorHolder} is
+   * always disposed regardless of where the underlying load is in its 
lifecycle. Cleared after the holder is consumed
+   * and ownership transitions to {@link #cursorCloser}.
+   */
+  @Nullable
+  private AsyncCursorHolder asyncCursorHolder;
   private ListenableFuture<DataServerQueryResult<Object[]>> 
dataServerQueryResultFuture;
   private Closeable cursorCloser;
   /**
@@ -297,21 +307,36 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
   protected ReturnOrAwait<Unit> runWithSegment(final SegmentReferenceHolder 
segmentHolder) throws IOException
   {
     if (cursor == null) {
-      final Segment segment = mapSegment(segmentHolder, closer);
-      final CursorFactory cursorFactory = segment.as(CursorFactory.class);
-      if (cursorFactory == null) {
-        throw DruidException.defensive(
-            "Null cursor factory found. Probably trying to issue a query 
against a segment being memory unmapped."
+      if (asyncCursorHolder == null) {
+        final Segment segment = mapSegment(segmentHolder, closer);
+        final CursorFactory cursorFactory = segment.as(CursorFactory.class);
+        if (cursorFactory == null) {
+          throw DruidException.defensive(
+              "Null cursor factory found. Probably trying to issue a query 
against a segment being memory unmapped."
+          );
+        }
+
+        asyncCursorHolder = closer.register(
+            cursorFactory.makeCursorHolderAsync(
+                ScanQueryEngine.makeCursorBuildSpec(
+                    query.withQuerySegmentSpec(new 
SpecificSegmentSpec(segmentHolder.getDescriptor())),
+                    null
+                )
+            )
         );
       }
 
-      final CursorHolder nextCursorHolder =
-          cursorFactory.makeCursorHolder(
-              ScanQueryEngine.makeCursorBuildSpec(
-                  query.withQuerySegmentSpec(new 
SpecificSegmentSpec(segmentHolder.getDescriptor())),
-                  null
-              )
-          );
+      if (!asyncCursorHolder.isReady()) {
+        final SettableFuture<?> awaitFuture = SettableFuture.create();
+        asyncCursorHolder.addReadyCallback(() -> awaitFuture.set(null));
+        return ReturnOrAwait.awaitAllFutures(ImmutableList.of(awaitFuture));
+      }
+
+      // Transfer ownership of the holder out of the AsyncCursorHolder; 
setNextCursor manages the holder's lifecycle
+      // from here on. The wrapper stays registered on closer (close() is now 
a no-op since release was called) so
+      // we don't need to track it further.
+      final CursorHolder nextCursorHolder = asyncCursorHolder.release();
+      asyncCursorHolder = null;
 
       final Cursor nextCursor;
 
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
index 8b5952adcaf..9922be0a6c2 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
@@ -44,29 +44,40 @@ import org.apache.druid.msq.querykit.ReadableInput;
 import org.apache.druid.msq.querykit.SegmentReferenceHolder;
 import org.apache.druid.msq.test.LimitedFrameWriterFactory;
 import org.apache.druid.query.Druids;
+import org.apache.druid.query.policy.PolicyEnforcer;
 import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.scan.ScanQueryEngine;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.AsyncCursorHolder;
+import org.apache.druid.segment.CursorBuildSpec;
 import org.apache.druid.segment.CursorFactory;
+import org.apache.druid.segment.CursorHolder;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexCursorFactory;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.ReferenceCountedSegmentProvider;
+import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentMapFunction;
 import org.apache.druid.segment.SegmentReference;
 import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
 import org.apache.druid.timeline.SegmentId;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.MatcherAssert;
+import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
 import org.junit.jupiter.api.Assertions;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 public class ScanQueryFrameProcessorTest extends FrameProcessorTestBase
 {
@@ -262,4 +273,176 @@ public class ScanQueryFrameProcessorTest extends 
FrameProcessorTestBase
             + "2011-01-02T00:00:00.000Z/2021-01-01T00:00:00.000Z]]"))
     );
   }
+
+  /**
+   * Verifies that {@link ScanQueryFrameProcessor#runWithSegment} yields via 
{@link
+   * org.apache.druid.frame.processor.ReturnOrAwait#awaitAllFutures} when 
{@link CursorFactory#makeCursorHolderAsync}
+   * returns an {@link AsyncCursorHolder} that has not yet completed, and 
resumes after it does. Exercises the partial
+   * / non-blocking I/O integration path on the MSQ side without requiring a 
real partial segment.
+   */
+  @Test
+  public void test_runWithSegments_asyncCursorHolderAwaits() throws Exception
+  {
+    final QueryableIndex queryableIndex = TestIndex.getMMappedTestIndex();
+    final CursorFactory baseCursorFactory = new 
QueryableIndexCursorFactory(queryableIndex);
+    final AsyncCursorHolder deferredAsyncHolder = new AsyncCursorHolder(null);
+
+    final CursorFactory deferredCursorFactory = new CursorFactory()
+    {
+      @Override
+      public CursorHolder makeCursorHolder(CursorBuildSpec spec)
+      {
+        return baseCursorFactory.makeCursorHolder(spec);
+      }
+
+      @Override
+      public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+      {
+        return deferredAsyncHolder;
+      }
+
+      @Override
+      public RowSignature getRowSignature()
+      {
+        return baseCursorFactory.getRowSignature();
+      }
+
+      @Nullable
+      @Override
+      public ColumnCapabilities getColumnCapabilities(String column)
+      {
+        return baseCursorFactory.getColumnCapabilities(column);
+      }
+    };
+
+    final QueryableIndexSegment baseSegment = new 
QueryableIndexSegment(queryableIndex, SegmentId.dummy("test"));
+    final Segment wrappedSegment = new Segment()
+    {
+      @Override
+      public SegmentId getId()
+      {
+        return baseSegment.getId();
+      }
+
+      @Override
+      public Interval getDataInterval()
+      {
+        return baseSegment.getDataInterval();
+      }
+
+      @Override
+      public void validateOrElseThrow(PolicyEnforcer policyEnforcer)
+      {
+        baseSegment.validateOrElseThrow(policyEnforcer);
+      }
+
+      @Override
+      public boolean isTombstone()
+      {
+        return baseSegment.isTombstone();
+      }
+
+      @Override
+      public String getDebugString()
+      {
+        return baseSegment.getDebugString();
+      }
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public <T> T as(@Nonnull Class<T> clazz)
+      {
+        if (CursorFactory.class.equals(clazz)) {
+          return (T) deferredCursorFactory;
+        }
+        return baseSegment.as(clazz);
+      }
+
+      @Override
+      public void close()
+      {
+        baseSegment.close();
+      }
+    };
+
+    final ScanQuery query =
+        Druids.newScanQueryBuilder()
+              .dataSource("test")
+              .intervals(new 
MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.ETERNITY)))
+              .columns(baseCursorFactory.getRowSignature().getColumnNames())
+              .build();
+
+    final BlockingQueueFrameChannel outputChannel = 
BlockingQueueFrameChannel.minimal();
+    final FrameWriterFactory frameWriterFactory = new 
LimitedFrameWriterFactory(
+        FrameWriters.makeFrameWriterFactory(
+            FrameType.latestRowBased(),
+            new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
+            baseCursorFactory.getRowSignature(),
+            Collections.emptyList(),
+            false
+        ),
+        1
+    );
+
+    final ReferenceCountedSegmentProvider segmentReferenceProvider = new 
ReferenceCountedSegmentProvider(wrappedSegment);
+    final ScanQueryFrameProcessor processor = new ScanQueryFrameProcessor(
+        query,
+        null,
+        new DefaultObjectMapper(),
+        ReadableInput.segment(
+            new SegmentReferenceHolder(
+                new SegmentReference(
+                    SegmentId.dummy("test").toDescriptor(),
+                    segmentReferenceProvider.acquireReference(),
+                    null
+                ),
+                null,
+                null
+            )
+        ),
+        SegmentMapFunction.IDENTITY,
+        new ResourceHolder<>()
+        {
+          @Override
+          public WritableFrameChannel get()
+          {
+            return outputChannel.writable();
+          }
+
+          @Override
+          public void close()
+          {
+            try {
+              outputChannel.writable().close();
+            }
+            catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        },
+        new ReferenceCountingResourceHolder<>(frameWriterFactory, () -> {})
+    );
+
+    final ListenableFuture<Object> retVal = exec.runFully(processor, null);
+
+    // Processor should be awaiting the deferred holder and have produced no 
rows yet.
+    Thread.sleep(200);
+    Assertions.assertFalse(retVal.isDone(), "processor should be awaiting the 
deferred AsyncCursorHolder");
+    Assertions.assertFalse(outputChannel.readable().canRead(), "no frames 
should have been written yet");
+
+    // Complete the load and verify the processor proceeds to produce all rows.
+    
deferredAsyncHolder.set(baseCursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query,
 null)));
+
+    final Sequence<List<Object>> rowsFromProcessor = 
FrameTestUtil.readRowsFromFrameChannel(
+        outputChannel.readable(),
+        FrameReader.create(baseCursorFactory.getRowSignature())
+    );
+
+    FrameTestUtil.assertRowsEqual(
+        FrameTestUtil.readRowsFromCursorFactory(baseCursorFactory, 
baseCursorFactory.getRowSignature(), false),
+        rowsFromProcessor
+    );
+
+    Assert.assertEquals(Unit.instance(), retVal.get(30, TimeUnit.SECONDS));
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
index 3f85bbf63e1..2b4ad7fefff 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
@@ -75,6 +75,7 @@ import 
org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
 import org.apache.druid.query.groupby.orderby.LimitSpec;
 import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.AsyncCursorHolder;
 import org.apache.druid.segment.ColumnInspector;
 import org.apache.druid.segment.CursorBuildSpec;
 import org.apache.druid.segment.CursorFactory;
@@ -482,8 +483,49 @@ public class GroupingEngine
       @Nullable GroupByQueryMetrics groupByQueryMetrics
   )
   {
-    final GroupByQueryConfig querySpecificConfig = 
configSupplier.get().withOverrides(query);
+    validateForProcess(query, cursorFactory);
+    final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, 
groupByQueryMetrics);
+    final CursorHolder cursorHolder = 
cursorFactory.makeCursorHolder(buildSpec);
+    return processWithCursorHolder(query, cursorFactory, cursorHolder, 
timeBoundaryInspector, bufferPool, buildSpec);
+  }
 
+  /**
+   * Obtain an {@link AsyncCursorHolder} for this query's cursor build spec. 
Pairs with {@link #processCursorHolder},
+   * allowing a non-blocking caller to call this, yield until the returned 
holder is ready, and then call
+   * {@link #processCursorHolder} on its processing thread to run the actual 
aggregation.
+   */
+  public AsyncCursorHolder makeCursorHolderAsync(
+      GroupByQuery query,
+      CursorFactory cursorFactory,
+      @Nullable GroupByQueryMetrics groupByQueryMetrics
+  )
+  {
+    validateForProcess(query, cursorFactory);
+    final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, 
groupByQueryMetrics);
+    return cursorFactory.makeCursorHolderAsync(buildSpec);
+  }
+
+  /**
+   * Run the aggregation against an already-loaded {@link CursorHolder}. The 
caller is responsible for acquiring the
+   * holder (either directly from a {@link CursorFactory#makeCursorHolder} as 
we do in {@link #process} or using
+   * {@link #makeCursorHolderAsync} + {@link AsyncCursorHolder#release})
+   */
+  public Sequence<ResultRow> processCursorHolder(
+      GroupByQuery query,
+      CursorFactory cursorFactory,
+      CursorHolder cursorHolder,
+      @Nullable TimeBoundaryInspector timeBoundaryInspector,
+      NonBlockingPool<ByteBuffer> bufferPool,
+      @Nullable GroupByQueryMetrics groupByQueryMetrics
+  )
+  {
+    validateForProcess(query, cursorFactory);
+    final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, 
groupByQueryMetrics);
+    return processWithCursorHolder(query, cursorFactory, cursorHolder, 
timeBoundaryInspector, bufferPool, buildSpec);
+  }
+
+  private static void validateForProcess(GroupByQuery query, @Nullable 
CursorFactory cursorFactory)
+  {
     if (cursorFactory == null) {
       throw new ISE(
           "Null cursor factory found. Probably trying to issue a query against 
a segment being memory unmapped."
@@ -494,21 +536,35 @@ public class GroupingEngine
     if (intervals.size() != 1) {
       throw new IAE("Should only have one interval, got[%s]", intervals);
     }
+  }
 
-    final ResourceHolder<ByteBuffer> bufferHolder = bufferPool.take();
+  private Sequence<ResultRow> processWithCursorHolder(
+      GroupByQuery query,
+      CursorFactory cursorFactory,
+      CursorHolder cursorHolder,
+      @Nullable TimeBoundaryInspector timeBoundaryInspector,
+      NonBlockingPool<ByteBuffer> bufferPool,
+      CursorBuildSpec buildSpec
+  )
+  {
+    // Register the cursor holder on the closer before any work that could 
throw, so a single catch path covers
+    // every cleanup scenario (bufferPool.take() failure, pipeline 
construction failure, etc.) and the original
+    // exception is preserved with any close errors as suppressed.
+    final Closer closer = Closer.create();
+    closer.register(cursorHolder);
 
-    Closer closer = Closer.create();
-    closer.register(bufferHolder);
+    final GroupByQueryConfig querySpecificConfig;
     try {
-      final String fudgeTimestampString = 
query.context().getString(GroupingEngine.CTX_KEY_FUDGE_TIMESTAMP);
+      querySpecificConfig = configSupplier.get().withOverrides(query);
+
+      final ResourceHolder<ByteBuffer> bufferHolder = bufferPool.take();
+      closer.register(bufferHolder);
 
+      final String fudgeTimestampString = 
query.context().getString(GroupingEngine.CTX_KEY_FUDGE_TIMESTAMP);
       final DateTime fudgeTimestamp = fudgeTimestampString == null
                                       ? null
                                       : 
DateTimes.utc(Long.parseLong(fudgeTimestampString));
 
-      final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, 
groupByQueryMetrics);
-      final CursorHolder cursorHolder = 
closer.register(cursorFactory.makeCursorHolder(buildSpec));
-
       if (cursorHolder.isPreAggregated()) {
         query = 
query.withAggregatorSpecs(Preconditions.checkNotNull(cursorHolder.getAggregatorsForPreAggregated()));
       }
@@ -546,8 +602,7 @@ public class GroupingEngine
       return result.withBaggage(closer);
     }
     catch (Throwable e) {
-      CloseableUtils.closeAndWrapExceptions(closer);
-      throw e;
+      throw CloseableUtils.closeAndWrapInCatch(e, closer);
     }
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java 
b/processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java
new file mode 100644
index 00000000000..e682f111619
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Closeable wrapper around an asynchronously-loaded {@link CursorHolder}, 
returned by
+ * {@link CursorFactory#makeCursorHolderAsync}. Designed to make lifecycle 
management safe even when the holder is
+ * still loading: callers receive a single Closeable handle and can register 
it once with their cleanup machinery,
+ * regardless of where the underlying load is in its lifecycle.
+ * <p>
+ * The hazard this exists to avoid: returning a {@code 
ListenableFuture<CursorHolder>} (or similar future-of-Closeable)
+ * makes correct cleanup error-prone, where canceling the future or letting a 
caller fail before consuming the future
+ * can orphan the produced holder, leaking the underlying resources. By 
exposing a Closeable that internally tracks the
+ * load and disposes whatever has materialized, callers don't have to write 
that bookkeeping themselves.
+ * <p>
+ * <h3>Producer protocol</h3>
+ * Producers feed results in via {@link #set(CursorHolder)} or {@link 
#setException(Throwable)}, both of which return
+ * a boolean. If they return {@code false}, this wrapper has already been 
closed and the producer is responsible for
+ * closing whatever holder it just produced.
+ * Producers may pass a {@link Runnable} canceler at construction time which 
runs on {@link #close()} when the wrapper
+ * is closed before the {@link #set} has been called, giving the producer an 
opportunity to abort its work. The canceler
+ * is best-effort: a producer may have already produced the holder by the time 
it observes cancellation, in which case
+ * its {@link #set} call will return false and it must close the holder it 
tried to set.
+ * <p>
+ * <h3>Consumer protocol</h3>
+ * Consumers wait for {@link #isReady()} via {@link #addReadyCallback}, then call {@link #release()} to take
+ * ownership of the {@link CursorHolder} (or to rethrow the producer's exception). Calling {@link #release()}
+ * before {@link #isReady()} returns {@code true}, calling it more than once, or calling it after this wrapper has
+ * been closed throws a {@link DruidException}.
+ * <p>
+ * For example (using {@link ReturnOrAwait} to show intended yield-then-resume 
usage pattern):
+ * <pre>{@code
+ * if (asyncHolder == null) {
+ *     asyncHolder = cursorFactory.makeCursorHolderAsync(spec);
+ *     closer.register(asyncHolder);  // safe at any lifecycle point, close() 
handles in-flight loads
+ * }
+ * if (!asyncHolder.isReady()) {
+ *     SettableFuture<?> awaitFuture = SettableFuture.create();
+ *     asyncHolder.addReadyCallback(() -> awaitFuture.set(null));
+ *     return ReturnOrAwait.awaitAllFutures(List.of(awaitFuture));
+ * }
+ * final CursorHolder holder = asyncHolder.release();  // ownership transfers 
to the caller
+ * // ... use holder; close it when done (or hand it to a component that owns 
its lifecycle) ...
+ * }</pre>
+ */
+public class AsyncCursorHolder implements Closeable
+{
+  private static final Logger LOG = new Logger(AsyncCursorHolder.class);
+
+  /**
+   * Completed {@link AsyncCursorHolder} backed by an already available {@link 
CursorHolder}
+   */
+  public static AsyncCursorHolder completed(CursorHolder holder)
+  {
+    final AsyncCursorHolder result = new AsyncCursorHolder(null);
+    result.set(holder);
+    return result;
+  }
+
+  @Nullable
+  private final Runnable canceler;
+
+  @GuardedBy("this")
+  @Nullable
+  private CursorHolder result = null;
+  @GuardedBy("this")
+  @Nullable
+  private Throwable error = null;
+  @GuardedBy("this")
+  private boolean closed = false;
+  @GuardedBy("this")
+  private boolean disposed = false;
+  @GuardedBy("this")
+  private final List<Runnable> readyCallbacks = new ArrayList<>();
+
+  /**
+   * @param canceler optional callback invoked from {@link #close()} when the 
wrapper is closed before the load has
+   *                 completed ({@link #set} or {@link #setException}). 
Producers that support cancellation should
+   *                 provide one; producers that don't can pass {@code null}, 
in which case {@link #close()} just stops
+   *                 observing the result.
+   */
+  public AsyncCursorHolder(@Nullable Runnable canceler)
+  {
+    this.canceler = canceler;
+  }
+
+  /**
+   * Allows producer to mark the load successful with the given holder. 
Returns {@code true} if accepted, {@code false}
+   * if this wrapper has already been closed, in which case the producer is 
responsible for closing {@link CursorHolder}
+   * itself. Throws {@link DruidException} if the load was already completed 
(from prior calls to this method or
+   * {@link #setException}).
+   * <p>
+   * Callbacks registered via {@link #addReadyCallback} fire outside the lock 
to avoid lock-ordering deadlocks and
+   * unbounded lock holds.
+   */
+  public boolean set(CursorHolder holder)
+  {
+    if (holder == null) {
+      throw DruidException.defensive("CursorHolder cannot be null");
+    }
+    return setInternal(Either.value(holder));
+  }
+
+  /**
+   * Allows producer to mark the load as failed. Returns {@code true} if 
accepted, {@code false} if this wrapper has
+   * already been closed (no holder was produced, so there's nothing for the 
producer to clean up). Throws
+   * {@link DruidException} if the load was already completed (from prior 
calls to this method or {@link #set}).
+   * <p>
+   * Callbacks registered via {@link #addReadyCallback} fire outside the lock 
to avoid lock-ordering deadlocks and
+   * unbounded lock holds.
+   */
+  public boolean setException(Throwable t)
+  {
+    return setInternal(Either.error(t));
+  }
+
+  private boolean setInternal(Either<Throwable, CursorHolder> value)
+  {
+    final List<Runnable> callbacksToFire;
+    synchronized (this) {
+      if (closed) {
+        return false;
+      }
+      if (result != null || error != null) {
+        throw DruidException.defensive("AsyncCursorHolder is already 
completed");
+      }
+      if (value.isError()) {
+        error = value.error();
+      } else {
+        result = value.valueOrThrow();
+      }
+      callbacksToFire = drainCallbacks();
+    }
+    fireCallbacks(callbacksToFire);
+    return true;
+  }
+
+  /**
+   * Whether the load has completed (successfully or with failure). Once true, 
stays true. Callers that need to wait
+   * for readiness without blocking the current thread should register a 
{@link #addReadyCallback} and yield.
+   */
+  public synchronized boolean isReady()
+  {
+    return result != null || error != null;
+  }
+
+  /**
+   * Take ownership of the underlying {@link CursorHolder}. After this 
returns, {@link #close()} on this
+   * {@code AsyncCursorHolder} is a no-op; the caller is responsible for 
closing the returned holder. Useful when
+   * passing the holder to another component (e.g. a cursor-lifecycle manager) 
that takes ownership of it.
+   * <p>
+   * Throws {@link DruidException} if the holder is not yet ready, has already 
been released, or this wrapper
+   * has been closed. Wraps and rethrows the failure if the underlying load 
failed. Does not block; callers must
+   * check {@link #isReady()} first (typically by yielding via a {@link 
#addReadyCallback}-driven wait pattern).
+   */
+  public synchronized CursorHolder release()
+  {
+    if (closed) {
+      throw DruidException.defensive("AsyncCursorHolder is already closed");
+    }
+    if (disposed) {
+      throw DruidException.defensive("AsyncCursorHolder has already been 
released");
+    }
+    if (error != null) {
+      // pass through as is
+      if (error instanceof RuntimeException runtime) {
+        throw runtime;
+      } else if (error instanceof Error e) {
+        throw e;
+      }
+      throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
+                          .ofCategory(DruidException.Category.UNCATEGORIZED)
+                          .build(error, error.getMessage());
+    }
+    if (result == null) {
+      throw DruidException.defensive("AsyncCursorHolder is not ready yet");
+    }
+    final CursorHolder resultToReturn = result;
+    // clear result so it can be eligible for gc
+    result = null;
+    disposed = true;
+    return resultToReturn;
+  }
+
+  /**
+   * Register a callback to fire when {@link #isReady()} becomes true (whether 
the load succeeded or failed). If the
+   * holder is already ready, the callback fires synchronously on the calling 
thread. Otherwise it fires on whatever
+   * thread invokes {@link #set} or {@link #setException}, outside the 
wrapper's lock so the callback may safely
+   * re-enter the wrapper. Multiple callbacks may be registered; each fires 
once.
+   */
+  public void addReadyCallback(Runnable callback)
+  {
+    final boolean fireImmediately;
+    synchronized (this) {
+      if (result != null || error != null) {
+        fireImmediately = true;
+      } else {
+        readyCallbacks.add(callback);
+        fireImmediately = false;
+      }
+    }
+    if (fireImmediately) {
+      callback.run();
+    }
+  }
+
+  /**
+   * Close the wrapper. Safe at any lifecycle point and idempotent:
+   * <ul>
+   *   <li>Already-loaded: closes the underlying {@link CursorHolder} 
immediately.</li>
+   *   <li>Loading in progress: invokes the canceler (if one was supplied at 
construction). The producer may still
+   *       call {@link #set} / {@link #setException} after this; if the 
producer wins the race and calls {@code set}
+   *       with a holder, {@code set} returns false and the producer is 
responsible for closing it.</li>
+   *   <li>Load failed: no-op (no holder was produced).</li>
+   *   <li>Already released: no-op.</li>
+   *   <li>Already closed: throws {@link DruidException}.</li>
+   * </ul>
+   */
+  @Override
+  public void close()
+  {
+    final CursorHolder holderToClose;
+    final Runnable cancelerToRun;
+    synchronized (this) {
+      if (closed) {
+        throw DruidException.defensive("Already closed");
+      }
+      closed = true;
+      if (disposed) {
+        // Ownership was already transferred via release(); the caller manages 
the holder lifecycle.
+        return;
+      }
+      if (result != null) {
+        // Result is here and no one has released it; we close it.
+        disposed = true;
+        holderToClose = result;
+        cancelerToRun = null;
+      } else if (error != null) {
+        // Load already failed; nothing to dispose.
+        holderToClose = null;
+        cancelerToRun = null;
+      } else {
+        // Load not yet completed; signal cancellation to the producer (if 
any).
+        holderToClose = null;
+        cancelerToRun = canceler;
+      }
+    }
+    if (holderToClose != null) {
+      try {
+        holderToClose.close();
+      }
+      catch (Throwable ignored) {
+        // Best-effort cleanup
+      }
+    }
+    if (cancelerToRun != null) {
+      try {
+        cancelerToRun.run();
+      }
+      catch (Throwable t) {
+        // Best-effort cancel
+        LOG.warn(t, "AsyncCursorHolder canceler exception");
+      }
+    }
+  }
+
+  @GuardedBy("this")
+  private List<Runnable> drainCallbacks()
+  {
+    final List<Runnable> snapshot = List.copyOf(readyCallbacks);
+    readyCallbacks.clear();
+    return snapshot;
+  }
+
+  private static void fireCallbacks(List<Runnable> callbacks)
+  {
+    for (Runnable cb : callbacks) {
+      try {
+        cb.run();
+      }
+      catch (Throwable t) {
+        // Best-effort; one bad callback shouldn't break others.
+        LOG.warn(t, "AsyncCursorHolder callback exception");
+      }
+    }
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/CursorFactory.java 
b/processing/src/main/java/org/apache/druid/segment/CursorFactory.java
index 2effd1b5fa2..4a79490adc3 100644
--- a/processing/src/main/java/org/apache/druid/segment/CursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/CursorFactory.java
@@ -32,6 +32,19 @@ public interface CursorFactory extends ColumnInspector
    */
   CursorHolder makeCursorHolder(CursorBuildSpec spec);
 
+  /**
+   * Asynchronous variant of {@link #makeCursorHolder(CursorBuildSpec)} for 
cursor factories that may need to do I/O
+   * (e.g. download column data from deep storage) before they can serve a 
cursor. Callers running on threads that
+   * must not block should use this.
+   * <p>
+   * The default implementation completes synchronously by delegating to 
{@link #makeCursorHolder(CursorBuildSpec)},
+   * which keeps every existing implementation async-correct without changes.
+   */
+  default AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+  {
+    return AsyncCursorHolder.completed(makeCursorHolder(spec));
+  }
+
   /**
    * Returns the {@link RowSignature} of the data available from this cursor 
factory. For mutable segments, even though
    * the signature may evolve over time, any particular object returned by 
this method is an immutable snapshot.
diff --git 
a/processing/src/test/java/org/apache/druid/segment/AsyncCursorHolderTest.java 
b/processing/src/test/java/org/apache/druid/segment/AsyncCursorHolderTest.java
new file mode 100644
index 00000000000..409f6b6a16b
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/AsyncCursorHolderTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.error.DruidException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+class AsyncCursorHolderTest
+{
+  @Test
+  void testCloseAfterReleaseDoesNotDoubleCloseHolder()
+  {
+    final CountingCursorHolder holder = new CountingCursorHolder();
+    final AsyncCursorHolder asyncHolder = AsyncCursorHolder.completed(holder);
+
+    final CursorHolder released = asyncHolder.release();
+    Assertions.assertSame(holder, released);
+    Assertions.assertEquals(0, holder.closeCount(), "release should not close 
the holder");
+
+    asyncHolder.close();
+    Assertions.assertEquals(0, holder.closeCount(), "close after release must 
not double-close the holder");
+  }
+
+  @Test
+  void testCloseWhenAlreadyReadyClosesHolder()
+  {
+    final CountingCursorHolder holder = new CountingCursorHolder();
+    final AsyncCursorHolder asyncHolder = AsyncCursorHolder.completed(holder);
+
+    asyncHolder.close();
+    Assertions.assertEquals(1, holder.closeCount(), "close should close the 
holder when ready and not released");
+  }
+
+  @Test
+  void testCloseMultiple()
+  {
+    final CountingCursorHolder holder = new CountingCursorHolder();
+    final AsyncCursorHolder asyncHolder = AsyncCursorHolder.completed(holder);
+
+    asyncHolder.close();
+    Throwable t = Assertions.assertThrows(DruidException.class, 
asyncHolder::close);
+    Assertions.assertEquals(1, holder.closeCount());
+    Assertions.assertEquals("Already closed", t.getMessage());
+  }
+
+  @Test
+  void testCloseBeforeSetInvokesCancelerAndProducerClosesOrphan()
+  {
+    final AtomicInteger cancelerCalls = new AtomicInteger();
+    final AsyncCursorHolder asyncHolder = new 
AsyncCursorHolder(cancelerCalls::incrementAndGet);
+
+    Assertions.assertFalse(asyncHolder.isReady());
+    asyncHolder.close();
+    Assertions.assertEquals(1, cancelerCalls.get(), "close before completion 
should invoke the canceler");
+
+    // Producer races and produces a holder anyway: set should return false; 
producer must close the orphan.
+    final CountingCursorHolder lateHolder = new CountingCursorHolder();
+    final boolean accepted = asyncHolder.set(lateHolder);
+    Assertions.assertFalse(accepted, "set should be rejected after close");
+    Assertions.assertEquals(0, lateHolder.closeCount(), "wrapper does NOT 
close orphan; producer is responsible");
+  }
+
+  @Test
+  void testLateProducerSetAfterCloseClosesOrphan()
+  {
+    // Simulates a producer (e.g. a future-based loader) that delivers a 
holder after the wrapper has been closed:
+    // set returns false, the producer notices, and closes the orphan itself.
+    final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+    asyncHolder.close();
+
+    final CountingCursorHolder lateHolder = new CountingCursorHolder();
+    final boolean accepted = asyncHolder.set(lateHolder);
+    Assertions.assertFalse(accepted, "set should be rejected after close");
+    // (Producer-side responsibility — wrapper does not close on the 
producer's behalf.)
+    Assertions.assertEquals(0, lateHolder.closeCount(), "wrapper does NOT 
close orphan; producer is responsible");
+  }
+
+  @Test
+  void testNoCancelerIsCalledIfLoadAlreadyCompleted()
+  {
+    final AtomicInteger cancelerCalls = new AtomicInteger();
+    final AsyncCursorHolder asyncHolder = new 
AsyncCursorHolder(cancelerCalls::incrementAndGet);
+    asyncHolder.set(new CountingCursorHolder());
+
+    asyncHolder.close();
+    Assertions.assertEquals(0, cancelerCalls.get(), "canceler must not run if 
the load already completed");
+  }
+
+  @Test
+  void testReleaseBeforeReadyThrows()
+  {
+    final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+    Assertions.assertThrows(DruidException.class, asyncHolder::release);
+  }
+
+  @Test
+  void testReleaseAfterReleaseThrows()
+  {
+    final AsyncCursorHolder asyncHolder = AsyncCursorHolder.completed(new 
CountingCursorHolder());
+    asyncHolder.release();
+    Assertions.assertThrows(DruidException.class, asyncHolder::release);
+  }
+
+  @Test
+  void testReleaseAfterCloseThrows()
+  {
+    final AsyncCursorHolder asyncHolder = AsyncCursorHolder.completed(new 
CountingCursorHolder());
+    asyncHolder.close();
+    Assertions.assertThrows(DruidException.class, asyncHolder::release);
+  }
+
+  @Test
+  void testReleaseAfterFailedLoadThrowsWrappedFailure()
+  {
+    final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+    final IllegalArgumentException failure = new 
IllegalArgumentException("boom");
+    asyncHolder.setException(failure);
+
+    final IllegalArgumentException thrown = 
Assertions.assertThrows(IllegalArgumentException.class, asyncHolder::release);
+    Assertions.assertSame(failure, thrown, "release should propagate the 
underlying failure");
+  }
+
+  @Test
+  void testSetAfterSetThrows()
+  {
+    final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+    asyncHolder.set(new CountingCursorHolder());
+    Assertions.assertThrows(DruidException.class, () -> asyncHolder.set(new 
CountingCursorHolder()));
+  }
+
+  @Test
+  void testSetAfterSetExceptionThrows()
+  {
+    final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+    asyncHolder.setException(new RuntimeException("boom"));
+    Assertions.assertThrows(DruidException.class, () -> asyncHolder.set(new 
CountingCursorHolder()));
+  }
+
+  @Test
+  void testAddReadyCallbackFiresImmediatelyWhenAlreadyReady()
+  {
+    final AsyncCursorHolder asyncHolder = AsyncCursorHolder.completed(new 
CountingCursorHolder());
+    final AtomicInteger fired = new AtomicInteger();
+    asyncHolder.addReadyCallback(fired::incrementAndGet);
+    Assertions.assertEquals(1, fired.get(), "callback should fire 
synchronously when already ready");
+  }
+
+  @Test
+  void testAddReadyCallbackFiresOnSet()
+  {
+    final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+    final AtomicInteger fired = new AtomicInteger();
+    asyncHolder.addReadyCallback(fired::incrementAndGet);
+    Assertions.assertEquals(0, fired.get(), "callback should not fire before 
completion");
+
+    asyncHolder.set(new CountingCursorHolder());
+    Assertions.assertEquals(1, fired.get(), "callback should fire when set is 
called");
+  }
+
+  @Test
+  void testAddReadyCallbackFiresOnSetException()
+  {
+    final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+    final AtomicInteger fired = new AtomicInteger();
+    asyncHolder.addReadyCallback(fired::incrementAndGet);
+    Assertions.assertEquals(0, fired.get(), "callback should not fire before 
completion");
+
+    asyncHolder.setException(new RuntimeException("boom"));
+    Assertions.assertEquals(1, fired.get(), "callback should fire when 
setException is called");
+  }
+
+  @Test
+  void testMultipleCallbacksAllFire()
+  {
+    final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(null);
+    final AtomicInteger fired = new AtomicInteger();
+    asyncHolder.addReadyCallback(fired::incrementAndGet);
+    asyncHolder.addReadyCallback(fired::incrementAndGet);
+    asyncHolder.addReadyCallback(fired::incrementAndGet);
+
+    asyncHolder.set(new CountingCursorHolder());
+    Assertions.assertEquals(3, fired.get(), "all registered callbacks should 
fire");
+  }
+
+  /**
+   * Minimal {@link CursorHolder} that just counts close invocations. Other 
methods are unimplemented because the
+   * tests don't exercise them.
+   */
+  private static class CountingCursorHolder implements CursorHolder
+  {
+    private int closeCount;
+
+    int closeCount()
+    {
+      return closeCount;
+    }
+
+    @Override
+    public Cursor asCursor()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close()
+    {
+      closeCount++;
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to