This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch selection_comparator_hotfix in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit e9c5fad0640abb28e874a001a8ec233d46bca02e Author: Jackie (Xiaotian) Jiang <[email protected]> AuthorDate: Mon Nov 4 20:55:00 2019 -0800 Hotfix 2 for Selection Comparator - Change back to row-based switch so that comparison can be inlined - Reduce the expression compilation --- .../operator/query/EmptySelectionOperator.java | 2 +- .../core/operator/query/SelectionOnlyOperator.java | 2 +- .../operator/query/SelectionOrderByOperator.java | 79 ++++++++------- .../apache/pinot/core/plan/SelectionPlanNode.java | 2 +- .../apache/pinot/core/plan/TransformPlanNode.java | 28 +++--- .../query/selection/SelectionOperatorService.java | 40 +++++--- .../query/selection/SelectionOperatorUtils.java | 108 +++++++++------------ .../selection/SelectionOperatorServiceTest.java | 4 +- 8 files changed, 135 insertions(+), 130 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/EmptySelectionOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/EmptySelectionOperator.java index 044d4c5..ae846d2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/EmptySelectionOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/EmptySelectionOperator.java @@ -45,7 +45,7 @@ public class EmptySelectionOperator extends BaseOperator<IntermediateResultsBloc public EmptySelectionOperator(IndexSegment indexSegment, Selection selection, TransformOperator transformOperator) { List<TransformExpressionTree> expressions = - SelectionOperatorUtils.extractExpressions(selection.getSelectionColumns(), indexSegment, null); + SelectionOperatorUtils.extractExpressions(selection.getSelectionColumns(), indexSegment); int numExpressions = expressions.size(); String[] columnNames = new String[numExpressions]; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java index 6c5cb3c..f4661eb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java @@ -55,7 +55,7 @@ public class SelectionOnlyOperator extends BaseOperator<IntermediateResultsBlock public SelectionOnlyOperator(IndexSegment indexSegment, Selection selection, TransformOperator transformOperator) { _indexSegment = indexSegment; _transformOperator = transformOperator; - _expressions = SelectionOperatorUtils.extractExpressions(selection.getSelectionColumns(), indexSegment, null); + _expressions = SelectionOperatorUtils.extractExpressions(selection.getSelectionColumns(), indexSegment); int numExpressions = _expressions.size(); _expressionMetadata = new TransformResultMetadata[numExpressions]; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java index f6cf5a6..411e5a2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.PriorityQueue; +import org.apache.pinot.common.data.FieldSpec.DataType; import org.apache.pinot.common.request.Selection; import org.apache.pinot.common.request.SelectionSort; import org.apache.pinot.common.request.transform.TransformExpressionTree; @@ -46,7 +47,6 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl private final IndexSegment _indexSegment; private final TransformOperator _transformOperator; - private final List<SelectionSort> _sortSequence; private final List<TransformExpressionTree> _expressions; private final TransformResultMetadata[] _expressionMetadata; private final Dictionary[] _dictionaries; @@ -60,9 +60,8 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl public SelectionOrderByOperator(IndexSegment indexSegment, Selection selection, TransformOperator transformOperator) { _indexSegment = indexSegment; _transformOperator = transformOperator; - _sortSequence = selection.getSelectionSortSequence(); - _expressions = - SelectionOperatorUtils.extractExpressions(selection.getSelectionColumns(), indexSegment, _sortSequence); + _expressions = SelectionOperatorUtils + .extractExpressions(selection.getSelectionColumns(), indexSegment, selection.getSelectionSortSequence()); int numExpressions = _expressions.size(); _expressionMetadata = new TransformResultMetadata[numExpressions]; @@ -85,12 +84,12 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl _numRowsToKeep = selection.getOffset() + selection.getSize(); _rows = new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY), - getComparator()); + getComparator(selection.getSelectionSortSequence())); } - private Comparator<Serializable[]> getComparator() { + private Comparator<Serializable[]> getComparator(List<SelectionSort> sortSequence) { // Compare all single-value columns - int numOrderByExpressions = _sortSequence.size(); + int numOrderByExpressions = sortSequence.size(); List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions); for (int i = 0; i < numOrderByExpressions; i++) { if (_expressionMetadata[i].isSingleValue()) { @@ -100,38 +99,50 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl int numValuesToCompare = valueIndexList.size(); int[] valueIndices = new int[numValuesToCompare]; - Comparator[] valueComparators = new Comparator[numValuesToCompare]; + DataType[] dataTypes = new DataType[numValuesToCompare]; + // Use multiplier -1 or 1 to control ascending/descending order + int[] multipliers = new int[numValuesToCompare]; for (int i = 0; i < numValuesToCompare; i++) { int valueIndex = valueIndexList.get(i); valueIndices[i] = valueIndex; - switch (_expressionMetadata[valueIndex].getDataType()) { - case INT: - valueComparators[i] = (Comparator<Integer>) Integer::compare; - break; - case LONG: - valueComparators[i] = (Comparator<Long>) Long::compare; - break; - case FLOAT: - valueComparators[i] = (Comparator<Float>) Float::compare; - break; - case DOUBLE: - valueComparators[i] = (Comparator<Double>) Double::compare; - break; - case STRING: - valueComparators[i] = Comparator.naturalOrder(); - break; - case BYTES: - valueComparators[i] = (Comparator<byte[]>) ByteArray::compare; - break; - default: - throw new IllegalStateException(); - } - if (_sortSequence.get(valueIndex).isIsAsc()) { - valueComparators[i] = valueComparators[i].reversed(); - } + dataTypes[i] = _expressionMetadata[valueIndex].getDataType(); + multipliers[i] = sortSequence.get(valueIndex).isIsAsc() ? -1 : 1; } - return new SelectionOperatorUtils.RowComparator(valueIndices, valueComparators); + return (o1, o2) -> { + for (int i = 0; i < numValuesToCompare; i++) { + int index = valueIndices[i]; + Serializable v1 = o1[index]; + Serializable v2 = o2[index]; + int result; + switch (dataTypes[i]) { + case INT: + result = ((Integer) v1).compareTo((Integer) v2); + break; + case LONG: + result = ((Long) v1).compareTo((Long) v2); + break; + case FLOAT: + result = ((Float) v1).compareTo((Float) v2); + break; + case DOUBLE: + result = ((Double) v1).compareTo((Double) v2); + break; + case STRING: + result = ((String) v1).compareTo((String) v2); + break; + case BYTES: + result = ByteArray.compare((byte[]) v1, (byte[]) v2); + break; + default: + throw new IllegalStateException(); + } + if (result != 0) { + return result * multipliers[i]; + } + } + return 0; + }; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java index 0b87eb7..d1ac9fc 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java @@ -38,7 +38,7 @@ public class SelectionPlanNode implements PlanNode { private final IndexSegment _indexSegment; private final Selection _selection; - private TransformPlanNode _transformPlanNode; + private final TransformPlanNode _transformPlanNode; public SelectionPlanNode(IndexSegment indexSegment, BrokerRequest brokerRequest) { _indexSegment = indexSegment; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java index 84d9287..494e123 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java @@ -58,19 +58,17 @@ public class TransformPlanNode implements PlanNode { * Helper method to extract projection columns and transform expressions from the given broker request. */ private void extractColumnsAndTransforms(BrokerRequest brokerRequest, IndexSegment indexSegment) { + Set<String> columns = new HashSet<>(); if (brokerRequest.isSetAggregationsInfo()) { // Extract aggregation expressions for (AggregationInfo aggregationInfo : brokerRequest.getAggregationsInfo()) { if (!aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) { - addExpressionColumn(AggregationFunctionUtils.getColumn(aggregationInfo)); + columns.add(AggregationFunctionUtils.getColumn(aggregationInfo)); } } - // Extract group-by expressions if (brokerRequest.isSetGroupBy()) { - for (String column : brokerRequest.getGroupBy().getExpressions()) { - addExpressionColumn(column); - } + columns.addAll(brokerRequest.getGroupBy().getExpressions()); } } else { Selection selection = brokerRequest.getSelections(); @@ -83,9 +81,7 @@ public class TransformPlanNode implements PlanNode { _expressions.add(new TransformExpressionTree(new IdentifierAstNode(column))); } } else { - for (String column : selectionColumns) { - addExpressionColumn(column); - } + columns.addAll(selectionColumns); } // Extract order-by expressions and update maxDocPerNextCall @@ -96,7 +92,10 @@ public class TransformPlanNode implements PlanNode { _maxDocPerNextCall = Math.min(selection.getSize(), _maxDocPerNextCall); } else { for (SelectionSort selectionSort : sortSequence) { - addExpressionColumn(selectionSort.getColumn()); + String orderByColumn = selectionSort.getColumn(); + if (!_projectionColumns.contains(orderByColumn)) { + columns.add(orderByColumn); + } } } } else { @@ -105,12 +104,11 @@ public class TransformPlanNode implements PlanNode { _maxDocPerNextCall = 1; } } - } - - private void addExpressionColumn(String column) { - TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(column); - expression.getColumns(_projectionColumns); - _expressions.add(expression); + for (String column : columns) { + TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(column); + expression.getColumns(_projectionColumns); + _expressions.add(expression); + } } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java index b5ecb86..cdd2254 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java @@ -60,7 +60,6 @@ import org.apache.pinot.common.utils.DataTable; */ public class SelectionOperatorService { private final List<String> _selectionColumns; - private final List<SelectionSort> _sortSequence; private final DataSchema _dataSchema; private final int _offset; private final int _numRowsToKeep; @@ -74,13 +73,12 @@ public class SelectionOperatorService { */ public SelectionOperatorService(Selection selection, DataSchema dataSchema) { _selectionColumns = SelectionOperatorUtils.getSelectionColumns(selection.getSelectionColumns(), dataSchema); - _sortSequence = selection.getSelectionSortSequence(); _dataSchema = dataSchema; // Select rows from offset to offset + size. _offset = selection.getOffset(); _numRowsToKeep = _offset + selection.getSize(); _rows = new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY), - getTypeCompatibleComparator()); + getTypeCompatibleComparator(selection.getSelectionSortSequence())); } /** @@ -89,9 +87,9 @@ public class SelectionOperatorService { * * @return flexible {@link Comparator} for selection rows. */ - private Comparator<Serializable[]> getTypeCompatibleComparator() { + private Comparator<Serializable[]> getTypeCompatibleComparator(List<SelectionSort> sortSequence) { // Compare all single-value columns - int numOrderByExpressions = _sortSequence.size(); + int numOrderByExpressions = sortSequence.size(); List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions); for (int i = 0; i < numOrderByExpressions; i++) { if (!_dataSchema.getColumnDataType(i).isArray()) { @@ -101,21 +99,33 @@ public class SelectionOperatorService { int numValuesToCompare = valueIndexList.size(); int[] valueIndices = new int[numValuesToCompare]; - Comparator[] valueComparators = new Comparator[numValuesToCompare]; + boolean[] isNumber = new boolean[numValuesToCompare]; + // Use multiplier -1 or 1 to control ascending/descending order + int[] multipliers = new int[numValuesToCompare]; for (int i = 0; i < numValuesToCompare; i++) { int valueIndex = valueIndexList.get(i); valueIndices[i] = valueIndex; - if (_dataSchema.getColumnDataType(i).isNumber()) { - valueComparators[i] = Comparator.comparingDouble(Number::doubleValue); - } else { - valueComparators[i] = Comparator.naturalOrder(); - } - if (_sortSequence.get(valueIndex).isIsAsc()) { - valueComparators[i] = valueComparators[i].reversed(); - } + isNumber[i] = _dataSchema.getColumnDataType(valueIndex).isNumber(); + multipliers[i] = sortSequence.get(valueIndex).isIsAsc() ? -1 : 1; } - return new SelectionOperatorUtils.RowComparator(valueIndices, valueComparators); + return (o1, o2) -> { + for (int i = 0; i < numValuesToCompare; i++) { + int index = valueIndices[i]; + Serializable v1 = o1[index]; + Serializable v2 = o2[index]; + int result; + if (isNumber[i]) { + result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue()); + } else { + result = ((String) v1).compareTo((String) v2); + } + if (result != 0) { + return result * multipliers[i]; + } + } + return 0; + }; } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java index 1d3eb8f..783b357 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java @@ -23,7 +23,6 @@ import java.text.DecimalFormat; import java.text.DecimalFormatSymbols; import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -87,24 +86,51 @@ public class SelectionOperatorUtils { ThreadLocal.withInitial(() -> new DecimalFormat(DOUBLE_PATTERN, DECIMAL_FORMAT_SYMBOLS)); /** - * Extracts the expressions from a selection query, expands {@code 'SELECT *'} to all physical columns if applies. - * <p>For selection order-by queries, order-by expressions will be put at the front. The expressions returned are - * deduplicated. - * <NOTE>DO NOT change the order of the expressions returned because broker relies on that to process the query. + * Extracts the expressions from a selection-only query, expands {@code 'SELECT *'} to all physical columns if + * applies. + * <p>NOTE: DO NOT change the order of the expressions returned because broker relies on that to process the query. */ public static List<TransformExpressionTree> extractExpressions(List<String> selectionColumns, - IndexSegment indexSegment, @Nullable List<SelectionSort> sortSequence) { - Set<TransformExpressionTree> expressionSet = new HashSet<>(); - List<TransformExpressionTree> expressions = new ArrayList<>(); + IndexSegment indexSegment) { + if (selectionColumns.size() == 1 && selectionColumns.get(0).equals("*")) { + // For 'SELECT *', sort all physical columns so that the order is deterministic + selectionColumns = new ArrayList<>(indexSegment.getPhysicalColumnNames()); + selectionColumns.sort(null); - if (sortSequence != null) { - for (SelectionSort selectionSort : sortSequence) { - TransformExpressionTree orderByExpression = - TransformExpressionTree.compileToExpressionTree(selectionSort.getColumn()); - if (expressionSet.add(orderByExpression)) { - expressions.add(orderByExpression); + List<TransformExpressionTree> expressions = new ArrayList<>(selectionColumns.size()); + for (String selectionColumn : selectionColumns) { + expressions.add(new TransformExpressionTree(new IdentifierAstNode(selectionColumn))); + } + return expressions; + } else { + // Note: selection expressions have been standardized during query compilation + Set<String> selectionColumnSet = new HashSet<>(); + List<TransformExpressionTree> expressions = new ArrayList<>(selectionColumns.size()); + for (String selectionColumn : selectionColumns) { + if (selectionColumnSet.add(selectionColumn)) { + expressions.add(TransformExpressionTree.compileToExpressionTree(selectionColumn)); } } + return expressions; + } + } + + /** + * Extracts the expressions from a selection order-by query, expands {@code 'SELECT *'} to all physical columns if + * applies. + * <p>Order-by expressions will be put at the front. The expressions returned are deduplicated. + * <p>NOTE: DO NOT change the order of the expressions returned because broker relies on that to process the query. + */ + public static List<TransformExpressionTree> extractExpressions(List<String> selectionColumns, + IndexSegment indexSegment, List<SelectionSort> sortSequence) { + Set<String> columnSet = new HashSet<>(); + List<TransformExpressionTree> expressions = new ArrayList<>(); + + // NOTE: order-by expressions have been standardized and deduplicated during query compilation + for (SelectionSort selectionSort : sortSequence) { + String orderByColumn = selectionSort.getColumn(); + columnSet.add(orderByColumn); + expressions.add(TransformExpressionTree.compileToExpressionTree(orderByColumn)); } if (selectionColumns.size() == 1 && selectionColumns.get(0).equals("*")) { @@ -113,17 +139,15 @@ public class SelectionOperatorUtils { selectionColumns.sort(null); for (String selectionColumn : selectionColumns) { - TransformExpressionTree selectionExpression = - new TransformExpressionTree(new IdentifierAstNode(selectionColumn)); - if (expressionSet.add(selectionExpression)) { - expressions.add(selectionExpression); + if (!columnSet.contains(selectionColumn)) { + expressions.add(new TransformExpressionTree(new IdentifierAstNode(selectionColumn))); } } } else { + // Note: selection expressions have been standardized during query compilation for (String selectionColumn : selectionColumns) { - TransformExpressionTree selectionExpression = TransformExpressionTree.compileToExpressionTree(selectionColumn); - if (expressionSet.add(selectionExpression)) { - expressions.add(selectionExpression); + if (columnSet.add(selectionColumn)) { + expressions.add(TransformExpressionTree.compileToExpressionTree(selectionColumn)); } } } @@ -500,7 +524,7 @@ public class SelectionOperatorUtils { length = ints.length; formattedValue = new String[length]; for (int i = 0; i < length; i++) { - formattedValue[i] = longFormat.format((long) ints[i]); + formattedValue[i] = longFormat.format(ints[i]); } } else { long[] longs = (long[]) value; @@ -544,7 +568,7 @@ public class SelectionOperatorUtils { length = floats.length; formattedValue = new String[length]; for (int i = 0; i < length; i++) { - formattedValue[i] = doubleFormat.format((double) floats[i]); + formattedValue[i] = doubleFormat.format(floats[i]); } return formattedValue; } else { @@ -579,42 +603,4 @@ public class SelectionOperatorUtils { queue.offer(value); } } - - /** - * Helper Comparator class to compare rows. - * <p>Two arguments are expected to construct the comparator: - * <ul> - * <li> - * Value indices: an array of column indices in each row where the values need to be compared (only the - * single-value order-by columns need to be compared) - * </li> - * <li> - * Value comparators: an array of Comparator, where each element is the Comparator for the corresponding column in - * the value indices array - * </li> - * </ul> - */ - public static class RowComparator implements Comparator<Serializable[]> { - private final int[] _valueIndices; - private final Comparator[] _valueComparators; - - public RowComparator(int[] valueIndices, Comparator[] valueComparators) { - _valueIndices = valueIndices; - _valueComparators = valueComparators; - } - - @SuppressWarnings("unchecked") - @Override - public int compare(Serializable[] o1, Serializable[] o2) { - int numValuesToCompare = _valueIndices.length; - for (int i = 0; i < numValuesToCompare; i++) { - int valueIndex = _valueIndices[i]; - int result = _valueComparators[i].compare(o1[valueIndex], o2[valueIndex]); - if (result != 0) { - return result; - } - } - return 0; - } - } } diff --git a/pinot-core/src/test/java/org/apache/pinot/query/selection/SelectionOperatorServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/query/selection/SelectionOperatorServiceTest.java index 2b6c42b..5ae3016 100644 --- a/pinot-core/src/test/java/org/apache/pinot/query/selection/SelectionOperatorServiceTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/query/selection/SelectionOperatorServiceTest.java @@ -91,7 +91,7 @@ public class SelectionOperatorServiceTest { IndexSegment indexSegment = mock(IndexSegment.class); when(indexSegment.getPhysicalColumnNames()).thenReturn(new HashSet<>(Arrays.asList("foo", "bar", "foobar"))); List<TransformExpressionTree> expressions = - SelectionOperatorUtils.extractExpressions(selectionColumns, indexSegment, null); + SelectionOperatorUtils.extractExpressions(selectionColumns, indexSegment); assertEquals(expressions.size(), 5); assertEquals(expressions.get(0).toString(), "add(foo,'1')"); assertEquals(expressions.get(1).toString(), "foo"); @@ -100,7 +100,7 @@ public class SelectionOperatorServiceTest { assertEquals(expressions.get(4).toString(), "foobar"); // For 'SELECT *' select only queries, should return all physical columns in alphabetical order - expressions = SelectionOperatorUtils.extractExpressions(Collections.singletonList("*"), indexSegment, null); + expressions = SelectionOperatorUtils.extractExpressions(Collections.singletonList("*"), indexSegment); assertEquals(expressions.size(), 3); assertEquals(expressions.get(0).toString(), "bar"); assertEquals(expressions.get(1).toString(), "foo"); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
