This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ad0d2b173a Enhance Broker reducer to handle expression format change (#11762)
ad0d2b173a is described below

commit ad0d2b173a67ac19bc39120179f48192c80520e4
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Oct 11 18:21:29 2023 -0700

    Enhance Broker reducer to handle expression format change (#11762)
---
 .../query/reduce/AggregationDataTableReducer.java  |  61 +++---
 .../core/query/reduce/BrokerReduceService.java     |  46 ++++-
 .../query/reduce/DistinctDataTableReducer.java     | 121 +++++------
 .../core/query/reduce/GroupByDataTableReducer.java |   5 +-
 .../core/query/reduce/ReducerDataSchemaUtils.java  | 108 ++++++++++
 .../query/reduce/SelectionDataTableReducer.java    |  70 ++-----
 .../reduce/SelectionOnlyStreamingReducer.java      |  24 ++-
 .../query/selection/SelectionOperatorService.java  |  53 ++---
 .../query/selection/SelectionOperatorUtils.java    | 221 ++++++++++++---------
 .../query/reduce/ReducerDataSchemaUtilsTest.java   | 107 ++++++++++
 .../selection/SelectionOperatorUtilsTest.java      | 210 ++++++++++++++++++++
 .../apache/pinot/queries/DistinctQueriesTest.java  |  10 +-
 .../LeafStageTransferableBlockOperator.java        |  29 ++-
 13 files changed, 736 insertions(+), 329 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
index 89e28d1924..d3c2711e81 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
@@ -23,10 +23,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
@@ -47,9 +45,12 @@ import org.roaringbitmap.RoaringBitmap;
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class AggregationDataTableReducer implements DataTableReducer {
   private final QueryContext _queryContext;
+  private final AggregationFunction[] _aggregationFunctions;
 
-  AggregationDataTableReducer(QueryContext queryContext) {
+  public AggregationDataTableReducer(QueryContext queryContext) {
     _queryContext = queryContext;
+    _aggregationFunctions = _queryContext.getAggregationFunctions();
+    assert _aggregationFunctions != null;
   }
 
   /**
@@ -59,11 +60,11 @@ public class AggregationDataTableReducer implements 
DataTableReducer {
   public void reduceAndSetResults(String tableName, DataSchema dataSchema,
       Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative 
brokerResponseNative,
       DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
-    assert dataSchema != null;
+    dataSchema = 
ReducerDataSchemaUtils.canonicalizeDataSchemaForAggregation(_queryContext, 
dataSchema);
 
     if (dataTableMap.isEmpty()) {
       DataSchema resultTableSchema =
-          new PostAggregationHandler(_queryContext, 
getPrePostAggregationDataSchema()).getResultDataSchema();
+          new PostAggregationHandler(_queryContext, 
getPrePostAggregationDataSchema(dataSchema)).getResultDataSchema();
       brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, 
Collections.emptyList()));
       return;
     }
@@ -78,9 +79,7 @@ public class AggregationDataTableReducer implements 
DataTableReducer {
 
   private void reduceWithIntermediateResult(DataSchema dataSchema, 
Collection<DataTable> dataTables,
       BrokerResponseNative brokerResponseNative) {
-    AggregationFunction[] aggregationFunctions = 
_queryContext.getAggregationFunctions();
-    assert aggregationFunctions != null;
-    int numAggregationFunctions = aggregationFunctions.length;
+    int numAggregationFunctions = _aggregationFunctions.length;
     Object[] intermediateResults = new Object[numAggregationFunctions];
     for (DataTable dataTable : dataTables) {
       for (int i = 0; i < numAggregationFunctions; i++) {
@@ -100,25 +99,23 @@ public class AggregationDataTableReducer implements 
DataTableReducer {
         if (mergedIntermediateResult == null) {
           intermediateResults[i] = intermediateResultToMerge;
         } else {
-          intermediateResults[i] = 
aggregationFunctions[i].merge(mergedIntermediateResult, 
intermediateResultToMerge);
+          intermediateResults[i] = 
_aggregationFunctions[i].merge(mergedIntermediateResult, 
intermediateResultToMerge);
         }
         Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(i);
       }
     }
     Object[] finalResults = new Object[numAggregationFunctions];
     for (int i = 0; i < numAggregationFunctions; i++) {
-      AggregationFunction aggregationFunction = aggregationFunctions[i];
+      AggregationFunction aggregationFunction = _aggregationFunctions[i];
       Comparable result = 
aggregationFunction.extractFinalResult(intermediateResults[i]);
       finalResults[i] = result == null ? null : 
aggregationFunction.getFinalResultColumnType().convert(result);
     }
-    brokerResponseNative.setResultTable(reduceToResultTable(finalResults));
+    
brokerResponseNative.setResultTable(reduceToResultTable(getPrePostAggregationDataSchema(dataSchema),
 finalResults));
   }
 
   private void reduceWithFinalResult(DataSchema dataSchema, DataTable 
dataTable,
       BrokerResponseNative brokerResponseNative) {
-    AggregationFunction[] aggregationFunctions = 
_queryContext.getAggregationFunctions();
-    assert aggregationFunctions != null;
-    int numAggregationFunctions = aggregationFunctions.length;
+    int numAggregationFunctions = _aggregationFunctions.length;
     Object[] finalResults = new Object[numAggregationFunctions];
     for (int i = 0; i < numAggregationFunctions; i++) {
       ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
@@ -133,24 +130,23 @@ public class AggregationDataTableReducer implements 
DataTableReducer {
         finalResults[i] = 
AggregationFunctionUtils.getConvertedFinalResult(dataTable, columnDataType, 0, 
i);
       }
     }
-    brokerResponseNative.setResultTable(reduceToResultTable(finalResults));
+    brokerResponseNative.setResultTable(reduceToResultTable(dataSchema, 
finalResults));
   }
 
   /**
    * Sets aggregation results into ResultsTable
    */
-  private ResultTable reduceToResultTable(Object[] finalResults) {
-    PostAggregationHandler postAggregationHandler =
-        new PostAggregationHandler(_queryContext, 
getPrePostAggregationDataSchema());
-    DataSchema dataSchema = postAggregationHandler.getResultDataSchema();
+  private ResultTable reduceToResultTable(DataSchema dataSchema, Object[] 
finalResults) {
+    PostAggregationHandler postAggregationHandler = new 
PostAggregationHandler(_queryContext, dataSchema);
+    DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
     Object[] row = postAggregationHandler.getResult(finalResults);
 
     RewriterResult resultRewriterResult =
-        ResultRewriteUtils.rewriteResult(dataSchema, 
Collections.singletonList(row));
-    dataSchema = resultRewriterResult.getDataSchema();
+        ResultRewriteUtils.rewriteResult(resultDataSchema, 
Collections.singletonList(row));
+    resultDataSchema = resultRewriterResult.getDataSchema();
     List<Object[]> rows = resultRewriterResult.getRows();
 
-    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+    ColumnDataType[] columnDataTypes = resultDataSchema.getColumnDataTypes();
     int numColumns = columnDataTypes.length;
     for (Object[] rewrittenRow : rows) {
       for (int j = 0; j < numColumns; j++) {
@@ -158,25 +154,18 @@ public class AggregationDataTableReducer implements 
DataTableReducer {
       }
     }
 
-    return new ResultTable(dataSchema, rows);
+    return new ResultTable(resultDataSchema, rows);
   }
 
   /**
    * Constructs the DataSchema for the rows before the post-aggregation (SQL 
mode).
    */
-  private DataSchema getPrePostAggregationDataSchema() {
-    List<Pair<AggregationFunction, FilterContext>> 
filteredAggregationFunctions =
-        _queryContext.getFilteredAggregationFunctions();
-    assert filteredAggregationFunctions != null;
-    int numColumns = filteredAggregationFunctions.size();
-    String[] columnNames = new String[numColumns];
-    ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
-    for (int i = 0; i < numColumns; i++) {
-      Pair<AggregationFunction, FilterContext> pair = 
filteredAggregationFunctions.get(i);
-      AggregationFunction aggregationFunction = pair.getLeft();
-      columnNames[i] = 
AggregationFunctionUtils.getResultColumnName(aggregationFunction, 
pair.getRight());
-      columnDataTypes[i] = aggregationFunction.getFinalResultColumnType();
+  private DataSchema getPrePostAggregationDataSchema(DataSchema dataSchema) {
+    int numAggregationFunctions = _aggregationFunctions.length;
+    ColumnDataType[] columnDataTypes = new 
ColumnDataType[numAggregationFunctions];
+    for (int i = 0; i < numAggregationFunctions; i++) {
+      columnDataTypes[i] = _aggregationFunctions[i].getFinalResultColumnType();
     }
-    return new DataSchema(columnNames, columnDataTypes);
+    return new DataSchema(dataSchema.getColumnNames(), columnDataTypes);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index e89978625a..93cbecd5bb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -18,12 +18,16 @@
  */
 package org.apache.pinot.core.query.reduce;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
@@ -38,6 +42,8 @@ import 
org.apache.pinot.spi.exception.BadQueryRequestException;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -46,6 +52,8 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
  */
 @ThreadSafe
 public class BrokerReduceService extends BaseReduceService {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(BrokerReduceService.class);
+
   public BrokerReduceService(PinotConfiguration config) {
     super(config);
   }
@@ -65,7 +73,9 @@ public class BrokerReduceService extends BaseReduceService {
     BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
 
     // Cache a data schema from data tables (try to cache one with data rows 
associated with it).
-    DataSchema cachedDataSchema = null;
+    DataSchema dataSchemaFromEmptyDataTable = null;
+    DataSchema dataSchemaFromNonEmptyDataTable = null;
+    List<ServerRoutingInstance> serversWithConflictingDataSchema = new 
ArrayList<>();
 
     // Process server response metadata.
     Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = 
dataTableMap.entrySet().iterator();
@@ -83,12 +93,22 @@ public class BrokerReduceService extends BaseReduceService {
       } else {
         // Try to cache a data table with data rows inside, or cache one with 
data schema inside.
         if (dataTable.getNumberOfRows() == 0) {
-          if (cachedDataSchema == null) {
-            cachedDataSchema = dataSchema;
+          if (dataSchemaFromEmptyDataTable == null) {
+            dataSchemaFromEmptyDataTable = dataSchema;
           }
           iterator.remove();
         } else {
-          cachedDataSchema = dataSchema;
+          if (dataSchemaFromNonEmptyDataTable == null) {
+            dataSchemaFromNonEmptyDataTable = dataSchema;
+          } else {
+            // Remove data tables with conflicting data schema.
+            // NOTE: Only compare the column data types, since the column 
names (string representation of expression)
+            //       can change across different versions.
+            if (!Arrays.equals(dataSchema.getColumnDataTypes(), 
dataSchemaFromNonEmptyDataTable.getColumnDataTypes())) {
+              serversWithConflictingDataSchema.add(entry.getKey());
+              iterator.remove();
+            }
+          }
         }
       }
     }
@@ -99,8 +119,23 @@ public class BrokerReduceService extends BaseReduceService {
     // Set execution statistics and Update broker metrics.
     aggregator.setStats(rawTableName, brokerResponseNative, brokerMetrics);
 
+    // Report the servers with conflicting data schema.
+    if (!serversWithConflictingDataSchema.isEmpty()) {
+      String errorMessage =
+          String.format("%s: responses for table: %s from servers: %s got 
dropped due to data schema inconsistency.",
+              QueryException.MERGE_RESPONSE_ERROR.getMessage(), tableName, 
serversWithConflictingDataSchema);
+      LOGGER.warn(errorMessage);
+      if (brokerMetrics != null) {
+        brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
+      }
+      brokerResponseNative.addToExceptions(
+          new 
QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, 
errorMessage));
+    }
+
     // NOTE: When there is no cached data schema, that means all servers 
encountered exception. In such case, return the
     //       response with metadata only.
+    DataSchema cachedDataSchema =
+        dataSchemaFromNonEmptyDataTable != null ? 
dataSchemaFromNonEmptyDataTable : dataSchemaFromEmptyDataTable;
     if (cachedDataSchema == null) {
       return brokerResponseNative;
     }
@@ -124,8 +159,7 @@ public class BrokerReduceService extends BaseReduceService {
       if (gapfillType == null) {
         throw new BadQueryRequestException("Nested query is not supported 
without gapfill");
       }
-      BaseGapfillProcessor gapfillProcessor =
-          GapfillProcessorFactory.getGapfillProcessor(queryContext, 
gapfillType);
+      BaseGapfillProcessor gapfillProcessor = 
GapfillProcessorFactory.getGapfillProcessor(queryContext, gapfillType);
       gapfillProcessor.process(brokerResponseNative);
     }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
index 5adc021bba..4553776963 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -19,20 +19,15 @@
 package org.apache.pinot.core.query.reduce;
 
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -48,90 +43,74 @@ import org.roaringbitmap.RoaringBitmap;
 public class DistinctDataTableReducer implements DataTableReducer {
   private final QueryContext _queryContext;
 
-  DistinctDataTableReducer(QueryContext queryContext) {
+  public DistinctDataTableReducer(QueryContext queryContext) {
     _queryContext = queryContext;
   }
 
-  /**
-   * Reduces and sets results of distinct into ResultTable.
-   */
   @Override
   public void reduceAndSetResults(String tableName, DataSchema dataSchema,
       Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative 
brokerResponseNative,
       DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
-    // DISTINCT is implemented as an aggregation function in the execution 
engine. Just like
-    // other aggregation functions, DISTINCT returns its result as a single 
object
-    // (of type DistinctTable) serialized by the server into the DataTable and 
deserialized
-    // by the broker from the DataTable. So there should be exactly 1 row and 
1 column and that
-    // column value should be the serialized DistinctTable -- so essentially 
it is a DataTable
-    // inside a DataTable
+    dataSchema = 
ReducerDataSchemaUtils.canonicalizeDataSchemaForDistinct(_queryContext, 
dataSchema);
+    DistinctTable distinctTable =
+        new DistinctTable(dataSchema, _queryContext.getOrderByExpressions(), 
_queryContext.getLimit(),
+            _queryContext.isNullHandlingEnabled());
+    if (distinctTable.hasOrderBy()) {
+      addToOrderByDistinctTable(dataSchema, dataTableMap, distinctTable);
+    } else {
+      addToNonOrderByDistinctTable(dataSchema, dataTableMap, distinctTable);
+    }
+    brokerResponseNative.setResultTable(reduceToResultTable(distinctTable));
+  }
 
-    // Gather all non-empty DistinctTables
-    // TODO: until we upgrade to newer version of pinot, we have to keep both 
code path. remove after 0.12.0 release.
-    // This is to work with server rolling upgrade when partially returned as 
DistinctTable Obj and partially regular
-    // DataTable; if all returns are DataTable we can directly merge with 
priority queue (with dedup).
-    List<DistinctTable> nonEmptyDistinctTables = new 
ArrayList<>(dataTableMap.size());
+  private void addToOrderByDistinctTable(DataSchema dataSchema, 
Map<ServerRoutingInstance, DataTable> dataTableMap,
+      DistinctTable distinctTable) {
     for (DataTable dataTable : dataTableMap.values()) {
       Tracing.ThreadAccountantOps.sampleAndCheckInterruption();
-
-      // Do not use the cached data schema because it might be either single 
object (legacy) or normal data table
-      dataSchema = dataTable.getDataSchema();
       int numColumns = dataSchema.size();
-      if (numColumns == 1 && dataSchema.getColumnDataType(0) == 
ColumnDataType.OBJECT) {
-        // DistinctTable is still being returned as a single object
-        CustomObject customObject = dataTable.getCustomObject(0, 0);
-        assert customObject != null;
-        DistinctTable distinctTable = 
ObjectSerDeUtils.deserialize(customObject);
-        if (!distinctTable.isEmpty()) {
-          nonEmptyDistinctTables.add(distinctTable);
+      int numRows = dataTable.getNumberOfRows();
+      if (_queryContext.isNullHandlingEnabled()) {
+        RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
+        for (int coldId = 0; coldId < numColumns; coldId++) {
+          nullBitmaps[coldId] = dataTable.getNullRowIds(coldId);
+        }
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          distinctTable.addWithOrderBy(new Record(
+              
SelectionOperatorUtils.extractRowFromDataTableWithNullHandling(dataTable, 
rowId, nullBitmaps)));
         }
       } else {
-        // DistinctTable is being returned as normal data table
-        int numRows = dataTable.getNumberOfRows();
-        if (numRows > 0) {
-          List<Record> records = new ArrayList<>(numRows);
-          if (_queryContext.isNullHandlingEnabled()) {
-            RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
-            for (int coldId = 0; coldId < numColumns; coldId++) {
-              nullBitmaps[coldId] = dataTable.getNullRowIds(coldId);
-            }
-            for (int rowId = 0; rowId < numRows; rowId++) {
-              records.add(new Record(
-                  
SelectionOperatorUtils.extractRowFromDataTableWithNullHandling(dataTable, 
rowId, nullBitmaps)));
-            }
-          } else {
-            for (int rowId = 0; rowId < numRows; rowId++) {
-              records.add(new 
Record(SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId)));
-            }
-          }
-          nonEmptyDistinctTables.add(new DistinctTable(dataSchema, records));
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          distinctTable.addWithOrderBy(new 
Record(SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId)));
         }
       }
     }
+  }
 
-    if (nonEmptyDistinctTables.isEmpty()) {
-      // All the DistinctTables are empty, construct an empty response
-      // TODO: This returns schema with all STRING data types.
-      //       There's no way currently to get the data types of the distinct 
columns for empty results
-      List<ExpressionContext> expressions = 
_queryContext.getSelectExpressions();
-      int numExpressions = expressions.size();
-      String[] columns = new String[numExpressions];
-      for (int i = 0; i < numExpressions; i++) {
-        columns[i] = expressions.get(i).toString();
-      }
-      ColumnDataType[] columnDataTypes = new ColumnDataType[numExpressions];
-      Arrays.fill(columnDataTypes, ColumnDataType.STRING);
-      brokerResponseNative.setResultTable(
-          new ResultTable(new DataSchema(columns, columnDataTypes), 
Collections.emptyList()));
-    } else {
-      // Construct a main DistinctTable and merge all non-empty DistinctTables 
into it
-      DistinctTable mainDistinctTable =
-          new DistinctTable(nonEmptyDistinctTables.get(0).getDataSchema(), 
_queryContext.getOrderByExpressions(),
-              _queryContext.getLimit(), _queryContext.isNullHandlingEnabled());
-      for (DistinctTable distinctTable : nonEmptyDistinctTables) {
-        mainDistinctTable.mergeTable(distinctTable);
+  private void addToNonOrderByDistinctTable(DataSchema dataSchema, 
Map<ServerRoutingInstance, DataTable> dataTableMap,
+      DistinctTable distinctTable) {
+    for (DataTable dataTable : dataTableMap.values()) {
+      Tracing.ThreadAccountantOps.sampleAndCheckInterruption();
+      int numColumns = dataSchema.size();
+      int numRows = dataTable.getNumberOfRows();
+      if (_queryContext.isNullHandlingEnabled()) {
+        RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
+        for (int coldId = 0; coldId < numColumns; coldId++) {
+          nullBitmaps[coldId] = dataTable.getNullRowIds(coldId);
+        }
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          if (distinctTable.addWithoutOrderBy(new Record(
+              
SelectionOperatorUtils.extractRowFromDataTableWithNullHandling(dataTable, 
rowId, nullBitmaps)))) {
+            return;
+          }
+        }
+      } else {
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          if (distinctTable.addWithoutOrderBy(
+              new 
Record(SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId)))) {
+            return;
+          }
+        }
       }
-      
brokerResponseNative.setResultTable(reduceToResultTable(mainDistinctTable));
     }
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 654e6232a2..844d295892 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -72,7 +72,6 @@ import org.roaringbitmap.RoaringBitmap;
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class GroupByDataTableReducer implements DataTableReducer {
   private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, 
find a better value.
-  private static final int MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK = 10_000;
 
   private final QueryContext _queryContext;
   private final AggregationFunction[] _aggregationFunctions;
@@ -81,7 +80,7 @@ public class GroupByDataTableReducer implements 
DataTableReducer {
   private final int _numGroupByExpressions;
   private final int _numColumns;
 
-  GroupByDataTableReducer(QueryContext queryContext) {
+  public GroupByDataTableReducer(QueryContext queryContext) {
     _queryContext = queryContext;
     _aggregationFunctions = queryContext.getAggregationFunctions();
     assert _aggregationFunctions != null;
@@ -99,7 +98,7 @@ public class GroupByDataTableReducer implements 
DataTableReducer {
   public void reduceAndSetResults(String tableName, DataSchema dataSchema,
       Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative 
brokerResponse,
       DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
-    assert dataSchema != null;
+    dataSchema = 
ReducerDataSchemaUtils.canonicalizeDataSchemaForGroupBy(_queryContext, 
dataSchema);
 
     if (dataTableMap.isEmpty()) {
       PostAggregationHandler postAggregationHandler =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ReducerDataSchemaUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ReducerDataSchemaUtils.java
new file mode 100644
index 0000000000..7bd751877d
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ReducerDataSchemaUtils.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+@SuppressWarnings("rawtypes")
+public class ReducerDataSchemaUtils {
+  private ReducerDataSchemaUtils() {
+  }
+
+  /**
+   * Returns the canonical data schema of the aggregation result based on the 
query and the data schema returned from
+   * the server.
+   * <p>Column names are re-generated in the canonical data schema to avoid 
the backward incompatibility caused by
+   * changing the string representation of the expression.
+   */
+  public static DataSchema canonicalizeDataSchemaForAggregation(QueryContext 
queryContext, DataSchema dataSchema) {
+    List<Pair<AggregationFunction, FilterContext>> 
filteredAggregationFunctions =
+        queryContext.getFilteredAggregationFunctions();
+    assert filteredAggregationFunctions != null;
+    int numAggregations = filteredAggregationFunctions.size();
+    Preconditions.checkState(dataSchema.size() == numAggregations,
+        "BUG: Expect same number of aggregations and columns in data schema, 
got %s aggregations, %s columns in data "
+            + "schema", numAggregations, dataSchema.size());
+    String[] columnNames = new String[numAggregations];
+    for (int i = 0; i < numAggregations; i++) {
+      Pair<AggregationFunction, FilterContext> pair = 
filteredAggregationFunctions.get(i);
+      AggregationFunction aggregationFunction = pair.getLeft();
+      columnNames[i] = 
AggregationFunctionUtils.getResultColumnName(aggregationFunction, 
pair.getRight());
+    }
+    return new DataSchema(columnNames, dataSchema.getColumnDataTypes());
+  }
+
+  /**
+   * Returns the canonical data schema of the group-by result based on the 
query and the data schema returned from the
+   * server. Group-by expressions are always at the beginning of the data 
schema, followed by the aggregations.
+   * <p>Column names are re-generated in the canonical data schema to avoid 
the backward incompatibility caused by
+   * changing the string representation of the expression.
+   */
+  public static DataSchema canonicalizeDataSchemaForGroupBy(QueryContext 
queryContext, DataSchema dataSchema) {
+    List<ExpressionContext> groupByExpressions = 
queryContext.getGroupByExpressions();
+    List<Pair<AggregationFunction, FilterContext>> 
filteredAggregationFunctions =
+        queryContext.getFilteredAggregationFunctions();
+    assert groupByExpressions != null && filteredAggregationFunctions != null;
+    int numGroupByExpression = groupByExpressions.size();
+    int numAggregations = filteredAggregationFunctions.size();
+    int numColumns = numGroupByExpression + numAggregations;
+    String[] columnNames = new String[numColumns];
+    Preconditions.checkState(dataSchema.size() == numColumns,
+        "BUG: Expect same number of group-by expressions, aggregations and 
columns in data schema, got %s group-by "
+            + "expressions, %s aggregations, %s columns in data schema", 
numGroupByExpression, numAggregations,
+        dataSchema.size());
+    for (int i = 0; i < numGroupByExpression; i++) {
+      columnNames[i] = groupByExpressions.get(i).toString();
+    }
+    for (int i = 0; i < numAggregations; i++) {
+      Pair<AggregationFunction, FilterContext> pair = 
filteredAggregationFunctions.get(i);
+      columnNames[numGroupByExpression + i] =
+          AggregationFunctionUtils.getResultColumnName(pair.getLeft(), 
pair.getRight());
+    }
+    return new DataSchema(columnNames, dataSchema.getColumnDataTypes());
+  }
+
+  /**
+   * Returns the canonical data schema of the distinct result based on the 
query and the data schema returned from the
+   * server.
+   * <p>Column names are re-generated in the canonical data schema to avoid 
the backward incompatibility caused by
+   * changing the string representation of the expression.
+   */
+  public static DataSchema canonicalizeDataSchemaForDistinct(QueryContext 
queryContext, DataSchema dataSchema) {
+    List<ExpressionContext> selectExpressions = 
queryContext.getSelectExpressions();
+    int numSelectExpressions = selectExpressions.size();
+    Preconditions.checkState(dataSchema.size() == numSelectExpressions,
+        "BUG: Expect same number of columns in SELECT clause and data schema, 
got %s in SELECT clause, %s in data "
+            + "schema", numSelectExpressions, dataSchema.size());
+    String[] columnNames = new String[numSelectExpressions];
+    for (int i = 0; i < numSelectExpressions; i++) {
+      columnNames[i] = selectExpressions.get(i).toString();
+    }
+    return new DataSchema(columnNames, dataSchema.getColumnDataTypes());
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
index a21684acdf..a8226aee5e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
@@ -18,38 +18,28 @@
  */
 package org.apache.pinot.core.query.reduce;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.datatable.DataTable;
-import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorService;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Helper class to reduce and set Selection results into the 
BrokerResponseNative
  */
 public class SelectionDataTableReducer implements DataTableReducer {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(SelectionDataTableReducer.class);
-
   private final QueryContext _queryContext;
 
-  SelectionDataTableReducer(QueryContext queryContext) {
+  public SelectionDataTableReducer(QueryContext queryContext) {
     _queryContext = queryContext;
   }
 
@@ -60,55 +50,25 @@ public class SelectionDataTableReducer implements 
DataTableReducer {
   public void reduceAndSetResults(String tableName, DataSchema dataSchema,
       Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative 
brokerResponseNative,
       DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
-    if (dataTableMap.isEmpty()) {
-      // For empty data table map, construct empty result using the cached 
data schema for selection query
-      List<String> selectionColumns = 
SelectionOperatorUtils.getSelectionColumns(_queryContext, dataSchema);
-      DataSchema selectionDataSchema = 
SelectionOperatorUtils.getResultTableDataSchema(dataSchema, selectionColumns);
-      brokerResponseNative.setResultTable(new ResultTable(selectionDataSchema, 
Collections.emptyList()));
+    Pair<DataSchema, int[]> pair =
+        
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(_queryContext, 
dataSchema);
+    int limit = _queryContext.getLimit();
+    if (dataTableMap.isEmpty() || limit == 0) {
+      brokerResponseNative.setResultTable(new ResultTable(pair.getLeft(), 
Collections.emptyList()));
       return;
     }
-
-    // For data table map with more than one data tables, remove conflicting 
data tables
-    if (dataTableMap.size() > 1) {
-      DataSchema.ColumnDataType[] columnDataTypes = 
dataSchema.getColumnDataTypes();
-      List<ServerRoutingInstance> droppedServers = new ArrayList<>();
-      Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = 
dataTableMap.entrySet().iterator();
-      while (iterator.hasNext()) {
-        Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
-        DataSchema dataSchemaToCompare = entry.getValue().getDataSchema();
-        assert dataSchemaToCompare != null;
-        if (!Arrays.equals(columnDataTypes, 
dataSchemaToCompare.getColumnDataTypes())) {
-          droppedServers.add(entry.getKey());
-          iterator.remove();
-        }
-      }
-      if (!droppedServers.isEmpty()) {
-        String errorMessage =
-            QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses 
for table: " + tableName + " from servers: "
-                + droppedServers + " got dropped due to data schema 
inconsistency.";
-        LOGGER.warn(errorMessage);
-        if (brokerMetrics != null) {
-          
brokerMetrics.addMeteredTableValue(TableNameBuilder.extractRawTableName(tableName),
-              BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
-        }
-        brokerResponseNative.addToExceptions(
-            new 
QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, 
errorMessage));
-      }
-    }
-
-    int limit = _queryContext.getLimit();
-    if (limit > 0 && _queryContext.getOrderByExpressions() != null) {
-      // Selection order-by
-      SelectionOperatorService selectionService = new 
SelectionOperatorService(_queryContext, dataSchema);
-      selectionService.reduceWithOrdering(dataTableMap.values(), 
_queryContext.isNullHandlingEnabled());
-      
brokerResponseNative.setResultTable(selectionService.renderResultTableWithOrdering());
-    } else {
+    if (_queryContext.getOrderByExpressions() == null) {
       // Selection only
-      List<String> selectionColumns = 
SelectionOperatorUtils.getSelectionColumns(_queryContext, dataSchema);
       List<Object[]> reducedRows = 
SelectionOperatorUtils.reduceWithoutOrdering(dataTableMap.values(), limit,
           _queryContext.isNullHandlingEnabled());
       brokerResponseNative.setResultTable(
-          SelectionOperatorUtils.renderResultTableWithoutOrdering(reducedRows, 
dataSchema, selectionColumns));
+          SelectionOperatorUtils.renderResultTableWithoutOrdering(reducedRows, 
pair.getLeft(), pair.getRight()));
+    } else {
+      // Selection order-by
+      SelectionOperatorService selectionService =
+          new SelectionOperatorService(_queryContext, pair.getLeft(), 
pair.getRight());
+      selectionService.reduceWithOrdering(dataTableMap.values());
+      
brokerResponseNative.setResultTable(selectionService.renderResultTableWithOrdering());
     }
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java
index cda1ad2b87..ffa0ffdbd0 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.query.reduce;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
@@ -58,7 +59,7 @@ public class SelectionOnlyStreamingReducer implements 
StreamingReducer {
     int numColumns = dataTable.getDataSchema().size();
     int numRows = dataTable.getNumberOfRows();
     if (nullHandlingEnabled) {
-      RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];;
+      RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
       for (int coldId = 0; coldId < numColumns; coldId++) {
         nullBitmaps[coldId] = dataTable.getNullRowIds(coldId);
       }
@@ -88,16 +89,19 @@ public class SelectionOnlyStreamingReducer implements 
StreamingReducer {
 
   @Override
   public BrokerResponseNative seal() {
-    BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
-    List<String> selectionColumns = 
SelectionOperatorUtils.getSelectionColumns(_queryContext, _dataSchema);
-    if (_dataSchema != null && _rows.size() > 0) {
-      brokerResponseNative.setResultTable(
-          SelectionOperatorUtils.renderResultTableWithoutOrdering(_rows, 
_dataSchema, selectionColumns));
+    if (_dataSchema == null) {
+      return BrokerResponseNative.empty();
+    }
+    Pair<DataSchema, int[]> pair =
+        
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(_queryContext, 
_dataSchema);
+    ResultTable resultTable;
+    if (_rows.isEmpty()) {
+      resultTable = new ResultTable(pair.getLeft(), Collections.emptyList());
     } else {
-      // For empty data table map, construct empty result using the cached 
data schema for selection query
-      DataSchema selectionDataSchema = 
SelectionOperatorUtils.getResultTableDataSchema(_dataSchema, selectionColumns);
-      brokerResponseNative.setResultTable(new ResultTable(selectionDataSchema, 
Collections.emptyList()));
+      resultTable = 
SelectionOperatorUtils.renderResultTableWithoutOrdering(_rows, pair.getLeft(), 
pair.getRight());
     }
-    return brokerResponseNative;
+    BrokerResponseNative brokerResponse = new BrokerResponseNative();
+    brokerResponse.setResultTable(resultTable);
+    return brokerResponse;
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index 688c7bbf49..7db96f5224 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.query.selection;
 
 import java.util.Collection;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.PriorityQueue;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.response.broker.ResultTable;
@@ -58,22 +57,16 @@ import org.roaringbitmap.RoaringBitmap;
  */
 public class SelectionOperatorService {
   private final QueryContext _queryContext;
-  private final List<String> _selectionColumns;
   private final DataSchema _dataSchema;
+  private final int[] _columnIndices;
   private final int _offset;
   private final int _numRowsToKeep;
   private final PriorityQueue<Object[]> _rows;
 
-  /**
-   * Constructor for <code>SelectionOperatorService</code> with {@link 
DataSchema}. (Broker side)
-   *
-   * @param queryContext Selection order-by query
-   * @param dataSchema data schema.
-   */
-  public SelectionOperatorService(QueryContext queryContext, DataSchema 
dataSchema) {
+  public SelectionOperatorService(QueryContext queryContext, DataSchema 
dataSchema, int[] columnIndices) {
     _queryContext = queryContext;
-    _selectionColumns = 
SelectionOperatorUtils.getSelectionColumns(queryContext, dataSchema);
     _dataSchema = dataSchema;
+    _columnIndices = columnIndices;
     // Select rows from offset to offset + limit.
     _offset = queryContext.getOffset();
     _numRowsToKeep = _offset + queryContext.getLimit();
@@ -83,25 +76,15 @@ public class SelectionOperatorService {
             _queryContext.isNullHandlingEnabled()).reversed());
   }
 
-  /**
-   * Get the selection results.
-   *
-   * @return selection results.
-   */
-  public PriorityQueue<Object[]> getRows() {
-    return _rows;
-  }
-
   /**
    * Reduces a collection of {@link DataTable}s to selection rows for 
selection queries with <code>ORDER BY</code>.
-   * (Broker side)
    * TODO: Do merge sort after releasing 0.13.0 when server side results are 
sorted
    *       Can also consider adding a data table metadata to indicate whether 
the server side results are sorted
    */
-  public void reduceWithOrdering(Collection<DataTable> dataTables, boolean 
nullHandlingEnabled) {
+  public void reduceWithOrdering(Collection<DataTable> dataTables) {
     for (DataTable dataTable : dataTables) {
       int numRows = dataTable.getNumberOfRows();
-      if (nullHandlingEnabled) {
+      if (_queryContext.isNullHandlingEnabled()) {
         RoaringBitmap[] nullBitmaps = new 
RoaringBitmap[dataTable.getDataSchema().size()];
         for (int colId = 0; colId < nullBitmaps.length; colId++) {
           nullBitmaps[colId] = dataTable.getNullRowIds(colId);
@@ -127,32 +110,24 @@ public class SelectionOperatorService {
   }
 
   /**
-   * Render the selection rows to a {@link ResultTable} object for selection 
queries with <code>ORDER BY</code>.
-   * (Broker side)
-   * <p>{@link ResultTable} object will be used to build the broker response.
-   * <p>Should be called after method "reduceWithOrdering()".
+   * Renders the selection rows to a {@link ResultTable} object for selection 
queries with <code>ORDER BY</code>.
    */
   public ResultTable renderResultTableWithOrdering() {
-    int[] columnIndices = 
SelectionOperatorUtils.getColumnIndices(_selectionColumns, _dataSchema);
-    int numColumns = columnIndices.length;
-    DataSchema resultDataSchema = 
SelectionOperatorUtils.getSchemaForProjection(_dataSchema, columnIndices);
-
-    // Extract the result rows
-    LinkedList<Object[]> rowsInSelectionResults = new LinkedList<>();
+    LinkedList<Object[]> resultRows = new LinkedList<>();
+    DataSchema.ColumnDataType[] columnDataTypes = 
_dataSchema.getColumnDataTypes();
+    int numColumns = columnDataTypes.length;
     while (_rows.size() > _offset) {
       Object[] row = _rows.poll();
       assert row != null;
-      Object[] extractedRow = new Object[numColumns];
+      Object[] resultRow = new Object[numColumns];
       for (int i = 0; i < numColumns; i++) {
-        Object value = row[columnIndices[i]];
+        Object value = row[_columnIndices[i]];
         if (value != null) {
-          extractedRow[i] = 
resultDataSchema.getColumnDataType(i).convertAndFormat(value);
+          resultRow[i] = columnDataTypes[i].convertAndFormat(value);
         }
       }
-
-      rowsInSelectionResults.addFirst(extractedRow);
+      resultRows.addFirst(resultRow);
     }
-
-    return new ResultTable(resultDataSchema, rowsInSelectionResults);
+    return new ResultTable(_dataSchema, resultRows);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 923c744800..19b6c7f954 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.core.query.selection;
 
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
@@ -25,19 +27,17 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
@@ -167,26 +167,120 @@ public class SelectionOperatorUtils {
   }
 
   /**
-   * Constructs the final selection DataSchema based on the order of selection 
columns (data schema can have a
-   * different order, depending on order by clause)
-   * @param dataSchema data schema used for execution and ordering
-   * @param selectionColumns the selection order
-   * @return data schema for final results
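+   * Returns the data schema and column indices of the final selection results 
based on the query and the data schema of
+   * the server response. See {@link #extractExpressions} for the column 
orders on the server side.
+   * <p>The left element of the returned pair is the data schema of the final results; the right element holds, for
+   * each final result column, the index of the corresponding column in the server response.
+   * NOTE: DO NOT rely on column name lookup across query context and data 
schema because the string representation of
+   *       an expression can change, which will cause backward incompatibility.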
    */
-  public static DataSchema getResultTableDataSchema(DataSchema dataSchema, 
List<String> selectionColumns) {
-    Map<String, ColumnDataType> columnNameToDataType = new HashMap<>();
-    String[] columnNames = dataSchema.getColumnNames();
-    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-    int numColumns = columnNames.length;
-    for (int i = 0; i < numColumns; i++) {
-      columnNameToDataType.put(columnNames[i], columnDataTypes[i]);
+  public static Pair<DataSchema, int[]> 
getResultTableDataSchemaAndColumnIndices(QueryContext queryContext,
+      DataSchema dataSchema) {
+    List<ExpressionContext> selectExpressions = 
queryContext.getSelectExpressions();
+    int numSelectExpressions = selectExpressions.size();
+    ColumnDataType[] columnDataTypesInDataSchema = 
dataSchema.getColumnDataTypes();
+    int numColumnsInDataSchema = columnDataTypesInDataSchema.length;
+
+    // No order-by expression
+    // NOTE: Order-by expressions are ignored for queries with LIMIT 0.
+    List<OrderByExpressionContext> orderByExpressions = 
queryContext.getOrderByExpressions();
+    if (orderByExpressions == null || queryContext.getLimit() == 0) {
+      // For 'SELECT *', use the server response data schema as the final 
results data schema.
+      if ((numSelectExpressions == 1 && 
selectExpressions.get(0).equals(IDENTIFIER_STAR))) {
+        int[] columnIndices = new int[numColumnsInDataSchema];
+        for (int i = 0; i < numColumnsInDataSchema; i++) {
+          columnIndices[i] = i;
+        }
+        return Pair.of(dataSchema, columnIndices);
+      }
+
+      // For select without duplicate columns, the order of the final 
selection columns is the same as the order of the
+      // columns in the data schema.
+      if (numSelectExpressions == numColumnsInDataSchema) {
+        String[] columnNames = new String[numSelectExpressions];
+        int[] columnIndices = new int[numSelectExpressions];
+        for (int i = 0; i < numSelectExpressions; i++) {
+          columnNames[i] = selectExpressions.get(i).toString();
+          columnIndices[i] = i;
+        }
+        return Pair.of(new DataSchema(columnNames, 
columnDataTypesInDataSchema), columnIndices);
+      }
+
+      // For select with duplicate columns, construct a map from expression to 
index with the same order as the data
+      // schema, then look up the selection expressions.
+      Object2IntOpenHashMap<ExpressionContext> expressionIndexMap = new 
Object2IntOpenHashMap<>(numColumnsInDataSchema);
+      for (ExpressionContext selectExpression : selectExpressions) {
+        expressionIndexMap.putIfAbsent(selectExpression, 
expressionIndexMap.size());
+      }
+      Preconditions.checkState(expressionIndexMap.size() == 
numColumnsInDataSchema,
+          "BUG: Expect same number of deduped columns in SELECT clause and in 
data schema, got %s before dedup and %s"
+              + " after dedup in SELECT clause, %s in data schema", 
numSelectExpressions, expressionIndexMap.size(),
+          numColumnsInDataSchema);
+      String[] columnNames = new String[numSelectExpressions];
+      ColumnDataType[] columnDataTypes = new 
ColumnDataType[numSelectExpressions];
+      int[] columnIndices = new int[numSelectExpressions];
+      for (int i = 0; i < numSelectExpressions; i++) {
+        ExpressionContext selectExpression = selectExpressions.get(i);
+        int columnIndex = expressionIndexMap.getInt(selectExpression);
+        columnNames[i] = selectExpression.toString();
+        columnDataTypes[i] = columnDataTypesInDataSchema[columnIndex];
+        columnIndices[i] = columnIndex;
+      }
+      return Pair.of(new DataSchema(columnNames, columnDataTypes), 
columnIndices);
+    }
+
+    // For 'SELECT *' with order-by, exclude transform functions from the 
returned columns and sort.
+    if (numSelectExpressions == 1 && 
selectExpressions.get(0).equals(IDENTIFIER_STAR)) {
+      String[] columnNamesInDataSchema = dataSchema.getColumnNames();
+      List<Integer> columnIndexList = new 
ArrayList<>(columnNamesInDataSchema.length);
+      for (int i = 0; i < columnNamesInDataSchema.length; i++) {
+        if (columnNamesInDataSchema[i].indexOf('(') == -1) {
+          columnIndexList.add(i);
+        }
+      }
+      columnIndexList.sort(Comparator.comparing(o -> 
columnNamesInDataSchema[o]));
+      int numColumns = columnIndexList.size();
+      String[] columnNames = new String[numColumns];
+      ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
+      int[] columnIndices = new int[numColumns];
+      for (int i = 0; i < numColumns; i++) {
+        int columnIndex = columnIndexList.get(i);
+        columnNames[i] = columnNamesInDataSchema[columnIndex];
+        columnDataTypes[i] = columnDataTypesInDataSchema[columnIndex];
+        columnIndices[i] = columnIndex;
+      }
+      return Pair.of(new DataSchema(columnNames, columnDataTypes), 
columnIndices);
+    }
+
+    // For other order-by queries, construct a map from expression to index 
with the same order as the data schema,
+    // then look up the selection expressions.
+    Object2IntOpenHashMap<ExpressionContext> expressionIndexMap = new 
Object2IntOpenHashMap<>(numColumnsInDataSchema);
+    // NOTE: Order-by expressions are already deduped in QueryContext.
+    for (OrderByExpressionContext orderByExpression : orderByExpressions) {
+      expressionIndexMap.put(orderByExpression.getExpression(), 
expressionIndexMap.size());
+    }
+    for (ExpressionContext selectExpression : selectExpressions) {
+      expressionIndexMap.putIfAbsent(selectExpression, 
expressionIndexMap.size());
     }
-    int numResultColumns = selectionColumns.size();
-    ColumnDataType[] finalColumnDataTypes = new 
ColumnDataType[numResultColumns];
-    for (int i = 0; i < numResultColumns; i++) {
-      finalColumnDataTypes[i] = 
columnNameToDataType.get(selectionColumns.get(i));
+    String[] columnNames = new String[numSelectExpressions];
+    ColumnDataType[] columnDataTypes = new 
ColumnDataType[numSelectExpressions];
+    int[] columnIndices = new int[numSelectExpressions];
+    if (expressionIndexMap.size() == numColumnsInDataSchema) {
+      for (int i = 0; i < numSelectExpressions; i++) {
+        ExpressionContext selectExpression = selectExpressions.get(i);
+        int columnIndex = expressionIndexMap.getInt(selectExpression);
+        columnNames[i] = selectExpression.toString();
+        columnDataTypes[i] = columnDataTypesInDataSchema[columnIndex];
+        columnIndices[i] = columnIndex;
+      }
+    } else {
+      // When all segments are pruned on the server side, the data schema will 
only contain the columns in the SELECT
+      // clause, and the data types of all columns are set to STRING. See 
ResultBlocksUtils for details.
+      for (int i = 0; i < numSelectExpressions; i++) {
+        columnNames[i] = selectExpressions.get(i).toString();
+        columnDataTypes[i] = ColumnDataType.STRING;
+        columnIndices[i] = i;
+      }
     }
-    return new DataSchema(selectionColumns.toArray(new String[0]), 
finalColumnDataTypes);
+    return Pair.of(new DataSchema(columnNames, columnDataTypes), 
columnIndices);
   }
 
   /**
@@ -478,82 +572,26 @@ public class SelectionOperatorUtils {
   }
 
   /**
-   * Render the selection rows to a {@link ResultTable} object
-   * for selection queries without <code>ORDER BY</code>
-   * <p>{@link ResultTable} object will be used to set in the broker response.
-   * <p>Should be called after method "reduceWithoutOrdering()".
-   *
-   * @param rows selection rows.
-   * @param dataSchema data schema.
-   * @param selectionColumns selection columns.
-   * @return {@link ResultTable} object results.
+   * Renders the selection rows to a {@link ResultTable} object for selection 
queries without <code>ORDER BY</code>.
+   * (Broker side)
    */
   public static ResultTable renderResultTableWithoutOrdering(List<Object[]> 
rows, DataSchema dataSchema,
-      List<String> selectionColumns) {
+      int[] columnIndices) {
     int numRows = rows.size();
     List<Object[]> resultRows = new ArrayList<>(numRows);
-
-    DataSchema resultDataSchema = dataSchema;
-    Map<String, Integer> columnNameToIndexMap = null;
-    if (dataSchema.getColumnNames().length != selectionColumns.size()) {
-      // Create updated data schema since one column can be selected multiple 
times.
-      columnNameToIndexMap = new 
HashMap<>(HashUtil.getHashMapCapacity(dataSchema.getColumnNames().length));
-      String[] columnNames = dataSchema.getColumnNames();
-      ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-      for (int i = 0; i < columnNames.length; i++) {
-        columnNameToIndexMap.put(columnNames[i], i);
-      }
-
-      ColumnDataType[] newColumnDataTypes = new 
ColumnDataType[selectionColumns.size()];
-      for (int i = 0; i < newColumnDataTypes.length; i++) {
-        int index = columnNameToIndexMap.get(selectionColumns.get(i));
-        newColumnDataTypes[i] = columnDataTypes[index];
-      }
-
-      resultDataSchema = new DataSchema(selectionColumns.toArray(new 
String[0]), newColumnDataTypes);
-    }
-
-    int numColumns = resultDataSchema.getColumnNames().length;
-    ColumnDataType[] resultColumnDataTypes = 
resultDataSchema.getColumnDataTypes();
+    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+    int numColumns = columnDataTypes.length;
     for (Object[] row : rows) {
       Object[] resultRow = new Object[numColumns];
       for (int i = 0; i < numColumns; i++) {
-        int index = (columnNameToIndexMap != null) ? 
columnNameToIndexMap.get(selectionColumns.get(i)) : i;
-        Object value = row[index];
+        Object value = row[columnIndices[i]];
         if (value != null) {
-          resultRow[i] = resultColumnDataTypes[i].convertAndFormat(value);
+          resultRow[i] = columnDataTypes[i].convertAndFormat(value);
         }
       }
       resultRows.add(resultRow);
     }
-
-    return new ResultTable(resultDataSchema, resultRows);
-  }
-
-  /**
-   * Helper method to compute column indices from selection columns and the 
data schema for selection queries
-   * @param selectionColumns selection columns.
-   * @param dataSchema data schema.
-   * @return column indices
-   */
-  public static int[] getColumnIndices(List<String> selectionColumns, 
DataSchema dataSchema) {
-    String[] columnNames = dataSchema.getColumnNames();
-    Map<String, Integer> columnToIndexMap = getColumnToIndexMap(columnNames);
-    int numSelectionColumns = selectionColumns.size();
-    int[] columnIndices = new int[numSelectionColumns];
-    for (int i = 0; i < numSelectionColumns; i++) {
-      columnIndices[i] = columnToIndexMap.get(selectionColumns.get(i));
-    }
-    return columnIndices;
-  }
-
-  public static Map<String, Integer> getColumnToIndexMap(String[] columns) {
-    Map<String, Integer> columnToIndexMap = new HashMap<>();
-    int numColumns = columns.length;
-    for (int i = 0; i < numColumns; i++) {
-      columnToIndexMap.put(columns[i], i);
-    }
-    return columnToIndexMap;
+    return new ResultTable(dataSchema, resultRows);
   }
 
   /**
@@ -572,19 +610,4 @@ public class SelectionOperatorUtils {
       queue.offer(value);
     }
   }
-
-  public static DataSchema getSchemaForProjection(DataSchema dataSchema, int[] 
columnIndices) {
-    int numColumns = columnIndices.length;
-
-    String[] columnNames = dataSchema.getColumnNames();
-    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-    String[] resultColumnNames = new String[numColumns];
-    ColumnDataType[] resultColumnDataTypes = new ColumnDataType[numColumns];
-    for (int i = 0; i < numColumns; i++) {
-      int columnIndex = columnIndices[i];
-      resultColumnNames[i] = columnNames[columnIndex];
-      resultColumnDataTypes[i] = columnDataTypes[columnIndex];
-    }
-    return new DataSchema(resultColumnNames, resultColumnDataTypes);
-  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/ReducerDataSchemaUtilsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/ReducerDataSchemaUtilsTest.java
new file mode 100644
index 0000000000..d21d4f2779
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/ReducerDataSchemaUtilsTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class ReducerDataSchemaUtilsTest {
+
+  @Test
+  public void testCanonicalizeDataSchemaForAggregation() {
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT SUM(col1 + col2) FROM testTable");
+    // Intentionally make data schema not matching the string representation 
of the expression
+    DataSchema dataSchema = new DataSchema(new String[]{"sum(col1+col2)"}, new 
ColumnDataType[]{ColumnDataType.DOUBLE});
+    DataSchema canonicalDataSchema =
+        
ReducerDataSchemaUtils.canonicalizeDataSchemaForAggregation(queryContext, 
dataSchema);
+    assertEquals(canonicalDataSchema,
+        new DataSchema(new String[]{"sum(plus(col1,col2))"}, new 
ColumnDataType[]{ColumnDataType.DOUBLE}));
+
+    queryContext = QueryContextConverterUtils.getQueryContext("SELECT SUM(col1 + 1), MIN(col2 + 2) FROM testTable");
+    // Intentionally make data schema not matching the string representation 
of the expression
+    dataSchema = new DataSchema(new String[]{"sum(col1+1)", "min(col2+2)"},
+        new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
+    canonicalDataSchema = 
ReducerDataSchemaUtils.canonicalizeDataSchemaForAggregation(queryContext, 
dataSchema);
+    assertEquals(canonicalDataSchema, new DataSchema(new 
String[]{"sum(plus(col1,'1'))", "min(plus(col2,'2'))"},
+        new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE}));
+
+    queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT MAX(col1 + 1) FILTER(WHERE col3 > 0) - MIN(col2 + 2) FILTER(WHERE col4 > 0) FROM testTable");
+    // Intentionally make data schema not matching the string representation 
of the expression
+    dataSchema = new DataSchema(new String[]{"max(col1+1)", "min(col2+2)"},
+        new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
+    canonicalDataSchema = 
ReducerDataSchemaUtils.canonicalizeDataSchemaForAggregation(queryContext, 
dataSchema);
+    assertEquals(canonicalDataSchema, new DataSchema(
+        new String[]{"max(plus(col1,'1')) FILTER(WHERE col3 > '0')", 
"min(plus(col2,'2')) FILTER(WHERE col4 > '0')"},
+        new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE}));
+  }
+
+  @Test
+  public void testCanonicalizeDataSchemaForGroupBy() {
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT SUM(col1 + col2) FROM testTable GROUP BY col3 + col4 ORDER BY col3 + col4");
+    // Intentionally make data schema not matching the string representation 
of the expression
+    DataSchema dataSchema = new DataSchema(new String[]{"add(col3+col4)", 
"sum(col1+col2)"},
+        new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
+    DataSchema canonicalDataSchema = 
ReducerDataSchemaUtils.canonicalizeDataSchemaForGroupBy(queryContext, 
dataSchema);
+    assertEquals(canonicalDataSchema, new DataSchema(new 
String[]{"plus(col3,col4)", "sum(plus(col1,col2))"},
+        new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE}));
+
+    queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT SUM(col1 + 1), MIN(col2 + 2), col4 FROM testTable GROUP BY col3, col4");
+    // Intentionally make data schema not matching the string representation 
of the expression
+    dataSchema = new DataSchema(new String[]{"col3", "col4", "sum(col1+1)", 
"min(col2+2)"},
+        new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG, 
ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
+    canonicalDataSchema = 
ReducerDataSchemaUtils.canonicalizeDataSchemaForGroupBy(queryContext, 
dataSchema);
+    assertEquals(canonicalDataSchema,
+        new DataSchema(new String[]{"col3", "col4", "sum(plus(col1,'1'))", 
"min(plus(col2,'2'))"}, new ColumnDataType[]{
+            ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE, 
ColumnDataType.DOUBLE
+        }));
+
+    queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT col3 + col4, MAX(col1 + 1) FILTER(WHERE col3 > 0) - MIN(col2 + 2) FILTER(WHERE col4 > 0) FROM "
+            + "testTable GROUP BY col3 + col4");
+    // Intentionally make data schema not matching the string representation 
of the expression
+    dataSchema = new DataSchema(new String[]{"add(col3+col4)", "max(col1+1)", 
"min(col2+2)"},
+        new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, 
ColumnDataType.DOUBLE});
+    canonicalDataSchema = 
ReducerDataSchemaUtils.canonicalizeDataSchemaForGroupBy(queryContext, 
dataSchema);
+    assertEquals(canonicalDataSchema, new DataSchema(new String[]{
+        "plus(col3,col4)", "max(plus(col1,'1')) FILTER(WHERE col3 > '0')",
+        "min(plus(col2,'2')) FILTER" + "(WHERE col4 > '0')"
+    }, new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, 
ColumnDataType.DOUBLE}));
+  }
+
+  @Test
+  public void testCanonicalizeDataSchemaForDistinct() {
+    QueryContext queryContext =
+        QueryContextConverterUtils.getQueryContext("SELECT DISTINCT col1, col2 + col3 FROM testTable");
+    // Intentionally make data schema not matching the string representation 
of the expression
+    DataSchema dataSchema = new DataSchema(new String[]{"col1", 
"add(col2+col3)"},
+        new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.DOUBLE});
+    DataSchema canonicalDataSchema = 
ReducerDataSchemaUtils.canonicalizeDataSchemaForDistinct(queryContext, 
dataSchema);
+    assertEquals(canonicalDataSchema, new DataSchema(new String[]{"col1", 
"plus(col2,col3)"},
+        new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.DOUBLE}));
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorUtilsTest.java
new file mode 100644
index 0000000000..196f4dd168
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorUtilsTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.selection;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class SelectionOperatorUtilsTest {
+
+  @Test
+  public void testGetResultTableColumnIndices() {
+    // Select * without order-by
+    QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable");
+    DataSchema dataSchema = new DataSchema(new String[]{"col1", "col2", 
"col3"}, new ColumnDataType[]{
+        ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE
+    });
+    Pair<DataSchema, int[]> pair =
+        
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(), new DataSchema(new String[]{"col1", "col2", 
"col3"}, new ColumnDataType[]{
+        ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE
+    }));
+    assertEquals(pair.getRight(), new int[]{0, 1, 2});
+
+    // Select * without order-by, all the segments are pruned on the server 
side
+    dataSchema = new DataSchema(new String[]{"*"}, new 
ColumnDataType[]{ColumnDataType.STRING});
+    pair = 
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(), new DataSchema(new String[]{"*"}, new 
ColumnDataType[]{ColumnDataType.STRING}));
+
+    // Select * with order-by but LIMIT 0
+    queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable ORDER BY col1 LIMIT 0");
+    dataSchema = new DataSchema(new String[]{"col1", "col2", "col3"}, new 
ColumnDataType[]{
+        ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE
+    });
+    pair = 
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(), new DataSchema(new String[]{"col1", "col2", 
"col3"}, new ColumnDataType[]{
+        ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE
+    }));
+    assertEquals(pair.getRight(), new int[]{0, 1, 2});
+
+    // Select * with order-by but LIMIT 0, all the segments are pruned on the 
server side
+    dataSchema = new DataSchema(new String[]{"*"}, new 
ColumnDataType[]{ColumnDataType.STRING});
+    pair = 
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(), new DataSchema(new String[]{"*"}, new 
ColumnDataType[]{ColumnDataType.STRING}));
+
+    // Select columns without order-by
+    queryContext = QueryContextConverterUtils.getQueryContext("SELECT col1 + 1, col2 + 2 FROM testTable");
+    // Intentionally make data schema not matching the string representation 
of the expression
+    dataSchema = new DataSchema(new String[]{"add(col1+1)", "add(col2+2)"}, 
new ColumnDataType[]{
+        ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
+    });
+    pair = 
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(), new DataSchema(new String[]{"plus(col1,'1')", 
"plus(col2,'2')"}, new ColumnDataType[]{
+        ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
+    }));
+    assertEquals(pair.getRight(), new int[]{0, 1});
+
+    // Select columns without order-by, all the segments are pruned on the 
server side
+    // Intentionally make data schema not matching the string representation 
of the expression
+    dataSchema = new DataSchema(new String[]{"add(col1+1)", "add(col2+2)"}, 
new ColumnDataType[]{
+        ColumnDataType.STRING, ColumnDataType.STRING
+    });
+    pair = 
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(), new DataSchema(new String[]{"plus(col1,'1')", 
"plus(col2,'2')"}, new ColumnDataType[]{
+        ColumnDataType.STRING, ColumnDataType.STRING
+    }));
+
+    // Select duplicate columns without order-by
+    queryContext = QueryContextConverterUtils.getQueryContext("SELECT col1 + 1, col2 + 2, col1 + 1 FROM testTable");
+    // Intentionally make data schema not matching the string representation 
of the expression
+    dataSchema = new DataSchema(new String[]{"add(col1+1)", "add(col2+2)"}, 
new ColumnDataType[]{
+        ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
+    });
+    pair = 
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(),
+        new DataSchema(new String[]{"plus(col1,'1')", "plus(col2,'2')", 
"plus(col1,'1')"}, new ColumnDataType[]{
+            ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
+        }));
+    assertEquals(pair.getRight(), new int[]{0, 1, 0});
+
+    // Select duplicate columns without order-by, all the segments are pruned 
on the server side
+    // Intentionally make data schema not matching the string representation 
of the expression
+    dataSchema = new DataSchema(new String[]{"add(col1+1)", "add(col2+2)", 
"add(col1+1)"}, new ColumnDataType[]{
+        ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.STRING
+    });
+    pair = 
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(),
+        new DataSchema(new String[]{"plus(col1,'1')", "plus(col2,'2')", 
"plus(col1,'1')"}, new ColumnDataType[]{
+            ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.STRING
+        }));
+
+    // Select * with order-by
+    queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable ORDER BY col3");
+    dataSchema = new DataSchema(new String[]{"col3", "col1", "col2"}, new 
ColumnDataType[]{
+        ColumnDataType.DOUBLE, ColumnDataType.INT, ColumnDataType.LONG
+    });
+    pair = 
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(), new DataSchema(new String[]{"col1", "col2", 
"col3"}, new ColumnDataType[]{
+        ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE
+    }));
+    assertEquals(pair.getRight(), new int[]{1, 2, 0});
+
+    // Select * with order-by, all the segments are pruned on the server side
+    dataSchema = new DataSchema(new String[]{"*"}, new 
ColumnDataType[]{ColumnDataType.STRING});
+    pair = 
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(), new DataSchema(new String[]{"*"}, new 
ColumnDataType[]{ColumnDataType.STRING}));
+
+    // Select * ordering on function
+    queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable ORDER BY col1 + col2");
+    // Intentionally make data schema not matching the string representation 
of the expression
+    dataSchema = new DataSchema(new String[]{"add(col1+col2)", "col1", "col2", 
"col3"}, new ColumnDataType[]{
+        ColumnDataType.DOUBLE, ColumnDataType.INT, ColumnDataType.LONG, 
ColumnDataType.DOUBLE
+    });
+    pair = 
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(), new DataSchema(new String[]{"col1", "col2", 
"col3"}, new ColumnDataType[]{
+        ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE
+    }));
+    assertEquals(pair.getRight(), new int[]{1, 2, 3});
+
+    // Select * ordering on function, all the segments are pruned on the 
server side
+    dataSchema = new DataSchema(new String[]{"*"}, new 
ColumnDataType[]{ColumnDataType.STRING});
+    pair = 
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(), new DataSchema(new String[]{"*"}, new 
ColumnDataType[]{ColumnDataType.STRING}));
+
+    // Select * ordering on both column and function
+    queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable ORDER BY col1 + col2, col2");
+    // Intentionally make data schema not matching the string representation 
of the expression
+    dataSchema = new DataSchema(new String[]{"add(col1+col2)", "col2", "col1", 
"col3"}, new ColumnDataType[]{
+        ColumnDataType.DOUBLE, ColumnDataType.LONG, ColumnDataType.INT, 
ColumnDataType.DOUBLE
+    });
+    pair = 
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(), new DataSchema(new String[]{"col1", "col2", 
"col3"}, new ColumnDataType[]{
+        ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE
+    }));
+    assertEquals(pair.getRight(), new int[]{2, 1, 3});
+
+    // Select * ordering on both column and function, all the segments are 
pruned on the server side
+    dataSchema = new DataSchema(new String[]{"*"}, new 
ColumnDataType[]{ColumnDataType.STRING});
+    pair = 
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(), new DataSchema(new String[]{"*"}, new 
ColumnDataType[]{ColumnDataType.STRING}));
+
+    // Select columns with order-by
+    queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT col1 + 1, col3, col2 + 2 FROM testTable ORDER BY col2 + 2, col4");
+    // Intentionally make data schema not matching the string representation 
of the expression
+    dataSchema = new DataSchema(new String[]{"add(col2+2)", "col4", 
"add(col1+1)", "col3"}, new ColumnDataType[]{
+        ColumnDataType.DOUBLE, ColumnDataType.STRING, ColumnDataType.DOUBLE, 
ColumnDataType.DOUBLE
+    });
+    pair = 
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(), new DataSchema(new String[]{"plus(col1,'1')", 
"col3", "plus(col2,'2')"},
+        new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, 
ColumnDataType.DOUBLE}));
+    assertEquals(pair.getRight(), new int[]{2, 3, 0});
+
+    // Select columns with order-by, all the segments are pruned on the server 
side
+    // Intentionally make data schema not matching the string representation 
of the expression
+    dataSchema = new DataSchema(new String[]{"add(col1+1)", "col3", 
"add(col2+2)"}, new ColumnDataType[]{
+        ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.STRING
+    });
+    pair = 
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(), new DataSchema(new String[]{"plus(col1,'1')", 
"col3", "plus(col2,'2')"},
+        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING, 
ColumnDataType.STRING}));
+
+    // Select duplicate columns with order-by
+    queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT col1 + 1, col2 + 2, col1 + 1 FROM testTable ORDER BY col2 + 2, col4");
+    // Intentionally make data schema not matching the string representation 
of the expression
+    dataSchema = new DataSchema(new String[]{"add(col2+2)", "col4", 
"add(col1+1)"}, new ColumnDataType[]{
+        ColumnDataType.DOUBLE, ColumnDataType.STRING, ColumnDataType.DOUBLE
+    });
+    pair = 
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(),
+        new DataSchema(new String[]{"plus(col1,'1')", "plus(col2,'2')", 
"plus(col1,'1')"}, new ColumnDataType[]{
+            ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
+        }));
+    assertEquals(pair.getRight(), new int[]{2, 0, 2});
+
+    // Select duplicate columns with order-by, all the segments are pruned on 
the server side
+    // Intentionally make data schema not matching the string representation 
of the expression
+    dataSchema = new DataSchema(new String[]{"add(col1+1)", "add(col2+2)", 
"add(col1+1)"}, new ColumnDataType[]{
+        ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.STRING
+    });
+    pair = 
SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, 
dataSchema);
+    assertEquals(pair.getLeft(),
+        new DataSchema(new String[]{"plus(col1,'1')", "plus(col2,'2')", 
"plus(col1,'1')"}, new ColumnDataType[]{
+            ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.STRING
+        }));
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
index 0f8da45821..4c458b7979 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
@@ -1433,9 +1433,10 @@ public class DistinctQueriesTest extends BaseQueriesTest {
     {
       ResultTable resultTable = getBrokerResponse(queries[7]).getResultTable();
 
-      // Check data schema, where data type should be STRING for all columns
+      // Check data schema
+      // NOTE: Segment pruner is not wired up in QueriesTest, and the correct 
column data types should be returned.
       DataSchema expectedDataSchema = new DataSchema(new 
String[]{"floatColumn", "longMVColumn"},
-          new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING});
+          new ColumnDataType[]{ColumnDataType.FLOAT, ColumnDataType.LONG});
       assertEquals(resultTable.getDataSchema(), expectedDataSchema);
 
       // Check values, where no record should be returned
@@ -1589,9 +1590,10 @@ public class DistinctQueriesTest extends BaseQueriesTest {
     {
       ResultTable resultTable = 
getBrokerResponse(queries[13]).getResultTable();
 
-      // Check data schema, where data type should be STRING for all columns
+      // Check data schema
+      // NOTE: Segment pruner is not wired up in QueriesTest, and the correct 
column data types should be returned.
       DataSchema expectedDataSchema = new DataSchema(new 
String[]{"floatColumn", "rawLongMVColumn"},
-          new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING});
+          new ColumnDataType[]{ColumnDataType.FLOAT, ColumnDataType.LONG});
       assertEquals(resultTable.getDataSchema(), expectedDataSchema);
 
       // Check values, where no record should be returned
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index a81ceb8717..4e690e4041 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -35,6 +36,7 @@ import java.util.concurrent.TimeoutException;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
@@ -45,7 +47,7 @@ import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.executor.ResultsBlockStreamer;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.utils.TypeUtils;
@@ -281,13 +283,9 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
   /**
    * For selection, we need to check if the columns are in order. If not, we need to re-arrange the columns.
    */
-  @SuppressWarnings("ConstantConditions")
   private static TransferableBlock composeSelectTransferableBlock(SelectionResultsBlock resultsBlock,
       DataSchema desiredDataSchema) {
-    DataSchema resultSchema = resultsBlock.getDataSchema();
-    List<String> selectionColumns =
-        SelectionOperatorUtils.getSelectionColumns(resultsBlock.getQueryContext(), resultSchema);
-    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+    int[] columnIndices = getColumnIndices(resultsBlock);
     if (!inOrder(columnIndices)) {
       return composeColumnIndexedTransferableBlock(resultsBlock, desiredDataSchema, columnIndices);
     } else {
@@ -295,6 +293,25 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
     }
   }
 
+  private static int[] getColumnIndices(SelectionResultsBlock resultsBlock) {
+    DataSchema dataSchema = resultsBlock.getDataSchema();
+    assert dataSchema != null;
+    String[] columnNames = dataSchema.getColumnNames();
+    Object2IntOpenHashMap<String> columnIndexMap = new Object2IntOpenHashMap<>(columnNames.length);
+    for (int i = 0; i < columnNames.length; i++) {
+      columnIndexMap.put(columnNames[i], i);
+    }
+    QueryContext queryContext = resultsBlock.getQueryContext();
+    assert queryContext != null;
+    List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
+    int numSelectExpressions = selectExpressions.size();
+    int[] columnIndices = new int[numSelectExpressions];
+    for (int i = 0; i < numSelectExpressions; i++) {
+      columnIndices[i] = columnIndexMap.getInt(selectExpressions.get(i).toString());
+    }
+    return columnIndices;
+  }
+
   private static boolean inOrder(int[] columnIndices) {
     for (int i = 0; i < columnIndices.length; i++) {
       if (columnIndices[i] != i) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to