This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a83a7fbaa1 optimize `order by sorted ASC, unsorted` and `order by 
DESC` cases (#8979)
a83a7fbaa1 is described below

commit a83a7fbaa117da873b5fc042842d6622cefce4de
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Thu Oct 20 19:14:11 2022 +0200

    optimize `order by sorted ASC, unsorted` and `order by DESC` cases (#8979)
---
 .../request/context/OrderByExpressionContext.java  |   4 +
 .../AcquireReleaseColumnsSegmentOperator.java      |   1 +
 .../blocks/results/SelectionResultsBlock.java      |  33 +-
 .../core/operator/combine/BaseCombineOperator.java |  18 +-
 ...xValueBasedSelectionOrderByCombineOperator.java |  31 +-
 .../combine/SelectionOrderByCombineOperator.java   |   9 +-
 .../core/operator/query/DistinctOperator.java      |   1 +
 .../query/LinearSelectionOrderByOperator.java      | 426 +++++++++++++++++++++
 .../core/operator/query/SelectionOnlyOperator.java |   1 +
 .../operator/query/SelectionOrderByOperator.java   | 188 +--------
 .../SelectionPartiallyOrderedByAscOperator.java    |  82 ++++
 .../SelectionPartiallyOrderedByDescOperation.java  | 105 +++++
 .../apache/pinot/core/plan/SelectionPlanNode.java  | 171 ++++++---
 .../core/query/utils/OrderByComparatorFactory.java | 155 ++++++++
 .../apache/pinot/core/util/QueryOptionsUtils.java  |   5 +
 .../combine/SelectionCombineOperatorTest.java      |   6 +-
 .../query/LinearSelectionOrderByOperatorTest.java  | 142 +++++++
 ...InnerSegmentSelectionMultiValueQueriesTest.java |   4 +-
 ...erSegmentSelectionMultiValueRawQueriesTest.java |   4 +-
 ...nnerSegmentSelectionSingleValueQueriesTest.java | 179 ++++++++-
 .../apache/pinot/perf/BenchmarkOrderByQueries.java | 256 +++++++++++++
 .../segment/spi/datasource/DataSourceMetadata.java |   2 +
 .../apache/pinot/spi/utils/CommonConstants.java    |   2 +
 23 files changed, 1565 insertions(+), 260 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/request/context/OrderByExpressionContext.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/request/context/OrderByExpressionContext.java
index f47ed35b63..fbe1a83882 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/request/context/OrderByExpressionContext.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/request/context/OrderByExpressionContext.java
@@ -43,6 +43,10 @@ public class OrderByExpressionContext {
     return _isAsc;
   }
 
+  public boolean isDesc() {
+    return !_isAsc;
+  }
+
   /**
    * Adds the columns (IDENTIFIER expressions) in the order-by expression to 
the given set.
    */
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
index b581a3fca4..4d79eeea4a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
@@ -92,6 +92,7 @@ public class AcquireReleaseColumnsSegmentOperator extends 
BaseOperator<BaseResul
     return Collections.singletonList(_childOperator);
   }
 
+  @Override
   public IndexSegment getIndexSegment() {
     return _indexSegment;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
index 2a7fd5847c..025fc091af 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
@@ -18,8 +18,13 @@
  */
 package org.apache.pinot.core.operator.blocks.results;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -32,10 +37,21 @@ import 
org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 public class SelectionResultsBlock extends BaseResultsBlock {
   private final DataSchema _dataSchema;
   private final Collection<Object[]> _rows;
+  private final Comparator<? super Object[]> _comparator;
 
-  public SelectionResultsBlock(DataSchema dataSchema, Collection<Object[]> 
rows) {
+  public SelectionResultsBlock(DataSchema dataSchema, List<Object[]> rows) {
+    this(dataSchema, rows, null);
+  }
+
+  public SelectionResultsBlock(DataSchema dataSchema, PriorityQueue<Object[]> 
rows) {
+    this(dataSchema, rows, rows.comparator());
+  }
+
+  public SelectionResultsBlock(DataSchema dataSchema, Collection<Object[]> 
rows,
+      @Nullable Comparator<? super Object[]> comparator) {
     _dataSchema = dataSchema;
     _rows = rows;
+    _comparator = comparator;
   }
 
   public DataSchema getDataSchema() {
@@ -56,6 +72,21 @@ public class SelectionResultsBlock extends BaseResultsBlock {
     return _rows;
   }
 
+  public SelectionResultsBlock convertToPriorityQueueBased() {
+    if (_rows instanceof PriorityQueue) {
+      return this;
+    }
+    Preconditions.checkState(_comparator != null, "No comparator specified in 
the results block");
+    PriorityQueue<Object[]> result = new PriorityQueue<>(_comparator);
+    result.addAll(_rows);
+    return new SelectionResultsBlock(_dataSchema, result);
+  }
+
+  public PriorityQueue<Object[]> getRowsAsPriorityQueue() {
+    Preconditions.checkState(_rows instanceof PriorityQueue, "The results 
block is not backed by a priority queue");
+    return (PriorityQueue<Object[]>) _rows;
+  }
+
   @Override
   public DataTable getDataTable(QueryContext queryContext)
       throws IOException {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 59ae72430d..b99b12d334 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -168,6 +168,7 @@ public abstract class BaseCombineOperator<T extends 
BaseResultsBlock> extends Ba
           ((AcquireReleaseColumnsSegmentOperator) operator).release();
         }
       }
+
       if (isQuerySatisfied(resultsBlock)) {
         // Query is satisfied, skip processing the remaining segments
         _blockingQueue.offer(resultsBlock);
@@ -216,7 +217,7 @@ public abstract class BaseCombineOperator<T extends 
BaseResultsBlock> extends Ba
         return blockToMerge;
       }
       if (mergedBlock == null) {
-        mergedBlock = (T) blockToMerge;
+        mergedBlock = convertToMergeableBlock((T) blockToMerge);
       } else {
         mergeResultsBlocks(mergedBlock, (T) blockToMerge);
       }
@@ -237,19 +238,30 @@ public abstract class BaseCombineOperator<T extends 
BaseResultsBlock> extends Ba
   }
 
   /**
-   * Can be overridden for early termination.
+   * Can be overridden for early termination. The input results block might 
not be mergeable.
    */
   protected boolean isQuerySatisfied(T resultsBlock) {
     return false;
   }
 
   /**
-   * Merge an IntermediateResultsBlock into the main IntermediateResultsBlock.
+   * Merges a results block into the main mergeable results block.
    * <p>NOTE: {@code blockToMerge} should contain the result for a segment 
without any exception. The errored segment
    * result is already handled.
+   *
+   * @param mergedBlock The block that accumulates previous results. It should 
be modified to add the information of the
+   *                    other block.
+   * @param blockToMerge The new block that needs to be merged into the 
mergedBlock.
    */
   protected abstract void mergeResultsBlocks(T mergedBlock, T blockToMerge);
 
+  /**
+   * Converts the given results block into a mergeable results block if 
necessary.
+   */
+  protected T convertToMergeableBlock(T resultsBlock) {
+    return resultsBlock;
+  }
+
   @Override
   public List<Operator> getChildOperators() {
     return _operators;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
index e818a6dde7..e73d272280 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
@@ -211,11 +211,21 @@ public class 
MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
           ((AcquireReleaseColumnsSegmentOperator) operator).release();
         }
       }
-      PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) 
resultsBlock.getRows();
-      if (selectionResult != null && selectionResult.size() == _numRowsToKeep) 
{
+      Collection<Object[]> rows = resultsBlock.getRows();
+      if (rows != null && rows.size() >= _numRowsToKeep) {
         // Segment result has enough rows, update the boundary value
-        assert selectionResult.peek() != null;
-        Comparable segmentBoundaryValue = (Comparable) 
selectionResult.peek()[0];
+
+        Comparable segmentBoundaryValue;
+        if (rows instanceof PriorityQueue) {
+          // Results from SelectionOrderByOperator
+          assert ((PriorityQueue<Object[]>) rows).peek() != null;
+          segmentBoundaryValue = (Comparable) ((PriorityQueue<Object[]>) 
rows).peek()[0];
+        } else {
+          // Results from LinearSelectionOrderByOperator
+          assert rows instanceof List;
+          segmentBoundaryValue = (Comparable) ((List<Object[]>) 
rows).get(rows.size() - 1)[0];
+        }
+
         if (boundaryValue == null) {
           boundaryValue = segmentBoundaryValue;
         } else {
@@ -274,14 +284,14 @@ public class 
MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
         return blockToMerge;
       }
       if (mergedBlock == null) {
-        mergedBlock = (SelectionResultsBlock) blockToMerge;
+        mergedBlock = convertToMergeableBlock((SelectionResultsBlock) 
blockToMerge);
       } else {
         mergeResultsBlocks(mergedBlock, (SelectionResultsBlock) blockToMerge);
       }
       numBlocksMerged++;
 
       // Update the boundary value if enough rows are collected
-      PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) 
mergedBlock.getRows();
+      PriorityQueue<Object[]> selectionResult = 
mergedBlock.getRowsAsPriorityQueue();
       if (selectionResult != null && selectionResult.size() == _numRowsToKeep) 
{
         assert selectionResult.peek() != null;
         _globalBoundaryValue.set((Comparable) selectionResult.peek()[0]);
@@ -306,12 +316,19 @@ public class 
MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
       return;
     }
 
-    PriorityQueue<Object[]> mergedRows = (PriorityQueue<Object[]>) 
mergedBlock.getRows();
+    PriorityQueue<Object[]> mergedRows = mergedBlock.getRowsAsPriorityQueue();
     Collection<Object[]> rowsToMerge = blockToMerge.getRows();
     assert mergedRows != null && rowsToMerge != null;
     SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge, 
_numRowsToKeep);
   }
 
+  @Override
+  protected SelectionResultsBlock 
convertToMergeableBlock(SelectionResultsBlock resultsBlock) {
+    // This may create a copy or return the same instance. Anyway, this 
operator is the owner of the
+    // value now, so it can mutate it.
+    return resultsBlock.convertToPriorityQueueBased();
+  }
+
   private static class MinMaxValueContext {
     final Operator<BaseResultsBlock> _operator;
     final Comparable _minValue;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
index 098587cafd..d9bdb04475 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
@@ -104,9 +104,16 @@ public class SelectionOrderByCombineOperator extends 
BaseCombineOperator<Selecti
       return;
     }
 
-    PriorityQueue<Object[]> mergedRows = (PriorityQueue<Object[]>) 
mergedBlock.getRows();
+    PriorityQueue<Object[]> mergedRows = mergedBlock.getRowsAsPriorityQueue();
     Collection<Object[]> rowsToMerge = blockToMerge.getRows();
     assert mergedRows != null && rowsToMerge != null;
     SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge, 
_numRowsToKeep);
   }
+
+  @Override
+  protected SelectionResultsBlock 
convertToMergeableBlock(SelectionResultsBlock resultsBlock) {
+    // This may create a copy or return the same instance. Anyway, this 
operator is the owner of the
+    // value now, so it can mutate it.
+    return resultsBlock.convertToPriorityQueueBased();
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
index 939800f1be..9ba1a65e86 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
@@ -72,6 +72,7 @@ public class DistinctOperator extends 
BaseOperator<DistinctResultsBlock> {
     return Collections.singletonList(_transformOperator);
   }
 
+  @Override
   public IndexSegment getIndexSegment() {
     return _indexSegment;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java
new file mode 100644
index 0000000000..e9fe801262
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.IntFunction;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * A selection Operator used when the first expressions in the order by are 
identifier expressions of columns that are
+ * already sorted (either ascendingly or descendingly), even if the tail of 
order by expressions are not sorted.
+ *
+ * e.g.: SELECT ... FROM Table WHERE predicates ORDER BY sorted_column DESC 
LIMIT 10 OFFSET 5
+ * or: SELECT ... FROM Table WHERE predicates ORDER BY sorted_column, 
not_sorted LIMIT 10 OFFSET 5
+ * but not SELECT ... FROM Table WHERE predicates ORDER BY not_sorted, 
sorted_column LIMIT 10 OFFSET 5
+ *
+ * Operators that derive from this class are going to have an almost linear 
cost instead of the usual NlogN when actual
+ * sorting must be done, where N is the number of rows in the segment.
+ * There is a degraded scenario when the cost is actually NlogL (where L is 
the limit of the query) when all the first L
+ * rows have the exact same value for the prefix of the sorted columns. Even 
in that case, L should be much smaller
+ * than N, so this implementation is algorithmically better than the normal 
solution.
+ */
+public abstract class LinearSelectionOrderByOperator extends 
BaseOperator<SelectionResultsBlock> {
+  protected final IndexSegment _indexSegment;
+
+  protected final boolean _nullHandlingEnabled;
+  // Deduped order-by expressions followed by output expressions from 
SelectionOperatorUtils.extractExpressions()
+  protected final List<ExpressionContext> _expressions;
+  protected final List<ExpressionContext> _alreadySorted;
+  protected final List<ExpressionContext> _toSort;
+
+  protected final TransformOperator _transformOperator;
+  protected final List<OrderByExpressionContext> _orderByExpressions;
+  protected final TransformResultMetadata[] _expressionsMetadata;
+  protected final int _numRowsToKeep;
+  private final Supplier<ListBuilder> _listBuilderSupplier;
+  protected boolean _used = false;
+  /**
+   * The comparator used to build the resulting {@link SelectionResultsBlock}, 
which sorts rows in reverse order to the
+   * one specified in the query.
+   */
+  protected Comparator<Object[]> _comparator;
+
+  /**
+   * @param expressions Order-by expressions must be at the head of the list.
+   * @param numSortedExpressions Number of expressions in the order-by 
expressions that are sorted.
+   */
+  public LinearSelectionOrderByOperator(IndexSegment indexSegment, 
QueryContext queryContext,
+      List<ExpressionContext> expressions, TransformOperator 
transformOperator, int numSortedExpressions) {
+    _indexSegment = indexSegment;
+    _nullHandlingEnabled = queryContext.isNullHandlingEnabled();
+    _expressions = expressions;
+    _transformOperator = transformOperator;
+
+    _orderByExpressions = queryContext.getOrderByExpressions();
+    assert _orderByExpressions != null;
+    int numOrderByExpressions = _orderByExpressions.size();
+
+    _alreadySorted = expressions.subList(0, numSortedExpressions);
+    _toSort = expressions.subList(numSortedExpressions, numOrderByExpressions);
+
+    _expressionsMetadata = new TransformResultMetadata[_expressions.size()];
+    for (int i = 0; i < _expressionsMetadata.length; i++) {
+      ExpressionContext expression = _expressions.get(i);
+      _expressionsMetadata[i] = 
_transformOperator.getResultMetadata(expression);
+    }
+
+    _numRowsToKeep = queryContext.getOffset() + queryContext.getLimit();
+
+    if (_toSort.isEmpty()) {
+      _listBuilderSupplier = () -> new 
TotallySortedListBuilder(_numRowsToKeep);
+    } else {
+      Comparator<Object[]> sortedComparator =
+          OrderByComparatorFactory.getComparator(_orderByExpressions, 
_expressionsMetadata, false, _nullHandlingEnabled,
+              0, numSortedExpressions);
+      Comparator<Object[]> unsortedComparator =
+          OrderByComparatorFactory.getComparator(_orderByExpressions, 
_expressionsMetadata, true, _nullHandlingEnabled,
+              numSortedExpressions, numOrderByExpressions);
+      _listBuilderSupplier = () -> new 
PartiallySortedListBuilder(_numRowsToKeep, sortedComparator, 
unsortedComparator);
+    }
+
+    _comparator =
+        OrderByComparatorFactory.getComparator(_orderByExpressions, 
_expressionsMetadata, true, _nullHandlingEnabled);
+  }
+
+  @Override
+  public IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  public ExecutionStatistics getExecutionStatistics() {
+    long numEntriesScannedInFilter = 
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+    int numDocsScanned = getNumDocsScanned();
+    long numEntriesScannedPostFilter = (long) numDocsScanned * 
_transformOperator.getNumColumnsProjected();
+    int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
+    return new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter, 
numEntriesScannedPostFilter,
+        numTotalDocs);
+  }
+
+  protected IntFunction<Object[]> fetchBlock(TransformBlock transformBlock, 
BlockValSet[] blockValSets) {
+    int numExpressions = _expressions.size();
+
+    for (int i = 0; i < numExpressions; i++) {
+      ExpressionContext expression = _expressions.get(i);
+      blockValSets[i] = transformBlock.getBlockValueSet(expression);
+    }
+    RowBasedBlockValueFetcher blockValueFetcher = new 
RowBasedBlockValueFetcher(blockValSets);
+
+    if (!_nullHandlingEnabled) {
+      return blockValueFetcher::getRow;
+    }
+    RoaringBitmap[] nullBitmaps = new RoaringBitmap[numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      nullBitmaps[i] = blockValSets[i].getNullBitmap();
+    }
+    return (docId) -> {
+      Object[] row = blockValueFetcher.getRow(docId);
+      for (int colId = 0; colId < nullBitmaps.length; colId++) {
+        if (nullBitmaps[colId] != null && nullBitmaps[colId].contains(docId)) {
+          row[colId] = null;
+        }
+      }
+      return row;
+    };
+  }
+
+  protected abstract int getNumDocsScanned();
+
+  /**
+   * Returns a sorted list of rows that:
+   * <ul>
+   *   <li>At least contains all the rows that fulfill the predicate</li>
+   *   <li>Rows are sorted in a way that is compatible with the given list 
builder supplier</li>
+   * </ul>
+   *
+   * That means that the result may contain more rows than required.
+   *
+   * @param listBuilderSupplier a {@link ListBuilder} supplier that should be 
used to create the result. Each time it is
+   *                            called a new {@link ListBuilder} will be 
returned. All returned instances use the same
+   *                            comparator logic.
+   */
+  protected abstract List<Object[]> fetch(Supplier<ListBuilder> 
listBuilderSupplier);
+
+  @Override
+  public List<Operator> getChildOperators() {
+    return Collections.singletonList(_transformOperator);
+  }
+
+  protected abstract String getExplainName();
+
+  @Override
+  public String toExplainString() {
+    StringBuilder sb = new StringBuilder(getExplainName());
+
+    sb.append("(sortedList: ");
+    concatList(sb, _alreadySorted);
+
+    sb.append(", unsortedList: ");
+    concatList(sb, _toSort);
+
+    sb.append(", rest: ");
+    concatList(sb, _expressions.subList(_alreadySorted.size() + 
_toSort.size(), _expressions.size()));
+
+    sb.append(')');
+    return sb.toString();
+  }
+
+  private void concatList(StringBuilder sb, List<?> list) {
+    sb.append('(');
+    Iterator<?> it = list.iterator();
+    if (it.hasNext()) {
+      sb.append(it.next());
+      while (it.hasNext()) {
+        sb.append(", ").append(it.next());
+      }
+    }
+    sb.append(')');
+  }
+
+  @Override
+  protected SelectionResultsBlock getNextBlock() {
+    Preconditions.checkState(!_used, "nextBlock was called more than once");
+    _used = true;
+    List<Object[]> list = fetch(_listBuilderSupplier);
+
+    DataSchema dataSchema = createDataSchema();
+
+    if (list.size() > _numRowsToKeep) {
+      list = new ArrayList<>(list.subList(0, _numRowsToKeep));
+    }
+
+    return new SelectionResultsBlock(dataSchema, list, _comparator);
+  }
+
+  protected DataSchema createDataSchema() {
+    int numExpressions = _expressions.size();
+
+    // Create the data schema
+    String[] columnNames = new String[numExpressions];
+    DataSchema.ColumnDataType[] columnDataTypes = new 
DataSchema.ColumnDataType[numExpressions];
+    for (int i = 0; i < columnNames.length; i++) {
+      columnNames[i] = _expressions.get(i).toString();
+    }
+    for (int i = 0; i < numExpressions; i++) {
+      TransformResultMetadata expressionMetadata = _expressionsMetadata[i];
+      columnDataTypes[i] =
+          
DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), 
expressionMetadata.isSingleValue());
+    }
+    return new DataSchema(columnNames, columnDataTypes);
+  }
+
+  /**
+   * A private class used to build a partially sorted list by adding partially 
sorted data.
+   *
+   * Specifically, this class has been designed to receive successive calls to 
{@link #add(Object[])} followed by a single
+   * call to {@link #build()}.
+   *
+   * Rows must be inserted in ascending order according to the partial order 
specified by a comparator.
+   * This comparator will define <i>partitions</i> of rows. All the rows in 
the same partition are considered equal
+   * by that comparator.
+   *
+   * When calling {@link #add(Object[])} with a row that doesn't belong to the 
current partition, the previous partition
+   * is <em>closed</em> and a new one is started.
+   */
+  protected interface ListBuilder {
+
+    /**
+     * Adds the given row to this object. The new row must be equal to or 
higher than previously inserted elements
+     * according to the partition comparator. No more rows should be added 
once enough rows have been collected.
+     *
+     * @param row The row to add. The values of the already sorted columns 
must be equal or higher than the last added
+     *           row, if any.
+     * @return true if and only if enough rows have been collected
+     */
+    boolean add(Object[] row);
+
+    /**
+     * Builds the sorted list. The current partition will be <em>closed</em>. 
Once this method is called, the builder
+     * should not be used.
+     */
+    List<Object[]> build();
+  }
+
+  /**
+   * This is the fastest {@link ListBuilder} but also the most restrictive one. 
It can only be used when data is inserted
+   * in total order and therefore each element belongs to its own partition.
+   *
+   * This builder cannot be used to implement order-by queries where there is 
at least one expression
+   * that is not sorted. In such case {@link PartiallySortedListBuilder} 
should be used.
+   *
+   * This implementation is just a wrapper over an ArrayList and therefore the 
average cost of its methods is constant.
+   */
+  @VisibleForTesting
+  static class TotallySortedListBuilder implements ListBuilder {
+    private final ArrayList<Object[]> _list;
+    private final int _maxNumRows;
+
+    public TotallySortedListBuilder(int maxNumRows) {
+      _maxNumRows = maxNumRows;
+      _list = new ArrayList<>(Integer.min(maxNumRows, 
SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY));
+    }
+
+    @Override
+    public boolean add(Object[] row) {
+      _list.add(row);
+      return _list.size() == _maxNumRows;
+    }
+
+    @Override
+    public List<Object[]> build() {
+      return _list;
+    }
+  }
+
+  /**
+   * This {@link ListBuilder} is size bound and requires two comparators: The 
first defines the partitions and the
+   * second defines an order inside each partition.
+   *
+   * This class never stores more than the requested number of elements. 
In case more elements are inserted:
+   * <ul>
+   *   <li>If the new element belongs to a higher partition, it is 
discarded.</li>
+   *   <li>If the new element belongs to the last included partition, the last 
partition is treated as a priority queue
+   *   sorted by the in-partition comparator. If the new element is lower than 
some of the already inserted elements,
+   *   the new element replaces the greatest of them.</li>
+   * </ul>
+   *
+   * This class can be used to implement order-by queries that include one or 
more not sorted expressions.
+   * In cases where all expressions are sorted, {@link 
TotallySortedListBuilder} should be used because its performance
+   * is better.
+   *
+   * As usual, elements are sorted by partition and there is no order 
guarantee inside each partition. The second
+   * comparator is only used to keep the lower elements in the last partition.
+   */
+  @VisibleForTesting
+  static class PartiallySortedListBuilder implements ListBuilder {
+    /**
+     * A list with all the elements that have been already sorted.
+     */
+    private final ArrayList<Object[]> _sorted;
+    /**
+     * This attribute is used to store the last partition when the builder 
already contains {@link #_maxNumRows} rows.
+     */
+    private PriorityQueue<Object[]> _lastPartitionQueue;
+    /**
+     * The comparator that defines the partitions and imposes the order in 
which add has to be called.
+     */
+    private final Comparator<Object[]> _partitionComparator;
+    /**
+     * The comparator that sorts different rows on each partition, which sorts 
rows in reverse order to the one
+     * specified in the query.
+     */
+    private final Comparator<Object[]> _unsortedComparator;
+
+    private final int _maxNumRows;
+
+    private Object[] _lastPartitionRow;
+    private int _numSortedRows;
+
+    public PartiallySortedListBuilder(int maxNumRows, Comparator<Object[]> 
partitionComparator,
+        Comparator<Object[]> unsortedComparator) {
+      _maxNumRows = maxNumRows;
+      _sorted = new ArrayList<>(Integer.min(maxNumRows, 
SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY));
+      _partitionComparator = partitionComparator;
+      _unsortedComparator = unsortedComparator;
+    }
+
+    @Override
+    public boolean add(Object[] row) {
+      if (_lastPartitionRow == null) {
+        _lastPartitionRow = row;
+        _sorted.add(row);
+        return false;
+      }
+      int cmp = _partitionComparator.compare(row, _lastPartitionRow);
+      if (cmp < 0) {
+        throw new IllegalArgumentException(
+            "Row with docId " + _sorted.size() + " is not sorted compared to 
the previous one");
+      }
+
+      boolean newPartition = cmp > 0;
+      if (_sorted.size() < _maxNumRows) {
+        // we don't have enough rows yet
+        if (newPartition) {
+          _lastPartitionRow = row;
+          _numSortedRows = _sorted.size();
+        }
+        // just add the new row to the result list
+        _sorted.add(row);
+        return false;
+      }
+
+      // enough rows have been collected
+      assert _sorted.size() == _maxNumRows;
+      if (newPartition) { // and the new element belongs to a new partition, 
so we can just ignore it
+        return true;
+      }
+      // new element doesn't belong to a new partition, so we may need to add 
it
+      if (_lastPartitionQueue == null) { // we have exactly _numRows rows, and 
the new belongs to the last partition
+        // we need to prepare the priority queue
+        int numRowsInPriorityQueue = _maxNumRows - _numSortedRows;
+        _lastPartitionQueue = new PriorityQueue<>(numRowsInPriorityQueue, 
_unsortedComparator);
+        _lastPartitionQueue.addAll(_sorted.subList(_numSortedRows, 
_maxNumRows));
+      }
+      // add the new element if it is lower than the greatest element stored 
in the partition
+      if (_unsortedComparator.compare(row, _lastPartitionQueue.peek()) > 0) {
+        _lastPartitionQueue.poll();
+        _lastPartitionQueue.offer(row);
+      }
+      return false;
+    }
+
+    @Override
+    public List<Object[]> build() {
+      if (_lastPartitionQueue != null) {
+        assert _lastPartitionQueue.size() == _maxNumRows - _numSortedRows;
+        Iterator<Object[]> lastPartitionIt = _lastPartitionQueue.iterator();
+        for (int i = _numSortedRows; i < _maxNumRows; i++) {
+          _sorted.set(i, lastPartitionIt.next());
+        }
+      }
+      return _sorted;
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
index edf88a0ee5..6db1f4e352 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
@@ -134,6 +134,7 @@ public class SelectionOnlyOperator extends 
BaseOperator<SelectionResultsBlock> {
     return Collections.singletonList(_transformOperator);
   }
 
+  @Override
   public IndexSegment getIndexSegment() {
     return _indexSegment;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index 47410a85ff..d45652bba8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.operator.query;
 
-import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -44,11 +43,9 @@ import 
org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.datasource.DataSource;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.exception.BadQueryRequestException;
-import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
 
 
@@ -83,18 +80,16 @@ public class SelectionOrderByOperator extends 
BaseOperator<SelectionResultsBlock
   private final TransformResultMetadata[] _orderByExpressionMetadata;
   private final int _numRowsToKeep;
   private final PriorityQueue<Object[]> _rows;
-  private final boolean _allOrderByColsPreSorted;
 
   private int _numDocsScanned = 0;
   private long _numEntriesScannedPostFilter = 0;
 
   public SelectionOrderByOperator(IndexSegment indexSegment, QueryContext 
queryContext,
-      List<ExpressionContext> expressions, TransformOperator 
transformOperator, boolean allOrderByColsPreSorted) {
+      List<ExpressionContext> expressions, TransformOperator 
transformOperator) {
     _indexSegment = indexSegment;
     _nullHandlingEnabled = queryContext.isNullHandlingEnabled();
     _expressions = expressions;
     _transformOperator = transformOperator;
-    _allOrderByColsPreSorted = allOrderByColsPreSorted;
 
     _orderByExpressions = queryContext.getOrderByExpressions();
     assert _orderByExpressions != null;
@@ -106,8 +101,11 @@ public class SelectionOrderByOperator extends 
BaseOperator<SelectionResultsBlock
     }
 
     _numRowsToKeep = queryContext.getOffset() + queryContext.getLimit();
+    Comparator<Object[]> comparator =
+        OrderByComparatorFactory.getComparator(_orderByExpressions, 
_orderByExpressionMetadata, true,
+            _nullHandlingEnabled);
     _rows = new PriorityQueue<>(Math.min(_numRowsToKeep, 
SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
-        getComparator());
+        comparator);
   }
 
   @Override
@@ -122,186 +120,15 @@ public class SelectionOrderByOperator extends 
BaseOperator<SelectionResultsBlock
     return stringBuilder.append(')').toString();
   }
 
-  private Comparator<Object[]> getComparator() {
-    // Compare all single-value columns
-    int numOrderByExpressions = _orderByExpressions.size();
-    List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
-    for (int i = 0; i < numOrderByExpressions; i++) {
-      if (_orderByExpressionMetadata[i].isSingleValue()) {
-        valueIndexList.add(i);
-      } else {
-        // MV columns should not be part of the selection order by only list
-        throw new BadQueryRequestException(
-            String.format("MV expression: %s should not be included in the 
ORDER-BY clause",
-                _orderByExpressions.get(i)));
-      }
-    }
-
-    int numValuesToCompare = valueIndexList.size();
-    int[] valueIndices = new int[numValuesToCompare];
-    DataType[] storedTypes = new DataType[numValuesToCompare];
-    // Use multiplier -1 or 1 to control ascending/descending order
-    int[] multipliers = new int[numValuesToCompare];
-    for (int i = 0; i < numValuesToCompare; i++) {
-      int valueIndex = valueIndexList.get(i);
-      valueIndices[i] = valueIndex;
-      storedTypes[i] = 
_orderByExpressionMetadata[valueIndex].getDataType().getStoredType();
-      multipliers[i] = _orderByExpressions.get(valueIndex).isAsc() ? -1 : 1;
-    }
-
-    if (_nullHandlingEnabled) {
-      return (Object[] o1, Object[] o2) -> {
-        for (int i = 0; i < numValuesToCompare; i++) {
-          int index = valueIndices[i];
-
-          // TODO: Evaluate the performance of casting to Comparable and avoid 
the switch
-          Object v1 = o1[index];
-          Object v2 = o2[index];
-          if (v1 == null) {
-            // The default null ordering is: 'NULLS LAST', regardless of the 
ordering direction.
-            return v2 == null ? 0 : -multipliers[i];
-          } else if (v2 == null) {
-            return multipliers[i];
-          }
-          int result;
-          switch (storedTypes[i]) {
-            case INT:
-              result = ((Integer) v1).compareTo((Integer) v2);
-              break;
-            case LONG:
-              result = ((Long) v1).compareTo((Long) v2);
-              break;
-            case FLOAT:
-              result = ((Float) v1).compareTo((Float) v2);
-              break;
-            case DOUBLE:
-              result = ((Double) v1).compareTo((Double) v2);
-              break;
-            case BIG_DECIMAL:
-              result = ((BigDecimal) v1).compareTo((BigDecimal) v2);
-              break;
-            case STRING:
-              result = ((String) v1).compareTo((String) v2);
-              break;
-            case BYTES:
-              result = ((ByteArray) v1).compareTo((ByteArray) v2);
-              break;
-            // NOTE: Multi-value columns are not comparable, so we should not 
reach here
-            default:
-              throw new IllegalStateException();
-          }
-          if (result != 0) {
-            return result * multipliers[i];
-          }
-        }
-        return 0;
-      };
-    } else {
-      return (Object[] o1, Object[] o2) -> {
-        for (int i = 0; i < numValuesToCompare; i++) {
-          int index = valueIndices[i];
-
-          // TODO: Evaluate the performance of casting to Comparable and avoid 
the switch
-          Object v1 = o1[index];
-          Object v2 = o2[index];
-          int result;
-          switch (storedTypes[i]) {
-            case INT:
-              result = ((Integer) v1).compareTo((Integer) v2);
-              break;
-            case LONG:
-              result = ((Long) v1).compareTo((Long) v2);
-              break;
-            case FLOAT:
-              result = ((Float) v1).compareTo((Float) v2);
-              break;
-            case DOUBLE:
-              result = ((Double) v1).compareTo((Double) v2);
-              break;
-            case BIG_DECIMAL:
-              result = ((BigDecimal) v1).compareTo((BigDecimal) v2);
-              break;
-            case STRING:
-              result = ((String) v1).compareTo((String) v2);
-              break;
-            case BYTES:
-              result = ((ByteArray) v1).compareTo((ByteArray) v2);
-              break;
-            // NOTE: Multi-value columns are not comparable, so we should not 
reach here
-            default:
-              throw new IllegalStateException();
-          }
-          if (result != 0) {
-            return result * multipliers[i];
-          }
-        }
-        return 0;
-      };
-    }
-  }
-
   @Override
   protected SelectionResultsBlock getNextBlock() {
-    if (_allOrderByColsPreSorted) {
-      return computeAllPreSorted();
-    } else if (_expressions.size() == _orderByExpressions.size()) {
+    if (_expressions.size() == _orderByExpressions.size()) {
       return computeAllOrdered();
     } else {
       return computePartiallyOrdered();
     }
   }
 
-  private SelectionResultsBlock computeAllPreSorted() {
-    int numExpressions = _expressions.size();
-
-    // Fetch all the expressions and insert them into the priority queue
-    BlockValSet[] blockValSets = new BlockValSet[numExpressions];
-    int numColumnsProjected = _transformOperator.getNumColumnsProjected();
-    TransformBlock transformBlock;
-    while (_numDocsScanned < _numRowsToKeep && (transformBlock = 
_transformOperator.nextBlock()) != null) {
-      for (int i = 0; i < numExpressions; i++) {
-        ExpressionContext expression = _expressions.get(i);
-        blockValSets[i] = transformBlock.getBlockValueSet(expression);
-      }
-      RowBasedBlockValueFetcher blockValueFetcher = new 
RowBasedBlockValueFetcher(blockValSets);
-      int numDocsFetched = transformBlock.getNumDocs();
-      if (_nullHandlingEnabled) {
-        RoaringBitmap[] nullBitmaps = new RoaringBitmap[numExpressions];
-        for (int i = 0; i < numExpressions; i++) {
-          nullBitmaps[i] = blockValSets[i].getNullBitmap();
-        }
-        for (int rowId = 0; rowId < numDocsFetched && (_numDocsScanned < 
_numRowsToKeep); rowId++) {
-          Object[] row = blockValueFetcher.getRow(rowId);
-          for (int colId = 0; colId < numExpressions; colId++) {
-            if (nullBitmaps[colId] != null && 
nullBitmaps[colId].contains(rowId)) {
-              row[colId] = null;
-            }
-          }
-        }
-      }
-      for (int i = 0; i < numDocsFetched && (_numDocsScanned < 
_numRowsToKeep); i++) {
-        SelectionOperatorUtils.addToPriorityQueue(blockValueFetcher.getRow(i), 
_rows, _numRowsToKeep);
-        _numDocsScanned++;
-      }
-    }
-    _numEntriesScannedPostFilter = (long) _numDocsScanned * 
numColumnsProjected;
-
-    // Create the data schema
-    String[] columnNames = new String[numExpressions];
-    DataSchema.ColumnDataType[] columnDataTypes = new 
DataSchema.ColumnDataType[numExpressions];
-    for (int i = 0; i < numExpressions; i++) {
-      ExpressionContext expression = _expressions.get(i);
-      columnNames[i] = expression.toString();
-      TransformResultMetadata expressionMetadata = 
_transformOperator.getResultMetadata(expression);
-      columnDataTypes[i] =
-          
DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), 
expressionMetadata.isSingleValue());
-    }
-
-    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-
-    return new SelectionResultsBlock(dataSchema, _rows);
-  }
-
   /**
    * Helper method to compute the result when all the output expressions are 
ordered.
    */
@@ -481,6 +308,7 @@ public class SelectionOrderByOperator extends 
BaseOperator<SelectionResultsBlock
     return Collections.singletonList(_transformOperator);
   }
 
+  @Override
   public IndexSegment getIndexSegment() {
     return _indexSegment;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByAscOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByAscOperator.java
new file mode 100644
index 0000000000..62de4824cd
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByAscOperator.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.function.IntFunction;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.spi.IndexSegment;
+
+
+/**
+ * An operator for order-by queries ASC that are partially sorted over the 
sorting keys.
+ * @see LinearSelectionOrderByOperator
+ */
+public class SelectionPartiallyOrderedByAscOperator extends 
LinearSelectionOrderByOperator {
+
+  private static final String EXPLAIN_NAME = "SELECT_PARTIAL_ORDER_BY_ASC";
+
+  private int _numDocsScanned = 0;
+
+  public SelectionPartiallyOrderedByAscOperator(IndexSegment indexSegment, 
QueryContext queryContext,
+      List<ExpressionContext> expressions, TransformOperator 
transformOperator, int numSortedExpressions) {
+    super(indexSegment, queryContext, expressions, transformOperator, 
numSortedExpressions);
+    Preconditions.checkArgument(queryContext.getOrderByExpressions().stream()
+            .filter(expr -> expr.getExpression().getType() == 
ExpressionContext.Type.IDENTIFIER)
+            .findFirst()
+            .orElseThrow(() -> new IllegalArgumentException("The query is not 
order by identifiers"))
+            .isAsc(),
+        "%s can only be used when the first column in order by is ASC", 
EXPLAIN_NAME);
+  }
+
+  @Override
+  protected List<Object[]> fetch(Supplier<ListBuilder> listBuilderSupplier) {
+    int numExpressions = _expressions.size();
+    BlockValSet[] blockValSets = new BlockValSet[numExpressions];
+    ListBuilder listBuilder = listBuilderSupplier.get();
+    TransformBlock transformBlock;
+    while ((transformBlock = _transformOperator.nextBlock()) != null) {
+      IntFunction<Object[]> rowFetcher = fetchBlock(transformBlock, 
blockValSets);
+      int numDocsFetched = transformBlock.getNumDocs();
+      _numDocsScanned += numDocsFetched;
+      for (int i = 0; i < numDocsFetched; i++) {
+        if (listBuilder.add(rowFetcher.apply(i))) {
+          return listBuilder.build();
+        }
+      }
+    }
+    return listBuilder.build();
+  }
+
+  @Override
+  public int getNumDocsScanned() {
+    return _numDocsScanned;
+  }
+
+  @Override
+  protected String getExplainName() {
+    return EXPLAIN_NAME;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
new file mode 100644
index 0000000000..5322e0a6e6
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.IntFunction;
+import java.util.function.Supplier;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.spi.IndexSegment;
+
+
+/**
+ * An operator for order-by queries DESC that are partially sorted over the 
sorting keys.
+ *
+ * @see LinearSelectionOrderByOperator
+ */
+public class SelectionPartiallyOrderedByDescOperation extends 
LinearSelectionOrderByOperator {
+
+  private static final String EXPLAIN_NAME = "SELECT_PARTIAL_ORDER_BY_DESC";
+
+  private int _numDocsScanned = 0;
+
+  public SelectionPartiallyOrderedByDescOperation(IndexSegment indexSegment, 
QueryContext queryContext,
+      List<ExpressionContext> expressions, TransformOperator 
transformOperator, int numSortedExpressions) {
+    super(indexSegment, queryContext, expressions, transformOperator, 
numSortedExpressions);
+    assert queryContext.getOrderByExpressions() != null;
+    Preconditions.checkArgument(queryContext.getOrderByExpressions().stream()
+            .filter(expr -> expr.getExpression().getType() == 
ExpressionContext.Type.IDENTIFIER).findFirst()
+            .orElseThrow(() -> new IllegalArgumentException("The query is not 
order by identifiers")).isDesc(),
+        "%s can only be used when the first column in order by is DESC", 
EXPLAIN_NAME);
+  }
+
+  @Override
+  protected List<Object[]> fetch(Supplier<ListBuilder> listBuilderSupplier) {
+
+    // Ideally we would use a descending cursor, but we don't actually have 
them
+    // Alternatively, we could store all blocks in a list and iterate them in 
reverse order, but ProjectionBlocks share
+    // the same DataBlockCache, so they may be ephemeral and be overwritten 
by the next block.
+    // The only alternative we have right now is to retrieve the last LIMIT 
elements from each block
+
+    int numExpressions = _expressions.size();
+    BlockValSet[] blockValSets = new BlockValSet[numExpressions];
+    List<Object[]> localBestRows = new ArrayList<>();
+    TransformBlock transformBlock;
+    while ((transformBlock = _transformOperator.nextBlock()) != null) {
+      IntFunction<Object[]> rowFetcher = fetchBlock(transformBlock, 
blockValSets);
+      int numDocsFetched = transformBlock.getNumDocs();
+      _numDocsScanned += numDocsFetched;
+      ListBuilder listBuilder = listBuilderSupplier.get();
+
+      // first, calculate the best rows on this block
+      boolean enoughRowsCollected = false;
+      for (int docId = numDocsFetched - 1; docId >= 0; docId--) {
+        enoughRowsCollected = listBuilder.add(rowFetcher.apply(docId));
+        if (enoughRowsCollected) {
+          break;
+        }
+      }
+      // then try to add the best rows from previous block
+      if (!enoughRowsCollected) {
+        Iterator<Object[]> localBestRowIt = localBestRows.iterator();
+        while (!enoughRowsCollected && localBestRowIt.hasNext()) {
+          enoughRowsCollected = listBuilder.add(localBestRowIt.next());
+        }
+      }
+      // finally, update localBestRows
+      localBestRows = listBuilder.build();
+    }
+    // Once we finished, localBestRows contains the global best rows
+    return localBestRows;
+  }
+
+  @Override
+  protected int getNumDocsScanned() {
+    return _numDocsScanned;
+  }
+
+  @Override
+  protected String getExplainName() {
+    return EXPLAIN_NAME;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
index 5d40398df8..9ed48c5983 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.plan;
 
 import java.util.ArrayList;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
 import org.apache.pinot.core.common.Operator;
@@ -27,10 +28,15 @@ import 
org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.operator.query.EmptySelectionOperator;
 import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
 import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
+import 
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByAscOperator;
+import 
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByDescOperation;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.util.QueryOptionsUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
 
 
 /**
@@ -50,67 +56,132 @@ public class SelectionPlanNode implements PlanNode {
     List<ExpressionContext> expressions = 
SelectionOperatorUtils.extractExpressions(_queryContext, _indexSegment);
     int limit = _queryContext.getLimit();
 
-    if (limit > 0) {
-      List<OrderByExpressionContext> orderByExpressions = 
_queryContext.getOrderByExpressions();
-      if (orderByExpressions == null) {
-        // Selection only
-        TransformOperator transformOperator = new 
TransformPlanNode(_indexSegment, _queryContext, expressions,
-            Math.min(limit, DocIdSetPlanNode.MAX_DOC_PER_CALL)).run();
-        return new SelectionOnlyOperator(_indexSegment, _queryContext, 
expressions, transformOperator);
-      } else {
-        // Selection order-by
-        if (isAllOrderByColumnsSorted(orderByExpressions)) {
-          // All order-by columns are sorted, no need to sort the records
-          TransformOperator transformOperator = new 
TransformPlanNode(_indexSegment, _queryContext, expressions,
-              Math.min(limit + _queryContext.getOffset(), 
DocIdSetPlanNode.MAX_DOC_PER_CALL)).run();
-          return new SelectionOrderByOperator(_indexSegment, _queryContext, 
expressions, transformOperator, true);
-        } else if (orderByExpressions.size() == expressions.size()) {
-          // All output expressions are ordered
-          TransformOperator transformOperator =
-              new TransformPlanNode(_indexSegment, _queryContext, expressions, 
DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
-          return new SelectionOrderByOperator(_indexSegment, _queryContext, 
expressions, transformOperator, false);
-        } else {
-          // Not all output expressions are ordered, only fetch the order-by 
expressions and docId to avoid the
-          // unnecessary data fetch
-          List<ExpressionContext> expressionsToTransform = new 
ArrayList<>(orderByExpressions.size());
-          for (OrderByExpressionContext orderByExpression : 
orderByExpressions) {
-            expressionsToTransform.add(orderByExpression.getExpression());
-          }
-          TransformOperator transformOperator =
-              new TransformPlanNode(_indexSegment, _queryContext, 
expressionsToTransform,
-                  DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
-          return new SelectionOrderByOperator(_indexSegment, _queryContext, 
expressions, transformOperator, false);
-        }
-      }
-    } else {
+    if (limit == 0) {
       // Empty selection (LIMIT 0)
       TransformOperator transformOperator = new 
TransformPlanNode(_indexSegment, _queryContext, expressions, 0).run();
       return new EmptySelectionOperator(_indexSegment, expressions, 
transformOperator);
     }
+
+    List<OrderByExpressionContext> orderByExpressions = 
_queryContext.getOrderByExpressions();
+    if (orderByExpressions == null) {
+      // Selection only
+      // ie: SELECT ... FROM Table WHERE ... LIMIT 10
+      int maxDocsPerCall = Math.min(limit, DocIdSetPlanNode.MAX_DOC_PER_CALL);
+      TransformPlanNode planNode = new TransformPlanNode(_indexSegment, 
_queryContext, expressions, maxDocsPerCall);
+      TransformOperator transformOperator = planNode.run();
+
+      return new SelectionOnlyOperator(_indexSegment, _queryContext, 
expressions, transformOperator);
+    }
+    int numOrderByExpressions = orderByExpressions.size();
+    // Although it breaks the abstraction, some code, especially merging, 
assumes that if there is an order by
+    // expression the operator will return a block whose selection result is a 
priority queue.
+    int sortedColumnsPrefixSize = getSortedColumnsPrefix(orderByExpressions, 
_queryContext.isNullHandlingEnabled());
+    OrderByAlgorithm orderByAlgorithm = 
OrderByAlgorithm.fromQueryContext(_queryContext);
+    if (sortedColumnsPrefixSize > 0 && orderByAlgorithm != 
OrderByAlgorithm.NAIVE) {
+      int maxDocsPerCall = DocIdSetPlanNode.MAX_DOC_PER_CALL;
+      // The first order by expressions are sorted (either asc or desc).
+      // ie: SELECT ... FROM Table WHERE predicates ORDER BY sorted_column 
DESC LIMIT 10 OFFSET 5
+      // or: SELECT ... FROM Table WHERE predicates ORDER BY sorted_column, 
not_sorted LIMIT 10 OFFSET 5
+      // but not SELECT ... FROM Table WHERE predicates ORDER BY not_sorted, 
sorted_column LIMIT 10 OFFSET 5
+      if (orderByExpressions.get(0).isAsc()) {
+        if (sortedColumnsPrefixSize == orderByExpressions.size()) {
+          maxDocsPerCall = Math.min(limit + _queryContext.getOffset(), 
DocIdSetPlanNode.MAX_DOC_PER_CALL);
+        }
+        TransformPlanNode planNode = new TransformPlanNode(_indexSegment, 
_queryContext, expressions, maxDocsPerCall);
+        TransformOperator transformOperator = planNode.run();
+        return new SelectionPartiallyOrderedByAscOperator(_indexSegment, 
_queryContext, expressions, transformOperator,
+            sortedColumnsPrefixSize);
+      } else {
+        TransformPlanNode planNode = new TransformPlanNode(_indexSegment, 
_queryContext, expressions, maxDocsPerCall);
+        TransformOperator transformOperator = planNode.run();
+        return new SelectionPartiallyOrderedByDescOperation(_indexSegment, 
_queryContext, expressions,
+            transformOperator, sortedColumnsPrefixSize);
+      }
+    }
+    if (numOrderByExpressions == expressions.size()) {
+      // All output expressions are ordered
+      // ie: SELECT not_sorted1, not_sorted2 FROM Table WHERE ... ORDER BY 
not_sorted1, not_sorted2 LIMIT 10 OFFSET 5
+      TransformOperator transformOperator =
+          new TransformPlanNode(_indexSegment, _queryContext, expressions, 
DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+      return new SelectionOrderByOperator(_indexSegment, _queryContext, 
expressions, transformOperator);
+    }
+    // Not all output expressions are ordered, only fetch the order-by 
expressions and docId to avoid the
+    // unnecessary data fetch
+    // ie: SELECT ... FROM Table WHERE ... ORDER BY not_sorted1, not_sorted2 
LIMIT 10
+    List<ExpressionContext> expressionsToTransform = new 
ArrayList<>(numOrderByExpressions);
+    for (OrderByExpressionContext orderByExpression : orderByExpressions) {
+      expressionsToTransform.add(orderByExpression.getExpression());
+    }
+    TransformOperator transformOperator = new TransformPlanNode(_indexSegment, 
_queryContext, expressionsToTransform,
+        DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+    return new SelectionOrderByOperator(_indexSegment, _queryContext, 
expressions, transformOperator);
   }
 
   /**
-   *  This function checks whether all columns in order by clause are 
pre-sorted.
-   *  This is used to optimize order by limit clauses.
-   *  For eg:
-   *  A query like "select * from table order by col1, col2 limit 10"
-   *  will take all the n matching rows and add it to a priority queue of size 
10.
-   *  This is nlogk operation which can be quite expensive for a large n.
-   *  In the above example, if the docs in the segment are already sorted by 
col1 and col2 then there is no need for
-   *  sorting at all (only limit is needed).
-   * @return true is all columns in order by clause are sorted . False 
otherwise
+   * This function returns the number of expressions that are sorted by the 
implicit order in the index.
+   *
+   * This means that a query that uses these expressions in its order by doesn't 
actually need to sort from 0 to the
+   * given number (excluded) of expressions, as they are returned in the 
correct order.
+   *
+   * This method supports ASC and DESC order and ensures that all prefix 
expressions follow the same order. For example,
+   * ORDER BY sorted_col1 ASC, sorted_col2 ASC and ORDER BY sorted_col1 DESC, 
sorted_col2 DESC will return 2 but
+   * ORDER BY sorted_col1 DESC, sorted_col2 ASC and ORDER BY sorted_col1 ASC, 
sorted_col2 DESC will return 1 while
+   * ORDER BY not_sorted, sorted_col1 will return 0 because the first column 
is not sorted.
+   *
+   * It doesn't make sense to add literal expressions in an order by 
expression, but if they are included, they are
+   * considered sorted and their ASC/DESC direction is ignored.
+   *
+   * @return the max number that guarantees that from the first expression to 
the returned number, the index is already
+   * sorted.
    */
-  private boolean isAllOrderByColumnsSorted(List<OrderByExpressionContext> 
orderByExpressions) {
-    for (OrderByExpressionContext orderByExpression : orderByExpressions) {
-      if (!(orderByExpression.getExpression().getType() == 
ExpressionContext.Type.IDENTIFIER)
-          || !orderByExpression.isAsc()) {
-        return false;
+  private int getSortedColumnsPrefix(List<OrderByExpressionContext> 
orderByExpressions, boolean isNullHandlingEnabled) {
+    boolean asc = orderByExpressions.get(0).isAsc();
+    for (int i = 0; i < orderByExpressions.size(); i++) {
+      if (!isSorted(orderByExpressions.get(i), asc, isNullHandlingEnabled)) {
+        return i;
+      }
+    }
+    // If we reach here, all are sorted
+    return orderByExpressions.size();
+  }
+
+  private boolean isSorted(OrderByExpressionContext orderByExpression, boolean 
asc, boolean isNullHandlingEnabled) {
+    switch (orderByExpression.getExpression().getType()) {
+      case LITERAL: {
+        return true;
+      }
+      case IDENTIFIER: {
+        if (!orderByExpression.isAsc() == asc) {
+          return false;
+        }
+        String column = orderByExpression.getExpression().getIdentifier();
+        DataSource dataSource = _indexSegment.getDataSource(column);
+        // If there are null values, we cannot trust 
DataSourceMetadata.isSorted
+        if (isNullHandlingEnabled) {
+          NullValueVectorReader nullValueVector = 
dataSource.getNullValueVector();
+          if (nullValueVector != null && 
!nullValueVector.getNullBitmap().isEmpty()) {
+            return false;
+          }
+        }
+        return dataSource.getDataSourceMetadata().isSorted();
       }
-      String column = orderByExpression.getExpression().getIdentifier();
-      if 
(!_indexSegment.getDataSource(column).getDataSourceMetadata().isSorted()) {
+      case FUNCTION: // we could optimize monotonically increasing functions
+      default: {
         return false;
       }
     }
-    return true;
+  }
+
+  public enum OrderByAlgorithm {
+    NAIVE;
+
+    @Nullable
+    public static OrderByAlgorithm fromQueryContext(QueryContext queryContext) 
{
+      String orderByAlgorithm = 
QueryOptionsUtils.getOrderByAlgorithm(queryContext.getQueryOptions());
+      if (orderByAlgorithm == null) {
+        return null;
+      }
+      return OrderByAlgorithm.valueOf(orderByAlgorithm.toUpperCase());
+    }
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
new file mode 100644
index 0000000000..4258317e76
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.utils;
+
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * A utility class used to create comparators that should be used by operators 
that implements order by semantics.
+ */
+public class OrderByComparatorFactory {
+  private OrderByComparatorFactory() {
+  }
+
+  public static Comparator<Object[]> 
getComparator(List<OrderByExpressionContext> orderByExpressions,
+      TransformResultMetadata[] orderByExpressionMetadata, boolean reverse, 
boolean nullHandlingEnabled) {
+    return getComparator(orderByExpressions, orderByExpressionMetadata, 
reverse, nullHandlingEnabled, 0,
+        orderByExpressions.size());
+  }
+
+  /**
+   * @param reverse if false, the comparator will order in the direction 
indicated by the
+   * {@link OrderByExpressionContext#isAsc()}. Otherwise, it will be in the 
opposite direction.
+   */
+  public static Comparator<Object[]> 
getComparator(List<OrderByExpressionContext> orderByExpressions,
+      TransformResultMetadata[] orderByExpressionMetadata, boolean reverse, 
boolean nullHandlingEnabled, int from,
+      int to) {
+    Preconditions.checkArgument(to <= orderByExpressions.size(),
+        "Trying to access %sth position of orderByExpressions with size %s", 
to, orderByExpressions.size());
+    Preconditions.checkArgument(to <= orderByExpressionMetadata.length,
+        "Trying to access %sth position of orderByExpressionMetadata with size 
%s", to,
+        orderByExpressionMetadata.length);
+    Preconditions.checkArgument(from < to, "FROM (%s) must be lower than TO 
(%s)", from, to);
+
+    // Compare all single-value columns
+    int numOrderByExpressions = to - from;
+    List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
+    for (int i = from; i < to; i++) {
+      if (orderByExpressionMetadata[i].isSingleValue()) {
+        valueIndexList.add(i);
+      } else {
+        // MV columns should not be part of the selection order by only list
+        throw new BadQueryRequestException(
+            String.format("MV expression: %s should not be included in the 
ORDER-BY clause",
+                orderByExpressions.get(i)));
+      }
+    }
+
+    int numValuesToCompare = valueIndexList.size();
+    int[] valueIndices = new int[numValuesToCompare];
+    FieldSpec.DataType[] storedTypes = new 
FieldSpec.DataType[numValuesToCompare];
+    // Use multiplier -1 or 1 to control ascending/descending order
+    int[] multipliers = new int[numValuesToCompare];
+    int ascMult = reverse ? -1 : 1;
+    int descMult = reverse ? 1 : -1;
+    for (int i = 0; i < numValuesToCompare; i++) {
+      int valueIndex = valueIndexList.get(i);
+      valueIndices[i] = valueIndex;
+      storedTypes[i] = 
orderByExpressionMetadata[valueIndex].getDataType().getStoredType();
+      multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? ascMult : 
descMult;
+    }
+
+    if (nullHandlingEnabled) {
+      return (Object[] o1, Object[] o2) -> {
+        for (int i = 0; i < numValuesToCompare; i++) {
+          int index = valueIndices[i];
+          // TODO: Evaluate the performance of casting to Comparable and avoid 
the switch
+          Object v1 = o1[index];
+          Object v2 = o2[index];
+          if (v1 == null) {
+            // The default null ordering is: 'NULLS LAST', regardless of the 
ordering direction.
+            return v2 == null ? 0 : -multipliers[i];
+          } else if (v2 == null) {
+            return multipliers[i];
+          }
+          int result = compareCols(v1, v2, storedTypes[i], multipliers[i]);
+          if (result != 0) {
+            return result;
+          }
+        }
+        return 0;
+      };
+    } else {
+      return (Object[] o1, Object[] o2) -> {
+        for (int i = 0; i < numValuesToCompare; i++) {
+          int index = valueIndices[i];
+          // TODO: Evaluate the performance of casting to Comparable and avoid 
the switch
+          int result = compareCols(o1[index], o2[index], storedTypes[i], 
multipliers[i]);
+          if (result != 0) {
+            return result;
+          }
+        }
+        return 0;
+      };
+    }
+  }
+
+  private static int compareCols(Object v1, Object v2, FieldSpec.DataType 
type, int multiplier) {
+
+    // TODO: Evaluate the performance of casting to Comparable and avoid the 
switch
+    int result;
+    switch (type) {
+      case INT:
+        result = ((Integer) v1).compareTo((Integer) v2);
+        break;
+      case LONG:
+        result = ((Long) v1).compareTo((Long) v2);
+        break;
+      case FLOAT:
+        result = ((Float) v1).compareTo((Float) v2);
+        break;
+      case DOUBLE:
+        result = ((Double) v1).compareTo((Double) v2);
+        break;
+      case BIG_DECIMAL:
+        result = ((BigDecimal) v1).compareTo((BigDecimal) v2);
+        break;
+      case STRING:
+        result = ((String) v1).compareTo((String) v2);
+        break;
+      case BYTES:
+        result = ((ByteArray) v1).compareTo((ByteArray) v2);
+        break;
+      // NOTE: Multi-value columns are not comparable, so we should not reach 
here
+      default:
+        throw new IllegalStateException();
+    }
+    return result * multiplier;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java
index 6190373295..4a0d6e5d8c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java
@@ -100,4 +100,9 @@ public class QueryOptionsUtils {
   public static boolean isServerReturnFinalResult(Map<String, String> 
queryOptions) {
     return 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SERVER_RETURN_FINAL_RESULT));
   }
+
+  @Nullable
+  public static String getOrderByAlgorithm(Map<String, String> queryOptions) {
+    return queryOptions.get(QueryOptionKey.ORDER_BY_ALGORITHM);
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
index e8db2051bf..88b81d5b88 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
@@ -224,7 +224,7 @@ public class SelectionCombineOperatorTest {
     SelectionResultsBlock combineResult = getCombineResult("SELECT * FROM 
testTable ORDER BY intColumn");
     assertEquals(combineResult.getDataSchema(),
         new DataSchema(new String[]{INT_COLUMN}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
-    PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) 
combineResult.getRows();
+    PriorityQueue<Object[]> selectionResult = 
combineResult.getRowsAsPriorityQueue();
     assertNotNull(selectionResult);
     assertEquals(selectionResult.size(), 10);
     int expectedValue = 9;
@@ -248,7 +248,7 @@ public class SelectionCombineOperatorTest {
     combineResult = getCombineResult("SELECT * FROM testTable ORDER BY 
intColumn DESC");
     assertEquals(combineResult.getDataSchema(),
         new DataSchema(new String[]{INT_COLUMN}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
-    selectionResult = (PriorityQueue<Object[]>) combineResult.getRows();
+    selectionResult = combineResult.getRowsAsPriorityQueue();
     assertNotNull(selectionResult);
     assertEquals(selectionResult.size(), 10);
     expectedValue = NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT / 2 + 40;
@@ -272,7 +272,7 @@ public class SelectionCombineOperatorTest {
     combineResult = getCombineResult("SELECT * FROM testTable ORDER BY 
intColumn DESC LIMIT 10000");
     assertEquals(combineResult.getDataSchema(),
         new DataSchema(new String[]{INT_COLUMN}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
-    selectionResult = (PriorityQueue<Object[]>) combineResult.getRows();
+    selectionResult = combineResult.getRowsAsPriorityQueue();
     assertNotNull(selectionResult);
     assertEquals(selectionResult.size(), NUM_SEGMENTS * 
NUM_RECORDS_PER_SEGMENT);
     // Should not early-terminate
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java
new file mode 100644
index 0000000000..b7e2a2b6de
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import 
org.apache.pinot.core.operator.query.LinearSelectionOrderByOperator.PartiallySortedListBuilder;
+import 
org.apache.pinot.core.operator.query.LinearSelectionOrderByOperator.TotallySortedListBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests for the two list builders nested in {@code LinearSelectionOrderByOperator}: they are fed rows one by one
+ * through {@code add} and the resulting list is checked after {@code build}.
+ */
+public class LinearSelectionOrderByOperatorTest {
+
+  /**
+   * Checks that {@code TotallySortedListBuilder.add} returns true only for the row that reaches {@code maxNumRows},
+   * and that {@code build} returns the added rows in insertion order.
+   */
+  @Test
+  public void testTotallySortedListBuilder() {
+    int maxNumRows = 10;
+
+    // Enough rows collected
+    TotallySortedListBuilder listBuilder = new 
TotallySortedListBuilder(maxNumRows);
+    for (int i = 0; i < maxNumRows; i++) {
+      Object[] row = new Object[]{i / 2};
+      boolean enoughRowsCollected = listBuilder.add(row);
+      assertEquals(enoughRowsCollected, i == maxNumRows - 1);
+    }
+    List<Object[]> rows = listBuilder.build();
+    assertEquals(rows.size(), maxNumRows);
+    for (int i = 0; i < maxNumRows; i++) {
+      assertEquals(rows.get(i), new Object[]{i / 2});
+    }
+
+    // Not enough rows collected
+    listBuilder = new TotallySortedListBuilder(maxNumRows);
+    for (int i = 0; i < maxNumRows - 1; i++) {
+      Object[] row = new Object[]{i / 2};
+      assertFalse(listBuilder.add(row));
+    }
+    rows = listBuilder.build();
+    assertEquals(rows.size(), maxNumRows - 1);
+    for (int i = 0; i < maxNumRows - 1; i++) {
+      assertEquals(rows.get(i), new Object[]{i / 2});
+    }
+  }
+
+  /**
+   * Checks {@code PartiallySortedListBuilder} with rows of the form {partition value, unsorted value}: column 0
+   * grows with two rows per value, column 1 is compared by a separate comparator. Covers a run that ends on a new
+   * partition, a run with extra tie rows in the last partition, and a run that stops inside the last partition.
+   */
+  @Test
+  public void testPartiallySortedListBuilder() {
+    int maxNumRows = 10;
+    // Compares rows on column 0 only
+    Comparator<Object[]> partitionComparator = Comparator.comparingInt(row -> 
(Integer) row[0]);
+    // Compares rows on column 1 with the arguments swapped, i.e. a larger value compares as smaller
+    Comparator<Object[]> unsortedComparator = (row1, row2) -> 
Integer.compare((Integer) row2[1], (Integer) row1[1]);
+
+    // Enough rows collected without tie rows
+    PartiallySortedListBuilder listBuilder =
+        new PartiallySortedListBuilder(maxNumRows, partitionComparator, 
unsortedComparator);
+    for (int i = 0; i < maxNumRows; i++) {
+      Object[] row = new Object[]{i / 2, maxNumRows - i};
+      assertFalse(listBuilder.add(row));
+    }
+    int lastPartitionValue = (maxNumRows - 1) / 2;
+    // Only a row with a new partition value makes add() return true
+    assertTrue(listBuilder.add(new Object[]{lastPartitionValue + 1, 0}));
+    List<Object[]> rows = listBuilder.build();
+    assertEquals(rows.size(), maxNumRows);
+    for (int i = 0; i < maxNumRows; i++) {
+      assertEquals(rows.get(i), new Object[]{i / 2, maxNumRows - i});
+    }
+
+    // Enough rows collected with tie rows
+    listBuilder = new PartiallySortedListBuilder(maxNumRows, 
partitionComparator, unsortedComparator);
+    for (int i = 0; i < maxNumRows; i++) {
+      Object[] row = new Object[]{i / 2, maxNumRows - i};
+      assertFalse(listBuilder.add(row));
+    }
+    // Extra rows that share the last partition value never make add() return true
+    assertFalse(listBuilder.add(new Object[]{lastPartitionValue, 0}));
+    assertFalse(listBuilder.add(new Object[]{lastPartitionValue, 2}));
+    assertFalse(listBuilder.add(new Object[]{lastPartitionValue, 4}));
+    assertFalse(listBuilder.add(new Object[]{lastPartitionValue, 6}));
+    assertTrue(listBuilder.add(new Object[]{lastPartitionValue + 1, 0}));
+    rows = listBuilder.build();
+    assertEquals(rows.size(), maxNumRows);
+    // For the last partition, should contain unsorted value 0 and 1
+    Set<Integer> unsortedValues = new HashSet<>();
+    for (int i = 0; i < maxNumRows; i++) {
+      if (i / 2 != lastPartitionValue) {
+        assertEquals(rows.get(i), new Object[]{i / 2, maxNumRows - i});
+      } else {
+        Object[] row = rows.get(i);
+        assertEquals(row[0], lastPartitionValue);
+        int unsortedValue = (int) row[1];
+        assertTrue(unsortedValue == 0 || unsortedValue == 1);
+        unsortedValues.add(unsortedValue);
+      }
+    }
+    assertEquals(unsortedValues.size(), 2);
+
+    // Not enough rows collected with tie rows
+    listBuilder = new PartiallySortedListBuilder(maxNumRows, 
partitionComparator, unsortedComparator);
+    for (int i = 0; i < maxNumRows; i++) {
+      Object[] row = new Object[]{i / 2, maxNumRows - i};
+      assertFalse(listBuilder.add(row));
+    }
+    assertFalse(listBuilder.add(new Object[]{lastPartitionValue, 0}));
+    assertFalse(listBuilder.add(new Object[]{lastPartitionValue, 2}));
+    assertFalse(listBuilder.add(new Object[]{lastPartitionValue, 4}));
+    assertFalse(listBuilder.add(new Object[]{lastPartitionValue, 6}));
+    // build() is called here without any row from a following partition having been added
+    rows = listBuilder.build();
+    assertEquals(rows.size(), maxNumRows);
+    // For the last partition, should contain unsorted value 0 and 1
+    unsortedValues = new HashSet<>();
+    for (int i = 0; i < maxNumRows; i++) {
+      if (i / 2 != lastPartitionValue) {
+        assertEquals(rows.get(i), new Object[]{i / 2, maxNumRows - i});
+      } else {
+        Object[] row = rows.get(i);
+        assertEquals(row[0], lastPartitionValue);
+        int unsortedValue = (int) row[1];
+        assertTrue(unsortedValue == 0 || unsortedValue == 1);
+        unsortedValues.add(unsortedValue);
+      }
+    }
+    assertEquals(unsortedValues.size(), 2);
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
index af88db06c7..876baf7a98 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
@@ -203,7 +203,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest 
extends BaseMultiValueQu
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
 DataSchema.ColumnDataType.INT);
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
         DataSchema.ColumnDataType.INT_ARRAY);
-    PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) 
resultsBlock.getRows();
+    PriorityQueue<Object[]> selectionResult = 
resultsBlock.getRowsAsPriorityQueue();
     assertEquals(selectionResult.size(), 10);
     Object[] lastRow = selectionResult.peek();
     assertEquals(lastRow.length, 4);
@@ -228,7 +228,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest 
extends BaseMultiValueQu
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
 DataSchema.ColumnDataType.INT);
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
         DataSchema.ColumnDataType.INT_ARRAY);
-    selectionResult = (PriorityQueue<Object[]>) resultsBlock.getRows();
+    selectionResult = resultsBlock.getRowsAsPriorityQueue();
     assertEquals(selectionResult.size(), 10);
     lastRow = selectionResult.peek();
     assertEquals(lastRow.length, 4);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java
index e4de729912..9e3abdc929 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java
@@ -203,7 +203,7 @@ public class InnerSegmentSelectionMultiValueRawQueriesTest 
extends BaseMultiValu
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
 DataSchema.ColumnDataType.INT);
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
         DataSchema.ColumnDataType.INT_ARRAY);
-    PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) 
resultsBlock.getRows();
+    PriorityQueue<Object[]> selectionResult = 
resultsBlock.getRowsAsPriorityQueue();
     assertEquals(selectionResult.size(), 10);
     Object[] lastRow = selectionResult.peek();
     assertEquals(lastRow.length, 4);
@@ -228,7 +228,7 @@ public class InnerSegmentSelectionMultiValueRawQueriesTest 
extends BaseMultiValu
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
 DataSchema.ColumnDataType.INT);
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
         DataSchema.ColumnDataType.INT_ARRAY);
-    selectionResult = (PriorityQueue<Object[]>) resultsBlock.getRows();
+    selectionResult = resultsBlock.getRowsAsPriorityQueue();
     assertEquals(selectionResult.size(), 10);
     lastRow = selectionResult.peek();
     assertEquals(lastRow.length, 4);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
index f4f6a86678..b81065dfab 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
@@ -30,7 +30,8 @@ import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.ExecutionStatistics;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.operator.query.EmptySelectionOperator;
-import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
+import 
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByAscOperator;
+import 
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByDescOperation;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -88,7 +89,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     String query = "SELECT daysSinceEpoch FROM testTable WHERE "
         + "dateTimeConvert(daysSinceEpoch, '1:DAYS:EPOCH', 
'1:MILLISECONDS:EPOCH', '1:MILLISECONDS') > ago('P1D') "
         + "ORDER BY daysSinceEpoch LIMIT 10";
-    SelectionOrderByOperator selectionOrderByOperator = getOperator(query);
+    BaseOperator<SelectionResultsBlock> selectionOrderByOperator = 
getOperator(query);
     SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
     verifySelectionOrderByAgoFunctionResult(resultsBlock);
 
@@ -107,7 +108,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     assertTrue(columnIndexMap.containsKey("daysSinceEpoch"));
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("daysSinceEpoch")),
 ColumnDataType.INT);
 
-    PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) 
resultsBlock.getRows();
+    PriorityQueue<Object[]> selectionResult = 
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
     assertEquals(selectionResult.size(), 10);
     for (Object[] row : selectionResult) {
       assertEquals(row.length, 1);
@@ -231,7 +232,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     assertTrue(columnIndexMap.containsKey("column1"));
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
 ColumnDataType.INT);
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
 ColumnDataType.INT);
-    PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) 
resultsBlock.getRows();
+    PriorityQueue<Object[]> selectionResult = 
resultsBlock.getRowsAsPriorityQueue();
     assertEquals(selectionResult.size(), 10);
     Object[] lastRow = selectionResult.peek();
     assertEquals(lastRow.length, 4);
@@ -255,7 +256,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     assertTrue(columnIndexMap.containsKey("column1"));
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
 ColumnDataType.INT);
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
 ColumnDataType.INT);
-    selectionResult = (PriorityQueue<Object[]>) resultsBlock.getRows();
+    selectionResult = resultsBlock.getRowsAsPriorityQueue();
     assertEquals(selectionResult.size(), 10);
     lastRow = selectionResult.peek();
     assertEquals(lastRow.length, 4);
@@ -263,6 +264,162 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     assertEquals(((Integer) 
lastRow[columnIndexMap.get("column1")]).intValue(), 462769197);
   }
 
+  /**
+   * Runs selection queries whose ORDER BY starts with column5 and checks, for each of them, which operator class
+   * is planned, the execution statistics (10 docs scanned in the two cases that order only by sorted columns in
+   * ascending order, all 30000 docs in the other cases), the result schema and the row at the head of the result
+   * priority queue.
+   */
+  @Test
+  public void testSelectionOrderBySortedColumn() {
+    // Test query order by single sorted column in ascending order
+    String orderBy = " ORDER BY column5";
+    BaseOperator<SelectionResultsBlock> selectionOrderByOperator = 
getOperator(SELECTION_QUERY + orderBy);
+    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByAscOperator);
+    SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+    ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
+    assertEquals(executionStatistics.getNumDocsScanned(), 10L);
+    assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+    // 10 * (3 columns)
+    assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 30L);
+    assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+    DataSchema dataSchema = resultsBlock.getDataSchema();
+    assertEquals(dataSchema.getColumnNames(), new String[]{"column5", 
"column1", "column11"});
+    assertEquals(dataSchema.getColumnDataTypes(),
+        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, 
ColumnDataType.STRING});
+    PriorityQueue<Object[]> selectionResult = 
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+    assertEquals(selectionResult.size(), 10);
+    Object[] lastRow = selectionResult.peek();
+    assertEquals(lastRow.length, 3);
+    assertEquals(lastRow[0], "gFuH");
+
+    // Test query order by single sorted column in descending order
+    orderBy = " ORDER BY column5 DESC";
+    selectionOrderByOperator = getOperator(SELECTION_QUERY + orderBy);
+    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByDescOperation);
+    resultsBlock = selectionOrderByOperator.nextBlock();
+    executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+    assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
+    assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+    // 30000 * (3 columns)
+    assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 90000L);
+    assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+    dataSchema = resultsBlock.getDataSchema();
+    assertEquals(dataSchema.getColumnNames(), new String[]{"column5", 
"column1", "column11"});
+    assertEquals(dataSchema.getColumnDataTypes(),
+        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, 
ColumnDataType.STRING});
+    selectionResult = 
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+    assertEquals(selectionResult.size(), 10);
+    lastRow = selectionResult.peek();
+    assertEquals(lastRow.length, 3);
+    assertEquals(lastRow[0], "gFuH");
+
+    // Test query order by all sorted columns in ascending order
+    String query = "SELECT column5, daysSinceEpoch FROM testTable ORDER BY 
column5, daysSinceEpoch";
+    selectionOrderByOperator = getOperator(query);
+    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByAscOperator);
+    resultsBlock = selectionOrderByOperator.nextBlock();
+    executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+    assertEquals(executionStatistics.getNumDocsScanned(), 10L);
+    assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+    // 10 * (2 columns)
+    assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 20L);
+    assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+    dataSchema = resultsBlock.getDataSchema();
+    assertEquals(dataSchema.getColumnNames(), new String[]{"column5", 
"daysSinceEpoch"});
+    assertEquals(dataSchema.getColumnDataTypes(), new 
ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
+    selectionResult = 
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+    assertEquals(selectionResult.size(), 10);
+    lastRow = selectionResult.peek();
+    assertEquals(lastRow.length, 2);
+    assertEquals(lastRow[0], "gFuH");
+    assertEquals(lastRow[1], 126164076);
+
+    // Test query order by all sorted columns in descending order
+    query = "SELECT column5 FROM testTable ORDER BY column5 DESC, 
daysSinceEpoch DESC";
+    selectionOrderByOperator = getOperator(query);
+    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByDescOperation);
+    resultsBlock = selectionOrderByOperator.nextBlock();
+    executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+    assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
+    assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+    // 30000 * (2 columns)
+    assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 60000L);
+    assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+    dataSchema = resultsBlock.getDataSchema();
+    assertEquals(dataSchema.getColumnNames(), new String[]{"column5", 
"daysSinceEpoch"});
+    assertEquals(dataSchema.getColumnDataTypes(), new 
ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
+    selectionResult = 
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+    assertEquals(selectionResult.size(), 10);
+    lastRow = selectionResult.peek();
+    assertEquals(lastRow.length, 2);
+    assertEquals(lastRow[0], "gFuH");
+    assertEquals(lastRow[1], 167572854);
+
+    // Test query order by one sorted column in ascending order, the other 
sorted column in descending order
+    query = "SELECT daysSinceEpoch FROM testTable ORDER BY column5, 
daysSinceEpoch DESC";
+    selectionOrderByOperator = getOperator(query);
+    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByAscOperator);
+    resultsBlock = selectionOrderByOperator.nextBlock();
+    executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+    assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
+    assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+    // 30000 * (2 columns)
+    assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 60000L);
+    assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+    dataSchema = resultsBlock.getDataSchema();
+    assertEquals(dataSchema.getColumnNames(), new String[]{"column5", 
"daysSinceEpoch"});
+    assertEquals(dataSchema.getColumnDataTypes(), new 
ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
+    selectionResult = 
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+    assertEquals(selectionResult.size(), 10);
+    lastRow = selectionResult.peek();
+    assertEquals(lastRow.length, 2);
+    assertEquals(lastRow[0], "gFuH");
+    assertEquals(lastRow[1], 167572854);
+
+    // Test query order by one sorted column in ascending order, and some 
unsorted columns
+    query = "SELECT column1 FROM testTable ORDER BY column5, column6, column1";
+    selectionOrderByOperator = getOperator(query);
+    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByAscOperator);
+    resultsBlock = selectionOrderByOperator.nextBlock();
+    executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+    assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
+    assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+    // 30000 * (3 columns)
+    assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 90000L);
+    assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+    dataSchema = resultsBlock.getDataSchema();
+    assertEquals(dataSchema.getColumnNames(), new String[]{"column5", 
"column6", "column1"});
+    assertEquals(dataSchema.getColumnDataTypes(),
+        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, 
ColumnDataType.INT});
+    selectionResult = 
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+    assertEquals(selectionResult.size(), 10);
+    lastRow = selectionResult.peek();
+    assertEquals(lastRow.length, 3);
+    assertEquals(lastRow[0], "gFuH");
+    // Unsorted column values should be the same as ordering by their own
+    assertEquals(lastRow[1], 6043515);
+    assertEquals(lastRow[2], 10542595);
+
+    // Test query order by one sorted column in descending order, and some 
unsorted columns
+    query = "SELECT column6 FROM testTable ORDER BY column5 DESC, column6, 
column1";
+    selectionOrderByOperator = getOperator(query);
+    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByDescOperation);
+    resultsBlock = selectionOrderByOperator.nextBlock();
+    executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+    assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
+    assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+    // 30000 * (3 columns)
+    assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 90000L);
+    assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+    dataSchema = resultsBlock.getDataSchema();
+    assertEquals(dataSchema.getColumnNames(), new String[]{"column5", 
"column6", "column1"});
+    assertEquals(dataSchema.getColumnDataTypes(),
+        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, 
ColumnDataType.INT});
+    selectionResult = 
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+    assertEquals(selectionResult.size(), 10);
+    lastRow = selectionResult.peek();
+    assertEquals(lastRow.length, 3);
+    assertEquals(lastRow[0], "gFuH");
+    // Unsorted column values should be the same as ordering by their own
+    assertEquals(lastRow[1], 6043515);
+    assertEquals(lastRow[2], 10542595);
+  }
+
   @Test
   public void testSelectStarOrderBy() {
     // Test query without filter
@@ -283,7 +440,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     assertTrue(columnIndexMap.containsKey("column1"));
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
 ColumnDataType.INT);
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
 ColumnDataType.INT);
-    PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) 
resultsBlock.getRows();
+    PriorityQueue<Object[]> selectionResult = 
resultsBlock.getRowsAsPriorityQueue();
     assertEquals(selectionResult.size(), 10);
     Object[] lastRow = selectionResult.peek();
     assertEquals(lastRow.length, 11);
@@ -308,7 +465,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     assertTrue(columnIndexMap.containsKey("column1"));
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
 ColumnDataType.INT);
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
 ColumnDataType.INT);
-    selectionResult = (PriorityQueue<Object[]>) resultsBlock.getRows();
+    selectionResult = resultsBlock.getRowsAsPriorityQueue();
     assertEquals(selectionResult.size(), 10);
     lastRow = selectionResult.peek();
     assertEquals(lastRow.length, 11);
@@ -336,7 +493,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     assertEquals(selectionDataSchema.size(), 11);
     assertTrue(columnIndexMap.containsKey("column5"));
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column5")),
 ColumnDataType.STRING);
-    PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) 
resultsBlock.getRows();
+    PriorityQueue<Object[]> selectionResult = 
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
     assertEquals(selectionResult.size(), 10);
     Object[] lastRow = selectionResult.peek();
     assertEquals(lastRow.length, 11);
@@ -358,7 +515,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     assertEquals(selectionDataSchema.size(), 11);
     assertTrue(columnIndexMap.containsKey("column5"));
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column5")),
 ColumnDataType.STRING);
-    selectionResult = (PriorityQueue<Object[]>) resultsBlock.getRows();
+    selectionResult = 
resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
     assertEquals(selectionResult.size(), 10);
     lastRow = selectionResult.peek();
     assertEquals(lastRow.length, 11);
@@ -387,7 +544,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     assertTrue(columnIndexMap.containsKey("column1"));
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
 ColumnDataType.INT);
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
 ColumnDataType.INT);
-    PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) 
resultsBlock.getRows();
+    PriorityQueue<Object[]> selectionResult = 
resultsBlock.getRowsAsPriorityQueue();
     assertEquals(selectionResult.size(), 12000);
     Object[] lastRow = selectionResult.peek();
     assertEquals(lastRow.length, 11);
@@ -412,7 +569,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     assertTrue(columnIndexMap.containsKey("column1"));
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
 ColumnDataType.INT);
     
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
 ColumnDataType.INT);
-    selectionResult = (PriorityQueue<Object[]>) resultsBlock.getRows();
+    selectionResult = resultsBlock.getRowsAsPriorityQueue();
     assertEquals(selectionResult.size(), 6129);
     lastRow = selectionResult.peek();
     assertEquals(lastRow.length, 11);
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOrderByQueries.java 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOrderByQueries.java
new file mode 100644
index 0000000000..308c7c3761
--- /dev/null
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOrderByQueries.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
+import java.util.stream.IntStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.queries.BaseQueriesTest;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import 
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+@Warmup(iterations = 5, time = 1)
+@Measurement(iterations = 5, time = 1)
+@State(Scope.Benchmark)
+public class BenchmarkOrderByQueries extends BaseQueriesTest {
+
+  /**
+   * Runs the benchmarks declared in this class through the JMH runner.
+   *
+   * @param args command line arguments; not read by this method
+   * @throws Exception if the JMH run fails
+   */
+  public static void main(String[] args)
+      throws Exception {
+    // Restrict the run to benchmarks whose name matches this class.
+    String benchmarkPattern = BenchmarkOrderByQueries.class.getSimpleName();
+    ChainedOptionsBuilder optionsBuilder = new OptionsBuilder().include(benchmarkPattern);
+    new Runner(optionsBuilder.build()).run();
+  }
+
+  // Scratch directory for the generated segments. It is wiped in setUp() and tearDown(), so it is named after this
+  // benchmark to avoid deleting (or being deleted by) another benchmark/test that uses the shared temp directory.
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "BenchmarkOrderByQueries");
+  private static final String TABLE_NAME = "MyTable";
+  private static final String FIRST_SEGMENT_NAME = "firstTestSegment";
+  private static final String SECOND_SEGMENT_NAME = "secondTestSegment";
+  // Column names of the generated table.
+  private static final String INT_COL_NAME = "INT_COL";
+  private static final String SORTED_COL_NAME = "SORTED_COL";
+  private static final String RAW_INT_COL_NAME = "RAW_INT_COL";
+  private static final String RAW_STRING_COL_NAME = "RAW_STRING_COL";
+  private static final String NO_INDEX_INT_COL_NAME = "NO_INDEX_INT_COL";
+  private static final String NO_INDEX_STRING_COL = "NO_INDEX_STRING_COL";
+  private static final String LOW_CARDINALITY_STRING_COL = "LOW_CARDINALITY_STRING_COL";
+
+  // Number of rows requested from createTestData() for each generated segment.
+  @Param("1500000")
+  private int _numRows;
+  // Value appended to the "orderByAlgorithm" query option of every benchmark query.
+  // NOTE(review): "null" presumably falls back to the default algorithm - confirm against the query option parsing.
+  @Param({"naive", "null"})
+  private String _orderByAlgorithm;
+  // Distribution descriptor handed to Distribution.createLongSupplier() to generate column values.
+  @Param({"EXP(0.5)"})
+  String _scenario;
+  // Number of consecutive generated rows that share the same SORTED_COL value.
+  @Param({"1", "1000"})
+  int _primaryRepetitions;
+  // First loaded segment; returned by getIndexSegment().
+  private IndexSegment _indexSegment;
+  // Both loaded segments; returned by getIndexSegments() and destroyed in tearDown().
+  private List<IndexSegment> _indexSegments;
+  // Source of the generated numeric values; created in setUp() with a fixed seed of 42.
+  private LongSupplier _supplier;
+
+  /**
+   * Generates two segments on disk and loads them as immutable segments for the benchmark methods.
+   *
+   * @throws Exception if building or loading a segment fails
+   */
+  @Setup
+  public void setUp()
+      throws Exception {
+    // The supplier has to exist before the segments are built: createTestData() draws its values from it.
+    _supplier = Distribution.createLongSupplier(42, _scenario);
+    // Remove leftovers of a previous run before writing new segments.
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    buildSegment(FIRST_SEGMENT_NAME);
+    buildSegment(SECOND_SEGMENT_NAME);
+
+    // The same set of columns gets both a range index and an inverted index at load time.
+    Set<String> indexedColumns = new HashSet<>();
+    indexedColumns.add(INT_COL_NAME);
+    indexedColumns.add(LOW_CARDINALITY_STRING_COL);
+
+    IndexLoadingConfig loadingConfig = new IndexLoadingConfig();
+    loadingConfig.setRangeIndexColumns(indexedColumns);
+    loadingConfig.setInvertedIndexColumns(indexedColumns);
+
+    File firstSegmentDir = new File(INDEX_DIR, FIRST_SEGMENT_NAME);
+    File secondSegmentDir = new File(INDEX_DIR, SECOND_SEGMENT_NAME);
+    ImmutableSegment firstSegment = ImmutableSegmentLoader.load(firstSegmentDir, loadingConfig);
+    ImmutableSegment secondSegment = ImmutableSegmentLoader.load(secondSegmentDir, loadingConfig);
+
+    _indexSegment = firstSegment;
+    _indexSegments = Arrays.asList(firstSegment, secondSegment);
+  }
+
+  /**
+   * Destroys the loaded segments, deletes the segment directory and shuts down the shared executor.
+   */
+  @TearDown
+  public void tearDown() {
+    _indexSegments.forEach(IndexSegment::destroy);
+
+    FileUtils.deleteQuietly(INDEX_DIR);
+    EXECUTOR_SERVICE.shutdownNow();
+  }
+
+  /**
+   * Generates the rows of one segment.
+   *
+   * <p>Rows are emitted in groups of {@code _primaryRepetitions}; every row of a group carries the same
+   * {@code SORTED_COL} value (the index of the first row of the group), so that column is non-decreasing. The other
+   * numeric columns are drawn from {@code _supplier}, and the two string columns of a row share the same value.
+   *
+   * @param numRows number of rows to generate
+   * @return exactly {@code numRows} rows (none if {@code numRows} is not positive)
+   * @throws IllegalArgumentException if {@code _primaryRepetitions} is not positive
+   */
+  private List<GenericRow> createTestData(int numRows) {
+    // A step of zero would never terminate, and a negative step would silently produce no rows.
+    if (_primaryRepetitions <= 0) {
+      throw new IllegalArgumentException("primaryRepetitions must be positive, got: " + _primaryRepetitions);
+    }
+    // Maps a drawn int to a stable random string so that equal draws produce equal string values.
+    Map<Integer, String> strings = new HashMap<>();
+    List<GenericRow> rows = new ArrayList<>(Math.max(numRows, 0));
+    String[] lowCardinalityValues = IntStream.range(0, 10).mapToObj(i -> "value" + i)
+        .toArray(String[]::new);
+    for (int i = 0; i < numRows; i += _primaryRepetitions) {
+      // Stop inside the last group when numRows is not a multiple of _primaryRepetitions, so that the result never
+      // holds more rows than requested.
+      for (int j = 0; j < _primaryRepetitions && rows.size() < numRows; j++) {
+        GenericRow row = new GenericRow();
+        row.putValue(SORTED_COL_NAME, i);
+        row.putValue(INT_COL_NAME, (int) _supplier.getAsLong());
+        row.putValue(NO_INDEX_INT_COL_NAME, (int) _supplier.getAsLong());
+        row.putValue(RAW_INT_COL_NAME, (int) _supplier.getAsLong());
+        row.putValue(RAW_STRING_COL_NAME, strings.computeIfAbsent(
+            (int) _supplier.getAsLong(), k -> UUID.randomUUID().toString()));
+        row.putValue(NO_INDEX_STRING_COL, row.getValue(RAW_STRING_COL_NAME));
+        row.putValue(LOW_CARDINALITY_STRING_COL, lowCardinalityValues[(i + j) % lowCardinalityValues.length]);
+        rows.add(row);
+      }
+    }
+    return rows;
+  }
+
+  /**
+   * Generates {@code _numRows} rows and writes them into {@code INDEX_DIR} as a segment with the given name.
+   *
+   * @param segmentName name of the segment to create
+   * @throws Exception if the segment creation driver fails
+   */
+  private void buildSegment(String segmentName)
+      throws Exception {
+    List<GenericRow> generatedRows = createTestData(_numRows);
+    List<FieldConfig> noFieldConfigs = new ArrayList<>();
+
+    // Star-tree over (SORTED_COL, INT_COL) that pre-aggregates SUM(RAW_INT_COL).
+    StarTreeIndexConfig starTreeConfig = new StarTreeIndexConfig(
+        Arrays.asList(SORTED_COL_NAME, INT_COL_NAME), null, Collections.singletonList(
+        new AggregationFunctionColumnPair(AggregationFunctionType.SUM, RAW_INT_COL_NAME).toColumnName()),
+        Integer.MAX_VALUE);
+
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setInvertedIndexColumns(Collections.singletonList(INT_COL_NAME))
+        .setFieldConfigList(noFieldConfigs)
+        .setNoDictionaryColumns(Arrays.asList(RAW_INT_COL_NAME, RAW_STRING_COL_NAME))
+        .setSortedColumn(SORTED_COL_NAME)
+        .setStarTreeIndexConfigs(Collections.singletonList(starTreeConfig))
+        .build();
+
+    // Every column is a single-value dimension; the declaration order is kept as is.
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(SORTED_COL_NAME, FieldSpec.DataType.INT)
+        .addSingleValueDimension(NO_INDEX_INT_COL_NAME, FieldSpec.DataType.INT)
+        .addSingleValueDimension(RAW_INT_COL_NAME, FieldSpec.DataType.INT)
+        .addSingleValueDimension(INT_COL_NAME, FieldSpec.DataType.INT)
+        .addSingleValueDimension(RAW_STRING_COL_NAME, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(NO_INDEX_STRING_COL, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(LOW_CARDINALITY_STRING_COL, FieldSpec.DataType.STRING)
+        .build();
+
+    SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    generatorConfig.setOutDir(INDEX_DIR.getPath());
+    generatorConfig.setTableName(TABLE_NAME);
+    generatorConfig.setSegmentName(segmentName);
+
+    SegmentIndexCreationDriverImpl creationDriver = new SegmentIndexCreationDriverImpl();
+    // The reader is closed as soon as the segment has been written.
+    try (RecordReader rowReader = new GenericRowRecordReader(generatedRows)) {
+      creationDriver.init(generatorConfig, rowReader);
+      creationDriver.build();
+    }
+  }
+
+  /** Orders by the sorted column only, ascending. */
+  @Benchmark
+  public BrokerResponseNative sortedAsc() {
+    return runOrderByQuery("SORTED_COL ASC");
+  }
+
+  /** Orders by the sorted column ascending, then by a second column. */
+  @Benchmark
+  public BrokerResponseNative sortedAscPartially() {
+    return runOrderByQuery("SORTED_COL ASC, LOW_CARDINALITY_STRING_COL");
+  }
+
+  /** Orders by the sorted column only, descending. */
+  @Benchmark
+  public BrokerResponseNative sortedDesc() {
+    return runOrderByQuery("SORTED_COL DESC");
+  }
+
+  /** Orders by the sorted column descending, then by a second column. */
+  @Benchmark
+  public BrokerResponseNative sortedDescPartially() {
+    return runOrderByQuery("SORTED_COL DESC, LOW_CARDINALITY_STRING_COL");
+  }
+
+  /**
+   * Runs the query template shared by all benchmarks: select the sorted column, apply the given ORDER BY clause,
+   * limit the result to 1052 rows and pass {@code _orderByAlgorithm} as the {@code orderByAlgorithm} query option.
+   *
+   * @param orderByClause the expressions that follow {@code ORDER BY}, without the keyword itself
+   * @return the broker response of the query
+   */
+  private BrokerResponseNative runOrderByQuery(String orderByClause) {
+    return getBrokerResponse(
+        "SELECT SORTED_COL "
+            + "FROM MyTable "
+            + "ORDER BY " + orderByClause + " "
+            + "LIMIT 1052 "
+            + "option(orderByAlgorithm=" + _orderByAlgorithm + ")");
+  }
+
+  /**
+   * Returns {@code null}: this benchmark does not supply a filter.
+   */
+  @Override
+  protected String getFilter() {
+    return null;
+  }
+
+  /**
+   * Returns the first of the two segments loaded in {@code setUp()}.
+   */
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  /**
+   * Returns both segments loaded in {@code setUp()}.
+   */
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSourceMetadata.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSourceMetadata.java
index 94efe0f9dd..494129f0a1 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSourceMetadata.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSourceMetadata.java
@@ -50,6 +50,8 @@ public interface DataSourceMetadata {
 
   /**
    * Returns {@code true} if the column is sorted, {@code false} otherwise.
+   *
+   * The result of this method cannot be trusted if null handling is enabled.
    */
   boolean isSorted();
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 7c33c8233b..fa2218ffb7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -289,6 +289,8 @@ public class CommonConstants {
         // Reorder scan based predicates based on cardinality and number of 
selected values
         public static final String AND_SCAN_REORDERING = "AndScanReordering";
 
+        public static final String ORDER_BY_ALGORITHM = "orderByAlgorithm";
+
         // TODO: Remove these keys (only apply to PQL) after releasing 0.11.0
         @Deprecated
         public static final String PRESERVE_TYPE = "preserveType";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to