This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ac08622872 Grouping Engine fix when a limit spec with different order by columns is applied (#16534)
7ac08622872 is described below

commit 7ac0862287270fcb0cfb33b01a31ae2e650621ef
Author: Sree Charan Manamala <[email protected]>
AuthorDate: Thu Jun 20 15:05:58 2024 +0530

    Grouping Engine fix when a limit spec with different order by columns is applied (#16534)
---
 .../apache/druid/query/groupby/GroupByQuery.java   | 155 ++++++++++-----------
 .../apache/druid/query/groupby/GroupingEngine.java |   5 +-
 .../apache/druid/sql/calcite/CalciteQueryTest.java |  31 ++++-
 3 files changed, 99 insertions(+), 92 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
index cdcf9e3daf4..994705f55e3 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
@@ -560,15 +560,20 @@ public class GroupByQuery extends BaseQuery<ResultRow>
     return false;
   }
 
-  /**
-   * When limit push down is applied, the partial results would be sorted by 
the ordering specified by the
-   * limit/order spec (unlike non-push down case where the results always use 
the default natural ascending order),
-   * so when merging these partial result streams, the merge needs to use the 
same ordering to get correct results.
-   */
-  private Ordering<ResultRow> getRowOrderingForPushDown(
-      final boolean granular,
-      final DefaultLimitSpec limitSpec
-  )
+  public Ordering<ResultRow> getRowOrdering(final boolean granular)
+  {
+    return getOrderingAndDimensions(granular).getRowOrdering();
+  }
+
+  public List<String> getDimensionNamesInOrder()
+  {
+    return getOrderingAndDimensions(false).getDimensions()
+                                          .stream()
+                                          .map(DimensionSpec::getOutputName)
+                                          .collect(Collectors.toList());
+  }
+
+  public OrderingAndDimensions getOrderingAndDimensions(final boolean granular)
   {
     final boolean sortByDimsFirst = getContextSortByDimsFirst();
 
@@ -577,18 +582,30 @@ public class GroupByQuery extends BaseQuery<ResultRow>
     final List<Boolean> needsReverseList = new ArrayList<>();
     final List<ColumnType> dimensionTypes = new ArrayList<>();
     final List<StringComparator> comparators = new ArrayList<>();
+    final List<DimensionSpec> dimensionsInOrder = new ArrayList<>();
 
-    for (OrderByColumnSpec orderSpec : limitSpec.getColumns()) {
-      boolean needsReverse = orderSpec.getDirection() != 
OrderByColumnSpec.Direction.ASCENDING;
-      int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, 
dimensions);
-      if (dimIndex >= 0) {
-        DimensionSpec dim = dimensions.get(dimIndex);
-        
orderedFieldNumbers.add(resultRowSignature.indexOf(dim.getOutputName()));
-        dimsInOrderBy.add(dimIndex);
-        needsReverseList.add(needsReverse);
-        final ColumnType type = dimensions.get(dimIndex).getOutputType();
-        dimensionTypes.add(type);
-        comparators.add(orderSpec.getDimensionComparator());
+    /*
+     * When limit push down is applied, the partial results would be sorted by 
the ordering specified by the
+     * limit/order spec (unlike non-push down case where the results always 
use the default natural ascending order),
+     * so when merging these partial result streams, the merge needs to use 
the same ordering to get correct results.
+     */
+    if (isApplyLimitPushDown()) {
+      DefaultLimitSpec limitSpec1 = (DefaultLimitSpec) limitSpec;
+      if (!DefaultLimitSpec.sortingOrderHasNonGroupingFields(limitSpec1, 
dimensions)) {
+        for (OrderByColumnSpec orderSpec : ((DefaultLimitSpec) 
limitSpec).getColumns()) {
+          boolean needsReverse = orderSpec.getDirection() != 
OrderByColumnSpec.Direction.ASCENDING;
+          int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, 
dimensions);
+          if (dimIndex >= 0) {
+            DimensionSpec dim = dimensions.get(dimIndex);
+            
orderedFieldNumbers.add(resultRowSignature.indexOf(dim.getOutputName()));
+            dimsInOrderBy.add(dimIndex);
+            needsReverseList.add(needsReverse);
+            final ColumnType type = dimensions.get(dimIndex).getOutputType();
+            dimensionTypes.add(type);
+            comparators.add(orderSpec.getDimensionComparator());
+            dimensionsInOrder.add(dim);
+          }
+        }
       }
     }
 
@@ -599,14 +616,16 @@ public class GroupByQuery extends BaseQuery<ResultRow>
         final ColumnType type = dimensions.get(i).getOutputType();
         dimensionTypes.add(type);
         comparators.add(StringComparators.NATURAL);
+        dimensionsInOrder.add(dimensions.get(i));
       }
     }
 
     final Comparator<ResultRow> timeComparator = getTimeComparator(granular);
+    Ordering<ResultRow> ordering;
 
     if (timeComparator == null) {
-      return Ordering.from(
-          (lhs, rhs) -> compareDimsForLimitPushDown(
+      ordering = Ordering.from(
+          (lhs, rhs) -> compareDims(
               orderedFieldNumbers,
               needsReverseList,
               dimensionTypes,
@@ -616,9 +635,9 @@ public class GroupByQuery extends BaseQuery<ResultRow>
           )
       );
     } else if (sortByDimsFirst) {
-      return Ordering.from(
+      ordering = Ordering.from(
           (lhs, rhs) -> {
-            final int cmp = compareDimsForLimitPushDown(
+            final int cmp = compareDims(
                 orderedFieldNumbers,
                 needsReverseList,
                 dimensionTypes,
@@ -634,7 +653,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
           }
       );
     } else {
-      return Ordering.from(
+      ordering = Ordering.from(
           (lhs, rhs) -> {
             final int timeCompare = timeComparator.compare(lhs, rhs);
 
@@ -642,7 +661,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
               return timeCompare;
             }
 
-            return compareDimsForLimitPushDown(
+            return compareDims(
                 orderedFieldNumbers,
                 needsReverseList,
                 dimensionTypes,
@@ -653,45 +672,8 @@ public class GroupByQuery extends BaseQuery<ResultRow>
           }
       );
     }
-  }
-
-  public Ordering<ResultRow> getRowOrdering(final boolean granular)
-  {
-    if (isApplyLimitPushDown()) {
-      if 
(!DefaultLimitSpec.sortingOrderHasNonGroupingFields((DefaultLimitSpec) 
limitSpec, dimensions)) {
-        return getRowOrderingForPushDown(granular, (DefaultLimitSpec) 
limitSpec);
-      }
-    }
-
-    final boolean sortByDimsFirst = getContextSortByDimsFirst();
-    final Comparator<ResultRow> timeComparator = getTimeComparator(granular);
-
-    if (timeComparator == null) {
-      return Ordering.from((lhs, rhs) -> compareDims(dimensions, lhs, rhs));
-    } else if (sortByDimsFirst) {
-      return Ordering.from(
-          (lhs, rhs) -> {
-            final int cmp = compareDims(dimensions, lhs, rhs);
-            if (cmp != 0) {
-              return cmp;
-            }
-
-            return timeComparator.compare(lhs, rhs);
-          }
-      );
-    } else {
-      return Ordering.from(
-          (lhs, rhs) -> {
-            final int timeCompare = timeComparator.compare(lhs, rhs);
 
-            if (timeCompare != 0) {
-              return timeCompare;
-            }
-
-            return compareDims(dimensions, lhs, rhs);
-          }
-      );
-    }
+    return new OrderingAndDimensions(ordering, dimensionsInOrder);
   }
 
   @Nullable
@@ -716,25 +698,6 @@ public class GroupByQuery extends BaseQuery<ResultRow>
     }
   }
 
-  private int compareDims(List<DimensionSpec> dimensions, ResultRow lhs, 
ResultRow rhs)
-  {
-    final int dimensionStart = getResultRowDimensionStart();
-
-    for (int i = 0; i < dimensions.size(); i++) {
-      DimensionSpec dimension = dimensions.get(i);
-      final int dimCompare = DimensionHandlerUtils.compareObjectsAsType(
-          lhs.get(dimensionStart + i),
-          rhs.get(dimensionStart + i),
-          dimension.getOutputType()
-      );
-      if (dimCompare != 0) {
-        return dimCompare;
-      }
-    }
-
-    return 0;
-  }
-
   /**
    * Computes the timestamp that will be returned by {@link 
#getUniversalTimestamp()}.
    */
@@ -760,12 +723,12 @@ public class GroupByQuery extends BaseQuery<ResultRow>
   }
 
   /**
-   * Compares the dimensions for limit pushdown.
+   * Compares the dimensions.
    *
    * Due to legacy reason, the provided StringComparator for the arrays isn't 
applied and must be changed once we
    * get rid of the StringComparators for array types
    */
-  private static int compareDimsForLimitPushDown(
+  private static int compareDims(
       final IntList fields,
       final List<Boolean> needsReverseList,
       final List<ColumnType> dimensionTypes,
@@ -924,6 +887,28 @@ public class GroupByQuery extends BaseQuery<ResultRow>
     }
   }
 
+  public static class OrderingAndDimensions
+  {
+    Ordering<ResultRow> rowOrdering;
+    List<DimensionSpec> dimensions;
+
+    public OrderingAndDimensions(Ordering<ResultRow> rowOrdering, 
List<DimensionSpec> dimensions)
+    {
+      this.rowOrdering = rowOrdering;
+      this.dimensions = dimensions;
+    }
+
+    public Ordering<ResultRow> getRowOrdering()
+    {
+      return rowOrdering;
+    }
+
+    public List<DimensionSpec> getDimensions()
+    {
+      return dimensions;
+    }
+  }
+
   public static class Builder
   {
     @Nullable
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
index 6451fb9b943..ab1ee1052b4 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
@@ -686,8 +686,7 @@ public class GroupingEngine
           processingConfig.intermediateComputeSizeBytes()
       );
 
-      List<String> queryDimNames = 
baseSubtotalQuery.getDimensions().stream().map(DimensionSpec::getOutputName)
-                                                    
.collect(Collectors.toList());
+      List<String> queryDimNamesInOrder = 
baseSubtotalQuery.getDimensionNamesInOrder();
 
       // Only needed to make LimitSpec.filterColumns(..) call later in case 
base query has a non default LimitSpec.
       Set<String> aggsAndPostAggs = null;
@@ -724,7 +723,7 @@ public class GroupingEngine
             .withLimitSpec(subtotalQueryLimitSpec);
 
         final GroupByRowProcessor.ResultSupplier resultSupplierOneFinal = 
resultSupplierOne;
-        if (Utils.isPrefix(subtotalSpec, queryDimNames)) {
+        if (Utils.isPrefix(subtotalSpec, queryDimNamesInOrder)) {
           // Since subtotalSpec is a prefix of base query dimensions, so 
results from base query are also sorted
           // by subtotalSpec as needed by stream merging.
           subtotalsResults.add(
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 1975f5589e6..9a0a0318210 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -13823,10 +13823,8 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
                         .build()
         ),
         ImmutableList.<Object[]>builder().add(
-            new Object[]{"", null, 2L},
-            new Object[]{"a", null, 1L},
-            new Object[]{"", null, 1L},
-            new Object[]{"a", null, 1L},
+            new Object[]{"", null, 3L},
+            new Object[]{"a", null, 2L},
             new Object[]{"abc", null, 1L},
             new Object[]{NULL_STRING, null, 6L},
             new Object[]{"", timestamp("2000-01-01"), 2L},
@@ -16290,4 +16288,29 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
             )
         ).run();
   }
+
+  @SqlTestFrameworkConfig.NumMergeBuffers(3)
+  @Test
+  public void testGroupingSetsWithDifferentOrderLimitSpec()
+  {
+    msqIncompatible();
+    testBuilder()
+        .sql(
+            "SELECT\n"
+            + "  isNew, isRobot, COUNT(*) AS \"Cnt\"\n"
+            + "FROM \"wikipedia\"\n"
+            + "GROUP BY GROUPING SETS ((isRobot), (isNew))\n"
+            + "ORDER BY 2, 1\n"
+            + "limit 100"
+        )
+        .expectedResults(
+            ResultMatchMode.RELAX_NULLS,
+            ImmutableList.of(
+                new Object[]{"false", null, 36966L},
+                new Object[]{"true", null, 2278L},
+                new Object[]{null, "false", 23824L},
+                new Object[]{null, "true", 15420L}
+            )
+        ).run();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to