This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d7eeb44 Update quota manager to reduce zk access (#4979)
d7eeb44 is described below
commit d7eeb44430c75b2684680fa09b6244f989368940
Author: Jialiang Li <[email protected]>
AuthorDate: Tue Feb 11 11:53:51 2020 -0800
Update quota manager to reduce zk access (#4979)
* Update quota manager to reduce zk access
---
.../HelixExternalViewBasedQueryQuotaManager.java | 146 +++++++++++++--------
...QueryQuotaConfig.java => QueryQuotaEntity.java} | 36 ++++-
...elixExternalViewBasedQueryQuotaManagerTest.java | 69 ++++++----
3 files changed, 169 insertions(+), 82 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index 4bc4ee6..0261914 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -22,9 +22,11 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.RateLimiter;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.AccessOption;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
@@ -39,34 +41,43 @@ import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.BROKER_RESOURCE_INSTANCE;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
-
+/**
+ * This class is to support the qps quota feature.
+ * It depends on the broker resource change to update the dynamic rate limit,
+ * which means it only gets updated when a new table is added or a broker is
restarted.
+ * TODO: support adding new rate limiter for existing tables without
restarting the broker.
+ */
public class HelixExternalViewBasedQueryQuotaManager implements
ClusterChangeHandler, QueryQuotaManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class);
private static final int TIME_RANGE_IN_SECOND = 1;
private final AtomicInteger _lastKnownBrokerResourceVersion = new
AtomicInteger(-1);
- private final Map<String, QueryQuotaConfig> _rateLimiterMap = new
ConcurrentHashMap<>();
+ private final Map<String, QueryQuotaEntity> _rateLimiterMap = new
ConcurrentHashMap<>();
private HelixManager _helixManager;
+ private ZkHelixPropertyStore<ZNRecord> _propertyStore;
private BrokerMetrics _brokerMetrics;
@Override
public void init(HelixManager helixManager) {
Preconditions.checkState(_helixManager == null,
"HelixExternalViewBasedQueryQuotaManager is already initialized");
_helixManager = helixManager;
+ _propertyStore = _helixManager.getHelixPropertyStore();
}
@Override
public void processClusterChange(HelixConstants.ChangeType changeType) {
Preconditions
.checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW,
"Illegal change type: " + changeType);
- processQueryQuotaChange();
+ ExternalView brokerResourceEV = HelixHelper
+ .getExternalViewForResource(_helixManager.getClusterManagmentTool(),
_helixManager.getClusterName(),
+ CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ processQueryQuotaChange(brokerResourceEV);
}
/**
@@ -85,10 +96,10 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
CommonConstants.Helix.TableType tableType = tableConfig.getTableType();
if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
offlineQuotaConfig = tableConfig.getQuotaConfig();
- realtimeQuotaConfig = getQuotaConfigFromPropertyStore(rawTableName,
CommonConstants.Helix.TableType.REALTIME);
+ realtimeQuotaConfig =
getQuotaConfigFromPropertyStore(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
} else {
realtimeQuotaConfig = tableConfig.getQuotaConfig();
- offlineQuotaConfig = getQuotaConfigFromPropertyStore(rawTableName,
CommonConstants.Helix.TableType.OFFLINE);
+ offlineQuotaConfig =
getQuotaConfigFromPropertyStore(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName));
}
// Log a warning if MaxQueriesPerSecond are set different.
if ((offlineQuotaConfig != null &&
!Strings.isNullOrEmpty(offlineQuotaConfig.getMaxQueriesPerSecond())) && (
@@ -122,15 +133,11 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
/**
* Get QuotaConfig from property store.
- * @param rawTableName table name without table type.
- * @param tableType table type: offline or real-time.
+ * @param tableNameWithType table name with table type.
* @return QuotaConfig, which could be null.
*/
- private QuotaConfig getQuotaConfigFromPropertyStore(String rawTableName,
CommonConstants.Helix.TableType tableType) {
- ZkHelixPropertyStore<ZNRecord> propertyStore =
_helixManager.getHelixPropertyStore();
-
- String tableNameWithType =
TableNameBuilder.forType(tableType).tableNameWithType(rawTableName);
- TableConfig tableConfig = ZKMetadataProvider.getTableConfig(propertyStore,
tableNameWithType);
+ private QuotaConfig getQuotaConfigFromPropertyStore(String
tableNameWithType) {
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
if (tableConfig == null) {
return null;
}
@@ -180,13 +187,18 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
return;
}
+ // Get stat from property store
+ String tableConfigPath = constructTableConfigPath(tableNameWithType);
+ Stat stat = _propertyStore.getStat(tableConfigPath,
AccessOption.PERSISTENT);
+
double perBrokerRate = overallRate / onlineCount;
- QueryQuotaConfig queryQuotaConfig =
- new QueryQuotaConfig(RateLimiter.create(perBrokerRate), new
HitCounter(TIME_RANGE_IN_SECOND));
- _rateLimiterMap.put(tableNameWithType, queryQuotaConfig);
+ QueryQuotaEntity queryQuotaEntity =
+ new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new
HitCounter(TIME_RANGE_IN_SECOND), onlineCount,
+ overallRate, stat.getVersion());
+ _rateLimiterMap.put(tableNameWithType, queryQuotaEntity);
LOGGER.info(
- "Rate limiter for table: {} has been initialized. Overall rate: {}.
Per-broker rate: {}. Number of online broker instances: {}",
- tableNameWithType, overallRate, perBrokerRate, onlineCount);
+ "Rate limiter for table: {} has been initialized. Overall rate: {}.
Per-broker rate: {}. Number of online broker instances: {}. Table config stat
version: {}",
+ tableNameWithType, overallRate, perBrokerRate, onlineCount,
stat.getVersion());
}
/**
@@ -200,27 +212,27 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
LOGGER.debug("Trying to acquire token for table: {}", tableName);
String offlineTableName = null;
String realtimeTableName = null;
- QueryQuotaConfig offlineTableQueryQuotaConfig = null;
- QueryQuotaConfig realtimeTableQueryQuotaConfig = null;
+ QueryQuotaEntity offlineTableQueryQuotaEntity = null;
+ QueryQuotaEntity realtimeTableQueryQuotaEntity = null;
CommonConstants.Helix.TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
offlineTableName = tableName;
- offlineTableQueryQuotaConfig = _rateLimiterMap.get(tableName);
+ offlineTableQueryQuotaEntity = _rateLimiterMap.get(tableName);
} else if (tableType == CommonConstants.Helix.TableType.REALTIME) {
realtimeTableName = tableName;
- realtimeTableQueryQuotaConfig = _rateLimiterMap.get(tableName);
+ realtimeTableQueryQuotaEntity = _rateLimiterMap.get(tableName);
} else {
offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
- offlineTableQueryQuotaConfig = _rateLimiterMap.get(offlineTableName);
- realtimeTableQueryQuotaConfig = _rateLimiterMap.get(realtimeTableName);
+ offlineTableQueryQuotaEntity = _rateLimiterMap.get(offlineTableName);
+ realtimeTableQueryQuotaEntity = _rateLimiterMap.get(realtimeTableName);
}
boolean offlineQuotaOk =
- offlineTableQueryQuotaConfig == null ||
tryAcquireToken(offlineTableName, offlineTableQueryQuotaConfig);
+ offlineTableQueryQuotaEntity == null ||
tryAcquireToken(offlineTableName, offlineTableQueryQuotaEntity);
boolean realtimeQuotaOk =
- realtimeTableQueryQuotaConfig == null ||
tryAcquireToken(realtimeTableName, realtimeTableQueryQuotaConfig);
+ realtimeTableQueryQuotaEntity == null ||
tryAcquireToken(realtimeTableName, realtimeTableQueryQuotaEntity);
return offlineQuotaOk && realtimeQuotaOk;
}
@@ -228,18 +240,18 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
/**
* Try to acquire token from rate limiter. Emit the utilization of the qps
quota if broker metric isn't null.
* @param tableNameWithType table name with type.
- * @param queryQuotaConfig query quota config for type-specific table.
+ * @param queryQuotaEntity query quota entity for type-specific table.
* @return true if there's no qps quota for that table, or a token is
acquired successfully.
*/
- private boolean tryAcquireToken(String tableNameWithType, QueryQuotaConfig
queryQuotaConfig) {
+ private boolean tryAcquireToken(String tableNameWithType, QueryQuotaEntity
queryQuotaEntity) {
// Use hit counter to count the number of hits.
- queryQuotaConfig.getHitCounter().hit();
+ queryQuotaEntity.getHitCounter().hit();
- RateLimiter rateLimiter = queryQuotaConfig.getRateLimiter();
+ RateLimiter rateLimiter = queryQuotaEntity.getRateLimiter();
double perBrokerRate = rateLimiter.getRate();
// Emit the qps capacity utilization rate.
- int numHits = queryQuotaConfig.getHitCounter().getHitCount();
+ int numHits = queryQuotaEntity.getHitCounter().getHitCount();
if (_brokerMetrics != null) {
int percentageOfCapacityUtilization = (int) (numHits * 100 /
perBrokerRate);
LOGGER.debug("The percentage of rate limit capacity utilization is {}",
percentageOfCapacityUtilization);
@@ -273,13 +285,10 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
/**
* Process query quota change when number of online brokers has changed.
*/
- public void processQueryQuotaChange() {
+ public void processQueryQuotaChange(ExternalView currentBrokerResource) {
LOGGER.info("Start processing qps quota change.");
long startTime = System.currentTimeMillis();
- ExternalView currentBrokerResource = HelixHelper
- .getExternalViewForResource(_helixManager.getClusterManagmentTool(),
_helixManager.getClusterName(),
- BROKER_RESOURCE_INSTANCE);
if (currentBrokerResource == null) {
LOGGER.warn("Finish processing qps quota change: external view for
broker resource is null!");
return;
@@ -291,27 +300,16 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
}
int numRebuilt = 0;
- for (Map.Entry<String, QueryQuotaConfig> entry :
_rateLimiterMap.entrySet()) {
+ for (Iterator<Map.Entry<String, QueryQuotaEntity>> it =
_rateLimiterMap.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<String, QueryQuotaEntity> entry = it.next();
String tableNameWithType = entry.getKey();
- QueryQuotaConfig queryQuotaConfig = entry.getValue();
- String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-
- // Get latest quota config for table.
- QuotaConfig quotaConfig = getQuotaConfigFromPropertyStore(rawTableName,
tableType);
- if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null
|| !quotaConfig
- .isMaxQueriesPerSecondValid()) {
- LOGGER.info("No query quota config or the config is invalid for Table
{}. Removing its rate limit.",
- tableNameWithType);
- removeRateLimiter(tableNameWithType);
- continue;
- }
+ QueryQuotaEntity queryQuotaEntity = entry.getValue();
// Get number of online brokers.
Map<String, String> stateMap =
currentBrokerResource.getStateMap(tableNameWithType);
if (stateMap == null) {
LOGGER.info("No broker resource for Table {}. Removing its rate
limit.", tableNameWithType);
- removeRateLimiter(tableNameWithType);
+ it.remove();
continue;
}
int otherOnlineBrokerCount = 0;
@@ -323,11 +321,43 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
}
int onlineBrokerCount = otherOnlineBrokerCount + 1;
- double overallRate =
Double.parseDouble(quotaConfig.getMaxQueriesPerSecond());
+ // Get stat from property store
+ String tableConfigPath = constructTableConfigPath(tableNameWithType);
+ Stat stat = _propertyStore.getStat(tableConfigPath,
AccessOption.PERSISTENT);
+ if (stat == null) {
+ LOGGER.info("Table {} has been deleted from property store. Removing
its rate limit.", tableNameWithType);
+ it.remove();
+ continue;
+ }
+
+ // If the number of online brokers and the table config haven't changed, there is
no need to re-calculate the dynamic rate.
+ if (onlineBrokerCount == queryQuotaEntity.getNumOnlineBrokers() &&
stat.getVersion() == queryQuotaEntity
+ .getTableConfigStatVersion()) {
+ continue;
+ }
+
+ double overallRate;
+ // Get latest quota config only if the stat versions don't match.
+ if (stat.getVersion() != queryQuotaEntity.getTableConfigStatVersion()) {
+ QuotaConfig quotaConfig =
getQuotaConfigFromPropertyStore(tableNameWithType);
+ if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() ==
null || !quotaConfig
+ .isMaxQueriesPerSecondValid()) {
+ LOGGER.info("No query quota config or the config is invalid for
Table {}. Removing its rate limit.",
+ tableNameWithType);
+ it.remove();
+ continue;
+ }
+ overallRate = Double.parseDouble(quotaConfig.getMaxQueriesPerSecond());
+ } else {
+ overallRate = queryQuotaEntity.getOverallRate();
+ }
double latestRate = overallRate / onlineBrokerCount;
- double previousRate = queryQuotaConfig.getRateLimiter().getRate();
+ double previousRate = queryQuotaEntity.getRateLimiter().getRate();
if (Math.abs(latestRate - previousRate) > 0.001) {
- queryQuotaConfig.getRateLimiter().setRate(latestRate);
+ queryQuotaEntity.getRateLimiter().setRate(latestRate);
+ queryQuotaEntity.setNumOnlineBrokers(onlineBrokerCount);
+ queryQuotaEntity.setOverallRate(overallRate);
+ queryQuotaEntity.setTableConfigStatVersion(stat.getVersion());
LOGGER.info(
"Rate limiter for table: {} has been updated. Overall rate: {}.
Previous per-broker rate: {}. New per-broker rate: {}. Number of online broker
instances: {}",
tableNameWithType, overallRate, previousRate, latestRate,
onlineBrokerCount);
@@ -340,4 +370,12 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
.info("Processed query quota change in {}ms, {} out of {} query quota
configs rebuilt.", (endTime - startTime),
numRebuilt, _rateLimiterMap.size());
}
+
+ /**
+ * Construct table config path
+ * @param tableNameWithType table name with table type
+ */
+ private String constructTableConfigPath(String tableNameWithType) {
+ return "/CONFIGS/TABLE/" + tableNameWithType;
+ }
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaConfig.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
similarity index 55%
rename from
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaConfig.java
rename to
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
index 0ba54f3..40a2c96 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaConfig.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
@@ -19,17 +19,23 @@
package org.apache.pinot.broker.queryquota;
import com.google.common.util.concurrent.RateLimiter;
-import javax.annotation.Nonnull;
-public class QueryQuotaConfig {
+public class QueryQuotaEntity {
private RateLimiter _rateLimiter;
private HitCounter _hitCounter;
+ private int _numOnlineBrokers;
+ private double _overallRate;
+ private int _tableConfigStatVersion;
- public QueryQuotaConfig(@Nonnull RateLimiter rateLimiter, @Nonnull
HitCounter hitCounter) {
+ public QueryQuotaEntity(RateLimiter rateLimiter, HitCounter hitCounter, int
numOnlineBrokers, double overallRate,
+ int tableConfigStatVersion) {
_rateLimiter = rateLimiter;
_hitCounter = hitCounter;
+ _numOnlineBrokers = numOnlineBrokers;
+ _overallRate = overallRate;
+ _tableConfigStatVersion = tableConfigStatVersion;
}
public RateLimiter getRateLimiter() {
@@ -39,4 +45,28 @@ public class QueryQuotaConfig {
public HitCounter getHitCounter() {
return _hitCounter;
}
+
+ public int getNumOnlineBrokers() {
+ return _numOnlineBrokers;
+ }
+
+ public void setNumOnlineBrokers(int numOnlineBrokers) {
+ _numOnlineBrokers = numOnlineBrokers;
+ }
+
+ public double getOverallRate() {
+ return _overallRate;
+ }
+
+ public void setOverallRate(double overallRate) {
+ _overallRate = overallRate;
+ }
+
+ public int getTableConfigStatVersion() {
+ return _tableConfigStatVersion;
+ }
+
+ public void setTableConfigStatVersion(int tableConfigStatVersion) {
+ _tableConfigStatVersion = tableConfigStatVersion;
+ }
}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
index b3d74d3..4b6ffe3 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
@@ -32,6 +32,7 @@ import org.apache.pinot.common.config.QuotaConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.common.utils.ZkStarter;
import org.testng.Assert;
@@ -40,9 +41,6 @@ import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.BROKER_RESOURCE_INSTANCE;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
-
public class HelixExternalViewBasedQueryQuotaManagerTest {
private ZkHelixPropertyStore<ZNRecord> _testPropertyStore;
@@ -117,6 +115,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
throws Exception {
ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+ ZKMetadataProvider.setOfflineTableConfig(_testPropertyStore,
OFFLINE_TABLE_NAME, tableConfig.toZNRecord());
setQps(tableConfig);
_queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
@@ -142,9 +141,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
throws Exception {
QuotaConfig quotaConfig = new QuotaConfig("6G", null);
TableConfig realtimeTableConfig =
- new
TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
-
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
-
.setBrokerTenant("testBroker").setServerTenant("testServer").build();
+ new
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+
.setQuotaConfig(quotaConfig).setRetentionTimeUnit("DAYS").setRetentionTimeValue("1")
+
.setSegmentPushType("APPEND").setBrokerTenant("testBroker").setServerTenant("testServer").build();
ZKMetadataProvider
.setRealtimeTableConfig(_testPropertyStore, REALTIME_TABLE_NAME,
realtimeTableConfig.toZNRecord());
@@ -163,9 +162,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
throws Exception {
QuotaConfig quotaConfig = new QuotaConfig("6G", "100.00");
TableConfig realtimeTableConfig =
- new
TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
-
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
-
.setBrokerTenant("testBroker").setServerTenant("testServer").build();
+ new
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+
.setQuotaConfig(quotaConfig).setRetentionTimeUnit("DAYS").setRetentionTimeValue("1")
+
.setSegmentPushType("APPEND").setBrokerTenant("testBroker").setServerTenant("testServer").build();
ZKMetadataProvider
.setRealtimeTableConfig(_testPropertyStore, REALTIME_TABLE_NAME,
realtimeTableConfig.toZNRecord());
@@ -188,13 +187,13 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
QuotaConfig quotaConfig = new QuotaConfig("6G", "100.00");
TableConfig realtimeTableConfig =
- new
TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
-
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
-
.setBrokerTenant("testBroker").setServerTenant("testServer").build();
+ new
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+
.setQuotaConfig(quotaConfig).setRetentionTimeUnit("DAYS").setRetentionTimeValue("1")
+
.setSegmentPushType("APPEND").setBrokerTenant("testBroker").setServerTenant("testServer").build();
TableConfig offlineTableConfig =
- new
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
-
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
-
.setBrokerTenant("testBroker").setServerTenant("testServer").build();
+ new
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+
.setQuotaConfig(quotaConfig).setRetentionTimeUnit("DAYS").setRetentionTimeValue("1")
+
.setSegmentPushType("APPEND").setBrokerTenant("testBroker").setServerTenant("testServer").build();
ZKMetadataProvider
.setRealtimeTableConfig(_testPropertyStore, REALTIME_TABLE_NAME,
realtimeTableConfig.toZNRecord());
@@ -224,6 +223,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
throws Exception {
ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
+ ZKMetadataProvider.setRealtimeTableConfig(_testPropertyStore,
REALTIME_TABLE_NAME, tableConfig.toZNRecord());
setQps(tableConfig);
_queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
@@ -235,6 +235,23 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
}
@Test
+ public void testRealtimeTableNotnullQuotaWhileTableConfigGetsDeleted()
+ throws Exception {
+ ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
+ TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
+ ZKMetadataProvider.setRealtimeTableConfig(_testPropertyStore,
REALTIME_TABLE_NAME, tableConfig.toZNRecord());
+ setQps(tableConfig);
+ _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+
+ runQueries(70, 10L);
+
+
ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore,
REALTIME_TABLE_NAME);
+ _queryQuotaManager.processQueryQuotaChange(brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+ }
+
+ @Test
public void testRealtimeTableWithNullQuotaAndNoOfflineTableConfig()
throws Exception {
ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
@@ -248,9 +265,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
throws Exception {
QuotaConfig quotaConfig = new QuotaConfig("6G", null);
TableConfig offlineTableConfig =
- new
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
-
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
-
.setBrokerTenant("testBroker").setServerTenant("testServer").build();
+ new
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+
.setQuotaConfig(quotaConfig).setRetentionTimeUnit("DAYS").setRetentionTimeValue("1")
+
.setSegmentPushType("APPEND").setBrokerTenant("testBroker").setServerTenant("testServer").build();
ZKMetadataProvider.setOfflineTableConfig(_testPropertyStore,
OFFLINE_TABLE_NAME, offlineTableConfig.toZNRecord());
ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
@@ -264,9 +281,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
throws Exception {
QuotaConfig quotaConfig = new QuotaConfig("6G", "100.00");
TableConfig offlineTableConfig =
- new
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
-
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
-
.setBrokerTenant("testBroker").setServerTenant("testServer").build();
+ new
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+
.setQuotaConfig(quotaConfig).setRetentionTimeUnit("DAYS").setRetentionTimeValue("1")
+
.setSegmentPushType("APPEND").setBrokerTenant("testBroker").setServerTenant("testServer").build();
ZKMetadataProvider.setOfflineTableConfig(_testPropertyStore,
OFFLINE_TABLE_NAME, offlineTableConfig.toZNRecord());
ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
@@ -311,8 +328,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
@Test
public void testNoBrokerServiceOnBrokerResource()
throws Exception {
- ExternalView brokerResource = new ExternalView(BROKER_RESOURCE_INSTANCE);
+ ExternalView brokerResource = new
ExternalView(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+ ZKMetadataProvider.setOfflineTableConfig(_testPropertyStore,
OFFLINE_TABLE_NAME, tableConfig.toZNRecord());
setQps(tableConfig);
_queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
@@ -321,9 +339,10 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
@Test
public void testNoOnlineBrokerServiceOnBrokerResource()
throws Exception {
- ExternalView brokerResource = new ExternalView(BROKER_RESOURCE_INSTANCE);
+ ExternalView brokerResource = new
ExternalView(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
brokerResource.setState(OFFLINE_TABLE_NAME, "broker_instance_2",
"OFFLINE");
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+ ZKMetadataProvider.setOfflineTableConfig(_testPropertyStore,
OFFLINE_TABLE_NAME, tableConfig.toZNRecord());
setQps(tableConfig);
_queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
@@ -333,7 +352,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
}
private TableConfig generateDefaultTableConfig(String tableName) {
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
+ CommonConstants.Helix.TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
TableConfig.Builder builder = new TableConfig.Builder(tableType);
builder.setTableName(tableName);
return builder.build();
@@ -345,7 +364,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
}
private ExternalView generateBrokerResource(String tableName) {
- ExternalView brokerResource = new ExternalView(BROKER_RESOURCE_INSTANCE);
+ ExternalView brokerResource = new
ExternalView(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
brokerResource.setState(tableName, BROKER_INSTANCE_ID, "ONLINE");
brokerResource.setState(tableName, "broker_instance_2", "OFFLINE");
return brokerResource;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]