This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 22443ab87e3 Fix an issue with passing order by and limit to realtime 
tasks (#15301)
22443ab87e3 is described below

commit 22443ab87e397981bbebf606558f7c241686d8d7
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Thu Nov 2 11:38:26 2023 +0530

    Fix an issue with passing order by and limit to realtime tasks (#15301)
    
    While running queries on real time tasks using MSQ, there is an issue with 
queries with certain order by columns.
    
    If the query specifies a non-time column, the query is planned as it is 
supported by MSQ. However, this throws an exception once passed to real time 
tasks, as the native query stack does not support it. This PR resolves this 
by removing the ordering from the query before contacting real time tasks.
    
        Fixes a bug with MSQ while reading data from real time tasks with non 
time ordering
---
 .../msq/querykit/scan/ScanQueryFrameProcessor.java | 24 +++++++-
 .../druid/msq/exec/MSQLoadedSegmentTests.java      | 66 ++++++++++++++++++++++
 2 files changed, 88 insertions(+), 2 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index 278a9c251de..ff15e116df6 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -57,6 +57,7 @@ import org.apache.druid.msq.input.external.ExternalSegment;
 import org.apache.druid.msq.input.table.SegmentWithDescriptor;
 import org.apache.druid.msq.querykit.BaseLeafFrameProcessor;
 import org.apache.druid.msq.querykit.QueryKitUtils;
+import org.apache.druid.query.Druids;
 import org.apache.druid.query.IterableRowsCursorHelper;
 import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.scan.ScanQuery;
@@ -78,6 +79,7 @@ import org.apache.druid.timeline.SegmentId;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -172,13 +174,31 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
     }).map(List::toArray);
   }
 
+  /**
+   * Prepares the scan query to be sent to a data server.
+   * If the query contains a non-time ordering, removes the ordering and 
limit, as the native query stack does not
+   * support it.
+   */
+  private static ScanQuery prepareScanQueryForDataServer(@NotNull ScanQuery 
scanQuery)
+  {
+    if (ScanQuery.Order.NONE.equals(scanQuery.getTimeOrder()) && 
!scanQuery.getOrderBys().isEmpty()) {
+      return Druids.ScanQueryBuilder.copy(scanQuery)
+                                    .orderBy(ImmutableList.of())
+                                    .limit(0)
+                                    .build();
+    } else {
+      return scanQuery;
+    }
+  }
+
   @Override
   protected ReturnOrAwait<Unit> runWithLoadedSegment(final 
SegmentWithDescriptor segment) throws IOException
   {
     if (cursor == null) {
+      ScanQuery preparedQuery = prepareScanQueryForDataServer(query);
       final Pair<LoadedSegmentDataProvider.DataServerQueryStatus, 
Yielder<Object[]>> statusSequencePair =
           segment.fetchRowsFromDataServer(
-              query,
+              preparedQuery,
               ScanQueryFrameProcessor::mappingFunction,
               closer
           );
@@ -188,7 +208,7 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
         return runWithSegment(segment);
       }
 
-      RowSignature rowSignature = ScanQueryKit.getAndValidateSignature(query, 
jsonMapper);
+      RowSignature rowSignature = 
ScanQueryKit.getAndValidateSignature(preparedQuery, jsonMapper);
       Pair<Cursor, Closeable> cursorFromIterable = 
IterableRowsCursorHelper.getCursorFromYielder(
           statusSequencePair.rhs,
           rowSignature
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
index b2c07e267e4..ae10c16a008 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
@@ -38,6 +38,7 @@ import 
org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
@@ -50,6 +51,7 @@ import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -57,6 +59,7 @@ import java.io.IOException;
 import java.util.Map;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 
@@ -148,6 +151,69 @@ public class MSQLoadedSegmentTests extends MSQTestBase
         .verifyResults();
   }
 
+  @Test
+  public void testSelectWithLoadedSegmentsOnFooWithOrderBy() throws IOException
+  {
+    RowSignature resultSignature = RowSignature.builder()
+                                               .add("cnt", ColumnType.LONG)
+                                               .add("dim1", ColumnType.STRING)
+                                               .build();
+
+    doAnswer(
+        invocationOnMock -> {
+          ScanQuery query = invocationOnMock.getArgument(0);
+          ScanQuery.verifyOrderByForNativeExecution(query);
+          Assert.assertEquals(Long.MAX_VALUE, query.getScanRowsLimit());
+          return Pair.of(
+              LoadedSegmentDataProvider.DataServerQueryStatus.SUCCESS,
+              Yielders.each(
+                  Sequences.simple(
+                      ImmutableList.of(
+                          new Object[]{1L, "qwe"},
+                          new Object[]{1L, "tyu"}
+                      )
+                  )
+              )
+          );
+        }
+
+    )
+        .when(loadedSegmentDataProvider)
+        .fetchRowsFromDataServer(any(), any(), any(), any());
+
+    testSelectQuery()
+        .setSql("select cnt, dim1 from foo order by dim1")
+        .setExpectedMSQSpec(
+            MSQSpec.builder()
+                   .query(
+                       newScanQueryBuilder()
+                           .dataSource(CalciteTests.DATASOURCE1)
+                           .intervals(querySegmentSpec(Filtration.eternity()))
+                           .columns("cnt", "dim1")
+                           .orderBy(ImmutableList.of(new 
ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING)))
+                           
.context(defaultScanQueryContext(REALTIME_QUERY_CTX, resultSignature))
+                           .build()
+                   )
+                   .columnMappings(ColumnMappings.identity(resultSignature))
+                   .tuningConfig(MSQTuningConfig.defaultConfig())
+                   .destination(TaskReportMSQDestination.INSTANCE)
+                   .build()
+        )
+        .setQueryContext(REALTIME_QUERY_CTX)
+        .setExpectedRowSignature(resultSignature)
+        .setExpectedResultRows(ImmutableList.of(
+            new Object[]{1L, ""},
+            new Object[]{1L, "1"},
+            new Object[]{1L, "10.1"},
+            new Object[]{1L, "2"},
+            new Object[]{1L, "abc"},
+            new Object[]{1L, "def"},
+            new Object[]{1L, "qwe"},
+            new Object[]{1L, "tyu"}
+        ))
+        .verifyResults();
+  }
+
   @Test
   public void testGroupByWithLoadedSegmentsOnFoo() throws IOException
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to