This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f36623c56 Enhance broker reduce to handle different column names from server response (#10454)
3f36623c56 is described below

commit 3f36623c563b5ad495a502a00f627b44bfc0861d
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Mar 24 16:07:58 2023 -0700

    Enhance broker reduce to handle different column names from server response (#10454)
---
 .../org/apache/pinot/common/utils/DataSchema.java  |  61 ---------
 .../apache/pinot/common/utils/DataSchemaTest.java  |  31 +----
 .../query/reduce/SelectionDataTableReducer.java    |  92 ++++++--------
 .../query/selection/SelectionOperatorService.java  |   7 +-
 .../query/selection/SelectionOperatorUtils.java    | 138 ++-------------------
 .../core/query/utils/OrderByComparatorFactory.java | 101 +++++----------
 .../selection/SelectionOperatorServiceTest.java    |  95 ++++++--------
 7 files changed, 120 insertions(+), 405 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index 4854f5bf22..a044f5dd60 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -101,67 +101,6 @@ public class DataSchema {
     return storedColumnDataTypes;
   }
 
-  /**
-   * Returns whether the given data schema is type compatible with this one.
-   * <ul>
-   *   <li>All numbers are type compatible with each other</li>
-   *   <li>Numbers are not type compatible with string</li>
-   *   <li>Non-array types are not type compatible with array types</li>
-   * </ul>
-   *
-   * @param anotherDataSchema Data schema to compare with
-   * @return Whether the two data schemas are type compatible
-   */
-  public boolean isTypeCompatibleWith(DataSchema anotherDataSchema) {
-    if (!Arrays.equals(_columnNames, anotherDataSchema._columnNames)) {
-      return false;
-    }
-    int numColumns = _columnNames.length;
-    for (int i = 0; i < numColumns; i++) {
-      if 
(!_columnDataTypes[i].isCompatible(anotherDataSchema._columnDataTypes[i])) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Upgrade the current data schema to cover the column data types in the 
given data schema.
-   * <p>Data type <code>LONG</code> can cover <code>INT</code> and 
<code>LONG</code>.
-   * <p>Data type <code>DOUBLE</code> can cover all numbers, but with 
potential precision loss when use it to cover
-   * <code>LONG</code>.
-   * <p>NOTE: The given data schema should be type compatible with this one.
-   *
-   * @param originalSchema the original Data schema
-   * @param anotherDataSchema Data schema to cover
-   */
-  public static DataSchema upgradeToCover(DataSchema originalSchema, 
DataSchema anotherDataSchema) {
-    int numColumns = originalSchema._columnDataTypes.length;
-    ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
-    for (int i = 0; i < numColumns; i++) {
-      ColumnDataType thisColumnDataType = originalSchema._columnDataTypes[i];
-      ColumnDataType thatColumnDataType = 
anotherDataSchema._columnDataTypes[i];
-      if (thisColumnDataType != thatColumnDataType) {
-        if (thisColumnDataType.isArray()) {
-          if (thisColumnDataType.isWholeNumberArray() && 
thatColumnDataType.isWholeNumberArray()) {
-            columnDataTypes[i] = ColumnDataType.LONG_ARRAY;
-          } else {
-            columnDataTypes[i] = ColumnDataType.DOUBLE_ARRAY;
-          }
-        } else {
-          if (thisColumnDataType.isWholeNumber() && 
thatColumnDataType.isWholeNumber()) {
-            columnDataTypes[i] = ColumnDataType.LONG;
-          } else {
-            columnDataTypes[i] = ColumnDataType.DOUBLE;
-          }
-        }
-      } else {
-        columnDataTypes[i] = originalSchema._columnDataTypes[i];
-      }
-    }
-    return new DataSchema(originalSchema._columnNames, columnDataTypes);
-  }
-
   public byte[] toBytes()
       throws IOException {
     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
index 71b062452b..e90842d925 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
@@ -34,14 +34,8 @@ public class DataSchemaTest {
       "string_array", "boolean_array", "timestamp_array", "bytes_array"
   };
   private static final int NUM_COLUMNS = COLUMN_NAMES.length;
-  private static final DataSchema.ColumnDataType[] COLUMN_DATA_TYPES =
-      {INT, LONG, FLOAT, DOUBLE, STRING, OBJECT, INT_ARRAY, LONG_ARRAY, 
FLOAT_ARRAY, DOUBLE_ARRAY, STRING_ARRAY,
-      BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY};
-  private static final DataSchema.ColumnDataType[] 
COMPATIBLE_COLUMN_DATA_TYPES =
-      {LONG, FLOAT, DOUBLE, INT, STRING, OBJECT, LONG_ARRAY, FLOAT_ARRAY, 
DOUBLE_ARRAY, INT_ARRAY, STRING_ARRAY,
-       BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY};
-  private static final DataSchema.ColumnDataType[] UPGRADED_COLUMN_DATA_TYPES 
= {
-      LONG, DOUBLE, DOUBLE, DOUBLE, STRING, OBJECT, LONG_ARRAY, DOUBLE_ARRAY, 
DOUBLE_ARRAY, DOUBLE_ARRAY, STRING_ARRAY,
+  private static final DataSchema.ColumnDataType[] COLUMN_DATA_TYPES = {
+      INT, LONG, FLOAT, DOUBLE, STRING, OBJECT, INT_ARRAY, LONG_ARRAY, 
FLOAT_ARRAY, DOUBLE_ARRAY, STRING_ARRAY,
       BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY
   };
 
@@ -72,22 +66,6 @@ public class DataSchemaTest {
     Assert.assertEquals(dataSchema.hashCode(), 
dataSchemaAfterSerDe.hashCode());
   }
 
-  @Test
-  public void testTypeCompatible() {
-    DataSchema dataSchema = new DataSchema(COLUMN_NAMES, COLUMN_DATA_TYPES);
-    DataSchema compatibleDataSchema = new DataSchema(COLUMN_NAMES, 
COMPATIBLE_COLUMN_DATA_TYPES);
-    Assert.assertTrue(dataSchema.isTypeCompatibleWith(compatibleDataSchema));
-
-    String[] anotherColumnNames = new String[NUM_COLUMNS];
-    Arrays.fill(anotherColumnNames, "foo");
-    DataSchema incompatibleDataSchema = new DataSchema(anotherColumnNames, 
COLUMN_DATA_TYPES);
-    
Assert.assertFalse(dataSchema.isTypeCompatibleWith(incompatibleDataSchema));
-
-    dataSchema = DataSchema.upgradeToCover(dataSchema, compatibleDataSchema);
-    DataSchema upgradedDataSchema = new DataSchema(COLUMN_NAMES, 
UPGRADED_COLUMN_DATA_TYPES);
-    Assert.assertEquals(dataSchema, upgradedDataSchema);
-  }
-
   @Test
   public void testSuperTypeCheckers() {
     List<DataSchema.ColumnDataType> numberTypeToTest = Arrays.asList(INT, 
LONG, FLOAT, DOUBLE);
@@ -207,8 +185,9 @@ public class DataSchemaTest {
       Assert.assertFalse(columnDataType.isCompatible(BYTES_ARRAY));
     }
 
-    for (DataSchema.ColumnDataType columnDataType : new 
DataSchema.ColumnDataType[]{STRING_ARRAY, BOOLEAN_ARRAY,
-        TIMESTAMP_ARRAY, BYTES_ARRAY}) {
+    for (DataSchema.ColumnDataType columnDataType : new 
DataSchema.ColumnDataType[]{
+        STRING_ARRAY, BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY
+    }) {
       Assert.assertFalse(columnDataType.isNumber());
       Assert.assertFalse(columnDataType.isWholeNumber());
       Assert.assertTrue(columnDataType.isArray());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
index 4d30f54935..a21684acdf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.reduce;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -64,63 +65,50 @@ public class SelectionDataTableReducer implements 
DataTableReducer {
       List<String> selectionColumns = 
SelectionOperatorUtils.getSelectionColumns(_queryContext, dataSchema);
       DataSchema selectionDataSchema = 
SelectionOperatorUtils.getResultTableDataSchema(dataSchema, selectionColumns);
       brokerResponseNative.setResultTable(new ResultTable(selectionDataSchema, 
Collections.emptyList()));
-    } else {
-      // For data table map with more than one data tables, remove conflicting 
data tables
-      if (dataTableMap.size() > 1) {
-        List<ServerRoutingInstance> droppedServers = 
removeConflictingResponses(dataSchema, dataTableMap);
-        if (!droppedServers.isEmpty()) {
-          String errorMessage = 
QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses for table: " + 
tableName
-              + " from servers: " + droppedServers + " got dropped due to data 
schema inconsistency.";
-          LOGGER.warn(errorMessage);
-          if (brokerMetrics != null) {
-            
brokerMetrics.addMeteredTableValue(TableNameBuilder.extractRawTableName(tableName),
-                BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
-          }
-          brokerResponseNative.addToExceptions(
-              new 
QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, 
errorMessage));
+      return;
+    }
+
+    // For data table map with more than one data tables, remove conflicting 
data tables
+    if (dataTableMap.size() > 1) {
+      DataSchema.ColumnDataType[] columnDataTypes = 
dataSchema.getColumnDataTypes();
+      List<ServerRoutingInstance> droppedServers = new ArrayList<>();
+      Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = 
dataTableMap.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
+        DataSchema dataSchemaToCompare = entry.getValue().getDataSchema();
+        assert dataSchemaToCompare != null;
+        if (!Arrays.equals(columnDataTypes, 
dataSchemaToCompare.getColumnDataTypes())) {
+          droppedServers.add(entry.getKey());
+          iterator.remove();
         }
       }
-
-      int limit = _queryContext.getLimit();
-      if (limit > 0 && _queryContext.getOrderByExpressions() != null) {
-        // Selection order-by
-        SelectionOperatorService selectionService = new 
SelectionOperatorService(_queryContext, dataSchema);
-        selectionService.reduceWithOrdering(dataTableMap.values(), 
_queryContext.isNullHandlingEnabled());
-        
brokerResponseNative.setResultTable(selectionService.renderResultTableWithOrdering());
-      } else {
-        // Selection only
-        List<String> selectionColumns = 
SelectionOperatorUtils.getSelectionColumns(_queryContext, dataSchema);
-        List<Object[]> reducedRows = 
SelectionOperatorUtils.reduceWithoutOrdering(dataTableMap.values(), limit,
-            _queryContext.isNullHandlingEnabled());
-        brokerResponseNative.setResultTable(
-            
SelectionOperatorUtils.renderResultTableWithoutOrdering(reducedRows, 
dataSchema, selectionColumns));
+      if (!droppedServers.isEmpty()) {
+        String errorMessage =
+            QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses 
for table: " + tableName + " from servers: "
+                + droppedServers + " got dropped due to data schema 
inconsistency.";
+        LOGGER.warn(errorMessage);
+        if (brokerMetrics != null) {
+          
brokerMetrics.addMeteredTableValue(TableNameBuilder.extractRawTableName(tableName),
+              BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
+        }
+        brokerResponseNative.addToExceptions(
+            new 
QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, 
errorMessage));
       }
     }
-  }
 
-  /**
-   * Given a data schema, remove data tables that are not compatible with this 
data schema.
-   * <p>Upgrade the data schema passed in to cover all remaining data schemas.
-   *
-   * @param dataSchema data schema.
-   * @param dataTableMap map from server to data table.
-   * @return list of server names where the data table got removed.
-   */
-  private List<ServerRoutingInstance> removeConflictingResponses(DataSchema 
dataSchema,
-      Map<ServerRoutingInstance, DataTable> dataTableMap) {
-    List<ServerRoutingInstance> droppedServers = new ArrayList<>();
-    Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = 
dataTableMap.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
-      DataSchema dataSchemaToCompare = entry.getValue().getDataSchema();
-      assert dataSchemaToCompare != null;
-      if (!dataSchema.isTypeCompatibleWith(dataSchemaToCompare)) {
-        droppedServers.add(entry.getKey());
-        iterator.remove();
-      } else {
-        dataSchema = DataSchema.upgradeToCover(dataSchema, 
dataSchemaToCompare);
-      }
+    int limit = _queryContext.getLimit();
+    if (limit > 0 && _queryContext.getOrderByExpressions() != null) {
+      // Selection order-by
+      SelectionOperatorService selectionService = new 
SelectionOperatorService(_queryContext, dataSchema);
+      selectionService.reduceWithOrdering(dataTableMap.values(), 
_queryContext.isNullHandlingEnabled());
+      
brokerResponseNative.setResultTable(selectionService.renderResultTableWithOrdering());
+    } else {
+      // Selection only
+      List<String> selectionColumns = 
SelectionOperatorUtils.getSelectionColumns(_queryContext, dataSchema);
+      List<Object[]> reducedRows = 
SelectionOperatorUtils.reduceWithoutOrdering(dataTableMap.values(), limit,
+          _queryContext.isNullHandlingEnabled());
+      brokerResponseNative.setResultTable(
+          SelectionOperatorUtils.renderResultTableWithoutOrdering(reducedRows, 
dataSchema, selectionColumns));
     }
-    return droppedServers;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index 419a4df883..688c7bbf49 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
 import org.apache.pinot.spi.trace.Tracing;
 import org.roaringbitmap.RoaringBitmap;
 
@@ -77,11 +78,9 @@ public class SelectionOperatorService {
     _offset = queryContext.getOffset();
     _numRowsToKeep = _offset + queryContext.getLimit();
     assert queryContext.getOrderByExpressions() != null;
-    // TODO: Do not use type compatible comparator for performance since we 
don't support different data schema on
-    //       server side combine
     _rows = new PriorityQueue<>(Math.min(_numRowsToKeep, 
SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
-        
SelectionOperatorUtils.getTypeCompatibleComparator(queryContext.getOrderByExpressions(),
 _dataSchema,
-            _queryContext.isNullHandlingEnabled()));
+        
OrderByComparatorFactory.getComparator(queryContext.getOrderByExpressions(),
+            _queryContext.isNullHandlingEnabled()).reversed());
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index dcb3cec6e6..5dc0bb5fbc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -43,7 +43,6 @@ import 
org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.trace.Tracing;
-import org.apache.pinot.spi.utils.ArrayCopyUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
 
@@ -260,19 +259,6 @@ public class SelectionOperatorUtils {
 
   /**
    * Build a {@link DataTable} from a {@link Collection} of selection rows 
with {@link DataSchema}. (Server side)
-   * <p>The passed in data schema stored the column data type that can cover 
all actual data types for that column.
-   * <p>The actual data types for each column in rows can be different but 
must be compatible with each other.
-   * <p>Before write each row into the data table, first convert it to match 
the data types in data schema.
-   *
-   * TODO: Type compatibility is not supported for selection order-by because 
all segments on the same server shared the
-   *       same comparator. Another solution is to always use the table schema 
to execute the query (preferable because
-   *       type compatible checks are expensive).
-   *
-   * @param rows {@link Collection} of selection rows.
-   * @param dataSchema data schema.
-   * @param nullHandlingEnabled whether null handling is enabled.
-   * @return data table.
-   * @throws IOException
    */
   public static DataTable getDataTableFromRows(Collection<Object[]> rows, 
DataSchema dataSchema,
       boolean nullHandlingEnabled)
@@ -309,22 +295,22 @@ public class SelectionOperatorUtils {
         switch (storedColumnDataTypes[i]) {
           // Single-value column
           case INT:
-            dataTableBuilder.setColumn(i, ((Number) columnValue).intValue());
+            dataTableBuilder.setColumn(i, (int) columnValue);
             break;
           case LONG:
-            dataTableBuilder.setColumn(i, ((Number) columnValue).longValue());
+            dataTableBuilder.setColumn(i, (long) columnValue);
             break;
           case FLOAT:
-            dataTableBuilder.setColumn(i, ((Number) columnValue).floatValue());
+            dataTableBuilder.setColumn(i, (float) columnValue);
             break;
           case DOUBLE:
-            dataTableBuilder.setColumn(i, ((Number) 
columnValue).doubleValue());
+            dataTableBuilder.setColumn(i, (double) columnValue);
             break;
           case BIG_DECIMAL:
             dataTableBuilder.setColumn(i, (BigDecimal) columnValue);
             break;
           case STRING:
-            dataTableBuilder.setColumn(i, ((String) columnValue));
+            dataTableBuilder.setColumn(i, (String) columnValue);
             break;
           case BYTES:
             dataTableBuilder.setColumn(i, (ByteArray) columnValue);
@@ -335,43 +321,13 @@ public class SelectionOperatorUtils {
             dataTableBuilder.setColumn(i, (int[]) columnValue);
             break;
           case LONG_ARRAY:
-            // LONG_ARRAY type covers INT_ARRAY and LONG_ARRAY
-            if (columnValue instanceof int[]) {
-              int[] ints = (int[]) columnValue;
-              int length = ints.length;
-              long[] longs = new long[length];
-              ArrayCopyUtils.copy(ints, longs, length);
-              dataTableBuilder.setColumn(i, longs);
-            } else {
-              dataTableBuilder.setColumn(i, (long[]) columnValue);
-            }
+            dataTableBuilder.setColumn(i, (long[]) columnValue);
             break;
           case FLOAT_ARRAY:
             dataTableBuilder.setColumn(i, (float[]) columnValue);
             break;
           case DOUBLE_ARRAY:
-            // DOUBLE_ARRAY type covers INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY and 
DOUBLE_ARRAY
-            if (columnValue instanceof int[]) {
-              int[] ints = (int[]) columnValue;
-              int length = ints.length;
-              double[] doubles = new double[length];
-              ArrayCopyUtils.copy(ints, doubles, length);
-              dataTableBuilder.setColumn(i, doubles);
-            } else if (columnValue instanceof long[]) {
-              long[] longs = (long[]) columnValue;
-              int length = longs.length;
-              double[] doubles = new double[length];
-              ArrayCopyUtils.copy(longs, doubles, length);
-              dataTableBuilder.setColumn(i, doubles);
-            } else if (columnValue instanceof float[]) {
-              float[] floats = (float[]) columnValue;
-              int length = floats.length;
-              double[] doubles = new double[length];
-              ArrayCopyUtils.copy(floats, doubles, length);
-              dataTableBuilder.setColumn(i, doubles);
-            } else {
-              dataTableBuilder.setColumn(i, (double[]) columnValue);
-            }
+            dataTableBuilder.setColumn(i, (double[]) columnValue);
             break;
           case STRING_ARRAY:
             dataTableBuilder.setColumn(i, (String[]) columnValue);
@@ -593,86 +549,6 @@ public class SelectionOperatorUtils {
     return columnToIndexMap;
   }
 
-  /**
-   * Helper method to get the type-compatible {@link Comparator} for selection 
rows. (Inter segment)
-   * <p>Type-compatible comparator allows compatible types to compare with 
each other.
-   *
-   * @return flexible {@link Comparator} for selection rows.
-   */
-  public static Comparator<Object[]> 
getTypeCompatibleComparator(List<OrderByExpressionContext> orderByExpressions,
-      DataSchema dataSchema, boolean isNullHandlingEnabled) {
-    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-
-    // Compare all single-value columns
-    int numOrderByExpressions = orderByExpressions.size();
-    List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
-    for (int i = 0; i < numOrderByExpressions; i++) {
-      if (!columnDataTypes[i].isArray()) {
-        valueIndexList.add(i);
-      }
-    }
-
-    int numValuesToCompare = valueIndexList.size();
-    int[] valueIndices = new int[numValuesToCompare];
-    boolean[] useDoubleComparison = new boolean[numValuesToCompare];
-    // Use multiplier -1 or 1 to control ascending/descending order
-    int[] multipliers = new int[numValuesToCompare];
-    for (int i = 0; i < numValuesToCompare; i++) {
-      int valueIndex = valueIndexList.get(i);
-      valueIndices[i] = valueIndex;
-      if (columnDataTypes[valueIndex].isNumber()) {
-        useDoubleComparison[i] = true;
-      }
-      multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? -1 : 1;
-    }
-
-    if (isNullHandlingEnabled) {
-      return (o1, o2) -> {
-        for (int i = 0; i < numValuesToCompare; i++) {
-          int index = valueIndices[i];
-          Object v1 = o1[index];
-          Object v2 = o2[index];
-          if (v1 == null) {
-            // The default null ordering is: 'NULLS LAST'.
-            return v2 == null ? 0 : -multipliers[i];
-          } else if (v2 == null) {
-            return multipliers[i];
-          }
-          int result;
-          if (useDoubleComparison[i]) {
-            result = Double.compare(((Number) v1).doubleValue(), ((Number) 
v2).doubleValue());
-          } else {
-            //noinspection unchecked
-            result = ((Comparable) v1).compareTo(v2);
-          }
-          if (result != 0) {
-            return result * multipliers[i];
-          }
-        }
-        return 0;
-      };
-    } else {
-      return (o1, o2) -> {
-        for (int i = 0; i < numValuesToCompare; i++) {
-          int index = valueIndices[i];
-          Object v1 = o1[index];
-          Object v2 = o2[index];
-          int result;
-          if (useDoubleComparison[i]) {
-            result = Double.compare(((Number) v1).doubleValue(), ((Number) 
v2).doubleValue());
-          } else {
-            //noinspection unchecked
-            result = ((Comparable) v1).compareTo(v2);
-          }
-          if (result != 0) {
-            return result * multipliers[i];
-          }
-        }
-        return 0;
-      };
-    }
-  }
-
   /**
    * Helper method to add a value to a {@link PriorityQueue}.
    *
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
index cbf978e882..f7225a2197 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
@@ -18,16 +18,11 @@
  */
 package org.apache.pinot.core.query.utils;
 
-import com.google.common.base.Preconditions;
-import java.math.BigDecimal;
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
 import org.apache.pinot.core.operator.ColumnContext;
-import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
-import org.apache.pinot.spi.utils.ByteArray;
 
 
 /**
@@ -44,103 +39,67 @@ public class OrderByComparatorFactory {
 
   public static Comparator<Object[]> 
getComparator(List<OrderByExpressionContext> orderByExpressions,
       ColumnContext[] orderByColumnContexts, boolean nullHandlingEnabled, int 
from, int to) {
-    Preconditions.checkArgument(to <= orderByExpressions.size(),
-        "Trying to access %sth position of orderByExpressions with size %s", 
to, orderByExpressions.size());
-    Preconditions.checkArgument(to <= orderByColumnContexts.length,
-        "Trying to access %sth position of orderByExpressionMetadata with size 
%s", to, orderByColumnContexts.length);
-    Preconditions.checkArgument(from < to, "FROM (%s) must be lower than TO 
(%s)", from, to);
+    assert 0 <= from && from < to && to <= orderByExpressions.size();
 
-    // Compare all single-value columns
-    int numOrderByExpressions = to - from;
-    List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
+    // Check if all expressions are single-valued
     for (int i = from; i < to; i++) {
-      if (orderByColumnContexts[i].isSingleValue()) {
-        valueIndexList.add(i);
-      } else {
-        // MV columns should not be part of the selection order by only list
+      if (!orderByColumnContexts[i].isSingleValue()) {
+        // MV columns should not be part of the selection order-by list
         throw new BadQueryRequestException(
             String.format("MV expression: %s should not be included in the 
ORDER-BY clause",
                 orderByExpressions.get(i)));
       }
     }
 
-    int numValuesToCompare = valueIndexList.size();
-    int[] valueIndices = new int[numValuesToCompare];
-    FieldSpec.DataType[] storedTypes = new 
FieldSpec.DataType[numValuesToCompare];
+    return getComparator(orderByExpressions, nullHandlingEnabled, from, to);
+  }
+
+  public static Comparator<Object[]> 
getComparator(List<OrderByExpressionContext> orderByExpressions,
+      boolean nullHandlingEnabled) {
+    return getComparator(orderByExpressions, nullHandlingEnabled, 0, 
orderByExpressions.size());
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public static Comparator<Object[]> 
getComparator(List<OrderByExpressionContext> orderByExpressions,
+      boolean nullHandlingEnabled, int from, int to) {
+    assert 0 <= from && from < to && to <= orderByExpressions.size();
+
     // Use multiplier -1 or 1 to control ascending/descending order
-    int[] multipliers = new int[numValuesToCompare];
-    for (int i = 0; i < numValuesToCompare; i++) {
-      int valueIndex = valueIndexList.get(i);
-      valueIndices[i] = valueIndex;
-      storedTypes[i] = 
orderByColumnContexts[valueIndex].getDataType().getStoredType();
-      multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? 1 : -1;
+    int[] multipliers = new int[to];
+    for (int i = from; i < to; i++) {
+      multipliers[i] = orderByExpressions.get(i).isAsc() ? 1 : -1;
     }
 
     if (nullHandlingEnabled) {
       return (Object[] o1, Object[] o2) -> {
-        for (int i = 0; i < numValuesToCompare; i++) {
-          int index = valueIndices[i];
-          // TODO: Evaluate the performance of casting to Comparable and avoid 
the switch
-          Object v1 = o1[index];
-          Object v2 = o2[index];
+        for (int i = from; i < to; i++) {
+          Comparable v1 = (Comparable) o1[i];
+          Comparable v2 = (Comparable) o2[i];
           if (v1 == null) {
             // The default null ordering is: 'NULLS LAST', regardless of the 
ordering direction.
             return v2 == null ? 0 : -multipliers[i];
           } else if (v2 == null) {
             return multipliers[i];
           }
-          int result = compareCols(v1, v2, storedTypes[i], multipliers[i]);
+          int result = v1.compareTo(v2);
           if (result != 0) {
-            return result;
+            return result * multipliers[i];
           }
         }
         return 0;
       };
     } else {
       return (Object[] o1, Object[] o2) -> {
-        for (int i = 0; i < numValuesToCompare; i++) {
-          int index = valueIndices[i];
-          // TODO: Evaluate the performance of casting to Comparable and avoid 
the switch
-          int result = compareCols(o1[index], o2[index], storedTypes[i], 
multipliers[i]);
+        for (int i = from; i < to; i++) {
+          Comparable v1 = (Comparable) o1[i];
+          Comparable v2 = (Comparable) o2[i];
+          int result = v1.compareTo(v2);
           if (result != 0) {
-            return result;
+            return result * multipliers[i];
           }
         }
         return 0;
       };
     }
   }
-
-  private static int compareCols(Object v1, Object v2, FieldSpec.DataType 
type, int multiplier) {
-
-    // TODO: Evaluate the performance of casting to Comparable and avoid the 
switch
-    int result;
-    switch (type) {
-      case INT:
-        result = ((Integer) v1).compareTo((Integer) v2);
-        break;
-      case LONG:
-        result = ((Long) v1).compareTo((Long) v2);
-        break;
-      case FLOAT:
-        result = ((Float) v1).compareTo((Float) v2);
-        break;
-      case DOUBLE:
-        result = ((Double) v1).compareTo((Double) v2);
-        break;
-      case BIG_DECIMAL:
-        result = ((BigDecimal) v1).compareTo((BigDecimal) v2);
-        break;
-      case STRING:
-        result = ((String) v1).compareTo((String) v2);
-        break;
-      case BYTES:
-        result = ((ByteArray) v1).compareTo((ByteArray) v2);
-        break;
-      // NOTE: Multi-value columns are not comparable, so we should not reach 
here
-      default:
-        throw new IllegalStateException();
-    }
-    return result * multiplier;
-  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
index cdae8a37e6..7d44c6b65b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.query.selection;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -28,9 +29,11 @@ import java.util.List;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.testng.annotations.BeforeClass;
@@ -50,45 +53,30 @@ import static org.testng.Assert.assertTrue;
  */
 public class SelectionOperatorServiceTest {
   private final String[] _columnNames = {
-      "int", "long", "float", "double", "string", "int_array", "long_array", 
"float_array", "double_array",
-      "string_array", "bytes"
+      "int", "long", "float", "double", "big_decimal", "string", "bytes", 
"int_array", "long_array", "float_array",
+      "double_array", "string_array"
   };
-  private final DataSchema.ColumnDataType[] _columnDataTypes = {
-      DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG, 
DataSchema.ColumnDataType.FLOAT,
-      DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT_ARRAY,
-      DataSchema.ColumnDataType.LONG_ARRAY, 
DataSchema.ColumnDataType.FLOAT_ARRAY,
-      DataSchema.ColumnDataType.DOUBLE_ARRAY, 
DataSchema.ColumnDataType.STRING_ARRAY, DataSchema.ColumnDataType.BYTES
+  private final ColumnDataType[] _columnDataTypes = {
+      ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.FLOAT, 
ColumnDataType.DOUBLE,
+      ColumnDataType.BIG_DECIMAL, ColumnDataType.STRING, ColumnDataType.BYTES, 
ColumnDataType.INT_ARRAY,
+      ColumnDataType.LONG_ARRAY, ColumnDataType.FLOAT_ARRAY, 
ColumnDataType.DOUBLE_ARRAY, ColumnDataType.STRING_ARRAY
   };
   private final DataSchema _dataSchema = new DataSchema(_columnNames, 
_columnDataTypes);
-  private final DataSchema.ColumnDataType[] _compatibleColumnDataTypes = {
-      DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.FLOAT, 
DataSchema.ColumnDataType.DOUBLE,
-      DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.LONG_ARRAY,
-      DataSchema.ColumnDataType.FLOAT_ARRAY, 
DataSchema.ColumnDataType.DOUBLE_ARRAY,
-      DataSchema.ColumnDataType.INT_ARRAY, 
DataSchema.ColumnDataType.STRING_ARRAY, DataSchema.ColumnDataType.BYTES
-  };
-  private final DataSchema _compatibleDataSchema = new 
DataSchema(_columnNames, _compatibleColumnDataTypes);
-  private final DataSchema.ColumnDataType[] _upgradedColumnDataTypes = new 
DataSchema.ColumnDataType[]{
-      DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE, 
DataSchema.ColumnDataType.DOUBLE,
-      DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.LONG_ARRAY,
-      DataSchema.ColumnDataType.DOUBLE_ARRAY, 
DataSchema.ColumnDataType.DOUBLE_ARRAY,
-      DataSchema.ColumnDataType.DOUBLE_ARRAY, 
DataSchema.ColumnDataType.STRING_ARRAY, DataSchema.ColumnDataType.BYTES
-  };
-  private final DataSchema _upgradedDataSchema = new DataSchema(_columnNames, 
_upgradedColumnDataTypes);
   private final Object[] _row1 = {
-      0, 1L, 2.0F, 3.0, "4", new int[]{5}, new long[]{6L}, new float[]{7.0F}, 
new double[]{8.0}, new String[]{"9"},
-      BytesUtils.toByteArray("1020")
+      0, 1L, 2.0F, 3.0, new BigDecimal(4), "5", BytesUtils.toByteArray(
+      "0606"), new int[]{7}, new long[]{8L}, new float[]{9.0F}, new 
double[]{10.0}, new String[]{"11"}
   };
   private final Object[] _row2 = {
-      10, 11L, 12.0F, 13.0, "14", new int[]{15}, new long[]{16L}, new 
float[]{17.0F}, new double[]{18.0},
-      new String[]{"19"}, BytesUtils.toByteArray("3040")
+      10, 11L, 12.0F, 13.0, new BigDecimal(14), "15", BytesUtils.toByteArray(
+      "1616"), new int[]{17}, new long[]{18L}, new float[]{19.0F}, new 
double[]{20.0}, new String[]{"21"}
   };
-  private final Object[] _compatibleRow1 = {
-      1L, 2.0F, 3.0, 4, "5", new long[]{6L}, new float[]{7.0F}, new 
double[]{8.0}, new int[]{9}, new String[]{"10"},
-      BytesUtils.toByteArray("5060")
+  private final Object[] _row3 = {
+      1, 2L, 3.0F, 4.0, new BigDecimal(5), "6", BytesUtils.toByteArray(
+      "0707"), new int[]{8}, new long[]{9L}, new float[]{10.0F}, new 
double[]{11.0}, new String[]{"12"}
   };
-  private final Object[] _compatibleRow2 = {
-      11L, 12.0F, 13.0, 14, "15", new long[]{16L}, new float[]{17.0F}, new 
double[]{18.0}, new int[]{19},
-      new String[]{"20"}, BytesUtils.toByteArray("7000")
+  private final Object[] _row4 = {
+      11, 12L, 13.0F, 14.0, new BigDecimal(15), "16", BytesUtils.toByteArray(
+      "1717"), new int[]{18}, new long[]{19L}, new float[]{20.0F}, new 
double[]{21.0}, new String[]{"22"}
   };
   private QueryContext _queryContext;
 
@@ -169,63 +157,50 @@ public class SelectionOperatorServiceTest {
   }
 
   @Test
-  public void testCompatibleRowsMergeWithoutOrdering() {
+  public void testRowsMergeWithoutOrdering() {
     List<Object[]> mergedRows = new ArrayList<>(2);
     mergedRows.add(_row1);
     mergedRows.add(_row2);
     SelectionResultsBlock mergedBlock = new SelectionResultsBlock(_dataSchema, 
mergedRows);
     List<Object[]> rowsToMerge = new ArrayList<>(2);
-    rowsToMerge.add(_compatibleRow1);
-    rowsToMerge.add(_compatibleRow2);
-    SelectionResultsBlock blockToMerge = new 
SelectionResultsBlock(_compatibleDataSchema, rowsToMerge);
+    rowsToMerge.add(_row3);
+    rowsToMerge.add(_row4);
+    SelectionResultsBlock blockToMerge = new 
SelectionResultsBlock(_dataSchema, rowsToMerge);
     SelectionOperatorUtils.mergeWithoutOrdering(mergedBlock, blockToMerge, 3);
     assertEquals(mergedRows.size(), 3);
     assertSame(mergedRows.get(0), _row1);
     assertSame(mergedRows.get(1), _row2);
-    assertSame(mergedRows.get(2), _compatibleRow1);
+    assertSame(mergedRows.get(2), _row3);
   }
 
   @Test
-  public void testCompatibleRowsMergeWithOrdering() {
+  public void testRowsMergeWithOrdering() {
     assertNotNull(_queryContext.getOrderByExpressions());
     Comparator<Object[]> comparator =
-        
SelectionOperatorUtils.getTypeCompatibleComparator(_queryContext.getOrderByExpressions(),
 _dataSchema,
-            _queryContext.isNullHandlingEnabled()).reversed();
+        
OrderByComparatorFactory.getComparator(_queryContext.getOrderByExpressions(), 
false);
     int maxNumRows = _queryContext.getOffset() + _queryContext.getLimit();
     SelectionResultsBlock mergedBlock = new SelectionResultsBlock(_dataSchema, 
Collections.emptyList(), comparator);
     List<Object[]> rowsToMerge1 = Arrays.asList(_row2, _row1);
     SelectionResultsBlock blockToMerge1 = new 
SelectionResultsBlock(_dataSchema, rowsToMerge1, comparator);
     SelectionOperatorUtils.mergeWithOrdering(mergedBlock, blockToMerge1, 
maxNumRows);
-    List<Object[]> rowsToMerge2 = Arrays.asList(_compatibleRow2, 
_compatibleRow1);
-    SelectionResultsBlock blockToMerge2 = new 
SelectionResultsBlock(_compatibleDataSchema, rowsToMerge2, comparator);
+    List<Object[]> rowsToMerge2 = Arrays.asList(_row4, _row3);
+    SelectionResultsBlock blockToMerge2 = new 
SelectionResultsBlock(_dataSchema, rowsToMerge2, comparator);
     SelectionOperatorUtils.mergeWithOrdering(mergedBlock, blockToMerge2, 
maxNumRows);
     List<Object[]> mergedRows = mergedBlock.getRows();
     assertEquals(mergedRows.size(), 3);
-    assertSame(mergedRows.get(0), _compatibleRow2);
+    assertSame(mergedRows.get(0), _row4);
     assertSame(mergedRows.get(1), _row2);
-    assertSame(mergedRows.get(2), _compatibleRow1);
+    assertSame(mergedRows.get(2), _row3);
   }
 
   @Test
-  public void testCompatibleRowsDataTableTransformation()
+  public void testExtractRowFromDataTable()
       throws Exception {
     Collection<Object[]> rows = new ArrayList<>(2);
     rows.add(_row1);
-    rows.add(_compatibleRow1);
-    DataSchema dataSchema = _dataSchema.clone();
-    assertTrue(dataSchema.isTypeCompatibleWith(_compatibleDataSchema));
-    dataSchema = DataSchema.upgradeToCover(dataSchema, _compatibleDataSchema);
-    assertEquals(dataSchema, _upgradedDataSchema);
-    DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(rows, 
dataSchema, false);
-    Object[] expectedRow1 = {
-        0L, 1.0, 2.0, 3.0, "4", new long[]{5L}, new double[]{6.0}, new 
double[]{7.0}, new double[]{8.0},
-        new String[]{"9"}, BytesUtils.toByteArray("1020")
-    };
-    Object[] expectedCompatibleRow1 = {
-        1L, 2.0, 3.0, 4.0, "5", new long[]{6L}, new double[]{7.0}, new 
double[]{8.0}, new double[]{9.0},
-        new String[]{"10"}, BytesUtils.toByteArray("5060")
-    };
-    
assertTrue(Arrays.deepEquals(SelectionOperatorUtils.extractRowFromDataTable(dataTable,
 0), expectedRow1));
-    
assertTrue(Arrays.deepEquals(SelectionOperatorUtils.extractRowFromDataTable(dataTable,
 1), expectedCompatibleRow1));
+    rows.add(_row2);
+    DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(rows, 
_dataSchema, false);
+    
assertTrue(Arrays.deepEquals(SelectionOperatorUtils.extractRowFromDataTable(dataTable,
 0), _row1));
+    
assertTrue(Arrays.deepEquals(SelectionOperatorUtils.extractRowFromDataTable(dataTable,
 1), _row2));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to