This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d7eeb44  Update quota manager to reduce zk access (#4979)
d7eeb44 is described below

commit d7eeb44430c75b2684680fa09b6244f989368940
Author: Jialiang Li <[email protected]>
AuthorDate: Tue Feb 11 11:53:51 2020 -0800

    Update quota manager to reduce zk access (#4979)
    
    * Update quota manager to reduce zk access
---
 .../HelixExternalViewBasedQueryQuotaManager.java   | 146 +++++++++++++--------
 ...QueryQuotaConfig.java => QueryQuotaEntity.java} |  36 ++++-
 ...elixExternalViewBasedQueryQuotaManagerTest.java |  69 ++++++----
 3 files changed, 169 insertions(+), 82 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index 4bc4ee6..0261914 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -22,9 +22,11 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.util.concurrent.RateLimiter;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.AccessOption;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
@@ -39,34 +41,43 @@ import org.apache.pinot.common.metrics.BrokerGauge;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static 
org.apache.pinot.common.utils.CommonConstants.Helix.BROKER_RESOURCE_INSTANCE;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
-
 
+/**
+ * This class is to support the qps quota feature.
+ * It depends on the broker source change to update the dynamic rate limit,
+ *  which means it only gets updated when a new table added or a broker 
restarted.
+ * TODO: support adding new rate limiter for existing tables without 
restarting the broker.
+ */
 public class HelixExternalViewBasedQueryQuotaManager implements 
ClusterChangeHandler, QueryQuotaManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class);
   private static final int TIME_RANGE_IN_SECOND = 1;
 
   private final AtomicInteger _lastKnownBrokerResourceVersion = new 
AtomicInteger(-1);
-  private final Map<String, QueryQuotaConfig> _rateLimiterMap = new 
ConcurrentHashMap<>();
+  private final Map<String, QueryQuotaEntity> _rateLimiterMap = new 
ConcurrentHashMap<>();
 
   private HelixManager _helixManager;
+  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private BrokerMetrics _brokerMetrics;
 
   @Override
   public void init(HelixManager helixManager) {
     Preconditions.checkState(_helixManager == null, 
"HelixExternalViewBasedQueryQuotaManager is already initialized");
     _helixManager = helixManager;
+    _propertyStore = _helixManager.getHelixPropertyStore();
   }
 
   @Override
   public void processClusterChange(HelixConstants.ChangeType changeType) {
     Preconditions
         .checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW, 
"Illegal change type: " + changeType);
-    processQueryQuotaChange();
+    ExternalView brokerResourceEV = HelixHelper
+        .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
+            CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    processQueryQuotaChange(brokerResourceEV);
   }
 
   /**
@@ -85,10 +96,10 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     CommonConstants.Helix.TableType tableType = tableConfig.getTableType();
     if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
       offlineQuotaConfig = tableConfig.getQuotaConfig();
-      realtimeQuotaConfig = getQuotaConfigFromPropertyStore(rawTableName, 
CommonConstants.Helix.TableType.REALTIME);
+      realtimeQuotaConfig = 
getQuotaConfigFromPropertyStore(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
     } else {
       realtimeQuotaConfig = tableConfig.getQuotaConfig();
-      offlineQuotaConfig = getQuotaConfigFromPropertyStore(rawTableName, 
CommonConstants.Helix.TableType.OFFLINE);
+      offlineQuotaConfig = 
getQuotaConfigFromPropertyStore(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName));
     }
     // Log a warning if MaxQueriesPerSecond are set different.
     if ((offlineQuotaConfig != null && 
!Strings.isNullOrEmpty(offlineQuotaConfig.getMaxQueriesPerSecond())) && (
@@ -122,15 +133,11 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
 
   /**
    * Get QuotaConfig from property store.
-   * @param rawTableName table name without table type.
-   * @param tableType table type: offline or real-time.
+   * @param tableNameWithType table name with table type.
    * @return QuotaConfig, which could be null.
    */
-  private QuotaConfig getQuotaConfigFromPropertyStore(String rawTableName, 
CommonConstants.Helix.TableType tableType) {
-    ZkHelixPropertyStore<ZNRecord> propertyStore = 
_helixManager.getHelixPropertyStore();
-
-    String tableNameWithType = 
TableNameBuilder.forType(tableType).tableNameWithType(rawTableName);
-    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(propertyStore, 
tableNameWithType);
+  private QuotaConfig getQuotaConfigFromPropertyStore(String 
tableNameWithType) {
+    TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
     if (tableConfig == null) {
       return null;
     }
@@ -180,13 +187,18 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
       return;
     }
 
+    // Get stat from property store
+    String tableConfigPath = constructTableConfigPath(tableNameWithType);
+    Stat stat = _propertyStore.getStat(tableConfigPath, 
AccessOption.PERSISTENT);
+
     double perBrokerRate = overallRate / onlineCount;
-    QueryQuotaConfig queryQuotaConfig =
-        new QueryQuotaConfig(RateLimiter.create(perBrokerRate), new 
HitCounter(TIME_RANGE_IN_SECOND));
-    _rateLimiterMap.put(tableNameWithType, queryQuotaConfig);
+    QueryQuotaEntity queryQuotaEntity =
+        new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new 
HitCounter(TIME_RANGE_IN_SECOND), onlineCount,
+            overallRate, stat.getVersion());
+    _rateLimiterMap.put(tableNameWithType, queryQuotaEntity);
     LOGGER.info(
-        "Rate limiter for table: {} has been initialized. Overall rate: {}. 
Per-broker rate: {}. Number of online broker instances: {}",
-        tableNameWithType, overallRate, perBrokerRate, onlineCount);
+        "Rate limiter for table: {} has been initialized. Overall rate: {}. 
Per-broker rate: {}. Number of online broker instances: {}. Table config stat 
version: {}",
+        tableNameWithType, overallRate, perBrokerRate, onlineCount, 
stat.getVersion());
   }
 
   /**
@@ -200,27 +212,27 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     LOGGER.debug("Trying to acquire token for table: {}", tableName);
     String offlineTableName = null;
     String realtimeTableName = null;
-    QueryQuotaConfig offlineTableQueryQuotaConfig = null;
-    QueryQuotaConfig realtimeTableQueryQuotaConfig = null;
+    QueryQuotaEntity offlineTableQueryQuotaEntity = null;
+    QueryQuotaEntity realtimeTableQueryQuotaEntity = null;
 
     CommonConstants.Helix.TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
     if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
       offlineTableName = tableName;
-      offlineTableQueryQuotaConfig = _rateLimiterMap.get(tableName);
+      offlineTableQueryQuotaEntity = _rateLimiterMap.get(tableName);
     } else if (tableType == CommonConstants.Helix.TableType.REALTIME) {
       realtimeTableName = tableName;
-      realtimeTableQueryQuotaConfig = _rateLimiterMap.get(tableName);
+      realtimeTableQueryQuotaEntity = _rateLimiterMap.get(tableName);
     } else {
       offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
       realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
-      offlineTableQueryQuotaConfig = _rateLimiterMap.get(offlineTableName);
-      realtimeTableQueryQuotaConfig = _rateLimiterMap.get(realtimeTableName);
+      offlineTableQueryQuotaEntity = _rateLimiterMap.get(offlineTableName);
+      realtimeTableQueryQuotaEntity = _rateLimiterMap.get(realtimeTableName);
     }
 
     boolean offlineQuotaOk =
-        offlineTableQueryQuotaConfig == null || 
tryAcquireToken(offlineTableName, offlineTableQueryQuotaConfig);
+        offlineTableQueryQuotaEntity == null || 
tryAcquireToken(offlineTableName, offlineTableQueryQuotaEntity);
     boolean realtimeQuotaOk =
-        realtimeTableQueryQuotaConfig == null || 
tryAcquireToken(realtimeTableName, realtimeTableQueryQuotaConfig);
+        realtimeTableQueryQuotaEntity == null || 
tryAcquireToken(realtimeTableName, realtimeTableQueryQuotaEntity);
 
     return offlineQuotaOk && realtimeQuotaOk;
   }
@@ -228,18 +240,18 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
   /**
    * Try to acquire token from rate limiter. Emit the utilization of the qps 
quota if broker metric isn't null.
    * @param tableNameWithType table name with type.
-   * @param queryQuotaConfig query quota config for type-specific table.
+   * @param queryQuotaEntity query quota entity for type-specific table.
    * @return true if there's no qps quota for that table, or a token is 
acquired successfully.
    */
-  private boolean tryAcquireToken(String tableNameWithType, QueryQuotaConfig 
queryQuotaConfig) {
+  private boolean tryAcquireToken(String tableNameWithType, QueryQuotaEntity 
queryQuotaEntity) {
     // Use hit counter to count the number of hits.
-    queryQuotaConfig.getHitCounter().hit();
+    queryQuotaEntity.getHitCounter().hit();
 
-    RateLimiter rateLimiter = queryQuotaConfig.getRateLimiter();
+    RateLimiter rateLimiter = queryQuotaEntity.getRateLimiter();
     double perBrokerRate = rateLimiter.getRate();
 
     // Emit the qps capacity utilization rate.
-    int numHits = queryQuotaConfig.getHitCounter().getHitCount();
+    int numHits = queryQuotaEntity.getHitCounter().getHitCount();
     if (_brokerMetrics != null) {
       int percentageOfCapacityUtilization = (int) (numHits * 100 / 
perBrokerRate);
       LOGGER.debug("The percentage of rate limit capacity utilization is {}", 
percentageOfCapacityUtilization);
@@ -273,13 +285,10 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
   /**
    * Process query quota change when number of online brokers has changed.
    */
-  public void processQueryQuotaChange() {
+  public void processQueryQuotaChange(ExternalView currentBrokerResource) {
     LOGGER.info("Start processing qps quota change.");
     long startTime = System.currentTimeMillis();
 
-    ExternalView currentBrokerResource = HelixHelper
-        .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
-            BROKER_RESOURCE_INSTANCE);
     if (currentBrokerResource == null) {
       LOGGER.warn("Finish processing qps quota change: external view for 
broker resource is null!");
       return;
@@ -291,27 +300,16 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     }
 
     int numRebuilt = 0;
-    for (Map.Entry<String, QueryQuotaConfig> entry : 
_rateLimiterMap.entrySet()) {
+    for (Iterator<Map.Entry<String, QueryQuotaEntity>> it = 
_rateLimiterMap.entrySet().iterator(); it.hasNext(); ) {
+      Map.Entry<String, QueryQuotaEntity> entry = it.next();
       String tableNameWithType = entry.getKey();
-      QueryQuotaConfig queryQuotaConfig = entry.getValue();
-      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-
-      // Get latest quota config for table.
-      QuotaConfig quotaConfig = getQuotaConfigFromPropertyStore(rawTableName, 
tableType);
-      if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null 
|| !quotaConfig
-          .isMaxQueriesPerSecondValid()) {
-        LOGGER.info("No query quota config or the config is invalid for Table 
{}. Removing its rate limit.",
-            tableNameWithType);
-        removeRateLimiter(tableNameWithType);
-        continue;
-      }
+      QueryQuotaEntity queryQuotaEntity = entry.getValue();
 
       // Get number of online brokers.
       Map<String, String> stateMap = 
currentBrokerResource.getStateMap(tableNameWithType);
       if (stateMap == null) {
         LOGGER.info("No broker resource for Table {}. Removing its rate 
limit.", tableNameWithType);
-        removeRateLimiter(tableNameWithType);
+        it.remove();
         continue;
       }
       int otherOnlineBrokerCount = 0;
@@ -323,11 +321,43 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
       }
       int onlineBrokerCount = otherOnlineBrokerCount + 1;
 
-      double overallRate = 
Double.parseDouble(quotaConfig.getMaxQueriesPerSecond());
+      // Get stat from property store
+      String tableConfigPath = constructTableConfigPath(tableNameWithType);
+      Stat stat = _propertyStore.getStat(tableConfigPath, 
AccessOption.PERSISTENT);
+      if (stat == null) {
+        LOGGER.info("Table {} has been deleted from property store. Removing 
its rate limit.", tableNameWithType);
+        it.remove();
+        continue;
+      }
+
+      // If number of online brokers and table config don't change, there is 
no need to re-calculate the dynamic rate.
+      if (onlineBrokerCount == queryQuotaEntity.getNumOnlineBrokers() && 
stat.getVersion() == queryQuotaEntity
+          .getTableConfigStatVersion()) {
+        continue;
+      }
+
+      double overallRate;
+      // Get latest quota config only if stat don't match.
+      if (stat.getVersion() != queryQuotaEntity.getTableConfigStatVersion()) {
+        QuotaConfig quotaConfig = 
getQuotaConfigFromPropertyStore(tableNameWithType);
+        if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == 
null || !quotaConfig
+            .isMaxQueriesPerSecondValid()) {
+          LOGGER.info("No query quota config or the config is invalid for 
Table {}. Removing its rate limit.",
+              tableNameWithType);
+          it.remove();
+          continue;
+        }
+        overallRate = Double.parseDouble(quotaConfig.getMaxQueriesPerSecond());
+      } else {
+        overallRate = queryQuotaEntity.getOverallRate();
+      }
       double latestRate = overallRate / onlineBrokerCount;
-      double previousRate = queryQuotaConfig.getRateLimiter().getRate();
+      double previousRate = queryQuotaEntity.getRateLimiter().getRate();
       if (Math.abs(latestRate - previousRate) > 0.001) {
-        queryQuotaConfig.getRateLimiter().setRate(latestRate);
+        queryQuotaEntity.getRateLimiter().setRate(latestRate);
+        queryQuotaEntity.setNumOnlineBrokers(onlineBrokerCount);
+        queryQuotaEntity.setOverallRate(overallRate);
+        queryQuotaEntity.setTableConfigStatVersion(stat.getVersion());
         LOGGER.info(
             "Rate limiter for table: {} has been updated. Overall rate: {}. 
Previous per-broker rate: {}. New per-broker rate: {}. Number of online broker 
instances: {}",
             tableNameWithType, overallRate, previousRate, latestRate, 
onlineBrokerCount);
@@ -340,4 +370,12 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
         .info("Processed query quota change in {}ms, {} out of {} query quota 
configs rebuilt.", (endTime - startTime),
             numRebuilt, _rateLimiterMap.size());
   }
+
+  /**
+   * Construct table config path
+   * @param tableNameWithType table name with table type
+   */
+  private String constructTableConfigPath(String tableNameWithType) {
+    return "/CONFIGS/TABLE/" + tableNameWithType;
+  }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaConfig.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
similarity index 55%
rename from 
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaConfig.java
rename to 
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
index 0ba54f3..40a2c96 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaConfig.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
@@ -19,17 +19,23 @@
 package org.apache.pinot.broker.queryquota;
 
 import com.google.common.util.concurrent.RateLimiter;
-import javax.annotation.Nonnull;
 
 
-public class QueryQuotaConfig {
+public class QueryQuotaEntity {
 
   private RateLimiter _rateLimiter;
   private HitCounter _hitCounter;
+  private int _numOnlineBrokers;
+  private double _overallRate;
+  private int _tableConfigStatVersion;
 
-  public QueryQuotaConfig(@Nonnull RateLimiter rateLimiter, @Nonnull 
HitCounter hitCounter) {
+  public QueryQuotaEntity(RateLimiter rateLimiter, HitCounter hitCounter, int 
numOnlineBrokers, double overallRate,
+      int tableConfigStatVersion) {
     _rateLimiter = rateLimiter;
     _hitCounter = hitCounter;
+    _numOnlineBrokers = numOnlineBrokers;
+    _overallRate = overallRate;
+    _tableConfigStatVersion = tableConfigStatVersion;
   }
 
   public RateLimiter getRateLimiter() {
@@ -39,4 +45,28 @@ public class QueryQuotaConfig {
   public HitCounter getHitCounter() {
     return _hitCounter;
   }
+
+  public int getNumOnlineBrokers() {
+    return _numOnlineBrokers;
+  }
+
+  public void setNumOnlineBrokers(int numOnlineBrokers) {
+    _numOnlineBrokers = numOnlineBrokers;
+  }
+
+  public double getOverallRate() {
+    return _overallRate;
+  }
+
+  public void setOverallRate(double overallRate) {
+    _overallRate = overallRate;
+  }
+
+  public int getTableConfigStatVersion() {
+    return _tableConfigStatVersion;
+  }
+
+  public void setTableConfigStatVersion(int tableConfigStatVersion) {
+    _tableConfigStatVersion = tableConfigStatVersion;
+  }
 }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
index b3d74d3..4b6ffe3 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
@@ -32,6 +32,7 @@ import org.apache.pinot.common.config.QuotaConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.testng.Assert;
@@ -40,9 +41,6 @@ import org.testng.annotations.AfterTest;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
-import static 
org.apache.pinot.common.utils.CommonConstants.Helix.BROKER_RESOURCE_INSTANCE;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
-
 
 public class HelixExternalViewBasedQueryQuotaManagerTest {
   private ZkHelixPropertyStore<ZNRecord> _testPropertyStore;
@@ -117,6 +115,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
       throws Exception {
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+    ZKMetadataProvider.setOfflineTableConfig(_testPropertyStore, 
OFFLINE_TABLE_NAME, tableConfig.toZNRecord());
     setQps(tableConfig);
     _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
@@ -142,9 +141,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
       throws Exception {
     QuotaConfig quotaConfig = new QuotaConfig("6G", null);
     TableConfig realtimeTableConfig =
-        new 
TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
-            
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
-            
.setBrokerTenant("testBroker").setServerTenant("testServer").build();
+        new 
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+            
.setQuotaConfig(quotaConfig).setRetentionTimeUnit("DAYS").setRetentionTimeValue("1")
+            
.setSegmentPushType("APPEND").setBrokerTenant("testBroker").setServerTenant("testServer").build();
     ZKMetadataProvider
         .setRealtimeTableConfig(_testPropertyStore, REALTIME_TABLE_NAME, 
realtimeTableConfig.toZNRecord());
 
@@ -163,9 +162,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
       throws Exception {
     QuotaConfig quotaConfig = new QuotaConfig("6G", "100.00");
     TableConfig realtimeTableConfig =
-        new 
TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
-            
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
-            
.setBrokerTenant("testBroker").setServerTenant("testServer").build();
+        new 
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+            
.setQuotaConfig(quotaConfig).setRetentionTimeUnit("DAYS").setRetentionTimeValue("1")
+            
.setSegmentPushType("APPEND").setBrokerTenant("testBroker").setServerTenant("testServer").build();
     ZKMetadataProvider
         .setRealtimeTableConfig(_testPropertyStore, REALTIME_TABLE_NAME, 
realtimeTableConfig.toZNRecord());
 
@@ -188,13 +187,13 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
 
     QuotaConfig quotaConfig = new QuotaConfig("6G", "100.00");
     TableConfig realtimeTableConfig =
-        new 
TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
-            
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
-            
.setBrokerTenant("testBroker").setServerTenant("testServer").build();
+        new 
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+            
.setQuotaConfig(quotaConfig).setRetentionTimeUnit("DAYS").setRetentionTimeValue("1")
+            
.setSegmentPushType("APPEND").setBrokerTenant("testBroker").setServerTenant("testServer").build();
     TableConfig offlineTableConfig =
-        new 
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
-            
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
-            
.setBrokerTenant("testBroker").setServerTenant("testServer").build();
+        new 
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+            
.setQuotaConfig(quotaConfig).setRetentionTimeUnit("DAYS").setRetentionTimeValue("1")
+            
.setSegmentPushType("APPEND").setBrokerTenant("testBroker").setServerTenant("testServer").build();
 
     ZKMetadataProvider
         .setRealtimeTableConfig(_testPropertyStore, REALTIME_TABLE_NAME, 
realtimeTableConfig.toZNRecord());
@@ -224,6 +223,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
       throws Exception {
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
+    ZKMetadataProvider.setRealtimeTableConfig(_testPropertyStore, 
REALTIME_TABLE_NAME, tableConfig.toZNRecord());
     setQps(tableConfig);
     _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
@@ -235,6 +235,23 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
   }
 
   @Test
+  public void testRealtimeTableNotnullQuotaWhileTableConfigGetsDeleted()
+      throws Exception {
+    ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
+    TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
+    ZKMetadataProvider.setRealtimeTableConfig(_testPropertyStore, 
REALTIME_TABLE_NAME, tableConfig.toZNRecord());
+    setQps(tableConfig);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+
+    runQueries(70, 10L);
+
+    
ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore, 
REALTIME_TABLE_NAME);
+    _queryQuotaManager.processQueryQuotaChange(brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+  }
+
+  @Test
   public void testRealtimeTableWithNullQuotaAndNoOfflineTableConfig()
       throws Exception {
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
@@ -248,9 +265,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
       throws Exception {
     QuotaConfig quotaConfig = new QuotaConfig("6G", null);
     TableConfig offlineTableConfig =
-        new 
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
-            
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
-            
.setBrokerTenant("testBroker").setServerTenant("testServer").build();
+        new 
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+            
.setQuotaConfig(quotaConfig).setRetentionTimeUnit("DAYS").setRetentionTimeValue("1")
+            
.setSegmentPushType("APPEND").setBrokerTenant("testBroker").setServerTenant("testServer").build();
     ZKMetadataProvider.setOfflineTableConfig(_testPropertyStore, 
OFFLINE_TABLE_NAME, offlineTableConfig.toZNRecord());
 
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
@@ -264,9 +281,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
       throws Exception {
     QuotaConfig quotaConfig = new QuotaConfig("6G", "100.00");
     TableConfig offlineTableConfig =
-        new 
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
-            
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
-            
.setBrokerTenant("testBroker").setServerTenant("testServer").build();
+        new 
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+            
.setQuotaConfig(quotaConfig).setRetentionTimeUnit("DAYS").setRetentionTimeValue("1")
+            
.setSegmentPushType("APPEND").setBrokerTenant("testBroker").setServerTenant("testServer").build();
     ZKMetadataProvider.setOfflineTableConfig(_testPropertyStore, 
OFFLINE_TABLE_NAME, offlineTableConfig.toZNRecord());
 
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
@@ -311,8 +328,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
   @Test
   public void testNoBrokerServiceOnBrokerResource()
       throws Exception {
-    ExternalView brokerResource = new ExternalView(BROKER_RESOURCE_INSTANCE);
+    ExternalView brokerResource = new 
ExternalView(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+    ZKMetadataProvider.setOfflineTableConfig(_testPropertyStore, 
OFFLINE_TABLE_NAME, tableConfig.toZNRecord());
     setQps(tableConfig);
     _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
     Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
@@ -321,9 +339,10 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
   @Test
   public void testNoOnlineBrokerServiceOnBrokerResource()
       throws Exception {
-    ExternalView brokerResource = new ExternalView(BROKER_RESOURCE_INSTANCE);
+    ExternalView brokerResource = new 
ExternalView(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     brokerResource.setState(OFFLINE_TABLE_NAME, "broker_instance_2", 
"OFFLINE");
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+    ZKMetadataProvider.setOfflineTableConfig(_testPropertyStore, 
OFFLINE_TABLE_NAME, tableConfig.toZNRecord());
     setQps(tableConfig);
     _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
 
@@ -333,7 +352,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
   }
 
   private TableConfig generateDefaultTableConfig(String tableName) {
-    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
+    CommonConstants.Helix.TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
     TableConfig.Builder builder = new TableConfig.Builder(tableType);
     builder.setTableName(tableName);
     return builder.build();
@@ -345,7 +364,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
   }
 
   private ExternalView generateBrokerResource(String tableName) {
-    ExternalView brokerResource = new ExternalView(BROKER_RESOURCE_INSTANCE);
+    ExternalView brokerResource = new 
ExternalView(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     brokerResource.setState(tableName, BROKER_INSTANCE_ID, "ONLINE");
     brokerResource.setState(tableName, "broker_instance_2", "OFFLINE");
     return brokerResource;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to