siddharthteotia commented on code in PR #10171:
URL: https://github.com/apache/pinot/pull/10171#discussion_r1092303699


##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -258,435 +259,447 @@ private BrokerResponseNative handleRequest(long 
requestId, String query,
       throws Exception {
     LOGGER.debug("SQL query for request {}: {}", requestId, query);
 
-    long compilationStartTimeNs;
-    PinotQuery pinotQuery;
+    Tracing.ThreadAccountantOps.setupRunner(String.valueOf(requestId));
+
     try {
-      // Parse the request
-      sqlNodeAndOptions = sqlNodeAndOptions != null ? sqlNodeAndOptions : 
RequestUtils.parseQuery(query, request);
-      // Compile the request into PinotQuery
-      compilationStartTimeNs = System.nanoTime();
-      pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
-    } catch (Exception e) {
-      LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", 
requestId, query, e.getMessage());
-      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS,
 1);
-      requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
-      return new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
 e));
-    }
-
-    if (isLiteralOnlyQuery(pinotQuery)) {
-      LOGGER.debug("Request {} contains only Literal, skipping server query: 
{}", requestId, query);
+      long compilationStartTimeNs;
+      PinotQuery pinotQuery;
       try {
-        if (pinotQuery.isExplain()) {
-          // EXPLAIN PLAN results to show that query is evaluated exclusively 
by Broker.
-          return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
-        }
-        return processLiteralOnlyQuery(pinotQuery, compilationStartTimeNs, 
requestContext);
+        // Parse the request
+        sqlNodeAndOptions = sqlNodeAndOptions != null ? sqlNodeAndOptions : 
RequestUtils.parseQuery(query, request);
+        // Compile the request into PinotQuery
+        compilationStartTimeNs = System.nanoTime();
+        pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
       } catch (Exception e) {
-        // TODO: refine the exceptions here to early termination the queries 
won't requires to send to servers.
-        LOGGER.warn("Unable to execute literal request {}: {} at broker, 
fallback to server query. {}", requestId,
-            query, e.getMessage());
+        LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", 
requestId, query, e.getMessage());
+        
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS,
 1);
+        requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
+        return new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
 e));
       }
-    }
-
-    PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
-    if (serverPinotQuery.getDataSource() == null) {
-      LOGGER.info("Data source (FROM clause) not found in request {}: {}", 
request, query);
-      requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
-      return new BrokerResponseNative(
-          QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, 
"Data source (FROM clause) not found"));
-    }
 
-    try {
-      handleSubquery(serverPinotQuery, requestId, request, requesterIdentity, 
requestContext);
-    } catch (Exception e) {
-      LOGGER.info("Caught exception while handling the subquery in request {}: 
{}, {}", requestId, query,
-          e.getMessage());
-      requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
-      return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
-    }
+      if (isLiteralOnlyQuery(pinotQuery)) {
+        LOGGER.debug("Request {} contains only Literal, skipping server query: 
{}", requestId, query);
+        try {
+          if (pinotQuery.isExplain()) {
+            // EXPLAIN PLAN results to show that query is evaluated 
exclusively by Broker.
+            return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
+          }
+          return processLiteralOnlyQuery(pinotQuery, compilationStartTimeNs, 
requestContext);
+        } catch (Exception e) {
+          // TODO: refine the exceptions here to early termination the queries 
won't requires to send to servers.
+          LOGGER.warn("Unable to execute literal request {}: {} at broker, 
fallback to server query. {}", requestId,
+              query, e.getMessage());
+        }
+      }
 
-    String tableName = 
getActualTableName(serverPinotQuery.getDataSource().getTableName(), 
_tableCache);
-    serverPinotQuery.getDataSource().setTableName(tableName);
-    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
-    requestContext.setTableName(rawTableName);
+      PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
+      if (serverPinotQuery.getDataSource() == null) {
+        LOGGER.info("Data source (FROM clause) not found in request {}: {}", 
request, query);
+        
requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
+        return new BrokerResponseNative(
+            QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, 
"Data source (FROM clause) not found"));
+      }
 
-    try {
-      boolean isCaseInsensitive = _tableCache.isIgnoreCase();
-      Map<String, String> columnNameMap = 
_tableCache.getColumnNameMap(rawTableName);
-      if (columnNameMap != null) {
-        updateColumnNames(rawTableName, serverPinotQuery, isCaseInsensitive, 
columnNameMap);
-      }
-    } catch (Exception e) {
-      // Throw exceptions with column in-existence error.
-      if (e instanceof BadQueryRequestException) {
-        LOGGER.info("Caught exception while checking column names in request 
{}: {}, {}", requestId, query,
+      try {
+        handleSubquery(serverPinotQuery, requestId, request, 
requesterIdentity, requestContext);
+      } catch (Exception e) {
+        LOGGER.info("Caught exception while handling the subquery in request 
{}: {}, {}", requestId, query,
             e.getMessage());
-        requestContext.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE);
-        _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1);
-        return new 
BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR,
 e));
-      }
-      LOGGER.warn("Caught exception while updating column names in request {}: 
{}, {}", requestId, query,
-          e.getMessage());
-    }
-    if (_defaultHllLog2m > 0) {
-      handleHLLLog2mOverride(serverPinotQuery, _defaultHllLog2m);
-    }
-    if (_enableQueryLimitOverride) {
-      handleQueryLimitOverride(serverPinotQuery, _queryResponseLimit);
-    }
-    handleSegmentPartitionedDistinctCountOverride(serverPinotQuery,
-        getSegmentPartitionedColumns(_tableCache, tableName));
-    if (_enableDistinctCountBitmapOverride) {
-      handleDistinctCountBitmapOverride(serverPinotQuery);
-    }
-
-    long compilationEndTimeNs = System.nanoTime();
-    // full request compile time = compilationTimeNs + parserTimeNs
-    _brokerMetrics.addPhaseTiming(rawTableName, 
BrokerQueryPhase.REQUEST_COMPILATION,
-        (compilationEndTimeNs - compilationStartTimeNs) + 
sqlNodeAndOptions.getParseTimeNs());
-
-    // Second-stage table-level access control
-    // TODO: Modify AccessControl interface to directly take PinotQuery
-    BrokerRequest brokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(pinotQuery);
-    BrokerRequest serverBrokerRequest =
-        serverPinotQuery == pinotQuery ? brokerRequest : 
CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery);
-    boolean hasTableAccess = 
_accessControlFactory.create().hasAccess(requesterIdentity, 
serverBrokerRequest);
-    if (!hasTableAccess) {
-      _brokerMetrics.addMeteredTableValue(tableName, 
BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1);
-      LOGGER.info("Access denied for request {}: {}, table: {}", requestId, 
query, tableName);
-      requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
-      return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
-    }
-    _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION,
-        System.nanoTime() - compilationEndTimeNs);
+        requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
+        return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
+      }
+
+      String tableName = 
getActualTableName(serverPinotQuery.getDataSource().getTableName(), 
_tableCache);
+      serverPinotQuery.getDataSource().setTableName(tableName);
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      requestContext.setTableName(rawTableName);
 
-    // Get the tables hit by the request
-    String offlineTableName = null;
-    String realtimeTableName = null;
-    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
-    if (tableType == TableType.OFFLINE) {
-      // Offline table
-      if (_routingManager.routingExists(tableName)) {
-        offlineTableName = tableName;
+      try {
+        boolean isCaseInsensitive = _tableCache.isIgnoreCase();
+        Map<String, String> columnNameMap = 
_tableCache.getColumnNameMap(rawTableName);
+        if (columnNameMap != null) {
+          updateColumnNames(rawTableName, serverPinotQuery, isCaseInsensitive, 
columnNameMap);
+        }
+      } catch (Exception e) {
+        // Throw exceptions with column in-existence error.
+        if (e instanceof BadQueryRequestException) {
+          LOGGER.info("Caught exception while checking column names in request 
{}: {}, {}", requestId, query,
+              e.getMessage());
+          
requestContext.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE);
+          _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1);
+          return new 
BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR,
 e));
+        }
+        LOGGER.warn("Caught exception while updating column names in request 
{}: {}, {}", requestId, query,
+            e.getMessage());
       }
-    } else if (tableType == TableType.REALTIME) {
-      // Realtime table
-      if (_routingManager.routingExists(tableName)) {
-        realtimeTableName = tableName;
+      if (_defaultHllLog2m > 0) {
+        handleHLLLog2mOverride(serverPinotQuery, _defaultHllLog2m);
       }
-    } else {
-      // Hybrid table (check both OFFLINE and REALTIME)
-      String offlineTableNameToCheck = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
-      if (_routingManager.routingExists(offlineTableNameToCheck)) {
-        offlineTableName = offlineTableNameToCheck;
+      if (_enableQueryLimitOverride) {
+        handleQueryLimitOverride(serverPinotQuery, _queryResponseLimit);
       }
-      String realtimeTableNameToCheck = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
-      if (_routingManager.routingExists(realtimeTableNameToCheck)) {
-        realtimeTableName = realtimeTableNameToCheck;
+      handleSegmentPartitionedDistinctCountOverride(serverPinotQuery,
+          getSegmentPartitionedColumns(_tableCache, tableName));
+      if (_enableDistinctCountBitmapOverride) {
+        handleDistinctCountBitmapOverride(serverPinotQuery);
       }
-    }
 
-    TableConfig offlineTableConfig =
-        
_tableCache.getTableConfig(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName));
-    TableConfig realtimeTableConfig =
-        
_tableCache.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
+      long compilationEndTimeNs = System.nanoTime();
+      // full request compile time = compilationTimeNs + parserTimeNs
+      _brokerMetrics.addPhaseTiming(rawTableName, 
BrokerQueryPhase.REQUEST_COMPILATION,
+          (compilationEndTimeNs - compilationStartTimeNs) + 
sqlNodeAndOptions.getParseTimeNs());
+
+      // Second-stage table-level access control
+      // TODO: Modify AccessControl interface to directly take PinotQuery
+      BrokerRequest brokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(pinotQuery);
+      BrokerRequest serverBrokerRequest =
+          serverPinotQuery == pinotQuery ? brokerRequest : 
CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery);
+      boolean hasTableAccess = 
_accessControlFactory.create().hasAccess(requesterIdentity, 
serverBrokerRequest);
+      if (!hasTableAccess) {
+        _brokerMetrics.addMeteredTableValue(tableName, 
BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1);
+        LOGGER.info("Access denied for request {}: {}, table: {}", requestId, 
query, tableName);
+        requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
+        return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
+      }
+      _brokerMetrics.addPhaseTiming(rawTableName, 
BrokerQueryPhase.AUTHORIZATION,
+          System.nanoTime() - compilationEndTimeNs);
+
+      // Get the tables hit by the request
+      String offlineTableName = null;
+      String realtimeTableName = null;
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
+      if (tableType == TableType.OFFLINE) {
+        // Offline table
+        if (_routingManager.routingExists(tableName)) {
+          offlineTableName = tableName;
+        }
+      } else if (tableType == TableType.REALTIME) {
+        // Realtime table
+        if (_routingManager.routingExists(tableName)) {
+          realtimeTableName = tableName;
+        }
+      } else {
+        // Hybrid table (check both OFFLINE and REALTIME)
+        String offlineTableNameToCheck = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+        if (_routingManager.routingExists(offlineTableNameToCheck)) {
+          offlineTableName = offlineTableNameToCheck;
+        }
+        String realtimeTableNameToCheck = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+        if (_routingManager.routingExists(realtimeTableNameToCheck)) {
+          realtimeTableName = realtimeTableNameToCheck;
+        }
+      }
 
-    if (offlineTableName == null && realtimeTableName == null) {
-      // No table matches the request
-      if (realtimeTableConfig == null && offlineTableConfig == null) {
-        LOGGER.info("Table not found for request {}: {}", requestId, query);
-        
requestContext.setErrorCode(QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE);
-        return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+      TableConfig offlineTableConfig =
+          
_tableCache.getTableConfig(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName));
+      TableConfig realtimeTableConfig =
+          
_tableCache.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
+
+      if (offlineTableName == null && realtimeTableName == null) {
+        // No table matches the request
+        if (realtimeTableConfig == null && offlineTableConfig == null) {
+          LOGGER.info("Table not found for request {}: {}", requestId, query);
+          
requestContext.setErrorCode(QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE);
+          return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+        }
+        LOGGER.info("No table matches for request {}: {}", requestId, query);
+        
requestContext.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE);
+        
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS, 
1);
+        return BrokerResponseNative.NO_TABLE_RESULT;
       }
-      LOGGER.info("No table matches for request {}: {}", requestId, query);
-      
requestContext.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE);
-      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS, 
1);
-      return BrokerResponseNative.NO_TABLE_RESULT;
-    }
 
-    // Handle query rewrite that can be overridden by the table configs
-    if (offlineTableName == null) {
-      offlineTableConfig = null;
-    }
-    if (realtimeTableName == null) {
-      realtimeTableConfig = null;
-    }
-    HandlerContext handlerContext = getHandlerContext(offlineTableConfig, 
realtimeTableConfig);
-    if (handlerContext._disableGroovy) {
-      rejectGroovyQuery(serverPinotQuery);
-    }
-    if (handlerContext._useApproximateFunction) {
-      handleApproximateFunctionOverride(serverPinotQuery);
-    }
+      // Handle query rewrite that can be overridden by the table configs

Review Comment:
   (nit) You may want to check the formatting of the changes in this class; it seems weird.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -258,435 +259,447 @@ private BrokerResponseNative handleRequest(long 
requestId, String query,
       throws Exception {
     LOGGER.debug("SQL query for request {}: {}", requestId, query);
 
-    long compilationStartTimeNs;
-    PinotQuery pinotQuery;
+    Tracing.ThreadAccountantOps.setupRunner(String.valueOf(requestId));
+
     try {
-      // Parse the request
-      sqlNodeAndOptions = sqlNodeAndOptions != null ? sqlNodeAndOptions : 
RequestUtils.parseQuery(query, request);
-      // Compile the request into PinotQuery
-      compilationStartTimeNs = System.nanoTime();
-      pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
-    } catch (Exception e) {
-      LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", 
requestId, query, e.getMessage());
-      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS,
 1);
-      requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
-      return new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
 e));
-    }
-
-    if (isLiteralOnlyQuery(pinotQuery)) {
-      LOGGER.debug("Request {} contains only Literal, skipping server query: 
{}", requestId, query);
+      long compilationStartTimeNs;
+      PinotQuery pinotQuery;
       try {
-        if (pinotQuery.isExplain()) {
-          // EXPLAIN PLAN results to show that query is evaluated exclusively 
by Broker.
-          return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
-        }
-        return processLiteralOnlyQuery(pinotQuery, compilationStartTimeNs, 
requestContext);
+        // Parse the request
+        sqlNodeAndOptions = sqlNodeAndOptions != null ? sqlNodeAndOptions : 
RequestUtils.parseQuery(query, request);
+        // Compile the request into PinotQuery
+        compilationStartTimeNs = System.nanoTime();
+        pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
       } catch (Exception e) {
-        // TODO: refine the exceptions here to early termination the queries 
won't requires to send to servers.
-        LOGGER.warn("Unable to execute literal request {}: {} at broker, 
fallback to server query. {}", requestId,
-            query, e.getMessage());
+        LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", 
requestId, query, e.getMessage());
+        
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS,
 1);
+        requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
+        return new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
 e));
       }
-    }
-
-    PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
-    if (serverPinotQuery.getDataSource() == null) {
-      LOGGER.info("Data source (FROM clause) not found in request {}: {}", 
request, query);
-      requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
-      return new BrokerResponseNative(
-          QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, 
"Data source (FROM clause) not found"));
-    }
 
-    try {
-      handleSubquery(serverPinotQuery, requestId, request, requesterIdentity, 
requestContext);
-    } catch (Exception e) {
-      LOGGER.info("Caught exception while handling the subquery in request {}: 
{}, {}", requestId, query,
-          e.getMessage());
-      requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
-      return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
-    }
+      if (isLiteralOnlyQuery(pinotQuery)) {
+        LOGGER.debug("Request {} contains only Literal, skipping server query: 
{}", requestId, query);
+        try {
+          if (pinotQuery.isExplain()) {
+            // EXPLAIN PLAN results to show that query is evaluated 
exclusively by Broker.
+            return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
+          }
+          return processLiteralOnlyQuery(pinotQuery, compilationStartTimeNs, 
requestContext);
+        } catch (Exception e) {
+          // TODO: refine the exceptions here to early termination the queries 
won't requires to send to servers.
+          LOGGER.warn("Unable to execute literal request {}: {} at broker, 
fallback to server query. {}", requestId,
+              query, e.getMessage());
+        }
+      }
 
-    String tableName = 
getActualTableName(serverPinotQuery.getDataSource().getTableName(), 
_tableCache);
-    serverPinotQuery.getDataSource().setTableName(tableName);
-    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
-    requestContext.setTableName(rawTableName);
+      PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
+      if (serverPinotQuery.getDataSource() == null) {
+        LOGGER.info("Data source (FROM clause) not found in request {}: {}", 
request, query);
+        
requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
+        return new BrokerResponseNative(
+            QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, 
"Data source (FROM clause) not found"));
+      }
 
-    try {
-      boolean isCaseInsensitive = _tableCache.isIgnoreCase();
-      Map<String, String> columnNameMap = 
_tableCache.getColumnNameMap(rawTableName);
-      if (columnNameMap != null) {
-        updateColumnNames(rawTableName, serverPinotQuery, isCaseInsensitive, 
columnNameMap);
-      }
-    } catch (Exception e) {
-      // Throw exceptions with column in-existence error.
-      if (e instanceof BadQueryRequestException) {
-        LOGGER.info("Caught exception while checking column names in request 
{}: {}, {}", requestId, query,
+      try {
+        handleSubquery(serverPinotQuery, requestId, request, 
requesterIdentity, requestContext);
+      } catch (Exception e) {
+        LOGGER.info("Caught exception while handling the subquery in request 
{}: {}, {}", requestId, query,
             e.getMessage());
-        requestContext.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE);
-        _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1);
-        return new 
BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR,
 e));
-      }
-      LOGGER.warn("Caught exception while updating column names in request {}: 
{}, {}", requestId, query,
-          e.getMessage());
-    }
-    if (_defaultHllLog2m > 0) {
-      handleHLLLog2mOverride(serverPinotQuery, _defaultHllLog2m);
-    }
-    if (_enableQueryLimitOverride) {
-      handleQueryLimitOverride(serverPinotQuery, _queryResponseLimit);
-    }
-    handleSegmentPartitionedDistinctCountOverride(serverPinotQuery,
-        getSegmentPartitionedColumns(_tableCache, tableName));
-    if (_enableDistinctCountBitmapOverride) {
-      handleDistinctCountBitmapOverride(serverPinotQuery);
-    }
-
-    long compilationEndTimeNs = System.nanoTime();
-    // full request compile time = compilationTimeNs + parserTimeNs
-    _brokerMetrics.addPhaseTiming(rawTableName, 
BrokerQueryPhase.REQUEST_COMPILATION,
-        (compilationEndTimeNs - compilationStartTimeNs) + 
sqlNodeAndOptions.getParseTimeNs());
-
-    // Second-stage table-level access control
-    // TODO: Modify AccessControl interface to directly take PinotQuery
-    BrokerRequest brokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(pinotQuery);
-    BrokerRequest serverBrokerRequest =
-        serverPinotQuery == pinotQuery ? brokerRequest : 
CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery);
-    boolean hasTableAccess = 
_accessControlFactory.create().hasAccess(requesterIdentity, 
serverBrokerRequest);
-    if (!hasTableAccess) {
-      _brokerMetrics.addMeteredTableValue(tableName, 
BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1);
-      LOGGER.info("Access denied for request {}: {}, table: {}", requestId, 
query, tableName);
-      requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
-      return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
-    }
-    _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION,
-        System.nanoTime() - compilationEndTimeNs);
+        requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
+        return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
+      }
+
+      String tableName = 
getActualTableName(serverPinotQuery.getDataSource().getTableName(), 
_tableCache);
+      serverPinotQuery.getDataSource().setTableName(tableName);
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      requestContext.setTableName(rawTableName);
 
-    // Get the tables hit by the request
-    String offlineTableName = null;
-    String realtimeTableName = null;
-    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
-    if (tableType == TableType.OFFLINE) {
-      // Offline table
-      if (_routingManager.routingExists(tableName)) {
-        offlineTableName = tableName;
+      try {
+        boolean isCaseInsensitive = _tableCache.isIgnoreCase();
+        Map<String, String> columnNameMap = 
_tableCache.getColumnNameMap(rawTableName);
+        if (columnNameMap != null) {
+          updateColumnNames(rawTableName, serverPinotQuery, isCaseInsensitive, 
columnNameMap);
+        }
+      } catch (Exception e) {
+        // Throw exceptions with column in-existence error.
+        if (e instanceof BadQueryRequestException) {
+          LOGGER.info("Caught exception while checking column names in request 
{}: {}, {}", requestId, query,
+              e.getMessage());
+          
requestContext.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE);
+          _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1);
+          return new 
BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR,
 e));
+        }
+        LOGGER.warn("Caught exception while updating column names in request 
{}: {}, {}", requestId, query,
+            e.getMessage());
       }
-    } else if (tableType == TableType.REALTIME) {
-      // Realtime table
-      if (_routingManager.routingExists(tableName)) {
-        realtimeTableName = tableName;
+      if (_defaultHllLog2m > 0) {
+        handleHLLLog2mOverride(serverPinotQuery, _defaultHllLog2m);
       }
-    } else {
-      // Hybrid table (check both OFFLINE and REALTIME)
-      String offlineTableNameToCheck = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
-      if (_routingManager.routingExists(offlineTableNameToCheck)) {
-        offlineTableName = offlineTableNameToCheck;
+      if (_enableQueryLimitOverride) {
+        handleQueryLimitOverride(serverPinotQuery, _queryResponseLimit);
       }
-      String realtimeTableNameToCheck = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
-      if (_routingManager.routingExists(realtimeTableNameToCheck)) {
-        realtimeTableName = realtimeTableNameToCheck;
+      handleSegmentPartitionedDistinctCountOverride(serverPinotQuery,
+          getSegmentPartitionedColumns(_tableCache, tableName));
+      if (_enableDistinctCountBitmapOverride) {
+        handleDistinctCountBitmapOverride(serverPinotQuery);
       }
-    }
 
-    TableConfig offlineTableConfig =
-        
_tableCache.getTableConfig(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName));
-    TableConfig realtimeTableConfig =
-        
_tableCache.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
+      long compilationEndTimeNs = System.nanoTime();
+      // full request compile time = compilationTimeNs + parserTimeNs
+      _brokerMetrics.addPhaseTiming(rawTableName, 
BrokerQueryPhase.REQUEST_COMPILATION,
+          (compilationEndTimeNs - compilationStartTimeNs) + 
sqlNodeAndOptions.getParseTimeNs());
+
+      // Second-stage table-level access control
+      // TODO: Modify AccessControl interface to directly take PinotQuery
+      BrokerRequest brokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(pinotQuery);
+      BrokerRequest serverBrokerRequest =
+          serverPinotQuery == pinotQuery ? brokerRequest : 
CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery);
+      boolean hasTableAccess = 
_accessControlFactory.create().hasAccess(requesterIdentity, 
serverBrokerRequest);
+      if (!hasTableAccess) {
+        _brokerMetrics.addMeteredTableValue(tableName, 
BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1);
+        LOGGER.info("Access denied for request {}: {}, table: {}", requestId, 
query, tableName);
+        requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
+        return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
+      }
+      _brokerMetrics.addPhaseTiming(rawTableName, 
BrokerQueryPhase.AUTHORIZATION,
+          System.nanoTime() - compilationEndTimeNs);
+
+      // Get the tables hit by the request
+      String offlineTableName = null;
+      String realtimeTableName = null;
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
+      if (tableType == TableType.OFFLINE) {
+        // Offline table
+        if (_routingManager.routingExists(tableName)) {
+          offlineTableName = tableName;
+        }
+      } else if (tableType == TableType.REALTIME) {
+        // Realtime table
+        if (_routingManager.routingExists(tableName)) {
+          realtimeTableName = tableName;
+        }
+      } else {
+        // Hybrid table (check both OFFLINE and REALTIME)
+        String offlineTableNameToCheck = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+        if (_routingManager.routingExists(offlineTableNameToCheck)) {
+          offlineTableName = offlineTableNameToCheck;
+        }
+        String realtimeTableNameToCheck = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+        if (_routingManager.routingExists(realtimeTableNameToCheck)) {
+          realtimeTableName = realtimeTableNameToCheck;
+        }
+      }
 
-    if (offlineTableName == null && realtimeTableName == null) {
-      // No table matches the request
-      if (realtimeTableConfig == null && offlineTableConfig == null) {
-        LOGGER.info("Table not found for request {}: {}", requestId, query);
-        
requestContext.setErrorCode(QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE);
-        return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+      TableConfig offlineTableConfig =
+          
_tableCache.getTableConfig(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName));
+      TableConfig realtimeTableConfig =
+          
_tableCache.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
+
+      if (offlineTableName == null && realtimeTableName == null) {
+        // No table matches the request
+        if (realtimeTableConfig == null && offlineTableConfig == null) {
+          LOGGER.info("Table not found for request {}: {}", requestId, query);
+          
requestContext.setErrorCode(QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE);
+          return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+        }
+        LOGGER.info("No table matches for request {}: {}", requestId, query);
+        
requestContext.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE);
+        
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS, 
1);
+        return BrokerResponseNative.NO_TABLE_RESULT;
       }
-      LOGGER.info("No table matches for request {}: {}", requestId, query);
-      
requestContext.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE);
-      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS, 
1);
-      return BrokerResponseNative.NO_TABLE_RESULT;
-    }
 
-    // Handle query rewrite that can be overridden by the table configs
-    if (offlineTableName == null) {
-      offlineTableConfig = null;
-    }
-    if (realtimeTableName == null) {
-      realtimeTableConfig = null;
-    }
-    HandlerContext handlerContext = getHandlerContext(offlineTableConfig, 
realtimeTableConfig);
-    if (handlerContext._disableGroovy) {
-      rejectGroovyQuery(serverPinotQuery);
-    }
-    if (handlerContext._useApproximateFunction) {
-      handleApproximateFunctionOverride(serverPinotQuery);
-    }
+      // Handle query rewrite that can be overridden by the table configs
+      if (offlineTableName == null) {
+        offlineTableConfig = null;
+      }
+      if (realtimeTableName == null) {
+        realtimeTableConfig = null;
+      }
+      HandlerContext handlerContext = getHandlerContext(offlineTableConfig, 
realtimeTableConfig);
+      if (handlerContext._disableGroovy) {
+        rejectGroovyQuery(serverPinotQuery);
+      }
+      if (handlerContext._useApproximateFunction) {
+        handleApproximateFunctionOverride(serverPinotQuery);
+      }
 
-    // Validate QPS quota
-    if (!_queryQuotaManager.acquire(tableName)) {
-      String errorMessage =
-          String.format("Request %d: %s exceeds query quota for table: %s", 
requestId, query, tableName);
-      LOGGER.info(errorMessage);
-      requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
-      _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
-      return new 
BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR,
 errorMessage));
-    }
+      // Validate QPS quota
+      if (!_queryQuotaManager.acquire(tableName)) {
+        String errorMessage =
+            String.format("Request %d: %s exceeds query quota for table: %s", 
requestId, query, tableName);
+        LOGGER.info(errorMessage);
+        
requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
+        _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
+        return new 
BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR,
 errorMessage));
+      }
 
-    // Validate the request
-    try {
-      validateRequest(serverPinotQuery, _queryResponseLimit);
-    } catch (Exception e) {
-      LOGGER.info("Caught exception while validating request {}: {}, {}", 
requestId, query, e.getMessage());
-      requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
-      _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
-      return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
 e));
-    }
-
-    _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERIES, 1);
-    _brokerMetrics.addValueToTableGauge(rawTableName, 
BrokerGauge.REQUEST_SIZE, query.length());
-
-    // Prepare OFFLINE and REALTIME requests
-    BrokerRequest offlineBrokerRequest = null;
-    BrokerRequest realtimeBrokerRequest = null;
-    TimeBoundaryInfo timeBoundaryInfo = null;
-    Schema schema = _tableCache.getSchema(rawTableName);
-    if (offlineTableName != null && realtimeTableName != null) {
-      // Time boundary info might be null when there is no segment in the offline table, query real-time side only
-      timeBoundaryInfo = _routingManager.getTimeBoundaryInfo(offlineTableName);
-      if (timeBoundaryInfo == null) {
-        LOGGER.debug("No time boundary info found for hybrid table: {}", 
rawTableName);
-        offlineTableName = null;
-      }
-    }
-    if (offlineTableName != null && realtimeTableName != null) {
-      // Hybrid
-      PinotQuery offlinePinotQuery = serverPinotQuery.deepCopy();
-      offlinePinotQuery.getDataSource().setTableName(offlineTableName);
-      attachTimeBoundary(offlinePinotQuery, timeBoundaryInfo, true);
-      handleExpressionOverride(offlinePinotQuery, 
_tableCache.getExpressionOverrideMap(offlineTableName));
-      handleTimestampIndexOverride(offlinePinotQuery, offlineTableConfig);
-      _queryOptimizer.optimize(offlinePinotQuery, offlineTableConfig, schema);
-      offlineBrokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(offlinePinotQuery);
-
-      PinotQuery realtimePinotQuery = serverPinotQuery.deepCopy();
-      realtimePinotQuery.getDataSource().setTableName(realtimeTableName);
-      attachTimeBoundary(realtimePinotQuery, timeBoundaryInfo, false);
-      handleExpressionOverride(realtimePinotQuery, 
_tableCache.getExpressionOverrideMap(realtimeTableName));
-      handleTimestampIndexOverride(realtimePinotQuery, realtimeTableConfig);
-      _queryOptimizer.optimize(realtimePinotQuery, realtimeTableConfig, 
schema);
-      realtimeBrokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(realtimePinotQuery);
-
-      requestContext.setFanoutType(RequestContext.FanoutType.HYBRID);
-      requestContext.setOfflineServerTenant(getServerTenant(offlineTableName));
-      
requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName));
-    } else if (offlineTableName != null) {
-      // OFFLINE only
-      setTableName(serverBrokerRequest, offlineTableName);
-      handleExpressionOverride(serverPinotQuery, 
_tableCache.getExpressionOverrideMap(offlineTableName));
-      handleTimestampIndexOverride(serverPinotQuery, offlineTableConfig);
-      _queryOptimizer.optimize(serverPinotQuery, offlineTableConfig, schema);
-      offlineBrokerRequest = serverBrokerRequest;
-
-      requestContext.setFanoutType(RequestContext.FanoutType.OFFLINE);
-      requestContext.setOfflineServerTenant(getServerTenant(offlineTableName));
-    } else {
-      // REALTIME only
-      setTableName(serverBrokerRequest, realtimeTableName);
-      handleExpressionOverride(serverPinotQuery, 
_tableCache.getExpressionOverrideMap(realtimeTableName));
-      handleTimestampIndexOverride(serverPinotQuery, realtimeTableConfig);
-      _queryOptimizer.optimize(serverPinotQuery, realtimeTableConfig, schema);
-      realtimeBrokerRequest = serverBrokerRequest;
+      // Validate the request
+      try {
+        validateRequest(serverPinotQuery, _queryResponseLimit);
+      } catch (Exception e) {
+        LOGGER.info("Caught exception while validating request {}: {}, {}", 
requestId, query, e.getMessage());
+        
requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
+        _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
+        return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
 e));
+      }
 
-      requestContext.setFanoutType(RequestContext.FanoutType.REALTIME);
-      
requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName));
-    }
+      _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERIES, 
1);
+      _brokerMetrics.addValueToTableGauge(rawTableName, 
BrokerGauge.REQUEST_SIZE, query.length());
+
+      // Prepare OFFLINE and REALTIME requests
+      BrokerRequest offlineBrokerRequest = null;
+      BrokerRequest realtimeBrokerRequest = null;
+      TimeBoundaryInfo timeBoundaryInfo = null;
+      Schema schema = _tableCache.getSchema(rawTableName);
+      if (offlineTableName != null && realtimeTableName != null) {
+        // Time boundary info might be null when there is no segment in the offline table, query real-time side only
+        timeBoundaryInfo = 
_routingManager.getTimeBoundaryInfo(offlineTableName);
+        if (timeBoundaryInfo == null) {
+          LOGGER.debug("No time boundary info found for hybrid table: {}", 
rawTableName);
+          offlineTableName = null;
+        }
+      }
+      if (offlineTableName != null && realtimeTableName != null) {
+        // Hybrid
+        PinotQuery offlinePinotQuery = serverPinotQuery.deepCopy();
+        offlinePinotQuery.getDataSource().setTableName(offlineTableName);
+        attachTimeBoundary(offlinePinotQuery, timeBoundaryInfo, true);
+        handleExpressionOverride(offlinePinotQuery, 
_tableCache.getExpressionOverrideMap(offlineTableName));
+        handleTimestampIndexOverride(offlinePinotQuery, offlineTableConfig);
+        _queryOptimizer.optimize(offlinePinotQuery, offlineTableConfig, 
schema);
+        offlineBrokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(offlinePinotQuery);
+
+        PinotQuery realtimePinotQuery = serverPinotQuery.deepCopy();
+        realtimePinotQuery.getDataSource().setTableName(realtimeTableName);
+        attachTimeBoundary(realtimePinotQuery, timeBoundaryInfo, false);
+        handleExpressionOverride(realtimePinotQuery, 
_tableCache.getExpressionOverrideMap(realtimeTableName));
+        handleTimestampIndexOverride(realtimePinotQuery, realtimeTableConfig);
+        _queryOptimizer.optimize(realtimePinotQuery, realtimeTableConfig, 
schema);
+        realtimeBrokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(realtimePinotQuery);
+
+        requestContext.setFanoutType(RequestContext.FanoutType.HYBRID);
+        
requestContext.setOfflineServerTenant(getServerTenant(offlineTableName));
+        
requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName));
+      } else if (offlineTableName != null) {
+        // OFFLINE only
+        setTableName(serverBrokerRequest, offlineTableName);
+        handleExpressionOverride(serverPinotQuery, 
_tableCache.getExpressionOverrideMap(offlineTableName));
+        handleTimestampIndexOverride(serverPinotQuery, offlineTableConfig);
+        _queryOptimizer.optimize(serverPinotQuery, offlineTableConfig, schema);
+        offlineBrokerRequest = serverBrokerRequest;
+
+        requestContext.setFanoutType(RequestContext.FanoutType.OFFLINE);
+        
requestContext.setOfflineServerTenant(getServerTenant(offlineTableName));
+      } else {
+        // REALTIME only
+        setTableName(serverBrokerRequest, realtimeTableName);
+        handleExpressionOverride(serverPinotQuery, 
_tableCache.getExpressionOverrideMap(realtimeTableName));
+        handleTimestampIndexOverride(serverPinotQuery, realtimeTableConfig);
+        _queryOptimizer.optimize(serverPinotQuery, realtimeTableConfig, 
schema);
+        realtimeBrokerRequest = serverBrokerRequest;
+
+        requestContext.setFanoutType(RequestContext.FanoutType.REALTIME);
+        
requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName));
+      }
 
-    // Check if response can be sent without server query evaluation.
-    if (offlineBrokerRequest != null && 
isFilterAlwaysFalse(offlineBrokerRequest.getPinotQuery())) {
-      // We don't need to evaluate offline request
-      offlineBrokerRequest = null;
-    }
-    if (realtimeBrokerRequest != null && 
isFilterAlwaysFalse(realtimeBrokerRequest.getPinotQuery())) {
-      // We don't need to evaluate realtime request
-      realtimeBrokerRequest = null;
-    }
+      // Check if response can be sent without server query evaluation.

Review Comment:
   same here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to