This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new ad0d2b173a Enhance Broker reducer to handle expression format change (#11762)
ad0d2b173a is described below
commit ad0d2b173a67ac19bc39120179f48192c80520e4
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Oct 11 18:21:29 2023 -0700
Enhance Broker reducer to handle expression format change (#11762)
---
.../query/reduce/AggregationDataTableReducer.java | 61 +++---
.../core/query/reduce/BrokerReduceService.java | 46 ++++-
.../query/reduce/DistinctDataTableReducer.java | 121 +++++------
.../core/query/reduce/GroupByDataTableReducer.java | 5 +-
.../core/query/reduce/ReducerDataSchemaUtils.java | 108 ++++++++++
.../query/reduce/SelectionDataTableReducer.java | 70 ++-----
.../reduce/SelectionOnlyStreamingReducer.java | 24 ++-
.../query/selection/SelectionOperatorService.java | 53 ++---
.../query/selection/SelectionOperatorUtils.java | 221 ++++++++++++---------
.../query/reduce/ReducerDataSchemaUtilsTest.java | 107 ++++++++++
.../selection/SelectionOperatorUtilsTest.java | 210 ++++++++++++++++++++
.../apache/pinot/queries/DistinctQueriesTest.java | 10 +-
.../LeafStageTransferableBlockOperator.java | 29 ++-
13 files changed, 736 insertions(+), 329 deletions(-)
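
The gist of the change, before the per-file diffs: the broker previously matched server response columns to query expressions by column name, but the string representation of an expression (for example "col1 + 1" versus "plus(col1,'1')") can change between releases, which broke reduction during rolling upgrades. With this commit the reducers re-generate canonical column names from the broker's own QueryContext and trust only the column data types reported by servers. A minimal sketch of that idea, using hypothetical plain-Java stand-ins rather than the actual Pinot DataSchema API; the real implementation lives in the new ReducerDataSchemaUtils class below:

import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in illustrating canonicalization; not the actual Pinot
// DataSchema API. Names come from the broker's own expressions, types from the
// server response (mirrors ReducerDataSchemaUtils.canonicalizeDataSchemaForDistinct).
public class CanonicalizeSketch {

  static String[][] canonicalize(List<String> brokerExpressions, String[] serverColumnNames,
      String[] serverColumnTypes) {
    if (brokerExpressions.size() != serverColumnNames.length) {
      throw new IllegalStateException("Column count mismatch");
    }
    // Keep the server-reported types; replace the names with the broker's own.
    return new String[][]{brokerExpressions.toArray(new String[0]), serverColumnTypes};
  }

  public static void main(String[] args) {
    // An older server may report "plus(col1,'1')" while the broker writes
    // "col1 + 1"; after canonicalization the broker-side name wins.
    String[][] canonical = canonicalize(List.of("col1 + 1"), new String[]{"plus(col1,'1')"}, new String[]{"DOUBLE"});
    System.out.println(Arrays.toString(canonical[0]) + " : " + Arrays.toString(canonical[1]));
  }
}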
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
index 89e28d1924..d3c2711e81 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
@@ -23,10 +23,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
@@ -47,9 +45,12 @@ import org.roaringbitmap.RoaringBitmap;
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class AggregationDataTableReducer implements DataTableReducer {
   private final QueryContext _queryContext;
+  private final AggregationFunction[] _aggregationFunctions;

-  AggregationDataTableReducer(QueryContext queryContext) {
+  public AggregationDataTableReducer(QueryContext queryContext) {
     _queryContext = queryContext;
+    _aggregationFunctions = _queryContext.getAggregationFunctions();
+    assert _aggregationFunctions != null;
   }

   /**
@@ -59,11 +60,11 @@ public class AggregationDataTableReducer implements DataTableReducer {
   public void reduceAndSetResults(String tableName, DataSchema dataSchema,
       Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
       DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
-    assert dataSchema != null;
+    dataSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForAggregation(_queryContext, dataSchema);
     if (dataTableMap.isEmpty()) {
       DataSchema resultTableSchema =
-          new PostAggregationHandler(_queryContext, getPrePostAggregationDataSchema()).getResultDataSchema();
+          new PostAggregationHandler(_queryContext, getPrePostAggregationDataSchema(dataSchema)).getResultDataSchema();
       brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, Collections.emptyList()));
       return;
     }
@@ -78,9 +79,7 @@ public class AggregationDataTableReducer implements DataTableReducer {
   private void reduceWithIntermediateResult(DataSchema dataSchema, Collection<DataTable> dataTables,
       BrokerResponseNative brokerResponseNative) {
-    AggregationFunction[] aggregationFunctions = _queryContext.getAggregationFunctions();
-    assert aggregationFunctions != null;
-    int numAggregationFunctions = aggregationFunctions.length;
+    int numAggregationFunctions = _aggregationFunctions.length;
     Object[] intermediateResults = new Object[numAggregationFunctions];
     for (DataTable dataTable : dataTables) {
       for (int i = 0; i < numAggregationFunctions; i++) {
@@ -100,25 +99,23 @@ public class AggregationDataTableReducer implements DataTableReducer {
         if (mergedIntermediateResult == null) {
           intermediateResults[i] = intermediateResultToMerge;
         } else {
-          intermediateResults[i] = aggregationFunctions[i].merge(mergedIntermediateResult, intermediateResultToMerge);
+          intermediateResults[i] = _aggregationFunctions[i].merge(mergedIntermediateResult, intermediateResultToMerge);
         }
         Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(i);
       }
     }
     Object[] finalResults = new Object[numAggregationFunctions];
     for (int i = 0; i < numAggregationFunctions; i++) {
-      AggregationFunction aggregationFunction = aggregationFunctions[i];
+      AggregationFunction aggregationFunction = _aggregationFunctions[i];
       Comparable result = aggregationFunction.extractFinalResult(intermediateResults[i]);
       finalResults[i] = result == null ? null : aggregationFunction.getFinalResultColumnType().convert(result);
     }
-    brokerResponseNative.setResultTable(reduceToResultTable(finalResults));
+    brokerResponseNative.setResultTable(reduceToResultTable(getPrePostAggregationDataSchema(dataSchema), finalResults));
   }

   private void reduceWithFinalResult(DataSchema dataSchema, DataTable dataTable,
       BrokerResponseNative brokerResponseNative) {
-    AggregationFunction[] aggregationFunctions = _queryContext.getAggregationFunctions();
-    assert aggregationFunctions != null;
-    int numAggregationFunctions = aggregationFunctions.length;
+    int numAggregationFunctions = _aggregationFunctions.length;
     Object[] finalResults = new Object[numAggregationFunctions];
     for (int i = 0; i < numAggregationFunctions; i++) {
       ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
@@ -133,24 +130,23 @@ public class AggregationDataTableReducer implements DataTableReducer {
       finalResults[i] = AggregationFunctionUtils.getConvertedFinalResult(dataTable, columnDataType, 0, i);
       }
     }
-    brokerResponseNative.setResultTable(reduceToResultTable(finalResults));
+    brokerResponseNative.setResultTable(reduceToResultTable(dataSchema, finalResults));
   }

   /**
    * Sets aggregation results into ResultsTable
    */
-  private ResultTable reduceToResultTable(Object[] finalResults) {
-    PostAggregationHandler postAggregationHandler =
-        new PostAggregationHandler(_queryContext, getPrePostAggregationDataSchema());
-    DataSchema dataSchema = postAggregationHandler.getResultDataSchema();
+  private ResultTable reduceToResultTable(DataSchema dataSchema, Object[] finalResults) {
+    PostAggregationHandler postAggregationHandler = new PostAggregationHandler(_queryContext, dataSchema);
+    DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
     Object[] row = postAggregationHandler.getResult(finalResults);
     RewriterResult resultRewriterResult =
-        ResultRewriteUtils.rewriteResult(dataSchema, Collections.singletonList(row));
-    dataSchema = resultRewriterResult.getDataSchema();
+        ResultRewriteUtils.rewriteResult(resultDataSchema, Collections.singletonList(row));
+    resultDataSchema = resultRewriterResult.getDataSchema();
     List<Object[]> rows = resultRewriterResult.getRows();
-    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+    ColumnDataType[] columnDataTypes = resultDataSchema.getColumnDataTypes();
     int numColumns = columnDataTypes.length;
     for (Object[] rewrittenRow : rows) {
       for (int j = 0; j < numColumns; j++) {
@@ -158,25 +154,18 @@ public class AggregationDataTableReducer implements DataTableReducer {
       }
     }
-    return new ResultTable(dataSchema, rows);
+    return new ResultTable(resultDataSchema, rows);
   }

   /**
    * Constructs the DataSchema for the rows before the post-aggregation (SQL mode).
    */
-  private DataSchema getPrePostAggregationDataSchema() {
-    List<Pair<AggregationFunction, FilterContext>> filteredAggregationFunctions =
-        _queryContext.getFilteredAggregationFunctions();
-    assert filteredAggregationFunctions != null;
-    int numColumns = filteredAggregationFunctions.size();
-    String[] columnNames = new String[numColumns];
-    ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
-    for (int i = 0; i < numColumns; i++) {
-      Pair<AggregationFunction, FilterContext> pair = filteredAggregationFunctions.get(i);
-      AggregationFunction aggregationFunction = pair.getLeft();
-      columnNames[i] = AggregationFunctionUtils.getResultColumnName(aggregationFunction, pair.getRight());
-      columnDataTypes[i] = aggregationFunction.getFinalResultColumnType();
+  private DataSchema getPrePostAggregationDataSchema(DataSchema dataSchema) {
+    int numAggregationFunctions = _aggregationFunctions.length;
+    ColumnDataType[] columnDataTypes = new ColumnDataType[numAggregationFunctions];
+    for (int i = 0; i < numAggregationFunctions; i++) {
+      columnDataTypes[i] = _aggregationFunctions[i].getFinalResultColumnType();
     }
-    return new DataSchema(columnNames, columnDataTypes);
+    return new DataSchema(dataSchema.getColumnNames(), columnDataTypes);
   }
 }
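
For context on reduceWithIntermediateResult above: every server ships one intermediate result per aggregation function, and the broker folds them pairwise via AggregationFunction#merge before extracting the final value. A simplified stand-alone sketch of that fold, with a BinaryOperator standing in for the Pinot merge call (names here are illustrative, not the actual API):

import java.util.List;
import java.util.function.BinaryOperator;

// Simplified sketch of the fold in reduceWithIntermediateResult: one
// intermediate result per server, merged pairwise, then finalized.
public class AggregationMergeSketch {

  static <T> T mergeIntermediateResults(List<T> perServerResults, BinaryOperator<T> merge) {
    T merged = null;
    for (T result : perServerResults) {
      merged = (merged == null) ? result : merge.apply(merged, result);
    }
    return merged;
  }

  public static void main(String[] args) {
    // e.g. SUM: the merge step is addition over per-server partial sums -> 7.0
    System.out.println(mergeIntermediateResults(List.of(1.0, 2.5, 3.5), Double::sum));
  }
}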
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index e89978625a..93cbecd5bb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -18,12 +18,16 @@
  */
 package org.apache.pinot.core.query.reduce;

+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
@@ -38,6 +42,8 @@ import org.apache.pinot.spi.exception.BadQueryRequestException;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
@@ -46,6 +52,8 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
  */
 @ThreadSafe
 public class BrokerReduceService extends BaseReduceService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BrokerReduceService.class);
+
   public BrokerReduceService(PinotConfiguration config) {
     super(config);
   }
@@ -65,7 +73,9 @@ public class BrokerReduceService extends BaseReduceService {
     BrokerResponseNative brokerResponseNative = new BrokerResponseNative();

     // Cache a data schema from data tables (try to cache one with data rows associated with it).
-    DataSchema cachedDataSchema = null;
+    DataSchema dataSchemaFromEmptyDataTable = null;
+    DataSchema dataSchemaFromNonEmptyDataTable = null;
+    List<ServerRoutingInstance> serversWithConflictingDataSchema = new ArrayList<>();

     // Process server response metadata.
     Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = dataTableMap.entrySet().iterator();
@@ -83,12 +93,22 @@ public class BrokerReduceService extends BaseReduceService {
       } else {
         // Try to cache a data table with data rows inside, or cache one with data schema inside.
         if (dataTable.getNumberOfRows() == 0) {
-          if (cachedDataSchema == null) {
-            cachedDataSchema = dataSchema;
+          if (dataSchemaFromEmptyDataTable == null) {
+            dataSchemaFromEmptyDataTable = dataSchema;
           }
           iterator.remove();
         } else {
-          cachedDataSchema = dataSchema;
+          if (dataSchemaFromNonEmptyDataTable == null) {
+            dataSchemaFromNonEmptyDataTable = dataSchema;
+          } else {
+            // Remove data tables with conflicting data schema.
+            // NOTE: Only compare the column data types, since the column names (string representation of expression)
+            //       can change across different versions.
+            if (!Arrays.equals(dataSchema.getColumnDataTypes(), dataSchemaFromNonEmptyDataTable.getColumnDataTypes())) {
+              serversWithConflictingDataSchema.add(entry.getKey());
+              iterator.remove();
+            }
+          }
         }
       }
     }
@@ -99,8 +119,23 @@ public class BrokerReduceService extends BaseReduceService {
     // Set execution statistics and Update broker metrics.
     aggregator.setStats(rawTableName, brokerResponseNative, brokerMetrics);

+    // Report the servers with conflicting data schema.
+    if (!serversWithConflictingDataSchema.isEmpty()) {
+      String errorMessage =
+          String.format("%s: responses for table: %s from servers: %s got dropped due to data schema inconsistency.",
+              QueryException.MERGE_RESPONSE_ERROR.getMessage(), tableName, serversWithConflictingDataSchema);
+      LOGGER.warn(errorMessage);
+      if (brokerMetrics != null) {
+        brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
+      }
+      brokerResponseNative.addToExceptions(
+          new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
+    }
+
     // NOTE: When there is no cached data schema, that means all servers encountered exception. In such case, return the
     // response with metadata only.
+    DataSchema cachedDataSchema =
+        dataSchemaFromNonEmptyDataTable != null ? dataSchemaFromNonEmptyDataTable : dataSchemaFromEmptyDataTable;
     if (cachedDataSchema == null) {
       return brokerResponseNative;
     }
@@ -124,8 +159,7 @@ public class BrokerReduceService extends BaseReduceService {
       if (gapfillType == null) {
         throw new BadQueryRequestException("Nested query is not supported without gapfill");
       }
-      BaseGapfillProcessor gapfillProcessor =
-          GapfillProcessorFactory.getGapfillProcessor(queryContext, gapfillType);
+      BaseGapfillProcessor gapfillProcessor = GapfillProcessorFactory.getGapfillProcessor(queryContext, gapfillType);
       gapfillProcessor.process(brokerResponseNative);
     }
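
The conflict handling added above deliberately compares only column data types across servers and ignores column names, since names are exactly what may differ across versions. A compact stand-alone sketch of the drop-conflicting loop, with a String[] standing in for ColumnDataType[] and a generic key standing in for ServerRoutingInstance:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-alone sketch of the conflict check: the first non-empty response sets
// the reference types; later responses with different types are dropped.
public class SchemaConflictSketch {

  static <S> List<S> dropConflicting(Map<S, String[]> columnTypesByServer) {
    String[] reference = null;
    List<S> dropped = new ArrayList<>();
    Iterator<Map.Entry<S, String[]>> it = columnTypesByServer.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<S, String[]> entry = it.next();
      if (reference == null) {
        reference = entry.getValue();
      } else if (!Arrays.equals(reference, entry.getValue())) {
        dropped.add(entry.getKey()); // reported back as a MERGE_RESPONSE_ERROR
        it.remove();
      }
    }
    return dropped;
  }

  public static void main(String[] args) {
    Map<String, String[]> responses = new LinkedHashMap<>();
    responses.put("server1", new String[]{"INT", "DOUBLE"});
    responses.put("server2", new String[]{"INT", "DOUBLE"});
    responses.put("server3", new String[]{"INT", "STRING"}); // conflicting types
    System.out.println("dropped: " + dropConflicting(responses)); // [server3]
  }
}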
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
index 5adc021bba..4553776963 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -19,20 +19,15 @@
 package org.apache.pinot.core.query.reduce;

 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -48,90 +43,74 @@ import org.roaringbitmap.RoaringBitmap;
 public class DistinctDataTableReducer implements DataTableReducer {
   private final QueryContext _queryContext;

-  DistinctDataTableReducer(QueryContext queryContext) {
+  public DistinctDataTableReducer(QueryContext queryContext) {
     _queryContext = queryContext;
   }

-  /**
-   * Reduces and sets results of distinct into ResultTable.
-   */
   @Override
   public void reduceAndSetResults(String tableName, DataSchema dataSchema,
       Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
       DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
-    // DISTINCT is implemented as an aggregation function in the execution engine. Just like
-    // other aggregation functions, DISTINCT returns its result as a single object
-    // (of type DistinctTable) serialized by the server into the DataTable and deserialized
-    // by the broker from the DataTable. So there should be exactly 1 row and 1 column and that
-    // column value should be the serialized DistinctTable -- so essentially it is a DataTable
-    // inside a DataTable
+    dataSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForDistinct(_queryContext, dataSchema);
+    DistinctTable distinctTable =
+        new DistinctTable(dataSchema, _queryContext.getOrderByExpressions(), _queryContext.getLimit(),
+            _queryContext.isNullHandlingEnabled());
+    if (distinctTable.hasOrderBy()) {
+      addToOrderByDistinctTable(dataSchema, dataTableMap, distinctTable);
+    } else {
+      addToNonOrderByDistinctTable(dataSchema, dataTableMap, distinctTable);
+    }
+    brokerResponseNative.setResultTable(reduceToResultTable(distinctTable));
+  }

-    // Gather all non-empty DistinctTables
-    // TODO: until we upgrade to newer version of pinot, we have to keep both code path. remove after 0.12.0 release.
-    // This is to work with server rolling upgrade when partially returned as DistinctTable Obj and partially regular
-    // DataTable; if all returns are DataTable we can directly merge with priority queue (with dedup).
-    List<DistinctTable> nonEmptyDistinctTables = new ArrayList<>(dataTableMap.size());
+  private void addToOrderByDistinctTable(DataSchema dataSchema, Map<ServerRoutingInstance, DataTable> dataTableMap,
+      DistinctTable distinctTable) {
     for (DataTable dataTable : dataTableMap.values()) {
       Tracing.ThreadAccountantOps.sampleAndCheckInterruption();
-
-      // Do not use the cached data schema because it might be either single object (legacy) or normal data table
-      dataSchema = dataTable.getDataSchema();
       int numColumns = dataSchema.size();
-      if (numColumns == 1 && dataSchema.getColumnDataType(0) == ColumnDataType.OBJECT) {
-        // DistinctTable is still being returned as a single object
-        CustomObject customObject = dataTable.getCustomObject(0, 0);
-        assert customObject != null;
-        DistinctTable distinctTable = ObjectSerDeUtils.deserialize(customObject);
-        if (!distinctTable.isEmpty()) {
-          nonEmptyDistinctTables.add(distinctTable);
+      int numRows = dataTable.getNumberOfRows();
+      if (_queryContext.isNullHandlingEnabled()) {
+        RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
+        for (int coldId = 0; coldId < numColumns; coldId++) {
+          nullBitmaps[coldId] = dataTable.getNullRowIds(coldId);
+        }
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          distinctTable.addWithOrderBy(new Record(
+              SelectionOperatorUtils.extractRowFromDataTableWithNullHandling(dataTable, rowId, nullBitmaps)));
         }
       } else {
-        // DistinctTable is being returned as normal data table
-        int numRows = dataTable.getNumberOfRows();
-        if (numRows > 0) {
-          List<Record> records = new ArrayList<>(numRows);
-          if (_queryContext.isNullHandlingEnabled()) {
-            RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
-            for (int coldId = 0; coldId < numColumns; coldId++) {
-              nullBitmaps[coldId] = dataTable.getNullRowIds(coldId);
-            }
-            for (int rowId = 0; rowId < numRows; rowId++) {
-              records.add(new Record(
-                  SelectionOperatorUtils.extractRowFromDataTableWithNullHandling(dataTable, rowId, nullBitmaps)));
-            }
-          } else {
-            for (int rowId = 0; rowId < numRows; rowId++) {
-              records.add(new Record(SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId)));
-            }
-          }
-          nonEmptyDistinctTables.add(new DistinctTable(dataSchema, records));
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          distinctTable.addWithOrderBy(new Record(SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId)));
         }
       }
     }
+  }

-    if (nonEmptyDistinctTables.isEmpty()) {
-      // All the DistinctTables are empty, construct an empty response
-      // TODO: This returns schema with all STRING data types.
-      //       There's no way currently to get the data types of the distinct columns for empty results
-      List<ExpressionContext> expressions = _queryContext.getSelectExpressions();
-      int numExpressions = expressions.size();
-      String[] columns = new String[numExpressions];
-      for (int i = 0; i < numExpressions; i++) {
-        columns[i] = expressions.get(i).toString();
-      }
-      ColumnDataType[] columnDataTypes = new ColumnDataType[numExpressions];
-      Arrays.fill(columnDataTypes, ColumnDataType.STRING);
-      brokerResponseNative.setResultTable(
-          new ResultTable(new DataSchema(columns, columnDataTypes), Collections.emptyList()));
-    } else {
-      // Construct a main DistinctTable and merge all non-empty DistinctTables into it
-      DistinctTable mainDistinctTable =
-          new DistinctTable(nonEmptyDistinctTables.get(0).getDataSchema(), _queryContext.getOrderByExpressions(),
-              _queryContext.getLimit(), _queryContext.isNullHandlingEnabled());
-      for (DistinctTable distinctTable : nonEmptyDistinctTables) {
-        mainDistinctTable.mergeTable(distinctTable);
+  private void addToNonOrderByDistinctTable(DataSchema dataSchema, Map<ServerRoutingInstance, DataTable> dataTableMap,
+      DistinctTable distinctTable) {
+    for (DataTable dataTable : dataTableMap.values()) {
+      Tracing.ThreadAccountantOps.sampleAndCheckInterruption();
+      int numColumns = dataSchema.size();
+      int numRows = dataTable.getNumberOfRows();
+      if (_queryContext.isNullHandlingEnabled()) {
+        RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
+        for (int coldId = 0; coldId < numColumns; coldId++) {
+          nullBitmaps[coldId] = dataTable.getNullRowIds(coldId);
+        }
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          if (distinctTable.addWithoutOrderBy(new Record(
+              SelectionOperatorUtils.extractRowFromDataTableWithNullHandling(dataTable, rowId, nullBitmaps)))) {
+            return;
+          }
+        }
+      } else {
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          if (distinctTable.addWithoutOrderBy(
+              new Record(SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId)))) {
+            return;
+          }
+        }
       }
-      brokerResponseNative.setResultTable(reduceToResultTable(mainDistinctTable));
     }
   }
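
One behavioral detail worth calling out in addToNonOrderByDistinctTable: without an ORDER BY, the reducer can stop scanning as soon as LIMIT distinct rows have been collected, which is why addWithoutOrderBy returns a boolean and the loop returns early. A simplified sketch with rows reduced to strings (hypothetical stand-ins, not the DistinctTable API):

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified sketch of the non-order-by DISTINCT reduce: returns as soon as
// LIMIT distinct values are collected, mirroring the addWithoutOrderBy boolean.
public class DistinctReduceSketch {

  static Set<String> reduceWithoutOrderBy(List<List<String>> serverRows, int limit) {
    Set<String> distinct = new HashSet<>();
    for (List<String> rows : serverRows) {
      for (String row : rows) {
        distinct.add(row);
        if (distinct.size() >= limit) {
          return distinct; // early exit: remaining responses need not be scanned
        }
      }
    }
    return distinct;
  }

  public static void main(String[] args) {
    // Stops after collecting 2 distinct values; "c" is never examined.
    System.out.println(reduceWithoutOrderBy(List.of(List.of("a", "b", "a"), List.of("c")), 2));
  }
}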
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 654e6232a2..844d295892 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -72,7 +72,6 @@ import org.roaringbitmap.RoaringBitmap;
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class GroupByDataTableReducer implements DataTableReducer {
   private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, find a better value.
-  private static final int MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK = 10_000;

   private final QueryContext _queryContext;
   private final AggregationFunction[] _aggregationFunctions;
@@ -81,7 +80,7 @@ public class GroupByDataTableReducer implements DataTableReducer {
   private final int _numGroupByExpressions;
   private final int _numColumns;

-  GroupByDataTableReducer(QueryContext queryContext) {
+  public GroupByDataTableReducer(QueryContext queryContext) {
     _queryContext = queryContext;
     _aggregationFunctions = queryContext.getAggregationFunctions();
     assert _aggregationFunctions != null;
@@ -99,7 +98,7 @@ public class GroupByDataTableReducer implements DataTableReducer {
   public void reduceAndSetResults(String tableName, DataSchema dataSchema,
       Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponse,
       DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
-    assert dataSchema != null;
+    dataSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForGroupBy(_queryContext, dataSchema);
     if (dataTableMap.isEmpty()) {
       PostAggregationHandler postAggregationHandler =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ReducerDataSchemaUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ReducerDataSchemaUtils.java
new file mode 100644
index 0000000000..7bd751877d
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ReducerDataSchemaUtils.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+@SuppressWarnings("rawtypes")
+public class ReducerDataSchemaUtils {
+  private ReducerDataSchemaUtils() {
+  }
+
+  /**
+   * Returns the canonical data schema of the aggregation result based on the query and the data schema returned from
+   * the server.
+   * <p>Column names are re-generated in the canonical data schema to avoid the backward incompatibility caused by
+   * changing the string representation of the expression.
+   */
+  public static DataSchema canonicalizeDataSchemaForAggregation(QueryContext queryContext, DataSchema dataSchema) {
+    List<Pair<AggregationFunction, FilterContext>> filteredAggregationFunctions =
+        queryContext.getFilteredAggregationFunctions();
+    assert filteredAggregationFunctions != null;
+    int numAggregations = filteredAggregationFunctions.size();
+    Preconditions.checkState(dataSchema.size() == numAggregations,
+        "BUG: Expect same number of aggregations and columns in data schema, got %s aggregations, %s columns in data "
+            + "schema", numAggregations, dataSchema.size());
+    String[] columnNames = new String[numAggregations];
+    for (int i = 0; i < numAggregations; i++) {
+      Pair<AggregationFunction, FilterContext> pair = filteredAggregationFunctions.get(i);
+      AggregationFunction aggregationFunction = pair.getLeft();
+      columnNames[i] = AggregationFunctionUtils.getResultColumnName(aggregationFunction, pair.getRight());
+    }
+    return new DataSchema(columnNames, dataSchema.getColumnDataTypes());
+  }
+
+  /**
+   * Returns the canonical data schema of the group-by result based on the query and the data schema returned from the
+   * server. Group-by expressions are always at the beginning of the data schema, followed by the aggregations.
+   * <p>Column names are re-generated in the canonical data schema to avoid the backward incompatibility caused by
+   * changing the string representation of the expression.
+   */
+  public static DataSchema canonicalizeDataSchemaForGroupBy(QueryContext queryContext, DataSchema dataSchema) {
+    List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions();
+    List<Pair<AggregationFunction, FilterContext>> filteredAggregationFunctions =
+        queryContext.getFilteredAggregationFunctions();
+    assert groupByExpressions != null && filteredAggregationFunctions != null;
+    int numGroupByExpression = groupByExpressions.size();
+    int numAggregations = filteredAggregationFunctions.size();
+    int numColumns = numGroupByExpression + numAggregations;
+    String[] columnNames = new String[numColumns];
+    Preconditions.checkState(dataSchema.size() == numColumns,
+        "BUG: Expect same number of group-by expressions, aggregations and columns in data schema, got %s group-by "
+            + "expressions, %s aggregations, %s columns in data schema", numGroupByExpression, numAggregations,
+        dataSchema.size());
+    for (int i = 0; i < numGroupByExpression; i++) {
+      columnNames[i] = groupByExpressions.get(i).toString();
+    }
+    for (int i = 0; i < numAggregations; i++) {
+      Pair<AggregationFunction, FilterContext> pair = filteredAggregationFunctions.get(i);
+      columnNames[numGroupByExpression + i] =
+          AggregationFunctionUtils.getResultColumnName(pair.getLeft(), pair.getRight());
+    }
+    return new DataSchema(columnNames, dataSchema.getColumnDataTypes());
+  }
+
+  /**
+   * Returns the canonical data schema of the distinct result based on the query and the data schema returned from the
+   * server.
+   * <p>Column names are re-generated in the canonical data schema to avoid the backward incompatibility caused by
+   * changing the string representation of the expression.
+   */
+  public static DataSchema canonicalizeDataSchemaForDistinct(QueryContext queryContext, DataSchema dataSchema) {
+    List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
+    int numSelectExpressions = selectExpressions.size();
+    Preconditions.checkState(dataSchema.size() == numSelectExpressions,
+        "BUG: Expect same number of columns in SELECT clause and data schema, got %s in SELECT clause, %s in data "
+            + "schema", numSelectExpressions, dataSchema.size());
+    String[] columnNames = new String[numSelectExpressions];
+    for (int i = 0; i < numSelectExpressions; i++) {
+      columnNames[i] = selectExpressions.get(i).toString();
+    }
+    return new DataSchema(columnNames, dataSchema.getColumnDataTypes());
+  }
+}
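
A worked example of the group-by contract above: the canonical schema lists group-by expressions first, then the re-generated aggregation result names, while server-reported data types are kept untouched. A stand-alone sketch of just the name layout (plain strings standing in for ExpressionContext and the aggregation result names):

import java.util.List;

// Stand-alone sketch of the group-by name layout in
// canonicalizeDataSchemaForGroupBy: group-by expressions first, then aggregations.
public class GroupByCanonicalNamesSketch {

  static String[] canonicalColumnNames(List<String> groupByExpressions, List<String> aggregationResultNames) {
    String[] names = new String[groupByExpressions.size() + aggregationResultNames.size()];
    int i = 0;
    for (String expression : groupByExpressions) {
      names[i++] = expression;
    }
    for (String resultName : aggregationResultNames) {
      names[i++] = resultName;
    }
    return names;
  }

  public static void main(String[] args) {
    // SELECT SUM(col1) FROM t GROUP BY col2 -> [col2, sum(col1)]
    System.out.println(String.join(", ", canonicalColumnNames(List.of("col2"), List.of("sum(col1)"))));
  }
}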
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
index a21684acdf..a8226aee5e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
@@ -18,38 +18,28 @@
  */
 package org.apache.pinot.core.query.reduce;

-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.datatable.DataTable;
-import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorService;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 /**
  * Helper class to reduce and set Selection results into the BrokerResponseNative
  */
 public class SelectionDataTableReducer implements DataTableReducer {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SelectionDataTableReducer.class);
-
   private final QueryContext _queryContext;

-  SelectionDataTableReducer(QueryContext queryContext) {
+  public SelectionDataTableReducer(QueryContext queryContext) {
     _queryContext = queryContext;
   }

@@ -60,55 +50,25 @@ public class SelectionDataTableReducer implements DataTableReducer {
   public void reduceAndSetResults(String tableName, DataSchema dataSchema,
       Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
       DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
-    if (dataTableMap.isEmpty()) {
-      // For empty data table map, construct empty result using the cached data schema for selection query
-      List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(_queryContext, dataSchema);
-      DataSchema selectionDataSchema = SelectionOperatorUtils.getResultTableDataSchema(dataSchema, selectionColumns);
-      brokerResponseNative.setResultTable(new ResultTable(selectionDataSchema, Collections.emptyList()));
+    Pair<DataSchema, int[]> pair =
+        SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(_queryContext, dataSchema);
+    int limit = _queryContext.getLimit();
+    if (dataTableMap.isEmpty() || limit == 0) {
+      brokerResponseNative.setResultTable(new ResultTable(pair.getLeft(), Collections.emptyList()));
       return;
     }
-
-    // For data table map with more than one data tables, remove conflicting data tables
-    if (dataTableMap.size() > 1) {
-      DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-      List<ServerRoutingInstance> droppedServers = new ArrayList<>();
-      Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = dataTableMap.entrySet().iterator();
-      while (iterator.hasNext()) {
-        Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
-        DataSchema dataSchemaToCompare = entry.getValue().getDataSchema();
-        assert dataSchemaToCompare != null;
-        if (!Arrays.equals(columnDataTypes, dataSchemaToCompare.getColumnDataTypes())) {
-          droppedServers.add(entry.getKey());
-          iterator.remove();
-        }
-      }
-      if (!droppedServers.isEmpty()) {
-        String errorMessage =
-            QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses for table: " + tableName + " from servers: "
-                + droppedServers + " got dropped due to data schema inconsistency.";
-        LOGGER.warn(errorMessage);
-        if (brokerMetrics != null) {
-          brokerMetrics.addMeteredTableValue(TableNameBuilder.extractRawTableName(tableName),
-              BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
-        }
-        brokerResponseNative.addToExceptions(
-            new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
-      }
-    }
-
-    int limit = _queryContext.getLimit();
-    if (limit > 0 && _queryContext.getOrderByExpressions() != null) {
-      // Selection order-by
-      SelectionOperatorService selectionService = new SelectionOperatorService(_queryContext, dataSchema);
-      selectionService.reduceWithOrdering(dataTableMap.values(), _queryContext.isNullHandlingEnabled());
-      brokerResponseNative.setResultTable(selectionService.renderResultTableWithOrdering());
-    } else {
+    if (_queryContext.getOrderByExpressions() == null) {
       // Selection only
-      List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(_queryContext, dataSchema);
       List<Object[]> reducedRows = SelectionOperatorUtils.reduceWithoutOrdering(dataTableMap.values(), limit,
           _queryContext.isNullHandlingEnabled());
       brokerResponseNative.setResultTable(
-          SelectionOperatorUtils.renderResultTableWithoutOrdering(reducedRows, dataSchema, selectionColumns));
+          SelectionOperatorUtils.renderResultTableWithoutOrdering(reducedRows, pair.getLeft(), pair.getRight()));
+    } else {
+      // Selection order-by
+      SelectionOperatorService selectionService =
+          new SelectionOperatorService(_queryContext, pair.getLeft(), pair.getRight());
+      selectionService.reduceWithOrdering(dataTableMap.values());
+      brokerResponseNative.setResultTable(selectionService.renderResultTableWithOrdering());
     }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java
index cda1ad2b87..ffa0ffdbd0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.query.reduce;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
@@ -58,7 +59,7 @@ public class SelectionOnlyStreamingReducer implements StreamingReducer {
     int numColumns = dataTable.getDataSchema().size();
     int numRows = dataTable.getNumberOfRows();
     if (nullHandlingEnabled) {
-      RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];;
+      RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
       for (int coldId = 0; coldId < numColumns; coldId++) {
         nullBitmaps[coldId] = dataTable.getNullRowIds(coldId);
       }
@@ -88,16 +89,19 @@ public class SelectionOnlyStreamingReducer implements StreamingReducer {
   @Override
   public BrokerResponseNative seal() {
-    BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
-    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(_queryContext, _dataSchema);
-    if (_dataSchema != null && _rows.size() > 0) {
-      brokerResponseNative.setResultTable(
-          SelectionOperatorUtils.renderResultTableWithoutOrdering(_rows, _dataSchema, selectionColumns));
+    if (_dataSchema == null) {
+      return BrokerResponseNative.empty();
+    }
+    Pair<DataSchema, int[]> pair =
+        SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(_queryContext, _dataSchema);
+    ResultTable resultTable;
+    if (_rows.isEmpty()) {
+      resultTable = new ResultTable(pair.getLeft(), Collections.emptyList());
     } else {
-      // For empty data table map, construct empty result using the cached data schema for selection query
-      DataSchema selectionDataSchema = SelectionOperatorUtils.getResultTableDataSchema(_dataSchema, selectionColumns);
-      brokerResponseNative.setResultTable(new ResultTable(selectionDataSchema, Collections.emptyList()));
+      resultTable = SelectionOperatorUtils.renderResultTableWithoutOrdering(_rows, pair.getLeft(), pair.getRight());
     }
-    return brokerResponseNative;
+    BrokerResponseNative brokerResponse = new BrokerResponseNative();
+    brokerResponse.setResultTable(resultTable);
+    return brokerResponse;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index 688c7bbf49..7db96f5224 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.query.selection;

 import java.util.Collection;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.PriorityQueue;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.response.broker.ResultTable;
@@ -58,22 +57,16 @@ import org.roaringbitmap.RoaringBitmap;
 */
 public class SelectionOperatorService {
   private final QueryContext _queryContext;
-  private final List<String> _selectionColumns;
   private final DataSchema _dataSchema;
+  private final int[] _columnIndices;
   private final int _offset;
   private final int _numRowsToKeep;
   private final PriorityQueue<Object[]> _rows;

-  /**
-   * Constructor for <code>SelectionOperatorService</code> with {@link DataSchema}. (Broker side)
-   *
-   * @param queryContext Selection order-by query
-   * @param dataSchema data schema.
-   */
-  public SelectionOperatorService(QueryContext queryContext, DataSchema dataSchema) {
+  public SelectionOperatorService(QueryContext queryContext, DataSchema dataSchema, int[] columnIndices) {
     _queryContext = queryContext;
-    _selectionColumns = SelectionOperatorUtils.getSelectionColumns(queryContext, dataSchema);
     _dataSchema = dataSchema;
+    _columnIndices = columnIndices;
     // Select rows from offset to offset + limit.
     _offset = queryContext.getOffset();
     _numRowsToKeep = _offset + queryContext.getLimit();
@@ -83,25 +76,15 @@ public class SelectionOperatorService {
         _queryContext.isNullHandlingEnabled()).reversed());
   }

-  /**
-   * Get the selection results.
-   *
-   * @return selection results.
-   */
-  public PriorityQueue<Object[]> getRows() {
-    return _rows;
-  }
-
   /**
    * Reduces a collection of {@link DataTable}s to selection rows for selection queries with <code>ORDER BY</code>.
-   * (Broker side)
    * TODO: Do merge sort after releasing 0.13.0 when server side results are sorted
    *       Can also consider adding a data table metadata to indicate whether the server side results are sorted
    */
-  public void reduceWithOrdering(Collection<DataTable> dataTables, boolean nullHandlingEnabled) {
+  public void reduceWithOrdering(Collection<DataTable> dataTables) {
     for (DataTable dataTable : dataTables) {
       int numRows = dataTable.getNumberOfRows();
-      if (nullHandlingEnabled) {
+      if (_queryContext.isNullHandlingEnabled()) {
         RoaringBitmap[] nullBitmaps = new RoaringBitmap[dataTable.getDataSchema().size()];
         for (int colId = 0; colId < nullBitmaps.length; colId++) {
           nullBitmaps[colId] = dataTable.getNullRowIds(colId);
         }
@@ -127,32 +110,24 @@ public class SelectionOperatorService {
   }

   /**
-   * Render the selection rows to a {@link ResultTable} object for selection queries with <code>ORDER BY</code>.
-   * (Broker side)
-   * <p>{@link ResultTable} object will be used to build the broker response.
-   * <p>Should be called after method "reduceWithOrdering()".
+   * Renders the selection rows to a {@link ResultTable} object for selection queries with <code>ORDER BY</code>.
    */
   public ResultTable renderResultTableWithOrdering() {
-    int[] columnIndices = SelectionOperatorUtils.getColumnIndices(_selectionColumns, _dataSchema);
-    int numColumns = columnIndices.length;
-    DataSchema resultDataSchema = SelectionOperatorUtils.getSchemaForProjection(_dataSchema, columnIndices);
-
-    // Extract the result rows
-    LinkedList<Object[]> rowsInSelectionResults = new LinkedList<>();
+    LinkedList<Object[]> resultRows = new LinkedList<>();
+    DataSchema.ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
+    int numColumns = columnDataTypes.length;
     while (_rows.size() > _offset) {
       Object[] row = _rows.poll();
       assert row != null;
-      Object[] extractedRow = new Object[numColumns];
+      Object[] resultRow = new Object[numColumns];
       for (int i = 0; i < numColumns; i++) {
-        Object value = row[columnIndices[i]];
+        Object value = row[_columnIndices[i]];
         if (value != null) {
-          extractedRow[i] = resultDataSchema.getColumnDataType(i).convertAndFormat(value);
+          resultRow[i] = columnDataTypes[i].convertAndFormat(value);
         }
       }
-
-      rowsInSelectionResults.addFirst(extractedRow);
+      resultRows.addFirst(resultRow);
     }
-
-    return new ResultTable(resultDataSchema, rowsInSelectionResults);
+    return new ResultTable(_dataSchema, resultRows);
   }
 }
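
The ordering reduce above keeps at most offset + limit rows in a priority queue built on the reversed order-by comparator, so the worst retained row sits at the head and is cheap to evict; rendering then polls worst-first and uses addFirst to restore ascending order while skipping the OFFSET rows. A self-contained sketch of that pattern over plain integers (assumes limit > 0; not the Pinot API):

import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of the broker-side selection order-by reduce; plain
// integers stand in for Object[] rows, `order` for the order-by comparator.
public class OrderBySelectionSketch {

  static List<Integer> topRows(Iterable<Integer> rows, Comparator<Integer> order, int offset, int limit) {
    int numRowsToKeep = offset + limit;
    // Reversed comparator: the WORST retained row sits at the heap head.
    PriorityQueue<Integer> heap = new PriorityQueue<>(numRowsToKeep, order.reversed());
    for (Integer row : rows) {
      if (heap.size() < numRowsToKeep) {
        heap.offer(row);
      } else if (order.compare(row, heap.peek()) < 0) {
        heap.poll(); // evict the worst row, keep the better incoming one
        heap.offer(row);
      }
    }
    // Poll worst-first and addFirst to restore ascending order; stop polling
    // once only the OFFSET best rows (which are skipped) remain in the heap.
    LinkedList<Integer> result = new LinkedList<>();
    while (heap.size() > offset) {
      result.addFirst(heap.poll());
    }
    return result;
  }

  public static void main(String[] args) {
    // ORDER BY value ASC LIMIT 2 OFFSET 1 over {5, 1, 4, 2, 3} -> [2, 3]
    System.out.println(topRows(List.of(5, 1, 4, 2, 3), Comparator.naturalOrder(), 1, 2));
  }
}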
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 923c744800..19b6c7f954 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.core.query.selection;

+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
@@ -25,19 +27,17 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
@@ -167,26 +167,120 @@ public class SelectionOperatorUtils {
   }

   /**
-   * Constructs the final selection DataSchema based on the order of selection columns (data schema can have a
-   * different order, depending on order by clause)
-   * @param dataSchema data schema used for execution and ordering
-   * @param selectionColumns the selection order
-   * @return data schema for final results
+   * Returns the data schema and column indices of the final selection results based on the query and the data schema
+   * of the server response. See {@link #extractExpressions} for the column orders on the server side.
+   * NOTE: DO NOT rely on column name lookup across query context and data schema because the string representation of
+   *       expression can change, which will cause backward incompatibility.
    */
-  public static DataSchema getResultTableDataSchema(DataSchema dataSchema, List<String> selectionColumns) {
-    Map<String, ColumnDataType> columnNameToDataType = new HashMap<>();
-    String[] columnNames = dataSchema.getColumnNames();
-    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-    int numColumns = columnNames.length;
-    for (int i = 0; i < numColumns; i++) {
-      columnNameToDataType.put(columnNames[i], columnDataTypes[i]);
+  public static Pair<DataSchema, int[]> getResultTableDataSchemaAndColumnIndices(QueryContext queryContext,
+      DataSchema dataSchema) {
+    List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
+    int numSelectExpressions = selectExpressions.size();
+    ColumnDataType[] columnDataTypesInDataSchema = dataSchema.getColumnDataTypes();
+    int numColumnsInDataSchema = columnDataTypesInDataSchema.length;
+
+    // No order-by expression
+    // NOTE: Order-by expressions are ignored for queries with LIMIT 0.
+    List<OrderByExpressionContext> orderByExpressions = queryContext.getOrderByExpressions();
+    if (orderByExpressions == null || queryContext.getLimit() == 0) {
+      // For 'SELECT *', use the server response data schema as the final results data schema.
+      if ((numSelectExpressions == 1 && selectExpressions.get(0).equals(IDENTIFIER_STAR))) {
+        int[] columnIndices = new int[numColumnsInDataSchema];
+        for (int i = 0; i < numColumnsInDataSchema; i++) {
+          columnIndices[i] = i;
+        }
+        return Pair.of(dataSchema, columnIndices);
+      }
+
+      // For select without duplicate columns, the order of the final selection columns is the same as the order of the
+      // columns in the data schema.
+      if (numSelectExpressions == numColumnsInDataSchema) {
+        String[] columnNames = new String[numSelectExpressions];
+        int[] columnIndices = new int[numSelectExpressions];
+        for (int i = 0; i < numSelectExpressions; i++) {
+          columnNames[i] = selectExpressions.get(i).toString();
+          columnIndices[i] = i;
+        }
+        return Pair.of(new DataSchema(columnNames, columnDataTypesInDataSchema), columnIndices);
+      }
+
+      // For select with duplicate columns, construct a map from expression to index with the same order as the data
+      // schema, then look up the selection expressions.
+      Object2IntOpenHashMap<ExpressionContext> expressionIndexMap = new Object2IntOpenHashMap<>(numColumnsInDataSchema);
+      for (ExpressionContext selectExpression : selectExpressions) {
+        expressionIndexMap.putIfAbsent(selectExpression, expressionIndexMap.size());
+      }
+      Preconditions.checkState(expressionIndexMap.size() == numColumnsInDataSchema,
+          "BUG: Expect same number of deduped columns in SELECT clause and in data schema, got %s before dedup and %s"
+              + " after dedup in SELECT clause, %s in data schema", numSelectExpressions, expressionIndexMap.size(),
+          numColumnsInDataSchema);
+      String[] columnNames = new String[numSelectExpressions];
+      ColumnDataType[] columnDataTypes = new ColumnDataType[numSelectExpressions];
+      int[] columnIndices = new int[numSelectExpressions];
+      for (int i = 0; i < numSelectExpressions; i++) {
+        ExpressionContext selectExpression = selectExpressions.get(i);
+        int columnIndex = expressionIndexMap.getInt(selectExpression);
+        columnNames[i] = selectExpression.toString();
+        columnDataTypes[i] = columnDataTypesInDataSchema[columnIndex];
+        columnIndices[i] = columnIndex;
+      }
+      return Pair.of(new DataSchema(columnNames, columnDataTypes), columnIndices);
+    }
+
+    // For 'SELECT *' with order-by, exclude transform functions from the returned columns and sort.
+    if (numSelectExpressions == 1 && selectExpressions.get(0).equals(IDENTIFIER_STAR)) {
+      String[] columnNamesInDataSchema = dataSchema.getColumnNames();
+      List<Integer> columnIndexList = new ArrayList<>(columnNamesInDataSchema.length);
+      for (int i = 0; i < columnNamesInDataSchema.length; i++) {
+        if (columnNamesInDataSchema[i].indexOf('(') == -1) {
+          columnIndexList.add(i);
+        }
+      }
+      columnIndexList.sort(Comparator.comparing(o -> columnNamesInDataSchema[o]));
+      int numColumns = columnIndexList.size();
+      String[] columnNames = new String[numColumns];
+      ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
+      int[] columnIndices = new int[numColumns];
+      for (int i = 0; i < numColumns; i++) {
+        int columnIndex = columnIndexList.get(i);
+        columnNames[i] = columnNamesInDataSchema[columnIndex];
+        columnDataTypes[i] = columnDataTypesInDataSchema[columnIndex];
+        columnIndices[i] = columnIndex;
+      }
+      return Pair.of(new DataSchema(columnNames, columnDataTypes), columnIndices);
+    }
+
+    // For other order-by queries, construct a map from expression to index with the same order as the data schema,
+    // then look up the selection expressions.
+    Object2IntOpenHashMap<ExpressionContext> expressionIndexMap = new Object2IntOpenHashMap<>(numColumnsInDataSchema);
+    // NOTE: Order-by expressions are already deduped in QueryContext.
+    for (OrderByExpressionContext orderByExpression : orderByExpressions) {
+      expressionIndexMap.put(orderByExpression.getExpression(), expressionIndexMap.size());
+    }
+    for (ExpressionContext selectExpression : selectExpressions) {
+      expressionIndexMap.putIfAbsent(selectExpression, expressionIndexMap.size());
     }
-    int numResultColumns = selectionColumns.size();
-    ColumnDataType[] finalColumnDataTypes = new ColumnDataType[numResultColumns];
-    for (int i = 0; i < numResultColumns; i++) {
-      finalColumnDataTypes[i] = columnNameToDataType.get(selectionColumns.get(i));
+    String[] columnNames = new String[numSelectExpressions];
+    ColumnDataType[] columnDataTypes = new ColumnDataType[numSelectExpressions];
+    int[] columnIndices = new int[numSelectExpressions];
+    if (expressionIndexMap.size() == numColumnsInDataSchema) {
+      for (int i = 0; i < numSelectExpressions; i++) {
+        ExpressionContext selectExpression = selectExpressions.get(i);
+        int columnIndex = expressionIndexMap.getInt(selectExpression);
+        columnNames[i] = selectExpression.toString();
+        columnDataTypes[i] = columnDataTypesInDataSchema[columnIndex];
+        columnIndices[i] = columnIndex;
+      }
+    } else {
+      // When all segments are pruned on the server side, the data schema will only contain the columns in the SELECT
+      // clause, and data type for all columns are set to STRING. See ResultBlocksUtils for details.
+      for (int i = 0; i < numSelectExpressions; i++) {
+        columnNames[i] = selectExpressions.get(i).toString();
+        columnDataTypes[i] = ColumnDataType.STRING;
+        columnIndices[i] = i;
+      }
     }
-    return new DataSchema(selectionColumns.toArray(new String[0]), finalColumnDataTypes);
+    return Pair.of(new DataSchema(columnNames, columnDataTypes), columnIndices);
   }

   /**
@@ -478,82 +572,26 @@ public class SelectionOperatorUtils {
   }

   /**
-   * Render the selection rows to a {@link ResultTable} object
-   * for selection queries without <code>ORDER BY</code>
-   * <p>{@link ResultTable} object will be used to set in the broker response.
-   * <p>Should be called after method "reduceWithoutOrdering()".
-   *
-   * @param rows selection rows.
-   * @param dataSchema data schema.
-   * @param selectionColumns selection columns.
-   * @return {@link ResultTable} object results.
+   * Renders the selection rows to a {@link ResultTable} object for selection queries without <code>ORDER BY</code>.
+   * (Broker side)
    */
   public static ResultTable renderResultTableWithoutOrdering(List<Object[]> rows, DataSchema dataSchema,
-      List<String> selectionColumns) {
+      int[] columnIndices) {
     int numRows = rows.size();
     List<Object[]> resultRows = new ArrayList<>(numRows);
-
-    DataSchema resultDataSchema = dataSchema;
-    Map<String, Integer> columnNameToIndexMap = null;
-    if (dataSchema.getColumnNames().length != selectionColumns.size()) {
-      // Create updated data schema since one column can be selected multiple times.
-      columnNameToIndexMap = new HashMap<>(HashUtil.getHashMapCapacity(dataSchema.getColumnNames().length));
-      String[] columnNames = dataSchema.getColumnNames();
-      ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-      for (int i = 0; i < columnNames.length; i++) {
-        columnNameToIndexMap.put(columnNames[i], i);
-      }
-
-      ColumnDataType[] newColumnDataTypes = new ColumnDataType[selectionColumns.size()];
-      for (int i = 0; i < newColumnDataTypes.length; i++) {
-        int index = columnNameToIndexMap.get(selectionColumns.get(i));
-        newColumnDataTypes[i] = columnDataTypes[index];
-      }
-
-      resultDataSchema = new DataSchema(selectionColumns.toArray(new String[0]), newColumnDataTypes);
-    }
-
-    int numColumns = resultDataSchema.getColumnNames().length;
-    ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
+    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+    int numColumns = columnDataTypes.length;
     for (Object[] row : rows) {
       Object[] resultRow = new Object[numColumns];
       for (int i = 0; i < numColumns; i++) {
-        int index = (columnNameToIndexMap != null) ? columnNameToIndexMap.get(selectionColumns.get(i)) : i;
-        Object value = row[index];
+        Object value = row[columnIndices[i]];
         if (value != null) {
-          resultRow[i] = resultColumnDataTypes[i].convertAndFormat(value);
+          resultRow[i] = columnDataTypes[i].convertAndFormat(value);
         }
       }
       resultRows.add(resultRow);
     }
-
-    return new ResultTable(resultDataSchema, resultRows);
-  }
-
-  /**
-   * Helper method to compute column indices from selection columns and the data schema for selection queries
-   * @param selectionColumns selection columns.
-   * @param dataSchema data schema.
-   * @return column indices
-   */
-  public static int[] getColumnIndices(List<String> selectionColumns, DataSchema dataSchema) {
-    String[] columnNames = dataSchema.getColumnNames();
-    Map<String, Integer> columnToIndexMap = getColumnToIndexMap(columnNames);
-    int numSelectionColumns = selectionColumns.size();
-    int[] columnIndices = new int[numSelectionColumns];
-    for (int i = 0; i < numSelectionColumns; i++) {
-      columnIndices[i] = columnToIndexMap.get(selectionColumns.get(i));
-    }
-    return columnIndices;
-  }
-
-  public static Map<String, Integer> getColumnToIndexMap(String[] columns) {
-    Map<String, Integer> columnToIndexMap = new HashMap<>();
-    int numColumns = columns.length;
-    for (int i = 0; i < numColumns; i++) {
-      columnToIndexMap.put(columns[i], i);
-    }
-    return columnToIndexMap;
+    return new ResultTable(dataSchema, resultRows);
   }

   /**
@@ -572,19 +610,4 @@ public class SelectionOperatorUtils {
       queue.offer(value);
     }
   }
-
-  public static DataSchema getSchemaForProjection(DataSchema dataSchema, int[] columnIndices) {
-    int numColumns = columnIndices.length;
-
-    String[] columnNames = dataSchema.getColumnNames();
-    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-    String[] resultColumnNames = new String[numColumns];
-    ColumnDataType[] resultColumnDataTypes = new ColumnDataType[numColumns];
-    for (int i = 0; i < numColumns; i++) {
-      int columnIndex = columnIndices[i];
-      resultColumnNames[i] = columnNames[columnIndex];
-      resultColumnDataTypes[i] = columnDataTypes[columnIndex];
-    }
-    return new DataSchema(resultColumnNames, resultColumnDataTypes);
-  }
 }
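
The dedup logic in getResultTableDataSchemaAndColumnIndices builds an expression-to-index map in first-appearance order with putIfAbsent, so a column that is selected twice maps both output positions back to the single column the server returned. The same idea over plain strings, with a LinkedHashMap standing in for Object2IntOpenHashMap:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the duplicate-column index mapping used above.
public class ColumnIndicesSketch {

  static int[] columnIndices(List<String> selectExpressions) {
    Map<String, Integer> expressionIndexMap = new LinkedHashMap<>();
    for (String expression : selectExpressions) {
      expressionIndexMap.putIfAbsent(expression, expressionIndexMap.size());
    }
    int[] indices = new int[selectExpressions.size()];
    for (int i = 0; i < indices.length; i++) {
      indices[i] = expressionIndexMap.get(selectExpressions.get(i));
    }
    return indices;
  }

  public static void main(String[] args) {
    // SELECT col1, col2, col1 -> server returns [col1, col2]; indices = [0, 1, 0]
    System.out.println(Arrays.toString(columnIndices(List.of("col1", "col2", "col1"))));
  }
}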
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/ReducerDataSchemaUtilsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/ReducerDataSchemaUtilsTest.java
new file mode 100644
index 0000000000..d21d4f2779
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/ReducerDataSchemaUtilsTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class ReducerDataSchemaUtilsTest {
+
+ @Test
+ public void testCanonicalizeDataSchemaForAggregation() {
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT SUM(col1 + col2) FROM testTable");
+ // Intentionally make the data schema not match the string representation of the expression
+ DataSchema dataSchema = new DataSchema(new String[]{"sum(col1+col2)"}, new ColumnDataType[]{ColumnDataType.DOUBLE});
+ DataSchema canonicalDataSchema =
+ ReducerDataSchemaUtils.canonicalizeDataSchemaForAggregation(queryContext, dataSchema);
+ assertEquals(canonicalDataSchema,
+ new DataSchema(new String[]{"sum(plus(col1,col2))"}, new ColumnDataType[]{ColumnDataType.DOUBLE}));
+
+ queryContext = QueryContextConverterUtils.getQueryContext("SELECT SUM(col1 + 1), MIN(col2 + 2) FROM testTable");
+ // Intentionally make the data schema not match the string representation of the expression
+ dataSchema = new DataSchema(new String[]{"sum(col1+1)", "min(col2+2)"},
+ new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
+ canonicalDataSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForAggregation(queryContext, dataSchema);
+ assertEquals(canonicalDataSchema, new DataSchema(new String[]{"sum(plus(col1,'1'))", "min(plus(col2,'2'))"},
+ new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE}));
+
+ queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT MAX(col1 + 1) FILTER(WHERE col3 > 0) - MIN(col2 + 2) FILTER(WHERE col4 > 0) FROM testTable");
+ // Intentionally make the data schema not match the string representation of the expression
+ dataSchema = new DataSchema(new String[]{"max(col1+1)", "min(col2+2)"},
+ new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
+ canonicalDataSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForAggregation(queryContext, dataSchema);
+ assertEquals(canonicalDataSchema, new DataSchema(
+ new String[]{"max(plus(col1,'1')) FILTER(WHERE col3 > '0')", "min(plus(col2,'2')) FILTER(WHERE col4 > '0')"},
+ new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE}));
+ }
+
+ @Test
+ public void testCanonicalizeDataSchemaForGroupBy() {
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT SUM(col1 + col2) FROM testTable GROUP BY col3 + col4 ORDER BY
col3 + col4");
+ // Intentionally make data schema not matching the string representation
of the expression
+ DataSchema dataSchema = new DataSchema(new String[]{"add(col3+col4)",
"sum(col1+col2)"},
+ new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
+ DataSchema canonicalDataSchema =
ReducerDataSchemaUtils.canonicalizeDataSchemaForGroupBy(queryContext,
dataSchema);
+ assertEquals(canonicalDataSchema, new DataSchema(new
String[]{"plus(col3,col4)", "sum(plus(col1,col2))"},
+ new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE}));
+
+ queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT SUM(col1 + 1), MIN(col2 + 2), col4 FROM testTable GROUP BY
col3, col4");
+ // Intentionally make data schema not matching the string representation
of the expression
+ dataSchema = new DataSchema(new String[]{"col3", "col4", "sum(col1+1)",
"min(col2+2)"},
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG,
ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
+ canonicalDataSchema =
ReducerDataSchemaUtils.canonicalizeDataSchemaForGroupBy(queryContext,
dataSchema);
+ assertEquals(canonicalDataSchema,
+ new DataSchema(new String[]{"col3", "col4", "sum(plus(col1,'1'))",
"min(plus(col2,'2'))"}, new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE,
ColumnDataType.DOUBLE
+ }));
+
+ queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT col3 + col4, MAX(col1 + 1) FILTER(WHERE col3 > 0) - MIN(col2 +
2) FILTER(WHERE col4 > 0) FROM "
+ + "testTable GROUP BY col3 + col4");
+ // Intentionally make data schema not matching the string representation
of the expression
+ dataSchema = new DataSchema(new String[]{"add(col3+col4)", "max(col1+1)",
"min(col2+2)"},
+ new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE,
ColumnDataType.DOUBLE});
+ canonicalDataSchema =
ReducerDataSchemaUtils.canonicalizeDataSchemaForGroupBy(queryContext,
dataSchema);
+ assertEquals(canonicalDataSchema, new DataSchema(new String[]{
+ "plus(col3,col4)", "max(plus(col1,'1')) FILTER(WHERE col3 > '0')",
+ "min(plus(col2,'2')) FILTER" + "(WHERE col4 > '0')"
+ }, new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE,
ColumnDataType.DOUBLE}));
+ }
+
+ @Test
+ public void testCanonicalizeDataSchemaForDistinct() {
+ QueryContext queryContext =
+ QueryContextConverterUtils.getQueryContext("SELECT DISTINCT col1, col2 + col3 FROM testTable");
+ // Intentionally make the data schema not match the string representation of the expression
+ DataSchema dataSchema = new DataSchema(new String[]{"col1", "add(col2+col3)"},
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.DOUBLE});
+ DataSchema canonicalDataSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForDistinct(queryContext, dataSchema);
+ assertEquals(canonicalDataSchema, new DataSchema(new String[]{"col1", "plus(col2,col3)"},
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.DOUBLE}));
+ }
+}
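
The common thread in these tests: the broker no longer trusts the column names a server sends back; it renames them to the canonical string form of its own parsed expressions (`ExpressionContext.toString()`, e.g. `plus(col1,col2)`), keeping the server-reported types. A minimal sketch of that idea, under the assumption that canonicalization reduces to a rename (the real helpers also handle order-by, filters, and group-by placement); `dataSchema` is an assumed input:

    // Sketch under the stated assumptions: rebuild the schema with canonical expression names.
    QueryContext queryContext =
        QueryContextConverterUtils.getQueryContext("SELECT DISTINCT col1, col2 + col3 FROM testTable");
    List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
    String[] canonicalNames = new String[selectExpressions.size()];
    for (int i = 0; i < canonicalNames.length; i++) {
      canonicalNames[i] = selectExpressions.get(i).toString();  // "col1", "plus(col2,col3)"
    }
    DataSchema canonical = new DataSchema(canonicalNames, dataSchema.getColumnDataTypes());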
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorUtilsTest.java
new file mode 100644
index 0000000000..196f4dd168
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorUtilsTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.selection;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class SelectionOperatorUtilsTest {
+
+ @Test
+ public void testGetResultTableColumnIndices() {
+ // Select * without order-by
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable");
+ DataSchema dataSchema = new DataSchema(new String[]{"col1", "col2", "col3"}, new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE
+ });
+ Pair<DataSchema, int[]> pair =
+ SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(), new DataSchema(new String[]{"col1", "col2", "col3"}, new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE
+ }));
+ assertEquals(pair.getRight(), new int[]{0, 1, 2});
+
+ // Select * without order-by, all the segments are pruned on the server side
+ dataSchema = new DataSchema(new String[]{"*"}, new ColumnDataType[]{ColumnDataType.STRING});
+ pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(), new DataSchema(new String[]{"*"}, new ColumnDataType[]{ColumnDataType.STRING}));
+
+ // Select * with order-by but LIMIT 0
+ queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable ORDER BY col1 LIMIT 0");
+ dataSchema = new DataSchema(new String[]{"col1", "col2", "col3"}, new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE
+ });
+ pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(), new DataSchema(new String[]{"col1", "col2", "col3"}, new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE
+ }));
+ assertEquals(pair.getRight(), new int[]{0, 1, 2});
+
+ // Select * with order-by but LIMIT 0, all the segments are pruned on the server side
+ dataSchema = new DataSchema(new String[]{"*"}, new ColumnDataType[]{ColumnDataType.STRING});
+ pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(), new DataSchema(new String[]{"*"}, new ColumnDataType[]{ColumnDataType.STRING}));
+
+ // Select columns without order-by
+ queryContext = QueryContextConverterUtils.getQueryContext("SELECT col1 + 1, col2 + 2 FROM testTable");
+ // Intentionally make the data schema not match the string representation of the expression
+ dataSchema = new DataSchema(new String[]{"add(col1+1)", "add(col2+2)"}, new ColumnDataType[]{
+ ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
+ });
+ pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(), new DataSchema(new String[]{"plus(col1,'1')", "plus(col2,'2')"}, new ColumnDataType[]{
+ ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
+ }));
+ assertEquals(pair.getRight(), new int[]{0, 1});
+
+ // Select columns without order-by, all the segments are pruned on the server side
+ // Intentionally make the data schema not match the string representation of the expression
+ dataSchema = new DataSchema(new String[]{"add(col1+1)", "add(col2+2)"}, new ColumnDataType[]{
+ ColumnDataType.STRING, ColumnDataType.STRING
+ });
+ pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(), new DataSchema(new String[]{"plus(col1,'1')", "plus(col2,'2')"}, new ColumnDataType[]{
+ ColumnDataType.STRING, ColumnDataType.STRING
+ }));
+
+ // Select duplicate columns without order-by
+ queryContext = QueryContextConverterUtils.getQueryContext("SELECT col1 + 1, col2 + 2, col1 + 1 FROM testTable");
+ // Intentionally make the data schema not match the string representation of the expression
+ dataSchema = new DataSchema(new String[]{"add(col1+1)", "add(col2+2)"}, new ColumnDataType[]{
+ ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
+ });
+ pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(),
+ new DataSchema(new String[]{"plus(col1,'1')", "plus(col2,'2')", "plus(col1,'1')"}, new ColumnDataType[]{
+ ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
+ }));
+ assertEquals(pair.getRight(), new int[]{0, 1, 0});
+
+ // Select duplicate columns without order-by, all the segments are pruned on the server side
+ // Intentionally make the data schema not match the string representation of the expression
+ dataSchema = new DataSchema(new String[]{"add(col1+1)", "add(col2+2)", "add(col1+1)"}, new ColumnDataType[]{
+ ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.STRING
+ });
+ pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(),
+ new DataSchema(new String[]{"plus(col1,'1')", "plus(col2,'2')", "plus(col1,'1')"}, new ColumnDataType[]{
+ ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.STRING
+ }));
+
+ // Select * with order-by
+ queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable ORDER BY col3");
+ dataSchema = new DataSchema(new String[]{"col3", "col1", "col2"}, new ColumnDataType[]{
+ ColumnDataType.DOUBLE, ColumnDataType.INT, ColumnDataType.LONG
+ });
+ pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(), new DataSchema(new String[]{"col1", "col2", "col3"}, new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE
+ }));
+ assertEquals(pair.getRight(), new int[]{1, 2, 0});
+
+ // Select * with order-by, all the segments are pruned on the server side
+ dataSchema = new DataSchema(new String[]{"*"}, new ColumnDataType[]{ColumnDataType.STRING});
+ pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(), new DataSchema(new String[]{"*"}, new ColumnDataType[]{ColumnDataType.STRING}));
+
+ // Select * ordering on function
+ queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable ORDER BY col1 + col2");
+ // Intentionally make the data schema not match the string representation of the expression
+ dataSchema = new DataSchema(new String[]{"add(col1+col2)", "col1", "col2", "col3"}, new ColumnDataType[]{
+ ColumnDataType.DOUBLE, ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE
+ });
+ pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(), new DataSchema(new String[]{"col1", "col2", "col3"}, new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE
+ }));
+ assertEquals(pair.getRight(), new int[]{1, 2, 3});
+
+ // Select * ordering on function, all the segments are pruned on the server side
+ dataSchema = new DataSchema(new String[]{"*"}, new ColumnDataType[]{ColumnDataType.STRING});
+ pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(), new DataSchema(new String[]{"*"}, new ColumnDataType[]{ColumnDataType.STRING}));
+
+ // Select * ordering on both column and function
+ queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable ORDER BY col1 + col2, col2");
+ // Intentionally make the data schema not match the string representation of the expression
+ dataSchema = new DataSchema(new String[]{"add(col1+col2)", "col2", "col1", "col3"}, new ColumnDataType[]{
+ ColumnDataType.DOUBLE, ColumnDataType.LONG, ColumnDataType.INT, ColumnDataType.DOUBLE
+ });
+ pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(), new DataSchema(new String[]{"col1", "col2", "col3"}, new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE
+ }));
+ assertEquals(pair.getRight(), new int[]{2, 1, 3});
+
+ // Select * ordering on both column and function, all the segments are pruned on the server side
+ dataSchema = new DataSchema(new String[]{"*"}, new ColumnDataType[]{ColumnDataType.STRING});
+ pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(), new DataSchema(new String[]{"*"}, new ColumnDataType[]{ColumnDataType.STRING}));
+
+ // Select columns with order-by
+ queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT col1 + 1, col3, col2 + 2 FROM testTable ORDER BY col2 + 2, col4");
+ // Intentionally make the data schema not match the string representation of the expression
+ dataSchema = new DataSchema(new String[]{"add(col2+2)", "col4", "add(col1+1)", "col3"}, new ColumnDataType[]{
+ ColumnDataType.DOUBLE, ColumnDataType.STRING, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
+ });
+ pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(), new DataSchema(new String[]{"plus(col1,'1')", "col3", "plus(col2,'2')"},
+ new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE}));
+ assertEquals(pair.getRight(), new int[]{2, 3, 0});
+
+ // Select columns with order-by, all the segments are pruned on the server side
+ // Intentionally make the data schema not match the string representation of the expression
+ dataSchema = new DataSchema(new String[]{"add(col1+1)", "col3", "add(col2+2)"}, new ColumnDataType[]{
+ ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.STRING
+ });
+ pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(), new DataSchema(new String[]{"plus(col1,'1')", "col3", "plus(col2,'2')"},
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.STRING}));
+
+ // Select duplicate columns with order-by
+ queryContext = QueryContextConverterUtils.getQueryContext(
+ "SELECT col1 + 1, col2 + 2, col1 + 1 FROM testTable ORDER BY col2 + 2, col4");
+ // Intentionally make the data schema not match the string representation of the expression
+ dataSchema = new DataSchema(new String[]{"add(col2+2)", "col4", "add(col1+1)"}, new ColumnDataType[]{
+ ColumnDataType.DOUBLE, ColumnDataType.STRING, ColumnDataType.DOUBLE
+ });
+ pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(),
+ new DataSchema(new String[]{"plus(col1,'1')", "plus(col2,'2')", "plus(col1,'1')"}, new ColumnDataType[]{
+ ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
+ }));
+ assertEquals(pair.getRight(), new int[]{2, 0, 2});
+
+ // Select duplicate columns with order-by, all the segments are pruned on the server side
+ // Intentionally make the data schema not match the string representation of the expression
+ dataSchema = new DataSchema(new String[]{"add(col1+1)", "add(col2+2)", "add(col1+1)"}, new ColumnDataType[]{
+ ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.STRING
+ });
+ pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(queryContext, dataSchema);
+ assertEquals(pair.getLeft(),
+ new DataSchema(new String[]{"plus(col1,'1')", "plus(col2,'2')", "plus(col1,'1')"}, new ColumnDataType[]{
+ ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.STRING
+ }));
+ }
+}
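
One convention these cases rely on: when every segment is pruned on the server side, the server appears to have no real schema to report, so a single placeholder column "*" of type STRING comes back and the helper returns it unchanged rather than re-projecting. A hedged sketch of the kind of guard a caller might write; `isPlaceholderSchema` is a hypothetical name, not an API from this patch:

    // Hypothetical guard mirroring the behavior exercised above.
    private static boolean isPlaceholderSchema(DataSchema dataSchema) {
      String[] columnNames = dataSchema.getColumnNames();
      return columnNames.length == 1 && "*".equals(columnNames[0]);
    }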
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
index 0f8da45821..4c458b7979 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
@@ -1433,9 +1433,10 @@ public class DistinctQueriesTest extends BaseQueriesTest {
{
ResultTable resultTable = getBrokerResponse(queries[7]).getResultTable();
- // Check data schema, where data type should be STRING for all columns
+ // Check data schema
+ // NOTE: Segment pruner is not wired up in QueriesTest, so the correct column data types should be returned.
DataSchema expectedDataSchema = new DataSchema(new String[]{"floatColumn", "longMVColumn"},
- new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING});
+ new ColumnDataType[]{ColumnDataType.FLOAT, ColumnDataType.LONG});
assertEquals(resultTable.getDataSchema(), expectedDataSchema);
// Check values, where no record should be returned
@@ -1589,9 +1590,10 @@ public class DistinctQueriesTest extends BaseQueriesTest {
{
ResultTable resultTable = getBrokerResponse(queries[13]).getResultTable();
- // Check data schema, where data type should be STRING for all columns
+ // Check data schema
+ // NOTE: Segment pruner is not wired up in QueriesTest, so the correct column data types should be returned.
DataSchema expectedDataSchema = new DataSchema(new String[]{"floatColumn", "rawLongMVColumn"},
- new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING});
+ new ColumnDataType[]{ColumnDataType.FLOAT, ColumnDataType.LONG});
assertEquals(resultTable.getDataSchema(), expectedDataSchema);
// Check values, where no record should be returned
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index a81ceb8717..4e690e4041 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.operator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -35,6 +36,7 @@ import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
@@ -45,7 +47,7 @@ import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.executor.ResultsBlockStreamer;
import org.apache.pinot.core.query.request.ServerQueryRequest;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.utils.TypeUtils;
@@ -281,13 +283,9 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
/**
* For selection, we need to check if the columns are in order. If not, we need to re-arrange the columns.
*/
- @SuppressWarnings("ConstantConditions")
private static TransferableBlock composeSelectTransferableBlock(SelectionResultsBlock resultsBlock,
DataSchema desiredDataSchema) {
- DataSchema resultSchema = resultsBlock.getDataSchema();
- List<String> selectionColumns =
- SelectionOperatorUtils.getSelectionColumns(resultsBlock.getQueryContext(), resultSchema);
- int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+ int[] columnIndices = getColumnIndices(resultsBlock);
if (!inOrder(columnIndices)) {
return composeColumnIndexedTransferableBlock(resultsBlock, desiredDataSchema, columnIndices);
} else {
@@ -295,6 +293,25 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
}
}
+ private static int[] getColumnIndices(SelectionResultsBlock resultsBlock) {
+ DataSchema dataSchema = resultsBlock.getDataSchema();
+ assert dataSchema != null;
+ String[] columnNames = dataSchema.getColumnNames();
+ Object2IntOpenHashMap<String> columnIndexMap = new Object2IntOpenHashMap<>(columnNames.length);
+ for (int i = 0; i < columnNames.length; i++) {
+ columnIndexMap.put(columnNames[i], i);
+ }
+ QueryContext queryContext = resultsBlock.getQueryContext();
+ assert queryContext != null;
+ List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
+ int numSelectExpressions = selectExpressions.size();
+ int[] columnIndices = new int[numSelectExpressions];
+ for (int i = 0; i < numSelectExpressions; i++) {
+ columnIndices[i] = columnIndexMap.getInt(selectExpressions.get(i).toString());
+ }
+ return columnIndices;
+ }
+
private static boolean inOrder(int[] columnIndices) {
for (int i = 0; i < columnIndices.length; i++) {
if (columnIndices[i] != i) {
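} // hunk continues in the full patch

Finally, the fast path above: when the computed indices are already the identity permutation, the block is passed through untouched; otherwise the rows are re-arranged. A minimal sketch of what that re-arrangement amounts to, assuming it is a plain per-row gather (the helper below is illustrative, not the patched composeColumnIndexedTransferableBlock):

    // Illustrative only: with columnIndices = {1, 2, 0}, result position 0 reads server column 1, etc.
    private static Object[] reorderRow(Object[] serverRow, int[] columnIndices) {
      Object[] resultRow = new Object[columnIndices.length];
      for (int i = 0; i < columnIndices.length; i++) {
        resultRow[i] = serverRow[columnIndices[i]];
      }
      return resultRow;
    }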
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]