This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7ac08622872 Grouping Engine fix when a limit spec with different order by columns is applied (#16534)
7ac08622872 is described below
commit 7ac0862287270fcb0cfb33b01a31ae2e650621ef
Author: Sree Charan Manamala <[email protected]>
AuthorDate: Thu Jun 20 15:05:58 2024 +0530
Grouping Engine fix when a limit spec with different order by columns is applied (#16534)
---
.../apache/druid/query/groupby/GroupByQuery.java | 155 ++++++++++-----------
.../apache/druid/query/groupby/GroupingEngine.java | 5 +-
.../apache/druid/sql/calcite/CalciteQueryTest.java | 31 ++++-
3 files changed, 99 insertions(+), 92 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
index cdcf9e3daf4..994705f55e3 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
@@ -560,15 +560,20 @@ public class GroupByQuery extends BaseQuery<ResultRow>
return false;
}
-  /**
-   * When limit push down is applied, the partial results would be sorted by the ordering specified by the
-   * limit/order spec (unlike non-push down case where the results always use the default natural ascending order),
-   * so when merging these partial result streams, the merge needs to use the same ordering to get correct results.
-   */
- private Ordering<ResultRow> getRowOrderingForPushDown(
- final boolean granular,
- final DefaultLimitSpec limitSpec
- )
+ public Ordering<ResultRow> getRowOrdering(final boolean granular)
+ {
+ return getOrderingAndDimensions(granular).getRowOrdering();
+ }
+
+ public List<String> getDimensionNamesInOrder()
+ {
+ return getOrderingAndDimensions(false).getDimensions()
+ .stream()
+ .map(DimensionSpec::getOutputName)
+ .collect(Collectors.toList());
+ }
+
+ public OrderingAndDimensions getOrderingAndDimensions(final boolean granular)
{
final boolean sortByDimsFirst = getContextSortByDimsFirst();
@@ -577,18 +582,30 @@ public class GroupByQuery extends BaseQuery<ResultRow>
final List<Boolean> needsReverseList = new ArrayList<>();
final List<ColumnType> dimensionTypes = new ArrayList<>();
final List<StringComparator> comparators = new ArrayList<>();
+ final List<DimensionSpec> dimensionsInOrder = new ArrayList<>();
-    for (OrderByColumnSpec orderSpec : limitSpec.getColumns()) {
-      boolean needsReverse = orderSpec.getDirection() != OrderByColumnSpec.Direction.ASCENDING;
-      int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, dimensions);
-      if (dimIndex >= 0) {
-        DimensionSpec dim = dimensions.get(dimIndex);
-        orderedFieldNumbers.add(resultRowSignature.indexOf(dim.getOutputName()));
-        dimsInOrderBy.add(dimIndex);
-        needsReverseList.add(needsReverse);
-        final ColumnType type = dimensions.get(dimIndex).getOutputType();
-        dimensionTypes.add(type);
-        comparators.add(orderSpec.getDimensionComparator());
+    /*
+     * When limit push down is applied, the partial results would be sorted by the ordering specified by the
+     * limit/order spec (unlike non-push down case where the results always use the default natural ascending order),
+     * so when merging these partial result streams, the merge needs to use the same ordering to get correct results.
+     */
+    if (isApplyLimitPushDown()) {
+      DefaultLimitSpec limitSpec1 = (DefaultLimitSpec) limitSpec;
+      if (!DefaultLimitSpec.sortingOrderHasNonGroupingFields(limitSpec1, dimensions)) {
+        for (OrderByColumnSpec orderSpec : ((DefaultLimitSpec) limitSpec).getColumns()) {
+          boolean needsReverse = orderSpec.getDirection() != OrderByColumnSpec.Direction.ASCENDING;
+          int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, dimensions);
+          if (dimIndex >= 0) {
+            DimensionSpec dim = dimensions.get(dimIndex);
+            orderedFieldNumbers.add(resultRowSignature.indexOf(dim.getOutputName()));
+            dimsInOrderBy.add(dimIndex);
+            needsReverseList.add(needsReverse);
+            final ColumnType type = dimensions.get(dimIndex).getOutputType();
+            dimensionTypes.add(type);
+            comparators.add(orderSpec.getDimensionComparator());
+            dimensionsInOrder.add(dim);
+          }
+        }
+      }
}
}
@@ -599,14 +616,16 @@ public class GroupByQuery extends BaseQuery<ResultRow>
final ColumnType type = dimensions.get(i).getOutputType();
dimensionTypes.add(type);
comparators.add(StringComparators.NATURAL);
+ dimensionsInOrder.add(dimensions.get(i));
}
}
final Comparator<ResultRow> timeComparator = getTimeComparator(granular);
+ Ordering<ResultRow> ordering;
if (timeComparator == null) {
- return Ordering.from(
- (lhs, rhs) -> compareDimsForLimitPushDown(
+ ordering = Ordering.from(
+ (lhs, rhs) -> compareDims(
orderedFieldNumbers,
needsReverseList,
dimensionTypes,
@@ -616,9 +635,9 @@ public class GroupByQuery extends BaseQuery<ResultRow>
)
);
} else if (sortByDimsFirst) {
- return Ordering.from(
+ ordering = Ordering.from(
(lhs, rhs) -> {
- final int cmp = compareDimsForLimitPushDown(
+ final int cmp = compareDims(
orderedFieldNumbers,
needsReverseList,
dimensionTypes,
@@ -634,7 +653,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
}
);
} else {
- return Ordering.from(
+ ordering = Ordering.from(
(lhs, rhs) -> {
final int timeCompare = timeComparator.compare(lhs, rhs);
@@ -642,7 +661,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
return timeCompare;
}
- return compareDimsForLimitPushDown(
+ return compareDims(
orderedFieldNumbers,
needsReverseList,
dimensionTypes,
@@ -653,45 +672,8 @@ public class GroupByQuery extends BaseQuery<ResultRow>
}
);
}
- }
-
- public Ordering<ResultRow> getRowOrdering(final boolean granular)
- {
- if (isApplyLimitPushDown()) {
- if
(!DefaultLimitSpec.sortingOrderHasNonGroupingFields((DefaultLimitSpec)
limitSpec, dimensions)) {
- return getRowOrderingForPushDown(granular, (DefaultLimitSpec)
limitSpec);
- }
- }
-
- final boolean sortByDimsFirst = getContextSortByDimsFirst();
- final Comparator<ResultRow> timeComparator = getTimeComparator(granular);
-
- if (timeComparator == null) {
- return Ordering.from((lhs, rhs) -> compareDims(dimensions, lhs, rhs));
- } else if (sortByDimsFirst) {
- return Ordering.from(
- (lhs, rhs) -> {
- final int cmp = compareDims(dimensions, lhs, rhs);
- if (cmp != 0) {
- return cmp;
- }
-
- return timeComparator.compare(lhs, rhs);
- }
- );
- } else {
- return Ordering.from(
- (lhs, rhs) -> {
- final int timeCompare = timeComparator.compare(lhs, rhs);
- if (timeCompare != 0) {
- return timeCompare;
- }
-
- return compareDims(dimensions, lhs, rhs);
- }
- );
- }
+ return new OrderingAndDimensions(ordering, dimensionsInOrder);
}
@Nullable
@@ -716,25 +698,6 @@ public class GroupByQuery extends BaseQuery<ResultRow>
}
}
-  private int compareDims(List<DimensionSpec> dimensions, ResultRow lhs, ResultRow rhs)
- {
- final int dimensionStart = getResultRowDimensionStart();
-
- for (int i = 0; i < dimensions.size(); i++) {
- DimensionSpec dimension = dimensions.get(i);
- final int dimCompare = DimensionHandlerUtils.compareObjectsAsType(
- lhs.get(dimensionStart + i),
- rhs.get(dimensionStart + i),
- dimension.getOutputType()
- );
- if (dimCompare != 0) {
- return dimCompare;
- }
- }
-
- return 0;
- }
-
/**
   * Computes the timestamp that will be returned by {@link #getUniversalTimestamp()}.
*/
@@ -760,12 +723,12 @@ public class GroupByQuery extends BaseQuery<ResultRow>
}
/**
- * Compares the dimensions for limit pushdown.
+ * Compares the dimensions.
*
   * Due to legacy reason, the provided StringComparator for the arrays isn't applied and must be changed once we
   * get rid of the StringComparators for array types
*/
- private static int compareDimsForLimitPushDown(
+ private static int compareDims(
final IntList fields,
final List<Boolean> needsReverseList,
final List<ColumnType> dimensionTypes,
@@ -924,6 +887,28 @@ public class GroupByQuery extends BaseQuery<ResultRow>
}
}
+ public static class OrderingAndDimensions
+ {
+ Ordering<ResultRow> rowOrdering;
+ List<DimensionSpec> dimensions;
+
+    public OrderingAndDimensions(Ordering<ResultRow> rowOrdering, List<DimensionSpec> dimensions)
+ {
+ this.rowOrdering = rowOrdering;
+ this.dimensions = dimensions;
+ }
+
+ public Ordering<ResultRow> getRowOrdering()
+ {
+ return rowOrdering;
+ }
+
+ public List<DimensionSpec> getDimensions()
+ {
+ return dimensions;
+ }
+ }
+
public static class Builder
{
@Nullable
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
index 6451fb9b943..ab1ee1052b4 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
@@ -686,8 +686,7 @@ public class GroupingEngine
processingConfig.intermediateComputeSizeBytes()
);
-    List<String> queryDimNames = baseSubtotalQuery.getDimensions().stream().map(DimensionSpec::getOutputName)
-                                                  .collect(Collectors.toList());
+    List<String> queryDimNamesInOrder = baseSubtotalQuery.getDimensionNamesInOrder();
    // Only needed to make LimitSpec.filterColumns(..) call later in case base query has a non default LimitSpec.
Set<String> aggsAndPostAggs = null;
@@ -724,7 +723,7 @@ public class GroupingEngine
.withLimitSpec(subtotalQueryLimitSpec);
final GroupByRowProcessor.ResultSupplier resultSupplierOneFinal =
resultSupplierOne;
- if (Utils.isPrefix(subtotalSpec, queryDimNames)) {
+ if (Utils.isPrefix(subtotalSpec, queryDimNamesInOrder)) {
        // Since subtotalSpec is a prefix of base query dimensions, so results from base query are also sorted
        // by subtotalSpec as needed by stream merging.
subtotalsResults.add(
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 1975f5589e6..9a0a0318210 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -13823,10 +13823,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.build()
),
ImmutableList.<Object[]>builder().add(
- new Object[]{"", null, 2L},
- new Object[]{"a", null, 1L},
- new Object[]{"", null, 1L},
- new Object[]{"a", null, 1L},
+ new Object[]{"", null, 3L},
+ new Object[]{"a", null, 2L},
new Object[]{"abc", null, 1L},
new Object[]{NULL_STRING, null, 6L},
new Object[]{"", timestamp("2000-01-01"), 2L},
@@ -16290,4 +16288,29 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
).run();
}
+
+ @SqlTestFrameworkConfig.NumMergeBuffers(3)
+ @Test
+ public void testGroupingSetsWithDifferentOrderLimitSpec()
+ {
+ msqIncompatible();
+ testBuilder()
+ .sql(
+ "SELECT\n"
+ + " isNew, isRobot, COUNT(*) AS \"Cnt\"\n"
+ + "FROM \"wikipedia\"\n"
+ + "GROUP BY GROUPING SETS ((isRobot), (isNew))\n"
+ + "ORDER BY 2, 1\n"
+ + "limit 100"
+ )
+ .expectedResults(
+ ResultMatchMode.RELAX_NULLS,
+ ImmutableList.of(
+ new Object[]{"false", null, 36966L},
+ new Object[]{"true", null, 2278L},
+ new Object[]{null, "false", 23824L},
+ new Object[]{null, "true", 15420L}
+ )
+ ).run();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]