klsince commented on code in PR #13544:
URL: https://github.com/apache/pinot/pull/13544#discussion_r1700630267
##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String
tableNameWithType, ExternalView br
}
}
+ /**
+ * Updates the database rate limiter if it already exists. Will not create a
new database rate limiter.
+ * @param databaseName database name for which rate limiter needs to be
updated
+ */
+ public void updateDatabaseRateLimiter(String databaseName) {
+ if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+ return;
+ }
+ createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+ }
+
+ public synchronized void createOrUpdateDatabaseRateLimiter(List<String>
databaseNames) {
+ ExternalView brokerResource = HelixHelper
+ .getExternalViewForResource(_helixManager.getClusterManagmentTool(),
_helixManager.getClusterName(),
+ CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ for (String databaseName : databaseNames) {
+ double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+ if (databaseQpsQuota < 0) {
+ buildEmptyOrResetDatabaseRateLimiter(databaseName);
+ continue;
+ }
+ int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName,
brokerResource);
+ double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor;
+ QueryQuotaEntity oldRateLimiter =
_databaseRateLimiterMap.get(databaseName);
+ String message;
+ if (oldRateLimiter == null) {
+ message = String.format("New query rate limiter added for database %s
with rate %s.", databaseName,
+ perBrokerQpsQuota);
+ } else {
+ boolean changeDetected = false;
+ double oldRate = oldRateLimiter.getRateLimiter() != null ?
oldRateLimiter.getRateLimiter().getRate() : -1;
+ message = String.format("Updated existing query rate limiter for
database %s from rate %s to %s", databaseName,
Review Comment:
I would suggest just logging this message at the end.
##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String
tableNameWithType, ExternalView br
}
}
+ /**
+ * Updates the database rate limiter if it already exists. Will not create a
new database rate limiter.
+ * @param databaseName database name for which rate limiter needs to be
updated
+ */
+ public void updateDatabaseRateLimiter(String databaseName) {
+ if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+ return;
+ }
+ createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+ }
+
+ public synchronized void createOrUpdateDatabaseRateLimiter(List<String>
databaseNames) {
+ ExternalView brokerResource = HelixHelper
+ .getExternalViewForResource(_helixManager.getClusterManagmentTool(),
_helixManager.getClusterName(),
+ CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ for (String databaseName : databaseNames) {
+ double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+ if (databaseQpsQuota < 0) {
+ buildEmptyOrResetDatabaseRateLimiter(databaseName);
+ continue;
+ }
+ int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName,
brokerResource);
+ double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor;
+ QueryQuotaEntity oldRateLimiter =
_databaseRateLimiterMap.get(databaseName);
+ String message;
+ if (oldRateLimiter == null) {
+ message = String.format("New query rate limiter added for database %s
with rate %s.", databaseName,
+ perBrokerQpsQuota);
+ } else {
+ boolean changeDetected = false;
+ double oldRate = oldRateLimiter.getRateLimiter() != null ?
oldRateLimiter.getRateLimiter().getRate() : -1;
+ message = String.format("Updated existing query rate limiter for
database %s from rate %s to %s", databaseName,
+ oldRate, perBrokerQpsQuota);
+ if (oldRateLimiter.getOverallRate() != databaseQpsQuota) {
+ changeDetected = true;
+ message += ". Overall quota changed for the database from " +
oldRateLimiter.getOverallRate() + " to "
Review Comment:
I'd just log this reason, instead of composing a log msg.
##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String
tableNameWithType, ExternalView br
}
}
+ /**
+ * Updates the database rate limiter if it already exists. Will not create a
new database rate limiter.
+ * @param databaseName database name for which rate limiter needs to be
updated
+ */
+ public void updateDatabaseRateLimiter(String databaseName) {
+ if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+ return;
+ }
+ createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+ }
+
+ public synchronized void createOrUpdateDatabaseRateLimiter(List<String>
databaseNames) {
+ ExternalView brokerResource = HelixHelper
+ .getExternalViewForResource(_helixManager.getClusterManagmentTool(),
_helixManager.getClusterName(),
+ CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ for (String databaseName : databaseNames) {
+ double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+ if (databaseQpsQuota < 0) {
+ buildEmptyOrResetDatabaseRateLimiter(databaseName);
+ continue;
+ }
+ int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName,
brokerResource);
+ double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor;
+ QueryQuotaEntity oldRateLimiter =
_databaseRateLimiterMap.get(databaseName);
+ String message;
+ if (oldRateLimiter == null) {
+ message = String.format("New query rate limiter added for database %s
with rate %s.", databaseName,
+ perBrokerQpsQuota);
+ } else {
+ boolean changeDetected = false;
+ double oldRate = oldRateLimiter.getRateLimiter() != null ?
oldRateLimiter.getRateLimiter().getRate() : -1;
+ message = String.format("Updated existing query rate limiter for
database %s from rate %s to %s", databaseName,
+ oldRate, perBrokerQpsQuota);
+ if (oldRateLimiter.getOverallRate() != databaseQpsQuota) {
+ changeDetected = true;
+ message += ". Overall quota changed for the database from " +
oldRateLimiter.getOverallRate() + " to "
+ + databaseQpsQuota;
+ }
+ if (oldRateLimiter.getNumOnlineBrokers() != quotaSplitFactor) {
+ changeDetected = true;
+ message += ". Quota split factor changed for the database from " +
oldRateLimiter.getOverallRate() + " to "
+ + quotaSplitFactor;
+ }
+ if (!changeDetected) {
+ LOGGER.info("No change detected with the query rate limiter for
database {}", databaseName);
+ return;
Review Comment:
Should this `continue` to the next database instead of returning? Perhaps add
a unit test for processing multiple databases if there isn't one yet.
##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String
tableNameWithType, ExternalView br
}
}
+ /**
+ * Updates the database rate limiter if it already exists. Will not create a
new database rate limiter.
+ * @param databaseName database name for which rate limiter needs to be
updated
+ */
+ public void updateDatabaseRateLimiter(String databaseName) {
+ if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+ return;
+ }
+ createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+ }
+
+ public synchronized void createOrUpdateDatabaseRateLimiter(List<String>
databaseNames) {
+ ExternalView brokerResource = HelixHelper
+ .getExternalViewForResource(_helixManager.getClusterManagmentTool(),
_helixManager.getClusterName(),
+ CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ for (String databaseName : databaseNames) {
+ double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+ if (databaseQpsQuota < 0) {
+ buildEmptyOrResetDatabaseRateLimiter(databaseName);
+ continue;
+ }
+ int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName,
brokerResource);
+ double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor;
+ QueryQuotaEntity oldRateLimiter =
_databaseRateLimiterMap.get(databaseName);
+ String message;
+ if (oldRateLimiter == null) {
+ message = String.format("New query rate limiter added for database %s
with rate %s.", databaseName,
+ perBrokerQpsQuota);
Review Comment:
nit: log this out, and continue.
```
if () {
continue;
}
// no need for else {, avoiding those indents
boolean changeDetected = false;
...
```
##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String
tableNameWithType, ExternalView br
}
}
+ /**
+ * Updates the database rate limiter if it already exists. Will not create a
new database rate limiter.
+ * @param databaseName database name for which rate limiter needs to be
updated
+ */
+ public void updateDatabaseRateLimiter(String databaseName) {
+ if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+ return;
+ }
+ createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+ }
+
+ public synchronized void createOrUpdateDatabaseRateLimiter(List<String>
databaseNames) {
Review Comment:
You would need to add `synchronized` to the updateDatabaseRateLimiter and
createDatabaseRateLimiter methods too; otherwise they check the map membership
outside the lock.
But the updating logic for the table rate limiter does not synchronize at all,
so is synchronization really needed for updating the database rate limiter?
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java:
##########
@@ -327,7 +328,12 @@ private TableAuthorizationResult
hasTableAccess(RequesterIdentity requesterIdent
/**
* Returns true if the QPS quota of the tables has exceeded.
*/
- private boolean hasExceededQPSQuota(Set<String> tableNames, RequestContext
requestContext) {
+ private boolean hasExceededQPSQuota(Set<String> tableNames, String database,
RequestContext requestContext) {
Review Comment:
nit: `hasExceededQPSQuota(@Nullable String database, ... tableNames, ...)`
##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String
tableNameWithType, ExternalView br
}
}
+ /**
+ * Updates the database rate limiter if it already exists. Will not create a
new database rate limiter.
+ * @param databaseName database name for which rate limiter needs to be
updated
+ */
+ public void updateDatabaseRateLimiter(String databaseName) {
+ if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+ return;
+ }
+ createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+ }
+
+ public synchronized void createOrUpdateDatabaseRateLimiter(List<String>
databaseNames) {
+ ExternalView brokerResource = HelixHelper
+ .getExternalViewForResource(_helixManager.getClusterManagmentTool(),
_helixManager.getClusterName(),
+ CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ for (String databaseName : databaseNames) {
+ double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+ if (databaseQpsQuota < 0) {
+ buildEmptyOrResetDatabaseRateLimiter(databaseName);
+ continue;
+ }
+ int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName,
brokerResource);
+ double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor;
+ QueryQuotaEntity oldRateLimiter =
_databaseRateLimiterMap.get(databaseName);
+ String message;
+ if (oldRateLimiter == null) {
+ message = String.format("New query rate limiter added for database %s
with rate %s.", databaseName,
+ perBrokerQpsQuota);
+ } else {
+ boolean changeDetected = false;
+ double oldRate = oldRateLimiter.getRateLimiter() != null ?
oldRateLimiter.getRateLimiter().getRate() : -1;
+ message = String.format("Updated existing query rate limiter for
database %s from rate %s to %s", databaseName,
+ oldRate, perBrokerQpsQuota);
+ if (oldRateLimiter.getOverallRate() != databaseQpsQuota) {
+ changeDetected = true;
+ message += ". Overall quota changed for the database from " +
oldRateLimiter.getOverallRate() + " to "
+ + databaseQpsQuota;
+ }
+ if (oldRateLimiter.getNumOnlineBrokers() != quotaSplitFactor) {
Review Comment:
I would suggest keeping the names consistent, e.g.
1) quota (preferred) vs. rate
2) numOnlineBrokers (preferred) vs. splitFactor
##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java:
##########
@@ -129,6 +136,32 @@ public void onError(Exception e, ErrorCode code, ErrorType
type) {
}
}
+ private class RefreshDatabaseConfigMessageHandler extends MessageHandler {
Review Comment:
This Helix message only updates the database quota on the broker, but it is
also sent by the controller-side addDatabaseConfig REST API. So when a database
config is added, the broker may not create the database quota. Is this the
expected behavior?
```
public void addDatabaseConfig(DatabaseConfig databaseConfig) {
if (!ZKMetadataProvider.createDatabaseConfig(_propertyStore,
databaseConfig)) {
throw new RuntimeException("Failed to create database config for
database: " + databaseConfig.getDatabaseName());
}
sendDatabaseConfigRefreshMessage(databaseConfig.getDatabaseName());
}
```
##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +253,114 @@ private void createOrUpdateRateLimiter(String
tableNameWithType, ExternalView br
}
}
+ /**
+ * Updates the database rate limiter if it already exists. Will not create a
new database rate limiter.
+ * @param databaseName database name for which rate limiter needs to be
updated
+ */
+ public void updateDatabaseRateLimiter(String databaseName) {
+ if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+ return;
+ }
+ createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+ }
+
+ public synchronized void createOrUpdateDatabaseRateLimiter(List<String>
databaseNames) {
+ ExternalView brokerResource = HelixHelper
+ .getExternalViewForResource(_helixManager.getClusterManagmentTool(),
_helixManager.getClusterName(),
+ CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ for (String databaseName : databaseNames) {
+ double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+ if (databaseQpsQuota < 0) {
+ buildEmptyOrResetDatabaseRateLimiter(databaseName);
+ continue;
+ }
+ int quotaSplitFactor = getPerBrokerQpsQuotaSplit(databaseName,
brokerResource);
+ double perBrokerQpsQuota = databaseQpsQuota / quotaSplitFactor;
+ QueryQuotaEntity oldRateLimiter =
_databaseRateLimiterMap.get(databaseName);
+ String message;
+ if (oldRateLimiter == null) {
+ message = String.format("New query rate limiter added for database %s
with rate %s.", databaseName,
+ perBrokerQpsQuota);
+ } else {
+ boolean changeDetected = false;
+ double oldRate = oldRateLimiter.getRateLimiter() != null ?
oldRateLimiter.getRateLimiter().getRate() : -1;
+ message = String.format("Updated existing query rate limiter for
database %s from rate %s to %s", databaseName,
+ oldRate, perBrokerQpsQuota);
+ if (oldRateLimiter.getOverallRate() != databaseQpsQuota) {
+ changeDetected = true;
+ message += ". Overall quota changed for the database from " +
oldRateLimiter.getOverallRate() + " to "
+ + databaseQpsQuota;
+ }
+ if (oldRateLimiter.getNumOnlineBrokers() != quotaSplitFactor) {
+ changeDetected = true;
+ message += ". Quota split factor changed for the database from " +
oldRateLimiter.getOverallRate() + " to "
+ + quotaSplitFactor;
+ }
+ if (!changeDetected) {
+ LOGGER.info("No change detected with the query rate limiter for
database {}", databaseName);
+ return;
+ }
+ }
+ QueryQuotaEntity queryQuotaEntity = new
QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota),
+ new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), new
MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND),
+ quotaSplitFactor, databaseQpsQuota, -1);
+ _databaseRateLimiterMap.put(databaseName, queryQuotaEntity);
+ LOGGER.info(message);
+ }
+ }
+
+ // Pulling this logic to a separate placeholder method so that the quota
split logic
+ // can be enhanced further in isolation.
+ private int getPerBrokerQpsQuotaSplit(String databaseName, ExternalView
brokerResource) {
Review Comment:
I would just call it getNumOnlineBrokers() and add a TODO comment noting that
this method should return the online brokers used by the database, instead of
all brokers.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]