jackjlli commented on a change in pull request #3568: Add guava cache to cache 
table schema in pinot broker
URL: https://github.com/apache/incubator-pinot/pull/3568#discussion_r238044599
 
 

 ##########
 File path: 
pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 ##########
 @@ -311,6 +329,68 @@ private void validateRequest(BrokerRequest brokerRequest) 
{
             "Value for 'LIMIT' (" + limit + ") exceeds maximum allowed value 
of " + _queryResponseLimit);
       }
     }
+
+    // Checks whether the query contains non-existence columns.
+    // Table name has already been verified before hitting this line.
+    String tableName = brokerRequest.getQuerySource().getTableName();
+    Schema schema = _tableSchemaCache.getIfTableSchemaPresent(tableName);
+    if (schema != null) {
+      Set<String> allColumns = getAllColumnsFromBrokerRequest(brokerRequest);
+      Set<String> copied = new HashSet<>(allColumns);
+      copied.removeAll(schema.getColumnNames());
+      if (!copied.isEmpty()) {
+        _brokerMetrics.addMeteredTableValue(tableName, 
BrokerMeter.QUERY_NON_EXISTENCE_COLUMNS, 1L);
+        throw new RuntimeException("Found non-existence columns from the 
query: " + copied.toString());
+      }
+    } else {
+      // If the cache doesn't have the schema, loads the schema to the cache 
asynchronously.
+      _tableSchemaCache.refreshTableSchema(tableName);
+    }
+  }
+
+  /**
+   * Helper to get all the columns from broker request.
+   * Returns the set of all the columns.
+   */
+  private Set<String> getAllColumnsFromBrokerRequest(BrokerRequest 
brokerRequest) {
+    Set<String> allColumns = new HashSet<>();
+    // Filter
+    FilterQueryTree filterQueryTree = 
RequestUtils.generateFilterQueryTree(brokerRequest);
+    if (filterQueryTree != null) {
+      allColumns.addAll(RequestUtils.extractFilterColumns(filterQueryTree));
+    }
+
+    // Aggregation
+    List<AggregationInfo> aggregationsInfo = 
brokerRequest.getAggregationsInfo();
+    if (aggregationsInfo != null) {
+      Set<TransformExpressionTree> _aggregationExpressions = new HashSet<>();
+      for (AggregationInfo aggregationInfo : aggregationsInfo) {
+        if 
(!aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName()))
 {
+          _aggregationExpressions.add(
+              
TransformExpressionTree.compileToExpressionTree(AggregationFunctionUtils.getColumn(aggregationInfo)));
+        }
+      }
+      
allColumns.addAll(RequestUtils.extractColumnsFromExpressions(_aggregationExpressions));
+    }
+
+    // Group-by
+    GroupBy groupBy = brokerRequest.getGroupBy();
+    if (groupBy != null) {
+      Set<TransformExpressionTree> groupByExpressions = new HashSet<>();
+      for (String expression : groupBy.getExpressions()) {
+        
groupByExpressions.add(TransformExpressionTree.compileToExpressionTree(expression));
+      }
+      
allColumns.addAll(RequestUtils.extractColumnsFromExpressions(groupByExpressions));
+    }
+
+
+    // Selection
+    Selection selection = brokerRequest.getSelections();
+    if (selection != null) {
+      allColumns.addAll(RequestUtils.extractSelectionColumns(selection));
+    }
+
+    return allColumns;
 
 Review comment:
   An extra field called `validateQuery` has been added to the query in this PR. 
The schema is validated only when that field is specified and set to true.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to