jasperjiaguo commented on code in PR #10171:
URL: https://github.com/apache/pinot/pull/10171#discussion_r1092713956
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -258,435 +259,447 @@ private BrokerResponseNative handleRequest(long
requestId, String query,
throws Exception {
LOGGER.debug("SQL query for request {}: {}", requestId, query);
- long compilationStartTimeNs;
- PinotQuery pinotQuery;
+ Tracing.ThreadAccountantOps.setupRunner(String.valueOf(requestId));
+
try {
- // Parse the request
- sqlNodeAndOptions = sqlNodeAndOptions != null ? sqlNodeAndOptions :
RequestUtils.parseQuery(query, request);
- // Compile the request into PinotQuery
- compilationStartTimeNs = System.nanoTime();
- pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
- } catch (Exception e) {
- LOGGER.info("Caught exception while compiling SQL request {}: {}, {}",
requestId, query, e.getMessage());
-
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS,
1);
- requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
- return new
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
e));
- }
-
- if (isLiteralOnlyQuery(pinotQuery)) {
- LOGGER.debug("Request {} contains only Literal, skipping server query:
{}", requestId, query);
+ long compilationStartTimeNs;
+ PinotQuery pinotQuery;
try {
- if (pinotQuery.isExplain()) {
- // EXPLAIN PLAN results to show that query is evaluated exclusively
by Broker.
- return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
- }
- return processLiteralOnlyQuery(pinotQuery, compilationStartTimeNs,
requestContext);
+ // Parse the request
+ sqlNodeAndOptions = sqlNodeAndOptions != null ? sqlNodeAndOptions :
RequestUtils.parseQuery(query, request);
+ // Compile the request into PinotQuery
+ compilationStartTimeNs = System.nanoTime();
+ pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
} catch (Exception e) {
- // TODO: refine the exceptions here to early termination the queries
won't requires to send to servers.
- LOGGER.warn("Unable to execute literal request {}: {} at broker,
fallback to server query. {}", requestId,
- query, e.getMessage());
+ LOGGER.info("Caught exception while compiling SQL request {}: {}, {}",
requestId, query, e.getMessage());
+
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS,
1);
+ requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
+ return new
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
e));
}
- }
-
- PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
- if (serverPinotQuery.getDataSource() == null) {
- LOGGER.info("Data source (FROM clause) not found in request {}: {}",
request, query);
- requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
- return new BrokerResponseNative(
- QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
"Data source (FROM clause) not found"));
- }
- try {
- handleSubquery(serverPinotQuery, requestId, request, requesterIdentity,
requestContext);
- } catch (Exception e) {
- LOGGER.info("Caught exception while handling the subquery in request {}:
{}, {}", requestId, query,
- e.getMessage());
- requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
- return new
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
- }
+ if (isLiteralOnlyQuery(pinotQuery)) {
+ LOGGER.debug("Request {} contains only Literal, skipping server query:
{}", requestId, query);
+ try {
+ if (pinotQuery.isExplain()) {
+ // EXPLAIN PLAN results to show that query is evaluated
exclusively by Broker.
+ return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
+ }
+ return processLiteralOnlyQuery(pinotQuery, compilationStartTimeNs,
requestContext);
+ } catch (Exception e) {
+ // TODO: refine the exceptions here to early termination the queries
won't requires to send to servers.
+ LOGGER.warn("Unable to execute literal request {}: {} at broker,
fallback to server query. {}", requestId,
+ query, e.getMessage());
+ }
+ }
- String tableName =
getActualTableName(serverPinotQuery.getDataSource().getTableName(),
_tableCache);
- serverPinotQuery.getDataSource().setTableName(tableName);
- String rawTableName = TableNameBuilder.extractRawTableName(tableName);
- requestContext.setTableName(rawTableName);
+ PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
+ if (serverPinotQuery.getDataSource() == null) {
+ LOGGER.info("Data source (FROM clause) not found in request {}: {}",
request, query);
+
requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
+ return new BrokerResponseNative(
+ QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
"Data source (FROM clause) not found"));
+ }
- try {
- boolean isCaseInsensitive = _tableCache.isIgnoreCase();
- Map<String, String> columnNameMap =
_tableCache.getColumnNameMap(rawTableName);
- if (columnNameMap != null) {
- updateColumnNames(rawTableName, serverPinotQuery, isCaseInsensitive,
columnNameMap);
- }
- } catch (Exception e) {
- // Throw exceptions with column in-existence error.
- if (e instanceof BadQueryRequestException) {
- LOGGER.info("Caught exception while checking column names in request
{}: {}, {}", requestId, query,
+ try {
+ handleSubquery(serverPinotQuery, requestId, request,
requesterIdentity, requestContext);
+ } catch (Exception e) {
+ LOGGER.info("Caught exception while handling the subquery in request
{}: {}, {}", requestId, query,
e.getMessage());
- requestContext.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE);
- _brokerMetrics.addMeteredTableValue(rawTableName,
BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1);
- return new
BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR,
e));
- }
- LOGGER.warn("Caught exception while updating column names in request {}:
{}, {}", requestId, query,
- e.getMessage());
- }
- if (_defaultHllLog2m > 0) {
- handleHLLLog2mOverride(serverPinotQuery, _defaultHllLog2m);
- }
- if (_enableQueryLimitOverride) {
- handleQueryLimitOverride(serverPinotQuery, _queryResponseLimit);
- }
- handleSegmentPartitionedDistinctCountOverride(serverPinotQuery,
- getSegmentPartitionedColumns(_tableCache, tableName));
- if (_enableDistinctCountBitmapOverride) {
- handleDistinctCountBitmapOverride(serverPinotQuery);
- }
-
- long compilationEndTimeNs = System.nanoTime();
- // full request compile time = compilationTimeNs + parserTimeNs
- _brokerMetrics.addPhaseTiming(rawTableName,
BrokerQueryPhase.REQUEST_COMPILATION,
- (compilationEndTimeNs - compilationStartTimeNs) +
sqlNodeAndOptions.getParseTimeNs());
-
- // Second-stage table-level access control
- // TODO: Modify AccessControl interface to directly take PinotQuery
- BrokerRequest brokerRequest =
CalciteSqlCompiler.convertToBrokerRequest(pinotQuery);
- BrokerRequest serverBrokerRequest =
- serverPinotQuery == pinotQuery ? brokerRequest :
CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery);
- boolean hasTableAccess =
_accessControlFactory.create().hasAccess(requesterIdentity,
serverBrokerRequest);
- if (!hasTableAccess) {
- _brokerMetrics.addMeteredTableValue(tableName,
BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1);
- LOGGER.info("Access denied for request {}: {}, table: {}", requestId,
query, tableName);
- requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
- return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
- }
- _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION,
- System.nanoTime() - compilationEndTimeNs);
+ requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
+ return new
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
+ }
+
+ String tableName =
getActualTableName(serverPinotQuery.getDataSource().getTableName(),
_tableCache);
+ serverPinotQuery.getDataSource().setTableName(tableName);
+ String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+ requestContext.setTableName(rawTableName);
- // Get the tables hit by the request
- String offlineTableName = null;
- String realtimeTableName = null;
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
- if (tableType == TableType.OFFLINE) {
- // Offline table
- if (_routingManager.routingExists(tableName)) {
- offlineTableName = tableName;
+ try {
+ boolean isCaseInsensitive = _tableCache.isIgnoreCase();
+ Map<String, String> columnNameMap =
_tableCache.getColumnNameMap(rawTableName);
+ if (columnNameMap != null) {
+ updateColumnNames(rawTableName, serverPinotQuery, isCaseInsensitive,
columnNameMap);
+ }
+ } catch (Exception e) {
+ // Throw exceptions with column in-existence error.
+ if (e instanceof BadQueryRequestException) {
+ LOGGER.info("Caught exception while checking column names in request
{}: {}, {}", requestId, query,
+ e.getMessage());
+
requestContext.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE);
+ _brokerMetrics.addMeteredTableValue(rawTableName,
BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1);
+ return new
BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR,
e));
+ }
+ LOGGER.warn("Caught exception while updating column names in request
{}: {}, {}", requestId, query,
+ e.getMessage());
}
- } else if (tableType == TableType.REALTIME) {
- // Realtime table
- if (_routingManager.routingExists(tableName)) {
- realtimeTableName = tableName;
+ if (_defaultHllLog2m > 0) {
+ handleHLLLog2mOverride(serverPinotQuery, _defaultHllLog2m);
}
- } else {
- // Hybrid table (check both OFFLINE and REALTIME)
- String offlineTableNameToCheck =
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
- if (_routingManager.routingExists(offlineTableNameToCheck)) {
- offlineTableName = offlineTableNameToCheck;
+ if (_enableQueryLimitOverride) {
+ handleQueryLimitOverride(serverPinotQuery, _queryResponseLimit);
}
- String realtimeTableNameToCheck =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
- if (_routingManager.routingExists(realtimeTableNameToCheck)) {
- realtimeTableName = realtimeTableNameToCheck;
+ handleSegmentPartitionedDistinctCountOverride(serverPinotQuery,
+ getSegmentPartitionedColumns(_tableCache, tableName));
+ if (_enableDistinctCountBitmapOverride) {
+ handleDistinctCountBitmapOverride(serverPinotQuery);
}
- }
- TableConfig offlineTableConfig =
-
_tableCache.getTableConfig(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName));
- TableConfig realtimeTableConfig =
-
_tableCache.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
+ long compilationEndTimeNs = System.nanoTime();
+ // full request compile time = compilationTimeNs + parserTimeNs
+ _brokerMetrics.addPhaseTiming(rawTableName,
BrokerQueryPhase.REQUEST_COMPILATION,
+ (compilationEndTimeNs - compilationStartTimeNs) +
sqlNodeAndOptions.getParseTimeNs());
+
+ // Second-stage table-level access control
+ // TODO: Modify AccessControl interface to directly take PinotQuery
+ BrokerRequest brokerRequest =
CalciteSqlCompiler.convertToBrokerRequest(pinotQuery);
+ BrokerRequest serverBrokerRequest =
+ serverPinotQuery == pinotQuery ? brokerRequest :
CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery);
+ boolean hasTableAccess =
_accessControlFactory.create().hasAccess(requesterIdentity,
serverBrokerRequest);
+ if (!hasTableAccess) {
+ _brokerMetrics.addMeteredTableValue(tableName,
BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1);
+ LOGGER.info("Access denied for request {}: {}, table: {}", requestId,
query, tableName);
+ requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
+ return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
+ }
+ _brokerMetrics.addPhaseTiming(rawTableName,
BrokerQueryPhase.AUTHORIZATION,
+ System.nanoTime() - compilationEndTimeNs);
+
+ // Get the tables hit by the request
+ String offlineTableName = null;
+ String realtimeTableName = null;
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
+ if (tableType == TableType.OFFLINE) {
+ // Offline table
+ if (_routingManager.routingExists(tableName)) {
+ offlineTableName = tableName;
+ }
+ } else if (tableType == TableType.REALTIME) {
+ // Realtime table
+ if (_routingManager.routingExists(tableName)) {
+ realtimeTableName = tableName;
+ }
+ } else {
+ // Hybrid table (check both OFFLINE and REALTIME)
+ String offlineTableNameToCheck =
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+ if (_routingManager.routingExists(offlineTableNameToCheck)) {
+ offlineTableName = offlineTableNameToCheck;
+ }
+ String realtimeTableNameToCheck =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ if (_routingManager.routingExists(realtimeTableNameToCheck)) {
+ realtimeTableName = realtimeTableNameToCheck;
+ }
+ }
- if (offlineTableName == null && realtimeTableName == null) {
- // No table matches the request
- if (realtimeTableConfig == null && offlineTableConfig == null) {
- LOGGER.info("Table not found for request {}: {}", requestId, query);
-
requestContext.setErrorCode(QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE);
- return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+ TableConfig offlineTableConfig =
+
_tableCache.getTableConfig(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName));
+ TableConfig realtimeTableConfig =
+
_tableCache.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
+
+ if (offlineTableName == null && realtimeTableName == null) {
+ // No table matches the request
+ if (realtimeTableConfig == null && offlineTableConfig == null) {
+ LOGGER.info("Table not found for request {}: {}", requestId, query);
+
requestContext.setErrorCode(QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE);
+ return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+ }
+ LOGGER.info("No table matches for request {}: {}", requestId, query);
+
requestContext.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE);
+
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS,
1);
+ return BrokerResponseNative.NO_TABLE_RESULT;
}
- LOGGER.info("No table matches for request {}: {}", requestId, query);
-
requestContext.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE);
-
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS,
1);
- return BrokerResponseNative.NO_TABLE_RESULT;
- }
- // Handle query rewrite that can be overridden by the table configs
- if (offlineTableName == null) {
- offlineTableConfig = null;
- }
- if (realtimeTableName == null) {
- realtimeTableConfig = null;
- }
- HandlerContext handlerContext = getHandlerContext(offlineTableConfig,
realtimeTableConfig);
- if (handlerContext._disableGroovy) {
- rejectGroovyQuery(serverPinotQuery);
- }
- if (handlerContext._useApproximateFunction) {
- handleApproximateFunctionOverride(serverPinotQuery);
- }
+ // Handle query rewrite that can be overridden by the table configs
Review Comment:
You can probably view this class using the omit whitespace change mode. It
show correctly in intellij, I guess some bug with github default diff view.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]