This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f9e81b8c07 MSQ: Push down limits in ScanQueryFrameProcessor. (#18441)
7f9e81b8c07 is described below

commit 7f9e81b8c0777c238eb51db039cc442ef74c5631
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Sep 5 15:31:02 2025 -0700

    MSQ: Push down limits in ScanQueryFrameProcessor. (#18441)
    
    When the scan query sorts by something that matches the underlying
    cursor, stop reading early.
---
 .../msq/querykit/scan/ScanQueryFrameProcessor.java | 106 +++++++++++++++++----
 1 file changed, 87 insertions(+), 19 deletions(-)

diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index f000b13208d..a17c38cbf6b 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -64,6 +64,7 @@ import org.apache.druid.msq.querykit.QueryKitUtils;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.IterableRowsCursorHelper;
 import org.apache.druid.query.Order;
+import org.apache.druid.query.OrderBy;
 import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.query.scan.ScanQueryEngine;
 import org.apache.druid.query.scan.ScanResultValue;
@@ -101,8 +102,14 @@ import java.util.stream.Collectors;
 public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
 {
   private static final Logger log = new Logger(ScanQueryFrameProcessor.class);
+  private static final int NO_LIMIT = -1;
+
   private final ScanQuery query;
-  private final AtomicLong runningCountForLimit;
+  /**
+   * Running count of rows emitted, shared across all processors generated by 
the same {@link ScanQueryStageProcessor}.
+   * Only set when {@link ScanQuery#getOrderBys()} is empty.
+   */
+  private final AtomicLong sharedRunningCountForLimit;
   private final ObjectMapper jsonMapper;
   private final SettableLongVirtualColumn partitionBoostVirtualColumn;
   private final VirtualColumns frameWriterVirtualColumns;
@@ -117,9 +124,19 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
   private long currentAllocatorCapacity; // Used for generating 
FrameRowTooLargeException if needed
   private SegmentsInputSlice handedOffSegments = null;
 
+  /**
+   * Number of rows read so far from the current cursor.
+   */
+  private long cursorRowsRead;
+
+  /**
+   * Limit to be pushed down into the current cursor, or a negative number if 
no limit is pushed down.
+   */
+  private long cursorPushDownLimit = NO_LIMIT;
+
   public ScanQueryFrameProcessor(
       final ScanQuery query,
-      @Nullable final AtomicLong runningCountForLimit,
+      @Nullable final AtomicLong sharedRunningCountForLimit,
       final ObjectMapper jsonMapper,
       final ReadableInput baseInput,
       final SegmentMapFunction segmentMapFn,
@@ -134,7 +151,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
         frameWriterFactoryHolder
     );
     this.query = query;
-    this.runningCountForLimit = runningCountForLimit;
+    this.sharedRunningCountForLimit = sharedRunningCountForLimit;
     this.jsonMapper = jsonMapper;
     this.partitionBoostVirtualColumn = new 
SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
 
@@ -154,8 +171,12 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
   @Override
   public ReturnOrAwait<Object> runIncrementally(final IntSet readableInputs) 
throws IOException
   {
-    if (runningCountForLimit != null
-        && runningCountForLimit.get() > query.getScanRowsOffset() + 
query.getScanRowsLimit()) {
+    if (sharedRunningCountForLimit != null
+        && sharedRunningCountForLimit.get() >= getQueryOffsetPlusLimit()) {
+      return ReturnOrAwait.returnObject(Unit.instance());
+    }
+
+    if ((cursorPushDownLimit >= 0 && cursorRowsRead >= cursorPushDownLimit)) {
       return ReturnOrAwait.returnObject(Unit.instance());
     }
 
@@ -244,7 +265,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
         cursorYielder.close();
         return ReturnOrAwait.returnObject(handedOffSegments);
       } else {
-        final long rowsFlushed = setNextCursor(cursorYielder.get(), null, 
null);
+        final long rowsFlushed = setNextCursor(cursorYielder.get(), 
Collections.emptyList(), null, null);
         closer.register(cursorYielder);
         if (rowsFlushed > 0) {
           return ReturnOrAwait.runAgain();
@@ -307,7 +328,12 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
         nextCursorHolder.close();
         return ReturnOrAwait.returnObject(Unit.instance());
       } else {
-        final long rowsFlushed = setNextCursor(nextCursor, nextCursorHolder, 
segmentHolder.get().getSegment());
+        final long rowsFlushed = setNextCursor(
+            nextCursor,
+            nextCursorHolder.getOrdering(),
+            nextCursorHolder,
+            segmentHolder.get().getSegment()
+        );
         assert rowsFlushed == 0; // There's only ever one cursor when running 
with a segment
       }
     }
@@ -359,7 +385,12 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
           nextCursorHolder.close();
           return ReturnOrAwait.returnObject(Unit.instance());
         }
-        final long rowsFlushed = setNextCursor(nextCursor, nextCursorHolder, 
frameSegment);
+        final long rowsFlushed = setNextCursor(
+            nextCursor,
+            nextCursorHolder.getOrdering(),
+            nextCursorHolder,
+            frameSegment
+        );
 
         if (rowsFlushed > 0) {
           return ReturnOrAwait.runAgain();
@@ -414,23 +445,34 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
     createFrameWriterIfNeeded();
 
     while (!cursor.isDone()) {
-      if (!frameWriter.addSelection()) {
-        if (frameWriter.getNumRows() > 0) {
-          final long numRowsWritten = flushFrameWriter();
+      boolean flush;
 
-          if (runningCountForLimit != null) {
-            runningCountForLimit.addAndGet(numRowsWritten);
-          }
+      if (frameWriter.addSelection()) {
+        cursorRowsRead++;
+        cursor.advance();
+        cursorOffset.increment();
+        
partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() + 
1);
 
-          return;
-        } else {
+        // Flush if we reached cursorPushDownLimit.
+        flush = cursorPushDownLimit >= 0 && cursorRowsRead >= 
cursorPushDownLimit;
+      } else {
+        // addSelection failed because the frame is full.
+        if (frameWriter.getNumRows() == 0) {
           throw new FrameRowTooLargeException(currentAllocatorCapacity);
         }
+
+        flush = true;
       }
 
-      cursor.advance();
-      cursorOffset.increment();
-      
partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() + 
1);
+      if (flush) {
+        final long numRowsWritten = flushFrameWriter();
+
+        if (sharedRunningCountForLimit != null) {
+          sharedRunningCountForLimit.addAndGet(numRowsWritten);
+        }
+
+        break;
+      }
     }
   }
 
@@ -465,6 +507,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
 
   private long setNextCursor(
       final Cursor cursor,
+      final List<OrderBy> ordering,
       @Nullable final Closeable cursorCloser,
       final Segment segment
   ) throws IOException
@@ -479,6 +522,16 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
     this.cursorCloser = cursorCloser;
     this.segment = segment;
     this.cursorOffset.reset();
+    this.cursorRowsRead = 0;
+
+    if (query.isLimited()
+        && ordering.size() >= query.getOrderBys().size()
+        && query.getOrderBys().equals(ordering.subList(0, 
query.getOrderBys().size()))) {
+      cursorPushDownLimit = getQueryOffsetPlusLimit();
+    } else {
+      cursorPushDownLimit = NO_LIMIT;
+    }
+
     return rowsFlushed;
   }
 
@@ -497,4 +550,19 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
     }
     return baseColumnSelectorFactory;
   }
+
+  /**
+   * Returns the {@link ScanQuery#getScanRowsOffset()} plus {@link 
ScanQuery#getScanRowsLimit()}, or
+   * {@link Long#MAX_VALUE} if that addition would overflow.
+   */
+  private long getQueryOffsetPlusLimit()
+  {
+    final long sum = query.getScanRowsOffset() + query.getScanRowsLimit();
+    if (sum < 0) {
+      // Overflow
+      return Long.MAX_VALUE;
+    } else {
+      return sum;
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to