This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 98dd54d52a Fix application qps quota stalls. (#14859)
98dd54d52a is described below
commit 98dd54d52a99144d404f035f6ea77317162999ab
Author: Bolek Ziobrowski <[email protected]>
AuthorDate: Thu Feb 20 19:49:18 2025 +0100
Fix application qps quota stalls. (#14859)
---
.../HelixExternalViewBasedQueryQuotaManager.java | 104 ++++++++++++++-------
...elixExternalViewBasedQueryQuotaManagerTest.java | 2 +-
.../PinotApplicationQuotaRestletResource.java | 14 +--
.../tests/QueryQuotaClusterIntegrationTest.java | 68 ++++++++++----
4 files changed, 129 insertions(+), 59 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index 48c5c33d0a..925fbc4860 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -74,6 +74,13 @@ import org.slf4j.LoggerFactory;
* - broker added or removed from cluster
*/
public class HelixExternalViewBasedQueryQuotaManager implements
ClusterChangeHandler, QueryQuotaManager {
+
+ // Maximum 'disabled' value for app quota. If actual value is equal or less
than this, it is considered as
+ // disabled, otherwise it's enabled. This is a side effect of rate limiter
accepting only positive values.
+ private static final double MAX_DISABLED_APP_QUOTA = 0.0d;
+ // standard value meaning - no app quota limit set
+ private static final double DISABLED_APP_QUOTA = -1;
+
private static final Logger LOGGER =
LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class);
private static final int ONE_SECOND_TIME_RANGE_IN_SECOND = 1;
private static final int ONE_MINUTE_TIME_RANGE_IN_SECOND = 60;
@@ -130,9 +137,9 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
String appName = entry.getKey();
double appQpsQuota =
- entry.getValue() != null && entry.getValue() != -1.0d ?
entry.getValue() : _defaultQpsQuotaForApplication;
+ entry.getValue() != null ? entry.getValue() :
_defaultQpsQuotaForApplication;
- if (appQpsQuota < 0) {
+ if (isDisabled(appQpsQuota)) {
buildEmptyOrResetApplicationRateLimiter(appName);
continue;
}
@@ -144,8 +151,14 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND),
numOnlineBrokers, appQpsQuota, -1);
_applicationRateLimiterMap.put(appName, queryQuotaEntity);
}
+ }
+
+ private static boolean isEnabled(double appQpsQuota) {
+ return appQpsQuota > MAX_DISABLED_APP_QUOTA;
+ }
- return;
+ private static boolean isDisabled(double appQpsQuota) {
+ return appQpsQuota <= MAX_DISABLED_APP_QUOTA;
}
@Override
@@ -348,19 +361,45 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
}
public synchronized void createOrUpdateApplicationRateLimiter(String
applicationName) {
-
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName));
+
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName),
DISABLED_APP_QUOTA);
+ }
+
+ public synchronized void createOrUpdateApplicationRateLimiter(String
applicationName, double newQps) {
+
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName),
newQps);
}
- // Caller method need not worry about getting lock on
_applicationRateLimiterMap
- // as this method will do idempotent updates to the application rate limiters
private synchronized void createOrUpdateApplicationRateLimiter(List<String>
applicationNames) {
+ createOrUpdateApplicationRateLimiter(applicationNames, DISABLED_APP_QUOTA);
+ }
+
+ /**
+ * Caller method need not worry about getting lock on
_applicationRateLimiterMap
+ * as this method will do idempotent updates to the application rate
limiters
+ * @param applicationNames application names for which to update the rate
limiter
+ * @param newQps - if > 0, fixed value to use for rate limiter(s), otherwise
value is fetched from ZK.
+ */
+ private synchronized void createOrUpdateApplicationRateLimiter(List<String>
applicationNames, double newQps) {
ExternalView brokerResource = getBrokerResource();
+ Map<String, Double> quotas = null;
+ if (applicationNames.size() > 0 && !isEnabled(newQps)) {
+ quotas =
ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore());
+ }
+
for (String appName : applicationNames) {
- double qpsQuota = getEffectiveQueryQuotaOnApplication(appName);
- if (qpsQuota < 0) {
+ double qpsQuota;
+ if (isEnabled(newQps)) {
+ qpsQuota = newQps;
+ } else if (quotas != null && quotas.get(appName) != null) {
+ qpsQuota = quotas.get(appName);
+ } else {
+ qpsQuota = _defaultQpsQuotaForApplication;
+ }
+
+ if (isDisabled(qpsQuota)) {
buildEmptyOrResetApplicationRateLimiter(appName);
continue;
}
+
int numOnlineBrokers = getNumOnlineBrokers(brokerResource);
double perBrokerQpsQuota = qpsQuota / numOnlineBrokers;
QueryQuotaEntity oldEntity = _applicationRateLimiterMap.get(appName);
@@ -436,22 +475,6 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
return _defaultQpsQuotaForDatabase;
}
- /**
- * Utility to get the effective query quota being imposed on an application.
It is computed based on the default quota
- * set at cluster config.
- *
- * @param applicationName application name to get the query quota on.
- * @return effective query quota limit being applied
- */
- private double getEffectiveQueryQuotaOnApplication(String applicationName) {
- Map<String, Double> quotas =
-
ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore());
- if (quotas != null && quotas.get(applicationName) != null &&
quotas.get(applicationName) != -1.0d) {
- return quotas.get(applicationName);
- }
- return _defaultQpsQuotaForApplication;
- }
-
/**
* Creates a new database rate limiter. Will not update the database rate
limiter if it already exists.
* @param databaseName database name for which rate limiter needs to be
created
@@ -472,7 +495,7 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
if (_applicationRateLimiterMap.containsKey(applicationName)) {
return;
}
-
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName));
+ createOrUpdateApplicationRateLimiter(applicationName);
}
/**
@@ -579,10 +602,12 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
}
QueryQuotaEntity queryQuota =
_applicationRateLimiterMap.get(applicationName);
if (queryQuota == null) {
- if (getDefaultQueryQuotaForApplication() < 0) {
+ // do not create a new rate limiter because that could lead to OOM if
client floods us with many unique app names
+ if (isDisabled(_defaultQpsQuotaForApplication)) {
return true;
} else {
- createOrUpdateApplicationRateLimiter(applicationName);
+ // create limiter without querying ZK
+ createOrUpdateApplicationRateLimiter(applicationName,
_defaultQpsQuotaForApplication);
queryQuota = _applicationRateLimiterMap.get(applicationName);
}
}
@@ -809,9 +834,12 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
if (quota.getNumOnlineBrokers() != onlineBrokerCount) {
quota.setNumOnlineBrokers(onlineBrokerCount);
}
- if (quota.getOverallRate() > 0) {
+ if (isEnabled(quota.getOverallRate())) {
double qpsQuota = quota.getOverallRate() / onlineBrokerCount;
- quota.setRateLimiter(RateLimiter.create(qpsQuota));
+ // dividing small qps value by broker's count can result in 0 and blow
up in rate limiter
+ if (isEnabled(qpsQuota)) {
+ quota.setRateLimiter(RateLimiter.create(qpsQuota));
+ }
}
}
@@ -820,9 +848,8 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
}
_lastKnownBrokerResourceVersion.set(currentVersionNumber);
long endTime = System.currentTimeMillis();
- LOGGER
- .info("Processed query quota change in {}ms, {} out of {} query quota
configs rebuilt.", (endTime - startTime),
- numRebuilt, _rateLimiterMap.size());
+ LOGGER.info("Processed query quota change in {}ms, {} out of {} query
quota configs rebuilt.",
+ (endTime - startTime), numRebuilt, _rateLimiterMap.size());
}
/**
@@ -857,11 +884,16 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
private double getDefaultQueryQuotaForApplication() {
HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool();
- HelixConfigScope configScope = new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
- _helixManager.getClusterName()).build();
- return Double.parseDouble(helixAdmin.getConfig(configScope,
+ HelixConfigScope configScope = new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+ .forCluster(_helixManager.getClusterName()).build();
+ String value = helixAdmin.getConfig(configScope,
Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND))
-
.getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, "-1"));
+ .get(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND);
+ if (value != null) {
+ return Double.parseDouble(value);
+ } else {
+ return DISABLED_APP_QUOTA;
+ }
}
/**
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
index 131faee022..e2efb05f45 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
@@ -338,7 +338,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
}
@Test
- public void tesCreateAndUpdateAppRateLimiterChangesRateLimiterMap() {
+ public void testCreateAndUpdateAppRateLimiterChangesRateLimiterMap() {
Map<String, Double> apps = new HashMap<>();
apps.put("app1", null);
apps.put("app2", 1d);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java
index db050168fa..d539e6c3ab 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java
@@ -71,7 +71,7 @@ public class PinotApplicationQuotaRestletResource {
PinotHelixResourceManager _pinotHelixResourceManager;
/**
- * API to get application quota configs. Will return null if application
quotas are not defined
+ * API to get application quota configs. Will return empty map if
application quotas are not defined at all.
*/
@GET
@Produces(MediaType.APPLICATION_JSON)
@@ -88,7 +88,7 @@ public class PinotApplicationQuotaRestletResource {
}
/**
- * API to get application quota configs. Will return null if application
quotas are not defined
+ * API to get application quota config. Will return null if application
quotas is not defined.
*/
@GET
@Produces(MediaType.APPLICATION_JSON)
@@ -102,17 +102,19 @@ public class PinotApplicationQuotaRestletResource {
return quotas.get(appName);
}
- HelixConfigScope scope = new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
- _pinotHelixResourceManager.getHelixClusterName()).build();
+ HelixConfigScope scope = new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+ .forCluster(_pinotHelixResourceManager.getHelixClusterName())
+ .build();
+
HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
String defaultQuota =
helixAdmin.getConfig(scope,
Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND))
-
.getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, null);
+ .get(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND);
return defaultQuota != null ? Double.parseDouble(defaultQuota) : null;
}
/**
- * API to update the quota configs for application
+ * API to update the quota config for application.
*/
@POST
@Produces(MediaType.APPLICATION_JSON)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
index a40cbdf290..d87731961f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
@@ -142,10 +142,35 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest
public void testDefaultApplicationQueryQuotaOverride()
throws Exception {
addAppQueryQuotaToClusterConfig(25);
+
// override lower than default quota
setQueryQuotaForApplication(10);
testQueryRate(10);
+
// override higher than default quota
+ setQueryQuotaForApplication(27);
+ testQueryRate(27);
+
+ // disable
+ setQueryQuotaForApplication(-1);
+ verifyQuotaUpdate(Long.MAX_VALUE);
+ runQueries(50, false);
+
+ // verify that default still applies to other application names
+ runQueries(20, false, "other");
+ //increase the qps and some of the queries should be throttled.
+ runQueries(50, true, "other");
+ }
+
+ @Test
+ public void testDisabledDefaultApplicationQueryQuotaOverride()
+ throws Exception {
+ addAppQueryQuotaToClusterConfig(-1);
+
+ verifyQuotaUpdate(Long.MAX_VALUE);
+ runQueries(10, false);
+
+ // override default quota
setQueryQuotaForApplication(40);
testQueryRate(40);
}
@@ -264,48 +289,58 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest
}
}
+ private void runQueries(int qps, boolean shouldFail) {
+ runQueries(qps, shouldFail, "default");
+ }
+
// try to keep the qps below 50 to ensure that the time lost between 2 query
runs on top of the sleepMillis
// is not comparable to sleepMillis, else the actual qps would end up being
much lower than required qps
- private void runQueries(int qps, boolean shouldFail) {
+ private void runQueries(int qps, boolean shouldFail, String applicationName)
{
int failCount = 0;
+ boolean isLastFail = false;
long deadline = System.currentTimeMillis() + 1000;
+ String query = "SET applicationName='" + applicationName + "'; SELECT
COUNT(*) FROM " + getTableName();
+
for (int i = 0; i < qps; i++) {
sleep(deadline, qps - i);
- ResultSetGroup resultSetGroup =
- _pinotConnection.execute("SET applicationName='default'; SELECT
COUNT(*) FROM " + getTableName());
+ ResultSetGroup resultSetGroup = _pinotConnection.execute(query);
for (PinotClientException exception : resultSetGroup.getExceptions()) {
if (exception.getMessage().contains("QuotaExceededError")) {
failCount++;
+ isLastFail = i == qps - 1;
break;
}
}
}
if (shouldFail) {
- assertTrue(failCount != 0, "Expected nonzero failures for qps: " + qps);
+ Assert.assertNotEquals(failCount, 0, "Expected nonzero failures for qps:
" + qps + " isLastFail: " + isLastFail);
} else {
- Assert.assertEquals(failCount, 0, "Expected zero failures for qps: " +
qps);
+ Assert.assertEquals(failCount, 0, "Expected zero failures for qps: " +
qps + " isLastFail: " + isLastFail);
}
}
- private static volatile float _quota;
+ private static volatile double _quota;
private static volatile String _quotaSource;
- private void verifyQuotaUpdate(float quotaQps) {
+ private void verifyQuotaUpdate(double quotaQps) {
try {
TestUtils.waitForCondition(aVoid -> {
try {
- float tableQuota = Float.parseFloat(sendGetRequest(
- String.format("http://%s/debug/tables/queryQuota/%s_OFFLINE",
_brokerHostPort, getTableName())));
+ double tableQuota = Double.parseDouble(sendGetRequest(
+ "http://" + _brokerHostPort + "/debug/tables/queryQuota/" +
getTableName() + "_OFFLINE"));
+ double dbQuota = Double.parseDouble(
+ sendGetRequest("http://" + _brokerHostPort +
"/debug/databases/queryQuota/default"));
+ double appQuota = Double.parseDouble(
+ sendGetRequest("http://" + _brokerHostPort +
"/debug/applicationQuotas/default"));
+
tableQuota = tableQuota == 0 ? Long.MAX_VALUE : tableQuota;
- float dbQuota = Float.parseFloat(
-
sendGetRequest(String.format("http://%s/debug/databases/queryQuota/default",
_brokerHostPort)));
- float appQuota = Float.parseFloat(
-
sendGetRequest(String.format("http://%s/debug/applicationQuotas/default",
_brokerHostPort)));
dbQuota = dbQuota == 0 ? Long.MAX_VALUE : dbQuota;
appQuota = appQuota == 0 ? Long.MAX_VALUE : appQuota;
- float actualQuota = Math.min(Math.min(tableQuota, dbQuota),
appQuota);
+
+ double actualQuota = Math.min(Math.min(tableQuota, dbQuota),
appQuota);
+
_quota = actualQuota;
if (_quota == dbQuota) {
_quotaSource = "database";
@@ -314,12 +349,13 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest
} else {
_quotaSource = "application";
}
- return quotaQps == actualQuota || (quotaQps == 0 && tableQuota ==
Long.MAX_VALUE && dbQuota == Long.MAX_VALUE
+ return Math.abs(quotaQps - actualQuota) < 0.01
+ || (quotaQps == 0 && tableQuota == Long.MAX_VALUE && dbQuota ==
Long.MAX_VALUE
&& appQuota == Long.MAX_VALUE);
} catch (IOException e) {
throw new RuntimeException(e);
}
- }, 5000, "Failed to reflect query quota on rate limiter in 5s.");
+ }, 10000, "Failed to reflect query quota on rate limiter in 5s.");
} catch (AssertionError ae) {
throw new AssertionError(
ae.getMessage() + " Expected quota:" + quotaQps + " but is: " +
_quota + " set on: " + _quotaSource, ae);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]