This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a83a7fbaa1 optimize `order by sorted ASC, unsorted` and `order by
DESC` cases (#8979)
a83a7fbaa1 is described below
commit a83a7fbaa117da873b5fc042842d6622cefce4de
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Thu Oct 20 19:14:11 2022 +0200
optimize `order by sorted ASC, unsorted` and `order by DESC` cases (#8979)
---
.../request/context/OrderByExpressionContext.java | 4 +
.../AcquireReleaseColumnsSegmentOperator.java | 1 +
.../blocks/results/SelectionResultsBlock.java | 33 +-
.../core/operator/combine/BaseCombineOperator.java | 18 +-
...xValueBasedSelectionOrderByCombineOperator.java | 31 +-
.../combine/SelectionOrderByCombineOperator.java | 9 +-
.../core/operator/query/DistinctOperator.java | 1 +
.../query/LinearSelectionOrderByOperator.java | 426 +++++++++++++++++++++
.../core/operator/query/SelectionOnlyOperator.java | 1 +
.../operator/query/SelectionOrderByOperator.java | 188 +--------
.../SelectionPartiallyOrderedByAscOperator.java | 82 ++++
.../SelectionPartiallyOrderedByDescOperation.java | 105 +++++
.../apache/pinot/core/plan/SelectionPlanNode.java | 171 ++++++---
.../core/query/utils/OrderByComparatorFactory.java | 155 ++++++++
.../apache/pinot/core/util/QueryOptionsUtils.java | 5 +
.../combine/SelectionCombineOperatorTest.java | 6 +-
.../query/LinearSelectionOrderByOperatorTest.java | 142 +++++++
...InnerSegmentSelectionMultiValueQueriesTest.java | 4 +-
...erSegmentSelectionMultiValueRawQueriesTest.java | 4 +-
...nnerSegmentSelectionSingleValueQueriesTest.java | 179 ++++++++-
.../apache/pinot/perf/BenchmarkOrderByQueries.java | 256 +++++++++++++
.../segment/spi/datasource/DataSourceMetadata.java | 2 +
.../apache/pinot/spi/utils/CommonConstants.java | 2 +
23 files changed, 1565 insertions(+), 260 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/request/context/OrderByExpressionContext.java
b/pinot-common/src/main/java/org/apache/pinot/common/request/context/OrderByExpressionContext.java
index f47ed35b63..fbe1a83882 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/request/context/OrderByExpressionContext.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/request/context/OrderByExpressionContext.java
@@ -43,6 +43,10 @@ public class OrderByExpressionContext {
return _isAsc;
}
+ public boolean isDesc() {
+ return !_isAsc;
+ }
+
/**
* Adds the columns (IDENTIFIER expressions) in the order-by expression to
the given set.
*/
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
index b581a3fca4..4d79eeea4a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
@@ -92,6 +92,7 @@ public class AcquireReleaseColumnsSegmentOperator extends
BaseOperator<BaseResul
return Collections.singletonList(_childOperator);
}
+ @Override
public IndexSegment getIndexSegment() {
return _indexSegment;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
index 2a7fd5847c..025fc091af 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
@@ -18,8 +18,13 @@
*/
package org.apache.pinot.core.operator.blocks.results;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import javax.annotation.Nullable;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -32,10 +37,21 @@ import
org.apache.pinot.core.query.selection.SelectionOperatorUtils;
public class SelectionResultsBlock extends BaseResultsBlock {
private final DataSchema _dataSchema;
private final Collection<Object[]> _rows;
+ private final Comparator<? super Object[]> _comparator;
- public SelectionResultsBlock(DataSchema dataSchema, Collection<Object[]>
rows) {
+ public SelectionResultsBlock(DataSchema dataSchema, List<Object[]> rows) {
+ this(dataSchema, rows, null);
+ }
+
+ public SelectionResultsBlock(DataSchema dataSchema, PriorityQueue<Object[]>
rows) {
+ this(dataSchema, rows, rows.comparator());
+ }
+
+ public SelectionResultsBlock(DataSchema dataSchema, Collection<Object[]>
rows,
+ @Nullable Comparator<? super Object[]> comparator) {
_dataSchema = dataSchema;
_rows = rows;
+ _comparator = comparator;
}
public DataSchema getDataSchema() {
@@ -56,6 +72,21 @@ public class SelectionResultsBlock extends BaseResultsBlock {
return _rows;
}
+ public SelectionResultsBlock convertToPriorityQueueBased() {
+ if (_rows instanceof PriorityQueue) {
+ return this;
+ }
+ Preconditions.checkState(_comparator != null, "No comparator specified in
the results block");
+ PriorityQueue<Object[]> result = new PriorityQueue<>(_comparator);
+ result.addAll(_rows);
+ return new SelectionResultsBlock(_dataSchema, result);
+ }
+
+ public PriorityQueue<Object[]> getRowsAsPriorityQueue() {
+ Preconditions.checkState(_rows instanceof PriorityQueue, "The results
block is not backed by a priority queue");
+ return (PriorityQueue<Object[]>) _rows;
+ }
+
@Override
public DataTable getDataTable(QueryContext queryContext)
throws IOException {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 59ae72430d..b99b12d334 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -168,6 +168,7 @@ public abstract class BaseCombineOperator<T extends
BaseResultsBlock> extends Ba
((AcquireReleaseColumnsSegmentOperator) operator).release();
}
}
+
if (isQuerySatisfied(resultsBlock)) {
// Query is satisfied, skip processing the remaining segments
_blockingQueue.offer(resultsBlock);
@@ -216,7 +217,7 @@ public abstract class BaseCombineOperator<T extends
BaseResultsBlock> extends Ba
return blockToMerge;
}
if (mergedBlock == null) {
- mergedBlock = (T) blockToMerge;
+ mergedBlock = convertToMergeableBlock((T) blockToMerge);
} else {
mergeResultsBlocks(mergedBlock, (T) blockToMerge);
}
@@ -237,19 +238,30 @@ public abstract class BaseCombineOperator<T extends
BaseResultsBlock> extends Ba
}
/**
- * Can be overridden for early termination.
+ * Can be overridden for early termination. The input results block might
not be mergeable.
*/
protected boolean isQuerySatisfied(T resultsBlock) {
return false;
}
/**
- * Merge an IntermediateResultsBlock into the main IntermediateResultsBlock.
+ * Merges a results block into the main mergeable results block.
* <p>NOTE: {@code blockToMerge} should contain the result for a segment
without any exception. The errored segment
* result is already handled.
+ *
+ * @param mergedBlock The block that accumulates previous results. It should
be modified to add the information of the
+ * other block.
+ * @param blockToMerge The new block that needs to be merged into the
mergedBlock.
*/
protected abstract void mergeResultsBlocks(T mergedBlock, T blockToMerge);
+ /**
+ * Converts the given results block into a mergeable results block if
necessary.
+ */
+ protected T convertToMergeableBlock(T resultsBlock) {
+ return resultsBlock;
+ }
+
@Override
public List<Operator> getChildOperators() {
return _operators;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
index e818a6dde7..e73d272280 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
@@ -211,11 +211,21 @@ public class
MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
((AcquireReleaseColumnsSegmentOperator) operator).release();
}
}
- PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>)
resultsBlock.getRows();
- if (selectionResult != null && selectionResult.size() == _numRowsToKeep)
{
+ Collection<Object[]> rows = resultsBlock.getRows();
+ if (rows != null && rows.size() >= _numRowsToKeep) {
// Segment result has enough rows, update the boundary value
- assert selectionResult.peek() != null;
- Comparable segmentBoundaryValue = (Comparable)
selectionResult.peek()[0];
+
+ Comparable segmentBoundaryValue;
+ if (rows instanceof PriorityQueue) {
+ // Results from SelectionOrderByOperator
+ assert ((PriorityQueue<Object[]>) rows).peek() != null;
+ segmentBoundaryValue = (Comparable) ((PriorityQueue<Object[]>)
rows).peek()[0];
+ } else {
+ // Results from LinearSelectionOrderByOperator
+ assert rows instanceof List;
+ segmentBoundaryValue = (Comparable) ((List<Object[]>)
rows).get(rows.size() - 1)[0];
+ }
+
if (boundaryValue == null) {
boundaryValue = segmentBoundaryValue;
} else {
@@ -274,14 +284,14 @@ public class
MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
return blockToMerge;
}
if (mergedBlock == null) {
- mergedBlock = (SelectionResultsBlock) blockToMerge;
+ mergedBlock = convertToMergeableBlock((SelectionResultsBlock)
blockToMerge);
} else {
mergeResultsBlocks(mergedBlock, (SelectionResultsBlock) blockToMerge);
}
numBlocksMerged++;
// Update the boundary value if enough rows are collected
- PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>)
mergedBlock.getRows();
+ PriorityQueue<Object[]> selectionResult =
mergedBlock.getRowsAsPriorityQueue();
if (selectionResult != null && selectionResult.size() == _numRowsToKeep)
{
assert selectionResult.peek() != null;
_globalBoundaryValue.set((Comparable) selectionResult.peek()[0]);
@@ -306,12 +316,19 @@ public class
MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
return;
}
- PriorityQueue<Object[]> mergedRows = (PriorityQueue<Object[]>)
mergedBlock.getRows();
+ PriorityQueue<Object[]> mergedRows = mergedBlock.getRowsAsPriorityQueue();
Collection<Object[]> rowsToMerge = blockToMerge.getRows();
assert mergedRows != null && rowsToMerge != null;
SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge,
_numRowsToKeep);
}
+ @Override
+ protected SelectionResultsBlock
convertToMergeableBlock(SelectionResultsBlock resultsBlock) {
+ // This may create a copy or return the same instance. Anyway, this
operator is the owner of the
+ // value now, so it can mutate it.
+ return resultsBlock.convertToPriorityQueueBased();
+ }
+
private static class MinMaxValueContext {
final Operator<BaseResultsBlock> _operator;
final Comparable _minValue;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
index 098587cafd..d9bdb04475 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
@@ -104,9 +104,16 @@ public class SelectionOrderByCombineOperator extends
BaseCombineOperator<Selecti
return;
}
- PriorityQueue<Object[]> mergedRows = (PriorityQueue<Object[]>)
mergedBlock.getRows();
+ PriorityQueue<Object[]> mergedRows = mergedBlock.getRowsAsPriorityQueue();
Collection<Object[]> rowsToMerge = blockToMerge.getRows();
assert mergedRows != null && rowsToMerge != null;
SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge,
_numRowsToKeep);
}
+
+ @Override
+ protected SelectionResultsBlock
convertToMergeableBlock(SelectionResultsBlock resultsBlock) {
+ // This may create a copy or return the same instance. Anyway, this
operator is the owner of the
+ // value now, so it can mutate it.
+ return resultsBlock.convertToPriorityQueueBased();
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
index 939800f1be..9ba1a65e86 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
@@ -72,6 +72,7 @@ public class DistinctOperator extends
BaseOperator<DistinctResultsBlock> {
return Collections.singletonList(_transformOperator);
}
+ @Override
public IndexSegment getIndexSegment() {
return _indexSegment;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java
new file mode 100644
index 0000000000..e9fe801262
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.IntFunction;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * A selection operator used when the first expressions in the order by are
identifier expressions of columns that are
+ * already sorted (either ascending or descending), even if the tail of
order-by expressions is not sorted.
+ *
+ * e.g.: SELECT ... FROM Table WHERE predicates ORDER BY sorted_column DESC
LIMIT 10 OFFSET 5
+ * or: SELECT ... FROM Table WHERE predicates ORDER BY sorted_column,
not_sorted LIMIT 10 OFFSET 5
+ * but not SELECT ... FROM Table WHERE predicates ORDER BY not_sorted,
sorted_column LIMIT 10 OFFSET 5
+ *
+ * Operators that derive from this class have an almost linear
cost instead of the usual NlogN required when actual
+ * sorting must be done, where N is the number of rows in the segment.
+ * There is a degraded scenario in which the cost is actually NlogL (where L is
the limit of the query): when all the first L
+ * rows have the exact same value for the prefix of sorted columns. Even
in that case, L should be much smaller
+ * than N, so this implementation is algorithmically better than the normal
solution.
+ */
+public abstract class LinearSelectionOrderByOperator extends
BaseOperator<SelectionResultsBlock> {
+ protected final IndexSegment _indexSegment;
+
+ protected final boolean _nullHandlingEnabled;
+ // Deduped order-by expressions followed by output expressions from
SelectionOperatorUtils.extractExpressions()
+ protected final List<ExpressionContext> _expressions;
+ protected final List<ExpressionContext> _alreadySorted;
+ protected final List<ExpressionContext> _toSort;
+
+ protected final TransformOperator _transformOperator;
+ protected final List<OrderByExpressionContext> _orderByExpressions;
+ protected final TransformResultMetadata[] _expressionsMetadata;
+ protected final int _numRowsToKeep;
+ private final Supplier<ListBuilder> _listBuilderSupplier;
+ protected boolean _used = false;
+ /**
+ * The comparator used to build the resulting {@link SelectionResultsBlock},
which sorts rows in reverse order to the
+ * one specified in the query.
+ */
+ protected Comparator<Object[]> _comparator;
+
+ /**
+ * @param expressions Order-by expressions must be at the head of the list.
+ * @param numSortedExpressions Number of expressions in the order-by
expressions that are sorted.
+ */
+ public LinearSelectionOrderByOperator(IndexSegment indexSegment,
QueryContext queryContext,
+ List<ExpressionContext> expressions, TransformOperator
transformOperator, int numSortedExpressions) {
+ _indexSegment = indexSegment;
+ _nullHandlingEnabled = queryContext.isNullHandlingEnabled();
+ _expressions = expressions;
+ _transformOperator = transformOperator;
+
+ _orderByExpressions = queryContext.getOrderByExpressions();
+ assert _orderByExpressions != null;
+ int numOrderByExpressions = _orderByExpressions.size();
+
+ _alreadySorted = expressions.subList(0, numSortedExpressions);
+ _toSort = expressions.subList(numSortedExpressions, numOrderByExpressions);
+
+ _expressionsMetadata = new TransformResultMetadata[_expressions.size()];
+ for (int i = 0; i < _expressionsMetadata.length; i++) {
+ ExpressionContext expression = _expressions.get(i);
+ _expressionsMetadata[i] =
_transformOperator.getResultMetadata(expression);
+ }
+
+ _numRowsToKeep = queryContext.getOffset() + queryContext.getLimit();
+
+ if (_toSort.isEmpty()) {
+ _listBuilderSupplier = () -> new
TotallySortedListBuilder(_numRowsToKeep);
+ } else {
+ Comparator<Object[]> sortedComparator =
+ OrderByComparatorFactory.getComparator(_orderByExpressions,
_expressionsMetadata, false, _nullHandlingEnabled,
+ 0, numSortedExpressions);
+ Comparator<Object[]> unsortedComparator =
+ OrderByComparatorFactory.getComparator(_orderByExpressions,
_expressionsMetadata, true, _nullHandlingEnabled,
+ numSortedExpressions, numOrderByExpressions);
+ _listBuilderSupplier = () -> new
PartiallySortedListBuilder(_numRowsToKeep, sortedComparator,
unsortedComparator);
+ }
+
+ _comparator =
+ OrderByComparatorFactory.getComparator(_orderByExpressions,
_expressionsMetadata, true, _nullHandlingEnabled);
+ }
+
+ @Override
+ public IndexSegment getIndexSegment() {
+ return _indexSegment;
+ }
+
+ @Override
+ public ExecutionStatistics getExecutionStatistics() {
+ long numEntriesScannedInFilter =
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+ int numDocsScanned = getNumDocsScanned();
+ long numEntriesScannedPostFilter = (long) numDocsScanned *
_transformOperator.getNumColumnsProjected();
+ int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
+ return new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter,
numEntriesScannedPostFilter,
+ numTotalDocs);
+ }
+
+ protected IntFunction<Object[]> fetchBlock(TransformBlock transformBlock,
BlockValSet[] blockValSets) {
+ int numExpressions = _expressions.size();
+
+ for (int i = 0; i < numExpressions; i++) {
+ ExpressionContext expression = _expressions.get(i);
+ blockValSets[i] = transformBlock.getBlockValueSet(expression);
+ }
+ RowBasedBlockValueFetcher blockValueFetcher = new
RowBasedBlockValueFetcher(blockValSets);
+
+ if (!_nullHandlingEnabled) {
+ return blockValueFetcher::getRow;
+ }
+ RoaringBitmap[] nullBitmaps = new RoaringBitmap[numExpressions];
+ for (int i = 0; i < numExpressions; i++) {
+ nullBitmaps[i] = blockValSets[i].getNullBitmap();
+ }
+ return (docId) -> {
+ Object[] row = blockValueFetcher.getRow(docId);
+ for (int colId = 0; colId < nullBitmaps.length; colId++) {
+ if (nullBitmaps[colId] != null && nullBitmaps[colId].contains(docId)) {
+ row[colId] = null;
+ }
+ }
+ return row;
+ };
+ }
+
+ protected abstract int getNumDocsScanned();
+
+ /**
+ * Returns a sorted list of rows that:
+ * <ul>
+ * <li>At least contains all the rows that fulfill the predicate</li>
+ * <li>Rows are sorted in a way that is compatible with the given list
builder supplier</li>
+ * </ul>
+ *
+ * That means that the result may contain more rows than required.
+ *
+ * @param listBuilderSupplier a {@link ListBuilder} supplier that should be
used to create the result. Each time it is
+ * called, a new {@link ListBuilder} will be
returned. All returned instances use the same
+ * comparator logic.
+ */
+ protected abstract List<Object[]> fetch(Supplier<ListBuilder>
listBuilderSupplier);
+
+ @Override
+ public List<Operator> getChildOperators() {
+ return Collections.singletonList(_transformOperator);
+ }
+
+ protected abstract String getExplainName();
+
+ @Override
+ public String toExplainString() {
+ StringBuilder sb = new StringBuilder(getExplainName());
+
+ sb.append("(sortedList: ");
+ concatList(sb, _alreadySorted);
+
+ sb.append(", unsortedList: ");
+ concatList(sb, _toSort);
+
+ sb.append(", rest: ");
+ concatList(sb, _expressions.subList(_alreadySorted.size() +
_toSort.size(), _expressions.size()));
+
+ sb.append(')');
+ return sb.toString();
+ }
+
+ private void concatList(StringBuilder sb, List<?> list) {
+ sb.append('(');
+ Iterator<?> it = list.iterator();
+ if (it.hasNext()) {
+ sb.append(it.next());
+ while (it.hasNext()) {
+ sb.append(", ").append(it.next());
+ }
+ }
+ sb.append(')');
+ }
+
+ @Override
+ protected SelectionResultsBlock getNextBlock() {
+ Preconditions.checkState(!_used, "nextBlock was called more than once");
+ _used = true;
+ List<Object[]> list = fetch(_listBuilderSupplier);
+
+ DataSchema dataSchema = createDataSchema();
+
+ if (list.size() > _numRowsToKeep) {
+ list = new ArrayList<>(list.subList(0, _numRowsToKeep));
+ }
+
+ return new SelectionResultsBlock(dataSchema, list, _comparator);
+ }
+
+ protected DataSchema createDataSchema() {
+ int numExpressions = _expressions.size();
+
+ // Create the data schema
+ String[] columnNames = new String[numExpressions];
+ DataSchema.ColumnDataType[] columnDataTypes = new
DataSchema.ColumnDataType[numExpressions];
+ for (int i = 0; i < columnNames.length; i++) {
+ columnNames[i] = _expressions.get(i).toString();
+ }
+ for (int i = 0; i < numExpressions; i++) {
+ TransformResultMetadata expressionMetadata = _expressionsMetadata[i];
+ columnDataTypes[i] =
+
DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(),
expressionMetadata.isSingleValue());
+ }
+ return new DataSchema(columnNames, columnDataTypes);
+ }
+
+ /**
+ * A private class used to build a partially sorted list by adding partially
sorted data.
+ *
+ * Specifically, this class has been designed to receive successive calls to
{@link #add(Object[])} followed by a single
+ * call to {@link #build()}.
+ *
+ * Rows must be inserted in ascending order according to the partial order
specified by a comparator.
+ * This comparator will define <i>partitions</i> of rows. All the rows in
the same partition are considered equal
+ * by that comparator.
+ *
+ * When calling {@link #add(Object[])} with a row that doesn't belong to the
current partition, the previous partition
+ * is <em>closed</em> and a new one is started.
+ */
+ protected interface ListBuilder {
+
+ /**
+ * Adds the given row to this object. The new row must be equal to or
higher than the previously inserted elements
+ * according to the partition comparator. No more rows should be added
once enough rows have been collected.
+ *
+ * @param row The row to add. The values of the already sorted columns
must be equal or higher than the last added
+ * row, if any.
+ * @return true if and only if enough rows have been collected
+ */
+ boolean add(Object[] row);
+
+ /**
+ * Builds the sorted list. The current partition will be <em>closed</em>.
Once this method is called, the builder
+ * should not be used.
+ */
+ List<Object[]> build();
+ }
+
+ /**
+ * This is the fastest {@link ListBuilder} but also the most restrictive one.
It can only be used when data is inserted
+ * in total order and therefore each element belongs to its own partition.
+ *
+ * This builder cannot be used to implement order-by queries where there is
at least one expression
+ * that is not sorted. In such case {@link PartiallySortedListBuilder}
should be used.
+ *
+ * This implementation is just a wrapper over an ArrayList and therefore the
average cost of its methods is constant.
+ */
+ @VisibleForTesting
+ static class TotallySortedListBuilder implements ListBuilder {
+ private final ArrayList<Object[]> _list;
+ private final int _maxNumRows;
+
+ public TotallySortedListBuilder(int maxNumRows) {
+ _maxNumRows = maxNumRows;
+ _list = new ArrayList<>(Integer.min(maxNumRows,
SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY));
+ }
+
+ @Override
+ public boolean add(Object[] row) {
+ _list.add(row);
+ return _list.size() == _maxNumRows;
+ }
+
+ @Override
+ public List<Object[]> build() {
+ return _list;
+ }
+ }
+
+ /**
+ * This {@link ListBuilder} is size bound and requires two comparators: The
first defines the partitions and the
+ * second defines an order inside each partition.
+ *
+ * This class never stores more than the requested number of elements.
In case more elements are inserted:
+ * <ul>
+ * <li>If the new element belongs to a higher partition, it is
discarded.</li>
+ * <li>If the new element belongs to the last included partition, the last
partition is treated as a priority queue
+ * sorted by the in-partition comparator. If the new element is lower than
the greatest element stored in that partition,
+ * the new element replaces it.</li>
+ * </ul>
+ *
+ * This class can be used to implement order-by queries that include one or
more not sorted expressions.
+ * In cases where all expressions are sorted, {@link
TotallySortedListBuilder} should be used because its performance
+ * is better.
+ *
+ * As usual, elements are sorted by partition and there is no order
guarantee inside each partition. The second
+ * comparator is only used to keep the lower elements in the last partition.
+ */
+ @VisibleForTesting
+ static class PartiallySortedListBuilder implements ListBuilder {
+ /**
+ * A list with all the elements that have been already sorted.
+ */
+ private final ArrayList<Object[]> _sorted;
+ /**
+ * This attribute is used to store the last partition when the builder
already contains {@link #_maxNumRows} rows.
+ */
+ private PriorityQueue<Object[]> _lastPartitionQueue;
+ /**
+ * The comparator that defines the partitions and imposes the order in
which add has to be called.
+ */
+ private final Comparator<Object[]> _partitionComparator;
+ /**
+ * The comparator that sorts different rows on each partition, which sorts
rows in reverse order to the one
+ * specified in the query.
+ */
+ private final Comparator<Object[]> _unsortedComparator;
+
+ private final int _maxNumRows;
+
+ private Object[] _lastPartitionRow;
+ private int _numSortedRows;
+
+ public PartiallySortedListBuilder(int maxNumRows, Comparator<Object[]>
partitionComparator,
+ Comparator<Object[]> unsortedComparator) {
+ _maxNumRows = maxNumRows;
+ _sorted = new ArrayList<>(Integer.min(maxNumRows,
SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY));
+ _partitionComparator = partitionComparator;
+ _unsortedComparator = unsortedComparator;
+ }
+
+ @Override
+ public boolean add(Object[] row) {
+ if (_lastPartitionRow == null) {
+ _lastPartitionRow = row;
+ _sorted.add(row);
+ return false;
+ }
+ int cmp = _partitionComparator.compare(row, _lastPartitionRow);
+ if (cmp < 0) {
+ throw new IllegalArgumentException(
+ "Row with docId " + _sorted.size() + " is not sorted compared to
the previous one");
+ }
+
+ boolean newPartition = cmp > 0;
+ if (_sorted.size() < _maxNumRows) {
+ // we don't have enough rows yet
+ if (newPartition) {
+ _lastPartitionRow = row;
+ _numSortedRows = _sorted.size();
+ }
+ // just add the new row to the result list
+ _sorted.add(row);
+ return false;
+ }
+
+ // enough rows have been collected
+ assert _sorted.size() == _maxNumRows;
+ if (newPartition) { // and the new element belongs to a new partition,
so we can just ignore it
+ return true;
+ }
+ // new element doesn't belong to a new partition, so we may need to add
it
+ if (_lastPartitionQueue == null) { // we have exactly _maxNumRows rows, and
the new one belongs to the last partition
+ // we need to prepare the priority queue
+ int numRowsInPriorityQueue = _maxNumRows - _numSortedRows;
+ _lastPartitionQueue = new PriorityQueue<>(numRowsInPriorityQueue,
_unsortedComparator);
+ _lastPartitionQueue.addAll(_sorted.subList(_numSortedRows,
_maxNumRows));
+ }
+ // add the new element if it is lower than the greatest element stored
in the partition
+ if (_unsortedComparator.compare(row, _lastPartitionQueue.peek()) > 0) {
+ _lastPartitionQueue.poll();
+ _lastPartitionQueue.offer(row);
+ }
+ return false;
+ }
+
+ @Override
+ public List<Object[]> build() {
+ if (_lastPartitionQueue != null) {
+ assert _lastPartitionQueue.size() == _maxNumRows - _numSortedRows;
+ Iterator<Object[]> lastPartitionIt = _lastPartitionQueue.iterator();
+ for (int i = _numSortedRows; i < _maxNumRows; i++) {
+ _sorted.set(i, lastPartitionIt.next());
+ }
+ }
+ return _sorted;
+ }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
index edf88a0ee5..6db1f4e352 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
@@ -134,6 +134,7 @@ public class SelectionOnlyOperator extends
BaseOperator<SelectionResultsBlock> {
return Collections.singletonList(_transformOperator);
}
+ @Override
public IndexSegment getIndexSegment() {
return _indexSegment;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index 47410a85ff..d45652bba8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.core.operator.query;
-import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -44,11 +43,9 @@ import
org.apache.pinot.core.operator.transform.TransformOperator;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.datasource.DataSource;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.exception.BadQueryRequestException;
-import org.apache.pinot.spi.utils.ByteArray;
import org.roaringbitmap.RoaringBitmap;
@@ -83,18 +80,16 @@ public class SelectionOrderByOperator extends
BaseOperator<SelectionResultsBlock
private final TransformResultMetadata[] _orderByExpressionMetadata;
private final int _numRowsToKeep;
private final PriorityQueue<Object[]> _rows;
- private final boolean _allOrderByColsPreSorted;
private int _numDocsScanned = 0;
private long _numEntriesScannedPostFilter = 0;
public SelectionOrderByOperator(IndexSegment indexSegment, QueryContext
queryContext,
- List<ExpressionContext> expressions, TransformOperator
transformOperator, boolean allOrderByColsPreSorted) {
+ List<ExpressionContext> expressions, TransformOperator
transformOperator) {
_indexSegment = indexSegment;
_nullHandlingEnabled = queryContext.isNullHandlingEnabled();
_expressions = expressions;
_transformOperator = transformOperator;
- _allOrderByColsPreSorted = allOrderByColsPreSorted;
_orderByExpressions = queryContext.getOrderByExpressions();
assert _orderByExpressions != null;
@@ -106,8 +101,11 @@ public class SelectionOrderByOperator extends
BaseOperator<SelectionResultsBlock
}
_numRowsToKeep = queryContext.getOffset() + queryContext.getLimit();
+ Comparator<Object[]> comparator =
+ OrderByComparatorFactory.getComparator(_orderByExpressions,
_orderByExpressionMetadata, true,
+ _nullHandlingEnabled);
_rows = new PriorityQueue<>(Math.min(_numRowsToKeep,
SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
- getComparator());
+ comparator);
}
@Override
@@ -122,186 +120,15 @@ public class SelectionOrderByOperator extends
BaseOperator<SelectionResultsBlock
return stringBuilder.append(')').toString();
}
- private Comparator<Object[]> getComparator() {
- // Compare all single-value columns
- int numOrderByExpressions = _orderByExpressions.size();
- List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
- for (int i = 0; i < numOrderByExpressions; i++) {
- if (_orderByExpressionMetadata[i].isSingleValue()) {
- valueIndexList.add(i);
- } else {
- // MV columns should not be part of the selection order by only list
- throw new BadQueryRequestException(
- String.format("MV expression: %s should not be included in the
ORDER-BY clause",
- _orderByExpressions.get(i)));
- }
- }
-
- int numValuesToCompare = valueIndexList.size();
- int[] valueIndices = new int[numValuesToCompare];
- DataType[] storedTypes = new DataType[numValuesToCompare];
- // Use multiplier -1 or 1 to control ascending/descending order
- int[] multipliers = new int[numValuesToCompare];
- for (int i = 0; i < numValuesToCompare; i++) {
- int valueIndex = valueIndexList.get(i);
- valueIndices[i] = valueIndex;
- storedTypes[i] =
_orderByExpressionMetadata[valueIndex].getDataType().getStoredType();
- multipliers[i] = _orderByExpressions.get(valueIndex).isAsc() ? -1 : 1;
- }
-
- if (_nullHandlingEnabled) {
- return (Object[] o1, Object[] o2) -> {
- for (int i = 0; i < numValuesToCompare; i++) {
- int index = valueIndices[i];
-
- // TODO: Evaluate the performance of casting to Comparable and avoid
the switch
- Object v1 = o1[index];
- Object v2 = o2[index];
- if (v1 == null) {
- // The default null ordering is: 'NULLS LAST', regardless of the
ordering direction.
- return v2 == null ? 0 : -multipliers[i];
- } else if (v2 == null) {
- return multipliers[i];
- }
- int result;
- switch (storedTypes[i]) {
- case INT:
- result = ((Integer) v1).compareTo((Integer) v2);
- break;
- case LONG:
- result = ((Long) v1).compareTo((Long) v2);
- break;
- case FLOAT:
- result = ((Float) v1).compareTo((Float) v2);
- break;
- case DOUBLE:
- result = ((Double) v1).compareTo((Double) v2);
- break;
- case BIG_DECIMAL:
- result = ((BigDecimal) v1).compareTo((BigDecimal) v2);
- break;
- case STRING:
- result = ((String) v1).compareTo((String) v2);
- break;
- case BYTES:
- result = ((ByteArray) v1).compareTo((ByteArray) v2);
- break;
- // NOTE: Multi-value columns are not comparable, so we should not
reach here
- default:
- throw new IllegalStateException();
- }
- if (result != 0) {
- return result * multipliers[i];
- }
- }
- return 0;
- };
- } else {
- return (Object[] o1, Object[] o2) -> {
- for (int i = 0; i < numValuesToCompare; i++) {
- int index = valueIndices[i];
-
- // TODO: Evaluate the performance of casting to Comparable and avoid
the switch
- Object v1 = o1[index];
- Object v2 = o2[index];
- int result;
- switch (storedTypes[i]) {
- case INT:
- result = ((Integer) v1).compareTo((Integer) v2);
- break;
- case LONG:
- result = ((Long) v1).compareTo((Long) v2);
- break;
- case FLOAT:
- result = ((Float) v1).compareTo((Float) v2);
- break;
- case DOUBLE:
- result = ((Double) v1).compareTo((Double) v2);
- break;
- case BIG_DECIMAL:
- result = ((BigDecimal) v1).compareTo((BigDecimal) v2);
- break;
- case STRING:
- result = ((String) v1).compareTo((String) v2);
- break;
- case BYTES:
- result = ((ByteArray) v1).compareTo((ByteArray) v2);
- break;
- // NOTE: Multi-value columns are not comparable, so we should not
reach here
- default:
- throw new IllegalStateException();
- }
- if (result != 0) {
- return result * multipliers[i];
- }
- }
- return 0;
- };
- }
- }
-
+ /**
+ * Picks the computation path: {@code computeAllOrdered()} when {@code _expressions} and
+ * {@code _orderByExpressions} have the same size, {@code computePartiallyOrdered()} otherwise.
+ */
@Override
protected SelectionResultsBlock getNextBlock() {
- if (_allOrderByColsPreSorted) {
- return computeAllPreSorted();
- } else if (_expressions.size() == _orderByExpressions.size()) {
+ if (_expressions.size() == _orderByExpressions.size()) {
return computeAllOrdered();
} else {
return computePartiallyOrdered();
}
}
- private SelectionResultsBlock computeAllPreSorted() {
- int numExpressions = _expressions.size();
-
- // Fetch all the expressions and insert them into the priority queue
- BlockValSet[] blockValSets = new BlockValSet[numExpressions];
- int numColumnsProjected = _transformOperator.getNumColumnsProjected();
- TransformBlock transformBlock;
- while (_numDocsScanned < _numRowsToKeep && (transformBlock =
_transformOperator.nextBlock()) != null) {
- for (int i = 0; i < numExpressions; i++) {
- ExpressionContext expression = _expressions.get(i);
- blockValSets[i] = transformBlock.getBlockValueSet(expression);
- }
- RowBasedBlockValueFetcher blockValueFetcher = new
RowBasedBlockValueFetcher(blockValSets);
- int numDocsFetched = transformBlock.getNumDocs();
- if (_nullHandlingEnabled) {
- RoaringBitmap[] nullBitmaps = new RoaringBitmap[numExpressions];
- for (int i = 0; i < numExpressions; i++) {
- nullBitmaps[i] = blockValSets[i].getNullBitmap();
- }
- for (int rowId = 0; rowId < numDocsFetched && (_numDocsScanned <
_numRowsToKeep); rowId++) {
- Object[] row = blockValueFetcher.getRow(rowId);
- for (int colId = 0; colId < numExpressions; colId++) {
- if (nullBitmaps[colId] != null &&
nullBitmaps[colId].contains(rowId)) {
- row[colId] = null;
- }
- }
- }
- }
- for (int i = 0; i < numDocsFetched && (_numDocsScanned <
_numRowsToKeep); i++) {
- SelectionOperatorUtils.addToPriorityQueue(blockValueFetcher.getRow(i),
_rows, _numRowsToKeep);
- _numDocsScanned++;
- }
- }
- _numEntriesScannedPostFilter = (long) _numDocsScanned *
numColumnsProjected;
-
- // Create the data schema
- String[] columnNames = new String[numExpressions];
- DataSchema.ColumnDataType[] columnDataTypes = new
DataSchema.ColumnDataType[numExpressions];
- for (int i = 0; i < numExpressions; i++) {
- ExpressionContext expression = _expressions.get(i);
- columnNames[i] = expression.toString();
- TransformResultMetadata expressionMetadata =
_transformOperator.getResultMetadata(expression);
- columnDataTypes[i] =
-
DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(),
expressionMetadata.isSingleValue());
- }
-
- DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-
- return new SelectionResultsBlock(dataSchema, _rows);
- }
-
/**
* Helper method to compute the result when all the output expressions are
ordered.
*/
@@ -481,6 +308,7 @@ public class SelectionOrderByOperator extends
BaseOperator<SelectionResultsBlock
return Collections.singletonList(_transformOperator);
}
+ /** Returns the index segment stored in {@code _indexSegment}. */
+ @Override
public IndexSegment getIndexSegment() {
return _indexSegment;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByAscOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByAscOperator.java
new file mode 100644
index 0000000000..62de4824cd
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByAscOperator.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.function.IntFunction;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.spi.IndexSegment;
+
+
+/**
+ * Selection operator for ORDER BY queries whose first identifier in the order-by list is ASC.
+ *
+ * Rows are passed to the list builder in the order the transform operator produces them, and fetching stops as soon
+ * as {@code ListBuilder.add} returns {@code true}.
+ *
+ * @see LinearSelectionOrderByOperator
+ */
+public class SelectionPartiallyOrderedByAscOperator extends LinearSelectionOrderByOperator {
+
+  private static final String EXPLAIN_NAME = "SELECT_PARTIAL_ORDER_BY_ASC";
+
+  // Sum of the sizes of every block pulled from the transform operator, including the block fetch() stopped in.
+  private int _numDocsScanned = 0;
+
+  /**
+   * Forwards all arguments to the parent operator and then validates the order-by direction.
+   *
+   * @throws IllegalArgumentException if no order-by expression is an identifier, or if the first order-by
+   *         expression that is an identifier is not ASC
+   */
+  public SelectionPartiallyOrderedByAscOperator(IndexSegment indexSegment, QueryContext queryContext,
+      List<ExpressionContext> expressions, TransformOperator transformOperator, int numSortedExpressions) {
+    super(indexSegment, queryContext, expressions, transformOperator, numSortedExpressions);
+    boolean firstIdentifierIsAsc = queryContext.getOrderByExpressions().stream()
+        .filter(orderBy -> orderBy.getExpression().getType() == ExpressionContext.Type.IDENTIFIER)
+        .findFirst()
+        .orElseThrow(() -> new IllegalArgumentException("The query is not order by identifiers"))
+        .isAsc();
+    Preconditions.checkArgument(firstIdentifierIsAsc, "%s can only be used when the first column in order by is ASC",
+        EXPLAIN_NAME);
+  }
+
+  /**
+   * Drains the transform operator block by block, adding rows in document order to a single list builder.
+   *
+   * @param listBuilderSupplier provides the builder that accumulates the rows; it is invoked once
+   * @return the list built by the builder, either when {@code add} returned {@code true} or when no block is left
+   */
+  @Override
+  protected List<Object[]> fetch(Supplier<ListBuilder> listBuilderSupplier) {
+    BlockValSet[] valueSets = new BlockValSet[_expressions.size()];
+    ListBuilder rows = listBuilderSupplier.get();
+    for (TransformBlock block = _transformOperator.nextBlock(); block != null;
+        block = _transformOperator.nextBlock()) {
+      IntFunction<Object[]> rowAt = fetchBlock(block, valueSets);
+      int docsInBlock = block.getNumDocs();
+      // The whole block is counted up front, even if the loop below returns before visiting all of it.
+      _numDocsScanned += docsInBlock;
+      int docId = 0;
+      while (docId < docsInBlock) {
+        if (rows.add(rowAt.apply(docId))) {
+          return rows.build();
+        }
+        docId++;
+      }
+    }
+    return rows.build();
+  }
+
+  @Override
+  public int getNumDocsScanned() {
+    return _numDocsScanned;
+  }
+
+  @Override
+  protected String getExplainName() {
+    return EXPLAIN_NAME;
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
new file mode 100644
index 0000000000..5322e0a6e6
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.IntFunction;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.spi.IndexSegment;
+
+
+/**
+ * An operator for DESC order-by queries that are partially sorted over the sorting keys.
+ *
+ * Blocks are read in the order the transform operator returns them; within each block the documents are visited
+ * from the last one to the first one, and the rows kept from earlier blocks are only offered to the builder when
+ * the current block alone did not make {@code ListBuilder.add} return {@code true}.
+ *
+ * @see LinearSelectionOrderByOperator
+ */
+public class SelectionPartiallyOrderedByDescOperation extends
LinearSelectionOrderByOperator {
+
+ private static final String EXPLAIN_NAME = "SELECT_PARTIAL_ORDER_BY_DESC";
+
+ // Sum of the sizes of every block pulled from the transform operator.
+ private int _numDocsScanned = 0;
+
+ /**
+ * Forwards all arguments to the parent operator and then validates the order-by direction.
+ *
+ * @throws IllegalArgumentException if no order-by expression is an identifier, or if the first order-by
+ * expression that is an identifier is not DESC
+ */
+ public SelectionPartiallyOrderedByDescOperation(IndexSegment indexSegment,
QueryContext queryContext,
+ List<ExpressionContext> expressions, TransformOperator
transformOperator, int numSortedExpressions) {
+ super(indexSegment, queryContext, expressions, transformOperator,
numSortedExpressions);
+ assert queryContext.getOrderByExpressions() != null;
+ Preconditions.checkArgument(queryContext.getOrderByExpressions().stream()
+ .filter(expr -> expr.getExpression().getType() ==
ExpressionContext.Type.IDENTIFIER).findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("The query is not
order by identifiers")).isDesc(),
+ "%s can only be used when the first column in order by is DESC",
EXPLAIN_NAME);
+ }
+
+ /**
+ * Reads every block from the transform operator and keeps, after each block, the rows built from that block
+ * (visited in reverse) plus, if needed, the rows kept after the previous block.
+ *
+ * @param listBuilderSupplier provides a fresh builder for every block
+ * @return the rows built after the last block, or an empty list when the transform operator returned no block
+ */
+ @Override
+ protected List<Object[]> fetch(Supplier<ListBuilder> listBuilderSupplier) {
+
+ // Ideally we would use a descending cursor, but we don't actually have them.
+ // Alternatively, we could store all blocks in a list and iterate them in reverse order, but ProjectionBlocks
+ // share the same DataBlockCache, so they may be ephemeral and be overridden by the next block.
+ // The only alternative we have right now is to retrieve the last LIMIT elements from each block.
+
+ int numExpressions = _expressions.size();
+ BlockValSet[] blockValSets = new BlockValSet[numExpressions];
+ List<Object[]> localBestRows = new ArrayList<>();
+ TransformBlock transformBlock;
+ while ((transformBlock = _transformOperator.nextBlock()) != null) {
+ IntFunction<Object[]> rowFetcher = fetchBlock(transformBlock,
blockValSets);
+ int numDocsFetched = transformBlock.getNumDocs();
+ _numDocsScanned += numDocsFetched;
+ ListBuilder listBuilder = listBuilderSupplier.get();
+
+ // first, calculate the best rows of this block, walking it from the last document to the first
+ boolean enoughRowsCollected = false;
+ for (int docId = numDocsFetched - 1; docId >= 0; docId--) {
+ enoughRowsCollected = listBuilder.add(rowFetcher.apply(docId));
+ if (enoughRowsCollected) {
+ break;
+ }
+ }
+ // then, if this block was not enough, try to add the best rows kept from the previous blocks
+ if (!enoughRowsCollected) {
+ Iterator<Object[]> localBestRowIt = localBestRows.iterator();
+ while (!enoughRowsCollected && localBestRowIt.hasNext()) {
+ enoughRowsCollected = listBuilder.add(localBestRowIt.next());
+ }
+ }
+ // finally, the rows built for this block replace the best rows kept so far
+ localBestRows = listBuilder.build();
+ }
+ // Once we finished, localBestRows contains the global best rows
+ return localBestRows;
+ }
+
+ @Override
+ protected int getNumDocsScanned() {
+ return _numDocsScanned;
+ }
+
+ @Override
+ protected String getExplainName() {
+ return EXPLAIN_NAME;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
index 5d40398df8..9ed48c5983 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.plan;
import java.util.ArrayList;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.OrderByExpressionContext;
import org.apache.pinot.core.common.Operator;
@@ -27,10 +28,15 @@ import
org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.operator.query.EmptySelectionOperator;
import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
+import
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByAscOperator;
+import
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByDescOperation;
import org.apache.pinot.core.operator.transform.TransformOperator;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.util.QueryOptionsUtils;
import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
/**
@@ -50,67 +56,132 @@ public class SelectionPlanNode implements PlanNode {
List<ExpressionContext> expressions =
SelectionOperatorUtils.extractExpressions(_queryContext, _indexSegment);
int limit = _queryContext.getLimit();
- if (limit > 0) {
- List<OrderByExpressionContext> orderByExpressions =
_queryContext.getOrderByExpressions();
- if (orderByExpressions == null) {
- // Selection only
- TransformOperator transformOperator = new
TransformPlanNode(_indexSegment, _queryContext, expressions,
- Math.min(limit, DocIdSetPlanNode.MAX_DOC_PER_CALL)).run();
- return new SelectionOnlyOperator(_indexSegment, _queryContext,
expressions, transformOperator);
- } else {
- // Selection order-by
- if (isAllOrderByColumnsSorted(orderByExpressions)) {
- // All order-by columns are sorted, no need to sort the records
- TransformOperator transformOperator = new
TransformPlanNode(_indexSegment, _queryContext, expressions,
- Math.min(limit + _queryContext.getOffset(),
DocIdSetPlanNode.MAX_DOC_PER_CALL)).run();
- return new SelectionOrderByOperator(_indexSegment, _queryContext,
expressions, transformOperator, true);
- } else if (orderByExpressions.size() == expressions.size()) {
- // All output expressions are ordered
- TransformOperator transformOperator =
- new TransformPlanNode(_indexSegment, _queryContext, expressions,
DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
- return new SelectionOrderByOperator(_indexSegment, _queryContext,
expressions, transformOperator, false);
- } else {
- // Not all output expressions are ordered, only fetch the order-by
expressions and docId to avoid the
- // unnecessary data fetch
- List<ExpressionContext> expressionsToTransform = new
ArrayList<>(orderByExpressions.size());
- for (OrderByExpressionContext orderByExpression :
orderByExpressions) {
- expressionsToTransform.add(orderByExpression.getExpression());
- }
- TransformOperator transformOperator =
- new TransformPlanNode(_indexSegment, _queryContext,
expressionsToTransform,
- DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
- return new SelectionOrderByOperator(_indexSegment, _queryContext,
expressions, transformOperator, false);
- }
- }
- } else {
+ if (limit == 0) {
// Empty selection (LIMIT 0)
TransformOperator transformOperator = new
TransformPlanNode(_indexSegment, _queryContext, expressions, 0).run();
return new EmptySelectionOperator(_indexSegment, expressions,
transformOperator);
}
+
+ List<OrderByExpressionContext> orderByExpressions =
_queryContext.getOrderByExpressions();
+ if (orderByExpressions == null) {
+ // Selection only
+ // ie: SELECT ... FROM Table WHERE ... LIMIT 10
+ int maxDocsPerCall = Math.min(limit, DocIdSetPlanNode.MAX_DOC_PER_CALL);
+ TransformPlanNode planNode = new TransformPlanNode(_indexSegment,
_queryContext, expressions, maxDocsPerCall);
+ TransformOperator transformOperator = planNode.run();
+
+ return new SelectionOnlyOperator(_indexSegment, _queryContext,
expressions, transformOperator);
+ }
+ int numOrderByExpressions = orderByExpressions.size();
+ // Although it is a break of abstraction, some code, specially merging,
assumes that if there is an order by
+ // expression the operator will return a block whose selection result is a
priority queue.
+ int sortedColumnsPrefixSize = getSortedColumnsPrefix(orderByExpressions,
_queryContext.isNullHandlingEnabled());
+ OrderByAlgorithm orderByAlgorithm =
OrderByAlgorithm.fromQueryContext(_queryContext);
+ if (sortedColumnsPrefixSize > 0 && orderByAlgorithm !=
OrderByAlgorithm.NAIVE) {
+ int maxDocsPerCall = DocIdSetPlanNode.MAX_DOC_PER_CALL;
+ // The first order by expressions are sorted (either asc or desc).
+ // ie: SELECT ... FROM Table WHERE predicates ORDER BY sorted_column
DESC LIMIT 10 OFFSET 5
+ // or: SELECT ... FROM Table WHERE predicates ORDER BY sorted_column,
not_sorted LIMIT 10 OFFSET 5
+ // but not SELECT ... FROM Table WHERE predicates ORDER BY not_sorted,
sorted_column LIMIT 10 OFFSET 5
+ if (orderByExpressions.get(0).isAsc()) {
+ if (sortedColumnsPrefixSize == orderByExpressions.size()) {
+ maxDocsPerCall = Math.min(limit + _queryContext.getOffset(),
DocIdSetPlanNode.MAX_DOC_PER_CALL);
+ }
+ TransformPlanNode planNode = new TransformPlanNode(_indexSegment,
_queryContext, expressions, maxDocsPerCall);
+ TransformOperator transformOperator = planNode.run();
+ return new SelectionPartiallyOrderedByAscOperator(_indexSegment,
_queryContext, expressions, transformOperator,
+ sortedColumnsPrefixSize);
+ } else {
+ TransformPlanNode planNode = new TransformPlanNode(_indexSegment,
_queryContext, expressions, maxDocsPerCall);
+ TransformOperator transformOperator = planNode.run();
+ return new SelectionPartiallyOrderedByDescOperation(_indexSegment,
_queryContext, expressions,
+ transformOperator, sortedColumnsPrefixSize);
+ }
+ }
+ if (numOrderByExpressions == expressions.size()) {
+ // All output expressions are ordered
+ // ie: SELECT not_sorted1, not_sorted2 FROM Table WHERE ... ORDER BY
not_sorted1, not_sorted2 LIMIT 10 OFFSET 5
+ TransformOperator transformOperator =
+ new TransformPlanNode(_indexSegment, _queryContext, expressions,
DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+ return new SelectionOrderByOperator(_indexSegment, _queryContext,
expressions, transformOperator);
+ }
+ // Not all output expressions are ordered, only fetch the order-by
expressions and docId to avoid the
+ // unnecessary data fetch
+ // ie: SELECT ... FROM Table WHERE ... ORDER BY not_sorted1, not_sorted2
LIMIT 10
+ List<ExpressionContext> expressionsToTransform = new
ArrayList<>(numOrderByExpressions);
+ for (OrderByExpressionContext orderByExpression : orderByExpressions) {
+ expressionsToTransform.add(orderByExpression.getExpression());
+ }
+ TransformOperator transformOperator = new TransformPlanNode(_indexSegment,
_queryContext, expressionsToTransform,
+ DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+ return new SelectionOrderByOperator(_indexSegment, _queryContext,
expressions, transformOperator);
}
/**
- * This function checks whether all columns in order by clause are
pre-sorted.
- * This is used to optimize order by limit clauses.
- * For eg:
- * A query like "select * from table order by col1, col2 limit 10"
- * will take all the n matching rows and add it to a priority queue of size
10.
- * This is nlogk operation which can be quite expensive for a large n.
- * In the above example, if the docs in the segment are already sorted by
col1 and col2 then there is no need for
- * sorting at all (only limit is needed).
- * @return true is all columns in order by clause are sorted . False
otherwise
+ * This function returns the number of expressions that are sorted by the
implicit order in the index.
+ *
+ * This means that a query that uses these expressions in its order by doesn't
actually need to sort on the expressions from 0 to the
+ * given number (excluded), as the rows are already returned in the
correct order for them.
+ *
+ * This method supports ASC and DESC order and ensures that all prefix
expressions follow the same order. For example,
+ * ORDER BY sorted_col1 ASC, sorted_col2 ASC and ORDER BY sorted_col1 DESC,
sorted_col2 DESC will return 2 but
+ * ORDER BY sorted_col1 DESC, sorted_col2 ASC and ORDER BY sorted_col1 ASC,
sorted_col2 DESC will return 1 while
+ * ORDER BY not_sorted, sorted_col1 will return 0 because the first column
is not sorted.
+ *
+ * It doesn't make sense to add literal expressions in an order by
expression, but if they are included, they are
+ * considered sorted and its ASC/DESC is ignored.
+ *
+ * @return the max number that guarantees that from the first expression to
the returned number, the index is already
+ * sorted.
*/
- private boolean isAllOrderByColumnsSorted(List<OrderByExpressionContext>
orderByExpressions) {
- for (OrderByExpressionContext orderByExpression : orderByExpressions) {
- if (!(orderByExpression.getExpression().getType() ==
ExpressionContext.Type.IDENTIFIER)
- || !orderByExpression.isAsc()) {
- return false;
+  private int getSortedColumnsPrefix(List<OrderByExpressionContext> orderByExpressions,
+      boolean isNullHandlingEnabled) {
+    int numExpressions = orderByExpressions.size();
+    // Every expression of the prefix must follow the direction of the first order-by expression.
+    // NOTE(review): the direction is read from element 0 even when that element is a literal, although isSorted
+    // accepts literals without looking at their direction -- confirm this is intended.
+    boolean firstIsAsc = orderByExpressions.get(0).isAsc();
+    int prefixLength = 0;
+    while (prefixLength < numExpressions
+        && isSorted(orderByExpressions.get(prefixLength), firstIsAsc, isNullHandlingEnabled)) {
+      prefixLength++;
+    }
+    // Equals numExpressions when every expression is sorted, otherwise the index of the first unsorted one
+    return prefixLength;
+  }
+
+ /**
+ * Tells whether a single order-by expression can be treated as already sorted in this segment.
+ *
+ * Literals always qualify. An identifier qualifies when its direction equals {@code asc}, when (with null handling
+ * enabled) its null bitmap is absent or empty, and when the data source metadata reports the column as sorted.
+ * Any other expression type does not qualify.
+ *
+ * @param orderByExpression the expression to check
+ * @param asc the direction the expression has to follow
+ * @param isNullHandlingEnabled whether the null value vector of the column must be inspected
+ * @return true if the expression can be treated as sorted
+ */
+ private boolean isSorted(OrderByExpressionContext orderByExpression, boolean
asc, boolean isNullHandlingEnabled) {
+ switch (orderByExpression.getExpression().getType()) {
+ case LITERAL: {
+ return true;
+ }
+ case IDENTIFIER: {
+ // Parsed as (!isAsc()) == asc, i.e. true when the expression direction differs from the requested one
+ if (!orderByExpression.isAsc() == asc) {
+ return false;
+ }
+ String column = orderByExpression.getExpression().getIdentifier();
+ DataSource dataSource = _indexSegment.getDataSource(column);
+ // If there are null values, we cannot trust DataSourceMetadata.isSorted
+ if (isNullHandlingEnabled) {
+ NullValueVectorReader nullValueVector =
dataSource.getNullValueVector();
+ if (nullValueVector != null &&
!nullValueVector.getNullBitmap().isEmpty()) {
+ return false;
+ }
+ }
+ return dataSource.getDataSourceMetadata().isSorted();
}
- String column = orderByExpression.getExpression().getIdentifier();
- if
(!_indexSegment.getDataSource(column).getDataSourceMetadata().isSorted()) {
+ case FUNCTION: // we could optimize monotonically increasing functions
+ default: {
return false;
}
}
- return true;
+ }
+
+  /**
+   * Order-by algorithms that can be requested through the query options. The planner in this class only applies
+   * the sorted-prefix operators when the requested algorithm is not {@link #NAIVE}.
+   */
+  public enum OrderByAlgorithm {
+    NAIVE;
+
+    /**
+     * Reads the order-by algorithm from the query options of the given query.
+     *
+     * @param queryContext the query whose options are inspected
+     * @return the constant named by the option (matched case-insensitively), or {@code null} when the option is
+     *         not set
+     * @throws IllegalArgumentException if the option value does not name a constant of this enum
+     */
+    @Nullable
+    public static OrderByAlgorithm fromQueryContext(QueryContext queryContext) {
+      String orderByAlgorithm = QueryOptionsUtils.getOrderByAlgorithm(queryContext.getQueryOptions());
+      if (orderByAlgorithm == null) {
+        return null;
+      }
+      // Upper-case with a fixed locale: the default-locale overload maps 'i' to a dotted capital I under Turkish
+      // locales, which would make a valid value such as "naive" fail the valueOf lookup.
+      return OrderByAlgorithm.valueOf(orderByAlgorithm.toUpperCase(java.util.Locale.ROOT));
+    }
+  }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
new file mode 100644
index 0000000000..4258317e76
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.utils;
+
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * A utility class used to create comparators that should be used by operators that implement order by semantics.
+ */
+public class OrderByComparatorFactory {
+  private OrderByComparatorFactory() {
+  }
+
+  /**
+   * Creates a comparator that takes every given order-by expression into account.
+   *
+   * @param reverse if false, the comparator will order in the direction indicated by the
+   *                {@link OrderByExpressionContext#isAsc()}. Otherwise, it will be in the opposite direction.
+   * @see #getComparator(List, TransformResultMetadata[], boolean, boolean, int, int)
+   */
+  public static Comparator<Object[]> getComparator(List<OrderByExpressionContext> orderByExpressions,
+      TransformResultMetadata[] orderByExpressionMetadata, boolean reverse, boolean nullHandlingEnabled) {
+    return getComparator(orderByExpressions, orderByExpressionMetadata, reverse, nullHandlingEnabled, 0,
+        orderByExpressions.size());
+  }
+
+  /**
+   * Creates a comparator that only considers the order-by expressions in the range {@code [from, to)}. The value
+   * of the i-th order-by expression is read from position i of each compared row.
+   *
+   * @param orderByExpressions the order-by expressions of the query
+   * @param orderByExpressionMetadata the metadata of each expression, indexed like {@code orderByExpressions}
+   * @param reverse if false, the comparator will order in the direction indicated by the
+   *                {@link OrderByExpressionContext#isAsc()}. Otherwise, it will be in the opposite direction.
+   * @param nullHandlingEnabled whether the compared rows may contain null values that must be checked explicitly
+   * @param from index of the first expression to compare (inclusive)
+   * @param to index after the last expression to compare (exclusive)
+   * @throws IllegalArgumentException if the range is empty or does not fit in the given expressions/metadata
+   * @throws BadQueryRequestException if any expression in the range is multi-valued
+   */
+  public static Comparator<Object[]> getComparator(List<OrderByExpressionContext> orderByExpressions,
+      TransformResultMetadata[] orderByExpressionMetadata, boolean reverse, boolean nullHandlingEnabled, int from,
+      int to) {
+    Preconditions.checkArgument(from >= 0, "FROM (%s) must not be negative", from);
+    Preconditions.checkArgument(to <= orderByExpressions.size(),
+        "Trying to access %sth position of orderByExpressions with size %s", to, orderByExpressions.size());
+    Preconditions.checkArgument(to <= orderByExpressionMetadata.length,
+        "Trying to access %sth position of orderByExpressionMetadata with size %s", to,
+        orderByExpressionMetadata.length);
+    Preconditions.checkArgument(from < to, "FROM (%s) must be lower than TO (%s)", from, to);
+
+    // Compare all single-value columns
+    int numOrderByExpressions = to - from;
+    List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
+    for (int i = from; i < to; i++) {
+      if (orderByExpressionMetadata[i].isSingleValue()) {
+        valueIndexList.add(i);
+      } else {
+        // MV columns should not be part of the selection order by only list
+        throw new BadQueryRequestException(
+            String.format("MV expression: %s should not be included in the ORDER-BY clause",
+                orderByExpressions.get(i)));
+      }
+    }
+
+    int numValuesToCompare = valueIndexList.size();
+    int[] valueIndices = new int[numValuesToCompare];
+    FieldSpec.DataType[] storedTypes = new FieldSpec.DataType[numValuesToCompare];
+    // Use multiplier -1 or 1 to control ascending/descending order
+    int[] multipliers = new int[numValuesToCompare];
+    int ascMult = reverse ? -1 : 1;
+    int descMult = reverse ? 1 : -1;
+    for (int i = 0; i < numValuesToCompare; i++) {
+      int valueIndex = valueIndexList.get(i);
+      valueIndices[i] = valueIndex;
+      storedTypes[i] = orderByExpressionMetadata[valueIndex].getDataType().getStoredType();
+      multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? ascMult : descMult;
+    }
+
+    if (nullHandlingEnabled) {
+      return (Object[] o1, Object[] o2) -> {
+        for (int i = 0; i < numValuesToCompare; i++) {
+          int index = valueIndices[i];
+          // TODO: Evaluate the performance of casting to Comparable and avoid the switch
+          Object v1 = o1[index];
+          Object v2 = o2[index];
+          if (v1 == null) {
+            if (v2 == null) {
+              // Both rows are null on this expression, so it cannot break the tie: keep comparing the remaining
+              // expressions instead of declaring the rows equal
+              continue;
+            }
+            // A null is ranked as if it were smaller than any non-null value, then the multiplier is applied.
+            // NOTE(review): this used to be described as 'NULLS LAST' regardless of the ordering direction, but
+            // the result depends on the multiplier - confirm the intended null ordering.
+            return -multipliers[i];
+          }
+          if (v2 == null) {
+            return multipliers[i];
+          }
+          int result = compareCols(v1, v2, storedTypes[i], multipliers[i]);
+          if (result != 0) {
+            return result;
+          }
+        }
+        return 0;
+      };
+    } else {
+      return (Object[] o1, Object[] o2) -> {
+        for (int i = 0; i < numValuesToCompare; i++) {
+          int index = valueIndices[i];
+          // TODO: Evaluate the performance of casting to Comparable and avoid the switch
+          int result = compareCols(o1[index], o2[index], storedTypes[i], multipliers[i]);
+          if (result != 0) {
+            return result;
+          }
+        }
+        return 0;
+      };
+    }
+  }
+
+  /**
+   * Compares two non-null values of the given stored type and applies the multiplier to the natural comparison
+   * result.
+   *
+   * @throws IllegalStateException if the stored type is not one of the comparable single-value types
+   */
+  private static int compareCols(Object v1, Object v2, FieldSpec.DataType type, int multiplier) {
+
+    // TODO: Evaluate the performance of casting to Comparable and avoid the switch
+    int result;
+    switch (type) {
+      case INT:
+        result = ((Integer) v1).compareTo((Integer) v2);
+        break;
+      case LONG:
+        result = ((Long) v1).compareTo((Long) v2);
+        break;
+      case FLOAT:
+        result = ((Float) v1).compareTo((Float) v2);
+        break;
+      case DOUBLE:
+        result = ((Double) v1).compareTo((Double) v2);
+        break;
+      case BIG_DECIMAL:
+        result = ((BigDecimal) v1).compareTo((BigDecimal) v2);
+        break;
+      case STRING:
+        result = ((String) v1).compareTo((String) v2);
+        break;
+      case BYTES:
+        result = ((ByteArray) v1).compareTo((ByteArray) v2);
+        break;
+      // NOTE: Multi-value columns are not comparable, so we should not reach here
+      default:
+        throw new IllegalStateException("Unsupported stored type in ORDER-BY comparison: " + type);
+    }
+    return result * multiplier;
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java
index 6190373295..4a0d6e5d8c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java
@@ -100,4 +100,9 @@ public class QueryOptionsUtils {
public static boolean isServerReturnFinalResult(Map<String, String>
queryOptions) {
return
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SERVER_RETURN_FINAL_RESULT));
}
+
+  /**
+   * Looks up the raw value of the order-by algorithm query option.
+   *
+   * @param queryOptions the query options to read from
+   * @return the value stored under the order-by algorithm key, or {@code null} when the map has no value for it
+   */
+  @Nullable
+  public static String getOrderByAlgorithm(Map<String, String> queryOptions) {
+    String orderByAlgorithm = queryOptions.get(QueryOptionKey.ORDER_BY_ALGORITHM);
+    return orderByAlgorithm;
+  }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
index e8db2051bf..88b81d5b88 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
@@ -224,7 +224,7 @@ public class SelectionCombineOperatorTest {
SelectionResultsBlock combineResult = getCombineResult("SELECT * FROM
testTable ORDER BY intColumn");
assertEquals(combineResult.getDataSchema(),
new DataSchema(new String[]{INT_COLUMN}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
- PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>)
combineResult.getRows();
+ PriorityQueue<Object[]> selectionResult =
combineResult.getRowsAsPriorityQueue();
assertNotNull(selectionResult);
assertEquals(selectionResult.size(), 10);
int expectedValue = 9;
@@ -248,7 +248,7 @@ public class SelectionCombineOperatorTest {
combineResult = getCombineResult("SELECT * FROM testTable ORDER BY
intColumn DESC");
assertEquals(combineResult.getDataSchema(),
new DataSchema(new String[]{INT_COLUMN}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
- selectionResult = (PriorityQueue<Object[]>) combineResult.getRows();
+ selectionResult = combineResult.getRowsAsPriorityQueue();
assertNotNull(selectionResult);
assertEquals(selectionResult.size(), 10);
expectedValue = NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT / 2 + 40;
@@ -272,7 +272,7 @@ public class SelectionCombineOperatorTest {
combineResult = getCombineResult("SELECT * FROM testTable ORDER BY
intColumn DESC LIMIT 10000");
assertEquals(combineResult.getDataSchema(),
new DataSchema(new String[]{INT_COLUMN}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
- selectionResult = (PriorityQueue<Object[]>) combineResult.getRows();
+ selectionResult = combineResult.getRowsAsPriorityQueue();
assertNotNull(selectionResult);
assertEquals(selectionResult.size(), NUM_SEGMENTS *
NUM_RECORDS_PER_SEGMENT);
// Should not early-terminate
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java
new file mode 100644
index 0000000000..b7e2a2b6de
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import
org.apache.pinot.core.operator.query.LinearSelectionOrderByOperator.PartiallySortedListBuilder;
+import
org.apache.pinot.core.operator.query.LinearSelectionOrderByOperator.TotallySortedListBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for the {@code TotallySortedListBuilder} and {@code PartiallySortedListBuilder} classes nested in
+ * {@code LinearSelectionOrderByOperator}.
+ */
+public class LinearSelectionOrderByOperatorTest {
+
+ /**
+ * Checks that {@code add} returns true exactly on the row that brings the builder to {@code maxNumRows} rows,
+ * and that {@code build} returns the rows in the order in which they were added.
+ */
+ @Test
+ public void testTotallySortedListBuilder() {
+ int maxNumRows = 10;
+
+ // Enough rows collected
+ TotallySortedListBuilder listBuilder = new
TotallySortedListBuilder(maxNumRows);
+ for (int i = 0; i < maxNumRows; i++) {
+ Object[] row = new Object[]{i / 2};
+ boolean enoughRowsCollected = listBuilder.add(row);
+ assertEquals(enoughRowsCollected, i == maxNumRows - 1);
+ }
+ List<Object[]> rows = listBuilder.build();
+ assertEquals(rows.size(), maxNumRows);
+ for (int i = 0; i < maxNumRows; i++) {
+ assertEquals(rows.get(i), new Object[]{i / 2});
+ }
+
+ // Not enough rows collected
+ listBuilder = new TotallySortedListBuilder(maxNumRows);
+ for (int i = 0; i < maxNumRows - 1; i++) {
+ Object[] row = new Object[]{i / 2};
+ assertFalse(listBuilder.add(row));
+ }
+ rows = listBuilder.build();
+ assertEquals(rows.size(), maxNumRows - 1);
+ for (int i = 0; i < maxNumRows - 1; i++) {
+ assertEquals(rows.get(i), new Object[]{i / 2});
+ }
+ }
+
+ /**
+ * Checks the builder with rows of the form {partition value, unsorted value}: {@code add} is expected to return
+ * true only when a row of a new partition arrives after {@code maxNumRows} rows were added, and {@code build} is
+ * expected to return {@code maxNumRows} rows even when more rows of the last partition were added.
+ */
+ @Test
+ public void testPartiallySortedListBuilder() {
+ int maxNumRows = 10;
+ // Partitions are defined by the first value of the row, compared in ascending order
+ Comparator<Object[]> partitionComparator = Comparator.comparingInt(row ->
(Integer) row[0]);
+ // The second value of the row is compared in descending order (row2 is compared against row1)
+ Comparator<Object[]> unsortedComparator = (row1, row2) ->
Integer.compare((Integer) row2[1], (Integer) row1[1]);
+
+ // Enough rows collected without tie rows
+ PartiallySortedListBuilder listBuilder =
+ new PartiallySortedListBuilder(maxNumRows, partitionComparator,
unsortedComparator);
+ for (int i = 0; i < maxNumRows; i++) {
+ Object[] row = new Object[]{i / 2, maxNumRows - i};
+ assertFalse(listBuilder.add(row));
+ }
+ int lastPartitionValue = (maxNumRows - 1) / 2;
+ // A row that belongs to the next partition is the one reported as "enough rows collected"
+ assertTrue(listBuilder.add(new Object[]{lastPartitionValue + 1, 0}));
+ List<Object[]> rows = listBuilder.build();
+ assertEquals(rows.size(), maxNumRows);
+ for (int i = 0; i < maxNumRows; i++) {
+ assertEquals(rows.get(i), new Object[]{i / 2, maxNumRows - i});
+ }
+
+ // Enough rows collected with tie rows
+ listBuilder = new PartiallySortedListBuilder(maxNumRows,
partitionComparator, unsortedComparator);
+ for (int i = 0; i < maxNumRows; i++) {
+ Object[] row = new Object[]{i / 2, maxNumRows - i};
+ assertFalse(listBuilder.add(row));
+ }
+ // Extra rows of the last partition are accepted without reporting that enough rows were collected
+ assertFalse(listBuilder.add(new Object[]{lastPartitionValue, 0}));
+ assertFalse(listBuilder.add(new Object[]{lastPartitionValue, 2}));
+ assertFalse(listBuilder.add(new Object[]{lastPartitionValue, 4}));
+ assertFalse(listBuilder.add(new Object[]{lastPartitionValue, 6}));
+ assertTrue(listBuilder.add(new Object[]{lastPartitionValue + 1, 0}));
+ rows = listBuilder.build();
+ assertEquals(rows.size(), maxNumRows);
+ // For the last partition, should contain unsorted value 0 and 1
+ Set<Integer> unsortedValues = new HashSet<>();
+ for (int i = 0; i < maxNumRows; i++) {
+ if (i / 2 != lastPartitionValue) {
+ assertEquals(rows.get(i), new Object[]{i / 2, maxNumRows - i});
+ } else {
+ Object[] row = rows.get(i);
+ assertEquals(row[0], lastPartitionValue);
+ int unsortedValue = (int) row[1];
+ assertTrue(unsortedValue == 0 || unsortedValue == 1);
+ unsortedValues.add(unsortedValue);
+ }
+ }
+ assertEquals(unsortedValues.size(), 2);
+
+ // Not enough rows collected with tie rows
+ listBuilder = new PartiallySortedListBuilder(maxNumRows,
partitionComparator, unsortedComparator);
+ for (int i = 0; i < maxNumRows; i++) {
+ Object[] row = new Object[]{i / 2, maxNumRows - i};
+ assertFalse(listBuilder.add(row));
+ }
+ // Same tie rows as above, but no row of a following partition is ever added before build()
+ assertFalse(listBuilder.add(new Object[]{lastPartitionValue, 0}));
+ assertFalse(listBuilder.add(new Object[]{lastPartitionValue, 2}));
+ assertFalse(listBuilder.add(new Object[]{lastPartitionValue, 4}));
+ assertFalse(listBuilder.add(new Object[]{lastPartitionValue, 6}));
+ rows = listBuilder.build();
+ assertEquals(rows.size(), maxNumRows);
+ // For the last partition, should contain unsorted value 0 and 1
+ unsortedValues = new HashSet<>();
+ for (int i = 0; i < maxNumRows; i++) {
+ if (i / 2 != lastPartitionValue) {
+ assertEquals(rows.get(i), new Object[]{i / 2, maxNumRows - i});
+ } else {
+ Object[] row = rows.get(i);
+ assertEquals(row[0], lastPartitionValue);
+ int unsortedValue = (int) row[1];
+ assertTrue(unsortedValue == 0 || unsortedValue == 1);
+ unsortedValues.add(unsortedValue);
+ }
+ }
+ assertEquals(unsortedValues.size(), 2);
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
index af88db06c7..876baf7a98 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
@@ -203,7 +203,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest
extends BaseMultiValueQu
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
DataSchema.ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
DataSchema.ColumnDataType.INT_ARRAY);
- PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>)
resultsBlock.getRows();
+ PriorityQueue<Object[]> selectionResult =
resultsBlock.getRowsAsPriorityQueue();
assertEquals(selectionResult.size(), 10);
Object[] lastRow = selectionResult.peek();
assertEquals(lastRow.length, 4);
@@ -228,7 +228,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest
extends BaseMultiValueQu
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
DataSchema.ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
DataSchema.ColumnDataType.INT_ARRAY);
- selectionResult = (PriorityQueue<Object[]>) resultsBlock.getRows();
+ selectionResult = resultsBlock.getRowsAsPriorityQueue();
assertEquals(selectionResult.size(), 10);
lastRow = selectionResult.peek();
assertEquals(lastRow.length, 4);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java
index e4de729912..9e3abdc929 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java
@@ -203,7 +203,7 @@ public class InnerSegmentSelectionMultiValueRawQueriesTest
extends BaseMultiValu
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
DataSchema.ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
DataSchema.ColumnDataType.INT_ARRAY);
- PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>)
resultsBlock.getRows();
+ PriorityQueue<Object[]> selectionResult =
resultsBlock.getRowsAsPriorityQueue();
assertEquals(selectionResult.size(), 10);
Object[] lastRow = selectionResult.peek();
assertEquals(lastRow.length, 4);
@@ -228,7 +228,7 @@ public class InnerSegmentSelectionMultiValueRawQueriesTest
extends BaseMultiValu
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
DataSchema.ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
DataSchema.ColumnDataType.INT_ARRAY);
- selectionResult = (PriorityQueue<Object[]>) resultsBlock.getRows();
+ selectionResult = resultsBlock.getRowsAsPriorityQueue();
assertEquals(selectionResult.size(), 10);
lastRow = selectionResult.peek();
assertEquals(lastRow.length, 4);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
index f4f6a86678..b81065dfab 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
@@ -30,7 +30,8 @@ import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.operator.query.EmptySelectionOperator;
-import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
+import
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByAscOperator;
+import
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByDescOperation;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -88,7 +89,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest
extends BaseSingleValue
String query = "SELECT daysSinceEpoch FROM testTable WHERE "
+ "dateTimeConvert(daysSinceEpoch, '1:DAYS:EPOCH',
'1:MILLISECONDS:EPOCH', '1:MILLISECONDS') > ago('P1D') "
+ "ORDER BY daysSinceEpoch LIMIT 10";
- SelectionOrderByOperator selectionOrderByOperator = getOperator(query);
+ BaseOperator<SelectionResultsBlock> selectionOrderByOperator =
getOperator(query);
SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
verifySelectionOrderByAgoFunctionResult(resultsBlock);
@@ -107,7 +108,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest
extends BaseSingleValue
assertTrue(columnIndexMap.containsKey("daysSinceEpoch"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("daysSinceEpoch")),
ColumnDataType.INT);
- PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>)
resultsBlock.getRows();
+ PriorityQueue<Object[]> selectionResult =
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
assertEquals(selectionResult.size(), 10);
for (Object[] row : selectionResult) {
assertEquals(row.length, 1);
@@ -231,7 +232,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest
extends BaseSingleValue
assertTrue(columnIndexMap.containsKey("column1"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
ColumnDataType.INT);
- PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>)
resultsBlock.getRows();
+ PriorityQueue<Object[]> selectionResult =
resultsBlock.getRowsAsPriorityQueue();
assertEquals(selectionResult.size(), 10);
Object[] lastRow = selectionResult.peek();
assertEquals(lastRow.length, 4);
@@ -255,7 +256,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest
extends BaseSingleValue
assertTrue(columnIndexMap.containsKey("column1"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
ColumnDataType.INT);
- selectionResult = (PriorityQueue<Object[]>) resultsBlock.getRows();
+ selectionResult = resultsBlock.getRowsAsPriorityQueue();
assertEquals(selectionResult.size(), 10);
lastRow = selectionResult.peek();
assertEquals(lastRow.length, 4);
@@ -263,6 +264,162 @@ public class InnerSegmentSelectionSingleValueQueriesTest
extends BaseSingleValue
assertEquals(((Integer)
lastRow[columnIndexMap.get("column1")]).intValue(), 462769197);
}
+ /**
+ * Verifies which operator is planned, the execution statistics and the results for ORDER BY queries whose first
+ * order-by column is column5. Queries starting with column5 ascending are expected to be planned as
+ * SelectionPartiallyOrderedByAscOperator, and queries starting with column5 descending as
+ * SelectionPartiallyOrderedByDescOperation.
+ */
+ @Test
+ public void testSelectionOrderBySortedColumn() {
+ // Test query order by single sorted column in ascending order
+ String orderBy = " ORDER BY column5";
+ BaseOperator<SelectionResultsBlock> selectionOrderByOperator =
getOperator(SELECTION_QUERY + orderBy);
+ assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByAscOperator);
+ SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+ ExecutionStatistics executionStatistics =
selectionOrderByOperator.getExecutionStatistics();
+ // Only 10 of the 30000 docs are expected to be scanned in this case
+ assertEquals(executionStatistics.getNumDocsScanned(), 10L);
+ assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+ // 10 * (3 columns)
+ assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 30L);
+ assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+ DataSchema dataSchema = resultsBlock.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"column5",
"column1", "column11"});
+ assertEquals(dataSchema.getColumnDataTypes(),
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT,
ColumnDataType.STRING});
+ PriorityQueue<Object[]> selectionResult =
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+ assertEquals(selectionResult.size(), 10);
+ Object[] lastRow = selectionResult.peek();
+ assertEquals(lastRow.length, 3);
+ assertEquals(lastRow[0], "gFuH");
+
+ // Test query order by single sorted column in descending order
+ orderBy = " ORDER BY column5 DESC";
+ selectionOrderByOperator = getOperator(SELECTION_QUERY + orderBy);
+ assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByDescOperation);
+ resultsBlock = selectionOrderByOperator.nextBlock();
+ executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+ // In the descending case all the 30000 docs are expected to be scanned
+ assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
+ assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+ // 30000 * (3 columns)
+ assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 90000L);
+ assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+ dataSchema = resultsBlock.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"column5",
"column1", "column11"});
+ assertEquals(dataSchema.getColumnDataTypes(),
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT,
ColumnDataType.STRING});
+ selectionResult =
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+ assertEquals(selectionResult.size(), 10);
+ lastRow = selectionResult.peek();
+ assertEquals(lastRow.length, 3);
+ assertEquals(lastRow[0], "gFuH");
+
+ // Test query order by all sorted columns in ascending order
+ String query = "SELECT column5, daysSinceEpoch FROM testTable ORDER BY
column5, daysSinceEpoch";
+ selectionOrderByOperator = getOperator(query);
+ assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByAscOperator);
+ resultsBlock = selectionOrderByOperator.nextBlock();
+ executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+ assertEquals(executionStatistics.getNumDocsScanned(), 10L);
+ assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+ // 10 * (2 columns)
+ assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 20L);
+ assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+ dataSchema = resultsBlock.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"column5",
"daysSinceEpoch"});
+ assertEquals(dataSchema.getColumnDataTypes(), new
ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
+ selectionResult =
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+ assertEquals(selectionResult.size(), 10);
+ lastRow = selectionResult.peek();
+ assertEquals(lastRow.length, 2);
+ assertEquals(lastRow[0], "gFuH");
+ assertEquals(lastRow[1], 126164076);
+
+ // Test query order by all sorted columns in descending order
+ query = "SELECT column5 FROM testTable ORDER BY column5 DESC,
daysSinceEpoch DESC";
+ selectionOrderByOperator = getOperator(query);
+ assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByDescOperation);
+ resultsBlock = selectionOrderByOperator.nextBlock();
+ executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+ assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
+ assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+ // 30000 * (2 columns)
+ assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 60000L);
+ assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+ dataSchema = resultsBlock.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"column5",
"daysSinceEpoch"});
+ assertEquals(dataSchema.getColumnDataTypes(), new
ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
+ selectionResult =
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+ assertEquals(selectionResult.size(), 10);
+ lastRow = selectionResult.peek();
+ assertEquals(lastRow.length, 2);
+ assertEquals(lastRow[0], "gFuH");
+ assertEquals(lastRow[1], 167572854);
+
+ // Test query order by one sorted column in ascending order, the other
sorted column in descending order
+ query = "SELECT daysSinceEpoch FROM testTable ORDER BY column5,
daysSinceEpoch DESC";
+ selectionOrderByOperator = getOperator(query);
+ assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByAscOperator);
+ resultsBlock = selectionOrderByOperator.nextBlock();
+ executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+ // With mixed directions all the 30000 docs are expected to be scanned, even though the first column is ascending
+ assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
+ assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+ // 30000 * (2 columns)
+ assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 60000L);
+ assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+ dataSchema = resultsBlock.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"column5",
"daysSinceEpoch"});
+ assertEquals(dataSchema.getColumnDataTypes(), new
ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
+ selectionResult =
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+ assertEquals(selectionResult.size(), 10);
+ lastRow = selectionResult.peek();
+ assertEquals(lastRow.length, 2);
+ assertEquals(lastRow[0], "gFuH");
+ assertEquals(lastRow[1], 167572854);
+
+ // Test query order by one sorted column in ascending order, and some
unsorted columns
+ query = "SELECT column1 FROM testTable ORDER BY column5, column6, column1";
+ selectionOrderByOperator = getOperator(query);
+ assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByAscOperator);
+ resultsBlock = selectionOrderByOperator.nextBlock();
+ executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+ assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
+ assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+ // 30000 * (3 columns)
+ assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 90000L);
+ assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+ dataSchema = resultsBlock.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"column5",
"column6", "column1"});
+ assertEquals(dataSchema.getColumnDataTypes(),
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT,
ColumnDataType.INT});
+ selectionResult =
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+ assertEquals(selectionResult.size(), 10);
+ lastRow = selectionResult.peek();
+ assertEquals(lastRow.length, 3);
+ assertEquals(lastRow[0], "gFuH");
+ // Unsorted column values should be the same as ordering by their own
+ assertEquals(lastRow[1], 6043515);
+ assertEquals(lastRow[2], 10542595);
+
+ // Test query order by one sorted column in descending order, and some
unsorted columns
+ query = "SELECT column6 FROM testTable ORDER BY column5 DESC, column6,
column1";
+ selectionOrderByOperator = getOperator(query);
+ assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByDescOperation);
+ resultsBlock = selectionOrderByOperator.nextBlock();
+ executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+ assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
+ assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+ // 30000 * (3 columns)
+ assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 90000L);
+ assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+ dataSchema = resultsBlock.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"column5",
"column6", "column1"});
+ assertEquals(dataSchema.getColumnDataTypes(),
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT,
ColumnDataType.INT});
+ selectionResult =
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+ assertEquals(selectionResult.size(), 10);
+ lastRow = selectionResult.peek();
+ assertEquals(lastRow.length, 3);
+ assertEquals(lastRow[0], "gFuH");
+ // Unsorted column values should be the same as ordering by their own
+ assertEquals(lastRow[1], 6043515);
+ assertEquals(lastRow[2], 10542595);
+ }
+
@Test
public void testSelectStarOrderBy() {
// Test query without filter
@@ -283,7 +440,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest
extends BaseSingleValue
assertTrue(columnIndexMap.containsKey("column1"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
ColumnDataType.INT);
- PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>)
resultsBlock.getRows();
+ PriorityQueue<Object[]> selectionResult =
resultsBlock.getRowsAsPriorityQueue();
assertEquals(selectionResult.size(), 10);
Object[] lastRow = selectionResult.peek();
assertEquals(lastRow.length, 11);
@@ -308,7 +465,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest
extends BaseSingleValue
assertTrue(columnIndexMap.containsKey("column1"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
ColumnDataType.INT);
- selectionResult = (PriorityQueue<Object[]>) resultsBlock.getRows();
+ selectionResult = resultsBlock.getRowsAsPriorityQueue();
assertEquals(selectionResult.size(), 10);
lastRow = selectionResult.peek();
assertEquals(lastRow.length, 11);
@@ -336,7 +493,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest
extends BaseSingleValue
assertEquals(selectionDataSchema.size(), 11);
assertTrue(columnIndexMap.containsKey("column5"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column5")),
ColumnDataType.STRING);
- PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>)
resultsBlock.getRows();
+ PriorityQueue<Object[]> selectionResult =
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
assertEquals(selectionResult.size(), 10);
Object[] lastRow = selectionResult.peek();
assertEquals(lastRow.length, 11);
@@ -358,7 +515,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest
extends BaseSingleValue
assertEquals(selectionDataSchema.size(), 11);
assertTrue(columnIndexMap.containsKey("column5"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column5")),
ColumnDataType.STRING);
- selectionResult = (PriorityQueue<Object[]>) resultsBlock.getRows();
+ selectionResult =
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
assertEquals(selectionResult.size(), 10);
lastRow = selectionResult.peek();
assertEquals(lastRow.length, 11);
@@ -387,7 +544,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest
extends BaseSingleValue
assertTrue(columnIndexMap.containsKey("column1"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
ColumnDataType.INT);
- PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>)
resultsBlock.getRows();
+ PriorityQueue<Object[]> selectionResult =
resultsBlock.getRowsAsPriorityQueue();
assertEquals(selectionResult.size(), 12000);
Object[] lastRow = selectionResult.peek();
assertEquals(lastRow.length, 11);
@@ -412,7 +569,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest
extends BaseSingleValue
assertTrue(columnIndexMap.containsKey("column1"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
ColumnDataType.INT);
- selectionResult = (PriorityQueue<Object[]>) resultsBlock.getRows();
+ selectionResult = resultsBlock.getRowsAsPriorityQueue();
assertEquals(selectionResult.size(), 6129);
lastRow = selectionResult.peek();
assertEquals(lastRow.length, 11);
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOrderByQueries.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOrderByQueries.java
new file mode 100644
index 0000000000..308c7c3761
--- /dev/null
+++
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOrderByQueries.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
+import java.util.stream.IntStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.queries.BaseQueriesTest;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+/**
+ * JMH benchmark for {@code ORDER BY} selection queries whose first ordering column is the table's sorted column.
+ *
+ * <p>Each trial builds two segments of generated rows and runs ascending/descending queries (ordering by the sorted
+ * column alone, or by the sorted column followed by an unsorted one). Every query carries the
+ * {@code orderByAlgorithm} query option set to the {@code _orderByAlgorithm} parameter.
+ */
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+@Warmup(iterations = 5, time = 1)
+@Measurement(iterations = 5, time = 1)
+@State(Scope.Benchmark)
+public class BenchmarkOrderByQueries extends BaseQueriesTest {
+
+  /**
+   * Runs every benchmark declared in this class through the JMH runner.
+   */
+  public static void main(String[] args)
+      throws Exception {
+    ChainedOptionsBuilder opt = new OptionsBuilder().include(BenchmarkOrderByQueries.class.getSimpleName());
+    new Runner(opt.build()).run();
+  }
+
+  // The directory is wiped in setUp() and tearDown(), so its name is specific to this benchmark: sharing a name
+  // with another test or benchmark under the common temp directory would let either one delete the other's segments.
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "BenchmarkOrderByQueries");
+  private static final String TABLE_NAME = "MyTable";
+  private static final String FIRST_SEGMENT_NAME = "firstTestSegment";
+  private static final String SECOND_SEGMENT_NAME = "secondTestSegment";
+  private static final String INT_COL_NAME = "INT_COL";
+  private static final String SORTED_COL_NAME = "SORTED_COL";
+  private static final String RAW_INT_COL_NAME = "RAW_INT_COL";
+  private static final String RAW_STRING_COL_NAME = "RAW_STRING_COL";
+  private static final String NO_INDEX_INT_COL_NAME = "NO_INDEX_INT_COL";
+  private static final String NO_INDEX_STRING_COL = "NO_INDEX_STRING_COL";
+  private static final String LOW_CARDINALITY_STRING_COL = "LOW_CARDINALITY_STRING_COL";
+
+  // Number of rows generated for each of the two segments.
+  @Param("1500000")
+  private int _numRows;
+  // Value passed verbatim to the orderByAlgorithm query option.
+  @Param({"naive", "null"})
+  private String _orderByAlgorithm;
+  // Distribution descriptor handed to Distribution.createLongSupplier for the generated column values.
+  @Param({"EXP(0.5)"})
+  String _scenario;
+  // How many consecutive rows share the same SORTED_COL value.
+  @Param({"1", "1000"})
+  int _primaryRepetitions;
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+  private LongSupplier _supplier;
+
+  /**
+   * Builds both segments from freshly generated data and loads them as immutable segments.
+   */
+  @Setup
+  public void setUp()
+      throws Exception {
+    _supplier = Distribution.createLongSupplier(42, _scenario);
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    buildSegment(FIRST_SEGMENT_NAME);
+    buildSegment(SECOND_SEGMENT_NAME);
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+
+    // The same columns get both a range index and an inverted index at load time.
+    Set<String> indexedCols = new HashSet<>();
+    indexedCols.add(INT_COL_NAME);
+    indexedCols.add(LOW_CARDINALITY_STRING_COL);
+
+    indexLoadingConfig.setRangeIndexColumns(indexedCols);
+    indexLoadingConfig.setInvertedIndexColumns(indexedCols);
+
+    ImmutableSegment firstImmutableSegment =
+        ImmutableSegmentLoader.load(new File(INDEX_DIR, FIRST_SEGMENT_NAME), indexLoadingConfig);
+    ImmutableSegment secondImmutableSegment =
+        ImmutableSegmentLoader.load(new File(INDEX_DIR, SECOND_SEGMENT_NAME), indexLoadingConfig);
+    _indexSegment = firstImmutableSegment;
+    _indexSegments = Arrays.asList(firstImmutableSegment, secondImmutableSegment);
+  }
+
+  /**
+   * Destroys the loaded segments, removes the index directory and shuts down the shared executor.
+   */
+  @TearDown
+  public void tearDown() {
+    for (IndexSegment indexSegment : _indexSegments) {
+      indexSegment.destroy();
+    }
+
+    FileUtils.deleteQuietly(INDEX_DIR);
+    EXECUTOR_SERVICE.shutdownNow();
+  }
+
+  /**
+   * Generates exactly {@code numRows} rows (none when {@code numRows <= 0}).
+   *
+   * <p>{@code SORTED_COL} is non-decreasing: each value is repeated for {@code _primaryRepetitions} consecutive rows
+   * (the last group is truncated if {@code numRows} is not a multiple of it). The remaining columns are drawn from
+   * {@code _supplier}.
+   *
+   * @param numRows number of rows to generate
+   * @return the generated rows, in sorted-column order
+   * @throws IllegalArgumentException if {@code _primaryRepetitions} is not positive
+   */
+  private List<GenericRow> createTestData(int numRows) {
+    if (_primaryRepetitions <= 0) {
+      // A non-positive step would never advance the outer loop below.
+      throw new IllegalArgumentException("_primaryRepetitions must be positive, got: " + _primaryRepetitions);
+    }
+    Map<Integer, String> strings = new HashMap<>();
+    List<GenericRow> rows = new ArrayList<>(Math.max(numRows, 0));
+    String[] lowCardinalityValues = IntStream.range(0, 10).mapToObj(i -> "value" + i).toArray(String[]::new);
+    for (int i = 0; i < numRows; i += _primaryRepetitions) {
+      // Stop at numRows so a partial last group does not overshoot the requested row count.
+      for (int j = 0; j < _primaryRepetitions && i + j < numRows; j++) {
+        GenericRow row = new GenericRow();
+        row.putValue(SORTED_COL_NAME, i);
+        row.putValue(INT_COL_NAME, (int) _supplier.getAsLong());
+        row.putValue(NO_INDEX_INT_COL_NAME, (int) _supplier.getAsLong());
+        row.putValue(RAW_INT_COL_NAME, (int) _supplier.getAsLong());
+        // Equal supplier values map to the same random string.
+        row.putValue(RAW_STRING_COL_NAME,
+            strings.computeIfAbsent((int) _supplier.getAsLong(), k -> UUID.randomUUID().toString()));
+        row.putValue(NO_INDEX_STRING_COL, row.getValue(RAW_STRING_COL_NAME));
+        row.putValue(LOW_CARDINALITY_STRING_COL, lowCardinalityValues[(i + j) % lowCardinalityValues.length]);
+        rows.add(row);
+      }
+    }
+    return rows;
+  }
+
+  /**
+   * Generates {@code _numRows} rows and writes them as a segment called {@code segmentName} under
+   * {@code INDEX_DIR}.
+   *
+   * @param segmentName name of the segment to create
+   */
+  private void buildSegment(String segmentName)
+      throws Exception {
+    List<GenericRow> rows = createTestData(_numRows);
+    List<FieldConfig> fieldConfigs = new ArrayList<>();
+
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setInvertedIndexColumns(Collections.singletonList(INT_COL_NAME))
+        .setFieldConfigList(fieldConfigs)
+        .setNoDictionaryColumns(Arrays.asList(RAW_INT_COL_NAME, RAW_STRING_COL_NAME))
+        .setSortedColumn(SORTED_COL_NAME)
+        .setStarTreeIndexConfigs(Collections.singletonList(new StarTreeIndexConfig(
+            Arrays.asList(SORTED_COL_NAME, INT_COL_NAME), null, Collections.singletonList(
+            new AggregationFunctionColumnPair(AggregationFunctionType.SUM, RAW_INT_COL_NAME).toColumnName()),
+            Integer.MAX_VALUE)))
+        .build();
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(SORTED_COL_NAME, FieldSpec.DataType.INT)
+        .addSingleValueDimension(NO_INDEX_INT_COL_NAME, FieldSpec.DataType.INT)
+        .addSingleValueDimension(RAW_INT_COL_NAME, FieldSpec.DataType.INT)
+        .addSingleValueDimension(INT_COL_NAME, FieldSpec.DataType.INT)
+        .addSingleValueDimension(RAW_STRING_COL_NAME, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(NO_INDEX_STRING_COL, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(LOW_CARDINALITY_STRING_COL, FieldSpec.DataType.STRING)
+        .build();
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+    config.setOutDir(INDEX_DIR.getPath());
+    config.setTableName(TABLE_NAME);
+    config.setSegmentName(segmentName);
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+      driver.init(config, recordReader);
+      driver.build();
+    }
+  }
+
+  /**
+   * Runs {@code SELECT SORTED_COL FROM MyTable ORDER BY <orderByClause> LIMIT 1052} with the
+   * {@code orderByAlgorithm} option set to {@code _orderByAlgorithm}.
+   *
+   * @param orderByClause the expressions following {@code ORDER BY}
+   * @return the broker response for the query
+   */
+  private BrokerResponseNative runOrderByQuery(String orderByClause) {
+    return getBrokerResponse(
+        "SELECT " + SORTED_COL_NAME + " FROM " + TABLE_NAME + " ORDER BY " + orderByClause + " LIMIT 1052 "
+            + "option(orderByAlgorithm=" + _orderByAlgorithm + ")");
+  }
+
+  /** Orders by the sorted column only, ascending. */
+  @Benchmark
+  public BrokerResponseNative sortedAsc() {
+    return runOrderByQuery(SORTED_COL_NAME + " ASC");
+  }
+
+  /** Orders by the sorted column ascending, then by an unsorted column. */
+  @Benchmark
+  public BrokerResponseNative sortedAscPartially() {
+    return runOrderByQuery(SORTED_COL_NAME + " ASC, " + LOW_CARDINALITY_STRING_COL);
+  }
+
+  /** Orders by the sorted column only, descending. */
+  @Benchmark
+  public BrokerResponseNative sortedDesc() {
+    return runOrderByQuery(SORTED_COL_NAME + " DESC");
+  }
+
+  /** Orders by the sorted column descending, then by an unsorted column. */
+  @Benchmark
+  public BrokerResponseNative sortedDescPartially() {
+    return runOrderByQuery(SORTED_COL_NAME + " DESC, " + LOW_CARDINALITY_STRING_COL);
+  }
+
+  @Override
+  protected String getFilter() {
+    return null;
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSourceMetadata.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSourceMetadata.java
index 94efe0f9dd..494129f0a1 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSourceMetadata.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSourceMetadata.java
@@ -50,6 +50,8 @@ public interface DataSourceMetadata {
/**
* Returns {@code true} if the column is sorted, {@code false} otherwise.
+ *
+ * The result of this method cannot be trusted if null handling is enabled.
*/
boolean isSorted();
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 7c33c8233b..fa2218ffb7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -289,6 +289,8 @@ public class CommonConstants {
// Reorder scan based predicates based on cardinality and number of
selected values
public static final String AND_SCAN_REORDERING = "AndScanReordering";
+ public static final String ORDER_BY_ALGORITHM = "orderByAlgorithm";
+
// TODO: Remove these keys (only apply to PQL) after releasing 0.11.0
@Deprecated
public static final String PRESERVE_TYPE = "preserveType";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]