This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 67450de Refactor existing Message Handler to update query quota on
broker (Part 1) (#5066)
67450de is described below
commit 67450decc72c0d89560a837a94bec7c50700b382
Author: Jialiang Li <[email protected]>
AuthorDate: Tue Mar 3 09:27:52 2020 -0800
Refactor existing Message Handler to update query quota on broker (Part 1)
(#5066)
* Refactor existing Message Handler to update query quota on broker
* Adjust behavior for unknown subtype message
---
...=> BrokerUserDefinedMessageHandlerFactory.java} | 63 +++++++++++++++++++---
.../broker/broker/helix/HelixBrokerStarter.java | 2 +-
.../HelixExternalViewBasedQueryQuotaManager.java | 12 +++++
.../common/messages/QueryQuotaUpdateMessage.java | 46 ++++++++++++++++
.../helix/SegmentMessageHandlerFactory.java | 62 ++++++++++-----------
5 files changed, 148 insertions(+), 37 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefineMessageHandlerFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
similarity index 55%
rename from
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefineMessageHandlerFactory.java
rename to
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
index bffef77..74f8f0e 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefineMessageHandlerFactory.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
@@ -23,7 +23,9 @@ import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
+import
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
import org.apache.pinot.broker.routing.RoutingManager;
+import org.apache.pinot.common.messages.QueryQuotaUpdateMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,13 +38,16 @@ import org.slf4j.LoggerFactory;
* <li>Refresh segment message: Refresh the routing properties for a given
segment</li>
* </ul>
*/
-public class BrokerUserDefineMessageHandlerFactory implements
MessageHandlerFactory {
- private static final Logger LOGGER =
LoggerFactory.getLogger(BrokerUserDefineMessageHandlerFactory.class);
+public class BrokerUserDefinedMessageHandlerFactory implements
MessageHandlerFactory {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BrokerUserDefinedMessageHandlerFactory.class);
private final RoutingManager _routingManager;
+ private final HelixExternalViewBasedQueryQuotaManager _queryQuotaManager;
- public BrokerUserDefineMessageHandlerFactory(RoutingManager routingManager) {
+ public BrokerUserDefinedMessageHandlerFactory(RoutingManager routingManager,
+ HelixExternalViewBasedQueryQuotaManager queryQuotaManager) {
_routingManager = routingManager;
+ _queryQuotaManager = queryQuotaManager;
}
@Override
@@ -51,8 +56,12 @@ public class BrokerUserDefineMessageHandlerFactory
implements MessageHandlerFact
switch (msgSubType) {
case SegmentRefreshMessage.REFRESH_SEGMENT_MSG_SUB_TYPE:
return new RefreshSegmentMessageHandler(new
SegmentRefreshMessage(message), context);
+ case QueryQuotaUpdateMessage.UPDATE_QUERY_QUOTA_MSG_SUB_TYPE:
+ return new QueryQuotaUpdateMessageHandler(new
QueryQuotaUpdateMessage(message), context);
default:
- throw new UnsupportedOperationException("Unsupported user defined
message sub type: " + msgSubType);
+ LOGGER.warn("Unsupported user defined message sub type: {} for table:
{}", msgSubType,
+ message.getPartitionName());
+ return new DefaultMessageHandler(message, context);
}
}
@@ -65,8 +74,7 @@ public class BrokerUserDefineMessageHandlerFactory implements
MessageHandlerFact
public void reset() {
}
- private class RefreshSegmentMessageHandler extends MessageHandler {
- private final String _tableNameWithType;
+ private class RefreshSegmentMessageHandler extends DefaultMessageHandler {
private final String _segmentName;
public RefreshSegmentMessageHandler(SegmentRefreshMessage
segmentRefreshMessage, NotificationContext context) {
@@ -89,4 +97,47 @@ public class BrokerUserDefineMessageHandlerFactory
implements MessageHandlerFact
_tableNameWithType, errorCode, errorType, e);
}
}
+
+ private class QueryQuotaUpdateMessageHandler extends DefaultMessageHandler {
+
+ public QueryQuotaUpdateMessageHandler(QueryQuotaUpdateMessage
queryQuotaUpdateMessage,
+ NotificationContext context) {
+ super(queryQuotaUpdateMessage, context);
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() {
+ _queryQuotaManager.initOrUpdateTableQueryQuota(_tableNameWithType);
+ HelixTaskResult result = new HelixTaskResult();
+ result.setSuccess(true);
+ return result;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode errorCode, ErrorType errorType)
{
+ LOGGER.error("Caught exception while updating query quota of table: {}
(code: {}, type: {})", _tableNameWithType,
+ errorCode, errorType, e);
+ }
+ }
+
+ private static class DefaultMessageHandler extends MessageHandler {
+ String _tableNameWithType;
+
+ public DefaultMessageHandler(Message message, NotificationContext context)
{
+ super(message, context);
+ _tableNameWithType = message.getPartitionName();
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() {
+ HelixTaskResult result = new HelixTaskResult();
+ result.setSuccess(true);
+ return result;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode errorCode, ErrorType errorType)
{
+ LOGGER.error("Caught exception on table: {} (code: {}, type: {})",
_tableNameWithType, errorCode, errorType, e);
+ }
+ }
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index 93e8888..3f8f3b1 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -239,7 +239,7 @@ public class HelixBrokerStarter {
// Register user-define message handler factory
_participantHelixManager.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
- new BrokerUserDefineMessageHandlerFactory(_routingManager));
+ new BrokerUserDefinedMessageHandlerFactory(_routingManager,
queryQuotaManager));
_participantHelixManager.connect();
addInstanceTagIfNeeded();
_brokerMetrics
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index b467b08..afca520 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -84,6 +84,14 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
processQueryQuotaChange(brokerResourceEV);
}
+ public void initOrUpdateTableQueryQuota(String tableNameWithType) {
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+ ExternalView brokerResourceEV = HelixHelper
+ .getExternalViewForResource(_helixManager.getClusterManagmentTool(),
_helixManager.getClusterName(),
+ CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ initTableQueryQuota(tableConfig, brokerResourceEV);
+ }
+
/**
* Initialize dynamic rate limiter with table query quota.
* @param tableConfig table config.
@@ -135,6 +143,10 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
_rateLimiterMap.remove(tableNameWithType);
}
+ public boolean containsRateLimiterForTable(String tableNameWithType) {
+ return _rateLimiterMap.containsKey(tableNameWithType);
+ }
+
/**
* Get QuotaConfig from property store.
* @param tableNameWithType table name with table type.
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryQuotaUpdateMessage.java
b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryQuotaUpdateMessage.java
new file mode 100644
index 0000000..0dda796
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryQuotaUpdateMessage.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import com.google.common.base.Preconditions;
+import java.util.UUID;
+import org.apache.helix.model.Message;
+import org.apache.pinot.common.utils.CommonConstants;
+
+
+// A message intended for a pinot Broker to ask it to update its rate limiter
of a table.
+public class QueryQuotaUpdateMessage extends Message {
+ public static final String UPDATE_QUERY_QUOTA_MSG_SUB_TYPE =
"UPDATE_QUERY_QUOTA";
+
+ public QueryQuotaUpdateMessage(String tableNameWithType) {
+ super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+ setResourceName(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ setPartitionName(tableNameWithType);
+ setMsgSubType(UPDATE_QUERY_QUOTA_MSG_SUB_TYPE);
+ // Give it infinite time to process the message, as long as session is
alive
+ setExecutionTimeout(-1);
+ }
+
+ public QueryQuotaUpdateMessage(Message message) {
+ super(message.getRecord());
+ String msgSubType = message.getMsgSubType();
+
Preconditions.checkArgument(msgSubType.equals(UPDATE_QUERY_QUOTA_MSG_SUB_TYPE),
+ "Invalid message sub type: " + msgSubType + " for
QueryQuotaUpdateMessage");
+ }
+}
\ No newline at end of file
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index 7e1762d..d193c28 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -88,7 +88,9 @@ public class SegmentMessageHandlerFactory implements
MessageHandlerFactory {
case SegmentReloadMessage.RELOAD_SEGMENT_MSG_SUB_TYPE:
return new SegmentReloadMessageHandler(new
SegmentReloadMessage(message), _metrics, context);
default:
- throw new UnsupportedOperationException("Unsupported user defined
message sub type: " + msgSubType);
+ LOGGER.warn("Unsupported user defined message sub type: {} for
segment: {}", msgSubType,
+ message.getPartitionName());
+ return new DefaultMessageHandler(message, _metrics, context);
}
}
@@ -103,19 +105,10 @@ public class SegmentMessageHandlerFactory implements
MessageHandlerFactory {
LOGGER.info("Reset called");
}
- private class SegmentRefreshMessageHandler extends MessageHandler {
- private final String _segmentName;
- private final String _tableNameWithType;
- private final ServerMetrics _metrics;
- private final Logger _logger;
-
- public SegmentRefreshMessageHandler(SegmentRefreshMessage refreshMessage,
ServerMetrics metrics,
+ private class SegmentRefreshMessageHandler extends DefaultMessageHandler {
+ SegmentRefreshMessageHandler(SegmentRefreshMessage refreshMessage,
ServerMetrics metrics,
NotificationContext context) {
- super(refreshMessage, context);
- _segmentName = refreshMessage.getPartitionName();
- _tableNameWithType = refreshMessage.getResourceName();
- _metrics = metrics;
- _logger = LoggerFactory.getLogger(_tableNameWithType + "-" +
SegmentRefreshMessageHandler.class);
+ super(refreshMessage, metrics, context);
}
@Override
@@ -136,26 +129,12 @@ public class SegmentMessageHandlerFactory implements
MessageHandlerFactory {
}
return result;
}
-
- @Override
- public void onError(Exception e, ErrorCode code, ErrorType type) {
- _logger.error("onError: {}, {}", type, code, e);
- }
}
- private class SegmentReloadMessageHandler extends MessageHandler {
- private final String _segmentName;
- private final String _tableNameWithType;
- private final ServerMetrics _metrics;
- private final Logger _logger;
-
- public SegmentReloadMessageHandler(SegmentReloadMessage
segmentReloadMessage, ServerMetrics metrics,
+ private class SegmentReloadMessageHandler extends DefaultMessageHandler {
+ SegmentReloadMessageHandler(SegmentReloadMessage segmentReloadMessage,
ServerMetrics metrics,
NotificationContext context) {
- super(segmentReloadMessage, context);
- _segmentName = segmentReloadMessage.getPartitionName();
- _tableNameWithType = segmentReloadMessage.getResourceName();
- _metrics = metrics;
- _logger = LoggerFactory.getLogger(_tableNameWithType + "-" +
SegmentReloadMessageHandler.class);
+ super(segmentReloadMessage, metrics, context);
}
@Override
@@ -186,6 +165,29 @@ public class SegmentMessageHandlerFactory implements
MessageHandlerFactory {
}
return helixTaskResult;
}
+ }
+
+ private static class DefaultMessageHandler extends MessageHandler {
+ final String _segmentName;
+ final String _tableNameWithType;
+ final ServerMetrics _metrics;
+ final Logger _logger;
+
+ DefaultMessageHandler(Message message, ServerMetrics metrics,
NotificationContext context) {
+ super(message, context);
+ _segmentName = message.getPartitionName();
+ _tableNameWithType = message.getResourceName();
+ _metrics = metrics;
+ _logger = LoggerFactory.getLogger(_tableNameWithType + "-" +
this.getClass().getSimpleName());
+ }
+
+ @Override
+ public HelixTaskResult handleMessage()
+ throws InterruptedException {
+ HelixTaskResult helixTaskResult = new HelixTaskResult();
+ helixTaskResult.setSuccess(true);
+ return helixTaskResult;
+ }
@Override
public void onError(Exception e, ErrorCode errorCode, ErrorType errorType)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]