This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ef75b02 Update rate limiter when table config updated on controller
(Part 2) (#5067)
ef75b02 is described below
commit ef75b027d7af09f625eabb9f192bea7ca109e8eb
Author: Jialiang Li <[email protected]>
AuthorDate: Wed Mar 4 09:38:03 2020 -0800
Update rate limiter when table config updated on controller (Part 2) (#5067)
* [Part 2] Update rate limiter when table config updated
---
.../HelixExternalViewBasedQueryQuotaManager.java | 10 +++++--
.../common/messages/QueryQuotaUpdateMessage.java | 2 +-
.../helix/core/PinotHelixResourceManager.java | 33 ++++++++++++++++++++++
3 files changed, 42 insertions(+), 3 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index afca520..eba8833 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -123,8 +123,14 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
}
}
- // Create rate limiter
- createRateLimiter(tableNameWithType, brokerResource,
tableConfig.getQuotaConfig());
+ // Create rate limiter if query quota config is specified.
+ QuotaConfig quotaConfig = tableConfig.getQuotaConfig();
+ if (quotaConfig == null ||
Strings.isNullOrEmpty(quotaConfig.getMaxQueriesPerSecond())) {
+ LOGGER.info("No qps config specified for table: {}", tableNameWithType);
+ removeRateLimiter(tableNameWithType);
+ } else {
+ createRateLimiter(tableNameWithType, brokerResource, quotaConfig);
+ }
}
/**
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryQuotaUpdateMessage.java
b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryQuotaUpdateMessage.java
index 0dda796..794654b 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryQuotaUpdateMessage.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryQuotaUpdateMessage.java
@@ -43,4 +43,4 @@ public class QueryQuotaUpdateMessage extends Message {
Preconditions.checkArgument(msgSubType.equals(UPDATE_QUERY_QUOTA_MSG_SUB_TYPE),
"Invalid message sub type: " + msgSubType + " for
QueryQuotaUpdateMessage");
}
-}
\ No newline at end of file
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 0f8ec1c..6434714 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -74,6 +74,7 @@ import
org.apache.pinot.common.config.instance.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.exception.SchemaNotFoundException;
import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.messages.QueryQuotaUpdateMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
@@ -1342,6 +1343,9 @@ public class PinotHelixResourceManager {
default:
throw new InvalidTableConfigException("Unsupported table type: " +
tableType);
}
+
+ // Send update query quota message if quota is specified
+ sendUpdateQueryQuotaMessage(tableConfig);
}
public void updateMetadataConfigFor(String tableName, TableType type,
TableCustomConfig newConfigs)
@@ -1720,6 +1724,35 @@ public class PinotHelixResourceManager {
}
}
+ private void sendUpdateQueryQuotaMessage(TableConfig tableConfig) {
+ String tableNameWithType = tableConfig.getTableName();
+ QueryQuotaUpdateMessage refreshMessage = new
QueryQuotaUpdateMessage(tableNameWithType);
+
+ Criteria recipientCriteria = new Criteria();
+ // Currently Helix does not support send message to a Spectator. So we
walk around the problem by sending the
+ // message to participants. Note that brokers are also participants.
+ recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ recipientCriteria.setInstanceName("%");
+ recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
+ recipientCriteria.setSessionSpecific(true);
+ // The brokerResource field in the EXTERNALVIEW stores the table name in
the Partition subfield.
+ recipientCriteria.setPartition(tableNameWithType);
+
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
+ LOGGER.info("Sending query quota update message for table {}:{} to
recipients {}",
+ tableNameWithType, refreshMessage, recipientCriteria);
+ // Helix sets the timeoutMs argument specified in 'send' call as the
processing timeout of the message.
+ int nMsgsSent = messagingService.send(recipientCriteria, refreshMessage,
null, -1);
+ if (nMsgsSent > 0) {
+ // TODO Would be nice if we can get the name of the instances to which
messages were sent.
+ LOGGER.info("Sent {} query quota update msgs for table {}", nMsgsSent,
tableNameWithType);
+ } else {
+ // May be the case when none of the brokers are up yet. That is OK,
because when they come up they will get
+ // the latest query quota info from table config.
+ LOGGER.warn("Unable to send query quota update message for table {},
nMsgs={}", tableNameWithType, nMsgsSent);
+ }
+ }
+
private boolean updateExistedSegment(String tableNameWithType, String
segmentName) {
HelixDataAccessor helixDataAccessor =
_helixZkManager.getHelixDataAccessor();
PropertyKey idealStatePropertyKey =
_keyBuilder.idealStates(tableNameWithType);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]