This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ee9e42f03f fix issue with ScanQueryFrameProcessor cursor build not adjusting intervals (#17168)
6ee9e42f03f is described below

commit 6ee9e42f03f6c2b4d4b34e5ea528fe9351c37e33
Author: Clint Wylie <[email protected]>
AuthorDate: Thu Sep 26 08:39:14 2024 -0700

    fix issue with ScanQueryFrameProcessor cursor build not adjusting intervals (#17168)
    
    * fix issue with ScanQueryFrameProcessor cursor build not adjusting intervals
    
    * all hail the robot overlords
    
    * style bot
---
 .../msq/querykit/scan/ScanQueryFrameProcessor.java |  17 +++-
 .../scan/ScanQueryFrameProcessorFactory.java       |   3 +-
 .../querykit/scan/ScanQueryFrameProcessorTest.java | 104 ++++++++++++++++++++-
 3 files changed, 119 insertions(+), 5 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index dbcd271cccf..06dce22a189 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -40,6 +40,7 @@ import org.apache.druid.frame.write.FrameWriterFactory;
 import org.apache.druid.frame.write.InvalidFieldException;
 import org.apache.druid.frame.write.InvalidNullByteException;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.Unit;
 import org.apache.druid.java.util.common.guava.Sequence;
@@ -63,6 +64,8 @@ import org.apache.druid.query.Order;
 import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.query.scan.ScanQueryEngine;
 import org.apache.druid.query.scan.ScanResultValue;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.query.spec.SpecificSegmentSpec;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.CompleteSegment;
 import org.apache.druid.segment.Cursor;
@@ -259,7 +262,12 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
       }
 
       final CursorHolder nextCursorHolder =
-          
cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, 
null));
+          cursorFactory.makeCursorHolder(
+              ScanQueryEngine.makeCursorBuildSpec(
+                  query.withQuerySegmentSpec(new 
SpecificSegmentSpec(segment.getDescriptor())),
+                  null
+              )
+          );
       final Cursor nextCursor = nextCursorHolder.asCursor();
 
       if (nextCursor == null) {
@@ -305,7 +313,12 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
         }
 
         final CursorHolder nextCursorHolder =
-            
cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, 
null));
+            cursorFactory.makeCursorHolder(
+                ScanQueryEngine.makeCursorBuildSpec(
+                    query.withQuerySegmentSpec(new 
MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)),
+                    null
+                )
+            );
         final Cursor nextCursor = nextCursorHolder.asCursor();
 
         if (nextCursor == null) {
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
index 97ade19f5bc..1636e117740 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
@@ -56,8 +56,7 @@ public class ScanQueryFrameProcessorFactory extends 
BaseLeafFrameProcessorFactor
   {
     super(query);
     this.query = Preconditions.checkNotNull(query, "query");
-    this.runningCountForLimit =
-        query.isLimited() && query.getOrderBys().isEmpty() ? new AtomicLong() 
: null;
+    this.runningCountForLimit = query.isLimited() && 
query.getOrderBys().isEmpty() ? new AtomicLong() : null;
   }
 
   @JsonProperty
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
index bfb511f949f..af0a7203570 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
@@ -19,9 +19,11 @@
 
 package org.apache.druid.msq.querykit.scan;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.collections.ReferenceCountingResourceHolder;
 import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.collections.StupidResourceHolder;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.FrameType;
 import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
@@ -39,6 +41,8 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Unit;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.msq.input.ReadableInput;
+import org.apache.druid.msq.input.table.RichSegmentDescriptor;
+import org.apache.druid.msq.input.table.SegmentWithDescriptor;
 import org.apache.druid.msq.kernel.StageId;
 import org.apache.druid.msq.kernel.StagePartition;
 import org.apache.druid.msq.querykit.FrameProcessorTestBase;
@@ -46,10 +50,15 @@ import org.apache.druid.msq.test.LimitedFrameWriterFactory;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.CompleteSegment;
 import org.apache.druid.segment.CursorFactory;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexCursorFactory;
+import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.TestIndex;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
+import org.apache.druid.timeline.SegmentId;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -60,6 +69,91 @@ import java.util.function.Function;
 
 public class ScanQueryFrameProcessorTest extends FrameProcessorTestBase
 {
+
+  @Test
+  public void test_runWithSegments() throws Exception
+  {
+    final QueryableIndex queryableIndex = TestIndex.getMMappedTestIndex();
+
+    final CursorFactory cursorFactory =
+        new QueryableIndexCursorFactory(queryableIndex);
+
+    // put funny intervals on query to ensure it is adjusted to the segment 
interval before building cursor
+    final ScanQuery query =
+        Druids.newScanQueryBuilder()
+              .dataSource("test")
+              .intervals(
+                  new MultipleIntervalSegmentSpec(
+                      ImmutableList.of(
+                          Intervals.of("2001-01-01T00Z/2011-01-01T00Z"),
+                          Intervals.of("2011-01-02T00Z/2021-01-01T00Z")
+                      )
+                  )
+              )
+              .columns(cursorFactory.getRowSignature().getColumnNames())
+              .build();
+
+    final BlockingQueueFrameChannel outputChannel = 
BlockingQueueFrameChannel.minimal();
+
+    // Limit output frames to 1 row to ensure we test edge cases
+    final FrameWriterFactory frameWriterFactory = new 
LimitedFrameWriterFactory(
+        FrameWriters.makeRowBasedFrameWriterFactory(
+            new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
+            cursorFactory.getRowSignature(),
+            Collections.emptyList(),
+            false
+        ),
+        1
+    );
+
+    final ScanQueryFrameProcessor processor = new ScanQueryFrameProcessor(
+        query,
+        null,
+        new DefaultObjectMapper(),
+        ReadableInput.segment(
+            new SegmentWithDescriptor(
+                () -> new StupidResourceHolder<>(new CompleteSegment(null, new 
QueryableIndexSegment(queryableIndex, SegmentId.dummy("test")))),
+                new RichSegmentDescriptor(queryableIndex.getDataInterval(), 
queryableIndex.getDataInterval(), "dummy_version", 0)
+            )
+        ),
+        Function.identity(),
+        new ResourceHolder<WritableFrameChannel>()
+        {
+          @Override
+          public WritableFrameChannel get()
+          {
+            return outputChannel.writable();
+          }
+
+          @Override
+          public void close()
+          {
+            try {
+              outputChannel.writable().close();
+            }
+            catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        },
+        new ReferenceCountingResourceHolder<>(frameWriterFactory, () -> {})
+    );
+
+    ListenableFuture<Object> retVal = exec.runFully(processor, null);
+
+    final Sequence<List<Object>> rowsFromProcessor = 
FrameTestUtil.readRowsFromFrameChannel(
+        outputChannel.readable(),
+        FrameReader.create(cursorFactory.getRowSignature())
+    );
+
+    FrameTestUtil.assertRowsEqual(
+        FrameTestUtil.readRowsFromCursorFactory(cursorFactory, 
cursorFactory.getRowSignature(), false),
+        rowsFromProcessor
+    );
+
+    Assert.assertEquals(Unit.instance(), retVal.get());
+  }
+
   @Test
   public void test_runWithInputChannel() throws Exception
   {
@@ -83,10 +177,18 @@ public class ScanQueryFrameProcessorTest extends 
FrameProcessorTestBase
       }
     }
 
+    // put funny intervals on query to ensure it is adjusted to the segment 
interval before building cursor
     final ScanQuery query =
         Druids.newScanQueryBuilder()
               .dataSource("test")
-              .intervals(new 
MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
+              .intervals(
+                  new MultipleIntervalSegmentSpec(
+                      ImmutableList.of(
+                          Intervals.of("2001-01-01T00Z/2011-01-01T00Z"),
+                          Intervals.of("2011-01-02T00Z/2021-01-01T00Z")
+                      )
+                  )
+              )
               .columns(cursorFactory.getRowSignature().getColumnNames())
               .build();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to