This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 22443ab87e3 Fix an issue with passing order by and limit to realtime
tasks (#15301)
22443ab87e3 is described below
commit 22443ab87e397981bbebf606558f7c241686d8d7
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Thu Nov 2 11:38:26 2023 +0530
Fix an issue with passing order by and limit to realtime tasks (#15301)
While running queries on real time tasks using MSQ, there is an issue with
queries that order by certain columns.
If the query specifies ordering on a non-time column, the query is planned,
since MSQ supports such ordering. However, this throws an exception once the
query is passed to real time tasks, as the native query stack does not support
it. This PR resolves this by removing the ordering from the query before
contacting real time tasks.
Fixes a bug in MSQ when reading data from real time tasks with non-time
ordering.
---
.../msq/querykit/scan/ScanQueryFrameProcessor.java | 24 +++++++-
.../druid/msq/exec/MSQLoadedSegmentTests.java | 66 ++++++++++++++++++++++
2 files changed, 88 insertions(+), 2 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index 278a9c251de..ff15e116df6 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -57,6 +57,7 @@ import org.apache.druid.msq.input.external.ExternalSegment;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.querykit.BaseLeafFrameProcessor;
import org.apache.druid.msq.querykit.QueryKitUtils;
+import org.apache.druid.query.Druids;
import org.apache.druid.query.IterableRowsCursorHelper;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.scan.ScanQuery;
@@ -78,6 +79,7 @@ import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
@@ -172,13 +174,31 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
}).map(List::toArray);
}
+ /**
+ * Prepares the scan query to be sent to a data server.
+ * If the query contains a non-time ordering, removes the ordering and
limit, as the native query stack does not
+ * support it.
+ */
+ private static ScanQuery prepareScanQueryForDataServer(@NotNull ScanQuery
scanQuery)
+ {
+ if (ScanQuery.Order.NONE.equals(scanQuery.getTimeOrder()) &&
!scanQuery.getOrderBys().isEmpty()) {
+ return Druids.ScanQueryBuilder.copy(scanQuery)
+ .orderBy(ImmutableList.of())
+ .limit(0)
+ .build();
+ } else {
+ return scanQuery;
+ }
+ }
+
@Override
protected ReturnOrAwait<Unit> runWithLoadedSegment(final
SegmentWithDescriptor segment) throws IOException
{
if (cursor == null) {
+ ScanQuery preparedQuery = prepareScanQueryForDataServer(query);
final Pair<LoadedSegmentDataProvider.DataServerQueryStatus,
Yielder<Object[]>> statusSequencePair =
segment.fetchRowsFromDataServer(
- query,
+ preparedQuery,
ScanQueryFrameProcessor::mappingFunction,
closer
);
@@ -188,7 +208,7 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
return runWithSegment(segment);
}
- RowSignature rowSignature = ScanQueryKit.getAndValidateSignature(query,
jsonMapper);
+ RowSignature rowSignature =
ScanQueryKit.getAndValidateSignature(preparedQuery, jsonMapper);
Pair<Cursor, Closeable> cursorFromIterable =
IterableRowsCursorHelper.getCursorFromYielder(
statusSequencePair.rhs,
rowSignature
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
index b2c07e267e4..ae10c16a008 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
@@ -38,6 +38,7 @@ import
org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@@ -50,6 +51,7 @@ import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -57,6 +59,7 @@ import java.io.IOException;
import java.util.Map;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
@@ -148,6 +151,69 @@ public class MSQLoadedSegmentTests extends MSQTestBase
.verifyResults();
}
+ @Test
+ public void testSelectWithLoadedSegmentsOnFooWithOrderBy() throws IOException
+ {
+ RowSignature resultSignature = RowSignature.builder()
+ .add("cnt", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .build();
+
+ doAnswer(
+ invocationOnMock -> {
+ ScanQuery query = invocationOnMock.getArgument(0);
+ ScanQuery.verifyOrderByForNativeExecution(query);
+ Assert.assertEquals(Long.MAX_VALUE, query.getScanRowsLimit());
+ return Pair.of(
+ LoadedSegmentDataProvider.DataServerQueryStatus.SUCCESS,
+ Yielders.each(
+ Sequences.simple(
+ ImmutableList.of(
+ new Object[]{1L, "qwe"},
+ new Object[]{1L, "tyu"}
+ )
+ )
+ )
+ );
+ }
+
+ )
+ .when(loadedSegmentDataProvider)
+ .fetchRowsFromDataServer(any(), any(), any(), any());
+
+ testSelectQuery()
+ .setSql("select cnt, dim1 from foo order by dim1")
+ .setExpectedMSQSpec(
+ MSQSpec.builder()
+ .query(
+ newScanQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("cnt", "dim1")
+ .orderBy(ImmutableList.of(new
ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING)))
+
.context(defaultScanQueryContext(REALTIME_QUERY_CTX, resultSignature))
+ .build()
+ )
+ .columnMappings(ColumnMappings.identity(resultSignature))
+ .tuningConfig(MSQTuningConfig.defaultConfig())
+ .destination(TaskReportMSQDestination.INSTANCE)
+ .build()
+ )
+ .setQueryContext(REALTIME_QUERY_CTX)
+ .setExpectedRowSignature(resultSignature)
+ .setExpectedResultRows(ImmutableList.of(
+ new Object[]{1L, ""},
+ new Object[]{1L, "1"},
+ new Object[]{1L, "10.1"},
+ new Object[]{1L, "2"},
+ new Object[]{1L, "abc"},
+ new Object[]{1L, "def"},
+ new Object[]{1L, "qwe"},
+ new Object[]{1L, "tyu"}
+ ))
+ .verifyResults();
+ }
+
@Test
public void testGroupByWithLoadedSegmentsOnFoo() throws IOException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]