This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6ee9e42f03f fix issue with ScanQueryFrameProcessor cursor build not
adjusting intervals (#17168)
6ee9e42f03f is described below
commit 6ee9e42f03f6c2b4d4b34e5ea528fe9351c37e33
Author: Clint Wylie <[email protected]>
AuthorDate: Thu Sep 26 08:39:14 2024 -0700
fix issue with ScanQueryFrameProcessor cursor build not adjusting intervals
(#17168)
* fix issue with ScanQueryFrameProcessor cursor build not adjusting
intervals
* all hail the robot overlords
* style bot
---
.../msq/querykit/scan/ScanQueryFrameProcessor.java | 17 +++-
.../scan/ScanQueryFrameProcessorFactory.java | 3 +-
.../querykit/scan/ScanQueryFrameProcessorTest.java | 104 ++++++++++++++++++++-
3 files changed, 119 insertions(+), 5 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index dbcd271cccf..06dce22a189 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -40,6 +40,7 @@ import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.frame.write.InvalidFieldException;
import org.apache.druid.frame.write.InvalidNullByteException;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.guava.Sequence;
@@ -63,6 +64,8 @@ import org.apache.druid.query.Order;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanResultValue;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.Cursor;
@@ -259,7 +262,12 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
}
final CursorHolder nextCursorHolder =
-
cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query,
null));
+ cursorFactory.makeCursorHolder(
+ ScanQueryEngine.makeCursorBuildSpec(
+ query.withQuerySegmentSpec(new
SpecificSegmentSpec(segment.getDescriptor())),
+ null
+ )
+ );
final Cursor nextCursor = nextCursorHolder.asCursor();
if (nextCursor == null) {
@@ -305,7 +313,12 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
}
final CursorHolder nextCursorHolder =
-
cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query,
null));
+ cursorFactory.makeCursorHolder(
+ ScanQueryEngine.makeCursorBuildSpec(
+ query.withQuerySegmentSpec(new
MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)),
+ null
+ )
+ );
final Cursor nextCursor = nextCursorHolder.asCursor();
if (nextCursor == null) {
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
index 97ade19f5bc..1636e117740 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
@@ -56,8 +56,7 @@ public class ScanQueryFrameProcessorFactory extends
BaseLeafFrameProcessorFactor
{
super(query);
this.query = Preconditions.checkNotNull(query, "query");
- this.runningCountForLimit =
- query.isLimited() && query.getOrderBys().isEmpty() ? new AtomicLong()
: null;
+ this.runningCountForLimit = query.isLimited() &&
query.getOrderBys().isEmpty() ? new AtomicLong() : null;
}
@JsonProperty
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
index bfb511f949f..af0a7203570 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
@@ -19,9 +19,11 @@
package org.apache.druid.msq.querykit.scan;
+import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.collections.StupidResourceHolder;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
@@ -39,6 +41,8 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.msq.input.ReadableInput;
+import org.apache.druid.msq.input.table.RichSegmentDescriptor;
+import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.StagePartition;
import org.apache.druid.msq.querykit.FrameProcessorTestBase;
@@ -46,10 +50,15 @@ import org.apache.druid.msq.test.LimitedFrameWriterFactory;
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.CursorFactory;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexCursorFactory;
+import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
+import org.apache.druid.timeline.SegmentId;
import org.junit.Assert;
import org.junit.Test;
@@ -60,6 +69,91 @@ import java.util.function.Function;
public class ScanQueryFrameProcessorTest extends FrameProcessorTestBase
{
+
+ @Test
+ public void test_runWithSegments() throws Exception
+ {
+ final QueryableIndex queryableIndex = TestIndex.getMMappedTestIndex();
+
+ final CursorFactory cursorFactory =
+ new QueryableIndexCursorFactory(queryableIndex);
+
+ // put funny intervals on query to ensure it is adjusted to the segment interval before building cursor
+ final ScanQuery query =
+ Druids.newScanQueryBuilder()
+ .dataSource("test")
+ .intervals(
+ new MultipleIntervalSegmentSpec(
+ ImmutableList.of(
+ Intervals.of("2001-01-01T00Z/2011-01-01T00Z"),
+ Intervals.of("2011-01-02T00Z/2021-01-01T00Z")
+ )
+ )
+ )
+ .columns(cursorFactory.getRowSignature().getColumnNames())
+ .build();
+
+ final BlockingQueueFrameChannel outputChannel =
BlockingQueueFrameChannel.minimal();
+
+ // Limit output frames to 1 row to ensure we test edge cases
+ final FrameWriterFactory frameWriterFactory = new
LimitedFrameWriterFactory(
+ FrameWriters.makeRowBasedFrameWriterFactory(
+ new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
+ cursorFactory.getRowSignature(),
+ Collections.emptyList(),
+ false
+ ),
+ 1
+ );
+
+ final ScanQueryFrameProcessor processor = new ScanQueryFrameProcessor(
+ query,
+ null,
+ new DefaultObjectMapper(),
+ ReadableInput.segment(
+ new SegmentWithDescriptor(
+ () -> new StupidResourceHolder<>(new CompleteSegment(null, new
QueryableIndexSegment(queryableIndex, SegmentId.dummy("test")))),
+ new RichSegmentDescriptor(queryableIndex.getDataInterval(),
queryableIndex.getDataInterval(), "dummy_version", 0)
+ )
+ ),
+ Function.identity(),
+ new ResourceHolder<WritableFrameChannel>()
+ {
+ @Override
+ public WritableFrameChannel get()
+ {
+ return outputChannel.writable();
+ }
+
+ @Override
+ public void close()
+ {
+ try {
+ outputChannel.writable().close();
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ },
+ new ReferenceCountingResourceHolder<>(frameWriterFactory, () -> {})
+ );
+
+ ListenableFuture<Object> retVal = exec.runFully(processor, null);
+
+ final Sequence<List<Object>> rowsFromProcessor =
FrameTestUtil.readRowsFromFrameChannel(
+ outputChannel.readable(),
+ FrameReader.create(cursorFactory.getRowSignature())
+ );
+
+ FrameTestUtil.assertRowsEqual(
+ FrameTestUtil.readRowsFromCursorFactory(cursorFactory,
cursorFactory.getRowSignature(), false),
+ rowsFromProcessor
+ );
+
+ Assert.assertEquals(Unit.instance(), retVal.get());
+ }
+
@Test
public void test_runWithInputChannel() throws Exception
{
@@ -83,10 +177,18 @@ public class ScanQueryFrameProcessorTest extends
FrameProcessorTestBase
}
}
+ // put funny intervals on query to ensure it is adjusted to the segment interval before building cursor
final ScanQuery query =
Druids.newScanQueryBuilder()
.dataSource("test")
- .intervals(new
MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
+ .intervals(
+ new MultipleIntervalSegmentSpec(
+ ImmutableList.of(
+ Intervals.of("2001-01-01T00Z/2011-01-01T00Z"),
+ Intervals.of("2011-01-02T00Z/2021-01-01T00Z")
+ )
+ )
+ )
.columns(cursorFactory.getRowSignature().getColumnNames())
.build();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]