This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch support_catalog_in_from_clause
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit d2f709aadd038785cd04f9d5e7d49353fcdbda82
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Jul 15 01:44:39 2020 -0700

    Allow Pinot to accept query with FROM clause in the format of 
[database].[table]
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 41 ++++++++++++++++++++++
 .../pinot/common/utils/helix/TableCache.java       |  4 +++
 .../tests/OfflineClusterIntegrationTest.java       | 17 +++++++--
 3 files changed, 60 insertions(+), 2 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 13e170c..9f2184e 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -201,6 +201,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
                 e.getMessage());
       }
     }
+    updateQuerySource(brokerRequest);
     if (_enableCaseInsensitive) {
       try {
         handleCaseSensitivity(brokerRequest);
@@ -439,6 +440,46 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
   }
 
   /**
+   * Check if table is in the format of [database_name].[table_name].
+   *
+   * Only update TableName in QuerySource if there is no existing table in the 
format of [database_name].[table_name],
+   * but only [table_name].
+   *
+   * @param brokerRequest
+   */
+  private void updateQuerySource(BrokerRequest brokerRequest) {
+    String tableName = brokerRequest.getQuerySource().getTableName();
+    // Check if table is in the format of [database_name].[table_name]
+    String[] querySourceSplits = tableName.split("\\.", 2);
+    if (querySourceSplits.length != 2) {
+      return;
+    }
+    // Update table name if there is no existing table in the format of 
[database_name].[table_name] but only [table_name]
+    if (_enableCaseInsensitive && 
_tableCache.existTableName(querySourceSplits[1]) && !_tableCache
+        .existTableName(tableName)) {
+      // Use TableCache to check case insensitive table name.
+      brokerRequest.getQuerySource().setTableName(querySourceSplits[1]);
+      return;
+    }
+    // Use RoutingManager to check case sensitive table name.
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
+    if (tableType != null && 
_routingManager.routingExists(querySourceSplits[1]) && !_routingManager
+        .routingExists(tableName)) {
+      brokerRequest.getQuerySource().setTableName(querySourceSplits[1]);
+      return;
+    }
+    if 
(_routingManager.routingExists(TableNameBuilder.REALTIME.tableNameWithType(querySourceSplits[1]))
+        && 
!_routingManager.routingExists(TableNameBuilder.REALTIME.tableNameWithType(tableName)))
 {
+      brokerRequest.getQuerySource().setTableName(querySourceSplits[1]);
+      return;
+    }
+    if 
(_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(querySourceSplits[1]))
+        && 
!_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableName)))
 {
+      brokerRequest.getQuerySource().setTableName(querySourceSplits[1]);
+    }
+  }
+
+  /**
    * Set Log2m value for DistinctCountHLL Function
    * @param brokerRequest
    * @param hllLog2mOverride
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
index 6f46ac2..88cf3dd 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
@@ -67,6 +67,10 @@ public class TableCache {
     return 
_tableConfigChangeListener._tableNameMap.getOrDefault(tableName.toLowerCase(), 
tableName);
   }
 
+  public boolean existTableName(String tableName) {
+    return 
_tableConfigChangeListener._tableNameMap.containsKey(tableName.toLowerCase());
+  }
+
   public String getActualColumnName(String tableName, String columnName) {
     String schemaName = 
_tableConfigChangeListener._table2SchemaConfigMap.get(tableName.toLowerCase());
     if (schemaName != null) {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 774f209..4408ce2 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -1133,7 +1133,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
   public void testCaseInsensitivity() {
     int daysSinceEpoch = 16138;
     long secondsSinceEpoch = 16138 * 24 * 60 * 60;
-    List<String> queries = Arrays.asList("SELECT * FROM mytable",
+    List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
         "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') 
FROM mytable",
         "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') 
FROM mytable order by DaysSinceEpoch limit 10000",
         "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') 
FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 
10000",
@@ -1142,7 +1142,9 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
         "SELECT count(*) FROM mytable WHERE 
timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch,
         "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM 
mytable",
         "SELECT COUNT(*) FROM mytable GROUP BY 
dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
-    queries.replaceAll(query -> query.replace("mytable", 
"MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch"));
+    List<String> queries = new ArrayList<>();
+    baseQueries.stream().forEach(q -> queries.add(q.replace("mytable", 
"MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
+    baseQueries.stream().forEach(q -> queries.add(q.replace("mytable", 
"MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
 
     // Wait for at most 10 seconds for broker to get the ZK callback of the 
schema change
     TestUtils.waitForCondition(aVoid -> {
@@ -1162,6 +1164,17 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
   }
 
   @Test
+  public void testQuerySourceWithDatabaseName()
+      throws Exception {
+    // by default 10 rows will be returned, so use high limit
+    String pql = "SELECT DISTINCT(Carrier) FROM mytable LIMIT 1000000";
+    String sql = "SELECT DISTINCT Carrier FROM mytable";
+    testQuery(pql, Collections.singletonList(sql));
+    pql = "SELECT DISTINCT Carrier FROM db.mytable LIMIT 1000000";
+    testSqlQuery(pql, Collections.singletonList(sql));
+  }
+
+  @Test
   public void testDistinctCountHll()
       throws Exception {
     String query;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to