This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8563e22716 Extract compileRequest from
BaseSingleStageRequestHandler.doHandleRequest (#15073)
8563e22716 is described below
commit 8563e227168f96de8b0f5fe1fb77529d6725baa3
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Fri Mar 7 11:32:13 2025 +0530
Extract compileRequest from BaseSingleStageRequestHandler.doHandleRequest
(#15073)
---
.../BaseSingleStageBrokerRequestHandler.java | 320 ++++++++++++---------
1 file changed, 192 insertions(+), 128 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index a1cfa18d51..3320636a8a 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -311,139 +311,64 @@ public abstract class
BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
}
}
+ /**
+ * CompileResult holds the result of the compilation phase. Compilation may
or may not be successful. If compilation
+ * is successful, then all member variables other than the BrokerResponse are
set. If compilation is not
+ * successful, then only the BrokerResponse is set. This is done to keep the
current behaviour as is;
+ * it would have been hard to keep the current behaviour if the
compileRequest method threw an exception instead.
+ * The one exception to this rule is a literal-only query: it compiles
successfully, but only the BrokerResponse is set.
+ */
+ private static class CompileResult {
+ final PinotQuery _pinotQuery;
+ final PinotQuery _serverPinotQuery;
+ final Schema _schema;
+ final String _tableName;
+ final String _rawTableName;
+ final BrokerResponse _errorOrLiteralOnlyBrokerResponse;
+
+ public CompileResult(PinotQuery pinotQuery, PinotQuery serverPinotQuery,
Schema schema, String tableName,
+ String rawTableName) {
+ _pinotQuery = pinotQuery;
+ _serverPinotQuery = serverPinotQuery;
+ _schema = schema;
+ _tableName = tableName;
+ _rawTableName = rawTableName;
+ _errorOrLiteralOnlyBrokerResponse = null;
+ }
+
+ public CompileResult(BrokerResponse errorOrLiteralOnlyBrokerResponse) {
+ _pinotQuery = null;
+ _serverPinotQuery = null;
+ _schema = null;
+ _tableName = null;
+ _rawTableName = null;
+ _errorOrLiteralOnlyBrokerResponse = errorOrLiteralOnlyBrokerResponse;
+ }
+ }
+
protected BrokerResponse doHandleRequest(long requestId, String query,
SqlNodeAndOptions sqlNodeAndOptions,
JsonNode request, @Nullable RequesterIdentity requesterIdentity,
RequestContext requestContext,
@Nullable HttpHeaders httpHeaders, AccessControl accessControl)
throws Exception {
// Compile the request into PinotQuery
long compilationStartTimeNs = System.nanoTime();
- PinotQuery pinotQuery;
- try {
- pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
- } catch (Exception e) {
- LOGGER.info("Caught exception while compiling SQL request {}: {}, {}",
requestId, query, e.getMessage());
-
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS,
1);
- requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
- // Check if the query is a v2 supported query
- String database =
DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(),
httpHeaders);
- if (ParserUtils.canCompileWithMultiStageEngine(query, database,
_tableCache)) {
- return new
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
new Exception(
- "It seems that the query is only supported by the multi-stage
query engine, please retry the query using "
- + "the multi-stage query engine "
- +
"(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)")));
- } else {
- return new
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
e));
- }
- }
-
- if (isDefaultQueryResponseLimitEnabled() && !pinotQuery.isSetLimit()) {
- pinotQuery.setLimit(_defaultQueryLimit);
- }
-
- if (isLiteralOnlyQuery(pinotQuery)) {
- LOGGER.debug("Request {} contains only Literal, skipping server query:
{}", requestId, query);
- try {
- if (pinotQuery.isExplain()) {
- // EXPLAIN PLAN results to show that query is evaluated exclusively
by Broker.
- return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
- }
- return processLiteralOnlyQuery(requestId, pinotQuery, requestContext);
- } catch (Exception e) {
- // TODO: refine the exceptions here to early termination the queries
won't requires to send to servers.
- LOGGER.warn("Unable to execute literal request {}: {} at broker,
fallback to server query. {}", requestId,
- query, e.getMessage());
- }
- }
-
- PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
- DataSource dataSource = serverPinotQuery.getDataSource();
- if (dataSource == null) {
- LOGGER.info("Data source (FROM clause) not found in request {}: {}",
requestId, query);
- requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
- return new BrokerResponseNative(
- QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
"Data source (FROM clause) not found"));
- }
- if (dataSource.getJoin() != null) {
- LOGGER.info("JOIN is not supported in request {}: {}", requestId, query);
- requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
- return new BrokerResponseNative(
- QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
"JOIN is not supported"));
- }
- if (dataSource.getTableName() == null) {
- LOGGER.info("Table name not found in request {}: {}", requestId, query);
- requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
- return new BrokerResponseNative(
- QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
"Table name not found"));
- }
-
- try {
- handleSubquery(serverPinotQuery, requestId, request, requesterIdentity,
requestContext, httpHeaders,
- accessControl);
- } catch (Exception e) {
- LOGGER.info("Caught exception while handling the subquery in request {}:
{}, {}", requestId, query,
- e.getMessage());
- requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
- return new
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
- }
-
- boolean ignoreCase = _tableCache.isIgnoreCase();
- String tableName;
- try {
- tableName =
-
getActualTableName(DatabaseUtils.translateTableName(dataSource.getTableName(),
httpHeaders, ignoreCase),
- _tableCache);
- } catch (DatabaseConflictException e) {
- LOGGER.info("{}. Request {}: {}", e.getMessage(), requestId, query);
-
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS,
1);
- requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
- return new
BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
e));
- }
- dataSource.setTableName(tableName);
- String rawTableName = TableNameBuilder.extractRawTableName(tableName);
- requestContext.setTableName(rawTableName);
-
- AuthorizationResult authorizationResult =
- accessControl.authorize(httpHeaders, TargetType.TABLE, tableName,
Actions.Table.QUERY);
-
- if (!authorizationResult.hasAccess()) {
- throwAccessDeniedError(requestId, query, requestContext, tableName,
authorizationResult);
- }
-
- try {
- Map<String, String> columnNameMap =
_tableCache.getColumnNameMap(rawTableName);
- if (columnNameMap != null) {
- updateColumnNames(rawTableName, serverPinotQuery, ignoreCase,
columnNameMap);
- }
- } catch (Exception e) {
- // Throw exceptions with column in-existence error.
- if (e instanceof BadQueryRequestException) {
- LOGGER.info("Caught exception while checking column names in request
{}: {}, {}", requestId, query,
- e.getMessage());
- requestContext.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE);
- _brokerMetrics.addMeteredTableValue(rawTableName,
BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1);
- return new
BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR,
e));
- }
- LOGGER.warn("Caught exception while updating column names in request {}:
{}, {}", requestId, query,
- e.getMessage());
- }
-
- if (_defaultHllLog2m > 0) {
- handleHLLLog2mOverride(serverPinotQuery, _defaultHllLog2m);
- }
- if (_enableQueryLimitOverride) {
- handleQueryLimitOverride(serverPinotQuery, _queryResponseLimit);
- }
- handleSegmentPartitionedDistinctCountOverride(serverPinotQuery,
- getSegmentPartitionedColumns(_tableCache, tableName));
- if (_enableDistinctCountBitmapOverride) {
- handleDistinctCountBitmapOverride(serverPinotQuery);
- }
-
- Schema schema = _tableCache.getSchema(rawTableName);
- if (schema != null) {
- handleDistinctMultiValuedOverride(serverPinotQuery, schema);
- }
-
+ CompileResult compileResult =
+ compileRequest(requestId, query, sqlNodeAndOptions, request,
requesterIdentity, requestContext, httpHeaders,
+ accessControl);
+
+ if (compileResult._errorOrLiteralOnlyBrokerResponse != null) {
+ /*
+ * If the compileRequest method sets the BrokerResponse field, then it
is either an error response or
+ * a literal-only query. In either case, we can return the response
directly.
+ */
+ return compileResult._errorOrLiteralOnlyBrokerResponse;
+ }
+
+ Schema schema = compileResult._schema;
+ String tableName = compileResult._tableName;
+ String rawTableName = compileResult._rawTableName;
+ PinotQuery pinotQuery = compileResult._pinotQuery;
+ PinotQuery serverPinotQuery = compileResult._serverPinotQuery;
long compilationEndTimeNs = System.nanoTime();
// full request compile time = compilationTimeNs + parserTimeNs
_brokerMetrics.addPhaseTiming(rawTableName,
BrokerQueryPhase.REQUEST_COMPILATION,
@@ -454,7 +379,7 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
BrokerRequest brokerRequest =
CalciteSqlCompiler.convertToBrokerRequest(pinotQuery);
BrokerRequest serverBrokerRequest =
serverPinotQuery == pinotQuery ? brokerRequest :
CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery);
- authorizationResult = accessControl.authorize(requesterIdentity,
serverBrokerRequest);
+ AuthorizationResult authorizationResult =
accessControl.authorize(requesterIdentity, serverBrokerRequest);
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION,
System.nanoTime() - compilationEndTimeNs);
@@ -909,6 +834,145 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
return brokerResponse;
}
+ private CompileResult compileRequest(long requestId, String query,
SqlNodeAndOptions sqlNodeAndOptions,
+ JsonNode request, @Nullable RequesterIdentity requesterIdentity,
RequestContext requestContext,
+ @Nullable HttpHeaders httpHeaders, AccessControl accessControl) {
+ PinotQuery pinotQuery;
+ try {
+ pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
+ } catch (Exception e) {
+ LOGGER.info("Caught exception while compiling SQL request {}: {}, {}",
requestId, query, e.getMessage());
+
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS,
1);
+ requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
+ // Check if the query is a v2 supported query
+ String database =
DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(),
httpHeaders);
+ if (ParserUtils.canCompileWithMultiStageEngine(query, database,
_tableCache)) {
+ return new CompileResult(new
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
+ new Exception(
+ "It seems that the query is only supported by the multi-stage
query engine, please retry the query "
+ + "using "
+ + "the multi-stage query engine "
+ +
"(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)"))));
+ } else {
+ return new CompileResult(
+ new
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
e)));
+ }
+ }
+
+ if (isDefaultQueryResponseLimitEnabled() && !pinotQuery.isSetLimit()) {
+ pinotQuery.setLimit(_defaultQueryLimit);
+ }
+
+ if (isLiteralOnlyQuery(pinotQuery)) {
+ LOGGER.debug("Request {} contains only Literal, skipping server query:
{}", requestId, query);
+ try {
+ if (pinotQuery.isExplain()) {
+ // EXPLAIN PLAN results to show that query is evaluated exclusively
by Broker.
+ return new
CompileResult(BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT);
+ }
+ return new CompileResult(processLiteralOnlyQuery(requestId,
pinotQuery, requestContext));
+ } catch (Exception e) {
+ // TODO: refine the exceptions here so that queries which do not need to
be sent to servers terminate early.
+ LOGGER.warn("Unable to execute literal request {}: {} at broker,
fallback to server query. {}", requestId,
+ query, e.getMessage());
+ }
+ }
+
+ PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
+ DataSource dataSource = serverPinotQuery.getDataSource();
+ if (dataSource == null) {
+ LOGGER.info("Data source (FROM clause) not found in request {}: {}",
requestId, query);
+ requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
+ return new CompileResult(new BrokerResponseNative(
+ QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
"Data source (FROM clause) not found")));
+ }
+ if (dataSource.getJoin() != null) {
+ LOGGER.info("JOIN is not supported in request {}: {}", requestId, query);
+ requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
+ return new CompileResult(new BrokerResponseNative(
+ QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
"JOIN is not supported")));
+ }
+ if (dataSource.getTableName() == null) {
+ LOGGER.info("Table name not found in request {}: {}", requestId, query);
+ requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
+ return new CompileResult(new BrokerResponseNative(
+ QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
"Table name not found")));
+ }
+
+ try {
+ handleSubquery(serverPinotQuery, requestId, request, requesterIdentity,
requestContext, httpHeaders,
+ accessControl);
+ } catch (Exception e) {
+ LOGGER.info("Caught exception while handling the subquery in request {}:
{}, {}", requestId, query,
+ e.getMessage());
+ requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
+ return new CompileResult(
+ new
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e)));
+ }
+
+ boolean ignoreCase = _tableCache.isIgnoreCase();
+ String tableName;
+ try {
+ tableName =
+
getActualTableName(DatabaseUtils.translateTableName(dataSource.getTableName(),
httpHeaders, ignoreCase),
+ _tableCache);
+ } catch (DatabaseConflictException e) {
+ LOGGER.info("{}. Request {}: {}", e.getMessage(), requestId, query);
+
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS,
1);
+ requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
+ return new CompileResult(
+ new
BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
e)));
+ }
+ dataSource.setTableName(tableName);
+ String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+ requestContext.setTableName(rawTableName);
+
+ AuthorizationResult authorizationResult =
+ accessControl.authorize(httpHeaders, TargetType.TABLE, tableName,
Actions.Table.QUERY);
+
+ if (!authorizationResult.hasAccess()) {
+ throwAccessDeniedError(requestId, query, requestContext, tableName,
authorizationResult);
+ }
+
+ try {
+ Map<String, String> columnNameMap =
_tableCache.getColumnNameMap(rawTableName);
+ if (columnNameMap != null) {
+ updateColumnNames(rawTableName, serverPinotQuery, ignoreCase,
columnNameMap);
+ }
+ } catch (Exception e) {
+ // Return an error response if a referenced column does not exist.
+ if (e instanceof BadQueryRequestException) {
+ LOGGER.info("Caught exception while checking column names in request
{}: {}, {}", requestId, query,
+ e.getMessage());
+ requestContext.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE);
+ _brokerMetrics.addMeteredTableValue(rawTableName,
BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1);
+ return new CompileResult(
+ new
BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR,
e)));
+ }
+ LOGGER.warn("Caught exception while updating column names in request {}:
{}, {}", requestId, query,
+ e.getMessage());
+ }
+
+ if (_defaultHllLog2m > 0) {
+ handleHLLLog2mOverride(serverPinotQuery, _defaultHllLog2m);
+ }
+ if (_enableQueryLimitOverride) {
+ handleQueryLimitOverride(serverPinotQuery, _queryResponseLimit);
+ }
+ handleSegmentPartitionedDistinctCountOverride(serverPinotQuery,
+ getSegmentPartitionedColumns(_tableCache, tableName));
+ if (_enableDistinctCountBitmapOverride) {
+ handleDistinctCountBitmapOverride(serverPinotQuery);
+ }
+
+ Schema schema = _tableCache.getSchema(rawTableName);
+ if (schema != null) {
+ handleDistinctMultiValuedOverride(serverPinotQuery, schema);
+ }
+
+ return new CompileResult(pinotQuery, serverPinotQuery, schema, tableName,
rawTableName);
+ }
+
private void throwAccessDeniedError(long requestId, String query,
RequestContext requestContext, String tableName,
AuthorizationResult authorizationResult) {
_brokerMetrics.addMeteredTableValue(tableName,
BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]