mcvsubbu commented on a change in pull request #5066: Refactor existing Message
Handler to update query quota on broker (Part 1)
URL: https://github.com/apache/incubator-pinot/pull/5066#discussion_r379591183
##########
File path:
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerMessageHandlerFactory.java
##########
@@ -19,53 +19,65 @@
package org.apache.pinot.broker.broker.helix;
import java.util.Iterator;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
+import
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
import org.apache.pinot.common.messages.TimeboundaryRefreshMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-// Handle the TimeboundaryRefresh message. The Timeboundary refresh requests
are handled asynchronously: i.e., they are
-// first put into a request map first. The map dedups requests by their tables
thus multiple requests for the same
+// Handle the broker message, like TimeboundaryRefresh and UpdateQueryQuota.
+// The Timeboundary refresh requests are handled asynchronously: i.e., they
are first put into a request map first.
+// The map dedups requests by their tables thus multiple requests for the same
// table only needs to be executed once. A background thread periodically
checks the map and performs refreshing for
// all the tables in the map.
-public class TimeboundaryRefreshMessageHandlerFactory implements
MessageHandlerFactory {
- private static final Logger LOGGER =
LoggerFactory.getLogger(TimeboundaryRefreshMessageHandlerFactory.class);
+// The query quota update can be done synchronously, as the table config won't
be changed frequently.
+public class BrokerMessageHandlerFactory implements MessageHandlerFactory {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BrokerMessageHandlerFactory.class);
private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
- // A map to store the unique requests (i.e., the table names) to refresh the
TimeBoundaryInfo of a pinot table.
- // Ideally a Hashset will suffice but Java util currently does not have
Hashset.
- private static ConcurrentHashMap<String, Boolean> _tablesToRefreshmap = new
ConcurrentHashMap<>();
- private boolean shuttingDown;
+ private final HelixExternalViewBasedQueryQuotaManager
_helixExternalViewBasedQueryQuotaManager;
+ // A set to store the unique requests (i.e., the table names) to refresh the
TimeBoundaryInfo of a pinot table.
+ private static Set<String> _tablesToRefreshSet =
ConcurrentHashMap.newKeySet();
+ private boolean _shuttingDown;
/**
*
* @param helixExternalViewBasedRouting The underlying Routing object to
execute TimeboundaryInfo refreshing.
+ * @param helixExternalViewBasedQueryQuotaManager The underlying object to
update rate limiter.
* @param sleepTimeInMilliseconds The sleep time for the background thread
to execute TimeboundaryInfo refreshing.
*/
- public
TimeboundaryRefreshMessageHandlerFactory(HelixExternalViewBasedRouting
helixExternalViewBasedRouting,
- long sleepTimeInMilliseconds) {
+ public BrokerMessageHandlerFactory(HelixExternalViewBasedRouting
helixExternalViewBasedRouting,
+ HelixExternalViewBasedQueryQuotaManager
helixExternalViewBasedQueryQuotaManager, long sleepTimeInMilliseconds) {
_helixExternalViewBasedRouting = helixExternalViewBasedRouting;
+ _helixExternalViewBasedQueryQuotaManager =
helixExternalViewBasedQueryQuotaManager;
+
// Start a background thread to execute the TimeboundaryInfo update
requests.
Thread tbiUpdateThread = new Thread(new
TimeboundaryRefreshMessageExecutor(sleepTimeInMilliseconds));
tbiUpdateThread.start();
- shuttingDown = false;
+ _shuttingDown = false;
}
@Override
public MessageHandler createHandler(Message message, NotificationContext
context) {
String msgSubType = message.getMsgSubType();
switch (msgSubType) {
case TimeboundaryRefreshMessage.REFRESH_TIME_BOUNDARY_MSG_SUB_TYPE:
- LOGGER.info("time refresh msg received {} for table {}",
message.getPartitionName());
+ LOGGER.info("time refresh msg received for table {}",
message.getPartitionName());
return new TimeboundaryRefreshMessageHandler(new
TimeboundaryRefreshMessage(message), context);
+ case TimeboundaryRefreshMessage.UPDATE_QUERY_QUOTA_MSG_SUB_TYPE:
+ LOGGER.info("update query quota msg received for table {}",
message.getPartitionName());
+ return new TableQueryQuotaUpdateMessageHandler(message, context);
default:
- throw new UnsupportedOperationException("Unsupported user defined
message sub type: " + msgSubType);
+ LOGGER.error("Unsupported user defined message sub type: {} for table
{}", msgSubType,
Review comment:
Change this to a warning, and log it in the default message handler rather
than here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]