This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3f36623c56 Enhance broker reduce to handle different column names from server response (#10454)
3f36623c56 is described below
commit 3f36623c563b5ad495a502a00f627b44bfc0861d
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Mar 24 16:07:58 2023 -0700
Enhance broker reduce to handle different column names from server response (#10454)
---
.../org/apache/pinot/common/utils/DataSchema.java | 61 ---------
.../apache/pinot/common/utils/DataSchemaTest.java | 31 +----
.../query/reduce/SelectionDataTableReducer.java | 92 ++++++--------
.../query/selection/SelectionOperatorService.java | 7 +-
.../query/selection/SelectionOperatorUtils.java | 138 ++-------------------
.../core/query/utils/OrderByComparatorFactory.java | 101 +++++----------
.../selection/SelectionOperatorServiceTest.java | 95 ++++++--------
7 files changed, 120 insertions(+), 405 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index 4854f5bf22..a044f5dd60 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -101,67 +101,6 @@ public class DataSchema {
return storedColumnDataTypes;
}
- /**
- * Returns whether the given data schema is type compatible with this one.
- * <ul>
- * <li>All numbers are type compatible with each other</li>
- * <li>Numbers are not type compatible with string</li>
- * <li>Non-array types are not type compatible with array types</li>
- * </ul>
- *
- * @param anotherDataSchema Data schema to compare with
- * @return Whether the two data schemas are type compatible
- */
- public boolean isTypeCompatibleWith(DataSchema anotherDataSchema) {
- if (!Arrays.equals(_columnNames, anotherDataSchema._columnNames)) {
- return false;
- }
- int numColumns = _columnNames.length;
- for (int i = 0; i < numColumns; i++) {
- if
(!_columnDataTypes[i].isCompatible(anotherDataSchema._columnDataTypes[i])) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Upgrade the current data schema to cover the column data types in the
given data schema.
- * <p>Data type <code>LONG</code> can cover <code>INT</code> and
<code>LONG</code>.
- * <p>Data type <code>DOUBLE</code> can cover all numbers, but with
potential precision loss when use it to cover
- * <code>LONG</code>.
- * <p>NOTE: The given data schema should be type compatible with this one.
- *
- * @param originalSchema the original Data schema
- * @param anotherDataSchema Data schema to cover
- */
- public static DataSchema upgradeToCover(DataSchema originalSchema,
DataSchema anotherDataSchema) {
- int numColumns = originalSchema._columnDataTypes.length;
- ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
- for (int i = 0; i < numColumns; i++) {
- ColumnDataType thisColumnDataType = originalSchema._columnDataTypes[i];
- ColumnDataType thatColumnDataType =
anotherDataSchema._columnDataTypes[i];
- if (thisColumnDataType != thatColumnDataType) {
- if (thisColumnDataType.isArray()) {
- if (thisColumnDataType.isWholeNumberArray() &&
thatColumnDataType.isWholeNumberArray()) {
- columnDataTypes[i] = ColumnDataType.LONG_ARRAY;
- } else {
- columnDataTypes[i] = ColumnDataType.DOUBLE_ARRAY;
- }
- } else {
- if (thisColumnDataType.isWholeNumber() &&
thatColumnDataType.isWholeNumber()) {
- columnDataTypes[i] = ColumnDataType.LONG;
- } else {
- columnDataTypes[i] = ColumnDataType.DOUBLE;
- }
- }
- } else {
- columnDataTypes[i] = originalSchema._columnDataTypes[i];
- }
- }
- return new DataSchema(originalSchema._columnNames, columnDataTypes);
- }
-
public byte[] toBytes()
throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
index 71b062452b..e90842d925 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
@@ -34,14 +34,8 @@ public class DataSchemaTest {
"string_array", "boolean_array", "timestamp_array", "bytes_array"
};
private static final int NUM_COLUMNS = COLUMN_NAMES.length;
- private static final DataSchema.ColumnDataType[] COLUMN_DATA_TYPES =
- {INT, LONG, FLOAT, DOUBLE, STRING, OBJECT, INT_ARRAY, LONG_ARRAY,
FLOAT_ARRAY, DOUBLE_ARRAY, STRING_ARRAY,
- BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY};
- private static final DataSchema.ColumnDataType[]
COMPATIBLE_COLUMN_DATA_TYPES =
- {LONG, FLOAT, DOUBLE, INT, STRING, OBJECT, LONG_ARRAY, FLOAT_ARRAY,
DOUBLE_ARRAY, INT_ARRAY, STRING_ARRAY,
- BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY};
- private static final DataSchema.ColumnDataType[] UPGRADED_COLUMN_DATA_TYPES
= {
- LONG, DOUBLE, DOUBLE, DOUBLE, STRING, OBJECT, LONG_ARRAY, DOUBLE_ARRAY,
DOUBLE_ARRAY, DOUBLE_ARRAY, STRING_ARRAY,
+ private static final DataSchema.ColumnDataType[] COLUMN_DATA_TYPES = {
+ INT, LONG, FLOAT, DOUBLE, STRING, OBJECT, INT_ARRAY, LONG_ARRAY,
FLOAT_ARRAY, DOUBLE_ARRAY, STRING_ARRAY,
BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY
};
@@ -72,22 +66,6 @@ public class DataSchemaTest {
Assert.assertEquals(dataSchema.hashCode(),
dataSchemaAfterSerDe.hashCode());
}
- @Test
- public void testTypeCompatible() {
- DataSchema dataSchema = new DataSchema(COLUMN_NAMES, COLUMN_DATA_TYPES);
- DataSchema compatibleDataSchema = new DataSchema(COLUMN_NAMES,
COMPATIBLE_COLUMN_DATA_TYPES);
- Assert.assertTrue(dataSchema.isTypeCompatibleWith(compatibleDataSchema));
-
- String[] anotherColumnNames = new String[NUM_COLUMNS];
- Arrays.fill(anotherColumnNames, "foo");
- DataSchema incompatibleDataSchema = new DataSchema(anotherColumnNames,
COLUMN_DATA_TYPES);
-
Assert.assertFalse(dataSchema.isTypeCompatibleWith(incompatibleDataSchema));
-
- dataSchema = DataSchema.upgradeToCover(dataSchema, compatibleDataSchema);
- DataSchema upgradedDataSchema = new DataSchema(COLUMN_NAMES,
UPGRADED_COLUMN_DATA_TYPES);
- Assert.assertEquals(dataSchema, upgradedDataSchema);
- }
-
@Test
public void testSuperTypeCheckers() {
List<DataSchema.ColumnDataType> numberTypeToTest = Arrays.asList(INT,
LONG, FLOAT, DOUBLE);
@@ -207,8 +185,9 @@ public class DataSchemaTest {
Assert.assertFalse(columnDataType.isCompatible(BYTES_ARRAY));
}
- for (DataSchema.ColumnDataType columnDataType : new
DataSchema.ColumnDataType[]{STRING_ARRAY, BOOLEAN_ARRAY,
- TIMESTAMP_ARRAY, BYTES_ARRAY}) {
+ for (DataSchema.ColumnDataType columnDataType : new
DataSchema.ColumnDataType[]{
+ STRING_ARRAY, BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY
+ }) {
Assert.assertFalse(columnDataType.isNumber());
Assert.assertFalse(columnDataType.isWholeNumber());
Assert.assertTrue(columnDataType.isArray());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
index 4d30f54935..a21684acdf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.reduce;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -64,63 +65,50 @@ public class SelectionDataTableReducer implements
DataTableReducer {
List<String> selectionColumns =
SelectionOperatorUtils.getSelectionColumns(_queryContext, dataSchema);
DataSchema selectionDataSchema =
SelectionOperatorUtils.getResultTableDataSchema(dataSchema, selectionColumns);
brokerResponseNative.setResultTable(new ResultTable(selectionDataSchema,
Collections.emptyList()));
- } else {
- // For data table map with more than one data tables, remove conflicting
data tables
- if (dataTableMap.size() > 1) {
- List<ServerRoutingInstance> droppedServers =
removeConflictingResponses(dataSchema, dataTableMap);
- if (!droppedServers.isEmpty()) {
- String errorMessage =
QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses for table: " +
tableName
- + " from servers: " + droppedServers + " got dropped due to data
schema inconsistency.";
- LOGGER.warn(errorMessage);
- if (brokerMetrics != null) {
-
brokerMetrics.addMeteredTableValue(TableNameBuilder.extractRawTableName(tableName),
- BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
- }
- brokerResponseNative.addToExceptions(
- new
QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE,
errorMessage));
+ return;
+ }
+
+ // For data table map with more than one data tables, remove conflicting
data tables
+ if (dataTableMap.size() > 1) {
+ DataSchema.ColumnDataType[] columnDataTypes =
dataSchema.getColumnDataTypes();
+ List<ServerRoutingInstance> droppedServers = new ArrayList<>();
+ Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator =
dataTableMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
+ DataSchema dataSchemaToCompare = entry.getValue().getDataSchema();
+ assert dataSchemaToCompare != null;
+ if (!Arrays.equals(columnDataTypes,
dataSchemaToCompare.getColumnDataTypes())) {
+ droppedServers.add(entry.getKey());
+ iterator.remove();
}
}
-
- int limit = _queryContext.getLimit();
- if (limit > 0 && _queryContext.getOrderByExpressions() != null) {
- // Selection order-by
- SelectionOperatorService selectionService = new
SelectionOperatorService(_queryContext, dataSchema);
- selectionService.reduceWithOrdering(dataTableMap.values(),
_queryContext.isNullHandlingEnabled());
-
brokerResponseNative.setResultTable(selectionService.renderResultTableWithOrdering());
- } else {
- // Selection only
- List<String> selectionColumns =
SelectionOperatorUtils.getSelectionColumns(_queryContext, dataSchema);
- List<Object[]> reducedRows =
SelectionOperatorUtils.reduceWithoutOrdering(dataTableMap.values(), limit,
- _queryContext.isNullHandlingEnabled());
- brokerResponseNative.setResultTable(
-
SelectionOperatorUtils.renderResultTableWithoutOrdering(reducedRows,
dataSchema, selectionColumns));
+ if (!droppedServers.isEmpty()) {
+ String errorMessage =
+ QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses
for table: " + tableName + " from servers: "
+ + droppedServers + " got dropped due to data schema
inconsistency.";
+ LOGGER.warn(errorMessage);
+ if (brokerMetrics != null) {
+
brokerMetrics.addMeteredTableValue(TableNameBuilder.extractRawTableName(tableName),
+ BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
+ }
+ brokerResponseNative.addToExceptions(
+ new
QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE,
errorMessage));
}
}
- }
- /**
- * Given a data schema, remove data tables that are not compatible with this
data schema.
- * <p>Upgrade the data schema passed in to cover all remaining data schemas.
- *
- * @param dataSchema data schema.
- * @param dataTableMap map from server to data table.
- * @return list of server names where the data table got removed.
- */
- private List<ServerRoutingInstance> removeConflictingResponses(DataSchema
dataSchema,
- Map<ServerRoutingInstance, DataTable> dataTableMap) {
- List<ServerRoutingInstance> droppedServers = new ArrayList<>();
- Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator =
dataTableMap.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
- DataSchema dataSchemaToCompare = entry.getValue().getDataSchema();
- assert dataSchemaToCompare != null;
- if (!dataSchema.isTypeCompatibleWith(dataSchemaToCompare)) {
- droppedServers.add(entry.getKey());
- iterator.remove();
- } else {
- dataSchema = DataSchema.upgradeToCover(dataSchema,
dataSchemaToCompare);
- }
+ int limit = _queryContext.getLimit();
+ if (limit > 0 && _queryContext.getOrderByExpressions() != null) {
+ // Selection order-by
+ SelectionOperatorService selectionService = new
SelectionOperatorService(_queryContext, dataSchema);
+ selectionService.reduceWithOrdering(dataTableMap.values(),
_queryContext.isNullHandlingEnabled());
+
brokerResponseNative.setResultTable(selectionService.renderResultTableWithOrdering());
+ } else {
+ // Selection only
+ List<String> selectionColumns =
SelectionOperatorUtils.getSelectionColumns(_queryContext, dataSchema);
+ List<Object[]> reducedRows =
SelectionOperatorUtils.reduceWithoutOrdering(dataTableMap.values(), limit,
+ _queryContext.isNullHandlingEnabled());
+ brokerResponseNative.setResultTable(
+ SelectionOperatorUtils.renderResultTableWithoutOrdering(reducedRows,
dataSchema, selectionColumns));
}
- return droppedServers;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index 419a4df883..688c7bbf49 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
import org.apache.pinot.spi.trace.Tracing;
import org.roaringbitmap.RoaringBitmap;
@@ -77,11 +78,9 @@ public class SelectionOperatorService {
_offset = queryContext.getOffset();
_numRowsToKeep = _offset + queryContext.getLimit();
assert queryContext.getOrderByExpressions() != null;
- // TODO: Do not use type compatible comparator for performance since we
don't support different data schema on
- // server side combine
_rows = new PriorityQueue<>(Math.min(_numRowsToKeep,
SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
-
SelectionOperatorUtils.getTypeCompatibleComparator(queryContext.getOrderByExpressions(),
_dataSchema,
- _queryContext.isNullHandlingEnabled()));
+
OrderByComparatorFactory.getComparator(queryContext.getOrderByExpressions(),
+ _queryContext.isNullHandlingEnabled()).reversed());
}
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index dcb3cec6e6..5dc0bb5fbc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -43,7 +43,6 @@ import
org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.trace.Tracing;
-import org.apache.pinot.spi.utils.ArrayCopyUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.roaringbitmap.RoaringBitmap;
@@ -260,19 +259,6 @@ public class SelectionOperatorUtils {
/**
* Build a {@link DataTable} from a {@link Collection} of selection rows
with {@link DataSchema}. (Server side)
- * <p>The passed in data schema stored the column data type that can cover
all actual data types for that column.
- * <p>The actual data types for each column in rows can be different but
must be compatible with each other.
- * <p>Before write each row into the data table, first convert it to match
the data types in data schema.
- *
- * TODO: Type compatibility is not supported for selection order-by because
all segments on the same server shared the
- * same comparator. Another solution is to always use the table schema
to execute the query (preferable because
- * type compatible checks are expensive).
- *
- * @param rows {@link Collection} of selection rows.
- * @param dataSchema data schema.
- * @param nullHandlingEnabled whether null handling is enabled.
- * @return data table.
- * @throws IOException
*/
public static DataTable getDataTableFromRows(Collection<Object[]> rows,
DataSchema dataSchema,
boolean nullHandlingEnabled)
@@ -309,22 +295,22 @@ public class SelectionOperatorUtils {
switch (storedColumnDataTypes[i]) {
// Single-value column
case INT:
- dataTableBuilder.setColumn(i, ((Number) columnValue).intValue());
+ dataTableBuilder.setColumn(i, (int) columnValue);
break;
case LONG:
- dataTableBuilder.setColumn(i, ((Number) columnValue).longValue());
+ dataTableBuilder.setColumn(i, (long) columnValue);
break;
case FLOAT:
- dataTableBuilder.setColumn(i, ((Number) columnValue).floatValue());
+ dataTableBuilder.setColumn(i, (float) columnValue);
break;
case DOUBLE:
- dataTableBuilder.setColumn(i, ((Number)
columnValue).doubleValue());
+ dataTableBuilder.setColumn(i, (double) columnValue);
break;
case BIG_DECIMAL:
dataTableBuilder.setColumn(i, (BigDecimal) columnValue);
break;
case STRING:
- dataTableBuilder.setColumn(i, ((String) columnValue));
+ dataTableBuilder.setColumn(i, (String) columnValue);
break;
case BYTES:
dataTableBuilder.setColumn(i, (ByteArray) columnValue);
@@ -335,43 +321,13 @@ public class SelectionOperatorUtils {
dataTableBuilder.setColumn(i, (int[]) columnValue);
break;
case LONG_ARRAY:
- // LONG_ARRAY type covers INT_ARRAY and LONG_ARRAY
- if (columnValue instanceof int[]) {
- int[] ints = (int[]) columnValue;
- int length = ints.length;
- long[] longs = new long[length];
- ArrayCopyUtils.copy(ints, longs, length);
- dataTableBuilder.setColumn(i, longs);
- } else {
- dataTableBuilder.setColumn(i, (long[]) columnValue);
- }
+ dataTableBuilder.setColumn(i, (long[]) columnValue);
break;
case FLOAT_ARRAY:
dataTableBuilder.setColumn(i, (float[]) columnValue);
break;
case DOUBLE_ARRAY:
- // DOUBLE_ARRAY type covers INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY and
DOUBLE_ARRAY
- if (columnValue instanceof int[]) {
- int[] ints = (int[]) columnValue;
- int length = ints.length;
- double[] doubles = new double[length];
- ArrayCopyUtils.copy(ints, doubles, length);
- dataTableBuilder.setColumn(i, doubles);
- } else if (columnValue instanceof long[]) {
- long[] longs = (long[]) columnValue;
- int length = longs.length;
- double[] doubles = new double[length];
- ArrayCopyUtils.copy(longs, doubles, length);
- dataTableBuilder.setColumn(i, doubles);
- } else if (columnValue instanceof float[]) {
- float[] floats = (float[]) columnValue;
- int length = floats.length;
- double[] doubles = new double[length];
- ArrayCopyUtils.copy(floats, doubles, length);
- dataTableBuilder.setColumn(i, doubles);
- } else {
- dataTableBuilder.setColumn(i, (double[]) columnValue);
- }
+ dataTableBuilder.setColumn(i, (double[]) columnValue);
break;
case STRING_ARRAY:
dataTableBuilder.setColumn(i, (String[]) columnValue);
@@ -593,86 +549,6 @@ public class SelectionOperatorUtils {
return columnToIndexMap;
}
- /**
- * Helper method to get the type-compatible {@link Comparator} for selection
rows. (Inter segment)
- * <p>Type-compatible comparator allows compatible types to compare with
each other.
- *
- * @return flexible {@link Comparator} for selection rows.
- */
- public static Comparator<Object[]>
getTypeCompatibleComparator(List<OrderByExpressionContext> orderByExpressions,
- DataSchema dataSchema, boolean isNullHandlingEnabled) {
- ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-
- // Compare all single-value columns
- int numOrderByExpressions = orderByExpressions.size();
- List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
- for (int i = 0; i < numOrderByExpressions; i++) {
- if (!columnDataTypes[i].isArray()) {
- valueIndexList.add(i);
- }
- }
-
- int numValuesToCompare = valueIndexList.size();
- int[] valueIndices = new int[numValuesToCompare];
- boolean[] useDoubleComparison = new boolean[numValuesToCompare];
- // Use multiplier -1 or 1 to control ascending/descending order
- int[] multipliers = new int[numValuesToCompare];
- for (int i = 0; i < numValuesToCompare; i++) {
- int valueIndex = valueIndexList.get(i);
- valueIndices[i] = valueIndex;
- if (columnDataTypes[valueIndex].isNumber()) {
- useDoubleComparison[i] = true;
- }
- multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? -1 : 1;
- }
-
- if (isNullHandlingEnabled) {
- return (o1, o2) -> {
- for (int i = 0; i < numValuesToCompare; i++) {
- int index = valueIndices[i];
- Object v1 = o1[index];
- Object v2 = o2[index];
- if (v1 == null) {
- // The default null ordering is: 'NULLS LAST'.
- return v2 == null ? 0 : -multipliers[i];
- } else if (v2 == null) {
- return multipliers[i];
- }
- int result;
- if (useDoubleComparison[i]) {
- result = Double.compare(((Number) v1).doubleValue(), ((Number)
v2).doubleValue());
- } else {
- //noinspection unchecked
- result = ((Comparable) v1).compareTo(v2);
- }
- if (result != 0) {
- return result * multipliers[i];
- }
- }
- return 0;
- };
- } else {
- return (o1, o2) -> {
- for (int i = 0; i < numValuesToCompare; i++) {
- int index = valueIndices[i];
- Object v1 = o1[index];
- Object v2 = o2[index];
- int result;
- if (useDoubleComparison[i]) {
- result = Double.compare(((Number) v1).doubleValue(), ((Number)
v2).doubleValue());
- } else {
- //noinspection unchecked
- result = ((Comparable) v1).compareTo(v2);
- }
- if (result != 0) {
- return result * multipliers[i];
- }
- }
- return 0;
- };
- }
- }
-
/**
* Helper method to add a value to a {@link PriorityQueue}.
*
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
index cbf978e882..f7225a2197 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
@@ -18,16 +18,11 @@
*/
package org.apache.pinot.core.query.utils;
-import com.google.common.base.Preconditions;
-import java.math.BigDecimal;
-import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.pinot.common.request.context.OrderByExpressionContext;
import org.apache.pinot.core.operator.ColumnContext;
-import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.exception.BadQueryRequestException;
-import org.apache.pinot.spi.utils.ByteArray;
/**
@@ -44,103 +39,67 @@ public class OrderByComparatorFactory {
public static Comparator<Object[]>
getComparator(List<OrderByExpressionContext> orderByExpressions,
ColumnContext[] orderByColumnContexts, boolean nullHandlingEnabled, int
from, int to) {
- Preconditions.checkArgument(to <= orderByExpressions.size(),
- "Trying to access %sth position of orderByExpressions with size %s",
to, orderByExpressions.size());
- Preconditions.checkArgument(to <= orderByColumnContexts.length,
- "Trying to access %sth position of orderByExpressionMetadata with size
%s", to, orderByColumnContexts.length);
- Preconditions.checkArgument(from < to, "FROM (%s) must be lower than TO
(%s)", from, to);
+ assert 0 <= from && from < to && to <= orderByExpressions.size();
- // Compare all single-value columns
- int numOrderByExpressions = to - from;
- List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
+ // Check if all expressions are single-valued
for (int i = from; i < to; i++) {
- if (orderByColumnContexts[i].isSingleValue()) {
- valueIndexList.add(i);
- } else {
- // MV columns should not be part of the selection order by only list
+ if (!orderByColumnContexts[i].isSingleValue()) {
+ // MV columns should not be part of the selection order-by list
throw new BadQueryRequestException(
String.format("MV expression: %s should not be included in the
ORDER-BY clause",
orderByExpressions.get(i)));
}
}
- int numValuesToCompare = valueIndexList.size();
- int[] valueIndices = new int[numValuesToCompare];
- FieldSpec.DataType[] storedTypes = new
FieldSpec.DataType[numValuesToCompare];
+ return getComparator(orderByExpressions, nullHandlingEnabled, from, to);
+ }
+
+ public static Comparator<Object[]>
getComparator(List<OrderByExpressionContext> orderByExpressions,
+ boolean nullHandlingEnabled) {
+ return getComparator(orderByExpressions, nullHandlingEnabled, 0,
orderByExpressions.size());
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public static Comparator<Object[]>
getComparator(List<OrderByExpressionContext> orderByExpressions,
+ boolean nullHandlingEnabled, int from, int to) {
+ assert 0 <= from && from < to && to <= orderByExpressions.size();
+
// Use multiplier -1 or 1 to control ascending/descending order
- int[] multipliers = new int[numValuesToCompare];
- for (int i = 0; i < numValuesToCompare; i++) {
- int valueIndex = valueIndexList.get(i);
- valueIndices[i] = valueIndex;
- storedTypes[i] =
orderByColumnContexts[valueIndex].getDataType().getStoredType();
- multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? 1 : -1;
+ int[] multipliers = new int[to];
+ for (int i = from; i < to; i++) {
+ multipliers[i] = orderByExpressions.get(i).isAsc() ? 1 : -1;
}
if (nullHandlingEnabled) {
return (Object[] o1, Object[] o2) -> {
- for (int i = 0; i < numValuesToCompare; i++) {
- int index = valueIndices[i];
- // TODO: Evaluate the performance of casting to Comparable and avoid
the switch
- Object v1 = o1[index];
- Object v2 = o2[index];
+ for (int i = from; i < to; i++) {
+ Comparable v1 = (Comparable) o1[i];
+ Comparable v2 = (Comparable) o2[i];
if (v1 == null) {
// The default null ordering is: 'NULLS LAST', regardless of the
ordering direction.
return v2 == null ? 0 : -multipliers[i];
} else if (v2 == null) {
return multipliers[i];
}
- int result = compareCols(v1, v2, storedTypes[i], multipliers[i]);
+ int result = v1.compareTo(v2);
if (result != 0) {
- return result;
+ return result * multipliers[i];
}
}
return 0;
};
} else {
return (Object[] o1, Object[] o2) -> {
- for (int i = 0; i < numValuesToCompare; i++) {
- int index = valueIndices[i];
- // TODO: Evaluate the performance of casting to Comparable and avoid
the switch
- int result = compareCols(o1[index], o2[index], storedTypes[i],
multipliers[i]);
+ for (int i = from; i < to; i++) {
+ Comparable v1 = (Comparable) o1[i];
+ Comparable v2 = (Comparable) o2[i];
+ int result = v1.compareTo(v2);
if (result != 0) {
- return result;
+ return result * multipliers[i];
}
}
return 0;
};
}
}
-
- private static int compareCols(Object v1, Object v2, FieldSpec.DataType
type, int multiplier) {
-
- // TODO: Evaluate the performance of casting to Comparable and avoid the
switch
- int result;
- switch (type) {
- case INT:
- result = ((Integer) v1).compareTo((Integer) v2);
- break;
- case LONG:
- result = ((Long) v1).compareTo((Long) v2);
- break;
- case FLOAT:
- result = ((Float) v1).compareTo((Float) v2);
- break;
- case DOUBLE:
- result = ((Double) v1).compareTo((Double) v2);
- break;
- case BIG_DECIMAL:
- result = ((BigDecimal) v1).compareTo((BigDecimal) v2);
- break;
- case STRING:
- result = ((String) v1).compareTo((String) v2);
- break;
- case BYTES:
- result = ((ByteArray) v1).compareTo((ByteArray) v2);
- break;
- // NOTE: Multi-value columns are not comparable, so we should not reach
here
- default:
- throw new IllegalStateException();
- }
- return result * multiplier;
- }
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
index cdae8a37e6..7d44c6b65b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.query.selection;
+import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -28,9 +29,11 @@ import java.util.List;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.utils.BytesUtils;
import org.testng.annotations.BeforeClass;
@@ -50,45 +53,30 @@ import static org.testng.Assert.assertTrue;
*/
public class SelectionOperatorServiceTest {
private final String[] _columnNames = {
- "int", "long", "float", "double", "string", "int_array", "long_array",
"float_array", "double_array",
- "string_array", "bytes"
+ "int", "long", "float", "double", "big_decimal", "string", "bytes",
"int_array", "long_array", "float_array",
+ "double_array", "string_array"
};
- private final DataSchema.ColumnDataType[] _columnDataTypes = {
- DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG,
DataSchema.ColumnDataType.FLOAT,
- DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.INT_ARRAY,
- DataSchema.ColumnDataType.LONG_ARRAY,
DataSchema.ColumnDataType.FLOAT_ARRAY,
- DataSchema.ColumnDataType.DOUBLE_ARRAY,
DataSchema.ColumnDataType.STRING_ARRAY, DataSchema.ColumnDataType.BYTES
+ private final ColumnDataType[] _columnDataTypes = {
+ ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.FLOAT,
ColumnDataType.DOUBLE,
+ ColumnDataType.BIG_DECIMAL, ColumnDataType.STRING, ColumnDataType.BYTES,
ColumnDataType.INT_ARRAY,
+ ColumnDataType.LONG_ARRAY, ColumnDataType.FLOAT_ARRAY,
ColumnDataType.DOUBLE_ARRAY, ColumnDataType.STRING_ARRAY
};
private final DataSchema _dataSchema = new DataSchema(_columnNames,
_columnDataTypes);
- private final DataSchema.ColumnDataType[] _compatibleColumnDataTypes = {
- DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.FLOAT,
DataSchema.ColumnDataType.DOUBLE,
- DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.LONG_ARRAY,
- DataSchema.ColumnDataType.FLOAT_ARRAY,
DataSchema.ColumnDataType.DOUBLE_ARRAY,
- DataSchema.ColumnDataType.INT_ARRAY,
DataSchema.ColumnDataType.STRING_ARRAY, DataSchema.ColumnDataType.BYTES
- };
- private final DataSchema _compatibleDataSchema = new
DataSchema(_columnNames, _compatibleColumnDataTypes);
- private final DataSchema.ColumnDataType[] _upgradedColumnDataTypes = new
DataSchema.ColumnDataType[]{
- DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
DataSchema.ColumnDataType.DOUBLE,
- DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.LONG_ARRAY,
- DataSchema.ColumnDataType.DOUBLE_ARRAY,
DataSchema.ColumnDataType.DOUBLE_ARRAY,
- DataSchema.ColumnDataType.DOUBLE_ARRAY,
DataSchema.ColumnDataType.STRING_ARRAY, DataSchema.ColumnDataType.BYTES
- };
- private final DataSchema _upgradedDataSchema = new DataSchema(_columnNames,
_upgradedColumnDataTypes);
private final Object[] _row1 = {
- 0, 1L, 2.0F, 3.0, "4", new int[]{5}, new long[]{6L}, new float[]{7.0F},
new double[]{8.0}, new String[]{"9"},
- BytesUtils.toByteArray("1020")
+ 0, 1L, 2.0F, 3.0, new BigDecimal(4), "5", BytesUtils.toByteArray(
+ "0606"), new int[]{7}, new long[]{8L}, new float[]{9.0F}, new
double[]{10.0}, new String[]{"11"}
};
private final Object[] _row2 = {
- 10, 11L, 12.0F, 13.0, "14", new int[]{15}, new long[]{16L}, new float[]{17.0F}, new double[]{18.0},
- new String[]{"19"}, BytesUtils.toByteArray("3040")
+ 10, 11L, 12.0F, 13.0, new BigDecimal(14), "15", BytesUtils.toByteArray(
+ "1616"), new int[]{17}, new long[]{18L}, new float[]{19.0F}, new double[]{20.0}, new String[]{"21"}
};
- private final Object[] _compatibleRow1 = {
- 1L, 2.0F, 3.0, 4, "5", new long[]{6L}, new float[]{7.0F}, new double[]{8.0}, new int[]{9}, new String[]{"10"},
- BytesUtils.toByteArray("5060")
+ private final Object[] _row3 = {
+ 1, 2L, 3.0F, 4.0, new BigDecimal(5), "6", BytesUtils.toByteArray(
+ "0707"), new int[]{8}, new long[]{9L}, new float[]{10.0F}, new double[]{11.0}, new String[]{"12"}
};
- private final Object[] _compatibleRow2 = {
- 11L, 12.0F, 13.0, 14, "15", new long[]{16L}, new float[]{17.0F}, new double[]{18.0}, new int[]{19},
- new String[]{"20"}, BytesUtils.toByteArray("7000")
+ private final Object[] _row4 = {
+ 11, 12L, 13.0F, 14.0, new BigDecimal(15), "16", BytesUtils.toByteArray(
+ "1717"), new int[]{18}, new long[]{19L}, new float[]{20.0F}, new double[]{21.0}, new String[]{"22"}
};
private QueryContext _queryContext;
@@ -169,63 +157,50 @@ public class SelectionOperatorServiceTest {
}
@Test
- public void testCompatibleRowsMergeWithoutOrdering() {
+ public void testRowsMergeWithoutOrdering() {
List<Object[]> mergedRows = new ArrayList<>(2);
mergedRows.add(_row1);
mergedRows.add(_row2);
SelectionResultsBlock mergedBlock = new SelectionResultsBlock(_dataSchema, mergedRows);
List<Object[]> rowsToMerge = new ArrayList<>(2);
- rowsToMerge.add(_compatibleRow1);
- rowsToMerge.add(_compatibleRow2);
- SelectionResultsBlock blockToMerge = new SelectionResultsBlock(_compatibleDataSchema, rowsToMerge);
+ rowsToMerge.add(_row3);
+ rowsToMerge.add(_row4);
+ SelectionResultsBlock blockToMerge = new SelectionResultsBlock(_dataSchema, rowsToMerge);
SelectionOperatorUtils.mergeWithoutOrdering(mergedBlock, blockToMerge, 3);
assertEquals(mergedRows.size(), 3);
assertSame(mergedRows.get(0), _row1);
assertSame(mergedRows.get(1), _row2);
- assertSame(mergedRows.get(2), _compatibleRow1);
+ assertSame(mergedRows.get(2), _row3);
}
@Test
- public void testCompatibleRowsMergeWithOrdering() {
+ public void testRowsMergeWithOrdering() {
assertNotNull(_queryContext.getOrderByExpressions());
Comparator<Object[]> comparator =
- SelectionOperatorUtils.getTypeCompatibleComparator(_queryContext.getOrderByExpressions(), _dataSchema,
- _queryContext.isNullHandlingEnabled()).reversed();
+ OrderByComparatorFactory.getComparator(_queryContext.getOrderByExpressions(), false);
int maxNumRows = _queryContext.getOffset() + _queryContext.getLimit();
SelectionResultsBlock mergedBlock = new SelectionResultsBlock(_dataSchema, Collections.emptyList(), comparator);
List<Object[]> rowsToMerge1 = Arrays.asList(_row2, _row1);
SelectionResultsBlock blockToMerge1 = new SelectionResultsBlock(_dataSchema, rowsToMerge1, comparator);
SelectionOperatorUtils.mergeWithOrdering(mergedBlock, blockToMerge1, maxNumRows);
- List<Object[]> rowsToMerge2 = Arrays.asList(_compatibleRow2, _compatibleRow1);
- SelectionResultsBlock blockToMerge2 = new SelectionResultsBlock(_compatibleDataSchema, rowsToMerge2, comparator);
+ List<Object[]> rowsToMerge2 = Arrays.asList(_row4, _row3);
+ SelectionResultsBlock blockToMerge2 = new SelectionResultsBlock(_dataSchema, rowsToMerge2, comparator);
SelectionOperatorUtils.mergeWithOrdering(mergedBlock, blockToMerge2, maxNumRows);
List<Object[]> mergedRows = mergedBlock.getRows();
assertEquals(mergedRows.size(), 3);
- assertSame(mergedRows.get(0), _compatibleRow2);
+ assertSame(mergedRows.get(0), _row4);
assertSame(mergedRows.get(1), _row2);
- assertSame(mergedRows.get(2), _compatibleRow1);
+ assertSame(mergedRows.get(2), _row3);
}
@Test
- public void testCompatibleRowsDataTableTransformation()
+ public void testExtractRowFromDataTable()
throws Exception {
Collection<Object[]> rows = new ArrayList<>(2);
rows.add(_row1);
- rows.add(_compatibleRow1);
- DataSchema dataSchema = _dataSchema.clone();
- assertTrue(dataSchema.isTypeCompatibleWith(_compatibleDataSchema));
- dataSchema = DataSchema.upgradeToCover(dataSchema, _compatibleDataSchema);
- assertEquals(dataSchema, _upgradedDataSchema);
- DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema, false);
- Object[] expectedRow1 = {
- 0L, 1.0, 2.0, 3.0, "4", new long[]{5L}, new double[]{6.0}, new double[]{7.0}, new double[]{8.0},
- new String[]{"9"}, BytesUtils.toByteArray("1020")
- };
- Object[] expectedCompatibleRow1 = {
- 1L, 2.0, 3.0, 4.0, "5", new long[]{6L}, new double[]{7.0}, new double[]{8.0}, new double[]{9.0},
- new String[]{"10"}, BytesUtils.toByteArray("5060")
- };
- assertTrue(Arrays.deepEquals(SelectionOperatorUtils.extractRowFromDataTable(dataTable, 0), expectedRow1));
- assertTrue(Arrays.deepEquals(SelectionOperatorUtils.extractRowFromDataTable(dataTable, 1), expectedCompatibleRow1));
+ rows.add(_row2);
+ DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(rows, _dataSchema, false);
+ assertTrue(Arrays.deepEquals(SelectionOperatorUtils.extractRowFromDataTable(dataTable, 0), _row1));
+ assertTrue(Arrays.deepEquals(SelectionOperatorUtils.extractRowFromDataTable(dataTable, 1), _row2));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org