This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ef75b02  Update rate limiter when table config updated on controller 
(Part 2) (#5067)
ef75b02 is described below

commit ef75b027d7af09f625eabb9f192bea7ca109e8eb
Author: Jialiang Li <[email protected]>
AuthorDate: Wed Mar 4 09:38:03 2020 -0800

    Update rate limiter when table config updated on controller (Part 2) (#5067)
    
    * [Part 2] Update rate limiter when table config updated
---
 .../HelixExternalViewBasedQueryQuotaManager.java   | 10 +++++--
 .../common/messages/QueryQuotaUpdateMessage.java   |  2 +-
 .../helix/core/PinotHelixResourceManager.java      | 33 ++++++++++++++++++++++
 3 files changed, 42 insertions(+), 3 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index afca520..eba8833 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -123,8 +123,14 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
       }
     }
 
-    // Create rate limiter
-    createRateLimiter(tableNameWithType, brokerResource, 
tableConfig.getQuotaConfig());
+    // Create rate limiter if query quota config is specified.
+    QuotaConfig quotaConfig = tableConfig.getQuotaConfig();
+    if (quotaConfig == null || 
Strings.isNullOrEmpty(quotaConfig.getMaxQueriesPerSecond())) {
+      LOGGER.info("No qps config specified for table: {}", tableNameWithType);
+      removeRateLimiter(tableNameWithType);
+    } else {
+      createRateLimiter(tableNameWithType, brokerResource, quotaConfig);
+    }
   }
 
   /**
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryQuotaUpdateMessage.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryQuotaUpdateMessage.java
index 0dda796..794654b 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryQuotaUpdateMessage.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryQuotaUpdateMessage.java
@@ -43,4 +43,4 @@ public class QueryQuotaUpdateMessage extends Message {
     
Preconditions.checkArgument(msgSubType.equals(UPDATE_QUERY_QUOTA_MSG_SUB_TYPE),
         "Invalid message sub type: " + msgSubType + " for 
QueryQuotaUpdateMessage");
   }
-}
\ No newline at end of file
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 0f8ec1c..6434714 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -74,6 +74,7 @@ import 
org.apache.pinot.common.config.instance.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.exception.SchemaNotFoundException;
 import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.messages.QueryQuotaUpdateMessage;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
 import org.apache.pinot.common.messages.SegmentReloadMessage;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
@@ -1342,6 +1343,9 @@ public class PinotHelixResourceManager {
       default:
         throw new InvalidTableConfigException("Unsupported table type: " + 
tableType);
     }
+
+    // Send update query quota message if quota is specified
+    sendUpdateQueryQuotaMessage(tableConfig);
   }
 
   public void updateMetadataConfigFor(String tableName, TableType type, 
TableCustomConfig newConfigs)
@@ -1720,6 +1724,35 @@ public class PinotHelixResourceManager {
     }
   }
 
+  private void sendUpdateQueryQuotaMessage(TableConfig tableConfig) {
+    String tableNameWithType = tableConfig.getTableName();
+    QueryQuotaUpdateMessage refreshMessage = new 
QueryQuotaUpdateMessage(tableNameWithType);
+
+    Criteria recipientCriteria = new Criteria();
+    // Currently Helix does not support send message to a Spectator. So we 
walk around the problem by sending the
+    // message to participants. Note that brokers are also participants.
+    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    recipientCriteria.setInstanceName("%");
+    recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
+    recipientCriteria.setSessionSpecific(true);
+    // The brokerResource field in the EXTERNALVIEW stores the table name in 
the Partition subfield.
+    recipientCriteria.setPartition(tableNameWithType);
+
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
+    LOGGER.info("Sending query quota update message for table {}:{} to 
recipients {}",
+        tableNameWithType, refreshMessage, recipientCriteria);
+    // Helix sets the timeoutMs argument specified in 'send' call as the 
processing timeout of the message.
+    int nMsgsSent = messagingService.send(recipientCriteria, refreshMessage, 
null, -1);
+    if (nMsgsSent > 0) {
+      // TODO Would be nice if we can get the name of the instances to which 
messages were sent.
+      LOGGER.info("Sent {} query quota update msgs for table {}", nMsgsSent, 
tableNameWithType);
+    } else {
+      // May be the case when none of the brokers are up yet. That is OK, 
because when they come up they will get
+      // the latest query quota info from table config.
+      LOGGER.warn("Unable to send query quota update message for table {}, 
nMsgs={}", tableNameWithType, nMsgsSent);
+    }
+  }
+
   private boolean updateExistedSegment(String tableNameWithType, String 
segmentName) {
     HelixDataAccessor helixDataAccessor = 
_helixZkManager.getHelixDataAccessor();
     PropertyKey idealStatePropertyKey = 
_keyBuilder.idealStates(tableNameWithType);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to