This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch support_catalog_in_from_clause in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit fbc3ef3ba04a674e45f6872ec5b05d210601a154 Author: Xiang Fu <[email protected]> AuthorDate: Wed Jul 15 01:44:39 2020 -0700 Allow Pinot to accept query with FROM clause in the format of [database].[table] --- .../requesthandler/BaseBrokerRequestHandler.java | 41 ++++++++++++++++++++++ .../pinot/common/utils/helix/TableCache.java | 4 +++ .../tests/OfflineClusterIntegrationTest.java | 6 ++-- 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 13e170c..d7a892c 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -201,6 +201,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { e.getMessage()); } } + updateQuerySource(brokerRequest); if (_enableCaseInsensitive) { try { handleCaseSensitivity(brokerRequest); @@ -439,6 +440,46 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } /** + * Check if table is in the format of [database_name].[table_name]. + * + * Only update TableName in QuerySource if there is no existing table in the format of [database_name].[table_name], + * but only [table_name]. + * + * @param brokerRequest + */ + private void updateQuerySource(BrokerRequest brokerRequest) { + String tableName = brokerRequest.getQuerySource().getTableName(); + // Check if table is in the format of [database_name].[table_name] + String[] querySourceSplits = tableName.split(".", 2); + if (querySourceSplits.length != 2) { + return; + } + // Update table name if there is no existing table in the format of [database_name].[table_name] but only [table_name] + if (_enableCaseInsensitive && _tableCache.existTableName(querySourceSplits[1]) && !_tableCache + .existTableName(tableName)) { + // Use TableCache to check case insensitive table name. + brokerRequest.getQuerySource().setTableName(querySourceSplits[1]); + return; + } + // Use RoutingManager to check case sensitive table name. + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + if (tableType != null && _routingManager.routingExists(querySourceSplits[1]) && !_routingManager + .routingExists(tableName)) { + brokerRequest.getQuerySource().setTableName(querySourceSplits[1]); + return; + } + if (_routingManager.routingExists(TableNameBuilder.REALTIME.tableNameWithType(querySourceSplits[1])) + && !_routingManager.routingExists(TableNameBuilder.REALTIME.tableNameWithType(tableName))) { + brokerRequest.getQuerySource().setTableName(querySourceSplits[1]); + return; + } + if (_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(querySourceSplits[1])) + && !_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableName))) { + brokerRequest.getQuerySource().setTableName(querySourceSplits[1]); + } + } + + /** * Set Log2m value for DistinctCountHLL Function * @param brokerRequest * @param hllLog2mOverride diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java index 6f46ac2..88cf3dd 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java @@ -67,6 +67,10 @@ public class TableCache { return _tableConfigChangeListener._tableNameMap.getOrDefault(tableName.toLowerCase(), tableName); } + public boolean existTableName(String tableName) { + return _tableConfigChangeListener._tableNameMap.containsKey(tableName.toLowerCase()); + } + public String getActualColumnName(String tableName, String columnName) { String schemaName = _tableConfigChangeListener._table2SchemaConfigMap.get(tableName.toLowerCase()); if (schemaName != null) { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 774f209..b305f9f 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -1133,7 +1133,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet public void testCaseInsensitivity() { int daysSinceEpoch = 16138; long secondsSinceEpoch = 16138 * 24 * 60 * 60; - List<String> queries = Arrays.asList("SELECT * FROM mytable", + List<String> baseQueries = Arrays.asList("SELECT * FROM mytable", "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable", "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000", "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000", @@ -1142,7 +1142,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch, "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable", "SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')"); - queries.replaceAll(query -> query.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")); + List<String> queries = new ArrayList<>(); + baseQueries.stream().forEach(q -> queries.add(q.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch"))); + baseQueries.stream().forEach(q -> queries.add(q.replace("mytable", "MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch"))); // Wait for at most 10 seconds for broker to get the ZK callback of the schema change TestUtils.waitForCondition(aVoid -> { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
