This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 784860e3b4 Remove BrokerResponse from QueryContext (#8643)
784860e3b4 is described below
commit 784860e3b4b096c5233d051d9dbff62ec7310665
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed May 4 18:16:23 2022 -0700
Remove BrokerResponse from QueryContext (#8643)
---
.../pinot/sql/parsers/CalciteSqlCompiler.java | 5 +-
.../controller/recommender/io/InputManager.java | 14 +-
.../query/executor/ServerQueryExecutorV1Impl.java | 25 ++--
.../core/query/reduce/BrokerReduceService.java | 8 +-
.../core/query/reduce/ResultReducerFactory.java | 4 +-
.../core/query/reduce/StreamingReduceService.java | 11 +-
.../core/query/request/ServerQueryRequest.java | 16 +--
.../core/query/request/context/QueryContext.java | 98 ++++++-------
.../BrokerRequestToQueryContextConverter.java | 157 ---------------------
.../context/utils/QueryContextConverterUtils.java | 134 +++++++++++++++++-
.../plan/maker/QueryOverrideWithHintsTest.java | 20 ++-
.../org/apache/pinot/queries/BaseQueriesTest.java | 38 +++--
.../tests/ClusterIntegrationTestUtils.java | 11 +-
.../apache/pinot/spi/utils/CommonConstants.java | 1 -
14 files changed, 252 insertions(+), 290 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlCompiler.java
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlCompiler.java
index 2b1a69aa0f..f6e36aebc8 100644
---
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlCompiler.java
+++
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlCompiler.java
@@ -29,7 +29,10 @@ public class CalciteSqlCompiler {
}
public static BrokerRequest compileToBrokerRequest(String query) {
- PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+ return convertToBrokerRequest(CalciteSqlParser.compileToPinotQuery(query));
+ }
+
+ public static BrokerRequest convertToBrokerRequest(PinotQuery pinotQuery) {
BrokerRequest brokerRequest = new BrokerRequest();
brokerRequest.setPinotQuery(pinotQuery);
// Set table name in broker request because it is used for access control,
query routing etc.
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
index ffd7f2d52b..dc5deab407 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
@@ -36,6 +36,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.PinotQuery;
import
org.apache.pinot.controller.recommender.exceptions.InvalidInputException;
import org.apache.pinot.controller.recommender.io.metadata.FieldMetadata;
import org.apache.pinot.controller.recommender.io.metadata.SchemaWithMetaData;
@@ -51,7 +52,7 @@ import
org.apache.pinot.controller.recommender.rules.io.params.SegmentSizeRulePa
import org.apache.pinot.controller.recommender.rules.utils.FixedLenBitset;
import org.apache.pinot.core.query.optimizer.QueryOptimizer;
import org.apache.pinot.core.query.request.context.QueryContext;
-import
org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
+import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.segment.local.utils.SchemaUtils;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DimensionFieldSpec;
@@ -59,6 +60,7 @@ import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.SqlCompilationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -166,10 +168,12 @@ public class InputManager {
List<String> invalidQueries = new LinkedList<>();
for (String queryString : _queryWeightMap.keySet()) {
try {
- BrokerRequest brokerRequest =
CalciteSqlCompiler.compileToBrokerRequest(queryString);
- _queryOptimizer.optimize(brokerRequest.getPinotQuery(), _schema);
- QueryContext queryContext =
BrokerRequestToQueryContextConverter.convert(brokerRequest);
- _parsedQueries.put(queryString,
Triple.of(_queryWeightMap.get(queryString), brokerRequest, queryContext));
+ PinotQuery pinotQuery =
CalciteSqlParser.compileToPinotQuery(queryString);
+ _queryOptimizer.optimize(pinotQuery, _schema);
+ QueryContext queryContext =
QueryContextConverterUtils.getQueryContext(pinotQuery);
+ _parsedQueries.put(queryString,
+ Triple.of(_queryWeightMap.get(queryString),
CalciteSqlCompiler.convertToBrokerRequest(pinotQuery),
+ queryContext));
} catch (SqlCompilationException e) {
invalidQueries.add(queryString);
_overWrittenConfigs.getFlaggedQueries().add(queryString,
ERROR_INVALID_QUERY);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index bd88fcd736..200692daf9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -160,9 +160,9 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
long querySchedulingTimeMs = System.currentTimeMillis() -
queryArrivalTimeMs;
if (querySchedulingTimeMs >= queryTimeoutMs) {
_serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.SCHEDULING_TIMEOUT_EXCEPTIONS, 1);
- String errorMessage = String
- .format("Query scheduling took %dms (longer than query timeout of
%dms) on server: %s", querySchedulingTimeMs,
- queryTimeoutMs, _instanceDataManager.getInstanceId());
+ String errorMessage =
+ String.format("Query scheduling took %dms (longer than query timeout
of %dms) on server: %s",
+ querySchedulingTimeMs, queryTimeoutMs,
_instanceDataManager.getInstanceId());
DataTable dataTable = DataTableBuilder.getEmptyDataTable();
dataTable.addException(QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR,
errorMessage));
LOGGER.error("{} while processing requestId: {}", errorMessage,
requestId);
@@ -171,8 +171,8 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(tableNameWithType);
if (tableDataManager == null) {
- String errorMessage = String
- .format("Failed to find table: %s on server: %s", tableNameWithType,
_instanceDataManager.getInstanceId());
+ String errorMessage = String.format("Failed to find table: %s on server:
%s", tableNameWithType,
+ _instanceDataManager.getInstanceId());
DataTable dataTable = DataTableBuilder.getEmptyDataTable();
dataTable.addException(QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR,
errorMessage));
LOGGER.error("{} while processing requestId: {}", errorMessage,
requestId);
@@ -210,7 +210,7 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
DataTable dataTable = null;
try {
dataTable = processQuery(indexSegments, queryContext, timerContext,
executorService, responseObserver,
- queryRequest.isEnableStreaming(), queryRequest.isExplain());
+ queryRequest.isEnableStreaming());
} catch (Exception e) {
_serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
@@ -271,7 +271,7 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
private DataTable processQuery(List<IndexSegment> indexSegments,
QueryContext queryContext, TimerContext timerContext,
ExecutorService executorService, @Nullable
StreamObserver<Server.ServerResponse> responseObserver,
- boolean enableStreaming, boolean isExplain)
+ boolean enableStreaming)
throws Exception {
handleSubquery(queryContext, indexSegments, timerContext, executorService);
@@ -288,7 +288,7 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
LOGGER.debug("Matched {} segments after pruning", numSelectedSegments);
if (numSelectedSegments == 0) {
// Only return metadata for streaming query
- if (isExplain) {
+ if (queryContext.isExplain()) {
return EXPLAIN_PLAN_RESULTS_NO_MATCHING_SEGMENT;
}
@@ -309,7 +309,7 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
planBuildTimer.stopAndRecord();
TimerContext.Timer planExecTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
- DataTable dataTable = isExplain ? processExplainPlanQueries(queryPlan) :
queryPlan.execute();
+ DataTable dataTable = queryContext.isExplain() ?
processExplainPlanQueries(queryPlan) : queryPlan.execute();
planExecTimer.stopAndRecord();
// Update the total docs in the metadata based on the un-pruned segments
@@ -349,8 +349,8 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
}
/** Create EXPLAIN query result {@link DataTable} by recursively stepping
through the {@link Operator} tree. */
- public static void addOperatorToTable(DataTableBuilder dataTableBuilder,
Operator node, int[] globalId,
- int parentId) throws IOException {
+ public static void addOperatorToTable(DataTableBuilder dataTableBuilder,
Operator node, int[] globalId, int parentId)
+ throws IOException {
if (node == null) {
return;
}
@@ -434,8 +434,7 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
subqueryExpression.getLiteral());
// Execute the subquery
subquery.setEndTimeMs(endTimeMs);
- DataTable dataTable = processQuery(indexSegments, subquery,
timerContext, executorService, null,
- false, false);
+ DataTable dataTable = processQuery(indexSegments, subquery,
timerContext, executorService, null, false);
IdSet idSet = dataTable.getObject(0, 0);
String serializedIdSet = idSet.toBase64String();
// Rewrite the expression
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 4a635b613a..82a1f6022c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -28,7 +28,7 @@ import
org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.query.request.context.QueryContext;
-import
org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
+import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.core.util.GapfillUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -53,7 +53,7 @@ public class BrokerReduceService extends BaseReduceService {
return BrokerResponseNative.empty();
}
- Map<String, String> queryOptions =
serverBrokerRequest.getPinotQuery().getQueryOptions();
+ Map<String, String> queryOptions =
brokerRequest.getPinotQuery().getQueryOptions();
boolean enableTrace =
queryOptions != null &&
Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));
@@ -101,7 +101,7 @@ public class BrokerReduceService extends BaseReduceService {
return brokerResponseNative;
}
- QueryContext serverQueryContext =
BrokerRequestToQueryContextConverter.convert(serverBrokerRequest);
+ QueryContext serverQueryContext =
QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery());
DataTableReducer dataTableReducer =
ResultReducerFactory.getResultReducer(serverQueryContext);
dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema,
dataTableMap, brokerResponseNative,
new DataTableReducerContext(_reduceExecutorService,
_maxReduceThreadsPerQuery, reduceTimeOutMs,
@@ -110,7 +110,7 @@ public class BrokerReduceService extends BaseReduceService {
if (brokerRequest == serverBrokerRequest) {
queryContext = serverQueryContext;
} else {
- queryContext =
BrokerRequestToQueryContextConverter.convert(brokerRequest);
+ queryContext =
QueryContextConverterUtils.getQueryContext(brokerRequest.getPinotQuery());
GapfillUtils.GapfillType gapfillType =
GapfillUtils.getGapfillType(queryContext);
if (gapfillType != null) {
GapfillProcessor gapfillProcessor = new GapfillProcessor(queryContext,
gapfillType);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
index 7d0678f4ba..53f242f7ca 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.core.query.reduce;
-import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import
org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -38,8 +37,7 @@ public final class ResultReducerFactory {
* Constructs the right result reducer based on the given query context.
*/
public static DataTableReducer getResultReducer(QueryContext queryContext) {
- BrokerRequest brokerRequest = queryContext.getBrokerRequest();
- if (brokerRequest != null && brokerRequest.getPinotQuery() != null &&
brokerRequest.getPinotQuery().isExplain()) {
+ if (queryContext.isExplain()) {
return new ExplainPlanDataTableReducer(queryContext);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
index ed3e13b402..c1f6ef8d23 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
@@ -30,11 +30,12 @@ import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.query.request.context.QueryContext;
-import
org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
+import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -65,13 +66,13 @@ public class StreamingReduceService extends
BaseReduceService {
}
// prepare contextual info for reduce.
- Map<String, String> queryOptions =
brokerRequest.getPinotQuery().getQueryOptions();
+ PinotQuery pinotQuery = brokerRequest.getPinotQuery();
+ Map<String, String> queryOptions = pinotQuery.getQueryOptions();
boolean enableTrace =
queryOptions != null &&
Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));
- QueryContext queryContext =
BrokerRequestToQueryContextConverter.convert(brokerRequest);
-
- String tableName = brokerRequest.getQuerySource().getTableName();
+ QueryContext queryContext =
QueryContextConverterUtils.getQueryContext(pinotQuery);
+ String tableName = queryContext.getTableName();
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
// initialize empty response.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
index 2c685d070e..09bfa4ad2f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
@@ -26,7 +26,7 @@ import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.TimerContext;
-import
org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
+import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.spi.utils.CommonConstants.Query.Request;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
import org.apache.thrift.TDeserializer;
@@ -46,7 +46,6 @@ public class ServerQueryRequest {
private final boolean _enableStreaming;
private final List<String> _segmentsToQuery;
private final QueryContext _queryContext;
- private final boolean _explain;
// Timing information for different phases of query execution
private final TimerContext _timerContext;
@@ -57,11 +56,7 @@ public class ServerQueryRequest {
_enableTrace = instanceRequest.isEnableTrace();
_enableStreaming = false;
_segmentsToQuery = instanceRequest.getSearchSegments();
- BrokerRequest brokerRequest = instanceRequest.getQuery();
- _queryContext =
BrokerRequestToQueryContextConverter.convert(brokerRequest);
- _explain =
- brokerRequest != null && brokerRequest.getPinotQuery() != null ?
brokerRequest.getPinotQuery().isExplain()
- : false;
+ _queryContext =
QueryContextConverterUtils.getQueryContext(instanceRequest.getQuery().getPinotQuery());
_timerContext = new TimerContext(_queryContext.getTableName(),
serverMetrics, queryArrivalTimeMs);
}
@@ -88,9 +83,8 @@ public class ServerQueryRequest {
} else {
throw new UnsupportedOperationException("Unsupported payloadType: " +
payloadType);
}
- _queryContext =
BrokerRequestToQueryContextConverter.convert(brokerRequest);
+ _queryContext =
QueryContextConverterUtils.getQueryContext(brokerRequest.getPinotQuery());
_timerContext = new TimerContext(_queryContext.getTableName(),
serverMetrics, queryArrivalTimeMs);
- _explain =
Boolean.parseBoolean(metadata.get(Request.MetadataKeys.EXPLAIN));
}
public long getRequestId() {
@@ -124,8 +118,4 @@ public class ServerQueryRequest {
public TimerContext getTimerContext() {
return _timerContext;
}
-
- public boolean isExplain() {
- return _explain;
- }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
index 1076a8504f..fe9311e928 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.FunctionContext;
@@ -71,6 +70,7 @@ import org.apache.pinot.core.util.MemoizedClassAssociation;
@SuppressWarnings({"rawtypes", "unchecked"})
public class QueryContext {
private final String _tableName;
+ private final QueryContext _subquery;
private final List<ExpressionContext> _selectExpressions;
private final List<String> _aliasList;
private final FilterContext _filter;
@@ -82,20 +82,15 @@ public class QueryContext {
private final Map<String, String> _queryOptions;
private final Map<String, String> _debugOptions;
private final Map<ExpressionContext, ExpressionContext>
_expressionOverrideHints;
-
- // Keep the BrokerRequest to make incremental changes
- // TODO: Remove it once the whole query engine is using the QueryContext
- private final BrokerRequest _brokerRequest;
- private final QueryContext _subquery;
+ private final boolean _explain;
private final Function<Class<?>, Map<?, ?>> _sharedValues =
MemoizedClassAssociation.of(ConcurrentHashMap::new);
// Pre-calculate the aggregation functions and columns for the query so that
it can be shared across all the segments
private AggregationFunction[] _aggregationFunctions;
- private List<Pair<AggregationFunction, FilterContext>>
_filteredAggregationFunctions;
-
private Map<FunctionContext, Integer> _aggregationFunctionIndexMap;
private boolean _hasFilteredAggregations;
+ private List<Pair<AggregationFunction, FilterContext>>
_filteredAggregationFunctions;
private Map<Pair<FunctionContext, FilterContext>, Integer>
_filteredAggregationsIndexMap;
private Set<String> _columns;
@@ -120,13 +115,14 @@ public class QueryContext {
// Trim threshold to use for server combine for SQL GROUP BY
private int _groupTrimThreshold =
InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD;
- private QueryContext(String tableName, List<ExpressionContext>
selectExpressions, List<String> aliasList,
- @Nullable FilterContext filter, @Nullable List<ExpressionContext>
groupByExpressions,
- @Nullable FilterContext havingFilter, @Nullable
List<OrderByExpressionContext> orderByExpressions, int limit,
- int offset, Map<String, String> queryOptions, @Nullable Map<String,
String> debugOptions,
- BrokerRequest brokerRequest, QueryContext subquery,
- @Nullable Map<ExpressionContext, ExpressionContext>
expressionOverrideHints) {
+ private QueryContext(@Nullable String tableName, @Nullable QueryContext
subquery,
+ List<ExpressionContext> selectExpressions, List<String> aliasList,
@Nullable FilterContext filter,
+ @Nullable List<ExpressionContext> groupByExpressions, @Nullable
FilterContext havingFilter,
+ @Nullable List<OrderByExpressionContext> orderByExpressions, int limit,
int offset,
+ Map<String, String> queryOptions, @Nullable Map<String, String>
debugOptions,
+ @Nullable Map<ExpressionContext, ExpressionContext>
expressionOverrideHints, boolean explain) {
_tableName = tableName;
+ _subquery = subquery;
_selectExpressions = selectExpressions;
_aliasList = Collections.unmodifiableList(aliasList);
_filter = filter;
@@ -137,18 +133,26 @@ public class QueryContext {
_offset = offset;
_queryOptions = queryOptions;
_debugOptions = debugOptions;
- _brokerRequest = brokerRequest;
- _subquery = subquery;
_expressionOverrideHints = expressionOverrideHints;
+ _explain = explain;
}
/**
* Returns the table name.
+ * NOTE: on the broker side, table name might be {@code null} when subquery
is available.
*/
public String getTableName() {
return _tableName;
}
+ /**
+ * Returns the subquery.
+ */
+ @Nullable
+ public QueryContext getSubquery() {
+ return _subquery;
+ }
+
/**
* Returns a list of expressions in the SELECT clause.
*/
@@ -195,10 +199,6 @@ public class QueryContext {
return _orderByExpressions;
}
- public QueryContext getSubquery() {
- return _subquery;
- }
-
/**
* Returns the limit of the query.
*/
@@ -236,10 +236,10 @@ public class QueryContext {
}
/**
- * Returns the BrokerRequest where the QueryContext is extracted from.
+ * Returns {@code true} if the query is an EXPLAIN query, {@code false}
otherwise.
*/
- public BrokerRequest getBrokerRequest() {
- return _brokerRequest;
+ public boolean isExplain() {
+ return _explain;
}
/**
@@ -381,15 +381,16 @@ public class QueryContext {
*/
@Override
public String toString() {
- return "QueryContext{" + "_tableName='" + _tableName + '\'' + ",
_selectExpressions=" + _selectExpressions
- + ", _aliasList=" + _aliasList + ", _filter=" + _filter + ",
_groupByExpressions=" + _groupByExpressions
- + ", _havingFilter=" + _havingFilter + ", _orderByExpressions=" +
_orderByExpressions + ", _limit=" + _limit
- + ", _offset=" + _offset + ", _queryOptions=" + _queryOptions + ",
_debugOptions=" + _debugOptions
- + ", _brokerRequest=" + _brokerRequest + '}';
+ return "QueryContext{" + "_tableName='" + _tableName + '\'' + ",
_subquery=" + _subquery + ", _selectExpressions="
+ + _selectExpressions + ", _aliasList=" + _aliasList + ", _filter=" +
_filter + ", _groupByExpressions="
+ + _groupByExpressions + ", _havingFilter=" + _havingFilter + ",
_orderByExpressions=" + _orderByExpressions
+ + ", _limit=" + _limit + ", _offset=" + _offset + ", _queryOptions=" +
_queryOptions + ", _debugOptions="
+ + _debugOptions + ", _expressionOverrideHints=" +
_expressionOverrideHints + ", _explain=" + _explain + '}';
}
public static class Builder {
private String _tableName;
+ private QueryContext _subquery;
private List<ExpressionContext> _selectExpressions;
private List<String> _aliasList;
private FilterContext _filter;
@@ -400,15 +401,19 @@ public class QueryContext {
private int _offset;
private Map<String, String> _queryOptions;
private Map<String, String> _debugOptions;
- private BrokerRequest _brokerRequest;
- private QueryContext _subquery;
private Map<ExpressionContext, ExpressionContext> _expressionOverrideHints;
+ private boolean _explain;
public Builder setTableName(String tableName) {
_tableName = tableName;
return this;
}
+ public Builder setSubquery(QueryContext subquery) {
+ _subquery = subquery;
+ return this;
+ }
+
public Builder setSelectExpressions(List<ExpressionContext>
selectExpressions) {
_selectExpressions = selectExpressions;
return this;
@@ -419,22 +424,22 @@ public class QueryContext {
return this;
}
- public Builder setFilter(@Nullable FilterContext filter) {
+ public Builder setFilter(FilterContext filter) {
_filter = filter;
return this;
}
- public Builder setGroupByExpressions(@Nullable List<ExpressionContext>
groupByExpressions) {
+ public Builder setGroupByExpressions(List<ExpressionContext>
groupByExpressions) {
_groupByExpressions = groupByExpressions;
return this;
}
- public Builder setHavingFilter(@Nullable FilterContext havingFilter) {
+ public Builder setHavingFilter(FilterContext havingFilter) {
_havingFilter = havingFilter;
return this;
}
- public Builder setOrderByExpressions(@Nullable
List<OrderByExpressionContext> orderByExpressions) {
+ public Builder setOrderByExpressions(List<OrderByExpressionContext>
orderByExpressions) {
_orderByExpressions = orderByExpressions;
return this;
}
@@ -449,28 +454,23 @@ public class QueryContext {
return this;
}
- public Builder setQueryOptions(@Nullable Map<String, String> queryOptions)
{
+ public Builder setQueryOptions(Map<String, String> queryOptions) {
_queryOptions = queryOptions;
return this;
}
- public Builder setDebugOptions(@Nullable Map<String, String> debugOptions)
{
+ public Builder setDebugOptions(Map<String, String> debugOptions) {
_debugOptions = debugOptions;
return this;
}
- public Builder setBrokerRequest(BrokerRequest brokerRequest) {
- _brokerRequest = brokerRequest;
- return this;
- }
-
- public Builder setSubquery(QueryContext subquery) {
- _subquery = subquery;
+ public Builder setExpressionOverrideHints(Map<ExpressionContext,
ExpressionContext> expressionOverrideHints) {
+ _expressionOverrideHints = expressionOverrideHints;
return this;
}
- public Builder setExpressionOverrideHints(Map<ExpressionContext,
ExpressionContext> expressionOverrideHints) {
- _expressionOverrideHints = expressionOverrideHints;
+ public Builder setExplain(boolean explain) {
+ _explain = explain;
return this;
}
@@ -481,9 +481,9 @@ public class QueryContext {
_queryOptions = Collections.emptyMap();
}
QueryContext queryContext =
- new QueryContext(_tableName, _selectExpressions, _aliasList,
_filter, _groupByExpressions, _havingFilter,
- _orderByExpressions, _limit, _offset, _queryOptions,
_debugOptions, _brokerRequest, _subquery,
- _expressionOverrideHints);
+ new QueryContext(_tableName, _subquery, _selectExpressions,
_aliasList, _filter, _groupByExpressions,
+ _havingFilter, _orderByExpressions, _limit, _offset,
_queryOptions, _debugOptions,
+ _expressionOverrideHints, _explain);
// Pre-calculate the aggregation functions and columns for the query
generateAggregationFunctions(queryContext);
@@ -555,8 +555,8 @@ public class QueryContext {
aggregationFunctionIndexMap.put(entry.getKey().getLeft(),
entry.getValue());
}
queryContext._aggregationFunctions = aggregationFunctions;
- queryContext._filteredAggregationFunctions =
filteredAggregationFunctions;
queryContext._aggregationFunctionIndexMap =
aggregationFunctionIndexMap;
+ queryContext._filteredAggregationFunctions =
filteredAggregationFunctions;
queryContext._filteredAggregationsIndexMap =
filteredAggregationsIndexMap;
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
deleted file mode 100644
index dfdd1f9ab0..0000000000
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.request.context.utils;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.request.Expression;
-import org.apache.pinot.common.request.ExpressionType;
-import org.apache.pinot.common.request.Function;
-import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.request.context.FilterContext;
-import org.apache.pinot.common.request.context.OrderByExpressionContext;
-import org.apache.pinot.common.request.context.RequestContextUtils;
-import org.apache.pinot.core.query.request.context.QueryContext;
-
-
-public class BrokerRequestToQueryContextConverter {
- private BrokerRequestToQueryContextConverter() {
- }
-
- /**
- * Converts the given {@link BrokerRequest} into a {@link QueryContext}.
- */
- public static QueryContext convert(BrokerRequest brokerRequest) {
- return convert(brokerRequest.getPinotQuery(), brokerRequest);
- }
-
- private static QueryContext convert(PinotQuery pinotQuery, BrokerRequest
brokerRequest) {
- QueryContext subquery = null;
- if (pinotQuery.getDataSource().getSubquery() != null) {
- subquery = convert(pinotQuery.getDataSource().getSubquery(),
brokerRequest);
- }
- // SELECT
- List<ExpressionContext> selectExpressions;
- List<Expression> selectList = pinotQuery.getSelectList();
- List<String> aliasList = new ArrayList<>(selectList.size());
- selectExpressions = new ArrayList<>(selectList.size());
- for (Expression thriftExpression : selectList) {
- // Handle alias
- Expression expressionWithoutAlias = thriftExpression;
- if (thriftExpression.getType() == ExpressionType.FUNCTION) {
- Function function = thriftExpression.getFunctionCall();
- List<Expression> operands = function.getOperands();
- switch (function.getOperator().toUpperCase()) {
- case "AS":
- expressionWithoutAlias = operands.get(0);
- aliasList.add(operands.get(1).getIdentifier().getName());
- break;
- case "DISTINCT":
- int numOperands = operands.size();
- for (int i = 0; i < numOperands; i++) {
- Expression operand = operands.get(i);
- Function operandFunction = operand.getFunctionCall();
- if (operandFunction != null && operandFunction.getOperator().equalsIgnoreCase("AS")) {
- operands.set(i, operandFunction.getOperands().get(0));
- aliasList.add(operandFunction.getOperands().get(1).getIdentifier().getName());
- } else {
- aliasList.add(null);
- }
- }
- break;
- default:
- // Add null as a placeholder for alias.
- aliasList.add(null);
- break;
- }
- } else {
- // Add null as a placeholder for alias.
- aliasList.add(null);
- }
- selectExpressions.add(RequestContextUtils.getExpression(expressionWithoutAlias));
- }
-
- // WHERE
- FilterContext filter = null;
- Expression filterExpression = pinotQuery.getFilterExpression();
- if (filterExpression != null) {
- filter = RequestContextUtils.getFilter(filterExpression);
- }
-
- // GROUP BY
- List<ExpressionContext> groupByExpressions = null;
- List<Expression> groupByList = pinotQuery.getGroupByList();
- if (CollectionUtils.isNotEmpty(groupByList)) {
- groupByExpressions = new ArrayList<>(groupByList.size());
- for (Expression thriftExpression : groupByList) {
- groupByExpressions.add(RequestContextUtils.getExpression(thriftExpression));
- }
- }
-
- // ORDER BY
- List<OrderByExpressionContext> orderByExpressions = null;
- List<Expression> orderByList = pinotQuery.getOrderByList();
- if (CollectionUtils.isNotEmpty(orderByList)) {
- // Deduplicate the order-by expressions
- orderByExpressions = new ArrayList<>(orderByList.size());
- Set<ExpressionContext> expressionSet = new HashSet<>();
- for (Expression orderBy : orderByList) {
- // NOTE: Order-by is always a Function with the ordering of the
Expression
- Function thriftFunction = orderBy.getFunctionCall();
- ExpressionContext expression =
RequestContextUtils.getExpression(thriftFunction.getOperands().get(0));
- if (expressionSet.add(expression)) {
- boolean isAsc = thriftFunction.getOperator().equalsIgnoreCase("ASC");
- orderByExpressions.add(new OrderByExpressionContext(expression,
isAsc));
- }
- }
- }
-
- // HAVING
- FilterContext havingFilter = null;
- Expression havingExpression = pinotQuery.getHavingExpression();
- if (havingExpression != null) {
- havingFilter = RequestContextUtils.getFilter(havingExpression);
- }
-
- // EXPRESSION OVERRIDE HINTS
- Map<ExpressionContext, ExpressionContext> expressionContextOverrideHints =
new HashMap<>();
- Map<Expression, Expression> expressionOverrideHints =
pinotQuery.getExpressionOverrideHints();
- if (expressionOverrideHints != null) {
- for (Map.Entry<Expression, Expression> entry : expressionOverrideHints.entrySet()) {
- expressionContextOverrideHints.put(RequestContextUtils.getExpression(entry.getKey()),
- RequestContextUtils.getExpression(entry.getValue()));
- }
- }
-
- return new QueryContext.Builder().setTableName(pinotQuery.getDataSource().getTableName())
- .setSelectExpressions(selectExpressions).setAliasList(aliasList).setFilter(filter)
- .setGroupByExpressions(groupByExpressions).setOrderByExpressions(orderByExpressions)
- .setHavingFilter(havingFilter).setLimit(pinotQuery.getLimit()).setOffset(pinotQuery.getOffset())
- .setQueryOptions(pinotQuery.getQueryOptions()).setDebugOptions(pinotQuery.getDebugOptions())
- .setSubquery(subquery).setExpressionOverrideHints(expressionContextOverrideHints)
- .setBrokerRequest(brokerRequest).build();
- }
-}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextConverterUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextConverterUtils.java
index d2595e5bb7..d6a8437ec4 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextConverterUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextConverterUtils.java
@@ -18,8 +18,24 @@
*/
package org.apache.pinot.core.query.request.context.utils;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
public class QueryContextConverterUtils {
@@ -30,6 +46,120 @@ public class QueryContextConverterUtils {
* Converts the given query into a {@link QueryContext}.
*/
public static QueryContext getQueryContext(String query) {
- return BrokerRequestToQueryContextConverter.convert(CalciteSqlCompiler.compileToBrokerRequest(query));
+ return getQueryContext(CalciteSqlParser.compileToPinotQuery(query));
+ }
+
+ /**
+ * Converts the given {@link PinotQuery} into a {@link QueryContext}.
+ */
+ public static QueryContext getQueryContext(PinotQuery pinotQuery) {
+ // FROM
+ String tableName;
+ DataSource dataSource = pinotQuery.getDataSource();
+ tableName = dataSource.getTableName();
+ QueryContext subquery = null;
+ if (dataSource.getSubquery() != null) {
+ subquery = getQueryContext(dataSource.getSubquery());
+ }
+
+ // SELECT
+ List<ExpressionContext> selectExpressions;
+ List<Expression> selectList = pinotQuery.getSelectList();
+ List<String> aliasList = new ArrayList<>(selectList.size());
+ selectExpressions = new ArrayList<>(selectList.size());
+ for (Expression thriftExpression : selectList) {
+ // Handle alias
+ Expression expressionWithoutAlias = thriftExpression;
+ if (thriftExpression.getType() == ExpressionType.FUNCTION) {
+ Function function = thriftExpression.getFunctionCall();
+ List<Expression> operands = function.getOperands();
+ switch (function.getOperator().toUpperCase()) {
+ case "AS":
+ expressionWithoutAlias = operands.get(0);
+ aliasList.add(operands.get(1).getIdentifier().getName());
+ break;
+ case "DISTINCT":
+ int numOperands = operands.size();
+ for (int i = 0; i < numOperands; i++) {
+ Expression operand = operands.get(i);
+ Function operandFunction = operand.getFunctionCall();
+ if (operandFunction != null && operandFunction.getOperator().equalsIgnoreCase("AS")) {
+ operands.set(i, operandFunction.getOperands().get(0));
+ aliasList.add(operandFunction.getOperands().get(1).getIdentifier().getName());
+ } else {
+ aliasList.add(null);
+ }
+ }
+ break;
+ default:
+ // Add null as a placeholder for alias.
+ aliasList.add(null);
+ break;
+ }
+ } else {
+ // Add null as a placeholder for alias.
+ aliasList.add(null);
+ }
+ selectExpressions.add(RequestContextUtils.getExpression(expressionWithoutAlias));
+ }
+
+ // WHERE
+ FilterContext filter = null;
+ Expression filterExpression = pinotQuery.getFilterExpression();
+ if (filterExpression != null) {
+ filter = RequestContextUtils.getFilter(filterExpression);
+ }
+
+ // GROUP BY
+ List<ExpressionContext> groupByExpressions = null;
+ List<Expression> groupByList = pinotQuery.getGroupByList();
+ if (CollectionUtils.isNotEmpty(groupByList)) {
+ groupByExpressions = new ArrayList<>(groupByList.size());
+ for (Expression thriftExpression : groupByList) {
+ groupByExpressions.add(RequestContextUtils.getExpression(thriftExpression));
+ }
+ }
+
+ // ORDER BY
+ List<OrderByExpressionContext> orderByExpressions = null;
+ List<Expression> orderByList = pinotQuery.getOrderByList();
+ if (CollectionUtils.isNotEmpty(orderByList)) {
+ // Deduplicate the order-by expressions
+ orderByExpressions = new ArrayList<>(orderByList.size());
+ Set<ExpressionContext> expressionSet = new HashSet<>();
+ for (Expression orderBy : orderByList) {
+ // NOTE: Order-by is always a Function with the ordering of the
Expression
+ Function thriftFunction = orderBy.getFunctionCall();
+ ExpressionContext expression =
RequestContextUtils.getExpression(thriftFunction.getOperands().get(0));
+ if (expressionSet.add(expression)) {
+ boolean isAsc = thriftFunction.getOperator().equalsIgnoreCase("ASC");
+ orderByExpressions.add(new OrderByExpressionContext(expression,
isAsc));
+ }
+ }
+ }
+
+ // HAVING
+ FilterContext havingFilter = null;
+ Expression havingExpression = pinotQuery.getHavingExpression();
+ if (havingExpression != null) {
+ havingFilter = RequestContextUtils.getFilter(havingExpression);
+ }
+
+ // EXPRESSION OVERRIDE HINTS
+ Map<ExpressionContext, ExpressionContext> expressionContextOverrideHints =
new HashMap<>();
+ Map<Expression, Expression> expressionOverrideHints =
pinotQuery.getExpressionOverrideHints();
+ if (expressionOverrideHints != null) {
+ for (Map.Entry<Expression, Expression> entry : expressionOverrideHints.entrySet()) {
+ expressionContextOverrideHints.put(RequestContextUtils.getExpression(entry.getKey()),
+ RequestContextUtils.getExpression(entry.getValue()));
+ }
+ }
+
+ return new QueryContext.Builder().setTableName(tableName).setSubquery(subquery)
+ .setSelectExpressions(selectExpressions).setAliasList(aliasList).setFilter(filter)
+ .setGroupByExpressions(groupByExpressions).setOrderByExpressions(orderByExpressions)
+ .setHavingFilter(havingFilter).setLimit(pinotQuery.getLimit()).setOffset(pinotQuery.getOffset())
+ .setQueryOptions(pinotQuery.getQueryOptions()).setDebugOptions(pinotQuery.getDebugOptions())
+ .setExpressionOverrideHints(expressionContextOverrideHints).setExplain(pinotQuery.isExplain()).build();
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java
index f83979b482..31d65af0da 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java
@@ -26,8 +26,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
-import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.FunctionContext;
@@ -35,14 +35,14 @@ import
org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.common.request.context.predicate.EqPredicate;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.query.request.context.QueryContext;
-import
org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
+import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.datasource.DataSource;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -198,15 +198,14 @@ public class QueryOverrideWithHintsTest {
@Test
public void testRewriteExpressionsWithHints() {
- BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(
+ PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(
"SELECT datetrunc('MONTH', ts), count(*), sum(abc) from myTable group
by datetrunc('MONTH', ts) ");
Expression dateTruncFunctionExpr =
RequestUtils.getFunctionExpression("datetrunc");
dateTruncFunctionExpr.getFunctionCall().setOperands(new ArrayList<>(
ImmutableList.of(RequestUtils.getLiteralExpression("MONTH"),
RequestUtils.getIdentifierExpression("ts"))));
Expression timestampIndexColumn =
RequestUtils.getIdentifierExpression("$ts$MONTH");
- brokerRequest.getPinotQuery()
- .setExpressionOverrideHints(ImmutableMap.of(dateTruncFunctionExpr, timestampIndexColumn));
- QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest);
+ pinotQuery.setExpressionOverrideHints(ImmutableMap.of(dateTruncFunctionExpr, timestampIndexColumn));
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery);
InstancePlanMakerImplV2.rewriteQueryContextWithHints(queryContext,
_indexSegment);
assertEquals(queryContext.getSelectExpressions().get(0).getIdentifier(),
"$ts$MONTH");
assertEquals(queryContext.getGroupByExpressions().get(0).getIdentifier(),
"$ts$MONTH");
@@ -214,15 +213,14 @@ public class QueryOverrideWithHintsTest {
@Test
public void testNotRewriteExpressionsWithHints() {
- BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(
+ PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(
"SELECT datetrunc('DAY', ts), count(*), sum(abc) from myTable group by
datetrunc('DAY', ts)");
Expression dateTruncFunctionExpr =
RequestUtils.getFunctionExpression("datetrunc");
dateTruncFunctionExpr.getFunctionCall().setOperands(new ArrayList<>(
ImmutableList.of(RequestUtils.getLiteralExpression("DAY"),
RequestUtils.getIdentifierExpression("ts"))));
Expression timestampIndexColumn =
RequestUtils.getIdentifierExpression("$ts$DAY");
- brokerRequest.getPinotQuery()
- .setExpressionOverrideHints(ImmutableMap.of(dateTruncFunctionExpr, timestampIndexColumn));
- QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest);
+ pinotQuery.setExpressionOverrideHints(ImmutableMap.of(dateTruncFunctionExpr, timestampIndexColumn));
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery);
InstancePlanMakerImplV2.rewriteQueryContextWithHints(queryContext,
_indexSegment);
assertEquals(queryContext.getSelectExpressions().get(0).getFunction(),
queryContext.getExpressionOverrideHints().keySet().iterator().next().getFunction());
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index fc33e780e7..ee7ab37778 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -38,7 +38,7 @@ import
org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.optimizer.QueryOptimizer;
import org.apache.pinot.core.query.reduce.BrokerReduceService;
import org.apache.pinot.core.query.request.context.QueryContext;
-import
org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
+import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.core.util.GapfillUtils;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -49,6 +49,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
/**
@@ -74,7 +75,7 @@ public abstract class BaseQueriesTest {
protected <T extends Operator> T getOperator(String query) {
BrokerRequest brokerRequest =
CalciteSqlCompiler.compileToBrokerRequest(query);
BrokerRequest serverBrokerRequest =
GapfillUtils.stripGapfill(brokerRequest);
- QueryContext queryContext =
BrokerRequestToQueryContextConverter.convert(serverBrokerRequest);
+ QueryContext queryContext =
QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery());
return (T) PLAN_MAKER.makeSegmentPlanNode(getIndexSegment(),
queryContext).run();
}
@@ -130,8 +131,7 @@ public abstract class BaseQueriesTest {
*/
private BrokerResponseNative getBrokerResponse(String query, PlanMaker
planMaker,
@Nullable Map<String, String> extraQueryOptions) {
- BrokerRequest brokerRequest =
CalciteSqlCompiler.compileToBrokerRequest(query);
- PinotQuery pinotQuery = brokerRequest.getPinotQuery();
+ PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
if (extraQueryOptions != null) {
Map<String, String> queryOptions = pinotQuery.getQueryOptions();
if (queryOptions == null) {
@@ -140,11 +140,7 @@ public abstract class BaseQueriesTest {
}
queryOptions.putAll(extraQueryOptions);
}
- BrokerRequest serverBrokerRequest =
GapfillUtils.stripGapfill(brokerRequest);
- QueryContext queryContext =
BrokerRequestToQueryContextConverter.convert(brokerRequest);
- QueryContext serverQueryContext = brokerRequest == serverBrokerRequest ?
queryContext
- : BrokerRequestToQueryContextConverter.convert(serverBrokerRequest);
- return getBrokerResponse(queryContext, serverQueryContext, planMaker);
+ return getBrokerResponse(pinotQuery, planMaker);
}
/**
@@ -152,14 +148,18 @@ public abstract class BaseQueriesTest {
* <p>Use this to test the whole flow from server to broker.
* <p>The result should be equivalent to querying 4 identical index segments.
*/
- private BrokerResponseNative getBrokerResponse(QueryContext queryContext,
QueryContext serverQueryContext,
- PlanMaker planMaker) {
+ private BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery,
PlanMaker planMaker) {
+ BrokerRequest brokerRequest =
CalciteSqlCompiler.convertToBrokerRequest(pinotQuery);
+ BrokerRequest serverBrokerRequest =
GapfillUtils.stripGapfill(brokerRequest);
+ QueryContext queryContext =
QueryContextConverterUtils.getQueryContext(pinotQuery);
+ QueryContext serverQueryContext = serverBrokerRequest == brokerRequest ? queryContext
+ : QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery());
+
// Server side
serverQueryContext.setEndTimeMs(System.currentTimeMillis() +
Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
Plan plan = planMaker.makeInstancePlan(getIndexSegments(),
serverQueryContext, EXECUTOR_SERVICE);
- PinotQuery pinotQuery =
serverQueryContext.getBrokerRequest().getPinotQuery();
DataTable instanceResponse =
- pinotQuery.isExplain() ?
ServerQueryExecutorV1Impl.processExplainPlanQueries(plan) : plan.execute();
+ queryContext.isExplain() ?
ServerQueryExecutorV1Impl.processExplainPlanQueries(plan) : plan.execute();
// Broker side
// Use 2 Threads for 2 data-tables
@@ -177,8 +177,8 @@ public abstract class BaseQueriesTest {
throw new RuntimeException(e);
}
BrokerResponseNative brokerResponse =
- brokerReduceService.reduceOnDataTable(queryContext.getBrokerRequest(),
serverQueryContext.getBrokerRequest(),
- dataTableMap, CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS,
null);
+ brokerReduceService.reduceOnDataTable(brokerRequest,
serverBrokerRequest, dataTableMap,
+ CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS, null);
brokerReduceService.shutDown();
return brokerResponse;
@@ -191,12 +191,8 @@ public abstract class BaseQueriesTest {
*/
protected BrokerResponseNative getBrokerResponseForOptimizedQuery(String
query, @Nullable TableConfig config,
@Nullable Schema schema) {
- BrokerRequest brokerRequest =
CalciteSqlCompiler.compileToBrokerRequest(query);
- PinotQuery pinotQuery = brokerRequest.getPinotQuery();
+ PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
OPTIMIZER.optimize(pinotQuery, config, schema);
- BrokerRequest serverBrokerRequest =
GapfillUtils.stripGapfill(brokerRequest);
- QueryContext queryContext =
BrokerRequestToQueryContextConverter.convert(brokerRequest);
- QueryContext serverQueryContext =
BrokerRequestToQueryContextConverter.convert(serverBrokerRequest);
- return getBrokerResponse(queryContext, serverQueryContext, PLAN_MAKER);
+ return getBrokerResponse(pinotQuery, PLAN_MAKER);
}
}
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 3d69f5d4c5..fbea4e6220 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -59,6 +59,7 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.request.context.OrderByExpressionContext;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
@@ -563,11 +564,11 @@ public class ClusterIntegrationTestUtils {
if (!QueryContextUtils.isAggregationQuery(queryContext)) {
// selection/distinct
- List<String> orderByColumns = new ArrayList<>();
+ Set<String> orderByColumns = new HashSet<>();
if (queryContext.getOrderByExpressions() != null) {
- orderByColumns.addAll(
- CalciteSqlParser.extractIdentifiers(queryContext.getBrokerRequest().getPinotQuery().getOrderByList(),
- false));
+ for (OrderByExpressionContext orderByExpression : queryContext.getOrderByExpressions()) {
+ orderByExpression.getColumns(orderByColumns);
+ }
}
Set<String> expectedValues = new HashSet<>();
List<String> expectedOrderByValues = new ArrayList<>();
@@ -717,7 +718,7 @@ public class ClusterIntegrationTestUtils {
private static void comparePinotResultsWithExpectedValues(Set<String>
expectedValues,
List<String> expectedOrderByValues, org.apache.pinot.client.ResultSet
connectionResultSet,
- Collection<String> orderByColumns, String pinotQuery, String h2Query,
int h2NumRows,
+ Set<String> orderByColumns, String pinotQuery, String h2Query, int
h2NumRows,
long pinotNumRecordsSelected) {
int pinotNumRows = connectionResultSet.getRowCount();
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 8d6c7646f9..15fdc312a5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -616,7 +616,6 @@ public class CommonConstants {
public static final String ENABLE_TRACE = "enableTrace";
public static final String ENABLE_STREAMING = "enableStreaming";
public static final String PAYLOAD_TYPE = "payloadType";
- public static final String EXPLAIN = "explain";
}
public static class PayloadType {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]