mcvsubbu commented on a change in pull request #5066: Refactor existing Message 
Handler to update query quota on broker (Part 1)
URL: https://github.com/apache/incubator-pinot/pull/5066#discussion_r379591183
 
 

 ##########
 File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerMessageHandlerFactory.java
 ##########
 @@ -19,53 +19,65 @@
 package org.apache.pinot.broker.broker.helix;
 
 import java.util.Iterator;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Message;
+import 
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
 import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
 import org.apache.pinot.common.messages.TimeboundaryRefreshMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-// Handle the TimeboundaryRefresh message. The Timeboundary refresh requests 
are handled asynchronously: i.e., they are
-// first put into a request map first. The map dedups requests by their tables 
thus multiple requests for the same
+// Handle the broker message, like TimeboundaryRefresh and UpdateQueryQuota.
+// The Timeboundary refresh requests are handled asynchronously: i.e., they 
are first put into a request map first.
+// The map dedups requests by their tables thus multiple requests for the same
 // table only needs to be executed once. A background thread periodically 
checks the map and performs refreshing for
 // all the tables in the map.
-public class TimeboundaryRefreshMessageHandlerFactory implements 
MessageHandlerFactory {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(TimeboundaryRefreshMessageHandlerFactory.class);
+// The query quota update can be done synchronously, as the table config won't 
be changed frequently.
+public class BrokerMessageHandlerFactory implements MessageHandlerFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(BrokerMessageHandlerFactory.class);
   private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
-  // A map to store the unique requests (i.e., the table names) to refresh the 
TimeBoundaryInfo of a pinot table.
-  // Ideally a Hashset will suffice but Java util currently does not have 
Hashset.
-  private static ConcurrentHashMap<String, Boolean> _tablesToRefreshmap = new 
ConcurrentHashMap<>();
-  private boolean shuttingDown;
+  private final HelixExternalViewBasedQueryQuotaManager 
_helixExternalViewBasedQueryQuotaManager;
+  // A set to store the unique requests (i.e., the table names) to refresh the 
TimeBoundaryInfo of a pinot table.
+  private static Set<String> _tablesToRefreshSet = 
ConcurrentHashMap.newKeySet();
+  private boolean _shuttingDown;
 
   /**
    *
    * @param helixExternalViewBasedRouting The underlying Routing object to 
execute TimeboundaryInfo refreshing.
+   * @param helixExternalViewBasedQueryQuotaManager The underlying object to 
update rate limiter.
    * @param sleepTimeInMilliseconds The sleep time for the background thread 
to execute TimeboundaryInfo refreshing.
    */
-  public 
TimeboundaryRefreshMessageHandlerFactory(HelixExternalViewBasedRouting 
helixExternalViewBasedRouting,
-      long sleepTimeInMilliseconds) {
+  public BrokerMessageHandlerFactory(HelixExternalViewBasedRouting 
helixExternalViewBasedRouting,
+      HelixExternalViewBasedQueryQuotaManager 
helixExternalViewBasedQueryQuotaManager, long sleepTimeInMilliseconds) {
     _helixExternalViewBasedRouting = helixExternalViewBasedRouting;
+    _helixExternalViewBasedQueryQuotaManager = 
helixExternalViewBasedQueryQuotaManager;
+
     // Start a background thread to execute the TimeboundaryInfo update 
requests.
     Thread tbiUpdateThread = new Thread(new 
TimeboundaryRefreshMessageExecutor(sleepTimeInMilliseconds));
     tbiUpdateThread.start();
-    shuttingDown = false;
+    _shuttingDown = false;
   }
 
   @Override
   public MessageHandler createHandler(Message message, NotificationContext 
context) {
     String msgSubType = message.getMsgSubType();
     switch (msgSubType) {
       case TimeboundaryRefreshMessage.REFRESH_TIME_BOUNDARY_MSG_SUB_TYPE:
-        LOGGER.info("time refresh msg received {} for table {}", 
message.getPartitionName());
+        LOGGER.info("time refresh msg received for table {}", 
message.getPartitionName());
         return new TimeboundaryRefreshMessageHandler(new 
TimeboundaryRefreshMessage(message), context);
+      case TimeboundaryRefreshMessage.UPDATE_QUERY_QUOTA_MSG_SUB_TYPE:
+        LOGGER.info("update query quota msg received for table {}", 
message.getPartitionName());
+        return new TableQueryQuotaUpdateMessageHandler(message, context);
       default:
-        throw new UnsupportedOperationException("Unsupported user defined 
message sub type: " + msgSubType);
+        LOGGER.error("Unsupported user defined message sub type: {} for table 
{}", msgSubType,
 
 Review comment:
   Change this to a warning, and log it in the default message handler, not 
here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to