This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch throw-exception-when-column-mismatch in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit e0987e566749cf93c2a60f36eb32b241b3b3374b Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Mon Sep 28 11:24:53 2020 -0700 Add option to fail query when column mismatches --- .../requesthandler/BaseBrokerRequestHandler.java | 99 +++++++++++++++------- .../pinot/common/exception/QueryException.java | 8 +- .../pinot/common/utils/helix/TableCache.java | 12 +-- .../pinot/pql/parsers/pql2/ast/TopAstNode.java | 2 +- .../api/resources/PinotQueryResource.java | 2 +- .../tests/OfflineClusterIntegrationTest.java | 14 ++- 6 files changed, 95 insertions(+), 42 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 4925781..1e9f0b7 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -46,6 +46,7 @@ import org.apache.pinot.broker.queryquota.QueryQuotaManager; import org.apache.pinot.broker.routing.RoutingManager; import org.apache.pinot.broker.routing.RoutingTable; import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo; +import org.apache.pinot.common.Utils; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.function.AggregationFunctionType; import org.apache.pinot.common.metrics.BrokerMeter; @@ -74,6 +75,7 @@ import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.helix.TableCache; import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; +import org.apache.pinot.core.query.exception.BadQueryRequestException; import org.apache.pinot.core.query.reduce.BrokerReduceService; import org.apache.pinot.core.requesthandler.BrokerRequestOptimizer; import org.apache.pinot.core.requesthandler.PinotQueryParserFactory; @@ -182,8 +184,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } catch (Exception e) { LOGGER.info("Caught exception while compiling request {}: {}, {}", requestId, query, e.getMessage()); _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1); - requestStatistics.setErrorCode(QueryException.PQL_PARSING_ERROR_CODE); - return new BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR, e)); + requestStatistics.setErrorCode(QueryException.QUERY_PARSING_ERROR_CODE); + return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_PARSING_ERROR, e)); } if (isLiteralOnlyQuery(brokerRequest)) { LOGGER.debug("Request {} contains only Literal, skipping server query: {}", requestId, query); @@ -196,9 +198,15 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { e.getMessage()); } } + + // Set extra settings into broker request + setOptions(requestId, query, request, brokerRequest); + updateTableName(brokerRequest); try { updateColumnNames(brokerRequest); + } catch(BadQueryRequestException be) { + return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_PARSING_ERROR, be)); } catch (Exception e) { LOGGER.warn("Caught exception while updating Column names in Query {}: {}, {}", requestId, query, e); } @@ -283,9 +291,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e)); } - // Set extra settings into broker request - setOptions(requestId, query, request, brokerRequest); - // Optimize the query // TODO: get time column name from schema or table config so that we can apply it for REALTIME only case // We get timeColumnName from time boundary service currently, which only exists for offline table @@ -739,12 +744,16 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { String rawTableName = TableNameBuilder.extractRawTableName(brokerRequest.getQuerySource().getTableName()); Map<String, String> columnNameMap = _tableCache.isCaseInsensitive() ? _tableCache.getColumnNameMap(rawTableName) : null; + Map<String, String> queryOptions = brokerRequest.getQueryOptions(); + boolean failQueryWhenColumnMismatch = + (queryOptions != null && Boolean.parseBoolean(queryOptions.get("failQueryWhenColumnMismatch"))); if (brokerRequest.getFilterSubQueryMap() != null) { Collection<FilterQuery> values = brokerRequest.getFilterSubQueryMap().getFilterQueryMap().values(); for (FilterQuery filterQuery : values) { if (filterQuery.getNestedFilterQueryIdsSize() == 0) { - filterQuery.setColumn(fixColumnName(rawTableName, filterQuery.getColumn(), columnNameMap)); + filterQuery.setColumn( + fixColumnName(rawTableName, filterQuery.getColumn(), columnNameMap, failQueryWhenColumnMismatch)); } } } @@ -753,14 +762,22 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { if (!info.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) { // Always read from backward compatible api in AggregationFunctionUtils. List<String> arguments = AggregationFunctionUtils.getArguments(info); - arguments.replaceAll(e -> fixColumnName(rawTableName, e, columnNameMap)); + arguments.replaceAll(e -> { + try { + return fixColumnName(rawTableName, e, columnNameMap, failQueryWhenColumnMismatch); + } catch (Exception ex) { + Utils.rethrowException(ex); + throw new AssertionError("Should not reach this"); + } + }); info.setExpressions(arguments); } } if (brokerRequest.isSetGroupBy()) { List<String> expressions = brokerRequest.getGroupBy().getExpressions(); for (int i = 0; i < expressions.size(); i++) { - expressions.set(i, fixColumnName(rawTableName, expressions.get(i), columnNameMap)); + expressions + .set(i, fixColumnName(rawTableName, expressions.get(i), columnNameMap, failQueryWhenColumnMismatch)); } } } else { @@ -769,7 +786,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { for (int i = 0; i < selectionColumns.size(); i++) { String expression = selectionColumns.get(i); if (!expression.equals("*")) { - selectionColumns.set(i, fixColumnName(rawTableName, expression, columnNameMap)); + selectionColumns.set(i, fixColumnName(rawTableName, expression, columnNameMap, failQueryWhenColumnMismatch)); } } } @@ -777,78 +794,98 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { List<SelectionSort> orderBy = brokerRequest.getOrderBy(); for (SelectionSort selectionSort : orderBy) { String expression = selectionSort.getColumn(); - selectionSort.setColumn(fixColumnName(rawTableName, expression, columnNameMap)); + selectionSort.setColumn(fixColumnName(rawTableName, expression, columnNameMap, failQueryWhenColumnMismatch)); } } PinotQuery pinotQuery = brokerRequest.getPinotQuery(); if (pinotQuery != null) { for (Expression expression : pinotQuery.getSelectList()) { - fixColumnName(rawTableName, expression, columnNameMap); + fixColumnName(rawTableName, expression, columnNameMap, failQueryWhenColumnMismatch); } Expression filterExpression = pinotQuery.getFilterExpression(); if (filterExpression != null) { - fixColumnName(rawTableName, filterExpression, columnNameMap); + fixColumnName(rawTableName, filterExpression, columnNameMap, failQueryWhenColumnMismatch); } List<Expression> groupByList = pinotQuery.getGroupByList(); if (groupByList != null) { for (Expression expression : groupByList) { - fixColumnName(rawTableName, expression, columnNameMap); + fixColumnName(rawTableName, expression, columnNameMap, failQueryWhenColumnMismatch); } } List<Expression> orderByList = pinotQuery.getOrderByList(); if (orderByList != null) { for (Expression expression : orderByList) { - fixColumnName(rawTableName, expression, columnNameMap); + fixColumnName(rawTableName, expression, columnNameMap, failQueryWhenColumnMismatch); } } Expression havingExpression = pinotQuery.getHavingExpression(); if (havingExpression != null) { - fixColumnName(rawTableName, havingExpression, columnNameMap); + fixColumnName(rawTableName, havingExpression, columnNameMap, failQueryWhenColumnMismatch); } } } - private String fixColumnName(String rawTableName, String expression, @Nullable Map<String, String> columnNameMap) { + private String fixColumnName(String rawTableName, String expression, @Nullable Map<String, String> columnNameMap, + boolean failQueryWhenColumnMismatch) { TransformExpressionTree expressionTree = TransformExpressionTree.compileToExpressionTree(expression); - fixColumnName(rawTableName, expressionTree, columnNameMap); + fixColumnName(rawTableName, expressionTree, columnNameMap, failQueryWhenColumnMismatch); return expressionTree.toString(); } private void fixColumnName(String rawTableName, TransformExpressionTree expression, - @Nullable Map<String, String> columnNameMap) { + @Nullable Map<String, String> columnNameMap, boolean failQueryWhenColumnMismatch) { TransformExpressionTree.ExpressionType expressionType = expression.getExpressionType(); if (expressionType == TransformExpressionTree.ExpressionType.IDENTIFIER) { - expression.setValue(getActualColumnName(rawTableName, expression.getValue(), columnNameMap)); + expression.setValue( + getActualColumnName(rawTableName, expression.getValue(), columnNameMap, failQueryWhenColumnMismatch)); } else if (expressionType == TransformExpressionTree.ExpressionType.FUNCTION) { for (TransformExpressionTree child : expression.getChildren()) { - fixColumnName(rawTableName, child, columnNameMap); + fixColumnName(rawTableName, child, columnNameMap, failQueryWhenColumnMismatch); } } } - private void fixColumnName(String rawTableName, Expression expression, @Nullable Map<String, String> columnNameMap) { + private void fixColumnName(String rawTableName, Expression expression, @Nullable Map<String, String> columnNameMap, + boolean throwExceptionWhenColumnNameMismatch) { ExpressionType expressionType = expression.getType(); if (expressionType == ExpressionType.IDENTIFIER) { Identifier identifier = expression.getIdentifier(); - identifier.setName(getActualColumnName(rawTableName, identifier.getName(), columnNameMap)); + identifier.setName( + getActualColumnName(rawTableName, identifier.getName(), columnNameMap, throwExceptionWhenColumnNameMismatch)); } else if (expressionType == ExpressionType.FUNCTION) { - for (Expression operand : expression.getFunctionCall().getOperands()) { - fixColumnName(rawTableName, operand, columnNameMap); + String operator = expression.getFunctionCall().getOperator(); + List<Expression> expressions = expression.getFunctionCall().getOperands(); + if ("AS".equals(operator)) { + fixColumnName(rawTableName, expressions.get(0), columnNameMap, throwExceptionWhenColumnNameMismatch); + } else if (!isCountStarFromExpression(expression)) { + for (Expression operand : expression.getFunctionCall().getOperands()) { + fixColumnName(rawTableName, operand, columnNameMap, throwExceptionWhenColumnNameMismatch); + } } } } - private String getActualColumnName(String rawTableName, String columnName, - @Nullable Map<String, String> columnNameMap) { + private boolean isCountStarFromExpression(Expression expression) { + String operator = expression.getFunctionCall().getOperator(); + List<Expression> expressions = expression.getFunctionCall().getOperands(); + return "COUNT".equals(operator) && expressions.size() == 1 && "*" + .equals(expressions.get(0).getIdentifier().getName()); + } + + private String getActualColumnName(String rawTableName, String columnName, Map<String, String> columnNameMap, + boolean failQueryWhenColumnMismatch) { // Check if column is in the format of [table_name].[column_name] String[] splits = StringUtils.split(columnName, ".", 2); if (_tableCache.isCaseInsensitive()) { if (splits.length == 2 && rawTableName.equalsIgnoreCase(splits[0])) { columnName = splits[1]; } - if (columnNameMap != null) { - return columnNameMap.getOrDefault(columnName, columnName); + String actualColumnName = columnNameMap.get(columnName.toLowerCase()); + if (actualColumnName != null) { + return actualColumnName; + } else if (failQueryWhenColumnMismatch) { + throw new BadQueryRequestException("Invalid column name in the query: " + columnName); } else { return columnName; } @@ -856,7 +893,11 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { if (splits.length == 2 && rawTableName.equals(splits[0])) { columnName = splits[1]; } - return columnName; + if (!columnNameMap.containsKey(columnName) && failQueryWhenColumnMismatch) { + throw new BadQueryRequestException("Invalid column name in the query: " + columnName); + } else { + return columnName; + } } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java index 10a1544..7e05582 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java @@ -37,7 +37,7 @@ public class QueryException { // TODO: several ProcessingExceptions are never used, clean them up. public static final int JSON_PARSING_ERROR_CODE = 100; public static final int JSON_COMPILATION_ERROR_CODE = 101; - public static final int PQL_PARSING_ERROR_CODE = 150; + public static final int QUERY_PARSING_ERROR_CODE = 150; public static final int SEGMENT_PLAN_EXECUTION_ERROR_CODE = 160; public static final int COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR_CODE = 170; public static final int ACCESS_DENIED_ERROR_CODE = 180; @@ -66,7 +66,7 @@ public class QueryException { public static final ProcessingException JSON_PARSING_ERROR = new ProcessingException(JSON_PARSING_ERROR_CODE); public static final ProcessingException JSON_COMPILATION_ERROR = new ProcessingException(JSON_COMPILATION_ERROR_CODE); - public static final ProcessingException PQL_PARSING_ERROR = new ProcessingException(PQL_PARSING_ERROR_CODE); + public static final ProcessingException QUERY_PARSING_ERROR = new ProcessingException(QUERY_PARSING_ERROR_CODE); public static final ProcessingException ACCESS_DENIED_ERROR = new ProcessingException(ACCESS_DENIED_ERROR_CODE); public static final ProcessingException SEGMENT_PLAN_EXECUTION_ERROR = new ProcessingException(SEGMENT_PLAN_EXECUTION_ERROR_CODE); @@ -105,7 +105,7 @@ public class QueryException { static { JSON_PARSING_ERROR.setMessage("JsonParsingError"); JSON_COMPILATION_ERROR.setMessage("JsonCompilationError"); - PQL_PARSING_ERROR.setMessage("PQLParsingError"); + QUERY_PARSING_ERROR.setMessage("QueryParsingError"); SEGMENT_PLAN_EXECUTION_ERROR.setMessage("SegmentPlanExecutionError"); COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR.setMessage("CombineSegmentPlanTimeoutError"); QUERY_EXECUTION_ERROR.setMessage("QueryExecutionError"); @@ -168,7 +168,7 @@ public class QueryException { case QueryException.JSON_COMPILATION_ERROR_CODE: case QueryException.JSON_PARSING_ERROR_CODE: case QueryException.QUERY_VALIDATION_ERROR_CODE: - case QueryException.PQL_PARSING_ERROR_CODE: + case QueryException.QUERY_PARSING_ERROR_CODE: case QueryException.TOO_MANY_REQUESTS_ERROR_CODE: return true; default: diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java index dbd360d..74bb3e8 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java @@ -216,15 +216,15 @@ public class TableCache { throws IOException { Schema schema = SchemaUtils.fromZNRecord(znRecord); String rawTableName = schema.getSchemaName(); - if (_caseInsensitive) { - Map<String, String> columnNameMap = new HashMap<>(); - for (String columnName : schema.getColumnNames()) { + Map<String, String> columnNameMap = new HashMap<>(); + for (String columnName : schema.getColumnNames()) { + if (_caseInsensitive) { columnNameMap.put(columnName.toLowerCase(), columnName); + } else { + columnNameMap.put(columnName, null); } - _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap)); - } else { - _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null)); } + _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap)); } private void removeSchema(String path) { diff --git a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/TopAstNode.java b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/TopAstNode.java index 4efe455..7c6f617 100644 --- a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/TopAstNode.java +++ b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/TopAstNode.java @@ -30,7 +30,7 @@ public class TopAstNode extends BaseAstNode { public TopAstNode(int count) { if (count < 0) { - throw new RuntimeException(QueryException.PQL_PARSING_ERROR); + throw new RuntimeException(QueryException.QUERY_PARSING_ERROR); } if (count == 0) { _count = DEFAULT_TOP_N; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java index 004f863..55c9a35 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java @@ -167,7 +167,7 @@ public class PinotQueryResource { brokerRequest.getQuerySource().setTableName(_pinotHelixResourceManager.getActualTableName(inputTableName)); } catch (Exception e) { LOGGER.error("Caught exception while compiling {} query: {}", querySyntax.toUpperCase(), query, e); - return QueryException.getException(QueryException.PQL_PARSING_ERROR, e).toString(); + return QueryException.getException(QueryException.QUERY_PARSING_ERROR, e).toString(); } String tableName = TableNameBuilder.extractRawTableName(brokerRequest.getQuerySource().getTableName()); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 790968a..bfa5b37 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -59,6 +59,7 @@ import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -504,6 +505,14 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet queryResponse = postQuery(SELECT_STAR_QUERY); assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs); assertEquals(queryResponse.get("selectionResults").get("columns").size(), 79); + + // The query would fail if 'failQueryWhenColumnMismatch' is set true and the query tried to query a non-existing column. + ObjectNode payload = JsonUtils.newObjectNode(); + payload.put("sql", TEST_DEFAULT_COLUMNS_QUERY); + payload.put("queryOptions", "failQueryWhenColumnMismatch=true"); + queryResponse = JsonUtils.stringToJsonNode(sendPostRequest(_brokerBaseApiUrl + "/query", payload.toString())); + Assert.assertEquals(queryResponse.get("totalDocs").asLong(), 0); + assertNotNull(queryResponse.get("exceptions")); } private void reloadDefaultColumns(boolean withExtraColumns) @@ -532,6 +541,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet try { JsonNode queryResponse = postQuery(TEST_DEFAULT_COLUMNS_QUERY); // Total docs should not change during reload + if (queryResponse.get("totalDocs").asLong() == 0) { + return false; + } assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs); long count = queryResponse.get("aggregationResults").get(0).get("value").asLong(); if (withExtraColumns) { @@ -865,7 +877,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet caseStatementBuilder.append(String.format("WHEN origin = '%s' THEN %d ", origins.get(i), i + 1)); } caseStatementBuilder.append("ELSE 0 END"); - String sqlQuery = "SELECT origin, " + caseStatementBuilder + " AS origin_code FROM mytable LIMIT 1000"; + String sqlQuery = "SELECT origin, " + caseStatementBuilder + " as origin_code, AirlineID as aID FROM mytable LIMIT 1000"; JsonNode response = postSqlQuery(sqlQuery, _brokerBaseApiUrl); JsonNode rows = response.get("resultTable").get("rows"); assertEquals(response.get("exceptions").size(), 0); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org