This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d6732ffd19 Logical table query quota enforcement - SSE (#15839)
d6732ffd19 is described below

commit d6732ffd1943a6a11358ad36645e07769e1c84ff
Author: Abhishek Bafna <aba...@startree.ai>
AuthorDate: Tue May 27 12:57:28 2025 +0530

    Logical table query quota enforcement - SSE (#15839)
---
 ...okerResourceOnlineOfflineStateModelFactory.java |   1 +
 .../BrokerUserDefinedMessageHandlerFactory.java    |   1 +
 .../HelixExternalViewBasedQueryQuotaManager.java   | 117 +++++++++++++--------
 .../pinot/broker/queryquota/QueryQuotaManager.java |   7 ++
 .../requesthandler/BaseBrokerRequestHandler.java   |   3 +-
 .../BaseSingleStageBrokerRequestHandler.java       |  23 ++--
 .../pinot/common/metadata/ZKMetadataProvider.java  |   5 +-
 .../controller/helix/ControllerRequestClient.java  |  23 ++++
 .../helix/core/PinotHelixResourceManager.java      |   9 +-
 .../pinot/controller/helix/ControllerTest.java     |  12 ++-
 .../tests/QueryQuotaClusterIntegrationTest.java    |  94 ++++++++++++++++-
 .../BaseLogicalTableIntegrationTest.java           |  34 +++---
 ...hTwoOfflineOneRealtimeTableIntegrationTest.java |   2 +-
 13 files changed, 241 insertions(+), 90 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
index cc2f2f406a..d9e83d4e4c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
@@ -80,6 +80,7 @@ public class BrokerResourceOnlineOfflineStateModelFactory 
extends StateModelFact
       try {
         if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, 
physicalOrLogicalTable)) {
           _routingManager.buildRoutingForLogicalTable(physicalOrLogicalTable);
+          
_queryQuotaManager.initOrUpdateLogicalTableQueryQuota(physicalOrLogicalTable);
         } else {
           _routingManager.buildRouting(physicalOrLogicalTable);
           TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, physicalOrLogicalTable);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
index 033a126ea6..81ea3d0d4f 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
@@ -154,6 +154,7 @@ public class BrokerUserDefinedMessageHandlerFactory 
implements MessageHandlerFac
     @Override
     public HelixTaskResult handleMessage() {
       _routingManager.buildRoutingForLogicalTable(_logicalTableName);
+      _queryQuotaManager.initOrUpdateLogicalTableQueryQuota(_logicalTableName);
       HelixTaskResult result = new HelixTaskResult();
       result.setSuccess(true);
       return result;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index 925fbc4860..7bb8641271 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -49,6 +49,7 @@ import org.apache.pinot.spi.config.DatabaseConfig;
 import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.zookeeper.data.Stat;
@@ -175,6 +176,20 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     }
   }
 
+  public void initOrUpdateLogicalTableQueryQuota(String logicalTableName) {
+    LogicalTableConfig logicalTableConfig = 
ZKMetadataProvider.getLogicalTableConfig(_propertyStore, logicalTableName);
+    if (logicalTableConfig == null) {
+      LOGGER.info("No query quota to update since logical table config is 
null");
+      return;
+    }
+
+    LOGGER.info("Initializing rate limiter for logical table {}", 
logicalTableName);
+
+    ExternalView brokerResourceEV = getBrokerResource();
+    Stat stat = 
_propertyStore.getStat(constructLogicalTableConfigPath(logicalTableName), 
AccessOption.PERSISTENT);
+    createOrUpdateRateLimiter(logicalTableName, brokerResourceEV, 
logicalTableConfig.getQuotaConfig(), stat);
+  }
+
   public void initOrUpdateTableQueryQuota(String tableNameWithType) {
     TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
     ExternalView brokerResourceEV = getBrokerResource();
@@ -195,24 +210,25 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     LOGGER.info("Initializing rate limiter for table {}", tableNameWithType);
 
     // Create rate limiter if query quota config is specified.
-    createOrUpdateRateLimiter(tableNameWithType, brokerResourceEV, 
tableConfig.getQuotaConfig());
+    Stat stat = 
_propertyStore.getStat(constructTableConfigPath(tableNameWithType), 
AccessOption.PERSISTENT);
+    createOrUpdateRateLimiter(tableNameWithType, brokerResourceEV, 
tableConfig.getQuotaConfig(), stat);
   }
 
   /**
    * Drop table query quota.
-   * @param tableNameWithType table name with type.
+   * @param physicalOrLogicalTable physical or logical table name.
    */
-  public void dropTableQueryQuota(String tableNameWithType) {
-    LOGGER.info("Dropping rate limiter for table {}", tableNameWithType);
-    removeRateLimiter(tableNameWithType);
+  public void dropTableQueryQuota(String physicalOrLogicalTable) {
+    LOGGER.info("Dropping rate limiter for table {}", physicalOrLogicalTable);
+    removeRateLimiter(physicalOrLogicalTable);
   }
 
   /** Remove or update rate limiter if another table with the same raw table 
name but different type is still using
    * the quota config.
-   * @param tableNameWithType table name with type
+   * @param physicalOrLogicalTable physical or logical table name.
    */
-  private void removeRateLimiter(String tableNameWithType) {
-    _rateLimiterMap.remove(tableNameWithType);
+  private void removeRateLimiter(String physicalOrLogicalTable) {
+    _rateLimiterMap.remove(physicalOrLogicalTable);
   }
 
   /**
@@ -230,26 +246,27 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
 
   /**
    * Create or update a rate limiter for a table.
-   * @param tableNameWithType table name with table type.
+   * @param physicalOrLogicalTableName physical or logical table name.
    * @param brokerResource broker resource which stores all the broker states 
of each table.
    * @param quotaConfig quota config of the table.
+   * @param tableStat stat of the table config.
    */
-  private void createOrUpdateRateLimiter(String tableNameWithType, 
ExternalView brokerResource,
-      QuotaConfig quotaConfig) {
+  private void createOrUpdateRateLimiter(String physicalOrLogicalTableName, 
ExternalView brokerResource,
+      QuotaConfig quotaConfig, Stat tableStat) {
     if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) {
-      LOGGER.info("No qps config specified for table: {}", tableNameWithType);
-      buildEmptyOrResetRateLimiterInQueryQuotaEntity(tableNameWithType);
+      LOGGER.info("No qps config specified for table: {}", 
physicalOrLogicalTableName);
+      
buildEmptyOrResetRateLimiterInQueryQuotaEntity(physicalOrLogicalTableName);
       return;
     }
 
     if (brokerResource == null) {
-      LOGGER.warn("Failed to init qps quota for table {}. No broker resource 
connected!", tableNameWithType);
+      LOGGER.warn("Failed to init qps quota for table {}. No broker resource 
connected!", physicalOrLogicalTableName);
       // It could be possible that brokerResourceEV is null due to ZK 
connection issue.
       // In this case, the rate limiter should not be reset. Simply exit the 
method would be sufficient.
       return;
     }
 
-    Map<String, String> stateMap = 
brokerResource.getStateMap(tableNameWithType);
+    Map<String, String> stateMap = 
brokerResource.getStateMap(physicalOrLogicalTableName);
     int otherOnlineBrokerCount = 0;
 
     // If stateMap is null, that means this broker is the first broker for 
this table.
@@ -263,26 +280,22 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     }
 
     int onlineCount = otherOnlineBrokerCount + 1;
-    LOGGER.info("The number of online brokers for table {} is {}", 
tableNameWithType, onlineCount);
+    LOGGER.info("The number of online brokers for table {} is {}", 
physicalOrLogicalTableName, onlineCount);
 
     // Get the dynamic rate
     double overallRate = quotaConfig.getMaxQPS();
-
-    // Get stat from property store
-    String tableConfigPath = constructTableConfigPath(tableNameWithType);
-    Stat stat = _propertyStore.getStat(tableConfigPath, 
AccessOption.PERSISTENT);
     double perBrokerRate = overallRate / onlineCount;
 
-    QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(tableNameWithType);
+    QueryQuotaEntity queryQuotaEntity = 
_rateLimiterMap.get(physicalOrLogicalTableName);
     if (queryQuotaEntity == null) {
       queryQuotaEntity =
           new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new 
HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
-              new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 
onlineCount, overallRate, stat.getVersion());
-      _rateLimiterMap.put(tableNameWithType, queryQuotaEntity);
+              new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 
onlineCount, overallRate, tableStat.getVersion());
+      _rateLimiterMap.put(physicalOrLogicalTableName, queryQuotaEntity);
       LOGGER.info(
           "Rate limiter for table: {} has been initialized. Overall rate: {}. 
Per-broker rate: {}. Number of online "
-              + "broker instances: {}. Table config stat version: {}", 
tableNameWithType, overallRate, perBrokerRate,
-          onlineCount, stat.getVersion());
+              + "broker instances: {}. Table config stat version: {}", 
physicalOrLogicalTableName, overallRate,
+          perBrokerRate, onlineCount, tableStat.getVersion());
     } else {
       RateLimiter rateLimiter = queryQuotaEntity.getRateLimiter();
       double previousRate = -1;
@@ -297,14 +310,14 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
       }
       queryQuotaEntity.setNumOnlineBrokers(onlineCount);
       queryQuotaEntity.setOverallRate(overallRate);
-      queryQuotaEntity.setTableConfigStatVersion(stat.getVersion());
+      queryQuotaEntity.setTableConfigStatVersion(tableStat.getVersion());
       LOGGER.info(
           "Rate limiter for table: {} has been updated. Overall rate: {}. 
Previous per-broker rate: {}. New "
               + "per-broker rate: {}. Number of online broker instances: {}. 
Table config stat version: {}",
-          tableNameWithType, overallRate, previousRate, perBrokerRate, 
onlineCount, stat.getVersion());
+          physicalOrLogicalTableName, overallRate, previousRate, 
perBrokerRate, onlineCount, tableStat.getVersion());
     }
-    addMaxBurstQPSCallbackTableGaugeIfNeeded(tableNameWithType, 
queryQuotaEntity);
-    addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(tableNameWithType, 
queryQuotaEntity);
+    addMaxBurstQPSCallbackTableGaugeIfNeeded(physicalOrLogicalTableName, 
queryQuotaEntity);
+    
addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(physicalOrLogicalTableName,
 queryQuotaEntity);
     if (isQueryRateLimitDisabled()) {
       LOGGER.info("Query rate limiting is currently disabled for this broker. 
So it won't take effect immediately.");
     }
@@ -536,19 +549,19 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
    * Build an empty rate limiter in the new query quota entity, or set the 
rate limiter to null in an existing query
    * quota entity.
    */
-  private void buildEmptyOrResetRateLimiterInQueryQuotaEntity(String 
tableNameWithType) {
-    QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(tableNameWithType);
+  private void buildEmptyOrResetRateLimiterInQueryQuotaEntity(String 
physicalOrLogicalTableName) {
+    QueryQuotaEntity queryQuotaEntity = 
_rateLimiterMap.get(physicalOrLogicalTableName);
     if (queryQuotaEntity == null) {
       // Create an QueryQuotaEntity object without setting a rate limiter.
       queryQuotaEntity = new QueryQuotaEntity(null, new 
HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
           new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 0, 0, 0);
-      _rateLimiterMap.put(tableNameWithType, queryQuotaEntity);
+      _rateLimiterMap.put(physicalOrLogicalTableName, queryQuotaEntity);
     } else {
       // Set rate limiter to null for an existing QueryQuotaEntity object.
       queryQuotaEntity.setRateLimiter(null);
     }
-    addMaxBurstQPSCallbackTableGaugeIfNeeded(tableNameWithType, 
queryQuotaEntity);
-    addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(tableNameWithType, 
queryQuotaEntity);
+    addMaxBurstQPSCallbackTableGaugeIfNeeded(physicalOrLogicalTableName, 
queryQuotaEntity);
+    
addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(physicalOrLogicalTableName,
 queryQuotaEntity);
   }
 
   /**
@@ -680,6 +693,16 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     return offlineQuotaOk && realtimeQuotaOk;
   }
 
+  @Override
+  public boolean acquireLogicalTable(String logicalTableName) {
+    QueryQuotaEntity logicalTableQueryQuotaEntity = 
_rateLimiterMap.get(logicalTableName);
+    if (logicalTableQueryQuotaEntity != null) {
+      LOGGER.debug("Trying to acquire token for logical table: {}", 
logicalTableName);
+      return tryAcquireToken(logicalTableName, logicalTableQueryQuotaEntity);
+    }
+    return true;
+  }
+
   /**
    * Try to acquire token from rate limiter. Emit the utilization of the qps 
quota if broker metric isn't null.
    * @param resourceName resource name to acquire.
@@ -749,7 +772,7 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     int numRebuilt = 0;
     for (Iterator<Map.Entry<String, QueryQuotaEntity>> it = 
_rateLimiterMap.entrySet().iterator(); it.hasNext(); ) {
       Map.Entry<String, QueryQuotaEntity> entry = it.next();
-      String tableNameWithType = entry.getKey();
+      String physicalOrLogicalTableName = entry.getKey();
       QueryQuotaEntity queryQuotaEntity = entry.getValue();
       if (queryQuotaEntity.getRateLimiter() == null) {
         // No rate limiter set, skip this table.
@@ -757,9 +780,9 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
       }
 
       // Get number of online brokers.
-      Map<String, String> stateMap = 
currentBrokerResourceEV.getStateMap(tableNameWithType);
+      Map<String, String> stateMap = 
currentBrokerResourceEV.getStateMap(physicalOrLogicalTableName);
       if (stateMap == null) {
-        LOGGER.info("No broker resource for Table {}. Removing its rate 
limit.", tableNameWithType);
+        LOGGER.info("No broker resource for Table {}. Removing its rate 
limit.", physicalOrLogicalTableName);
         it.remove();
         continue;
       }
@@ -773,10 +796,14 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
       int onlineBrokerCount = otherOnlineBrokerCount + 1;
 
       // Get stat from property store
-      String tableConfigPath = constructTableConfigPath(tableNameWithType);
-      Stat stat = _propertyStore.getStat(tableConfigPath, 
AccessOption.PERSISTENT);
+      String physicalOrLogicalTableConfigPath =
+          ZKMetadataProvider.isTableConfigExists(_propertyStore, 
physicalOrLogicalTableName)
+              ? constructTableConfigPath(physicalOrLogicalTableName)
+              : constructLogicalTableConfigPath(physicalOrLogicalTableName);
+      Stat stat = _propertyStore.getStat(physicalOrLogicalTableConfigPath, 
AccessOption.PERSISTENT);
       if (stat == null) {
-        LOGGER.info("Table {} has been deleted from property store. Removing 
its rate limit.", tableNameWithType);
+        LOGGER.info("Table {} has been deleted from property store. Removing 
its rate limit.",
+            physicalOrLogicalTableName);
         it.remove();
         continue;
       }
@@ -790,10 +817,10 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
       double overallRate;
       // Get latest quota config only if stat don't match.
       if (stat.getVersion() != queryQuotaEntity.getTableConfigStatVersion()) {
-        QuotaConfig quotaConfig = 
getQuotaConfigFromPropertyStore(tableNameWithType);
+        QuotaConfig quotaConfig = 
getQuotaConfigFromPropertyStore(physicalOrLogicalTableName);
         if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == 
null) {
           LOGGER.info("No query quota config or the config is invalid for 
Table {}. Removing its rate limit.",
-              tableNameWithType);
+              physicalOrLogicalTableName);
           it.remove();
           continue;
         }
@@ -810,7 +837,7 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
         queryQuotaEntity.setTableConfigStatVersion(stat.getVersion());
         LOGGER.info("Rate limiter for table: {} has been updated. Overall 
rate: {}. Previous per-broker rate: {}. New "
                 + "per-broker rate: {}. Number of online broker instances: {}. 
Table config stat version: {}.",
-            tableNameWithType, overallRate, previousRate, latestRate, 
onlineBrokerCount, stat.getVersion());
+            physicalOrLogicalTableName, overallRate, previousRate, latestRate, 
onlineBrokerCount, stat.getVersion());
         numRebuilt++;
       }
     }
@@ -931,4 +958,8 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
   private String constructTableConfigPath(String tableNameWithType) {
     return "/CONFIGS/TABLE/" + tableNameWithType;
   }
+
+  private String constructLogicalTableConfigPath(String tableName) {
+    return "/LOGICAL/TABLE/" + tableName;
+  }
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
index 70c3ef7588..eb3de472d9 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
@@ -27,6 +27,13 @@ public interface QueryQuotaManager {
    */
   boolean acquire(String tableName);
 
+  /**
+   * Try to acquire a quota for the given logical table.
+   * @param logicalTableName Logical table name
+   * @return {@code true} if the table quota has not been reached, {@code 
false} otherwise
+   */
+  boolean acquireLogicalTable(String logicalTableName);
+
   /**
    * Try to acquire a quota for the given database.
    * @param databaseName database name
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 2367334497..2a344876c3 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -285,7 +285,8 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
    * @return true if the query was successfully cancelled, false otherwise.
    */
   protected abstract boolean handleCancel(long queryId, int timeoutMs, 
Executor executor,
-      HttpClientConnectionManager connMgr, Map<String, Integer> 
serverResponses) throws Exception;
+      HttpClientConnectionManager connMgr, Map<String, Integer> 
serverResponses)
+      throws Exception;
 
   protected static void augmentStatistics(RequestContext statistics, 
BrokerResponse response) {
     statistics.setNumRowsResultSet(response.getNumRowsResultSet());
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 7901772362..ab013085cb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -366,8 +366,8 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
     // Compile the request into PinotQuery
     long compilationStartTimeNs = System.nanoTime();
     CompileResult compileResult =
-          compileRequest(requestId, query, sqlNodeAndOptions, request, 
requesterIdentity, requestContext, httpHeaders,
-              accessControl);
+        compileRequest(requestId, query, sqlNodeAndOptions, request, 
requesterIdentity, requestContext, httpHeaders,
+            accessControl);
 
     if (compileResult._errorOrLiteralOnlyBrokerResponse != null) {
       /*
@@ -406,8 +406,17 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
       }
 
       // Validate QPS
-      if (hasExceededQPSQuota(database, physicalTableNames, requestContext)) {
-        String errorMessage = String.format("Request %d: %s exceeds query 
quota.", requestId, query);
+      if (!_queryQuotaManager.acquireDatabase(database)) {
+        String errorMessage =
+            String.format("Request %d: %s exceeds query quota for database: 
%s", requestId, query, database);
+        LOGGER.info(errorMessage);
+        requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
+        return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, 
errorMessage);
+      }
+      if (!_queryQuotaManager.acquireLogicalTable(tableName)) {
+        String errorMessage =
+            String.format("Request %d: %s exceeds query quota for table: %s.", 
requestId, query, tableName);
+        requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
         return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, 
errorMessage);
       }
 
@@ -815,9 +824,9 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
       if (ParserUtils.canCompileWithMultiStageEngine(query, database, 
_tableCache)) {
         return new CompileResult(new 
BrokerResponseNative(QueryErrorCode.SQL_PARSING,
             "It seems that the query is only supported by the multi-stage 
query engine, please retry the query "
-                    + "using "
-                    + "the multi-stage query engine "
-                    + 
"(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)"));
+                + "using "
+                + "the multi-stage query engine "
+                + 
"(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)"));
       } else {
         return new CompileResult(
             new BrokerResponseNative(QueryErrorCode.SQL_PARSING, 
e.getMessage()));
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 029f5c3491..eff90fcf49 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -884,7 +884,8 @@ public class ZKMetadataProvider {
     return 
propertyStore.exists(constructPropertyStorePathForLogical(tableName), 
AccessOption.PERSISTENT);
   }
 
-  public static boolean isTableConfigExists(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String tableName) {
-    return 
propertyStore.exists(constructPropertyStorePathForResourceConfig(tableName), 
AccessOption.PERSISTENT);
+  public static boolean isTableConfigExists(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String tableNameWithType) {
+    return 
propertyStore.exists(constructPropertyStorePathForResourceConfig(tableNameWithType),
+        AccessOption.PERSISTENT);
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index e619cbf008..7969d2aa86 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -38,6 +38,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.config.tenant.TenantRole;
+import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
@@ -137,6 +138,17 @@ public class ControllerRequestClient {
     }
   }
 
+  public void addLogicalTableConfig(LogicalTableConfig logicalTableConfig)
+      throws IOException {
+    try {
+      HttpClient.wrapAndThrowHttpException(
+          _httpClient.sendJsonPostRequest(new 
URI(_controllerRequestURLBuilder.forLogicalTableCreate()),
+              logicalTableConfig.toJsonString(), _headers));
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
   public void updateTableConfig(TableConfig tableConfig)
       throws IOException {
     try {
@@ -148,6 +160,17 @@ public class ControllerRequestClient {
     }
   }
 
+  public void updateLogicalTableConfig(LogicalTableConfig logicalTableConfig)
+      throws IOException {
+    try {
+      HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPutRequest(
+          new 
URI(_controllerRequestURLBuilder.forLogicalTableUpdate(logicalTableConfig.getTableName())),
+          logicalTableConfig.toJsonString(), _headers));
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
   public void toggleTableState(String tableName, TableType type, boolean 
enable)
       throws IOException {
     try {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 892e3fbc11..09bd904aee 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -180,7 +180,6 @@ import org.apache.pinot.spi.config.user.UserConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeBoundaryConfig;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
@@ -2152,13 +2151,7 @@ public class PinotHelixResourceManager {
       updateBrokerResourceForLogicalTable(logicalTableConfig, tableName);
     }
 
-    TimeBoundaryConfig oldTimeBoundaryConfig = 
oldLogicalTableConfig.getTimeBoundaryConfig();
-    TimeBoundaryConfig newTimeBoundaryConfig = 
logicalTableConfig.getTimeBoundaryConfig();
-    // compare the old and new time boundary config and send message if they 
are different
-    if ((oldTimeBoundaryConfig != null && 
!oldTimeBoundaryConfig.equals(newTimeBoundaryConfig))
-    || (oldTimeBoundaryConfig == null && newTimeBoundaryConfig != null)) {
-      sendLogicalTableConfigRefreshMessage(logicalTableConfig.getTableName());
-    }
+    sendLogicalTableConfigRefreshMessage(logicalTableConfig.getTableName());
 
     LOGGER.info("Updated logical table {}: Successfully updated table", 
tableName);
   }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index cfd0f3f6cc..cb04aed946 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -407,7 +407,7 @@ public class ControllerTest {
         .setBrokerTenant(brokerTenant)
         .setRefOfflineTableName(offlineTableName)
         .setRefRealtimeTableName(realtimeTableName)
-        .setQuotaConfig(new QuotaConfig(null, "999"))
+        .setQuotaConfig(new QuotaConfig(null, "99999"))
         .setQueryConfig(new QueryConfig(1L, true, false, null, 1L, 1L))
         .setTimeBoundaryConfig(new TimeBoundaryConfig("min", 
Map.of("includedTables", physicalTableNames)))
         .setPhysicalTableConfigMap(physicalTableConfigMap);
@@ -752,11 +752,21 @@ public class ControllerTest {
     getControllerRequestClient().addTableConfig(tableConfig);
   }
 
+  public void addLogicalTableConfig(LogicalTableConfig logicalTableConfig)
+      throws IOException {
+    getControllerRequestClient().addLogicalTableConfig(logicalTableConfig);
+  }
+
   public void updateTableConfig(TableConfig tableConfig)
       throws IOException {
     getControllerRequestClient().updateTableConfig(tableConfig);
   }
 
+  public void updateLogicalTableConfig(LogicalTableConfig logicalTableConfig)
+      throws IOException {
+    getControllerRequestClient().updateLogicalTableConfig(logicalTableConfig);
+  }
+
   public void toggleTableState(String tableName, TableType type, boolean 
enable)
       throws IOException {
     getControllerRequestClient().toggleTableState(tableName, type, enable);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
index f3945bcf1f..f91a4a9148 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.net.URI;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
 import 
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManagerTest;
@@ -35,10 +36,12 @@ import org.apache.pinot.client.ResultSetGroup;
 import org.apache.pinot.common.utils.http.HttpClient;
 import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -76,6 +79,12 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     TableConfig tableConfig = createOfflineTableConfig();
     addTableConfig(tableConfig);
 
+    // Create and upload schema and logical table
+    schema.setSchemaName(getLogicalTableName());
+    addSchema(schema);
+    LogicalTableConfig logicalTableConfig = getLogicalTableConfig();
+    addLogicalTableConfig(logicalTableConfig);
+
     Properties properties = new Properties();
     properties.put(FAIL_ON_EXCEPTIONS, "FALSE");
     _pinotClientTransport = new JsonAsyncHttpPinotClientTransportFactory()
@@ -96,9 +105,11 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     setQueryQuotaForApplication(null);
     addQueryQuotaToDatabaseConfig(null);
     addQueryQuotaToTableConfig(null);
+    addQueryQuotaToLogicalTableConfig(null);
 
     _brokerHostPort = LOCAL_HOST + ":" + _brokerPorts.get(0);
     verifyQuotaUpdate(0);
+    verifyQuotaUpdateWithTableName(0, getLogicalTableName());
   }
 
   @Test
@@ -258,6 +269,48 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     }
   }
 
+  @Test
+  public void testLogicalTableQueryQuota()
+      throws Exception {
+    int maxQps = 10;
+    addQueryQuotaToLogicalTableConfig(maxQps);
+    verifyQuotaUpdateWithTableName(maxQps, getLogicalTableName());
+    runQueries(maxQps, false, "default", getLogicalTableName());
+    //increase the qps and some of the queries should be throttled.
+    runQueries(maxQps * 2, true, "default", getLogicalTableName());
+
+    // queries on broker
+    runQueriesOnBroker(maxQps, false, getLogicalTableName());
+    //increase the qps and some of the queries should be throttled.
+    runQueriesOnBroker(maxQps * 2, true, getLogicalTableName());
+  }
+
+  @Test
+  public void testLogicalTableWithDatabaseQueryQuota()
+      throws Exception {
+    int databaseMaxQps = 25;
+    int logicalTableMaxQps = 10;
+    addQueryQuotaToDatabaseConfig(databaseMaxQps);
+    addQueryQuotaToLogicalTableConfig(logicalTableMaxQps);
+    // table quota within database quota. Queries should fail upon table quota 
(10 qps) breach
+    verifyQuotaUpdateWithTableName(logicalTableMaxQps, getLogicalTableName());
+    runQueries(logicalTableMaxQps, false, "default", getLogicalTableName());
+    // queries on broker
+    runQueriesOnBroker(logicalTableMaxQps, false, getLogicalTableName());
+
+    //increase the logical table qps.
+    logicalTableMaxQps = 50;
+    addQueryQuotaToLogicalTableConfig(logicalTableMaxQps);
+    verifyQuotaUpdateWithTableName(databaseMaxQps, getLogicalTableName());
+    runQueries(databaseMaxQps, false, "default", getLogicalTableName());
+    // broker queries
+    runQueriesOnBroker(databaseMaxQps, false, getLogicalTableName());
+
+    //increase the qps and some of the queries should be throttled.
+    runQueries(databaseMaxQps * 2, true, "default", getLogicalTableName());
+    runQueriesOnBroker(databaseMaxQps * 2, true, getLogicalTableName());
+  }
+
   /**
    * Runs the query load with the max rate that the quota can allow and 
ensures queries are not failing.
    * Then runs the query load with double the max rate and expects queries to 
fail due to quota breach.
@@ -295,15 +348,18 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
   private void runQueries(int qps, boolean shouldFail) {
     runQueries(qps, shouldFail, "default");
   }
+  private void runQueries(int qps, boolean shouldFail, String applicationName) 
{
+    runQueries(qps, shouldFail, applicationName, getTableName());
+  }
 
   // try to keep the qps below 50 to ensure that the time lost between 2 query 
runs on top of the sleepMillis
   // is not comparable to sleepMillis, else the actual qps would end up being 
much lower than required qps
-  private void runQueries(int qps, boolean shouldFail, String applicationName) 
{
+  private void runQueries(int qps, boolean shouldFail, String applicationName, 
String tableName) {
     int failCount = 0;
     boolean isLastFail = false;
     long deadline = System.currentTimeMillis() + 1000;
 
-    String query = "SET applicationName='" + applicationName + "'; SELECT 
COUNT(*) FROM " + getTableName();
+    String query = "SET applicationName='" + applicationName + "'; SELECT 
COUNT(*) FROM " + tableName;
 
     for (int i = 0; i < qps; i++) {
       sleep(deadline, qps - i);
@@ -333,11 +389,15 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
   private static volatile String _quotaSource;
 
   private void verifyQuotaUpdate(double quotaQps) {
+    verifyQuotaUpdateWithTableName(quotaQps, getTableName() + "_OFFLINE");
+  }
+
+  private void verifyQuotaUpdateWithTableName(double quotaQps, String 
tableName) {
     try {
       TestUtils.waitForCondition(aVoid -> {
         try {
           double tableQuota = Double.parseDouble(sendGetRequest(
-              "http://" + _brokerHostPort + "/debug/tables/queryQuota/" + getTableName() + "_OFFLINE"));
+              "http://" + _brokerHostPort + "/debug/tables/queryQuota/" + tableName));
           double dbQuota = Double.parseDouble(
               sendGetRequest("http://" + _brokerHostPort + "/debug/databases/queryQuota/default"));
           double appQuota = Double.parseDouble(
@@ -363,7 +423,7 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
-      }, 10000, "Failed to reflect query quota on rate limiter in 5s.");
+      }, 10000, "Failed to reflect query quota on rate limiter in 10s.");
     } catch (AssertionError ae) {
       throw new AssertionError(
           ae.getMessage() + " Expected quota:" + quotaQps + " but is: " + 
_quota + " set on: " + _quotaSource, ae);
@@ -375,13 +435,17 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
   }
 
   private void runQueriesOnBroker(float qps, boolean shouldFail) {
+    runQueriesOnBroker(qps, shouldFail, getTableName());
+  }
+
+  private void runQueriesOnBroker(float qps, boolean shouldFail, String 
tableName) {
     int failCount = 0;
     long deadline = System.currentTimeMillis() + 1000;
 
     for (int i = 0; i < qps; i++) {
       sleep(deadline, qps - i);
       BrokerResponse resultSetGroup =
-          executeQueryOnBroker("SET applicationName='default'; SELECT COUNT(*) 
FROM " + getTableName());
+          executeQueryOnBroker("SET applicationName='default'; SELECT COUNT(*) 
FROM " + tableName);
       for (Iterator<JsonNode> it = resultSetGroup.getExceptions().elements(); 
it.hasNext(); ) {
         JsonNode exception = it.next();
         if (exception.get("errorCode").asInt() == 
QueryErrorCode.TOO_MANY_REQUESTS.getId()) {
@@ -406,6 +470,14 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     // to allow change propagation to QueryQuotaManager
   }
 
+  public void addQueryQuotaToLogicalTableConfig(Integer maxQps)
+      throws Exception {
+    LogicalTableConfig logicalTableConfig = getLogicalTableConfig();
+    logicalTableConfig.setQuotaConfig(new QuotaConfig(null, maxQps == null ? 
null : maxQps.toString()));
+    updateLogicalTableConfig(logicalTableConfig);
+    // to allow change propagation to QueryQuotaManager
+  }
+
   public void addQueryQuotaToDatabaseConfig(Integer maxQps)
       throws Exception {
     String url = _controllerRequestURLBuilder.getBaseUrl() + 
"/databases/default/quotas";
@@ -453,4 +525,16 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     }
     // to allow change propagation to QueryQuotaManager
   }
+
+  private static String getLogicalTableName() {
+    return "logical_table";
+  }
+
+  private LogicalTableConfig getLogicalTableConfig() {
+    List<String> physicalTableNames = 
List.of(TableNameBuilder.OFFLINE.tableNameWithType(getTableName()));
+    LogicalTableConfig logicalTableConfig =
+        getDummyLogicalTableConfig(getLogicalTableName(), physicalTableNames, 
"DefaultTenant");
+    logicalTableConfig.setQuotaConfig(new QuotaConfig(null, null));
+    return logicalTableConfig;
+  }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
index 29c6f05516..7fb4802cc1 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
@@ -324,16 +324,6 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     return LogicalTableConfig.fromString(resp);
   }
 
-  protected void updateLogicalTableConfig(String logicalTableName, 
LogicalTableConfig logicalTableConfig)
-      throws IOException {
-    String updateLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableUpdate(logicalTableName);
-    String resp =
-        ControllerTest.sendPutRequest(updateLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
-
-    assertEquals(resp, "{\"unrecognizedProperties\":{},\"status\":\"" + 
getLogicalTableName()
-        + " logical table successfully updated.\"}");
-  }
-
   protected void deleteLogicalTable()
       throws IOException {
     String deleteLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableDelete(getLogicalTableName());
@@ -448,7 +438,7 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     QueryConfig queryConfig = new QueryConfig(null, false, null, null, null, 
null);
     LogicalTableConfig logicalTableConfig = 
getLogicalTableConfig(getLogicalTableName());
     logicalTableConfig.setQueryConfig(queryConfig);
-    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    updateLogicalTableConfig(logicalTableConfig);
 
     String groovyQuery = "SELECT 
GROOVY('{\"returnType\":\"STRING\",\"isSingleValue\":true}', "
         + "'arg0 + arg1', FlightNum, Origin) FROM mytable";
@@ -460,7 +450,7 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     queryConfig = new QueryConfig(null, true, null, null, null, null);
 
     logicalTableConfig.setQueryConfig(queryConfig);
-    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    updateLogicalTableConfig(logicalTableConfig);
 
     // grpc and http throw different exceptions. So only check error message.
     Exception athrows = expectThrows(Exception.class, () -> 
postQuery(groovyQuery));
@@ -468,7 +458,7 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
 
     // Remove query config
     logicalTableConfig.setQueryConfig(null);
-    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    updateLogicalTableConfig(logicalTableConfig);
 
     athrows = expectThrows(Exception.class, () -> postQuery(groovyQuery));
     assertTrue(athrows.getMessage().contains("Groovy transform functions are 
disabled for queries"));
@@ -482,7 +472,7 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     QueryConfig queryConfig = new QueryConfig(null, null, null, null, 100L, 
null);
     LogicalTableConfig logicalTableConfig = 
getLogicalTableConfig(getLogicalTableName());
     logicalTableConfig.setQueryConfig(queryConfig);
-    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    updateLogicalTableConfig(logicalTableConfig);
 
     JsonNode response = postQuery(starQuery);
     JsonNode exceptions = response.get("exceptions");
@@ -492,7 +482,7 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     // Query Succeeds with a high limit.
     queryConfig = new QueryConfig(null, null, null, null, 1000000L, null);
     logicalTableConfig.setQueryConfig(queryConfig);
-    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    updateLogicalTableConfig(logicalTableConfig);
     response = postQuery(starQuery);
     exceptions = response.get("exceptions");
     assertTrue(exceptions.isEmpty(), "Query should not throw exception");
@@ -500,7 +490,7 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     //Reset to null.
     queryConfig = new QueryConfig(null, null, null, null, null, null);
     logicalTableConfig.setQueryConfig(queryConfig);
-    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    updateLogicalTableConfig(logicalTableConfig);
     response = postQuery(starQuery);
     exceptions = response.get("exceptions");
     assertTrue(exceptions.isEmpty(), "Query should not throw exception");
@@ -514,7 +504,7 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     QueryConfig queryConfig = new QueryConfig(null, null, null, null, null, 
1000L);
     LogicalTableConfig logicalTableConfig = 
getLogicalTableConfig(getLogicalTableName());
     logicalTableConfig.setQueryConfig(queryConfig);
-    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    updateLogicalTableConfig(logicalTableConfig);
     JsonNode response = postQuery(starQuery);
     JsonNode exceptions = response.get("exceptions");
     assertTrue(!exceptions.isEmpty()
@@ -523,7 +513,7 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     // Query Succeeds with a high limit.
     queryConfig = new QueryConfig(null, null, null, null, null, 1000000L);
     logicalTableConfig.setQueryConfig(queryConfig);
-    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    updateLogicalTableConfig(logicalTableConfig);
     response = postQuery(starQuery);
     exceptions = response.get("exceptions");
     assertTrue(exceptions.isEmpty(), "Query should not throw exception");
@@ -531,7 +521,7 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     //Reset to null.
     queryConfig = new QueryConfig(null, null, null, null, null, null);
     logicalTableConfig.setQueryConfig(queryConfig);
-    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    updateLogicalTableConfig(logicalTableConfig);
     response = postQuery(starQuery);
     exceptions = response.get("exceptions");
     assertTrue(exceptions.isEmpty(), "Query should not throw exception");
@@ -544,7 +534,7 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     QueryConfig queryConfig = new QueryConfig(1L, null, null, null, null, 
null);
     LogicalTableConfig logicalTableConfig = 
getLogicalTableConfig(getLogicalTableName());
     logicalTableConfig.setQueryConfig(queryConfig);
-    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    updateLogicalTableConfig(logicalTableConfig);
     JsonNode response = postQuery(starQuery);
     JsonNode exceptions = response.get("exceptions");
     assertTrue(
@@ -555,7 +545,7 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     // Query Succeeds with a high limit.
     queryConfig = new QueryConfig(1000000L, null, null, null, null, null);
     logicalTableConfig.setQueryConfig(queryConfig);
-    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    updateLogicalTableConfig(logicalTableConfig);
     response = postQuery(starQuery);
     exceptions = response.get("exceptions");
     assertTrue(exceptions.isEmpty(), "Query should not throw exception");
@@ -563,7 +553,7 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     //Reset to null.
     queryConfig = new QueryConfig(null, null, null, null, null, null);
     logicalTableConfig.setQueryConfig(queryConfig);
-    updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+    updateLogicalTableConfig(logicalTableConfig);
     response = postQuery(starQuery);
     exceptions = response.get("exceptions");
     assertTrue(exceptions.isEmpty(), "Query should not throw exception");
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
index ba594a303c..cafa33a56d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
@@ -76,6 +76,6 @@ public class 
LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest extends B
     logicalTableConfig.getTimeBoundaryConfig().setParameters(parameters);
     logicalTableConfig.setQueryConfig(null);
 
-    updateLogicalTableConfig(logicalTableConfig.getTableName(), 
logicalTableConfig);
+    updateLogicalTableConfig(logicalTableConfig);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to