This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 67450de  Refactor existing Message Handler to update query quota on broker (Part 1) (#5066)
67450de is described below

commit 67450decc72c0d89560a837a94bec7c50700b382
Author: Jialiang Li <[email protected]>
AuthorDate: Tue Mar 3 09:27:52 2020 -0800

    Refactor existing Message Handler to update query quota on broker (Part 1) (#5066)
    
    * Refactor existing Message Handler to update query quota on broker
    
    * Adjust behavior for unknown subtype message
---
 ...=> BrokerUserDefinedMessageHandlerFactory.java} | 63 +++++++++++++++++++---
 .../broker/broker/helix/HelixBrokerStarter.java    |  2 +-
 .../HelixExternalViewBasedQueryQuotaManager.java   | 12 +++++
 .../common/messages/QueryQuotaUpdateMessage.java   | 46 ++++++++++++++++
 .../helix/SegmentMessageHandlerFactory.java        | 62 ++++++++++-----------
 5 files changed, 148 insertions(+), 37 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefineMessageHandlerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
similarity index 55%
rename from pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefineMessageHandlerFactory.java
rename to pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
index bffef77..74f8f0e 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefineMessageHandlerFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
@@ -23,7 +23,9 @@ import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Message;
+import 
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
 import org.apache.pinot.broker.routing.RoutingManager;
+import org.apache.pinot.common.messages.QueryQuotaUpdateMessage;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,13 +38,16 @@ import org.slf4j.LoggerFactory;
  *   <li>Refresh segment message: Refresh the routing properties for a given 
segment</li>
  * </ul>
  */
-public class BrokerUserDefineMessageHandlerFactory implements 
MessageHandlerFactory {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(BrokerUserDefineMessageHandlerFactory.class);
+public class BrokerUserDefinedMessageHandlerFactory implements 
MessageHandlerFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(BrokerUserDefinedMessageHandlerFactory.class);
 
   private final RoutingManager _routingManager;
+  private final HelixExternalViewBasedQueryQuotaManager _queryQuotaManager;
 
-  public BrokerUserDefineMessageHandlerFactory(RoutingManager routingManager) {
+  public BrokerUserDefinedMessageHandlerFactory(RoutingManager routingManager,
+      HelixExternalViewBasedQueryQuotaManager queryQuotaManager) {
     _routingManager = routingManager;
+    _queryQuotaManager = queryQuotaManager;
   }
 
   @Override
@@ -51,8 +56,12 @@ public class BrokerUserDefineMessageHandlerFactory 
implements MessageHandlerFact
     switch (msgSubType) {
       case SegmentRefreshMessage.REFRESH_SEGMENT_MSG_SUB_TYPE:
         return new RefreshSegmentMessageHandler(new 
SegmentRefreshMessage(message), context);
+      case QueryQuotaUpdateMessage.UPDATE_QUERY_QUOTA_MSG_SUB_TYPE:
+        return new QueryQuotaUpdateMessageHandler(new 
QueryQuotaUpdateMessage(message), context);
       default:
-        throw new UnsupportedOperationException("Unsupported user defined 
message sub type: " + msgSubType);
+        LOGGER.warn("Unsupported user defined message sub type: {} for table: 
{}", msgSubType,
+            message.getPartitionName());
+        return new DefaultMessageHandler(message, context);
     }
   }
 
@@ -65,8 +74,7 @@ public class BrokerUserDefineMessageHandlerFactory implements 
MessageHandlerFact
   public void reset() {
   }
 
-  private class RefreshSegmentMessageHandler extends MessageHandler {
-    private final String _tableNameWithType;
+  private class RefreshSegmentMessageHandler extends DefaultMessageHandler {
     private final String _segmentName;
 
     public RefreshSegmentMessageHandler(SegmentRefreshMessage 
segmentRefreshMessage, NotificationContext context) {
@@ -89,4 +97,47 @@ public class BrokerUserDefineMessageHandlerFactory 
implements MessageHandlerFact
           _tableNameWithType, errorCode, errorType, e);
     }
   }
+
+  private class QueryQuotaUpdateMessageHandler extends DefaultMessageHandler {
+
+    public QueryQuotaUpdateMessageHandler(QueryQuotaUpdateMessage 
queryQuotaUpdateMessage,
+        NotificationContext context) {
+      super(queryQuotaUpdateMessage, context);
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() {
+      _queryQuotaManager.initOrUpdateTableQueryQuota(_tableNameWithType);
+      HelixTaskResult result = new HelixTaskResult();
+      result.setSuccess(true);
+      return result;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode errorCode, ErrorType errorType) 
{
+      LOGGER.error("Caught exception while updating query quota of table: {} 
(code: {}, type: {})", _tableNameWithType,
+          errorCode, errorType, e);
+    }
+  }
+
+  private static class DefaultMessageHandler extends MessageHandler {
+    String _tableNameWithType;
+
+    public DefaultMessageHandler(Message message, NotificationContext context) 
{
+      super(message, context);
+      _tableNameWithType = message.getPartitionName();
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() {
+      HelixTaskResult result = new HelixTaskResult();
+      result.setSuccess(true);
+      return result;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode errorCode, ErrorType errorType) 
{
+      LOGGER.error("Caught exception on table: {} (code: {}, type: {})", 
_tableNameWithType, errorCode, errorType, e);
+    }
+  }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index 93e8888..3f8f3b1 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -239,7 +239,7 @@ public class HelixBrokerStarter {
     // Register user-define message handler factory
     _participantHelixManager.getMessagingService()
         
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
-            new BrokerUserDefineMessageHandlerFactory(_routingManager));
+            new BrokerUserDefinedMessageHandlerFactory(_routingManager, 
queryQuotaManager));
     _participantHelixManager.connect();
     addInstanceTagIfNeeded();
     _brokerMetrics
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index b467b08..afca520 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -84,6 +84,14 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     processQueryQuotaChange(brokerResourceEV);
   }
 
+  public void initOrUpdateTableQueryQuota(String tableNameWithType) {
+    TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+    ExternalView brokerResourceEV = HelixHelper
+        .getExternalViewForResource(_helixManager.getClusterManagmentTool(), 
_helixManager.getClusterName(),
+            CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    initTableQueryQuota(tableConfig, brokerResourceEV);
+  }
+
   /**
    * Initialize dynamic rate limiter with table query quota.
    * @param tableConfig table config.
@@ -135,6 +143,10 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     _rateLimiterMap.remove(tableNameWithType);
   }
 
+  public boolean containsRateLimiterForTable(String tableNameWithType) {
+    return _rateLimiterMap.containsKey(tableNameWithType);
+  }
+
   /**
    * Get QuotaConfig from property store.
    * @param tableNameWithType table name with table type.
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryQuotaUpdateMessage.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryQuotaUpdateMessage.java
new file mode 100644
index 0000000..0dda796
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryQuotaUpdateMessage.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import com.google.common.base.Preconditions;
+import java.util.UUID;
+import org.apache.helix.model.Message;
+import org.apache.pinot.common.utils.CommonConstants;
+
+
+// A message intended for a pinot Broker to ask it to update its rate limiter of a table.
+public class QueryQuotaUpdateMessage extends Message {
+  public static final String UPDATE_QUERY_QUOTA_MSG_SUB_TYPE = 
"UPDATE_QUERY_QUOTA";
+
+  public QueryQuotaUpdateMessage(String tableNameWithType) {
+    super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+    setResourceName(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    setPartitionName(tableNameWithType);
+    setMsgSubType(UPDATE_QUERY_QUOTA_MSG_SUB_TYPE);
+    // Give it infinite time to process the message, as long as session is alive
+    setExecutionTimeout(-1);
+  }
+
+  public QueryQuotaUpdateMessage(Message message) {
+    super(message.getRecord());
+    String msgSubType = message.getMsgSubType();
+    
Preconditions.checkArgument(msgSubType.equals(UPDATE_QUERY_QUOTA_MSG_SUB_TYPE),
+        "Invalid message sub type: " + msgSubType + " for 
QueryQuotaUpdateMessage");
+  }
+}
\ No newline at end of file
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index 7e1762d..d193c28 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -88,7 +88,9 @@ public class SegmentMessageHandlerFactory implements 
MessageHandlerFactory {
       case SegmentReloadMessage.RELOAD_SEGMENT_MSG_SUB_TYPE:
         return new SegmentReloadMessageHandler(new 
SegmentReloadMessage(message), _metrics, context);
       default:
-        throw new UnsupportedOperationException("Unsupported user defined 
message sub type: " + msgSubType);
+        LOGGER.warn("Unsupported user defined message sub type: {} for 
segment: {}", msgSubType,
+            message.getPartitionName());
+        return new DefaultMessageHandler(message, _metrics, context);
     }
   }
 
@@ -103,19 +105,10 @@ public class SegmentMessageHandlerFactory implements 
MessageHandlerFactory {
     LOGGER.info("Reset called");
   }
 
-  private class SegmentRefreshMessageHandler extends MessageHandler {
-    private final String _segmentName;
-    private final String _tableNameWithType;
-    private final ServerMetrics _metrics;
-    private final Logger _logger;
-
-    public SegmentRefreshMessageHandler(SegmentRefreshMessage refreshMessage, 
ServerMetrics metrics,
+  private class SegmentRefreshMessageHandler extends DefaultMessageHandler {
+    SegmentRefreshMessageHandler(SegmentRefreshMessage refreshMessage, 
ServerMetrics metrics,
         NotificationContext context) {
-      super(refreshMessage, context);
-      _segmentName = refreshMessage.getPartitionName();
-      _tableNameWithType = refreshMessage.getResourceName();
-      _metrics = metrics;
-      _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + 
SegmentRefreshMessageHandler.class);
+      super(refreshMessage, metrics, context);
     }
 
     @Override
@@ -136,26 +129,12 @@ public class SegmentMessageHandlerFactory implements 
MessageHandlerFactory {
       }
       return result;
     }
-
-    @Override
-    public void onError(Exception e, ErrorCode code, ErrorType type) {
-      _logger.error("onError: {}, {}", type, code, e);
-    }
   }
 
-  private class SegmentReloadMessageHandler extends MessageHandler {
-    private final String _segmentName;
-    private final String _tableNameWithType;
-    private final ServerMetrics _metrics;
-    private final Logger _logger;
-
-    public SegmentReloadMessageHandler(SegmentReloadMessage 
segmentReloadMessage, ServerMetrics metrics,
+  private class SegmentReloadMessageHandler extends DefaultMessageHandler {
+    SegmentReloadMessageHandler(SegmentReloadMessage segmentReloadMessage, 
ServerMetrics metrics,
         NotificationContext context) {
-      super(segmentReloadMessage, context);
-      _segmentName = segmentReloadMessage.getPartitionName();
-      _tableNameWithType = segmentReloadMessage.getResourceName();
-      _metrics = metrics;
-      _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + 
SegmentReloadMessageHandler.class);
+      super(segmentReloadMessage, metrics, context);
     }
 
     @Override
@@ -186,6 +165,29 @@ public class SegmentMessageHandlerFactory implements 
MessageHandlerFactory {
       }
       return helixTaskResult;
     }
+  }
+
+  private static class DefaultMessageHandler extends MessageHandler {
+    final String _segmentName;
+    final String _tableNameWithType;
+    final ServerMetrics _metrics;
+    final Logger _logger;
+
+    DefaultMessageHandler(Message message, ServerMetrics metrics, 
NotificationContext context) {
+      super(message, context);
+      _segmentName = message.getPartitionName();
+      _tableNameWithType = message.getResourceName();
+      _metrics = metrics;
+      _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + 
this.getClass().getSimpleName());
+    }
+
+    @Override
+    public HelixTaskResult handleMessage()
+        throws InterruptedException {
+      HelixTaskResult helixTaskResult = new HelixTaskResult();
+      helixTaskResult.setSuccess(true);
+      return helixTaskResult;
+    }
 
     @Override
     public void onError(Exception e, ErrorCode errorCode, ErrorType errorType) 
{


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to