This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9f22322 Adding column name rewrite for the identifiers in the format
of [table_name].[column_name] (#5734)
9f22322 is described below
commit 9f22322132fccd8ca1c777a8568962952daa27a3
Author: Xiang Fu <[email protected]>
AuthorDate: Sat Jul 25 11:51:11 2020 -0700
Adding column name rewrite for the identifiers in the format of
[table_name].[column_name] (#5734)
---
.../requesthandler/BaseBrokerRequestHandler.java | 90 +++++++++++++---------
.../tests/OfflineClusterIntegrationTest.java | 73 ++++++++++++++----
2 files changed, 115 insertions(+), 48 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 9654554..e442b5d 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -202,12 +202,10 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
}
}
updateQuerySource(brokerRequest);
- if (_enableCaseInsensitive) {
- try {
- handleCaseSensitivity(brokerRequest);
- } catch (Exception e) {
- LOGGER.warn("Caught exception while rewriting PQL to make it
case-insensitive {}: {}, {}", requestId, query, e);
- }
+ try {
+ updateColumnNames(brokerRequest);
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while updating Column names in Query {}:
{}, {}", requestId, query, e);
}
if (_defaultHllLog2m > 0) {
handleHyperloglogLog2mOverride(brokerRequest, _defaultHllLog2m);
@@ -450,18 +448,22 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
private void updateQuerySource(BrokerRequest brokerRequest) {
String tableName = brokerRequest.getQuerySource().getTableName();
// Check if table is in the format of [database_name].[table_name]
- String[] tableNameSplits = StringUtils.split(tableName, '.');
- if (tableNameSplits.length != 2) {
- return;
- }
+ String[] tableNameSplits = StringUtils.split(tableName, ".", 2);
// Update table name if there is no existing table in the format of
[database_name].[table_name] but only [table_name]
if (_enableCaseInsensitive) {
+ if (tableNameSplits.length < 2) {
+
brokerRequest.getQuerySource().setTableName(_tableCache.getActualTableName(tableName));
+ return;
+ }
if (_tableCache.containsTable(tableNameSplits[1]) &&
!_tableCache.containsTable(tableName)) {
// Use TableCache to check case insensitive table name.
- brokerRequest.getQuerySource().setTableName(tableNameSplits[1]);
+
brokerRequest.getQuerySource().setTableName(_tableCache.getActualTableName(tableNameSplits[1]));
}
return;
}
+ if (tableNameSplits.length < 2) {
+ return;
+ }
// Use RoutingManager to check case sensitive table name.
if (TableNameBuilder.isTableResource(tableName)) {
if (_routingManager.routingExists(tableNameSplits[1]) &&
!_routingManager.routingExists(tableName)) {
@@ -667,19 +669,17 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
}
/**
- * Fixes the case-insensitive column names to the actual column names in the
given broker request.
+ * Fixes the column names to the actual column names in the given broker
request.
*/
- private void handleCaseSensitivity(BrokerRequest brokerRequest) {
- String inputTableName = brokerRequest.getQuerySource().getTableName();
- String actualTableName = _tableCache.getActualTableName(inputTableName);
- brokerRequest.getQuerySource().setTableName(actualTableName);
+ private void updateColumnNames(BrokerRequest brokerRequest) {
+ String tableName = brokerRequest.getQuerySource().getTableName();
//fix columns
if (brokerRequest.getFilterSubQueryMap() != null) {
Collection<FilterQuery> values =
brokerRequest.getFilterSubQueryMap().getFilterQueryMap().values();
for (FilterQuery filterQuery : values) {
if (filterQuery.getNestedFilterQueryIdsSize() == 0) {
String expression = filterQuery.getColumn();
- filterQuery.setColumn(fixColumnNameCase(actualTableName,
expression));
+ filterQuery.setColumn(fixColumnName(tableName, expression));
}
}
}
@@ -688,14 +688,14 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
if
(!info.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName()))
{
// Always read from backward compatible api in
AggregationFunctionUtils.
List<String> arguments = AggregationFunctionUtils.getArguments(info);
- arguments.replaceAll(e -> fixColumnNameCase(actualTableName, e));
+ arguments.replaceAll(e -> fixColumnName(tableName, e));
info.setExpressions(arguments);
}
}
if (brokerRequest.isSetGroupBy()) {
List<String> expressions = brokerRequest.getGroupBy().getExpressions();
for (int i = 0; i < expressions.size(); i++) {
- expressions.set(i, fixColumnNameCase(actualTableName,
expressions.get(i)));
+ expressions.set(i, fixColumnName(tableName, expressions.get(i)));
}
}
} else {
@@ -704,7 +704,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
for (int i = 0; i < selectionColumns.size(); i++) {
String expression = selectionColumns.get(i);
if (!expression.equals("*")) {
- selectionColumns.set(i, fixColumnNameCase(actualTableName,
expression));
+ selectionColumns.set(i, fixColumnName(tableName, expression));
}
}
}
@@ -712,66 +712,86 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
List<SelectionSort> orderBy = brokerRequest.getOrderBy();
for (SelectionSort selectionSort : orderBy) {
String expression = selectionSort.getColumn();
- selectionSort.setColumn(fixColumnNameCase(actualTableName,
expression));
+ selectionSort.setColumn(fixColumnName(tableName, expression));
}
}
PinotQuery pinotQuery = brokerRequest.getPinotQuery();
if (pinotQuery != null) {
- pinotQuery.getDataSource().setTableName(actualTableName);
+ pinotQuery.getDataSource().setTableName(tableName);
for (Expression expression : pinotQuery.getSelectList()) {
- fixColumnNameCase(actualTableName, expression);
+ fixColumnName(tableName, expression);
}
Expression filterExpression = pinotQuery.getFilterExpression();
if (filterExpression != null) {
- fixColumnNameCase(actualTableName, filterExpression);
+ fixColumnName(tableName, filterExpression);
}
List<Expression> groupByList = pinotQuery.getGroupByList();
if (groupByList != null) {
for (Expression expression : groupByList) {
- fixColumnNameCase(actualTableName, expression);
+ fixColumnName(tableName, expression);
}
}
List<Expression> orderByList = pinotQuery.getOrderByList();
if (orderByList != null) {
for (Expression expression : orderByList) {
- fixColumnNameCase(actualTableName, expression);
+ fixColumnName(tableName, expression);
}
}
Expression havingExpression = pinotQuery.getHavingExpression();
if (havingExpression != null) {
- fixColumnNameCase(actualTableName, havingExpression);
+ fixColumnName(tableName, havingExpression);
}
}
}
- private String fixColumnNameCase(String tableNameWithType, String
expression) {
+ private String fixColumnName(String tableNameWithType, String expression) {
TransformExpressionTree expressionTree =
TransformExpressionTree.compileToExpressionTree(expression);
- fixColumnNameCase(tableNameWithType, expressionTree);
+ fixColumnName(tableNameWithType, expressionTree);
return expressionTree.toString();
}
- private void fixColumnNameCase(String tableNameWithType,
TransformExpressionTree expression) {
+ private void fixColumnName(String tableNameWithType, TransformExpressionTree
expression) {
TransformExpressionTree.ExpressionType expressionType =
expression.getExpressionType();
if (expressionType == TransformExpressionTree.ExpressionType.IDENTIFIER) {
- expression.setValue(_tableCache.getActualColumnName(tableNameWithType,
expression.getValue()));
+ String identifier = expression.getValue();
+ expression.setValue(getActualColumnName(tableNameWithType, identifier));
} else if (expressionType ==
TransformExpressionTree.ExpressionType.FUNCTION) {
for (TransformExpressionTree child : expression.getChildren()) {
- fixColumnNameCase(tableNameWithType, child);
+ fixColumnName(tableNameWithType, child);
}
}
}
- private void fixColumnNameCase(String tableNameWithType, Expression
expression) {
+ private void fixColumnName(String tableNameWithType, Expression expression) {
ExpressionType expressionType = expression.getType();
if (expressionType == ExpressionType.IDENTIFIER) {
Identifier identifier = expression.getIdentifier();
- identifier.setName(_tableCache.getActualColumnName(tableNameWithType,
identifier.getName()));
+ identifier.setName(getActualColumnName(tableNameWithType,
identifier.getName()));
} else if (expressionType == ExpressionType.FUNCTION) {
for (Expression operand : expression.getFunctionCall().getOperands()) {
- fixColumnNameCase(tableNameWithType, operand);
+ fixColumnName(tableNameWithType, operand);
+ }
+ }
+ }
+
+ private String getActualColumnName(String tableNameWithType, String
columnName) {
+ String[] splits = StringUtils.split(columnName, ".", 2);
+ if (_enableCaseInsensitive) {
+ if (splits.length == 2) {
+ if
(TableNameBuilder.extractRawTableName(tableNameWithType).equalsIgnoreCase(splits[0]))
{
+ return _tableCache.getActualColumnName(tableNameWithType, splits[1]);
+ }
+ }
+ return _tableCache.getActualColumnName(tableNameWithType, columnName);
+ } else {
+ if (splits.length == 2) {
+ if
(TableNameBuilder.extractRawTableName(tableNameWithType).equals(splits[0])) {
+ return splits[1];
+ }
}
}
+ return columnName;
}
private static Map<String, String> getOptionsFromJson(JsonNode request,
String optionsKey) {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 9a7e49d..9827001 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -1159,24 +1159,71 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
"SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM
mytable",
"SELECT COUNT(*) FROM mytable GROUP BY
dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
List<String> queries = new ArrayList<>();
- baseQueries.stream().forEach(q -> queries.add(q.replace("mytable",
"MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
- baseQueries.stream().forEach(q -> queries.add(q.replace("mytable",
"MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
+ baseQueries.forEach(q -> queries.add(q.replace("mytable",
"MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
+ baseQueries.forEach(q -> queries.add(q.replace("mytable",
"MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
- // Wait for at most 10 seconds for broker to get the ZK callback of the
schema change
- TestUtils.waitForCondition(aVoid -> {
+ for (String query : queries) {
try {
- for (String query : queries) {
- JsonNode response = postQuery(query);
- // NOTE: When table does not exist, we will get
'BrokerResourceMissingError'.
- // When column does not exist, all segments will be pruned and
'numSegmentsProcessed' will be 0.
- return response.get("exceptions").size() == 0 &&
response.get("numSegmentsProcessed").asInt() > 0;
- }
+ postQuery(query);
} catch (Exception e) {
// Fail the test when exception caught
- throw new RuntimeException(e);
+ throw new RuntimeException("Got Exceptions from query - " + query);
+ }
+ }
+ }
+
+ @Test
+ public void testColumnNameContainsTableName() {
+ int daysSinceEpoch = 16138;
+ long secondsSinceEpoch = 16138 * 24 * 60 * 60;
+ List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
+ "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS')
FROM mytable",
+ "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS')
FROM mytable order by DaysSinceEpoch limit 10000",
+ "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS')
FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit
10000",
+ "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " +
daysSinceEpoch,
+ "SELECT count(*) FROM mytable WHERE
timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
+ "SELECT count(*) FROM mytable WHERE
timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch,
+ "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM
mytable",
+ "SELECT COUNT(*) FROM mytable GROUP BY
dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
+ List<String> queries = new ArrayList<>();
+ baseQueries.forEach(q -> queries.add(q.replace("DaysSinceEpoch",
"mytable.DAYSSinceEpOch")));
+ baseQueries.forEach(q -> queries.add(q.replace("DaysSinceEpoch",
"mytable.DAYSSinceEpOch")));
+
+ for (String query : queries) {
+ try {
+ postQuery(query);
+ } catch (Exception e) {
+ // Fail the test when exception caught
+ throw new RuntimeException("Got Exceptions from query - " + query);
}
- return true;
- }, 10_000L, "Failed to get results for case-insensitive queries");
+ }
+ }
+
+ @Test
+ public void testCaseInsensitivityWithColumnNameContainsTableName() {
+ int daysSinceEpoch = 16138;
+ long secondsSinceEpoch = 16138 * 24 * 60 * 60;
+ List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
+ "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS')
FROM mytable",
+ "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS')
FROM mytable order by DaysSinceEpoch limit 10000",
+ "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS')
FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit
10000",
+ "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " +
daysSinceEpoch,
+ "SELECT count(*) FROM mytable WHERE
timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
+ "SELECT count(*) FROM mytable WHERE
timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch,
+ "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM
mytable",
+ "SELECT COUNT(*) FROM mytable GROUP BY
dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
+ List<String> queries = new ArrayList<>();
+ baseQueries.forEach(q -> queries.add(q.replace("mytable",
"MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
+ baseQueries.forEach(q -> queries.add(q.replace("mytable",
"MYDB.MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
+
+ for (String query : queries) {
+ try {
+ postQuery(query);
+ } catch (Exception e) {
+ // Fail the test when exception caught
+ throw new RuntimeException("Got Exceptions from query - " + query);
+ }
+ }
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]