This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch support_catalog_in_from_clause in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit d2f709aadd038785cd04f9d5e7d49353fcdbda82 Author: Xiang Fu <[email protected]> AuthorDate: Wed Jul 15 01:44:39 2020 -0700 Allow Pinot to accept query with FROM clause in the format of [database].[table] --- .../requesthandler/BaseBrokerRequestHandler.java | 41 ++++++++++++++++++++++ .../pinot/common/utils/helix/TableCache.java | 4 +++ .../tests/OfflineClusterIntegrationTest.java | 17 +++++++-- 3 files changed, 60 insertions(+), 2 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 13e170c..9f2184e 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -201,6 +201,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { e.getMessage()); } } + updateQuerySource(brokerRequest); if (_enableCaseInsensitive) { try { handleCaseSensitivity(brokerRequest); @@ -439,6 +440,46 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } /** + * Check if table is in the format of [database_name].[table_name]. + * + * Only update TableName in QuerySource if there is no existing table in the format of [database_name].[table_name], + * but only [table_name]. + * + * @param brokerRequest + */ + private void updateQuerySource(BrokerRequest brokerRequest) { + String tableName = brokerRequest.getQuerySource().getTableName(); + // Check if table is in the format of [database_name].[table_name] + String[] querySourceSplits = tableName.split("\\.", 2); + if (querySourceSplits.length != 2) { + return; + } + // Update table name if there is no existing table in the format of [database_name].[table_name] but only [table_name] + if (_enableCaseInsensitive && _tableCache.existTableName(querySourceSplits[1]) && !_tableCache + .existTableName(tableName)) { + // Use TableCache to check case insensitive table name. + brokerRequest.getQuerySource().setTableName(querySourceSplits[1]); + return; + } + // Use RoutingManager to check case sensitive table name. + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + if (tableType != null && _routingManager.routingExists(querySourceSplits[1]) && !_routingManager + .routingExists(tableName)) { + brokerRequest.getQuerySource().setTableName(querySourceSplits[1]); + return; + } + if (_routingManager.routingExists(TableNameBuilder.REALTIME.tableNameWithType(querySourceSplits[1])) + && !_routingManager.routingExists(TableNameBuilder.REALTIME.tableNameWithType(tableName))) { + brokerRequest.getQuerySource().setTableName(querySourceSplits[1]); + return; + } + if (_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(querySourceSplits[1])) + && !_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableName))) { + brokerRequest.getQuerySource().setTableName(querySourceSplits[1]); + } + } + + /** * Set Log2m value for DistinctCountHLL Function * @param brokerRequest * @param hllLog2mOverride diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java index 6f46ac2..88cf3dd 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java @@ -67,6 +67,10 @@ public class TableCache { return _tableConfigChangeListener._tableNameMap.getOrDefault(tableName.toLowerCase(), tableName); } + public boolean existTableName(String tableName) { + return _tableConfigChangeListener._tableNameMap.containsKey(tableName.toLowerCase()); + } + public String getActualColumnName(String tableName, String columnName) { String schemaName = _tableConfigChangeListener._table2SchemaConfigMap.get(tableName.toLowerCase()); if (schemaName != null) { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 774f209..4408ce2 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -1133,7 +1133,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet public void testCaseInsensitivity() { int daysSinceEpoch = 16138; long secondsSinceEpoch = 16138 * 24 * 60 * 60; - List<String> queries = Arrays.asList("SELECT * FROM mytable", + List<String> baseQueries = Arrays.asList("SELECT * FROM mytable", "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable", "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000", "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000", @@ -1142,7 +1142,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch, "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable", "SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')"); - queries.replaceAll(query -> query.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")); + List<String> queries = new ArrayList<>(); + baseQueries.stream().forEach(q -> queries.add(q.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch"))); + baseQueries.stream().forEach(q -> queries.add(q.replace("mytable", "MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch"))); // Wait for at most 10 seconds for broker to get the ZK callback of the schema change TestUtils.waitForCondition(aVoid -> { @@ -1162,6 +1164,17 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet } @Test + public void testQuerySourceWithDatabaseName() + throws Exception { + // by default 10 rows will be returned, so use high limit + String pql = "SELECT DISTINCT(Carrier) FROM mytable LIMIT 1000000"; + String sql = "SELECT DISTINCT Carrier FROM mytable"; + testQuery(pql, Collections.singletonList(sql)); + pql = "SELECT DISTINCT Carrier FROM db.mytable LIMIT 1000000"; + testSqlQuery(pql, Collections.singletonList(sql)); + } + + @Test public void testDistinctCountHll() throws Exception { String query; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
