This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch 28.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/28.0.0 by this push:
new 093cfb84ba3 Fix an issue with passing order by and limit to realtime
tasks (#15301) (#15308)
093cfb84ba3 is described below
commit 093cfb84ba35eb687555413b8b3295930a4626b9
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Thu Nov 2 15:17:27 2023 +0530
Fix an issue with passing order by and limit to realtime tasks (#15301)
(#15308)
While running queries on real-time tasks using MSQ, there is an issue with
queries that order by certain columns.
If the query specifies a non-time column, the query is planned, as this is
supported by MSQ. However, it throws an exception once passed to real-time
tasks, since the native query stack does not support such ordering. This PR
resolves this by removing the ordering from the query before contacting
real-time tasks.
Fixes a bug with MSQ while reading data from real-time tasks with
non-time ordering
---
.../msq/querykit/scan/ScanQueryFrameProcessor.java | 24 +++++++-
.../druid/msq/exec/MSQLoadedSegmentTests.java | 66 ++++++++++++++++++++++
2 files changed, 88 insertions(+), 2 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index 278a9c251de..ff15e116df6 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -57,6 +57,7 @@ import org.apache.druid.msq.input.external.ExternalSegment;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.querykit.BaseLeafFrameProcessor;
import org.apache.druid.msq.querykit.QueryKitUtils;
+import org.apache.druid.query.Druids;
import org.apache.druid.query.IterableRowsCursorHelper;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.scan.ScanQuery;
@@ -78,6 +79,7 @@ import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
@@ -172,13 +174,31 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
}).map(List::toArray);
}
+ /**
+ * Prepares the scan query to be sent to a data server.
+ * If the query contains a non-time ordering, removes the ordering and
+ * limit, as the native query stack does not support them.
+ */
+ private static ScanQuery prepareScanQueryForDataServer(@NotNull ScanQuery
scanQuery)
+ {
+ if (ScanQuery.Order.NONE.equals(scanQuery.getTimeOrder()) &&
!scanQuery.getOrderBys().isEmpty()) {
+ return Druids.ScanQueryBuilder.copy(scanQuery)
+ .orderBy(ImmutableList.of())
+ .limit(0)
+ .build();
+ } else {
+ return scanQuery;
+ }
+ }
+
@Override
protected ReturnOrAwait<Unit> runWithLoadedSegment(final
SegmentWithDescriptor segment) throws IOException
{
if (cursor == null) {
+ ScanQuery preparedQuery = prepareScanQueryForDataServer(query);
final Pair<LoadedSegmentDataProvider.DataServerQueryStatus,
Yielder<Object[]>> statusSequencePair =
segment.fetchRowsFromDataServer(
- query,
+ preparedQuery,
ScanQueryFrameProcessor::mappingFunction,
closer
);
@@ -188,7 +208,7 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
return runWithSegment(segment);
}
- RowSignature rowSignature = ScanQueryKit.getAndValidateSignature(query,
jsonMapper);
+ RowSignature rowSignature =
ScanQueryKit.getAndValidateSignature(preparedQuery, jsonMapper);
Pair<Cursor, Closeable> cursorFromIterable =
IterableRowsCursorHelper.getCursorFromYielder(
statusSequencePair.rhs,
rowSignature
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
index b2c07e267e4..ae10c16a008 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
@@ -38,6 +38,7 @@ import
org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@@ -50,6 +51,7 @@ import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -57,6 +59,7 @@ import java.io.IOException;
import java.util.Map;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
@@ -148,6 +151,69 @@ public class MSQLoadedSegmentTests extends MSQTestBase
.verifyResults();
}
+ @Test
+ public void testSelectWithLoadedSegmentsOnFooWithOrderBy() throws IOException
+ {
+ RowSignature resultSignature = RowSignature.builder()
+ .add("cnt", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .build();
+
+ doAnswer(
+ invocationOnMock -> {
+ ScanQuery query = invocationOnMock.getArgument(0);
+ ScanQuery.verifyOrderByForNativeExecution(query);
+ Assert.assertEquals(Long.MAX_VALUE, query.getScanRowsLimit());
+ return Pair.of(
+ LoadedSegmentDataProvider.DataServerQueryStatus.SUCCESS,
+ Yielders.each(
+ Sequences.simple(
+ ImmutableList.of(
+ new Object[]{1L, "qwe"},
+ new Object[]{1L, "tyu"}
+ )
+ )
+ )
+ );
+ }
+
+ )
+ .when(loadedSegmentDataProvider)
+ .fetchRowsFromDataServer(any(), any(), any(), any());
+
+ testSelectQuery()
+ .setSql("select cnt, dim1 from foo order by dim1")
+ .setExpectedMSQSpec(
+ MSQSpec.builder()
+ .query(
+ newScanQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("cnt", "dim1")
+ .orderBy(ImmutableList.of(new
ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING)))
+
.context(defaultScanQueryContext(REALTIME_QUERY_CTX, resultSignature))
+ .build()
+ )
+ .columnMappings(ColumnMappings.identity(resultSignature))
+ .tuningConfig(MSQTuningConfig.defaultConfig())
+ .destination(TaskReportMSQDestination.INSTANCE)
+ .build()
+ )
+ .setQueryContext(REALTIME_QUERY_CTX)
+ .setExpectedRowSignature(resultSignature)
+ .setExpectedResultRows(ImmutableList.of(
+ new Object[]{1L, ""},
+ new Object[]{1L, "1"},
+ new Object[]{1L, "10.1"},
+ new Object[]{1L, "2"},
+ new Object[]{1L, "abc"},
+ new Object[]{1L, "def"},
+ new Object[]{1L, "qwe"},
+ new Object[]{1L, "tyu"}
+ ))
+ .verifyResults();
+ }
+
@Test
public void testGroupByWithLoadedSegmentsOnFoo() throws IOException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]